733 lines
32 KiB
Python
733 lines
32 KiB
Python
"""
|
|
Report generation services for PX360 - Simplified Version
|
|
|
|
Handles data fetching, filtering, aggregation, and export
|
|
for custom reports across all data sources. No chart functionality.
|
|
"""
|
|
|
|
import csv
|
|
import io
|
|
from datetime import datetime, timedelta
|
|
from django.db.models import (
|
|
Count,
|
|
Sum,
|
|
Avg,
|
|
Min,
|
|
Max,
|
|
F,
|
|
Q,
|
|
Value,
|
|
FloatField,
|
|
IntegerField,
|
|
CharField,
|
|
ExpressionWrapper,
|
|
DurationField,
|
|
)
|
|
from django.db.models.functions import TruncDate, TruncMonth, TruncWeek, TruncYear, Extract
|
|
from django.http import HttpResponse
|
|
from django.utils import timezone
|
|
from django.conf import settings
|
|
|
|
|
|
class ReportBuilderService:
|
|
"""
|
|
Service for building custom reports from various data sources.
|
|
|
|
Provides:
|
|
- Data fetching with dynamic filters
|
|
- Column selection
|
|
- Grouping and aggregation
|
|
- Summary statistics
|
|
"""
|
|
|
|
# Available fields for each data source
|
|
SOURCE_FIELDS = {
|
|
"complaints": {
|
|
"id": {"label": "ID", "field": "id", "type": "string"},
|
|
"reference_number": {"label": "Reference Number", "field": "reference_number", "type": "string"},
|
|
"title": {"label": "Title", "field": "title", "type": "string"},
|
|
"description": {"label": "Description", "field": "description", "type": "text"},
|
|
"status": {"label": "Status", "field": "status", "type": "choice"},
|
|
"severity": {"label": "Severity", "field": "severity", "type": "choice"},
|
|
"priority": {"label": "Priority", "field": "priority", "type": "choice"},
|
|
"source": {"label": "Source", "field": "complaint_source_type", "type": "choice"},
|
|
"hospital": {"label": "Hospital", "field": "hospital__name", "type": "string"},
|
|
"department": {"label": "Department", "field": "department__name", "type": "string"},
|
|
"section": {"label": "Section", "field": "section__name", "type": "string"},
|
|
"patient_name": {"label": "Patient Name", "field": "patient__first_name", "type": "string"},
|
|
"patient_mobile": {"label": "Patient Mobile", "field": "patient__mobile_number", "type": "string"},
|
|
"created_at": {"label": "Created Date", "field": "created_at", "type": "datetime"},
|
|
"updated_at": {"label": "Updated Date", "field": "updated_at", "type": "datetime"},
|
|
"due_at": {"label": "Due Date", "field": "due_at", "type": "datetime"},
|
|
"resolved_at": {"label": "Resolved Date", "field": "resolved_at", "type": "datetime"},
|
|
"is_overdue": {"label": "Is Overdue", "field": "is_overdue", "type": "boolean"},
|
|
"resolution_time_hours": {
|
|
"label": "Resolution Time (Hours)",
|
|
"field": "resolution_time_hours",
|
|
"type": "number",
|
|
},
|
|
"journey_type": {"label": "Journey Type", "field": "journey__journey_type", "type": "string"},
|
|
},
|
|
"inquiries": {
|
|
"id": {"label": "ID", "field": "id", "type": "string"},
|
|
"reference_number": {"label": "Reference Number", "field": "reference_number", "type": "string"},
|
|
"title": {"label": "Title", "field": "title", "type": "string"},
|
|
"description": {"label": "Description", "field": "description", "type": "text"},
|
|
"status": {"label": "Status", "field": "status", "type": "choice"},
|
|
"category": {"label": "Category", "field": "category__name_en", "type": "string"},
|
|
"hospital": {"label": "Hospital", "field": "hospital__name", "type": "string"},
|
|
"department": {"label": "Department", "field": "department__name", "type": "string"},
|
|
"created_at": {"label": "Created Date", "field": "created_at", "type": "datetime"},
|
|
"updated_at": {"label": "Updated Date", "field": "updated_at", "type": "datetime"},
|
|
},
|
|
"observations": {
|
|
"id": {"label": "ID", "field": "id", "type": "string"},
|
|
"tracking_code": {"label": "Tracking Code", "field": "tracking_code", "type": "string"},
|
|
"title": {"label": "Title", "field": "title", "type": "string"},
|
|
"description": {"label": "Description", "field": "description", "type": "text"},
|
|
"status": {"label": "Status", "field": "status", "type": "choice"},
|
|
"severity": {"label": "Severity", "field": "severity", "type": "choice"},
|
|
"category": {"label": "Category", "field": "category__name_en", "type": "string"},
|
|
"hospital": {"label": "Hospital", "field": "hospital__name", "type": "string"},
|
|
"department": {"label": "Department", "field": "assigned_department__name", "type": "string"},
|
|
"location": {"label": "Location", "field": "location_text", "type": "string"},
|
|
"created_at": {"label": "Created Date", "field": "created_at", "type": "datetime"},
|
|
"incident_datetime": {"label": "Incident Date", "field": "incident_datetime", "type": "datetime"},
|
|
},
|
|
"px_actions": {
|
|
"id": {"label": "ID", "field": "id", "type": "string"},
|
|
"title": {"label": "Title", "field": "title", "type": "string"},
|
|
"description": {"label": "Description", "field": "description", "type": "text"},
|
|
"status": {"label": "Status", "field": "status", "type": "choice"},
|
|
"priority": {"label": "Priority", "field": "priority", "type": "choice"},
|
|
"action_type": {"label": "Action Type", "field": "action_type", "type": "choice"},
|
|
"hospital": {"label": "Hospital", "field": "hospital__name", "type": "string"},
|
|
"department": {"label": "Department", "field": "department__name", "type": "string"},
|
|
"created_at": {"label": "Created Date", "field": "created_at", "type": "datetime"},
|
|
"due_at": {"label": "Due Date", "field": "due_at", "type": "datetime"},
|
|
"completed_at": {"label": "Completed Date", "field": "completed_at", "type": "datetime"},
|
|
"is_overdue": {"label": "Is Overdue", "field": "is_overdue", "type": "boolean"},
|
|
},
|
|
"surveys": {
|
|
"id": {"label": "ID", "field": "id", "type": "string"},
|
|
"survey_template": {"label": "Survey Template", "field": "survey_template__name", "type": "string"},
|
|
"status": {"label": "Status", "field": "status", "type": "choice"},
|
|
"total_score": {"label": "Total Score", "field": "total_score", "type": "number"},
|
|
"is_negative": {"label": "Is Negative", "field": "is_negative", "type": "boolean"},
|
|
"patient_type": {"label": "Patient Type", "field": "journey__patient_type", "type": "string"},
|
|
"journey_type": {"label": "Journey Type", "field": "journey__journey_type", "type": "string"},
|
|
"hospital": {"label": "Hospital", "field": "survey_template__hospital__name", "type": "string"},
|
|
"department": {"label": "Department", "field": "journey__department__name", "type": "string"},
|
|
"created_at": {"label": "Created Date", "field": "created_at", "type": "datetime"},
|
|
"completed_at": {"label": "Completed Date", "field": "completed_at", "type": "datetime"},
|
|
},
|
|
"physicians": {
|
|
"id": {"label": "ID", "field": "id", "type": "string"},
|
|
"physician_name": {"label": "Physician Name", "field": "physician__full_name", "type": "string"},
|
|
"department": {"label": "Department", "field": "department__name", "type": "string"},
|
|
"month": {"label": "Month", "field": "month", "type": "string"},
|
|
"year": {"label": "Year", "field": "year", "type": "number"},
|
|
"total_surveys": {"label": "Total Surveys", "field": "total_surveys", "type": "number"},
|
|
"avg_rating": {"label": "Average Rating", "field": "avg_rating", "type": "number"},
|
|
"positive_count": {"label": "Positive", "field": "positive_count", "type": "number"},
|
|
"neutral_count": {"label": "Neutral", "field": "neutral_count", "type": "number"},
|
|
"negative_count": {"label": "Negative", "field": "negative_count", "type": "number"},
|
|
},
|
|
}
|
|
|
|
# Filter options for each data source
|
|
SOURCE_FILTERS = {
|
|
"complaints": [
|
|
{"name": "date_range", "label": "Date Range", "type": "daterange"},
|
|
{"name": "status", "label": "Status", "type": "multiselect"},
|
|
{"name": "severity", "label": "Severity", "type": "multiselect"},
|
|
{"name": "priority", "label": "Priority", "type": "multiselect"},
|
|
{"name": "hospital", "label": "Hospital", "type": "select"},
|
|
{"name": "department", "label": "Department", "type": "select"},
|
|
{"name": "section", "label": "Section", "type": "select"},
|
|
{"name": "source", "label": "Source", "type": "multiselect"},
|
|
{"name": "is_overdue", "label": "Is Overdue", "type": "boolean"},
|
|
{"name": "journey_type", "label": "Journey Type", "type": "select"},
|
|
],
|
|
"inquiries": [
|
|
{"name": "date_range", "label": "Date Range", "type": "daterange"},
|
|
{"name": "status", "label": "Status", "type": "multiselect"},
|
|
{"name": "hospital", "label": "Hospital", "type": "select"},
|
|
{"name": "department", "label": "Department", "type": "select"},
|
|
{"name": "category", "label": "Category", "type": "select"},
|
|
],
|
|
"observations": [
|
|
{"name": "date_range", "label": "Date Range", "type": "daterange"},
|
|
{"name": "status", "label": "Status", "type": "multiselect"},
|
|
{"name": "severity", "label": "Severity", "type": "multiselect"},
|
|
{"name": "hospital", "label": "Hospital", "type": "select"},
|
|
{"name": "department", "label": "Department", "type": "select"},
|
|
{"name": "category", "label": "Category", "type": "select"},
|
|
],
|
|
"px_actions": [
|
|
{"name": "date_range", "label": "Date Range", "type": "daterange"},
|
|
{"name": "status", "label": "Status", "type": "multiselect"},
|
|
{"name": "priority", "label": "Priority", "type": "multiselect"},
|
|
{"name": "action_type", "label": "Action Type", "type": "multiselect"},
|
|
{"name": "hospital", "label": "Hospital", "type": "select"},
|
|
{"name": "department", "label": "Department", "type": "select"},
|
|
{"name": "is_overdue", "label": "Is Overdue", "type": "boolean"},
|
|
],
|
|
"surveys": [
|
|
{"name": "date_range", "label": "Date Range", "type": "daterange"},
|
|
{"name": "status", "label": "Status", "type": "multiselect"},
|
|
{"name": "is_negative", "label": "Is Negative", "type": "boolean"},
|
|
{"name": "hospital", "label": "Hospital", "type": "select"},
|
|
{"name": "department", "label": "Department", "type": "select"},
|
|
{"name": "patient_type", "label": "Patient Type", "type": "select"},
|
|
{"name": "journey_type", "label": "Journey Type", "type": "select"},
|
|
],
|
|
"physicians": [
|
|
{"name": "month_range", "label": "Month Range", "type": "monthrange"},
|
|
{"name": "hospital", "label": "Hospital", "type": "select"},
|
|
{"name": "department", "label": "Department", "type": "select"},
|
|
],
|
|
}
|
|
|
|
@classmethod
|
|
def get_queryset(cls, data_source, user=None):
|
|
"""Get the base queryset for a data source."""
|
|
from apps.complaints.models import Complaint
|
|
from apps.observations.models import Observation
|
|
from apps.px_action_center.models import PXAction
|
|
from apps.surveys.models import SurveyInstance
|
|
from apps.physicians.models import PhysicianMonthlyRating
|
|
|
|
querysets = {
|
|
"complaints": Complaint.objects.all(),
|
|
"inquiries": Complaint.objects.filter(complaint_type="inquiry"),
|
|
"observations": Observation.objects.all(),
|
|
"px_actions": PXAction.objects.all(),
|
|
"surveys": SurveyInstance.objects.all(),
|
|
"physicians": PhysicianMonthlyRating.objects.all(),
|
|
}
|
|
|
|
queryset = querysets.get(data_source)
|
|
if queryset is None:
|
|
return None
|
|
|
|
if user and user.is_authenticated:
|
|
if not user.is_px_admin():
|
|
hospital = getattr(user, "_tenant_hospital_cache", None) or getattr(user, "hospital", None)
|
|
if hospital:
|
|
if data_source == "observations":
|
|
queryset = queryset.filter(assigned_department__hospital=hospital)
|
|
elif data_source == "surveys":
|
|
queryset = queryset.filter(journey__hospital=hospital)
|
|
else:
|
|
queryset = queryset.filter(hospital=hospital)
|
|
|
|
return queryset
|
|
|
|
@classmethod
|
|
def apply_filters(cls, queryset, filters, data_source):
|
|
"""Apply filters to a queryset."""
|
|
# Date range filter
|
|
if "date_range" in filters:
|
|
date_range = filters["date_range"]
|
|
date_field = "created_at"
|
|
|
|
if date_range == "7d":
|
|
start_date = timezone.now() - timedelta(days=7)
|
|
elif date_range == "30d":
|
|
start_date = timezone.now() - timedelta(days=30)
|
|
elif date_range == "90d":
|
|
start_date = timezone.now() - timedelta(days=90)
|
|
elif date_range == "ytd":
|
|
start_date = timezone.now().replace(month=1, day=1)
|
|
elif date_range == "custom" and "start_date" in filters and "end_date" in filters:
|
|
start_date = filters["start_date"]
|
|
end_date = filters["end_date"]
|
|
queryset = queryset.filter(**{f"{date_field}__gte": start_date, f"{date_field}__lte": end_date})
|
|
return queryset
|
|
else:
|
|
start_date = timezone.now() - timedelta(days=30)
|
|
|
|
queryset = queryset.filter(**{f"{date_field}__gte": start_date})
|
|
|
|
# Hospital filter
|
|
if "hospital" in filters and filters["hospital"]:
|
|
queryset = queryset.filter(hospital_id=filters["hospital"])
|
|
|
|
# Department filter
|
|
if "department" in filters and filters["department"]:
|
|
if data_source == "observations":
|
|
queryset = queryset.filter(assigned_department_id=filters["department"])
|
|
elif data_source == "surveys":
|
|
queryset = queryset.filter(journey__department_id=filters["department"])
|
|
else:
|
|
queryset = queryset.filter(department_id=filters["department"])
|
|
|
|
# Section filter
|
|
if "section" in filters and filters["section"]:
|
|
queryset = queryset.filter(section_id=filters["section"])
|
|
|
|
# Status filter
|
|
if "status" in filters and filters["status"]:
|
|
if isinstance(filters["status"], list):
|
|
queryset = queryset.filter(status__in=filters["status"])
|
|
else:
|
|
queryset = queryset.filter(status=filters["status"])
|
|
|
|
# Severity filter
|
|
if "severity" in filters and filters["severity"]:
|
|
if isinstance(filters["severity"], list):
|
|
queryset = queryset.filter(severity__in=filters["severity"])
|
|
else:
|
|
queryset = queryset.filter(severity=filters["severity"])
|
|
|
|
# Priority filter
|
|
if "priority" in filters and filters["priority"]:
|
|
if isinstance(filters["priority"], list):
|
|
queryset = queryset.filter(priority__in=filters["priority"])
|
|
else:
|
|
queryset = queryset.filter(priority=filters["priority"])
|
|
|
|
# Source filter (for complaints)
|
|
if "source" in filters and filters["source"]:
|
|
if isinstance(filters["source"], list):
|
|
queryset = queryset.filter(complaint_source_type__in=filters["source"])
|
|
else:
|
|
queryset = queryset.filter(complaint_source_type=filters["source"])
|
|
|
|
# Is overdue filter
|
|
if "is_overdue" in filters:
|
|
if filters["is_overdue"] == "true" or filters["is_overdue"] is True:
|
|
queryset = queryset.filter(is_overdue=True)
|
|
elif filters["is_overdue"] == "false" or filters["is_overdue"] is False:
|
|
queryset = queryset.filter(is_overdue=False)
|
|
|
|
# Is negative filter (for surveys)
|
|
if "is_negative" in filters:
|
|
if filters["is_negative"] == "true" or filters["is_negative"] is True:
|
|
queryset = queryset.filter(is_negative=True)
|
|
elif filters["is_negative"] == "false" or filters["is_negative"] is False:
|
|
queryset = queryset.filter(is_negative=False)
|
|
|
|
# Journey type filter
|
|
if "journey_type" in filters and filters["journey_type"]:
|
|
if data_source == "complaints":
|
|
queryset = queryset.filter(journey__journey_type=filters["journey_type"])
|
|
elif data_source == "surveys":
|
|
queryset = queryset.filter(journey__journey_type=filters["journey_type"])
|
|
|
|
# Patient type filter
|
|
if "patient_type" in filters and filters["patient_type"]:
|
|
queryset = queryset.filter(journey__patient_type=filters["patient_type"])
|
|
|
|
return queryset
|
|
|
|
@classmethod
|
|
def apply_grouping(cls, queryset, grouping_config, data_source):
|
|
"""Apply grouping and aggregation to a queryset."""
|
|
if not grouping_config or "field" not in grouping_config:
|
|
return queryset
|
|
|
|
field = grouping_config["field"]
|
|
aggregation = grouping_config.get("aggregation", "count")
|
|
|
|
# Determine truncation for date fields
|
|
if "created_at" in field or "date" in field.lower():
|
|
trunc_by = grouping_config.get("trunc_by", "day")
|
|
if trunc_by == "day":
|
|
queryset = queryset.annotate(period=TruncDate(field))
|
|
elif trunc_by == "week":
|
|
queryset = queryset.annotate(period=TruncWeek(field))
|
|
elif trunc_by == "month":
|
|
queryset = queryset.annotate(period=TruncMonth(field))
|
|
elif trunc_by == "year":
|
|
queryset = queryset.annotate(period=TruncYear(field))
|
|
field = "period"
|
|
|
|
# Apply aggregation
|
|
if aggregation == "count":
|
|
return queryset.values(field).annotate(count=Count("id")).order_by(field)
|
|
elif aggregation == "sum":
|
|
sum_field = grouping_config.get("sum_field", "id")
|
|
return queryset.values(field).annotate(total=Sum(sum_field)).order_by(field)
|
|
elif aggregation == "avg":
|
|
avg_field = grouping_config.get("avg_field", "total_score")
|
|
return queryset.values(field).annotate(average=Avg(avg_field)).order_by(field)
|
|
|
|
return queryset
|
|
|
|
@classmethod
|
|
def get_field_value(cls, obj, field_path):
|
|
"""Get a value from an object using dot notation."""
|
|
parts = field_path.split("__")
|
|
value = obj
|
|
for part in parts:
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, dict):
|
|
value = value.get(part)
|
|
else:
|
|
value = getattr(value, part, None)
|
|
return value
|
|
|
|
@classmethod
|
|
def format_value(cls, value, field_type):
|
|
"""Format a value for display."""
|
|
if value is None:
|
|
return ""
|
|
|
|
if field_type == "datetime":
|
|
if isinstance(value, str):
|
|
return value
|
|
return value.strftime("%Y-%m-%d %H:%M")
|
|
elif field_type == "date":
|
|
if isinstance(value, str):
|
|
return value
|
|
return value.strftime("%Y-%m-%d")
|
|
elif field_type == "boolean":
|
|
return "Yes" if value else "No"
|
|
elif field_type == "number":
|
|
if isinstance(value, (int, float)):
|
|
return round(value, 2) if isinstance(value, float) else value
|
|
return value
|
|
|
|
return str(value) if value else ""
|
|
|
|
@classmethod
|
|
def generate_report_data(
|
|
cls, data_source, filter_config, column_config, grouping_config, sort_config=None, user=None
|
|
):
|
|
"""Generate report data with filters, columns, and grouping."""
|
|
queryset = cls.get_queryset(data_source, user=user)
|
|
queryset = cls.apply_filters(queryset, filter_config, data_source)
|
|
|
|
# Determine columns to select
|
|
if not column_config:
|
|
column_config = list(cls.SOURCE_FIELDS.get(data_source, {}).keys())[:10]
|
|
|
|
fields_info = cls.SOURCE_FIELDS.get(data_source, {})
|
|
|
|
if grouping_config and "field" in grouping_config:
|
|
# Grouped data
|
|
grouped_data = cls.apply_grouping(queryset, grouping_config, data_source)
|
|
|
|
rows = []
|
|
for item in grouped_data:
|
|
row = {}
|
|
for key, value in item.items():
|
|
row[key] = cls.format_value(value, "number" if key == "count" else "string")
|
|
rows.append(row)
|
|
|
|
return {
|
|
"rows": rows,
|
|
"columns": list(grouped_data[0].keys()) if grouped_data else ["field", "count"],
|
|
"grouped": True,
|
|
}
|
|
else:
|
|
# Regular data
|
|
select_fields = []
|
|
for col in column_config:
|
|
if col in fields_info:
|
|
select_fields.append(fields_info[col]["field"])
|
|
|
|
# Apply sorting
|
|
if sort_config:
|
|
for sort_item in sort_config:
|
|
field = sort_item.get("field")
|
|
direction = sort_item.get("direction", "asc")
|
|
if field in fields_info:
|
|
order_field = fields_info[field]["field"]
|
|
if direction == "desc":
|
|
order_field = f"-{order_field}"
|
|
queryset = queryset.order_by(order_field)
|
|
|
|
# Limit results for performance
|
|
queryset = queryset[:1000]
|
|
|
|
rows = []
|
|
for obj in queryset:
|
|
row = {}
|
|
for col in column_config:
|
|
if col in fields_info:
|
|
field_info = fields_info[col]
|
|
value = cls.get_field_value(obj, field_info["field"])
|
|
row[col] = cls.format_value(value, field_info["type"])
|
|
rows.append(row)
|
|
|
|
# Return both keys (for data access) and labels (for display)
|
|
column_labels = [fields_info.get(col, {"label": col})["label"] for col in column_config]
|
|
|
|
return {
|
|
"rows": rows,
|
|
"columns": column_labels,
|
|
"column_keys": column_config, # Add field keys for data access
|
|
"grouped": False,
|
|
}
|
|
|
|
@classmethod
|
|
def generate_summary(cls, data_source, filter_config, user=None):
|
|
"""Generate summary statistics for a data source."""
|
|
queryset = cls.get_queryset(data_source, user=user)
|
|
queryset = cls.apply_filters(queryset, filter_config, data_source)
|
|
|
|
summary = {
|
|
"total_count": queryset.count(),
|
|
}
|
|
|
|
if data_source == "complaints":
|
|
summary["open_count"] = queryset.filter(status="open").count()
|
|
summary["resolved_count"] = queryset.filter(status="resolved").count()
|
|
summary["overdue_count"] = queryset.filter(is_overdue=True).count()
|
|
# Calculate average resolution time in hours (SQLite-compatible)
|
|
resolved_complaints = queryset.filter(resolved_at__isnull=False)
|
|
if resolved_complaints.exists():
|
|
# Calculate in Python to avoid SQLite DurationField limitation
|
|
total_hours = 0
|
|
count = 0
|
|
for complaint in resolved_complaints.values("activated_at", "resolved_at"):
|
|
if complaint["activated_at"] and complaint["resolved_at"]:
|
|
delta = complaint["resolved_at"] - complaint["activated_at"]
|
|
total_hours += delta.total_seconds() / 3600.0
|
|
count += 1
|
|
summary["avg_resolution_time"] = round(total_hours / count, 2) if count > 0 else 0
|
|
else:
|
|
summary["avg_resolution_time"] = 0
|
|
|
|
elif data_source == "surveys":
|
|
summary["completed_count"] = queryset.filter(status="completed").count()
|
|
summary["pending_count"] = queryset.filter(status="pending").count()
|
|
summary["negative_count"] = queryset.filter(is_negative=True).count()
|
|
summary["avg_score"] = queryset.filter(status="completed").aggregate(avg=Avg("total_score"))["avg"] or 0
|
|
|
|
elif data_source == "px_actions":
|
|
summary["open_count"] = queryset.filter(status="open").count()
|
|
summary["completed_count"] = queryset.filter(status="completed").count()
|
|
summary["overdue_count"] = queryset.filter(is_overdue=True).count()
|
|
|
|
elif data_source == "observations":
|
|
summary["new_count"] = queryset.filter(status="new").count()
|
|
summary["resolved_count"] = queryset.filter(status="resolved").count()
|
|
|
|
return summary
|
|
|
|
|
|
class ReportExportService:
|
|
"""Service for exporting reports to various formats."""
|
|
|
|
@classmethod
|
|
def export_to_csv(cls, data, columns, column_keys=None, filename="report"):
|
|
"""Export report data to CSV.
|
|
|
|
Args:
|
|
data: List of row dicts
|
|
columns: List of column labels (for header row)
|
|
column_keys: List of column keys (for data access). If None, uses columns.
|
|
filename: Output filename without extension
|
|
"""
|
|
response = HttpResponse(content_type="text/csv")
|
|
response["Content-Disposition"] = f'attachment; filename="{filename}.csv"'
|
|
|
|
writer = csv.writer(response)
|
|
writer.writerow(columns) # Write header row with labels
|
|
|
|
# Use column_keys for data access if provided, otherwise use columns
|
|
keys = column_keys if column_keys else columns
|
|
|
|
for row in data:
|
|
writer.writerow([row.get(key, "") for key in keys])
|
|
|
|
return response
|
|
|
|
@classmethod
|
|
def export_to_excel(cls, data, columns, column_keys=None, filename="report"):
|
|
"""Export report data to Excel (XLSX).
|
|
|
|
Args:
|
|
data: List of row dicts
|
|
columns: List of column labels (for header row)
|
|
column_keys: List of column keys (for data access). If None, uses columns.
|
|
filename: Output filename without extension
|
|
"""
|
|
try:
|
|
import openpyxl
|
|
from openpyxl.styles import Font, PatternFill, Alignment
|
|
from openpyxl.utils import get_column_letter
|
|
except ImportError:
|
|
# Fall back to CSV if openpyxl not available
|
|
return cls.export_to_csv(data, columns, column_keys, filename)
|
|
|
|
wb = openpyxl.Workbook()
|
|
ws = wb.active
|
|
ws.title = "Report"
|
|
|
|
# Header row
|
|
header_fill = PatternFill(start_color="1F4E79", end_color="1F4E79", fill_type="solid")
|
|
header_font = Font(bold=True, color="FFFFFF")
|
|
|
|
for col_idx, col_name in enumerate(columns, 1):
|
|
cell = ws.cell(row=1, column=col_idx, value=col_name)
|
|
cell.fill = header_fill
|
|
cell.font = header_font
|
|
cell.alignment = Alignment(horizontal="center")
|
|
|
|
# Use column_keys for data access if provided, otherwise use columns
|
|
keys = column_keys if column_keys else columns
|
|
|
|
# Data rows
|
|
for row_idx, row_data in enumerate(data, 2):
|
|
for col_idx, key in enumerate(keys, 1):
|
|
value = row_data.get(key, "")
|
|
ws.cell(row=row_idx, column=col_idx, value=str(value) if value else "")
|
|
|
|
# Auto-adjust column widths
|
|
for col_idx, col_name in enumerate(columns, 1):
|
|
max_length = len(str(col_name))
|
|
for row in ws.iter_rows(min_row=2, max_row=ws.max_row, min_col=col_idx, max_col=col_idx):
|
|
for cell in row:
|
|
if cell.value:
|
|
max_length = max(max_length, len(str(cell.value)))
|
|
ws.column_dimensions[get_column_letter(col_idx)].width = min(max_length + 2, 50)
|
|
|
|
# Create response
|
|
response = HttpResponse(content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
|
|
response["Content-Disposition"] = f'attachment; filename="{filename}.xlsx"'
|
|
|
|
wb.save(response)
|
|
return response
|
|
|
|
@classmethod
|
|
def export_to_pdf(cls, data, columns, column_keys=None, title="Report", filename="report"):
|
|
"""Export report data to PDF.
|
|
|
|
Args:
|
|
data: List of row dicts
|
|
columns: List of column labels (for header row)
|
|
column_keys: List of column keys (for data access). If None, uses columns.
|
|
title: Report title
|
|
filename: Output filename without extension
|
|
"""
|
|
from django.template.loader import render_to_string
|
|
|
|
# Use column_keys for data access if provided, otherwise use columns
|
|
keys = column_keys if column_keys else columns
|
|
|
|
# Prepare data with proper column access
|
|
formatted_data = []
|
|
for row in data:
|
|
formatted_row = {col: row.get(key, "") for col, key in zip(columns, keys)}
|
|
formatted_data.append(formatted_row)
|
|
|
|
try:
|
|
from weasyprint import HTML
|
|
|
|
html_content = render_to_string(
|
|
"reports/report_pdf.html",
|
|
{
|
|
"title": title,
|
|
"columns": columns,
|
|
"data": formatted_data,
|
|
"generated_at": timezone.now(),
|
|
},
|
|
)
|
|
|
|
response = HttpResponse(content_type="application/pdf")
|
|
response["Content-Disposition"] = f'attachment; filename="{filename}.pdf"'
|
|
|
|
HTML(string=html_content).write_pdf(response)
|
|
return response
|
|
|
|
except ImportError:
|
|
# Fall back to CSV if weasyprint not available
|
|
return cls.export_to_csv(data, columns, column_keys, filename)
|
|
|
|
@classmethod
|
|
def generate_chart_data(cls, data, chart_config):
|
|
"""
|
|
Generate chart data structure for visualization.
|
|
|
|
Args:
|
|
data: List of dictionaries with report data
|
|
chart_config: Dict with chart configuration
|
|
- type: ChartType (bar, line, pie, donut, area)
|
|
- x_axis: Field name for x-axis categories
|
|
- y_axis: Field name for y-axis values
|
|
- title: Chart title
|
|
|
|
Returns:
|
|
Dict with chart data ready for rendering
|
|
"""
|
|
if not data or not chart_config:
|
|
return None
|
|
|
|
chart_type = chart_config.get("type", "bar")
|
|
x_field = chart_config.get("x_axis")
|
|
y_field = chart_config.get("y_axis")
|
|
title = chart_config.get("title", "Chart")
|
|
|
|
if not x_field or not y_field:
|
|
return None
|
|
|
|
# Aggregate data by x_field
|
|
aggregated = {}
|
|
for row in data:
|
|
x_val = row.get(x_field, "Unknown")
|
|
y_val = row.get(y_field, 0)
|
|
if x_val not in aggregated:
|
|
aggregated[x_val] = 0
|
|
try:
|
|
aggregated[x_val] += float(y_val) if y_val else 0
|
|
except (ValueError, TypeError):
|
|
aggregated[x_val] += 1 # Count if not numeric
|
|
|
|
# Sort by value descending for pie/donut, by key for others
|
|
if chart_type in ["pie", "donut"]:
|
|
sorted_items = sorted(aggregated.items(), key=lambda x: x[1], reverse=True)
|
|
else:
|
|
sorted_items = sorted(aggregated.items())
|
|
|
|
labels = [str(item[0]) for item in sorted_items]
|
|
values = [item[1] for item in sorted_items]
|
|
|
|
# Generate colors
|
|
colors = cls._generate_chart_colors(len(labels))
|
|
|
|
return {
|
|
"type": chart_type,
|
|
"title": title,
|
|
"labels": labels,
|
|
"datasets": [
|
|
{"label": y_field, "data": values, "backgroundColor": colors, "borderColor": colors, "borderWidth": 1}
|
|
],
|
|
}
|
|
|
|
@classmethod
|
|
def _generate_chart_colors(cls, count):
|
|
"""Generate a list of colors for charts."""
|
|
base_colors = [
|
|
"#005696",
|
|
"#007bbd",
|
|
"#00a8e8",
|
|
"#00d4ff",
|
|
"#10b981",
|
|
"#34d399",
|
|
"#059669",
|
|
"#047857",
|
|
"#f59e0b",
|
|
"#fbbf24",
|
|
"#d97706",
|
|
"#b45309",
|
|
"#ef4444",
|
|
"#f87171",
|
|
"#dc2626",
|
|
"#b91c1c",
|
|
"#8b5cf6",
|
|
"#a78bfa",
|
|
"#7c3aed",
|
|
"#6d28d9",
|
|
]
|
|
# Repeat colors if more needed
|
|
colors = []
|
|
for i in range(count):
|
|
colors.append(base_colors[i % len(base_colors)])
|
|
return colors
|