HH/apps/analytics/ask_views.py
ismail c5f76b3855
Some checks are pending
Build and Push Docker Image / build (push) Waiting to run
updates
2026-05-11 14:45:30 +03:00

1281 lines
49 KiB
Python

"""
Ask Your Data — Fully AI-driven Conversational Analytics
Architecture:
1. User asks a natural language question
2. LLM analyzes the question + data schema → returns a structured query spec (JSON)
3. Query executor safely runs the spec against Django ORM
4. LLM generates a narrative answer from the real results
5. Returns: {answer, data, chart_type, component}
The LLM touches the actual data to give correct, contextual answers.
"""
import json
import logging
import hashlib
from decimal import Decimal
from django.contrib.auth.decorators import login_required
from django.core.cache import cache
from django.core.exceptions import FieldError
from django.db.models import Avg, Count, F, Q, Sum, ExpressionWrapper, FloatField
from django.db.models.functions import TruncDate, TruncMonth, TruncWeek, Cast
from django.http import JsonResponse
from django.shortcuts import render
from django.utils import timezone
from datetime import timedelta
from apps.core.decorators import block_source_user
from apps.complaints.models import Complaint
from apps.feedback.models import Feedback
from apps.observations.models import Observation
from apps.organizations.models import Department, Hospital, Patient
from apps.accounts.models import User
from apps.physicians.models import PhysicianMonthlyRating
from apps.px_action_center.models import PXAction
from apps.surveys.models import SurveyInstance
logger = logging.getLogger(__name__)
class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``Decimal`` values as floats."""

    def default(self, obj):
        # Anything that is not a Decimal is delegated to the stock encoder,
        # which raises TypeError for unsupported types.
        if not isinstance(obj, Decimal):
            return super().default(obj)
        return float(obj)
# Process-wide memo for the generated schema text (built once, on first use).
_db_schema_cache = None


def _get_db_schema():
    """Auto-generate database schema description from Django models for the LLM.

    Walks ``Model._meta.get_fields()`` for every configured model and emits a
    plain-text listing of field names, coarse types, choices and the FK
    traversal paths the LLM is allowed to use. The result is cached in the
    module-level ``_db_schema_cache`` for the lifetime of the process.

    Returns:
        str: one paragraph per model, separated by blank lines.
    """
    global _db_schema_cache
    if _db_schema_cache is not None:
        return _db_schema_cache

    # Django field class name -> type word shown to the LLM.
    type_map = {
        "CharField": "text",
        "TextField": "text",
        "IntegerField": "integer",
        "PositiveIntegerField": "integer",
        "FloatField": "decimal",
        "DecimalField": "decimal",
        "BooleanField": "boolean",
        "DateField": "date",
        "DateTimeField": "datetime",
        "JSONField": "json",
        "EmailField": "text",
        "GenericIPAddressField": "text",
        "UUIDField": "uuid",
    }
    # Field names that are never described to the LLM.
    skip_fields = {
        "id",
        "created_at",
        "updated_at",
        "content_type",
        "object_id",
        "metadata",
        "password",
        "access_token",
        "token_expires_at",
        "comment_analyzed",
        "comment_analysis",
        "patient_contacted",
        "patient_contacted_at",
        "patient_contacted_by",
        "issue_resolved",
        "contact_notes",
        "completed_language",
        "open_count",
        "last_opened_at",
        "time_spent_seconds",
        "is_featured",
        "is_public",
        "is_deleted",
        "deleted_at",
        "deleted_by",
        "client_ip",
        "user_agent",
        "action_id",
        "resolution_survey_sent_at",
        "monthly_follow_up_due_at",
        "monthly_follow_up_completed_at",
        "monthly_follow_up_completed_by",
        "monthly_follow_up_notes",
        "submitter_notified_at",
        "responsible_person_notified_at",
        "resolution_survey",
    }
    model_configs = {
        "complaints": {
            "model": Complaint,
            "label": "COMPLAINTS",
            "fk_traversals": {
                "department": "department__name_en",
                "category": "category__name_en",
                "domain": "domain__name_en",
                "source": "source__name_en",
                "hospital": "hospital__name",
                "location": "location__name_en",
                "main_section": "main_section__name_en",
                "subsection": "subsection__name_en",
            },
            "extra_fields": ["resolution_time (calculated: avg hours between activated_at and resolved_at)"],
            "date_field": "created_at",
            "notes": "Has department, hospital, status fields directly.",
        },
        "surveys": {
            "model": SurveyInstance,
            "label": "SURVEYS",
            "fk_traversals": {
                "survey_template": "survey_template__name",
                "survey_type": "survey_template__survey_type",
                "patient": "patient__name_en",
                "staff": "staff__name_en",
                "journey_instance": "journey_instance__id",
                "hospital": "hospital__name",
            },
            "extra_fields": [],
            "date_field": "completed_at",
            "notes": "NO direct department field. Use survey_template__survey_type for survey type. Hospital auto-filtered.",
        },
        "actions": {
            "model": PXAction,
            "label": "ACTIONS (PX Actions)",
            "fk_traversals": {
                "department": "department__name_en",
                "hospital": "hospital__name",
            },
            "extra_fields": [],
            "date_field": "created_at",
            "notes": "Has department, hospital, status fields directly.",
        },
        "feedback": {
            "model": Feedback,
            "label": "FEEDBACK",
            "fk_traversals": {
                "department": "department__name_en",
                "hospital": "hospital__name",
                "staff": "staff__name_en",
            },
            "extra_fields": [],
            "date_field": "created_at",
            "notes": "Has department, hospital, status fields directly.",
        },
        "physicians": {
            "model": PhysicianMonthlyRating,
            "label": "PHYSICIAN RATINGS",
            "fk_traversals": {
                "staff": "staff__name_en",
                "physician_name": "staff__name_en",
                "department": "staff__department__name_en",
                "hospital": "staff__department__hospital__name",
            },
            "extra_fields": [],
            "date_field": "created_at",
            "notes": "NO direct hospital or department field. Traverse via staff FK. Use staff__name_en for physician name, staff__department__name_en for department. Use average_rating for rating, total_surveys for survey count.",
        },
        "observations": {
            "model": Observation,
            "label": "OBSERVATIONS",
            "fk_traversals": {
                "category": "category__name_en",
                "assigned_department": "assigned_department__name_en",
                "hospital": "hospital__name",
                "staff": "staff__name_en",
            },
            "extra_fields": [],
            "date_field": "created_at",
            "notes": "Uses assigned_department (NOT department). Has hospital FK.",
        },
        "patients": {
            "model": Patient,
            "label": "PATIENTS",
            "fk_traversals": {
                "hospital": "primary_hospital__name",
            },
            "extra_fields": [],
            "date_field": "created_at",
            "notes": "Stores patient demographics. Use nationality for country queries, gender for gender breakdown, city for geographic queries.",
        },
        "users": {
            "model": User,
            "label": "USERS (Staff)",
            "fk_traversals": {
                "department": "department__name_en",
                "hospital": "hospital__name",
            },
            "extra_fields": [],
            "date_field": "date_joined",
            "notes": "User accounts for staff/admin. Has role field for user type filtering.",
        },
    }

    schema_parts = []
    for model_key, config in model_configs.items():
        Model = config["model"]
        lines = [f'{config["label"]} (model: "{model_key}"):']
        field_descriptions = []
        for field in Model._meta.get_fields():
            if field.name in skip_fields:
                continue
            if field.is_relation:
                # Reverse and many-to-many relations are never described.
                if field.many_to_many or field.one_to_many:
                    continue
                # Only relations with a configured traversal path are listed;
                # every other relation is omitted from the schema.
                if field.name in config["fk_traversals"]:
                    traversal = config["fk_traversals"][field.name]
                    if field.name == traversal:
                        continue
                    field_descriptions.append(f"  - {traversal} (FK → {field.related_model._meta.model_name})")
            else:
                field_type = type_map.get(field.__class__.__name__, "unknown")
                desc = f"  - {field.name} ({field_type}"
                if field.choices:
                    # Choice values may be ints or other non-strings;
                    # str.join() only accepts strings.
                    choice_vals = [str(c[0]) for c in field.choices]
                    desc += f", choices: {', '.join(choice_vals)}"
                desc += ")"
                field_descriptions.append(desc)
        for extra in config.get("extra_fields", []):
            field_descriptions.append(f"  - {extra}")
        lines.extend(field_descriptions)
        if config.get("notes"):
            lines.append(f"  NOTE: {config['notes']}")
        schema_parts.append("\n".join(lines))

    schema = "\n\n".join(schema_parts)
    _db_schema_cache = schema
    return schema
# =============================================================================
# Step 0: LLM Client — imported lazily inside _process_question()
# =============================================================================
# =============================================================================
# UI View
# =============================================================================
@block_source_user
@login_required
def ask_your_data(request):
    """Render the conversational analytics page.

    The template receives ``selected_hospital``: the request's tenant hospital
    for PX admins (when set), otherwise the user's own hospital, otherwise
    ``None``.
    """
    user = request.user
    selected = None
    if user.is_px_admin() and getattr(request, "tenant_hospital", None):
        selected = request.tenant_hospital
    elif getattr(user, "hospital", None):
        selected = user.hospital

    context = {"selected_hospital": selected}
    return render(request, "analytics/ask_your_data.html", context)
# =============================================================================
# Query API — Fully AI-driven
# =============================================================================
@block_source_user
@login_required
def ask_data_query(request):
    """
    POST {"question": "..."} → AI understands, queries data, returns answer + data.

    Request body (JSON object):
        question: non-empty string, at most 500 characters.
        conversation_history: optional list of earlier messages; anything
            that is not a list is ignored.

    Responses:
        405 for non-POST requests; 400 for malformed JSON, a non-object body,
        or a missing / non-string / over-long question; otherwise 200 with the
        dict produced by ``_process_question``.

    Successful results are cached for 30 minutes, keyed by question,
    conversation history and hospital.
    """
    if request.method != "POST":
        return JsonResponse({"error": "POST required"}, status=405)

    try:
        body = json.loads(request.body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return JsonResponse({"error": "Invalid JSON"}, status=400)
    # Valid JSON that is not an object (list, number, ...) has no .get().
    if not isinstance(body, dict):
        return JsonResponse({"error": "Invalid JSON"}, status=400)

    question = body.get("question", "")
    if not isinstance(question, str) or not question.strip():
        return JsonResponse({"error": "Question is required"}, status=400)
    question = question.strip()
    if len(question) > 500:
        return JsonResponse({"error": "Question too long (max 500 chars)"}, status=400)

    conversation_history = body.get("conversation_history", [])
    if not isinstance(conversation_history, list):
        conversation_history = []

    user = request.user
    hospital = None
    if user.is_px_admin() and hasattr(request, "tenant_hospital") and request.tenant_hospital:
        hospital = request.tenant_hospital
    elif hasattr(user, "hospital") and user.hospital:
        hospital = user.hospital

    # Cache by question + conversation history + hospital. The history must be
    # part of the key: a follow-up such as "what about India?" means different
    # things in different conversations. A hash keeps special characters out
    # of the cache key.
    history_blob = json.dumps(conversation_history, sort_keys=True, default=str)
    key_material = f"{question.lower()}\n{history_blob}"
    q_hash = hashlib.md5(key_material.encode()).hexdigest()[:16]
    h_id = hospital.id if hospital else "all"
    cache_key = f"ask_v2_{q_hash}_{h_id}"

    cached = cache.get(cache_key)
    if cached is not None:
        return JsonResponse(cached)

    result = _process_question(question, user, hospital, conversation_history)
    # Failure answers carry an empty "data" dict; do not pin them for 30 minutes.
    if result.get("data"):
        cache.set(cache_key, result, 1800)  # 30 min cache
    return JsonResponse(result)
# =============================================================================
# Core: AI processes question, executes query, generates answer
# =============================================================================
def _process_question(question, user, hospital, conversation_history=None):
    """
    Full AI pipeline with conversation memory:
    1. LLM generates query spec with context (rule-based parser as fallback)
    2. Execute query spec against Django ORM
    3. LLM generates narrative answer from real results

    Args:
        question: the user's natural-language question.
        user: requesting user, forwarded to the query executor.
        hospital: hospital used to scope the query, or ``None``.
        conversation_history: optional list of earlier messages.

    Returns:
        dict with ``answer``, ``data`` and ``chart_type``; successful runs
        also include ``component``. Failures return an explanatory ``answer``
        with empty ``data`` and ``chart_type`` ``"none"``.
    """
    if conversation_history is None:
        conversation_history = []

    from apps.analytics.services.ai_analytics import _client

    hospital_ctx = f"Hospital: {hospital.name}" if hospital else "All hospitals (no filter)"

    # Step 1: LLM generates query spec (with rule-based fallback)
    query_spec = _generate_query_spec(question, hospital_ctx, _client, conversation_history)
    # If LLM failed, fall back to rule-based parser WITH conversation context
    if not query_spec or "error" in query_spec:
        query_spec = _rule_based_parse(question, hospital_ctx, conversation_history)
    if not query_spec or "error" in query_spec:
        # query_spec may be empty/None here, so do not call .get() on it directly.
        default_msg = "I couldn't understand this question. Please try rephrasing it."
        return {
            "answer": (query_spec or {}).get("error", default_msg),
            "data": {},
            "chart_type": "none",
        }

    # Step 2: Execute the query
    try:
        data = _execute_query(query_spec, user, hospital)
    except Exception:
        # Boundary handler: the spec comes from an LLM, so any ORM error is
        # logged with its traceback and turned into a friendly message.
        logger.exception("Query execution error for: %s", question)
        return {
            "answer": "I couldn't retrieve the data for that question. Try simplifying or asking about a specific metric.",
            "data": {},
            "chart_type": "none",
        }
    if not data or data.get("error"):
        return {
            "answer": "No data found matching your question. Try a different time period, department, or metric.",
            "data": {},
            "chart_type": "none",
        }

    # Step 3: LLM generates narrative answer AND Arrow.js component from real data
    answer_text, arrow_component = _generate_answer_and_component(question, query_spec, data, _client)

    # Determine chart type
    chart_type = _determine_chart_type(query_spec, data)
    return {
        "answer": answer_text,
        "data": data,
        "chart_type": chart_type,
        "component": arrow_component,
    }
# =============================================================================
# Step 1: LLM generates query spec (with rule-based fallback)
# =============================================================================
def _rule_based_parse(question, hospital_ctx, conversation_history=None):
    """Fallback rule-based parser when LLM is unavailable — uses conversation context.

    Args:
        question: the user's question.
        hospital_ctx: hospital context string (accepted for signature parity
            with ``_generate_query_spec``; not used by the rules).
        conversation_history: optional list of earlier messages.

    Returns:
        A query spec dict with ``model``, ``operation`` and ``filters`` (plus
        ``group_by`` / ``limit`` when detected), or ``{"error": ...}`` when no
        data model can be determined.
    """
    if conversation_history is None:
        conversation_history = []
    q = question.lower().strip()

    # A model named in the current question always wins; history is only
    # consulted when the question itself names none. Otherwise a short question
    # such as "how many patients?" would inherit the previous turn's model.
    model = _rb_detect_model(q)
    if not model:
        model = _rb_detect_model_from_history(conversation_history, q)
    if not model:
        # If still no model, check if it's a follow-up like "from pakistan", "by department"
        if q.startswith(("and ", "but ", "what about ", "how about ", "show ", "list ", "of ", "from ", "by ", "in ", "how many")):
            # Likely a follow-up — try to use the last model from history
            for msg in reversed(conversation_history):
                if not isinstance(msg, dict) or msg.get("role") != "user":
                    continue
                content = msg.get("content", "")
                if not isinstance(content, str):
                    continue
                # _rb_detect_model matches lowercase keywords only.
                model = _rb_detect_model(content.lower())
                if model:
                    break
    if not model:
        return {
            "error": "I couldn't determine what data you're asking about. Try mentioning complaints, surveys, actions, feedback, patients, or observations."
        }

    operation, group_by, limit = _rb_detect_operation(q, model)
    filters = _rb_extract_filters(q, model)
    spec = {
        "model": model,
        "operation": operation,
        "filters": filters,
    }
    if group_by:
        spec["group_by"] = group_by
    if limit:
        spec["limit"] = limit
    return spec
def _rb_detect_model_from_history(conversation_history, current_question):
    """Detect model from conversation history for follow-up questions.

    Args:
        conversation_history: list of message dicts with ``role`` and
            ``content``; entries of any other shape are ignored because the
            list comes straight from the client.
        current_question: the question being parsed.

    Returns:
        The model key detected in the most recent user message that names
        one, or ``None`` when the question does not look like a follow-up or
        no earlier message matches.
    """
    q = current_question.lower().strip()
    follow_up_prefixes = ("and ", "but ", "what ", "how ", "show ", "list ", "of ", "from ", "by ", "in ")
    # Treat it as a follow-up if it is short (<= 8 words) or starts with a
    # conjunction / question word.
    looks_like_follow_up = len(q.split()) <= 8 or q.startswith(follow_up_prefixes)
    if not looks_like_follow_up:
        return None

    # Scan backwards for the last user question with a known model keyword
    for msg in reversed(conversation_history or []):
        if not isinstance(msg, dict) or msg.get("role") != "user":
            continue
        content = msg.get("content", "")
        if not isinstance(content, str):
            continue
        model = _rb_detect_model(content.lower())
        if model:
            return model
    return None
def _rb_detect_model(q):
if "complaint" in q:
return "complaints"
elif "survey" in q or "nps" in q or "satisfaction" in q:
return "surveys"
elif "action" in q:
return "actions"
elif "feedback" in q:
return "feedback"
elif "physician" in q or "doctor" in q:
return "physicians"
elif "observation" in q:
return "observations"
elif "patient" in q:
return "patients"
elif "user" in q or "staff" in q or "employee" in q:
return "users"
return None
def _rb_detect_operation(q, model):
"""Detect the operation type from the question."""
# Follow-up questions with specific values should be count operations
# e.g., "what about india?", "and from pakistan?", "how about egypt?"
if any(q.startswith(prefix) for prefix in ["what about", "how about", "and from", "and how many", "from "]):
return "count", None, None
if any(w in q for w in ["how many", "total number", "count of"]):
if any(w in q for w in ["by ", "per ", "breakdown", "distribution"]):
return "grouped_count", None, None
return "count", None, None
if any(w in q for w in ["average", "avg", "mean score"]):
return "average", None, None
if any(w in q for w in ["trend", "over time", "over the", "monthly", "daily"]):
return "trend", None, None
if any(w in q for w in ["leaderboard", "top physicians", "best rated"]):
return "leaderboard", None, 10
if any(w in q for w in ["list", "show me all", "show all"]):
return "list", None, 20
# Default to grouped count
return "grouped_count", "department__name_en", None
def _rb_extract_filters(q, model):
filters = []
# Time filters
if "last month" in q:
filters.append({"field": "date", "op": "=", "value": "last_month"})
elif "this month" in q:
filters.append({"field": "date", "op": "=", "value": "this_month"})
elif "last week" in q:
filters.append({"field": "date", "op": "=", "value": "last_week"})
elif "this week" in q:
filters.append({"field": "date", "op": "=", "value": "this_week"})
elif "last quarter" in q or "90 day" in q or "3 month" in q:
filters.append({"field": "date", "op": "=", "value": "last_quarter"})
elif "last year" in q or "12 month" in q:
filters.append({"field": "date", "op": "=", "value": "last_year"})
elif "30 day" in q or "1 month" in q:
filters.append({"field": "date", "op": "=", "value": "30d"})
elif "7 day" in q or "1 week" in q:
filters.append({"field": "date", "op": "=", "value": "7d"})
elif "6 month" in q:
filters.append({"field": "date", "op": "=", "value": "6m"})
# Status/type filters
if "open" in q and model == "complaints":
filters.append({"field": "status", "op": "=", "value": "open"})
if "overdue" in q:
filters.append({"field": "is_overdue", "op": "=", "value": True})
if "resolved" in q or "closed" in q:
filters.append({"field": "status", "op": "in", "value": ["resolved", "closed"]})
if "negative" in q and model == "surveys":
filters.append({"field": "is_negative", "op": "=", "value": True})
if "high" in q and "severity" in q:
filters.append({"field": "severity", "op": "=", "value": "high"})
if "critical" in q:
filters.append({"field": "severity", "op": "=", "value": "critical"})
if "medium" in q and "severity" in q:
filters.append({"field": "severity", "op": "=", "value": "medium"})
if "low" in q and "severity" in q:
filters.append({"field": "severity", "op": "=", "value": "low"})
# Department mentions
depts = [
"cardiology",
"emergency",
"er",
"icu",
"surgery",
"pediatric",
"orthopedic",
"radiology",
"lab",
"pharmacy",
"nursing",
]
for dept in depts:
if dept in q:
field = "department__name_en" if model != "physicians" else "department__name_en"
filters.append({"field": field, "op": "icontains", "value": dept})
break
# Nationality/country/city queries — use iexact
if model == "patients" and ("from " in q or "nationality" in q or "country" in q or "what about" in q or "how about" in q or "and from" in q):
# Extract the country name from the question
# Try various patterns
for phrase in ["from ", "about ", "in "]:
idx = q.find(phrase)
if idx >= 0:
rest = q[idx + len(phrase):].strip()
word = rest.split()[0] if rest.split() else None
if word:
country = word.strip("?.,!;:")
if country and country.lower() not in ("the", "are", "we", "have", "do", "does", "what", "how", "many", "there"):
filters.append({"field": "nationality", "op": "iexact", "value": country.capitalize()})
break
return filters
def _generate_query_spec(question, hospital_ctx, client, conversation_history=None):
    """Ask the LLM to generate a structured query spec with conversation context.

    Args:
        question: the user's natural-language question.
        hospital_ctx: one-line hospital context placed at the top of the user prompt.
        client: LLM client exposing ``is_configured()`` and
            ``chat(system_prompt, user_prompt, temperature=..., max_tokens=...)``.
        conversation_history: optional list of earlier message dicts; only the
            last six are included in the prompt.

    Returns:
        The parsed spec dict on success, otherwise ``{"error": <message>}``.
    """
    if conversation_history is None:
        conversation_history = []
    if not client.is_configured():
        return {"error": "AI service is currently unavailable. Please try again later."}

    db_schema = _get_db_schema()
    system_prompt = f"""You are a healthcare data analyst that converts natural language questions into structured Django ORM query specs for a Patient Experience (PX360) system.
CONVERSATION CONTEXT:
- The user may ask follow-up questions that reference previous queries.
- If the question contains pronouns ("those", "them", "it") or comparative terms ("same", "above"), use the CONVERSATION HISTORY to understand the context.
- For follow-up questions like "what about X?", "and from Y?", "how about Z?" — use the SAME model as the previous question and return a "count" operation with a filter for X/Y/Z.
- "of those" or "from those" means filter the previous result set further.
- "show by department" means group the previous result by department.
DATABASE SCHEMA — use ONLY these exact field names and FK traversals:
{db_schema}
CRITICAL RULES:
- Hospital filtering is handled automatically. DO NOT include "hospital" in filters.
- Use the EXACT field names shown above. If a field doesn't exist on a model, do NOT reference it.
- For FK fields, use __ notation to traverse (e.g., department__name_en, category__name_en).
- Surveys do NOT have a direct department field — do NOT filter surveys by department.
- Observations use assigned_department (NOT department).
- Physician ratings: use staff__name_en for name, staff__department__name_en for department.
TIME EXPRESSIONS (return as relative):
- "last month"{{"type": "relative", "value": "last_month"}}
- "this month"{{"type": "relative", "value": "this_month"}}
- "last week"{{"type": "relative", "value": "last_week"}}
- "last quarter" / "last 90 days"{{"type": "relative", "value": "last_quarter"}}
- "last year" / "last 12 months"{{"type": "relative", "value": "last_year"}}
- "last 30 days"{{"type": "relative", "value": "30d"}}
- "last 7 days"{{"type": "relative", "value": "7d"}}
- "this year"{{"type": "relative", "value": "this_year"}}
- "last 6 months"{{"type": "relative", "value": "6m"}}
QUERY OPERATIONS:
- "count" → returns single number
- "grouped_count" → returns [{{group, count}}, ...] — use group_by field
- "average" → returns single number — use aggregate_field
- "sum" → returns single number — use aggregate_field
- "trend" → returns [{{date, count}}, ...] — use truncate: month|week|day
- "list" → returns [{{...}}, ...] — use fields array
- "top_n" → returns [{{group, count}}, ...] — use group_by + limit
- "comparison" → returns [{{group, metric}}, ...] — use group_by
- "percentage" → returns percentage — use numerator_filters for subset
- "leaderboard" → returns physician ranking table ONLY (model must be "physicians")
FILTER OPERATORS:
- equals: {{"field": "status", "op": "=", "value": "open"}}
- in: {{"field": "status", "op": "in", "value": ["resolved", "closed"]}}
- contains (fuzzy): {{"field": "department__name_en", "op": "icontains", "value": "cardiology"}}
- iexact (case-insensitive exact): {{"field": "nationality", "op": "iexact", "value": "Sudan"}} — use for nationality, gender, city
- gte/lte/gt/lt for dates and numbers
- is_true: {{"field": "is_overdue", "op": "=", "value": true}}
IMPORTANT: For nationality, country, gender, city — ALWAYS use "iexact", NOT "icontains".
Example: {{"field": "nationality", "op": "iexact", "value": "Sudan"}} — NOT icontains.
RESPOND WITH ONLY VALID JSON. No markdown. No explanation.
{{
"model": "complaints|surveys|actions|feedback|physicians|observations|patients|users",
"operation": "count|grouped_count|average|trend|list|top_n|percentage|comparison|leaderboard",
"filters": [{{"field": "...", "op": "=", "value": "..."}}],
"group_by": "field_name",
"fields": ["field1", "field2"],
"truncate": "month|week|day",
"limit": 10,
"order_by": "-count",
"aggregate_field": "total_score"
}}"""

    # Build conversation context for the prompt
    history_context = ""
    if conversation_history:
        # Messages are appended in their original order, so the newest is last.
        history_context = "\nCONVERSATION HISTORY (oldest first):\n"
        # Only include last 6 messages (3 exchanges)
        for msg in conversation_history[-6:]:
            # History comes from the client; ignore entries of the wrong shape.
            if not isinstance(msg, dict):
                continue
            role_label = "USER" if msg.get("role") == "user" else "ASSISTANT"
            content = msg.get("content", "")
            data = msg.get("data", {})
            history_context += f"{role_label}: {content}\n"
            if isinstance(data, dict) and data.get("headers") and isinstance(data.get("rows"), list) and data["rows"]:
                # Show a summary of the data
                history_context += f"  Data: {len(data['rows'])} rows, columns: {data['headers']}\n"

    user_prompt = f"""{hospital_ctx}{history_context}
Question: {question}
Generate the query spec."""
    raw = client.chat(system_prompt, user_prompt, temperature=0.05, max_tokens=512)
    if not raw:
        return {"error": "AI service unavailable"}

    # Strip a markdown code fence if the model added one despite instructions.
    raw = raw.strip()
    if raw.startswith("```json"):
        raw = raw[7:]
    elif raw.startswith("```"):
        raw = raw[3:]
    if raw.endswith("```"):
        raw = raw[:-3]
    raw = raw.strip()

    try:
        spec = json.loads(raw)
    except json.JSONDecodeError:
        return {"error": "I couldn't understand the question format. Please try rephrasing it."}

    # Validate required fields (valid JSON may still be a list or a scalar)
    if not isinstance(spec, dict) or "model" not in spec or "operation" not in spec:
        return {"error": "I couldn't parse a valid query from your question. Could you rephrase?"}

    # Validate model — must match every model advertised in the prompt above.
    valid_models = (
        "complaints",
        "surveys",
        "actions",
        "feedback",
        "physicians",
        "observations",
        "patients",
        "users",
    )
    if spec["model"] not in valid_models:
        return {"error": f"I don't have data for that. Available: {', '.join(valid_models)}"}
    return spec
# =============================================================================
# Step 2: Execute query spec
# =============================================================================
# Path segments that must never be selected or grouped on, whatever the
# LLM-produced spec asks for.
_EQ_SENSITIVE_FIELD_PARTS = frozenset({"password", "access_token"})
# Upper bound on rows returned for any limited operation.
_EQ_MAX_LIMIT = 1000


def _eq_is_sensitive(path):
    """Return True if any ``__``-separated segment of *path* is sensitive."""
    return isinstance(path, str) and any(part in _EQ_SENSITIVE_FIELD_PARTS for part in path.split("__"))


def _eq_coerce_limit(raw, default=50):
    """Turn an LLM-supplied limit into an int in ``1.._EQ_MAX_LIMIT``."""
    try:
        limit = int(raw)
    except (TypeError, ValueError):
        return default
    if limit < 1:
        return default
    return min(limit, _EQ_MAX_LIMIT)


def _eq_default_group_by(model):
    """Return a grouping path that exists for *model*."""
    return {
        "surveys": "survey_template__survey_type",
        "physicians": "staff__department__name_en",
        "observations": "assigned_department__name_en",
        "patients": "nationality",
    }.get(model, "department__name_en")


def _eq_grouped_count(qs, group_by, limit=None):
    """Count rows per *group_by* value, largest first, as a bar-chart payload."""
    if _eq_is_sensitive(group_by):
        return {"error": "Grouping by this field is not allowed"}
    grouped = qs.values(group_by).annotate(count=Count("id")).order_by("-count")
    if limit is not None:
        grouped = grouped[:limit]
    label = _get_field_label(group_by)
    rows = []
    for r in grouped:
        group_value = r[group_by]
        # Only missing values become "Unknown"; 0 and False are real groups.
        if group_value is None or group_value == "":
            group_value = "Unknown"
        rows.append({label: group_value, "Count": r["count"]})
    return {"type": "bar", "headers": [label, "Count"], "rows": rows}


def _execute_query(spec, user, hospital):
    """Safely execute a query spec against Django ORM.

    Args:
        spec: dict with ``model`` and ``operation`` plus optional ``filters``,
            ``group_by``, ``fields``, ``truncate``, ``limit``, ``order_by``,
            ``aggregate_field`` and ``numerator_filters``.
        user: requesting user (forwarded to ``_get_queryset``).
        hospital: hospital used to scope the queryset, or ``None``.

    Returns:
        dict payload whose ``type`` is ``metric``, ``bar``, ``line`` or
        ``table``; or ``{"error": ...}`` for an unsupported request.

    Raises:
        KeyError: if ``model`` or ``operation`` is missing from *spec*.
        Any ORM error raised for invalid field names propagates to the caller.
    """
    from uuid import UUID

    model = spec["model"]
    operation = spec["operation"]
    filters = spec.get("filters") or []
    group_by = spec.get("group_by")
    fields = spec.get("fields")
    truncate = spec.get("truncate")
    limit = _eq_coerce_limit(spec.get("limit"))
    order_by = spec.get("order_by")
    agg_field = spec.get("aggregate_field")

    qs = _get_queryset(model, user, hospital)
    qs = _apply_filters(qs, model, filters)

    if operation == "count":
        return {"type": "metric", "value": qs.count()}

    if operation in ("grouped_count", "top_n"):
        return _eq_grouped_count(qs, group_by or _eq_default_group_by(model), limit)

    if operation == "average":
        field = agg_field or "total_score"
        # Special handling for resolution_time calculation
        if field == "resolution_time" and model == "complaints":
            # Average resolution time in hours.
            # NOTE(review): dividing by 3.6e9 assumes the cast duration is in
            # microseconds — confirm for the database backend in use.
            result = (
                qs.filter(resolved_at__isnull=False)
                .annotate(
                    resolution_hours=ExpressionWrapper(
                        Cast(F("resolved_at") - F("activated_at"), FloatField()) / 3600000000.0,
                        output_field=FloatField(),
                    )
                )
                .aggregate(avg=Avg("resolution_hours"))
            )
            avg_hours = result["avg"] or 0
            return {"type": "metric", "value": round(avg_hours, 1), "unit": "hours"}
        # Every other field: plain average (this branch previously fell
        # through to "Unknown operation").
        avg_value = qs.aggregate(avg=Avg(field))["avg"]
        return {"type": "metric", "value": round(float(avg_value), 2) if avg_value is not None else 0}

    if operation == "sum":
        if not agg_field:
            return {"error": "aggregate_field is required for sum"}
        result = qs.aggregate(total=Sum(agg_field))
        return {"type": "metric", "value": result["total"] or 0}

    if operation == "trend":
        trunc_funcs = {"month": TruncMonth, "week": TruncWeek, "day": TruncDate}
        # Normalise so bucket function and label format always agree.
        if truncate not in trunc_funcs:
            truncate = "month"
        trunc_func = trunc_funcs[truncate]
        date_field = _get_date_field(model)
        results = qs.annotate(date=trunc_func(date_field)).values("date").annotate(count=Count("id")).order_by("date")
        rows = []
        for r in results:
            bucket = r["date"]
            if bucket is None:
                # Rows whose date field is NULL end up in a None bucket.
                date_label = "Unknown"
            elif truncate == "month":
                date_label = bucket.strftime("%b %Y")
            else:
                date_label = str(bucket)
            rows.append({"Date": date_label, "Count": r["count"]})
        return {"type": "line", "headers": ["Date", "Count"], "rows": rows}

    if operation == "list":
        if not fields or not isinstance(fields, (list, tuple)):
            fields = _get_default_fields(model)
        # Never expose credential-like columns, even if the spec asks for them.
        fields = [f for f in fields if isinstance(f, str) and not _eq_is_sensitive(f)]
        if not fields:
            fields = _get_default_fields(model)
        if not order_by:
            order_by = f"-{_get_date_field(model)}"
        results = list(qs.values(*fields).order_by(order_by)[:limit])
        labels = [_get_field_label(f) for f in fields]
        rows = []
        for r in results:
            row = {}
            for f, label in zip(fields, labels):
                val = r.get(f)
                if hasattr(val, "isoformat"):
                    val = val.strftime("%Y-%m-%d")
                elif isinstance(val, UUID):
                    # Short id for display. (Checking for a ``hex`` attribute
                    # also matched floats and stringified them.)
                    val = str(val)[:8]
                elif val is None:
                    val = ""
                row[label] = val
            rows.append(row)
        return {"type": "table", "headers": labels, "rows": rows}

    if operation == "percentage":
        total = qs.count()
        if total == 0:
            return {"type": "metric", "value": 0}
        numerator_filters = spec.get("numerator_filters") or []
        if numerator_filters:
            num_qs = _get_queryset(model, user, hospital)
            num_qs = _apply_filters(num_qs, model, list(filters) + list(numerator_filters))
            numerator = num_qs.count()
        else:
            numerator = total
        pct = round((numerator / total * 100), 1)
        return {"type": "metric", "value": pct, "numerator": numerator, "denominator": total}

    if operation == "leaderboard":
        if model != "physicians":
            # Non-physician "leaderboards" degrade to a grouped count.
            if not group_by:
                group_by = "resolved_by__name" if model == "complaints" else "department__name_en"
            return _eq_grouped_count(qs, group_by, limit)
        # select_related avoids one extra query per row for p.staff below.
        ranked = qs.select_related("staff").order_by("-average_rating")[:limit]
        return {
            "type": "table",
            "headers": ["Rank", "Name", "Rating", "Surveys", "Positive", "Negative"],
            "rows": [
                {
                    "Rank": rank,
                    "Name": p.staff.name_en if p.staff else "",
                    "Rating": round(p.average_rating, 2) if p.average_rating else 0,
                    "Surveys": p.total_surveys or 0,
                    "Positive": p.positive_count or 0,
                    "Negative": p.negative_count or 0,
                }
                for rank, p in enumerate(ranked, start=1)
            ],
        }

    if operation == "comparison":
        # Same as grouped_count, but without a row limit.
        return _eq_grouped_count(qs, group_by or _eq_default_group_by(model))

    return {"error": "Unknown operation"}
def _get_queryset(model_name, user, hospital):
    """Return the base queryset for *model_name*, scoped to *hospital* if given.

    Unknown model names fall back to ``Complaint``. ``user`` is accepted but
    not used for filtering here.
    """
    models_by_key = {
        "complaints": Complaint,
        "surveys": SurveyInstance,
        "actions": PXAction,
        "feedback": Feedback,
        "physicians": PhysicianMonthlyRating,
        "observations": Observation,
        "patients": Patient,
        "users": User,
    }
    Model = models_by_key.get(model_name, Complaint)
    queryset = Model.objects.all()
    if not hospital:
        return queryset

    # Models whose hospital is reached through a specific lookup path.
    hospital_lookup = {
        "surveys": "survey_template__hospital",
        "physicians": "staff__department__hospital",
        "patients": "primary_hospital",
        "users": "hospital",
    }.get(model_name)
    # Everything else is filtered only when the model exposes ``hospital``.
    if hospital_lookup is None and hasattr(Model, "hospital"):
        hospital_lookup = "hospital"
    if hospital_lookup is not None:
        queryset = queryset.filter(**{hospital_lookup: hospital})
    return queryset
def _apply_filters(qs, model_name, filters):
    """Apply filters from spec to queryset.

    Args:
        qs: queryset to narrow.
        model_name: model key, used for alias mapping and the date field.
        filters: iterable of ``{"field", "op", "value"}`` dicts. The list is
            LLM- or rule-generated, so malformed or invalid entries are logged
            and skipped rather than raised.

    Returns:
        The filtered queryset.
    """
    from django.core.exceptions import ValidationError

    date_field = _get_date_field(model_name)
    # Field mappings for related model lookups
    field_mappings = {
        "surveys": {
            "survey_type": "survey_template__survey_type",
            "hospital": "survey_template__hospital",
        },
        "observations": {
            "department": "assigned_department__name_en",
        },
        "physicians": {
            "physician_name": "staff__name_en",
            "department": "staff__department__name_en",
            "hospital": "staff__department__hospital",
            "rating": "average_rating",
            "survey_count": "total_surveys",
        },
    }
    model_mappings = field_mappings.get(model_name, {})
    # Spec operator -> ORM lookup suffix ("contains" is deliberately
    # case-insensitive, like "icontains").
    lookup_suffix = {
        "=": "",
        "in": "__in",
        "icontains": "__icontains",
        "iexact": "__iexact",
        "contains": "__icontains",
        "gt": "__gt",
        "gte": "__gte",
        "lt": "__lt",
        "lte": "__lte",
    }
    # Credential-like columns must not be usable as a filtering oracle.
    sensitive_parts = {"password", "access_token"}
    # Errors raised for an unknown field or a value of the wrong type.
    bad_filter_errors = (FieldError, ValidationError, ValueError, TypeError)

    def _narrow(current, exclude=False, **lookups):
        """Apply one filter/exclude; on a bad lookup, log and keep *current*."""
        try:
            return current.exclude(**lookups) if exclude else current.filter(**lookups)
        except bad_filter_errors:
            logger.warning("Skipping invalid filter: %s for model: %s", lookups, model_name)
            return current

    for f in filters or []:
        if not isinstance(f, dict) or not isinstance(f.get("field"), str) or "value" not in f:
            logger.warning("Skipping malformed filter: %r for model: %s", f, model_name)
            continue
        field = f["field"]
        op = f.get("op", "=")
        value = f["value"]

        # Skip hospital filter - already handled by _get_queryset
        if field == "hospital":
            continue
        # Map field to actual ORM path if there's a mapping
        field = model_mappings.get(field, field)
        # Normalize dict-style relative dates to plain strings
        if isinstance(value, dict) and value.get("type") == "relative":
            value = value.get("value", value)
        # Resolve "date" alias to the model's actual date field
        if field == "date":
            field = date_field
        if any(part in sensitive_parts for part in field.split("__")):
            logger.warning("Skipping filter on sensitive field: %s for model: %s", field, model_name)
            continue

        # Handle relative time filters. The model's own date field is checked
        # explicitly because not all of them end in "_at" (users: date_joined).
        is_date_like = field == date_field or field.endswith("_at") or field == "month"
        if is_date_like and isinstance(value, str):
            start, end = _parse_relative_date(value)
            if start and end:
                qs = _narrow(qs, **{f"{field}__gte": start, f"{field}__lt": end})
                continue

        # Standard filter operators
        if op == "!=":
            qs = _narrow(qs, exclude=True, **{field: value})
        elif op in ("gt", "gte", "lt", "lte"):
            if isinstance(value, str):
                # Check if value is a relative date string
                start, end = _parse_relative_date(value)
                if not (start and end):
                    # If not a known relative date, skip this filter to avoid crash
                    logger.warning("Unknown date value for %s: %s", field, value)
                    continue
                # gte/gt → start boundary, lte/lt → end boundary
                value = start if op in ("gte", "gt") else end
            qs = _narrow(qs, **{f"{field}{lookup_suffix[op]}": value})
        elif op in lookup_suffix:
            if op == "in" and not isinstance(value, (list, tuple, set)):
                # A bare string would otherwise be matched character by character.
                value = [value]
            qs = _narrow(qs, **{f"{field}{lookup_suffix[op]}": value})
        else:
            logger.warning("Skipping filter with unknown operator %r on %s", op, field)
    return qs
def _parse_relative_date(value):
"""Parse relative date string into (start, end) datetimes."""
if not isinstance(value, str) or len(value) < 2:
return None, None
now = timezone.now()
date_map = {
"last_month": (
(now.replace(day=1) - timedelta(days=1)).replace(day=1),
now.replace(day=1),
),
"this_month": (now.replace(day=1, hour=0, minute=0, second=0, microsecond=0), now),
"last_week": (now - timedelta(days=7), now),
"this_week": (now - timedelta(days=now.weekday()), now),
"last_quarter": (now - timedelta(days=90), now),
"this_year": (now.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0), now),
"last_year": (now - timedelta(days=365), now),
"30d": (now - timedelta(days=30), now),
"7d": (now - timedelta(days=7), now),
"90d": (now - timedelta(days=90), now),
"6m": (now - timedelta(days=180), now),
"1y": (now - timedelta(days=365), now),
}
return date_map.get(value, (None, None))
def _get_date_field(model_name):
fields = {
"complaints": "created_at",
"surveys": "completed_at",
"actions": "created_at",
"feedback": "created_at",
"physicians": "month",
"observations": "created_at",
"patients": "created_at",
"users": "date_joined",
}
return fields.get(model_name, "created_at")
def _get_default_fields(model_name):
fields = {
"complaints": ["title", "severity", "status", "department__name_en", "created_at"],
"surveys": ["total_score", "is_negative", "survey_template__survey_type", "completed_at"],
"actions": ["title", "status", "category", "department__name_en", "created_at"],
"feedback": ["feedback_type", "rating", "sentiment", "category", "created_at"],
"observations": ["title", "severity", "status", "category__name_en", "created_at"],
"physicians": ["physician_name", "department__name_en", "rating", "survey_count"],
"patients": ["mrn", "first_name", "last_name", "nationality", "gender", "city", "created_at"],
"users": ["username", "first_name", "last_name", "email", "role", "date_joined"],
}
return fields.get(model_name, ["id", "created_at"])
def _get_field_label(field):
labels = {
"department__name_en": "Department",
"category": "Category",
"severity": "Severity",
"status": "Status",
"source__name_en": "Source",
"complaint_source_type": "Source Type",
"feedback_type": "Type",
"sentiment": "Sentiment",
"survey_type": "Survey Type",
"survey_template__survey_type": "Survey Type",
"physician_name": "Name",
"specialization": "Specialization",
"rating": "Rating",
"survey_count": "Surveys",
"positive_count": "Positive",
"neutral_count": "Neutral",
"negative_count": "Negative",
"month": "Month",
"title": "Title",
"total_score": "Score",
"is_negative": "Negative",
"priority": "Priority",
"is_overdue": "Overdue",
"created_at": "Created",
"completed_at": "Completed",
"resolved_at": "Resolved",
"closed_at": "Closed",
"resolution_time": "Resolution Time",
"source_type": "Source",
"category__name_en": "Category",
"mrn": "MRN",
"first_name": "First Name",
"last_name": "Last Name",
"nationality": "Nationality",
"gender": "Gender",
"city": "City",
"date_of_birth": "Date of Birth",
"email": "Email",
"username": "Username",
"role": "Role",
"date_joined": "Joined",
}
return labels.get(field, field.replace("__name_en", "").replace("__", " ").replace("_", " ").title())
# =============================================================================
# Step 3: LLM generates narrative answer + Arrow.js interactive component
# =============================================================================
def _generate_answer_and_component(question, query_spec, data, client):
"""Generate both a narrative answer AND an interactive Arrow.js component."""
# For metric queries, always generate a simple factual answer from the data
if data.get("type") == "metric":
value = data.get("value", 0)
if value is None:
value = 0
return f"{value}", None
if not client.is_configured():
return _simple_answer(data), None
system_prompt = """You are a frontend developer that generates interactive Arrow.js components for a healthcare analytics dashboard.
ARROW.JS API (available as globals):
- html`...` — Tagged template literal for DOM. ${() => expr} for reactive bindings, @click for events
- reactive(state) — Creates reactive state object. Changes auto-update DOM
- ApexCharts — Available for charts (https://apexcharts.com/docs/chart-types/)
DATA AVAILABLE:
- window.askData — {headers: [...], rows: [...]} or {type: "metric", value: N}
- ApexCharts — Available if typeof ApexCharts !== 'undefined'
REQUIRED: Your component MUST include:
1. View toggle buttons (Chart / Table / Cards) using reactive state
2. Chart view using ApexCharts (bar for grouped, area/line for trends)
3. Table view with collapsible rows
4. Cards view with individual metric cards
5. Smooth loading/error states
6. Click-to-filter on bar charts (clicking a department filters the table)
COMPONENT PATTERN:
```javascript
const data = window.askData || {};
const state = reactive({ view: 'chart', loading: false, filter: null });
function onBarClick(label) {
state.filter = state.filter === label ? null : label;
}
export default html`
<div class="p-3">
<!-- View Toggle -->
<div class="flex gap-2 mb-3">
${['chart', 'table', 'cards'].map(v => html`
<button @click="${() => state.view = v}"
class="px-3 py-1 text-sm rounded transition ${() => state.view === v ? 'bg-purple-600 text-white' : 'bg-gray-100 text-gray-700 hover:bg-gray-200'}">
${v.charAt(0).toUpperCase() + v.slice(1)}
</button>
`)}
</div>
${() => {
if (state.view === 'chart') return renderChart(data, state);
if (state.view === 'table') return renderTable(data, state);
if (state.view === 'cards') return renderCards(data, state);
return html`<p>No data</p>`;
}}
</div>
`;
```
APEXCHARTS PATTERN (inside renderChart):
```javascript
function renderChart(data, state) {
if (typeof ApexCharts === 'undefined') return html`<p class="text-gray-400 text-sm p-4">Chart unavailable</p>`;
const container = html`<div class="apex-chart-container" style="height:280px;"></div>`;
setTimeout(() => {
const el = container.querySelector('.apex-chart-container');
if (el && !el._chart) {
el._chart = new ApexCharts(el, {
series: [{ name: data.headers[1], data: data.rows.map(r => r[data.headers[1]]) }],
chart: { type: 'bar', height: 280, toolbar: { show: false } },
colors: ['#7c3aed', '#3b82f6', '#10b981', '#f59e0b', '#ef4444', '#8b5cf6', '#06b6d4', '#ec4899'],
plotOptions: { bar: { distributed: true, borderRadius: 4, columnWidth: '60%' } },
xaxis: { categories: data.rows.map(r => r[data.headers[0]]) },
tooltip: { y: { formatter: v => v } },
});
el._chart.render();
}
}, 0);
return container;
}
```
RULES:
- Use ONLY: html`...`, reactive(...), ApexCharts
- DO NOT use fetch, window.alert, or console.log
- Export with: export default html`...`
- Keep component under 80 lines
- Use Tailwind-like classes for styling
RESPOND WITH ONLY VALID JSON:
{
"answer": "1-2 sentence insight",
"component": "Arrow.js component code string"
}"""
user_prompt = f"""Question: {question}
Data headers: {data.get('headers', [])}
Data rows ({len(data.get('rows', []))} items): {json.dumps(data.get('rows', [])[:3], indent=2, cls=DecimalEncoder)}
Data type: {data.get('type', 'unknown')}
Generate the Arrow.js component with view toggle (chart/table/cards), ApexCharts, and click-to-filter."""
raw = client.chat(system_prompt, user_prompt, temperature=0.1, max_tokens=1024)
if raw:
try:
result = json.loads(raw)
answer = result.get("answer", _simple_answer(data))
component = result.get("component")
# Validate component has required exports
if component and "export default" in component:
return answer, component
return answer, None
except (json.JSONDecodeError, KeyError):
pass
return _simple_answer(data), None
def _simple_answer(data):
"""Fallback answer when LLM is unavailable."""
if data.get("type") == "metric":
value = data.get("value", 0)
if "numerator" in data and "denominator" in data:
return f"{data['numerator']} out of {data['denominator']} ({value}%)"
return str(value)
elif data.get("type") == "bar" and data.get("rows"):
top = data["rows"][0]
return f"Top: {list(top.values())[0]} with {list(top.values())[1]}"
elif data.get("rows"):
return f"Found {len(data['rows'])} records."
return "Data retrieved."
# =============================================================================
# Chart type detection
# =============================================================================
def _determine_chart_type(spec, data):
"""Determine the best chart type for the data."""
data_type = data.get("type")
operation = spec.get("operation")
if data_type == "metric":
return "metric"
elif data_type == "bar" or operation in ("grouped_count", "top_n", "comparison"):
return "bar"
elif data_type == "line" or operation == "trend":
return "line"
elif data_type == "table" or operation in ("list", "leaderboard"):
return "table"
return "bar"