531 lines
20 KiB
Python
531 lines
20 KiB
Python
"""
|
|
HIS Patient Import Views
|
|
|
|
Handles importing patient data from HIS/MOH Statistics CSV
|
|
and sending surveys to imported patients.
|
|
"""
|
|
|
|
import csv
|
|
import io
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
from django.contrib import messages
|
|
from django.utils import timezone
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.core.paginator import Paginator
|
|
from django.db import transaction
|
|
from django.db.models import Q
|
|
from django.http import JsonResponse
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.views.decorators.http import require_http_methods, require_POST
|
|
|
|
from apps.core.services import AuditService
|
|
from apps.organizations.models import Hospital, Patient
|
|
from apps.surveys.forms import HISPatientImportForm, HISSurveySendForm
|
|
from apps.surveys.models import SurveyInstance, SurveyStatus, SurveyTemplate, BulkSurveyJob
|
|
from apps.surveys.services import SurveyDeliveryService
|
|
from apps.surveys.tasks import send_bulk_surveys
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@login_required
def his_patient_import(request):
    """
    Upload step of the HIS import wizard: parse a HIS/MOH Statistics CSV.

    Expected CSV columns (MOH Statistics):
        SNo, Facility Name, Visit Type, Admit Date, Discharge Date,
        Patient Name, File Number, SSN, Mobile No, Payment Type,
        Gender, Full Age, Nationality, Date of Birth, Diagnosis

    Only users for whom ``is_px_admin()`` or ``is_hospital_admin()`` is true
    may use this view; everyone else is redirected with an error message.

    On a valid POST the parsed rows are stored in the session under
    ``his_import_<user id>`` and the user is redirected to the review step.
    In every other case the upload form is (re-)rendered.
    """
    user = request.user

    # Permission gate: neither kind of admin -> bounce back to the survey list.
    if not (user.is_px_admin() or user.is_hospital_admin()):
        messages.error(request, "You don't have permission to import patient data.")
        return redirect("surveys:instance_list")

    # Per-user session slot that carries the parsed rows to the next steps.
    session_key = f"his_import_{user.id}"
    template_name = "surveys/his_patient_import.html"

    if request.method != "POST":
        return render(request, template_name, {"form": HISPatientImportForm(request=request)})

    form = HISPatientImportForm(request.POST, request.FILES, request=request)
    if not form.is_valid():
        return render(request, template_name, {"form": form})

    try:
        hospital = form.cleaned_data["hospital"]
        csv_file = form.cleaned_data["csv_file"]
        skip_rows = form.cleaned_data["skip_header_rows"]

        # "utf-8-sig" drops a leading BOM if the export contains one.
        reader = csv.reader(io.StringIO(csv_file.read().decode("utf-8-sig")))

        # Discard the metadata rows that precede the real header row.
        for _ in range(skip_rows):
            next(reader, None)

        header = next(reader, None)
        if not header:
            messages.error(request, "CSV file is empty or has no data rows.")
            return render(request, template_name, {"form": form})

        # Map each logical field to a column index using known header aliases.
        header = [h.strip().lower() for h in header]
        column_aliases = {
            "file_number": ["file number", "file_number", "mrn", "file no"],
            "patient_name": ["patient name", "patient_name", "name"],
            "mobile_no": ["mobile no", "mobile_no", "mobile", "phone"],
            "ssn": ["ssn", "national id", "national_id", "id number"],
            "gender": ["gender", "sex"],
            "visit_type": ["visit type", "visit_type", "type"],
            "admit_date": ["admit date", "admit_date", "admission date"],
            "discharge_date": ["discharge date", "discharge_date"],
            "facility": ["facility name", "facility", "hospital"],
            "nationality": ["nationality", "country"],
            "dob": ["date of birth", "dob", "birth date"],
        }
        col_map = {
            field: _find_column(header, aliases)
            for field, aliases in column_aliases.items()
        }

        # Both identifying columns must be present before any row is read.
        required_columns = (
            ("file_number", "Could not find 'File Number' column in CSV."),
            ("patient_name", "Could not find 'Patient Name' column in CSV."),
        )
        for field, missing_message in required_columns:
            if col_map[field] is None:
                messages.error(request, missing_message)
                return render(request, template_name, {"form": form})

        imported_patients = []
        errors = []

        # Row numbers are 1-based positions in the file: the header sits on
        # row ``skip_rows + 1``, so the first data row is ``skip_rows + 2``.
        for row_num, row in enumerate(reader, start=skip_rows + 2):
            if not any(row):  # empty row, or a row of empty cells
                continue

            try:
                record = _his_row_to_record(row, col_map, row_num)
            except Exception as e:
                errors.append(f"Row {row_num}: {str(e)}")
                logger.error(f"Error processing row {row_num}: {e}")
                continue

            if record is not None:
                imported_patients.append(record)

        # Hand the parsed rows over to the review step via the session.
        request.session[session_key] = {
            "hospital_id": str(hospital.id),
            "patients": imported_patients,
            "errors": errors,
            "total_count": len(imported_patients),
        }

        AuditService.log_event(
            event_type="his_patient_import",
            description=f"Imported {len(imported_patients)} patients from HIS CSV by {user.get_full_name()}",
            user=user,
            metadata={
                "hospital": hospital.name,
                "total_count": len(imported_patients),
                "error_count": len(errors),
            },
        )

        if imported_patients:
            messages.success(
                request,
                f"Successfully parsed {len(imported_patients)} patient records. Please review before creating.",
            )
            return redirect("surveys:his_patient_review")

        messages.error(request, "No valid patient records found in CSV.")

    except Exception as e:
        logger.error(f"Error processing HIS CSV: {str(e)}", exc_info=True)
        messages.error(request, f"Error processing CSV: {str(e)}")

    return render(request, template_name, {"form": form})


def _his_normalize_mobile(mobile_no):
    """Strip spaces/dashes and prefix +966 to local numbers starting 05 or 5."""
    if not mobile_no:
        return mobile_no

    compact = mobile_no.replace(" ", "").replace("-", "")
    if compact.startswith("+"):
        return compact
    if compact.startswith("05"):
        # Drop the leading trunk zero before adding the country code.
        return "+966" + compact[1:]
    if compact.startswith("5"):
        return "+966" + compact
    return compact


def _his_row_to_record(row, col_map, row_num):
    """Turn one CSV row into the session record dict, or None if it lacks a file number or name."""
    field_names = (
        "file_number",
        "patient_name",
        "mobile_no",
        "ssn",
        "gender",
        "visit_type",
        "admit_date",
        "discharge_date",
        "facility",
        "nationality",
        "dob",
    )
    cells = {name: _get_cell(row, col_map[name], "").strip() for name in field_names}

    file_number = cells["file_number"]
    patient_name = cells["patient_name"]
    if not file_number or not patient_name:
        return None

    # First token is the first name, last token the last name; anything in
    # between is not kept separately.
    name_parts = patient_name.split()
    first_name = name_parts[0] if name_parts else ""
    last_name = name_parts[-1] if len(name_parts) > 1 else ""

    parsed_admit = _parse_date(cells["admit_date"])
    parsed_discharge = _parse_date(cells["discharge_date"])
    parsed_dob = _parse_date(cells["dob"])

    # Anything other than the recognised spellings becomes an empty gender.
    gender_lookup = {"male": "male", "m": "male", "female": "female", "f": "female"}
    gender = gender_lookup.get(cells["gender"].lower(), "")

    return {
        "row_num": row_num,
        "file_number": file_number,
        "patient_name": patient_name,
        "first_name": first_name,
        "last_name": last_name,
        "mobile_no": _his_normalize_mobile(cells["mobile_no"]),
        "ssn": cells["ssn"],
        "gender": gender,
        "visit_type": cells["visit_type"],
        "admit_date": parsed_admit.isoformat() if parsed_admit else None,
        "discharge_date": parsed_discharge.isoformat() if parsed_discharge else None,
        "facility": cells["facility"],
        "nationality": cells["nationality"],
        "dob": parsed_dob.isoformat() if parsed_dob else None,
    }
|
|
|
|
|
|
@login_required
def his_patient_review(request):
    """
    Review imported patients before creating records and sending surveys.

    Shows summary statistics instead of full patient list for performance.

    GET renders the summary for the import held in the session under
    ``his_import_<user id>``: counts of new and existing patients, a
    visit-type breakdown, mobile-number availability and a five-record sample.

    POST with ``action=create`` creates or updates one ``Patient`` per imported
    row (matched on ``mrn``) inside a single transaction, writes each patient's
    ID back into the session data and redirects to the survey-send step.
    POST with ``action=cancel`` removes the session data and redirects to the
    upload page. Any other POST falls through and re-renders the summary.

    If no import data is in the session the user is redirected to the upload
    page with an error message.
    """
    user = request.user
    session_key = f"his_import_{user.id}"
    import_data = request.session.get(session_key)

    if not import_data:
        messages.error(request, "No import data found. Please upload CSV first.")
        return redirect("surveys:his_patient_import")

    hospital = get_object_or_404(Hospital, id=import_data["hospital_id"])
    patients = import_data["patients"]
    errors = import_data.get("errors", [])

    # Find which file numbers already exist using a few batched queries
    # rather than issuing one query per imported row.
    # NOTE(review): membership below is exact string equality in Python;
    # confirm the database compares ``mrn`` case-sensitively as well.
    unique_file_numbers = list({p["file_number"] for p in patients})
    lookup_batch_size = 500  # keeps each IN (...) list to a modest size
    known_mrns = set()
    for start in range(0, len(unique_file_numbers), lookup_batch_size):
        batch = unique_file_numbers[start:start + lookup_batch_size]
        known_mrns.update(
            Patient.objects.filter(mrn__in=batch).values_list("mrn", flat=True)
        )

    # Calculate summary statistics
    total_count = len(patients)
    new_count = 0
    existing_count = 0
    visit_types = {}
    has_mobile = 0
    missing_mobile = 0

    for p in patients:
        # The flag is stored on the row itself so the template can show it.
        p["exists"] = p["file_number"] in known_mrns

        if p["exists"]:
            existing_count += 1
        else:
            new_count += 1

        # Rows with an empty or missing visit type are grouped as "Unknown".
        visit_type = p.get("visit_type", "Unknown") or "Unknown"
        visit_types[visit_type] = visit_types.get(visit_type, 0) + 1

        if p.get("mobile_no"):
            has_mobile += 1
        else:
            missing_mobile += 1

    if request.method == "POST":
        action = request.POST.get("action")

        if action == "create":
            # Create/update all patient records (no individual selection needed)
            created_count = 0
            updated_count = 0

            with transaction.atomic():
                for p in patients:
                    patient, created = Patient.objects.update_or_create(
                        mrn=p["file_number"],
                        defaults={
                            "first_name": p["first_name"],
                            "last_name": p["last_name"],
                            "phone": p["mobile_no"],
                            "national_id": p["ssn"],
                            "gender": p["gender"] or "",
                            "primary_hospital": hospital,
                        },
                    )
                    if created:
                        created_count += 1
                    else:
                        updated_count += 1
                    # Store only the patient ID to avoid serialization issues
                    p["patient_id"] = str(patient.id)

            # The nested session dict was changed in place, so the session is
            # flagged as modified explicitly to make sure it is saved.
            request.session[session_key]["patients"] = patients
            request.session.modified = True

            messages.success(
                request, f"Created {created_count} new patients, updated {updated_count} existing patients."
            )
            return redirect("surveys:his_patient_survey_send")

        elif action == "cancel":
            del request.session[session_key]
            messages.info(request, "Import cancelled.")
            return redirect("surveys:his_patient_import")

    # Get a small sample for preview (first 5 records)
    sample_patients = patients[:5]

    context = {
        "hospital": hospital,
        "errors": errors,
        "total_count": total_count,
        "new_count": new_count,
        "existing_count": existing_count,
        # Most frequent visit types first.
        "visit_types": sorted(visit_types.items(), key=lambda x: x[1], reverse=True),
        "has_mobile": has_mobile,
        "missing_mobile": missing_mobile,
        "sample_patients": sample_patients,
        "sample_count": len(sample_patients),
    }
    return render(request, "surveys/his_patient_review.html", context)
|
|
|
|
|
|
@login_required
def his_patient_survey_send(request):
    """
    Final step of the HIS import wizard: queue surveys for imported patients.

    Reads the import stored in the session under ``his_import_<user id>``,
    keeps only rows whose ``patient_id`` resolves to a ``Patient`` record, and
    on a valid POST creates a ``BulkSurveyJob`` for the selected rows, queues
    ``send_bulk_surveys`` for it, logs an audit event, clears the session data
    and redirects to the job status page. Otherwise the send form is rendered.
    """
    user = request.user
    session_key = f"his_import_{user.id}"
    import_data = request.session.get(session_key)

    if not import_data:
        messages.error(request, "No import data found. Please upload CSV first.")
        return redirect("surveys:his_patient_import")

    hospital = get_object_or_404(Hospital, id=import_data["hospital_id"])
    session_rows = import_data["patients"]

    # Resolve every stored patient ID with a single query.
    created_ids = [row["patient_id"] for row in session_rows if "patient_id" in row]
    records_by_id = {str(record.id): record for record in Patient.objects.filter(id__in=created_ids)}

    # Rows for rendering are copies carrying the model instance; the session
    # rows themselves are left untouched.
    patients = [
        {**row, "patient_obj": records_by_id[row["patient_id"]]}
        for row in session_rows
        if "patient_id" in row and row["patient_id"] in records_by_id
    ]

    if not patients:
        messages.error(request, "No patients have been created yet. Please create patients first.")
        return redirect("surveys:his_patient_review")

    def show_send_page(current_form):
        # Single place that renders the send page with its standard context.
        return render(
            request,
            "surveys/his_patient_survey_send.html",
            {
                "form": current_form,
                "hospital": hospital,
                "patients": patients,
                "total_count": len(patients),
            },
        )

    if request.method != "POST":
        return show_send_page(HISSurveySendForm(request=request))

    form = HISSurveySendForm(data=request.POST, request=request)
    if not form.is_valid():
        return show_send_page(form)

    survey_template = form.cleaned_data["survey_template"]
    delivery_channel = form.cleaned_data["delivery_channel"]
    custom_message = form.cleaned_data.get("custom_message", "")
    selected_ids = request.POST.getlist("selected_patients")

    # The checkboxes post file numbers, so selection is matched on those.
    chosen_rows = [entry for entry in patients if entry["file_number"] in selected_ids]
    if not chosen_rows:
        messages.error(request, "No patients selected.")
        return show_send_page(form)

    # Drop the model instance again before the rows are stored on the job.
    selected_patients = [
        {key: value for key, value in entry.items() if key != "patient_obj"}
        for entry in chosen_rows
    ]

    job = BulkSurveyJob.objects.create(
        name=f"HIS Import - {hospital.name} - {timezone.now().strftime('%Y-%m-%d %H:%M')}",
        status=BulkSurveyJob.JobStatus.PENDING,
        source=BulkSurveyJob.JobSource.HIS_IMPORT,
        created_by=user,
        hospital=hospital,
        survey_template=survey_template,
        total_patients=len(selected_patients),
        delivery_channel=delivery_channel,
        custom_message=custom_message,
        patient_data=selected_patients,
    )

    # Hand the job to the background worker.
    send_bulk_surveys.delay(str(job.id))

    AuditService.log_event(
        event_type="his_survey_queued",
        description=f"Queued bulk survey job for {len(selected_patients)} patients",
        user=user,
        metadata={
            "job_id": str(job.id),
            "hospital": hospital.name,
            "survey_template": survey_template.name,
            "patient_count": len(selected_patients),
        },
    )

    # The wizard is finished; remove the import from the session.
    del request.session[session_key]

    messages.success(
        request,
        f"Survey sending job queued for {len(selected_patients)} patients. "
        f"You can check the status in Survey Jobs.",
    )
    return redirect("surveys:bulk_job_status", job_id=job.id)
|
|
|
|
|
|
# Helper functions
|
|
|
|
|
|
def _find_column(header, possible_names):
|
|
"""Find column index by possible names"""
|
|
for name in possible_names:
|
|
for i, h in enumerate(header):
|
|
if name.lower() in h.lower():
|
|
return i
|
|
return None
|
|
|
|
|
|
def _get_cell(row, index, default=""):
|
|
"""Safely get cell value"""
|
|
if index is None or index >= len(row):
|
|
return default
|
|
return row[index].strip() if row[index] else default
|
|
|
|
|
|
def _parse_date(date_str):
|
|
"""Parse date from various formats"""
|
|
if not date_str:
|
|
return None
|
|
|
|
formats = [
|
|
"%d-%b-%Y %H:%M:%S",
|
|
"%d-%b-%Y",
|
|
"%d/%m/%Y %H:%M:%S",
|
|
"%d/%m/%Y",
|
|
"%Y-%m-%d %H:%M:%S",
|
|
"%Y-%m-%d",
|
|
]
|
|
|
|
for fmt in formats:
|
|
try:
|
|
return datetime.strptime(date_str.strip(), fmt).date()
|
|
except ValueError:
|
|
continue
|
|
|
|
return None
|
|
|
|
|
|
@login_required
def bulk_job_status(request, job_id):
    """
    View status of a bulk survey job.

    Access is granted when the user is a PX admin, created the job, or has a
    hospital that is the job's hospital. Anyone else is redirected to the
    survey list with an error message. A missing job yields a 404.
    """
    user = request.user
    job = get_object_or_404(BulkSurveyJob, id=job_id)

    # A user without a hospital must not match a job without a hospital:
    # comparing the two attributes directly would treat None and None as
    # "the same hospital" and let the request through.
    may_view = (
        user.is_px_admin()
        or job.created_by == user
        or (user.hospital is not None and job.hospital == user.hospital)
    )
    if not may_view:
        messages.error(request, "You don't have permission to view this job.")
        return redirect("surveys:instance_list")

    context = {
        "job": job,
        "progress": job.progress_percentage,
        "is_complete": job.is_complete,
        "results": job.results,
    }
    return render(request, "surveys/bulk_job_status.html", context)
|
|
|
|
|
|
@login_required
def bulk_job_list(request):
    """
    Show the 50 most recently created bulk survey jobs visible to the user.

    PX admins see every job, users with a hospital see that hospital's jobs,
    and everyone else sees only the jobs they created themselves.
    """
    user = request.user
    job_manager = BulkSurveyJob.objects

    if user.is_px_admin():
        visible_jobs = job_manager.all()
    elif user.hospital:
        visible_jobs = job_manager.filter(hospital=user.hospital)
    else:
        visible_jobs = job_manager.filter(created_by=user)

    # Newest first, capped at 50 entries.
    recent_jobs = visible_jobs.order_by("-created_at")[:50]

    return render(request, "surveys/bulk_job_list.html", {"jobs": recent_jobs})
|