# agdar/finance/reports_service.py
# Marwan Alwali 2f1681b18c update
# 2025-11-11 13:44:48 +03:00
#
# 706 lines
# 24 KiB
# Python

"""
Financial Reports Service for comprehensive financial reporting.
This module provides services for generating various financial reports including:
- Revenue reports by clinic/therapist
- Daily/weekly/monthly summaries
- Debtor reports
- Commission tracking
- Export to Excel/CSV
"""
import logging
from datetime import date, datetime, timedelta
from decimal import Decimal
from io import BytesIO
from typing import Dict, List, Optional, Tuple

from django.db.models import Sum, Count, Q, F, Avg
from django.db.models.functions import TruncDate, TruncWeek, TruncMonth
from django.utils import timezone
from django.utils.translation import gettext_lazy as _

from core.models import User, Clinic, Patient
from finance.models import Invoice, Payment, Package, PackagePurchase, Service
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class FinancialReportsService:
    """
    Service for generating financial reports.

    Every method is static. The report builders run ORM queries scoped to a
    single tenant and return plain ``dict``/``list`` structures; the
    ``export_to_*`` helpers serialise such structures to Excel or CSV.
    """

    @staticmethod
    def _sum_or_zero(queryset, field: str) -> Decimal:
        """
        Sum ``field`` over ``queryset``, treating an empty result as zero.

        Args:
            queryset: Queryset to aggregate
            field: Field name (or lookup path) to sum

        Returns:
            The aggregated sum, or ``Decimal('0')`` when the aggregate is
            ``None`` (no rows) or otherwise falsy
        """
        return queryset.aggregate(total=Sum(field))['total'] or Decimal('0')

    @staticmethod
    def get_revenue_by_clinic(
        tenant,
        start_date: date,
        end_date: date
    ) -> List[Dict]:
        """
        Generate revenue report by clinic.

        Revenue is taken from invoice line items whose service belongs to the
        clinic and whose invoice was issued inside the (inclusive) date range.
        ``paid_amount`` only counts line items on invoices whose status is
        ``PAID``; ``outstanding`` is the difference between the two figures.

        Args:
            tenant: Tenant instance
            start_date: Start date for report (inclusive)
            end_date: End date for report (inclusive)

        Returns:
            List of dictionaries with clinic revenue data, sorted by
            ``total_revenue`` descending
        """
        from finance.models import InvoiceLineItem

        report_data = []
        for clinic in Clinic.objects.filter(tenant=tenant):
            line_items = InvoiceLineItem.objects.filter(
                invoice__tenant=tenant,
                invoice__issue_date__gte=start_date,
                invoice__issue_date__lte=end_date,
                service__clinic=clinic,
            )

            # One query for both figures: the paid sum is a conditional
            # aggregate over the same rows.
            totals = line_items.aggregate(
                revenue=Sum('total'),
                paid=Sum('total', filter=Q(invoice__status=Invoice.Status.PAID)),
            )
            total_revenue = totals['revenue'] or Decimal('0')
            paid_amount = totals['paid'] or Decimal('0')

            report_data.append({
                'clinic_name': clinic.name,
                'clinic_specialty': clinic.get_specialty_display(),
                'total_revenue': total_revenue,
                'paid_amount': paid_amount,
                'outstanding': total_revenue - paid_amount,
                # Several line items can share an invoice, so count distinct invoices.
                'invoice_count': line_items.values('invoice').distinct().count(),
            })

        report_data.sort(key=lambda row: row['total_revenue'], reverse=True)
        return report_data

    @staticmethod
    def get_revenue_by_therapist(
        tenant,
        start_date: date,
        end_date: date,
        clinic_id: Optional[str] = None
    ) -> List[Dict]:
        """
        Generate revenue report by therapist.

        Only appointments in the (inclusive) date range that have at least
        one invoice are considered. Each such appointment counts as one
        session, and the ``total`` / ``amount_paid`` of all its invoices are
        added to the provider's figures. The ``clinic`` shown for a therapist
        is the clinic of the first appointment encountered for them.

        Args:
            tenant: Tenant instance
            start_date: Start date for report (inclusive)
            end_date: End date for report (inclusive)
            clinic_id: Optional clinic filter

        Returns:
            List of dictionaries with therapist revenue data, sorted by
            ``total_revenue`` descending
        """
        from appointments.models import Appointment

        appointments = Appointment.objects.filter(
            tenant=tenant,
            appointment_date__gte=start_date,
            appointment_date__lte=end_date,
            invoices__isnull=False,
        )
        if clinic_id:
            appointments = appointments.filter(clinic_id=clinic_id)

        # The invoices__isnull filter joins the invoice relation, which yields
        # one row per (appointment, invoice) pair. distinct() collapses those
        # back to one row per appointment so revenue and sessions are not
        # counted once per invoice. prefetch_related avoids a query per
        # appointment in the loop below.
        appointments = (
            appointments
            .select_related('provider', 'clinic')
            .prefetch_related('invoices')
            .distinct()
        )

        therapist_data: Dict[str, Dict] = {}
        for appointment in appointments:
            provider = appointment.provider
            key = str(provider.id)
            entry = therapist_data.get(key)
            if entry is None:
                entry = therapist_data[key] = {
                    'therapist_name': provider.get_full_name(),
                    'clinic': appointment.clinic.name if appointment.clinic else 'N/A',
                    'total_revenue': Decimal('0'),
                    'paid_amount': Decimal('0'),
                    'session_count': 0,
                }

            for invoice in appointment.invoices.all():
                entry['total_revenue'] += invoice.total
                entry['paid_amount'] += invoice.amount_paid
            entry['session_count'] += 1

        report_data = []
        for entry in therapist_data.values():
            entry['outstanding'] = entry['total_revenue'] - entry['paid_amount']
            report_data.append(entry)

        report_data.sort(key=lambda row: row['total_revenue'], reverse=True)
        return report_data

    @staticmethod
    def get_daily_summary(
        tenant,
        date: date
    ) -> Dict:
        """
        Generate daily financial summary.

        Invoices are selected by ``issue_date``, payments by the date part of
        ``payment_date`` (completed payments only) and package purchases by
        ``purchase_date``.

        Args:
            tenant: Tenant instance
            date: Date for summary

        Returns:
            Dictionary with ``date``, ``invoices`` (count, total and a
            per-status count), ``payments`` (count, total and a per-method
            breakdown keyed by method code) and ``packages`` (sold, revenue)
        """
        invoices = Invoice.objects.filter(tenant=tenant, issue_date=date)
        payments = Payment.objects.filter(
            invoice__tenant=tenant,
            payment_date__date=date,
            status=Payment.Status.COMPLETED,
        )
        package_purchases = PackagePurchase.objects.filter(
            tenant=tenant,
            purchase_date=date,
        )

        # Report key -> invoice status. OVERDUE is included so the breakdown
        # also covers invoices that have moved to that status.
        status_keys = (
            ('draft', Invoice.Status.DRAFT),
            ('issued', Invoice.Status.ISSUED),
            ('paid', Invoice.Status.PAID),
            ('partially_paid', Invoice.Status.PARTIALLY_PAID),
            ('overdue', Invoice.Status.OVERDUE),
            ('cancelled', Invoice.Status.CANCELLED),
        )
        by_status = {
            key: invoices.filter(status=status).count()
            for key, status in status_keys
        }

        by_method = {}
        for method_code, method_name in Payment.PaymentMethod.choices:
            method_payments = payments.filter(method=method_code)
            by_method[method_code] = {
                'name': method_name,
                'count': method_payments.count(),
                'amount': FinancialReportsService._sum_or_zero(method_payments, 'amount'),
            }

        return {
            'date': date,
            'invoices': {
                'count': invoices.count(),
                'total_amount': FinancialReportsService._sum_or_zero(invoices, 'total'),
                'by_status': by_status,
            },
            'payments': {
                'count': payments.count(),
                'total_amount': FinancialReportsService._sum_or_zero(payments, 'amount'),
                'by_method': by_method,
            },
            'packages': {
                'sold': package_purchases.count(),
                'revenue': FinancialReportsService._sum_or_zero(
                    package_purchases, 'invoice__total'
                ),
            },
        }

    @staticmethod
    def get_weekly_summary(
        tenant,
        start_date: date
    ) -> Dict:
        """
        Generate weekly financial summary.

        The week is the seven days starting at ``start_date``; the value is
        used as given and is not checked to be a Monday.

        Args:
            tenant: Tenant instance
            start_date: Start date of week (Monday)

        Returns:
            Dictionary with weekly totals and a ``daily_breakdown`` list
            holding one entry per day of the week
        """
        end_date = start_date + timedelta(days=6)

        invoices = Invoice.objects.filter(
            tenant=tenant,
            issue_date__gte=start_date,
            issue_date__lte=end_date,
        )
        payments = Payment.objects.filter(
            invoice__tenant=tenant,
            payment_date__date__gte=start_date,
            payment_date__date__lte=end_date,
            status=Payment.Status.COMPLETED,
        )

        daily_breakdown = []
        for offset in range(7):
            day = start_date + timedelta(days=offset)
            day_invoices = invoices.filter(issue_date=day)
            day_payments = payments.filter(payment_date__date=day)
            daily_breakdown.append({
                'date': day,
                'invoices_count': day_invoices.count(),
                'invoices_amount': FinancialReportsService._sum_or_zero(day_invoices, 'total'),
                'payments_count': day_payments.count(),
                'payments_amount': FinancialReportsService._sum_or_zero(day_payments, 'amount'),
            })

        return {
            'start_date': start_date,
            'end_date': end_date,
            'total_invoices': invoices.count(),
            'total_invoiced': FinancialReportsService._sum_or_zero(invoices, 'total'),
            'total_payments': payments.count(),
            'total_collected': FinancialReportsService._sum_or_zero(payments, 'amount'),
            'daily_breakdown': daily_breakdown,
        }

    @staticmethod
    def get_monthly_summary(
        tenant,
        year: int,
        month: int
    ) -> Dict:
        """
        Generate monthly financial summary.

        The ``weekly_breakdown`` splits the month into consecutive 7-day
        buckets counted from the 1st (the last bucket is cut at month end);
        the buckets are not aligned to calendar weeks.

        Args:
            tenant: Tenant instance
            year: Year
            month: Month (1-12)

        Returns:
            Dictionary with monthly totals and the ``weekly_breakdown`` list
        """
        from calendar import monthrange

        start_date = date(year, month, 1)
        end_date = date(year, month, monthrange(year, month)[1])

        invoices = Invoice.objects.filter(
            tenant=tenant,
            issue_date__gte=start_date,
            issue_date__lte=end_date,
        )
        payments = Payment.objects.filter(
            invoice__tenant=tenant,
            payment_date__date__gte=start_date,
            payment_date__date__lte=end_date,
            status=Payment.Status.COMPLETED,
        )

        weekly_breakdown = []
        bucket_start = start_date
        week_num = 1
        while bucket_start <= end_date:
            bucket_end = min(bucket_start + timedelta(days=6), end_date)
            week_invoices = invoices.filter(
                issue_date__gte=bucket_start,
                issue_date__lte=bucket_end,
            )
            week_payments = payments.filter(
                payment_date__date__gte=bucket_start,
                payment_date__date__lte=bucket_end,
            )
            weekly_breakdown.append({
                'week': week_num,
                'start_date': bucket_start,
                'end_date': bucket_end,
                'invoices_count': week_invoices.count(),
                'invoices_amount': FinancialReportsService._sum_or_zero(week_invoices, 'total'),
                'payments_count': week_payments.count(),
                'payments_amount': FinancialReportsService._sum_or_zero(week_payments, 'amount'),
            })
            bucket_start = bucket_end + timedelta(days=1)
            week_num += 1

        return {
            'year': year,
            'month': month,
            'month_name': start_date.strftime('%B'),
            'start_date': start_date,
            'end_date': end_date,
            'total_invoices': invoices.count(),
            'total_invoiced': FinancialReportsService._sum_or_zero(invoices, 'total'),
            'total_payments': payments.count(),
            'total_collected': FinancialReportsService._sum_or_zero(payments, 'amount'),
            'weekly_breakdown': weekly_breakdown,
        }

    @staticmethod
    def get_debtor_report(
        tenant,
        as_of_date: Optional[date] = None
    ) -> List[Dict]:
        """
        Generate debtor report showing outstanding invoices.

        Invoices with status ISSUED, PARTIALLY_PAID or OVERDUE that were
        issued on or before ``as_of_date`` are grouped per patient. An
        invoice without a due date is treated as not overdue.

        Args:
            tenant: Tenant instance
            as_of_date: Date to calculate outstanding as of (defaults to today)

        Returns:
            List of dictionaries with debtor data (one per patient, each with
            a nested ``invoices`` list), sorted by ``total_outstanding``
            descending
        """
        if as_of_date is None:
            # NOTE(review): this is the date of timezone.now(); confirm it
            # matches the business day expected by callers.
            as_of_date = timezone.now().date()

        outstanding_invoices = Invoice.objects.filter(
            tenant=tenant,
            status__in=[
                Invoice.Status.ISSUED,
                Invoice.Status.PARTIALLY_PAID,
                Invoice.Status.OVERDUE,
            ],
            issue_date__lte=as_of_date,
        ).select_related('patient')

        debtor_data: Dict[str, Dict] = {}
        for invoice in outstanding_invoices:
            patient = invoice.patient
            key = str(patient.id)
            entry = debtor_data.get(key)
            if entry is None:
                entry = debtor_data[key] = {
                    'patient_mrn': patient.mrn,
                    'patient_name': patient.full_name_en,
                    'patient_phone': patient.phone_number,
                    'total_outstanding': Decimal('0'),
                    'invoice_count': 0,
                    'oldest_invoice_date': invoice.issue_date,
                    'days_overdue': 0,
                    'invoices': [],
                }

            amount_due = invoice.amount_due
            due_date = invoice.due_date
            # A missing due date cannot be compared; count it as not overdue.
            if due_date is not None and due_date < as_of_date:
                days_overdue = (as_of_date - due_date).days
            else:
                days_overdue = 0

            entry['total_outstanding'] += amount_due
            entry['invoice_count'] += 1
            # The patient-level figure is the worst (largest) overdue age.
            entry['days_overdue'] = max(entry['days_overdue'], days_overdue)
            if invoice.issue_date < entry['oldest_invoice_date']:
                entry['oldest_invoice_date'] = invoice.issue_date

            entry['invoices'].append({
                'invoice_number': invoice.invoice_number,
                'issue_date': invoice.issue_date,
                'due_date': due_date,
                'total': invoice.total,
                'paid': invoice.amount_paid,
                'outstanding': amount_due,
                'days_overdue': days_overdue,
            })

        report_data = list(debtor_data.values())
        report_data.sort(key=lambda row: row['total_outstanding'], reverse=True)
        return report_data

    @staticmethod
    def get_commission_report(
        tenant,
        start_date: date,
        end_date: date
    ) -> List[Dict]:
        """
        Generate commission report for payments.

        Lists every completed payment whose ``payment_date`` falls inside the
        (inclusive) date range. A payment is flagged ``commission_free`` when
        its notes contain the marker text ``COMMISSION_FREE``.

        Args:
            tenant: Tenant instance
            start_date: Start date for report (inclusive)
            end_date: End date for report (inclusive)

        Returns:
            List of dictionaries with commission data, one per payment
        """
        payments = Payment.objects.filter(
            invoice__tenant=tenant,
            payment_date__date__gte=start_date,
            payment_date__date__lte=end_date,
            status=Payment.Status.COMPLETED,
        ).select_related('invoice', 'invoice__patient', 'processed_by')

        report_data = []
        for payment in payments:
            invoice = payment.invoice
            processed_by = payment.processed_by
            report_data.append({
                'payment_date': payment.payment_date,
                'invoice_number': invoice.invoice_number,
                'patient_name': invoice.patient.full_name_en,
                'amount': payment.amount,
                'method': payment.get_method_display(),
                'processed_by': processed_by.get_full_name() if processed_by else 'N/A',
                # The marker is looked up in the free-text notes field.
                'commission_free': 'COMMISSION_FREE' in (payment.notes or ''),
                'reference': payment.reference or 'N/A',
            })
        return report_data

    @staticmethod
    def export_to_excel(
        report_data: List[Dict],
        report_title: str,
        columns: List[Tuple[str, str]]
    ) -> BytesIO:
        """
        Export report data to Excel format.

        Layout: row 1 holds the title, row 2 the generation timestamp, row 4
        the column headers and data starts at row 5. ``Decimal`` values are
        written as floats and ``date``/``datetime`` values as ``YYYY-MM-DD``
        text; fields missing from a row are written as empty strings.

        Args:
            report_data: List of dictionaries with report data
            report_title: Title for the report
            columns: List of tuples (field_name, column_header)

        Returns:
            BytesIO object containing Excel file, positioned at the start

        Raises:
            ImportError: If openpyxl is not installed
        """
        try:
            import openpyxl
            from openpyxl.styles import Font, Alignment, PatternFill
            from openpyxl.utils import get_column_letter
        except ImportError as exc:
            logger.error("openpyxl not installed. Cannot export to Excel.")
            raise ImportError("openpyxl is required for Excel export") from exc

        column_count = len(columns)
        centered = Alignment(horizontal='center')

        wb = openpyxl.Workbook()
        ws = wb.active
        # Sheet names are limited to 31 characters and openpyxl rejects the
        # characters \ / * ? : [ ], so replace those instead of failing.
        forbidden = str.maketrans({char: '-' for char in '\\/*?:[]'})
        ws.title = report_title.translate(forbidden)[:31] or 'Report'

        # Title and generation-date banners, spanning all report columns.
        banners = (
            (1, report_title, Font(size=14, bold=True)),
            (2, f"Generated: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}", None),
        )
        for row, text, font in banners:
            # Merging only makes sense across two or more columns.
            if column_count > 1:
                ws.merge_cells(f'A{row}:{get_column_letter(column_count)}{row}')
            banner_cell = ws[f'A{row}']
            banner_cell.value = text
            if font is not None:
                banner_cell.font = font
            banner_cell.alignment = centered

        # Header row.
        header_fill = PatternFill(start_color='366092', end_color='366092', fill_type='solid')
        header_font = Font(color='FFFFFF', bold=True)
        for col_idx, (_field, header) in enumerate(columns, start=1):
            header_cell = ws.cell(row=4, column=col_idx)
            header_cell.value = header
            header_cell.fill = header_fill
            header_cell.font = header_font
            header_cell.alignment = centered

        # Data rows.
        for row_idx, data in enumerate(report_data, start=5):
            for col_idx, (field, _header) in enumerate(columns, start=1):
                value = data.get(field, '')
                if isinstance(value, Decimal):
                    value = float(value)
                elif isinstance(value, date):
                    # datetime is a subclass of date, so this covers both.
                    value = value.strftime('%Y-%m-%d')
                ws.cell(row=row_idx, column=col_idx, value=value)

        # Fixed width for every report column.
        for col_idx in range(1, column_count + 1):
            ws.column_dimensions[get_column_letter(col_idx)].width = 15

        output = BytesIO()
        wb.save(output)
        output.seek(0)
        return output

    @staticmethod
    def export_to_csv(
        report_data: List[Dict],
        columns: List[Tuple[str, str]]
    ) -> str:
        """
        Export report data to CSV format.

        The first line holds the column headers. ``Decimal`` values are
        written with two decimal places and ``date``/``datetime`` values as
        ``YYYY-MM-DD``; fields missing from a row are written as empty cells.

        Args:
            report_data: List of dictionaries with report data
            columns: List of tuples (field_name, column_header)

        Returns:
            CSV string
        """
        import csv
        from io import StringIO

        output = StringIO()
        writer = csv.writer(output)
        writer.writerow([header for _field, header in columns])

        for data in report_data:
            row = []
            for field, _header in columns:
                value = data.get(field, '')
                if isinstance(value, Decimal):
                    value = f"{value:.2f}"
                elif isinstance(value, date):
                    # datetime is a subclass of date, so this covers both.
                    value = value.strftime('%Y-%m-%d')
                row.append(value)
            writer.writerow(row)

        return output.getvalue()
class DuplicateInvoiceChecker:
    """
    Service for detecting duplicate invoices.

    Two invoices are considered potential duplicates when they belong to the
    same tenant and patient, share an issue date and have the same total
    (``check_duplicate`` additionally allows a small amount tolerance). Only
    invoices in DRAFT, ISSUED or PAID status take part in the comparison.
    """

    @staticmethod
    def _candidate_statuses() -> List:
        """Return the invoice statuses that take part in duplicate detection."""
        return [Invoice.Status.DRAFT, Invoice.Status.ISSUED, Invoice.Status.PAID]

    @staticmethod
    def check_duplicate(
        tenant,
        patient_id: str,
        issue_date: date,
        total: Decimal,
        tolerance: Decimal = Decimal('0.01')
    ) -> Optional[Invoice]:
        """
        Check if a duplicate invoice exists.

        Args:
            tenant: Tenant instance
            patient_id: Patient ID
            issue_date: Invoice issue date
            total: Invoice total amount
            tolerance: Amount tolerance for matching (default 0.01); totals
                within ``total +/- tolerance`` (inclusive) match

        Returns:
            The most recently created matching invoice if found, None otherwise
        """
        # first() already returns None for an empty result, so no separate
        # exists() query is needed.
        return Invoice.objects.filter(
            tenant=tenant,
            patient_id=patient_id,
            issue_date=issue_date,
            total__gte=total - tolerance,
            total__lte=total + tolerance,
            status__in=DuplicateInvoiceChecker._candidate_statuses(),
        ).order_by('-created_at').first()

    @staticmethod
    def find_all_duplicates(tenant) -> List[Dict]:
        """
        Find all potential duplicate invoices in the system.

        Invoices are grouped by patient, issue date and exact total; every
        group with more than one invoice is reported.

        Args:
            tenant: Tenant instance

        Returns:
            List of duplicate invoice groups; each group carries the patient
            name and MRN, the shared issue date and total, the group size and
            the list of member invoices
        """
        statuses = DuplicateInvoiceChecker._candidate_statuses()

        groups = (
            Invoice.objects.filter(tenant=tenant, status__in=statuses)
            # Clear any ordering so the grouping is on the values() fields only.
            .order_by()
            .values('patient_id', 'issue_date', 'total')
            .annotate(count=Count('id'))
            .filter(count__gt=1)
        )

        duplicate_groups = []
        for group in groups:
            # Same status filter as the grouping query, so the listed
            # invoices agree with the reported count. Evaluated once.
            members = list(
                Invoice.objects.filter(
                    tenant=tenant,
                    patient_id=group['patient_id'],
                    issue_date=group['issue_date'],
                    total=group['total'],
                    status__in=statuses,
                ).select_related('patient')
            )
            if not members:
                # The rows changed between the two queries; nothing to report.
                continue

            # Every member shares the patient, so any of them can supply it.
            patient = members[0].patient
            duplicate_groups.append({
                'patient_name': patient.full_name_en,
                'patient_mrn': patient.mrn,
                'issue_date': group['issue_date'],
                'total': group['total'],
                'count': group['count'],
                'invoices': [
                    {
                        'id': invoice.id,
                        'invoice_number': invoice.invoice_number,
                        'status': invoice.get_status_display(),
                        'created_at': invoice.created_at,
                    }
                    for invoice in members
                ],
            })
        return duplicate_groups