""" Financial Reports Service for comprehensive financial reporting. This module provides services for generating various financial reports including: - Revenue reports by clinic/therapist - Daily/weekly/monthly summaries - Debtor reports - Commission tracking - Export to Excel/CSV """ import logging from datetime import datetime, timedelta from decimal import Decimal from typing import Dict, List, Optional, Tuple from io import BytesIO from django.db.models import Sum, Count, Q, F, Avg from django.db.models.functions import TruncDate, TruncWeek, TruncMonth from django.utils import timezone from django.utils.translation import gettext_lazy as _ from finance.models import Invoice, Payment, Package, PackagePurchase, Service from core.models import User, Clinic, Patient logger = logging.getLogger(__name__) class FinancialReportsService: """Service for generating financial reports.""" @staticmethod def get_revenue_by_clinic( tenant, start_date: datetime.date, end_date: datetime.date ) -> List[Dict]: """ Generate revenue report by clinic. Args: tenant: Tenant instance start_date: Start date for report end_date: End date for report Returns: List of dictionaries with clinic revenue data """ from finance.models import InvoiceLineItem # Get all clinics for tenant clinics = Clinic.objects.filter(tenant=tenant) report_data = [] for clinic in clinics: # Get invoices for this clinic's services clinic_line_items = InvoiceLineItem.objects.filter( invoice__tenant=tenant, invoice__issue_date__gte=start_date, invoice__issue_date__lte=end_date, service__clinic=clinic ) total_revenue = clinic_line_items.aggregate( total=Sum('total') )['total'] or Decimal('0') total_invoices = clinic_line_items.values('invoice').distinct().count() # Get paid amount paid_invoices = clinic_line_items.filter( invoice__status=Invoice.Status.PAID ) paid_amount = paid_invoices.aggregate( total=Sum('total') )['total'] or Decimal('0') report_data.append({ 'clinic_name': clinic.name, 'clinic_specialty': clinic.get_specialty_display(), 'total_revenue': total_revenue, 'paid_amount': paid_amount, 'outstanding': total_revenue - paid_amount, 'invoice_count': total_invoices, }) # Sort by revenue descending report_data.sort(key=lambda x: x['total_revenue'], reverse=True) return report_data @staticmethod def get_revenue_by_therapist( tenant, start_date: datetime.date, end_date: datetime.date, clinic_id: Optional[str] = None ) -> List[Dict]: """ Generate revenue report by therapist. Args: tenant: Tenant instance start_date: Start date for report end_date: End date for report clinic_id: Optional clinic filter Returns: List of dictionaries with therapist revenue data """ # Get appointments with invoices from appointments.models import Appointment appointments = Appointment.objects.filter( tenant=tenant, appointment_date__gte=start_date, appointment_date__lte=end_date, invoices__isnull=False ).select_related('provider', 'clinic') if clinic_id: appointments = appointments.filter(clinic_id=clinic_id) # Group by therapist therapist_data = {} for appointment in appointments: provider_id = str(appointment.provider.id) if provider_id not in therapist_data: therapist_data[provider_id] = { 'therapist_name': appointment.provider.get_full_name(), 'clinic': appointment.clinic.name if appointment.clinic else 'N/A', 'total_revenue': Decimal('0'), 'paid_amount': Decimal('0'), 'session_count': 0, } # Get invoice totals for this appointment for invoice in appointment.invoices.all(): therapist_data[provider_id]['total_revenue'] += invoice.total therapist_data[provider_id]['paid_amount'] += invoice.amount_paid therapist_data[provider_id]['session_count'] += 1 # Convert to list and calculate outstanding report_data = [] for data in therapist_data.values(): data['outstanding'] = data['total_revenue'] - data['paid_amount'] report_data.append(data) # Sort by revenue descending report_data.sort(key=lambda x: x['total_revenue'], reverse=True) return report_data @staticmethod def get_daily_summary( tenant, date: datetime.date ) -> Dict: """ Generate daily financial summary. Args: tenant: Tenant instance date: Date for summary Returns: Dictionary with daily summary data """ # Get invoices for the day invoices = Invoice.objects.filter( tenant=tenant, issue_date=date ) # Get payments for the day payments = Payment.objects.filter( invoice__tenant=tenant, payment_date__date=date, status=Payment.Status.COMPLETED ) summary = { 'date': date, 'invoices': { 'count': invoices.count(), 'total_amount': invoices.aggregate(Sum('total'))['total__sum'] or Decimal('0'), 'by_status': { 'draft': invoices.filter(status=Invoice.Status.DRAFT).count(), 'issued': invoices.filter(status=Invoice.Status.ISSUED).count(), 'paid': invoices.filter(status=Invoice.Status.PAID).count(), 'partially_paid': invoices.filter(status=Invoice.Status.PARTIALLY_PAID).count(), 'cancelled': invoices.filter(status=Invoice.Status.CANCELLED).count(), } }, 'payments': { 'count': payments.count(), 'total_amount': payments.aggregate(Sum('amount'))['amount__sum'] or Decimal('0'), 'by_method': {} }, 'packages': { 'sold': PackagePurchase.objects.filter( tenant=tenant, purchase_date=date ).count(), 'revenue': PackagePurchase.objects.filter( tenant=tenant, purchase_date=date ).aggregate( total=Sum('invoice__total') )['total'] or Decimal('0') } } # Payment methods breakdown for method_code, method_name in Payment.PaymentMethod.choices: method_payments = payments.filter(method=method_code) amount = method_payments.aggregate(Sum('amount'))['amount__sum'] or Decimal('0') summary['payments']['by_method'][method_code] = { 'name': method_name, 'count': method_payments.count(), 'amount': amount } return summary @staticmethod def get_weekly_summary( tenant, start_date: datetime.date ) -> Dict: """ Generate weekly financial summary. Args: tenant: Tenant instance start_date: Start date of week (Monday) Returns: Dictionary with weekly summary data """ end_date = start_date + timedelta(days=6) # Get invoices for the week invoices = Invoice.objects.filter( tenant=tenant, issue_date__gte=start_date, issue_date__lte=end_date ) # Get payments for the week payments = Payment.objects.filter( invoice__tenant=tenant, payment_date__date__gte=start_date, payment_date__date__lte=end_date, status=Payment.Status.COMPLETED ) # Daily breakdown daily_data = [] current_date = start_date while current_date <= end_date: daily_invoices = invoices.filter(issue_date=current_date) daily_payments = payments.filter(payment_date__date=current_date) daily_data.append({ 'date': current_date, 'invoices_count': daily_invoices.count(), 'invoices_amount': daily_invoices.aggregate(Sum('total'))['total__sum'] or Decimal('0'), 'payments_count': daily_payments.count(), 'payments_amount': daily_payments.aggregate(Sum('amount'))['amount__sum'] or Decimal('0'), }) current_date += timedelta(days=1) summary = { 'start_date': start_date, 'end_date': end_date, 'total_invoices': invoices.count(), 'total_invoiced': invoices.aggregate(Sum('total'))['total__sum'] or Decimal('0'), 'total_payments': payments.count(), 'total_collected': payments.aggregate(Sum('amount'))['amount__sum'] or Decimal('0'), 'daily_breakdown': daily_data } return summary @staticmethod def get_monthly_summary( tenant, year: int, month: int ) -> Dict: """ Generate monthly financial summary. Args: tenant: Tenant instance year: Year month: Month (1-12) Returns: Dictionary with monthly summary data """ from calendar import monthrange start_date = datetime(year, month, 1).date() last_day = monthrange(year, month)[1] end_date = datetime(year, month, last_day).date() # Get invoices for the month invoices = Invoice.objects.filter( tenant=tenant, issue_date__gte=start_date, issue_date__lte=end_date ) # Get payments for the month payments = Payment.objects.filter( invoice__tenant=tenant, payment_date__date__gte=start_date, payment_date__date__lte=end_date, status=Payment.Status.COMPLETED ) # Weekly breakdown weekly_data = [] current_date = start_date week_num = 1 while current_date <= end_date: week_end = min(current_date + timedelta(days=6), end_date) week_invoices = invoices.filter( issue_date__gte=current_date, issue_date__lte=week_end ) week_payments = payments.filter( payment_date__date__gte=current_date, payment_date__date__lte=week_end ) weekly_data.append({ 'week': week_num, 'start_date': current_date, 'end_date': week_end, 'invoices_count': week_invoices.count(), 'invoices_amount': week_invoices.aggregate(Sum('total'))['total__sum'] or Decimal('0'), 'payments_count': week_payments.count(), 'payments_amount': week_payments.aggregate(Sum('amount'))['amount__sum'] or Decimal('0'), }) current_date = week_end + timedelta(days=1) week_num += 1 summary = { 'year': year, 'month': month, 'month_name': datetime(year, month, 1).strftime('%B'), 'start_date': start_date, 'end_date': end_date, 'total_invoices': invoices.count(), 'total_invoiced': invoices.aggregate(Sum('total'))['total__sum'] or Decimal('0'), 'total_payments': payments.count(), 'total_collected': payments.aggregate(Sum('amount'))['amount__sum'] or Decimal('0'), 'weekly_breakdown': weekly_data } return summary @staticmethod def get_debtor_report( tenant, as_of_date: Optional[datetime.date] = None ) -> List[Dict]: """ Generate debtor report showing outstanding invoices. Args: tenant: Tenant instance as_of_date: Date to calculate outstanding as of (defaults to today) Returns: List of dictionaries with debtor data """ if as_of_date is None: as_of_date = timezone.now().date() # Get all unpaid/partially paid invoices outstanding_invoices = Invoice.objects.filter( tenant=tenant, status__in=[Invoice.Status.ISSUED, Invoice.Status.PARTIALLY_PAID, Invoice.Status.OVERDUE], issue_date__lte=as_of_date ).select_related('patient') # Group by patient debtor_data = {} for invoice in outstanding_invoices: patient_id = str(invoice.patient.id) if patient_id not in debtor_data: debtor_data[patient_id] = { 'patient_mrn': invoice.patient.mrn, 'patient_name': invoice.patient.full_name_en, 'patient_phone': invoice.patient.phone_number, 'total_outstanding': Decimal('0'), 'invoice_count': 0, 'oldest_invoice_date': invoice.issue_date, 'days_overdue': 0, 'invoices': [] } amount_due = invoice.amount_due days_overdue = (as_of_date - invoice.due_date).days if invoice.due_date < as_of_date else 0 debtor_data[patient_id]['total_outstanding'] += amount_due debtor_data[patient_id]['invoice_count'] += 1 debtor_data[patient_id]['days_overdue'] = max( debtor_data[patient_id]['days_overdue'], days_overdue ) if invoice.issue_date < debtor_data[patient_id]['oldest_invoice_date']: debtor_data[patient_id]['oldest_invoice_date'] = invoice.issue_date debtor_data[patient_id]['invoices'].append({ 'invoice_number': invoice.invoice_number, 'issue_date': invoice.issue_date, 'due_date': invoice.due_date, 'total': invoice.total, 'paid': invoice.amount_paid, 'outstanding': amount_due, 'days_overdue': days_overdue }) # Convert to list report_data = list(debtor_data.values()) # Sort by total outstanding descending report_data.sort(key=lambda x: x['total_outstanding'], reverse=True) return report_data @staticmethod def get_commission_report( tenant, start_date: datetime.date, end_date: datetime.date ) -> List[Dict]: """ Generate commission report for payments. Args: tenant: Tenant instance start_date: Start date for report end_date: End date for report Returns: List of dictionaries with commission data """ # Get all payments in period payments = Payment.objects.filter( invoice__tenant=tenant, payment_date__date__gte=start_date, payment_date__date__lte=end_date, status=Payment.Status.COMPLETED ).select_related('invoice', 'invoice__patient', 'processed_by') report_data = [] for payment in payments: # Check if commission-free (stored in notes or separate field) is_commission_free = 'COMMISSION_FREE' in (payment.notes or '') report_data.append({ 'payment_date': payment.payment_date, 'invoice_number': payment.invoice.invoice_number, 'patient_name': payment.invoice.patient.full_name_en, 'amount': payment.amount, 'method': payment.get_method_display(), 'processed_by': payment.processed_by.get_full_name() if payment.processed_by else 'N/A', 'commission_free': is_commission_free, 'reference': payment.reference or 'N/A' }) return report_data @staticmethod def export_to_excel( report_data: List[Dict], report_title: str, columns: List[Tuple[str, str]] ) -> BytesIO: """ Export report data to Excel format. Args: report_data: List of dictionaries with report data report_title: Title for the report columns: List of tuples (field_name, column_header) Returns: BytesIO object containing Excel file """ try: import openpyxl from openpyxl.styles import Font, Alignment, PatternFill from openpyxl.utils import get_column_letter except ImportError: logger.error("openpyxl not installed. Cannot export to Excel.") raise ImportError("openpyxl is required for Excel export") # Create workbook wb = openpyxl.Workbook() ws = wb.active ws.title = report_title[:31] # Excel sheet name limit # Add title ws.merge_cells('A1:' + get_column_letter(len(columns)) + '1') title_cell = ws['A1'] title_cell.value = report_title title_cell.font = Font(size=14, bold=True) title_cell.alignment = Alignment(horizontal='center') # Add generation date ws.merge_cells('A2:' + get_column_letter(len(columns)) + '2') date_cell = ws['A2'] date_cell.value = f"Generated: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}" date_cell.alignment = Alignment(horizontal='center') # Add headers header_fill = PatternFill(start_color='366092', end_color='366092', fill_type='solid') header_font = Font(color='FFFFFF', bold=True) for col_idx, (field, header) in enumerate(columns, start=1): cell = ws.cell(row=4, column=col_idx) cell.value = header cell.fill = header_fill cell.font = header_font cell.alignment = Alignment(horizontal='center') # Add data for row_idx, data in enumerate(report_data, start=5): for col_idx, (field, _) in enumerate(columns, start=1): value = data.get(field, '') # Format decimals if isinstance(value, Decimal): value = float(value) # Format dates if isinstance(value, (datetime, datetime.date)): value = value.strftime('%Y-%m-%d') ws.cell(row=row_idx, column=col_idx, value=value) # Auto-adjust column widths for col_idx in range(1, len(columns) + 1): ws.column_dimensions[get_column_letter(col_idx)].width = 15 # Save to BytesIO output = BytesIO() wb.save(output) output.seek(0) return output @staticmethod def export_to_csv( report_data: List[Dict], columns: List[Tuple[str, str]] ) -> str: """ Export report data to CSV format. Args: report_data: List of dictionaries with report data columns: List of tuples (field_name, column_header) Returns: CSV string """ import csv from io import StringIO output = StringIO() writer = csv.writer(output) # Write headers writer.writerow([header for _, header in columns]) # Write data for data in report_data: row = [] for field, _ in columns: value = data.get(field, '') # Format decimals if isinstance(value, Decimal): value = f"{value:.2f}" # Format dates if isinstance(value, (datetime, datetime.date)): value = value.strftime('%Y-%m-%d') row.append(value) writer.writerow(row) return output.getvalue() class DuplicateInvoiceChecker: """Service for detecting duplicate invoices.""" @staticmethod def check_duplicate( tenant, patient_id: str, issue_date: datetime.date, total: Decimal, tolerance: Decimal = Decimal('0.01') ) -> Optional[Invoice]: """ Check if a duplicate invoice exists. Args: tenant: Tenant instance patient_id: Patient ID issue_date: Invoice issue date total: Invoice total amount tolerance: Amount tolerance for matching (default 0.01) Returns: Duplicate invoice if found, None otherwise """ # Look for invoices with same patient, date, and similar amount potential_duplicates = Invoice.objects.filter( tenant=tenant, patient_id=patient_id, issue_date=issue_date, total__gte=total - tolerance, total__lte=total + tolerance, status__in=[Invoice.Status.DRAFT, Invoice.Status.ISSUED, Invoice.Status.PAID] ).order_by('-created_at') if potential_duplicates.exists(): return potential_duplicates.first() return None @staticmethod def find_all_duplicates(tenant) -> List[Dict]: """ Find all potential duplicate invoices in the system. Args: tenant: Tenant instance Returns: List of duplicate invoice groups """ from django.db.models import Count # Group invoices by patient, date, and amount duplicates = Invoice.objects.filter( tenant=tenant, status__in=[Invoice.Status.DRAFT, Invoice.Status.ISSUED, Invoice.Status.PAID] ).values( 'patient_id', 'issue_date', 'total' ).annotate( count=Count('id') ).filter(count__gt=1) duplicate_groups = [] for dup in duplicates: invoices = Invoice.objects.filter( tenant=tenant, patient_id=dup['patient_id'], issue_date=dup['issue_date'], total=dup['total'] ).select_related('patient') duplicate_groups.append({ 'patient_name': invoices.first().patient.full_name_en, 'patient_mrn': invoices.first().patient.mrn, 'issue_date': dup['issue_date'], 'total': dup['total'], 'count': dup['count'], 'invoices': [ { 'id': inv.id, 'invoice_number': inv.invoice_number, 'status': inv.get_status_display(), 'created_at': inv.created_at } for inv in invoices ] }) return duplicate_groups