#!/usr/bin/env python3 """ Analyze specified Django apps for model duplication & responsibility leaks (healthcare-focused). - Scans only the provided app labels (no imports, AST-only). - Emits: model_map.json, overlaps.json, modular_refactoring_report.md Run: python tools/analyze_models.py \ --project-root . \ --apps core accounts blood_bank patients appointments inpatients emr pharmacy laboratory radiology \ operating_theatre billing inventory hr analytics communications integration quality \ facility_management insurance_approvals \ --output-dir ./_refactor_report """ import argparse import ast import json import os import re import sys from collections import defaultdict from pathlib import Path from textwrap import dedent # === Tunables (extend as your codebase evolves) ================================ # Keywords to help guess domains by file/model/field names (heuristic only) DOMAIN_KEYWORDS = { "inventory": {"stock","item","sku","warehouse","uom","batch","lot","reorder","ledger","movement"}, "laboratory": {"lab","specimen","test","result","panel","analyte","accession"}, "radiology": {"study","series","modality","dicom","report","imaging"}, "pharmacy": {"rx","prescription","dispense","med","drug","formulary","dose","pharma"}, "billing": {"invoice","claim","payer","contract","tariff","price","payment","sbs","drg","adrg"}, "emr": {"note","vital","diagnosis","procedure","encounter","allergy","observation","problem","plan"}, "hr": {"employee","schedule","shift","roster","timesheet","payroll","leave"}, "core": {"patient","encounter","tenant","facility","identifier","audit","attachment"}, "inpatients": {"admission","discharge","ward","bed","stay","transfer"}, "appointments": {"appointment","slot","schedule","booking"}, "operating_theatre": {"ot","or","theatre","surgery","anesthesia","case","procedure"}, "blood_bank": {"donor","bag","component","crossmatch","blood","transfusion"}, "analytics": {"kpi","metric","dashboard","reporting","aggregate"}, "communications": {"message","sms","email","notification"}, "integration": {"hl7","fhir","elm","ocpi","ocpp","webhook","adapter"}, "quality": {"audit","incident","complaint","indicator","risk","quality"}, "facility_management": {"asset","ticket","workorder","maintenance","hvac","bme","facility"}, "insurance_approvals": {"auth","authorization","approval","preauth","payer","policy"}, "patients": {"patient","demographic","identifier","contact"}, "accounts": {"user","auth","role","permission","group"}, } # Canonical ownership proposal tailored to your apps CANONICAL_OWNERSHIP = { "core": {"Patient","Encounter","Facility","Tenant","AuditEvent","Attachment","Identifier"}, "patients": {"PatientProfile"}, # if you keep a split between core primitives and profile extras "appointments": {"Appointment","Schedule","Slot"}, "inpatients": {"Admission","Transfer","DischargeSummary","Ward","Bed","Stay"}, "emr": {"ClinicalNote","Allergy","Problem","Procedure","Vital"}, "pharmacy": {"Drug","Prescription","Dispense","Formulary"}, "laboratory": {"LabOrder","LabTest","LabPanel","LabResult","Specimen"}, "radiology": {"RadOrder","Study","Series","RadReport"}, "operating_theatre": {"SurgicalCase","AnesthesiaNote"}, # narrative note will be centralized "billing": {"Payer","Contract","PriceRule","Claim","Invoice","Payment"}, "inventory": {"Item","StockLocation","StockLedger","ReorderRule","UnitOfMeasure","Batch","Lot"}, "hr": {"Employee","Shift","Schedule","Timesheet"}, "analytics": {"Metric","KPI","ReportConfig"}, # read-only aggregates / configs "communications": {"MessageTemplate","OutboundMessage","Notification"}, "integration": {"Endpoint","Channel","Mapping","Job","EventLog"}, "quality": {"Incident","QualityIndicator","AuditFinding","CAPA"}, "facility_management": {"Asset","WorkOrder","MaintenancePlan","Ticket"}, "blood_bank": {"Donor","Donation","Component","Crossmatch","Transfusion"}, "insurance_approvals": {"AuthorizationRequest","AuthorizationResponse"}, } # Strong signals of inventory logic leaking elsewhere INVENTORY_LEAK_TOKENS = {"stock","warehouse","reorder","ledger","uom","sku","onhand","qty","quantity"} # Names to ignore when comparing (abstract bases, mixins) GENERIC_NAMES = {"BaseModel","TimestampedModel","AbstractBase","CommonModel","NameMixin"} # Similarity threshold for flagging "these two models look the same" FIELDSET_SIMILARITY_THRESHOLD = 0.6 # ============================================================================== def find_models_py(project_root: Path, app_label: str) -> Path | None: parts = app_label.split(".") for i in range(len(parts), 0, -1): candidate = project_root.joinpath(*parts[:i]) if candidate.exists(): break else: candidate = project_root.joinpath(*parts) models_py = candidate / "models.py" return models_py if models_py.exists() else None def is_model_class(node: ast.ClassDef) -> bool: for base in node.bases: if isinstance(base, ast.Attribute) and base.attr == "Model": return True if isinstance(base, ast.Name) and base.id == "Model": return True return False def extract_fields(node: ast.ClassDef) -> dict: fields, fks, m2ms = {}, [], [] for stmt in node.body: if isinstance(stmt, ast.Assign) and len(stmt.targets) == 1 and isinstance(stmt.targets[0], ast.Name): name = stmt.targets[0].id if isinstance(stmt.value, ast.Call): call = stmt.value ftype = call.func.attr if isinstance(call.func, ast.Attribute) else (call.func.id if isinstance(call.func, ast.Name) else None) fields[name] = ftype or "Field" if ftype in {"ForeignKey","OneToOneField","ManyToManyField"} and call.args: a0 = call.args[0]; target = None if isinstance(a0, ast.Constant) and isinstance(a0.value, str): target = a0.value elif isinstance(a0, ast.Attribute): pieces=[]; cur=a0 while isinstance(cur, ast.Attribute): pieces.append(cur.attr); cur=cur.value if isinstance(cur, ast.Name): pieces.append(cur.id) target=".".join(reversed(pieces)) elif isinstance(a0, ast.Name): target = a0.id if ftype == "ManyToManyField": m2ms.append(target) else: fks.append(target) return {"fields": fields, "fks": fks, "m2ms": m2ms} def tokenize_identifiers(*names: str) -> set[str]: toks=set() for n in names or []: for t in re.split(r"[_\W]+",(n or "").lower()): if t: toks.add(t) return toks def jaccard(a:set,b:set)->float: if not a and not b: return 0.0 return len(a & b)/max(1, len(a | b)) def domain_guess(app_label: str, models_in_app: list[dict]) -> str | None: toks = tokenize_identifiers(app_label) for m in models_in_app: toks |= tokenize_identifiers(m["name"]) for f in m["fields"].keys(): toks |= tokenize_identifiers(f) best, score = None, 0 for d, kw in DOMAIN_KEYWORDS.items(): s = len(toks & kw) if s > score: best, score = d, s return best def analyze(project_root: Path, app_labels: list[str]) -> dict: model_map = {"apps": [], "by_app": defaultdict(list)} for app in app_labels: mp = find_models_py(project_root, app) if not mp: continue try: src = mp.read_text(encoding="utf-8") except Exception as e: print(f"ERROR reading {mp}: {e}", file=sys.stderr); continue try: tree = ast.parse(src) except SyntaxError as e: print(f"SYNTAX ERROR in {mp}: {e}", file=sys.stderr); continue models=[] for node in tree.body: if isinstance(node, ast.ClassDef) and node.name not in GENERIC_NAMES and is_model_class(node): d = extract_fields(node) models.append({"name": node.name, "fields": d["fields"], "fks": d["fks"], "m2ms": d["m2ms"], "lineno": node.lineno, "file": str(mp)}) if models: model_map["apps"].append(app) model_map["by_app"][app] = models # index fully-qualified labels for reverse deps fq = {} for app, models in model_map["by_app"].items(): for m in models: fq[f"{app}.{m['name']}"] = (app, m["name"]) rev = defaultdict(lambda: defaultdict(list)) # target -> {source_app: [model_names]} for app, models in model_map["by_app"].items(): for m in models: for ref in (m["fks"] + m["m2ms"]): if not ref: continue cand=None if "." in (ref or ""): cand = ref else: matches=[F for F in fq.keys() if F.endswith(f".{ref}")] if matches: cand = matches[0] if cand: rev[cand][app].append(m["name"]) app_domains = {app: domain_guess(app, models) for app, models in model_map["by_app"].items()} model_map["app_domains"] = app_domains model_map["reverse_deps"] = {k: dict(v) for k, v in rev.items()} return model_map def detect_overlaps(model_map: dict) -> dict: overlaps={"name_collisions":[],"fieldset_similar":[],"inventory_leaks":[],"canonical_conflicts":[]} idx=defaultdict(list) for app, models in model_map["by_app"].items(): for m in models: idx[m["name"]].append({"app": app, "model": m}) # name collisions for name, occ in idx.items(): if len(occ) > 1 and name not in GENERIC_NAMES: overlaps["name_collisions"].append({ "name": name, "occurrences": [{"app": o["app"], "file": o["model"]["file"], "lineno": o["model"]["lineno"]} for o in occ] }) # fieldset similarities def fieldset(m): return set(m["fields"].keys()) | set(m["fks"]) | set(m["m2ms"]) allm=[] for app, models in model_map["by_app"].items(): for m in models: allm.append((app, m)) for i in range(len(allm)): aa, ma = allm[i]; fa = fieldset(ma) for j in range(i+1, len(allm)): ab, mb = allm[j] if aa == ab: continue fb = fieldset(mb) sim = jaccard(fa, fb) if sim >= FIELDSET_SIMILARITY_THRESHOLD: overlaps["fieldset_similar"].append({ "a": {"app": aa, "name": ma["name"], "file": ma["file"]}, "b": {"app": ab, "name": mb["name"], "file": mb["file"]}, "similarity": round(sim, 2), "shared": sorted(list(fa & fb))[:25], }) # inventory leaks for app, models in model_map["by_app"].items(): domain = model_map["app_domains"].get(app, "") if domain in {"laboratory","radiology","pharmacy","emr","inpatients","blood_bank","operating_theatre"}: for m in models: toks = tokenize_identifiers(m["name"]) for f in m["fields"].keys(): toks |= tokenize_identifiers(f) leaks = toks & INVENTORY_LEAK_TOKENS if leaks: overlaps["inventory_leaks"].append({"app": app, "model": m["name"], "file": m["file"], "tokens": sorted(list(leaks))}) # canonical conflicts canonical_flat={} for app, model_names in CANONICAL_OWNERSHIP.items(): for mn in model_names: canonical_flat.setdefault(mn, set()).add(app) for app, models in model_map["by_app"].items(): for m in models: owners = canonical_flat.get(m["name"]) if owners and app not in owners: overlaps["canonical_conflicts"].append({ "model": m["name"], "app": app, "expected": sorted(list(owners)), "file": m["file"] }) return overlaps def make_matrix(model_map: dict) -> list[dict]: # Allowed capabilities per domain (heuristic) permitted = { "inventory":{"catalog","stock","ledger"}, "laboratory":{"orders","results","panels"}, "radiology":{"orders","reports","studies"}, "pharmacy":{"prescriptions","dispenses","formulary"}, "billing":{"claims","invoices","payments"}, "emr":{"clinical_docs","observations"}, "hr":{"staffing","scheduling"}, "core":{"patients","encounters","tenancy","audit","attachments"}, "inpatients":{"admissions","transfers","beds"}, "appointments":{"booking","scheduling"}, "operating_theatre":{"surgical_cases","anesthesia"}, "blood_bank":{"donations","components","crossmatch","transfusions"}, "analytics":{"reporting","aggregations"}, "communications":{"notifications","templates"}, "integration":{"connectors","mappings"}, "quality":{"incidents","indicators","audits"}, "facility_management":{"assets","workorders","maintenance"}, "insurance_approvals":{"authorizations"}, "patients":{"profiles"}, "accounts":{"users_auth"}, } rows=[] for app in sorted(model_map["by_app"].keys()): domain = model_map["app_domains"].get(app) or "unknown" rows.append({"app": app, "domain_guess": domain, "allowed": sorted(list(permitted.get(domain, set()))), "note": "Heuristic; adjust if needed."}) return rows def to_markdown_report(model_map: dict, overlaps: dict, matrix_rows: list[dict]) -> str: apps = sorted(model_map["by_app"].keys()) total_models = sum(len(model_map["by_app"][a]) for a in apps) inv = ["| App | Domain Guess | # Models | Models |","|---|---|---:|---|"] for app in apps: domain = model_map["app_domains"].get(app) or "" names = ", ".join(sorted(m["name"] for m in model_map["by_app"][app])) inv.append(f"| `{app}` | {domain} | {len(model_map['by_app'][app])} | {names} |") mat = ["| App | Domain | Allowed Capabilities | Note |","|---|---|---|---|"] for r in matrix_rows: mat.append(f"| `{r['app']}` | {r['domain_guess'] or ''} | {', '.join(r['allowed']) or '—'} | {r['note']} |") def block(obj): return "```json\n"+json.dumps(obj, indent=2)+"\n```" return dedent(f""" # Modular Refactoring Report (Drop-and-Reseed Friendly) **Executive Summary** - Apps scanned: **{len(apps)}** - Total models found: **{total_models}** - Name collisions: **{len(overlaps['name_collisions'])}** - Field-set similarities: **{len(overlaps['fieldset_similar'])}** - Inventory leaks in clinical apps: **{len(overlaps['inventory_leaks'])}** - Canonical ownership conflicts: **{len(overlaps['canonical_conflicts'])}** --- ## 1) App & Model Inventory {os.linesep.join(inv)} ### Model Map (JSON) {block({k:(v if k!='by_app' else {a:v for a,v in model_map['by_app'].items()}) for k,v in model_map.items() if k!='reverse_deps'})} --- ## 2) Detected Overlaps & Risks ### A) Name Collisions {block(overlaps['name_collisions']) if overlaps['name_collisions'] else "_None detected_"} ### B) Field-Set Similarities (possible duplicates) — threshold ≥ {FIELDSET_SIMILARITY_THRESHOLD} {block(overlaps['fieldset_similar']) if overlaps['fieldset_similar'] else "_None detected_"} ### C) Inventory Responsibility Leaks in Clinical Apps {block(overlaps['inventory_leaks']) if overlaps['inventory_leaks'] else "_None detected_"} ### D) Canonical Ownership Conflicts {block(overlaps['canonical_conflicts']) if overlaps['canonical_conflicts'] else "_None detected_"} --- ## 3) Responsibility Matrix (Apps × Capabilities) {os.linesep.join(mat)} --- ## 4) Target Architecture (Text Diagram) ``` core ──► (patient, encounter, tenant, audit, attachment) │ ├── emr ──► clinical docs + observations (uses core.Patient/Encounter) ├── documentation ──► owns ALL notes/reports (centralized), linkable everywhere ├── inventory ──► items/stock/ledger (FK used by lab/pharmacy/blood_bank) ├── pharmacy ──► prescriptions/dispenses (FK inventory) ├── laboratory ──► orders/tests/results (FK inventory) ├── radiology ──► orders/studies/reports (narrative via documentation) ├── inpatients ──► admissions/transfers/beds (admission & progress notes via documentation) ├── operating_theatre ──► surgical cases/anesthesia (notes via documentation) ├── blood_bank ──► donors/components/crossmatch/transfusions (no stock logic) ├── billing ──► claims/invoices/payments (FK patient/encounter only) ├── appointments ──► booking/scheduling ├── hr ──► staffing/scheduling ├── insurance_approvals ──► payer authorizations ├── facility_management ──► assets/workorders/maintenance ├── analytics ──► reporting/aggregations (read-only) ├── communications ──► notifications/templates └── integration ──► connectors/mappings/jobs ``` **Rules** - No stock logic in lab/radiology/pharmacy/blood_bank/inpatients. - No billing logic in EMR/clinical apps. - PHI stays in `core`, `emr`, and `documentation`. - All narrative notes/reports live in `documentation.Document`; domain apps **link**. --- ## 5) Refactor Plan (Drop & Reseed) 1. **Delete** duplicated models from non-canonical apps (see overlaps). 2. **Keep** canonical models in the domain owners defined above. 3. Replace cross-app copies with **FKs to canonical models** or service calls. 4. Introduce **service modules** per domain (e.g., `inventory.services.record_stock_movement`). 5. Standardize model fields: `tenant`, `external_id`, `created_at`, `updated_at`, `created_by`, `updated_by`, `is_active`. 6. Regenerate seeds per domain (inventory catalog, lab tests, formulary, payer contracts). 7. Implement centralized documentation (below) and remove local notes/report models. --- ## 6) Centralized Documentation (New `documentation/` app) **Purpose:** One canonical store for all notes & reports; link to any object. Create: `documentation/models.py` ```python from django.conf import settings from django.db import models from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType import uuid class NoteType(models.TextChoices): ADMISSION = "ADMISSION", "Admission Note" PROGRESS = "PROGRESS", "Progress Note" DISCHARGE = "DISCHARGE", "Discharge Summary" RADIOLOGY_REPORT = "RADIOLOGY_REPORT", "Radiology Report" LAB_REPORT = "LAB_REPORT", "Laboratory Narrative" PROCEDURE = "PROCEDURE", "Procedure Note" ANESTHESIA = "ANESTHESIA", "Anesthesia Note" BLOODBANK = "BLOODBANK", "Transfusion Note" OTHER = "OTHER", "Other" class Document(models.Model): id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) tenant = models.ForeignKey("core.Tenant", on_delete=models.CASCADE, related_name="documents") patient = models.ForeignKey("core.Patient", on_delete=models.CASCADE, related_name="documents") encounter = models.ForeignKey("core.Encounter", on_delete=models.SET_NULL, null=True, blank=True, related_name="documents") doc_type = models.CharField(max_length=40, choices=NoteType.choices) title = models.CharField(max_length=255) status = models.CharField(max_length=20, default="final") # draft|amended|final|entered-in-error authored_at = models.DateTimeField(auto_now_add=True) authored_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, related_name="documents_authored") signed_at = models.DateTimeField(null=True, blank=True) signed_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True, related_name="documents_signed") body_markdown = models.TextField(blank=True, default="") body_json = models.JSONField(blank=True, null=True) fhir_profile = models.CharField(max_length=120, blank=True, default="") code = models.CharField(max_length=64, blank=True, default="") is_confidential = models.BooleanField(default=False) is_amendment = models.BooleanField(default=False) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class DocumentVersion(models.Model): document = models.ForeignKey(Document, on_delete=models.CASCADE, related_name="versions") version = models.PositiveIntegerField() snapshot_markdown = models.TextField(blank=True, default="") snapshot_json = models.JSONField(blank=True, null=True) changed_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True) changed_at = models.DateTimeField(auto_now_add=True) class Meta: unique_together = [("document", "version")] ordering = ["-version"] class DocumentLink(models.Model): document = models.ForeignKey(Document, on_delete=models.CASCADE, related_name="links") content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE) object_id = models.CharField(max_length=64) target = GenericForeignKey("content_type", "object_id") role = models.CharField(max_length=40, blank=True, default="context") # context|source|result|followup created_at = models.DateTimeField(auto_now_add=True) class DocumentAttachment(models.Model): document = models.ForeignKey(Document, on_delete=models.CASCADE, related_name="attachments") file = models.FileField(upload_to="documents/%Y/%m/") title = models.CharField(max_length=255, blank=True, default="") created_at = models.DateTimeField(auto_now_add=True) ``` **DRF (serializers & viewsets)** - Endpoints: - `POST /api/v1/documentation/documents/` (create) - `GET /api/v1/documentation/documents/?patient=&encounter=&doc_type=` - `POST /api/v1/documentation/documents/{{id}}/amend/` - `POST /api/v1/documentation/documents/{{id}}/link/` - `GET /api/v1/documentation/documents/{{id}}/` - Add tenant scoping and object-level permissions. **Services**: `documentation/services.py` ```python from .models import Document, DocumentVersion, DocumentLink from django.contrib.contenttypes.models import ContentType from django.utils import timezone def create_document_and_link(*, tenant, patient, encounter=None, doc_type:str, title:str, body_markdown:str="", body_json=None, links=None, author=None, sign=False): doc = Document.objects.create( tenant=tenant, patient=patient, encounter=encounter, doc_type=doc_type, title=title, body_markdown=body_markdown, body_json=body_json or {{}}, authored_by=author, status="draft" if not sign else "final", signed_at=timezone.now() if sign else None, signed_by=author if sign else None ) DocumentVersion.objects.create(document=doc, version=1, snapshot_markdown=body_markdown, snapshot_json=body_json or {{}}, changed_by=author) for ln in links or []: ct = ContentType.objects.get(app_label=ln["app_label"], model=ln["model"].lower()) DocumentLink.objects.create(document=doc, content_type=ct, object_id=str(ln["pk"]), role=ln.get("role","context")) return doc def sign_document(doc: Document, user): doc.status = "final"; doc.signed_at = timezone.now(); doc.signed_by = user; doc.save(update_fields=["status","signed_at","signed_by"]) return doc def amend_document(doc: Document, user, *, new_markdown:str="", new_json=None): nxt = (doc.versions.first().version + 1) if doc.versions.exists() else 2 DocumentVersion.objects.create(document=doc, version=nxt, snapshot_markdown=new_markdown, snapshot_json=new_json or {{}}, changed_by=user) doc.body_markdown, doc.body_json, doc.is_amendment = new_markdown, (new_json or {{}}), True doc.save(update_fields=["body_markdown","body_json","updated_at","is_amendment"]) return doc ``` **Refactor each domain to use services:** - **inpatients**: Admission/Progress notes → `documentation` (doc_type `ADMISSION` / `PROGRESS`). - **radiology**: Narrative report → `documentation` (doc_type `RADIOLOGY_REPORT`, link to Study as `result`). - **laboratory**: Narrative report (if needed) → `documentation` (doc_type `LAB_REPORT`, link to Order). - **operating_theatre**: Anesthesia/Procedure notes → `documentation`. - **blood_bank**: Transfusion note → `documentation`. **Delete** local note/report models once the adapters are wired. --- ## 7) Breaking Changes & Reseed Checklist - [ ] Removed duplicated/non-canonical models - [ ] Updated FK targets to canonical owners - [ ] Replaced local notes/reports with `documentation.Document` links - [ ] Admin & DRF routes updated - [ ] Seed order: `core` → `inventory` → `emr` → domain catalogs → `billing` - [ ] Tenancy filters enforced (Managers/Mixins) - [ ] PHI audit verified (create/update/sign/amend trails) --- ## 8) Smoke Tests (must pass) 1) Create Patient & Encounter (core). 2) Create Admission (inpatients) → create `ADMISSION` note via documentation & link to Admission. 3) Create Lab Order (laboratory) that consumes inventory items via FK; finalize → (optional) `LAB_REPORT`. 4) Create Radiology Study (radiology) → finalize report via documentation (`RADIOLOGY_REPORT`), link as `result`. 5) Raise Invoice (billing) against Encounter; **no clinical duplication** in billing. 6) Verify Patient timeline shows all Documents; each domain detail page shows linked documents. --- ## 9) How to Run **A) Generate audit report** ```bash python tools/analyze_models.py \\ --project-root . \\ --apps core accounts blood_bank patients appointments inpatients emr pharmacy laboratory radiology \\ operating_theatre billing inventory hr analytics communications integration quality \\ facility_management insurance_approvals \\ --output-dir ./_refactor_report ``` Review `_refactor_report/modular_refactoring_report.md`. **B) Implement `documentation/` app** - Add to `LOCAL_APPS`: `documentation` - Create models/services as above; make DRF serializers/viewsets. - Replace local notes/report models with calls to `documentation.services`. **C) Drop & Reseed** - Reset DB, run migrations, seed catalogs (inventory items, lab tests, formulary, payers, etc.). - Run **Smoke Tests**. --- ## 10) App Responsibility Matrix (tailored to your list) | App | Canonical Ownership | |---|---| | `core` | Patient, Encounter, Tenant, Facility, AuditEvent, Attachment, Identifier | | `accounts` | Users/Auth/RBAC; no clinical/business logic | | `patients` | PatientProfile & demographics extensions (if used) | | `appointments` | Appointment, Schedule, Slot | | `inpatients` | Admission, Transfer, DischargeSummary, Ward, Bed, Stay (**notes via documentation**) | | `emr` | Clinical documentation scaffolding if needed; avoid storing notes (use `documentation`) | | `pharmacy` | Drug, Formulary, Prescription, Dispense (**stock via inventory FK**) | | `laboratory` | LabOrder, LabTest, LabPanel, LabResult, Specimen (**no stock logic**) | | `radiology` | RadOrder, Study, Series (**narrative report via documentation**) | | `operating_theatre` | SurgicalCase, AnesthesiaNote (**note content via documentation**) | | `billing` | Payer, Contract, Claim, Invoice, Payment | | `inventory` | Item, StockLocation, StockLedger, ReorderRule, UnitOfMeasure, Batch, Lot | | `hr` | Employee, Shift, Schedule, Timesheet | | `analytics` | Reporting/aggregations (read-only over canonical stores) | | `communications` | Notifications, MessageTemplate | | `integration` | Connectors (FHIR/HL7/ELM/etc.), Mappings, Jobs, EventLog | | `quality` | Incident, Indicator, AuditFinding, CAPA | | `facility_management` | Asset, WorkOrder, MaintenancePlan, Ticket | | `blood_bank` | Donor, Donation, Component, Crossmatch, Transfusion (**no stock logic**) | | `insurance_approvals` | AuthorizationRequest/Response | **Golden Rules** - **Inventory** is the *only* place with stock/ledger logic. - **Billing** is the *only* place with claims/invoices/payments. - **Documentation** is the *only* place for free-text notes/reports; everything else **links**. """).strip() def main(): parser = argparse.ArgumentParser( description="Analyze Django apps for model duplication & responsibility leaks", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Example: python tools/analyze_models.py \\ --project-root . \\ --apps core accounts blood_bank patients appointments inpatients emr pharmacy laboratory radiology \\ operating_theatre billing inventory hr analytics communications integration quality \\ facility_management insurance_approvals \\ --output-dir ./_refactor_report """ ) parser.add_argument( '--project-root', type=Path, required=True, help='Path to Django project root directory' ) parser.add_argument( '--apps', nargs='+', required=True, help='List of Django app labels to analyze' ) parser.add_argument( '--output-dir', type=Path, default=Path('./_refactor_report'), help='Output directory for generated reports (default: ./_refactor_report)' ) args = parser.parse_args() # Validate project root if not args.project_root.exists(): print(f"ERROR: Project root does not exist: {args.project_root}", file=sys.stderr) sys.exit(1) # Create output directory args.output_dir.mkdir(parents=True, exist_ok=True) print(f"Analyzing {len(args.apps)} Django apps...") print(f"Project root: {args.project_root}") print(f"Output directory: {args.output_dir}") print() # Analyze models model_map = analyze(args.project_root, args.apps) # Detect overlaps overlaps = detect_overlaps(model_map) # Generate responsibility matrix matrix_rows = make_matrix(model_map) # Generate markdown report markdown_report = to_markdown_report(model_map, overlaps, matrix_rows) # Write outputs model_map_file = args.output_dir / 'model_map.json' overlaps_file = args.output_dir / 'overlaps.json' report_file = args.output_dir / 'modular_refactoring_report.md' # Write model_map.json with open(model_map_file, 'w', encoding='utf-8') as f: json.dump(model_map, f, indent=2, default=str) print(f"✓ Generated: {model_map_file}") # Write overlaps.json with open(overlaps_file, 'w', encoding='utf-8') as f: json.dump(overlaps, f, indent=2) print(f"✓ Generated: {overlaps_file}") # Write markdown report with open(report_file, 'w', encoding='utf-8') as f: f.write(markdown_report) print(f"✓ Generated: {report_file}") print() print("=" * 80) print("SUMMARY") print("=" * 80) print(f"Apps scanned: {len(model_map['apps'])}") print(f"Total models: {sum(len(models) for models in model_map['by_app'].values())}") print(f"Name collisions: {len(overlaps['name_collisions'])}") print(f"Field-set similarities: {len(overlaps['fieldset_similar'])}") print(f"Inventory leaks: {len(overlaps['inventory_leaks'])}") print(f"Canonical conflicts: {len(overlaps['canonical_conflicts'])}") print() print(f"Review the detailed report at: {report_file}") print("=" * 80) if __name__ == '__main__': main()