hospital-management/tools/analyze_models.py
2025-10-06 15:25:37 +03:00

671 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Analyze specified Django apps for model duplication & responsibility leaks (healthcare-focused).
- Scans only the provided app labels (no imports, AST-only).
- Emits: model_map.json, overlaps.json, modular_refactoring_report.md
Run:
python tools/analyze_models.py \
--project-root . \
--apps core accounts blood_bank patients appointments inpatients emr pharmacy laboratory radiology \
operating_theatre billing inventory hr analytics communications integration quality \
facility_management insurance_approvals \
--output-dir ./_refactor_report
"""
import argparse
import ast
import json
import os
import re
import sys
from collections import defaultdict
from pathlib import Path
from textwrap import dedent
# === Tunables (extend as your codebase evolves) ================================
# Keywords to help guess domains by file/model/field names (heuristic only)
DOMAIN_KEYWORDS = {
"inventory": {"stock","item","sku","warehouse","uom","batch","lot","reorder","ledger","movement"},
"laboratory": {"lab","specimen","test","result","panel","analyte","accession"},
"radiology": {"study","series","modality","dicom","report","imaging"},
"pharmacy": {"rx","prescription","dispense","med","drug","formulary","dose","pharma"},
"billing": {"invoice","claim","payer","contract","tariff","price","payment","sbs","drg","adrg"},
"emr": {"note","vital","diagnosis","procedure","encounter","allergy","observation","problem","plan"},
"hr": {"employee","schedule","shift","roster","timesheet","payroll","leave"},
"core": {"patient","encounter","tenant","facility","identifier","audit","attachment"},
"inpatients": {"admission","discharge","ward","bed","stay","transfer"},
"appointments": {"appointment","slot","schedule","booking"},
"operating_theatre": {"ot","or","theatre","surgery","anesthesia","case","procedure"},
"blood_bank": {"donor","bag","component","crossmatch","blood","transfusion"},
"analytics": {"kpi","metric","dashboard","reporting","aggregate"},
"communications": {"message","sms","email","notification"},
"integration": {"hl7","fhir","elm","ocpi","ocpp","webhook","adapter"},
"quality": {"audit","incident","complaint","indicator","risk","quality"},
"facility_management": {"asset","ticket","workorder","maintenance","hvac","bme","facility"},
"insurance_approvals": {"auth","authorization","approval","preauth","payer","policy"},
"patients": {"patient","demographic","identifier","contact"},
"accounts": {"user","auth","role","permission","group"},
}
# Canonical ownership proposal tailored to your apps
CANONICAL_OWNERSHIP = {
"core": {"Patient","Encounter","Facility","Tenant","AuditEvent","Attachment","Identifier"},
"patients": {"PatientProfile"}, # if you keep a split between core primitives and profile extras
"appointments": {"Appointment","Schedule","Slot"},
"inpatients": {"Admission","Transfer","DischargeSummary","Ward","Bed","Stay"},
"emr": {"ClinicalNote","Allergy","Problem","Procedure","Vital"},
"pharmacy": {"Drug","Prescription","Dispense","Formulary"},
"laboratory": {"LabOrder","LabTest","LabPanel","LabResult","Specimen"},
"radiology": {"RadOrder","Study","Series","RadReport"},
"operating_theatre": {"SurgicalCase","AnesthesiaNote"}, # narrative note will be centralized
"billing": {"Payer","Contract","PriceRule","Claim","Invoice","Payment"},
"inventory": {"Item","StockLocation","StockLedger","ReorderRule","UnitOfMeasure","Batch","Lot"},
"hr": {"Employee","Shift","Schedule","Timesheet"},
"analytics": {"Metric","KPI","ReportConfig"}, # read-only aggregates / configs
"communications": {"MessageTemplate","OutboundMessage","Notification"},
"integration": {"Endpoint","Channel","Mapping","Job","EventLog"},
"quality": {"Incident","QualityIndicator","AuditFinding","CAPA"},
"facility_management": {"Asset","WorkOrder","MaintenancePlan","Ticket"},
"blood_bank": {"Donor","Donation","Component","Crossmatch","Transfusion"},
"insurance_approvals": {"AuthorizationRequest","AuthorizationResponse"},
}
# Strong signals of inventory logic leaking elsewhere
INVENTORY_LEAK_TOKENS = {"stock","warehouse","reorder","ledger","uom","sku","onhand","qty","quantity"}
# Names to ignore when comparing (abstract bases, mixins)
GENERIC_NAMES = {"BaseModel","TimestampedModel","AbstractBase","CommonModel","NameMixin"}
# Similarity threshold for flagging "these two models look the same"
FIELDSET_SIMILARITY_THRESHOLD = 0.6
# ==============================================================================
def find_models_py(project_root: Path, app_label: str) -> Path | None:
parts = app_label.split(".")
for i in range(len(parts), 0, -1):
candidate = project_root.joinpath(*parts[:i])
if candidate.exists():
break
else:
candidate = project_root.joinpath(*parts)
models_py = candidate / "models.py"
return models_py if models_py.exists() else None
def is_model_class(node: ast.ClassDef) -> bool:
for base in node.bases:
if isinstance(base, ast.Attribute) and base.attr == "Model": return True
if isinstance(base, ast.Name) and base.id == "Model": return True
return False
def extract_fields(node: ast.ClassDef) -> dict:
fields, fks, m2ms = {}, [], []
for stmt in node.body:
if isinstance(stmt, ast.Assign) and len(stmt.targets) == 1 and isinstance(stmt.targets[0], ast.Name):
name = stmt.targets[0].id
if isinstance(stmt.value, ast.Call):
call = stmt.value
ftype = call.func.attr if isinstance(call.func, ast.Attribute) else (call.func.id if isinstance(call.func, ast.Name) else None)
fields[name] = ftype or "Field"
if ftype in {"ForeignKey","OneToOneField","ManyToManyField"} and call.args:
a0 = call.args[0]; target = None
if isinstance(a0, ast.Constant) and isinstance(a0.value, str): target = a0.value
elif isinstance(a0, ast.Attribute):
pieces=[]; cur=a0
while isinstance(cur, ast.Attribute): pieces.append(cur.attr); cur=cur.value
if isinstance(cur, ast.Name): pieces.append(cur.id)
target=".".join(reversed(pieces))
elif isinstance(a0, ast.Name): target = a0.id
if ftype == "ManyToManyField": m2ms.append(target)
else: fks.append(target)
return {"fields": fields, "fks": fks, "m2ms": m2ms}
def tokenize_identifiers(*names: str) -> set[str]:
toks=set()
for n in names or []:
for t in re.split(r"[_\W]+",(n or "").lower()):
if t: toks.add(t)
return toks
def jaccard(a:set,b:set)->float:
if not a and not b: return 0.0
return len(a & b)/max(1, len(a | b))
def domain_guess(app_label: str, models_in_app: list[dict]) -> str | None:
toks = tokenize_identifiers(app_label)
for m in models_in_app:
toks |= tokenize_identifiers(m["name"])
for f in m["fields"].keys(): toks |= tokenize_identifiers(f)
best, score = None, 0
for d, kw in DOMAIN_KEYWORDS.items():
s = len(toks & kw)
if s > score: best, score = d, s
return best
def analyze(project_root: Path, app_labels: list[str]) -> dict:
model_map = {"apps": [], "by_app": defaultdict(list)}
for app in app_labels:
mp = find_models_py(project_root, app)
if not mp: continue
try: src = mp.read_text(encoding="utf-8")
except Exception as e: print(f"ERROR reading {mp}: {e}", file=sys.stderr); continue
try: tree = ast.parse(src)
except SyntaxError as e: print(f"SYNTAX ERROR in {mp}: {e}", file=sys.stderr); continue
models=[]
for node in tree.body:
if isinstance(node, ast.ClassDef) and node.name not in GENERIC_NAMES and is_model_class(node):
d = extract_fields(node)
models.append({"name": node.name, "fields": d["fields"], "fks": d["fks"], "m2ms": d["m2ms"], "lineno": node.lineno, "file": str(mp)})
if models:
model_map["apps"].append(app)
model_map["by_app"][app] = models
# index fully-qualified labels for reverse deps
fq = {}
for app, models in model_map["by_app"].items():
for m in models: fq[f"{app}.{m['name']}"] = (app, m["name"])
rev = defaultdict(lambda: defaultdict(list)) # target -> {source_app: [model_names]}
for app, models in model_map["by_app"].items():
for m in models:
for ref in (m["fks"] + m["m2ms"]):
if not ref: continue
cand=None
if "." in (ref or ""): cand = ref
else:
matches=[F for F in fq.keys() if F.endswith(f".{ref}")]
if matches: cand = matches[0]
if cand: rev[cand][app].append(m["name"])
app_domains = {app: domain_guess(app, models) for app, models in model_map["by_app"].items()}
model_map["app_domains"] = app_domains
model_map["reverse_deps"] = {k: dict(v) for k, v in rev.items()}
return model_map
def detect_overlaps(model_map: dict) -> dict:
overlaps={"name_collisions":[],"fieldset_similar":[],"inventory_leaks":[],"canonical_conflicts":[]}
idx=defaultdict(list)
for app, models in model_map["by_app"].items():
for m in models: idx[m["name"]].append({"app": app, "model": m})
# name collisions
for name, occ in idx.items():
if len(occ) > 1 and name not in GENERIC_NAMES:
overlaps["name_collisions"].append({
"name": name,
"occurrences": [{"app": o["app"], "file": o["model"]["file"], "lineno": o["model"]["lineno"]} for o in occ]
})
# fieldset similarities
def fieldset(m): return set(m["fields"].keys()) | set(m["fks"]) | set(m["m2ms"])
allm=[]
for app, models in model_map["by_app"].items():
for m in models: allm.append((app, m))
for i in range(len(allm)):
aa, ma = allm[i]; fa = fieldset(ma)
for j in range(i+1, len(allm)):
ab, mb = allm[j]
if aa == ab: continue
fb = fieldset(mb)
sim = jaccard(fa, fb)
if sim >= FIELDSET_SIMILARITY_THRESHOLD:
overlaps["fieldset_similar"].append({
"a": {"app": aa, "name": ma["name"], "file": ma["file"]},
"b": {"app": ab, "name": mb["name"], "file": mb["file"]},
"similarity": round(sim, 2),
"shared": sorted(list(fa & fb))[:25],
})
# inventory leaks
for app, models in model_map["by_app"].items():
domain = model_map["app_domains"].get(app, "")
if domain in {"laboratory","radiology","pharmacy","emr","inpatients","blood_bank","operating_theatre"}:
for m in models:
toks = tokenize_identifiers(m["name"])
for f in m["fields"].keys(): toks |= tokenize_identifiers(f)
leaks = toks & INVENTORY_LEAK_TOKENS
if leaks:
overlaps["inventory_leaks"].append({"app": app, "model": m["name"], "file": m["file"], "tokens": sorted(list(leaks))})
# canonical conflicts
canonical_flat={}
for app, model_names in CANONICAL_OWNERSHIP.items():
for mn in model_names: canonical_flat.setdefault(mn, set()).add(app)
for app, models in model_map["by_app"].items():
for m in models:
owners = canonical_flat.get(m["name"])
if owners and app not in owners:
overlaps["canonical_conflicts"].append({
"model": m["name"], "app": app, "expected": sorted(list(owners)), "file": m["file"]
})
return overlaps
def make_matrix(model_map: dict) -> list[dict]:
# Allowed capabilities per domain (heuristic)
permitted = {
"inventory":{"catalog","stock","ledger"},
"laboratory":{"orders","results","panels"},
"radiology":{"orders","reports","studies"},
"pharmacy":{"prescriptions","dispenses","formulary"},
"billing":{"claims","invoices","payments"},
"emr":{"clinical_docs","observations"},
"hr":{"staffing","scheduling"},
"core":{"patients","encounters","tenancy","audit","attachments"},
"inpatients":{"admissions","transfers","beds"},
"appointments":{"booking","scheduling"},
"operating_theatre":{"surgical_cases","anesthesia"},
"blood_bank":{"donations","components","crossmatch","transfusions"},
"analytics":{"reporting","aggregations"},
"communications":{"notifications","templates"},
"integration":{"connectors","mappings"},
"quality":{"incidents","indicators","audits"},
"facility_management":{"assets","workorders","maintenance"},
"insurance_approvals":{"authorizations"},
"patients":{"profiles"},
"accounts":{"users_auth"},
}
rows=[]
for app in sorted(model_map["by_app"].keys()):
domain = model_map["app_domains"].get(app) or "unknown"
rows.append({"app": app, "domain_guess": domain, "allowed": sorted(list(permitted.get(domain, set()))), "note": "Heuristic; adjust if needed."})
return rows
def to_markdown_report(model_map: dict, overlaps: dict, matrix_rows: list[dict]) -> str:
apps = sorted(model_map["by_app"].keys())
total_models = sum(len(model_map["by_app"][a]) for a in apps)
inv = ["| App | Domain Guess | # Models | Models |","|---|---|---:|---|"]
for app in apps:
domain = model_map["app_domains"].get(app) or ""
names = ", ".join(sorted(m["name"] for m in model_map["by_app"][app]))
inv.append(f"| `{app}` | {domain} | {len(model_map['by_app'][app])} | {names} |")
mat = ["| App | Domain | Allowed Capabilities | Note |","|---|---|---|---|"]
for r in matrix_rows:
mat.append(f"| `{r['app']}` | {r['domain_guess'] or ''} | {', '.join(r['allowed']) or ''} | {r['note']} |")
def block(obj): return "```json\n"+json.dumps(obj, indent=2)+"\n```"
return dedent(f"""
# Modular Refactoring Report (Drop-and-Reseed Friendly)
**Executive Summary**
- Apps scanned: **{len(apps)}**
- Total models found: **{total_models}**
- Name collisions: **{len(overlaps['name_collisions'])}**
- Field-set similarities: **{len(overlaps['fieldset_similar'])}**
- Inventory leaks in clinical apps: **{len(overlaps['inventory_leaks'])}**
- Canonical ownership conflicts: **{len(overlaps['canonical_conflicts'])}**
---
## 1) App & Model Inventory
{os.linesep.join(inv)}
### Model Map (JSON)
{block({k:(v if k!='by_app' else {a:v for a,v in model_map['by_app'].items()}) for k,v in model_map.items() if k!='reverse_deps'})}
---
## 2) Detected Overlaps & Risks
### A) Name Collisions
{block(overlaps['name_collisions']) if overlaps['name_collisions'] else "_None detected_"}
### B) Field-Set Similarities (possible duplicates) — threshold ≥ {FIELDSET_SIMILARITY_THRESHOLD}
{block(overlaps['fieldset_similar']) if overlaps['fieldset_similar'] else "_None detected_"}
### C) Inventory Responsibility Leaks in Clinical Apps
{block(overlaps['inventory_leaks']) if overlaps['inventory_leaks'] else "_None detected_"}
### D) Canonical Ownership Conflicts
{block(overlaps['canonical_conflicts']) if overlaps['canonical_conflicts'] else "_None detected_"}
---
## 3) Responsibility Matrix (Apps × Capabilities)
{os.linesep.join(mat)}
---
## 4) Target Architecture (Text Diagram)
```
core ──► (patient, encounter, tenant, audit, attachment)
├── emr ──► clinical docs + observations (uses core.Patient/Encounter)
├── documentation ──► owns ALL notes/reports (centralized), linkable everywhere
├── inventory ──► items/stock/ledger (FK used by lab/pharmacy/blood_bank)
├── pharmacy ──► prescriptions/dispenses (FK inventory)
├── laboratory ──► orders/tests/results (FK inventory)
├── radiology ──► orders/studies/reports (narrative via documentation)
├── inpatients ──► admissions/transfers/beds (admission & progress notes via documentation)
├── operating_theatre ──► surgical cases/anesthesia (notes via documentation)
├── blood_bank ──► donors/components/crossmatch/transfusions (no stock logic)
├── billing ──► claims/invoices/payments (FK patient/encounter only)
├── appointments ──► booking/scheduling
├── hr ──► staffing/scheduling
├── insurance_approvals ──► payer authorizations
├── facility_management ──► assets/workorders/maintenance
├── analytics ──► reporting/aggregations (read-only)
├── communications ──► notifications/templates
└── integration ──► connectors/mappings/jobs
```
**Rules**
- No stock logic in lab/radiology/pharmacy/blood_bank/inpatients.
- No billing logic in EMR/clinical apps.
- PHI stays in `core`, `emr`, and `documentation`.
- All narrative notes/reports live in `documentation.Document`; domain apps **link**.
---
## 5) Refactor Plan (Drop & Reseed)
1. **Delete** duplicated models from non-canonical apps (see overlaps).
2. **Keep** canonical models in the domain owners defined above.
3. Replace cross-app copies with **FKs to canonical models** or service calls.
4. Introduce **service modules** per domain (e.g., `inventory.services.record_stock_movement`).
5. Standardize model fields: `tenant`, `external_id`, `created_at`, `updated_at`, `created_by`, `updated_by`, `is_active`.
6. Regenerate seeds per domain (inventory catalog, lab tests, formulary, payer contracts).
7. Implement centralized documentation (below) and remove local notes/report models.
---
## 6) Centralized Documentation (New `documentation/` app)
**Purpose:** One canonical store for all notes & reports; link to any object.
Create: `documentation/models.py`
```python
from django.conf import settings
from django.db import models
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
import uuid
class NoteType(models.TextChoices):
ADMISSION = "ADMISSION", "Admission Note"
PROGRESS = "PROGRESS", "Progress Note"
DISCHARGE = "DISCHARGE", "Discharge Summary"
RADIOLOGY_REPORT = "RADIOLOGY_REPORT", "Radiology Report"
LAB_REPORT = "LAB_REPORT", "Laboratory Narrative"
PROCEDURE = "PROCEDURE", "Procedure Note"
ANESTHESIA = "ANESTHESIA", "Anesthesia Note"
BLOODBANK = "BLOODBANK", "Transfusion Note"
OTHER = "OTHER", "Other"
class Document(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
tenant = models.ForeignKey("core.Tenant", on_delete=models.CASCADE, related_name="documents")
patient = models.ForeignKey("core.Patient", on_delete=models.CASCADE, related_name="documents")
encounter = models.ForeignKey("core.Encounter", on_delete=models.SET_NULL, null=True, blank=True, related_name="documents")
doc_type = models.CharField(max_length=40, choices=NoteType.choices)
title = models.CharField(max_length=255)
status = models.CharField(max_length=20, default="final") # draft|amended|final|entered-in-error
authored_at = models.DateTimeField(auto_now_add=True)
authored_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, related_name="documents_authored")
signed_at = models.DateTimeField(null=True, blank=True)
signed_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True, related_name="documents_signed")
body_markdown = models.TextField(blank=True, default="")
body_json = models.JSONField(blank=True, null=True)
fhir_profile = models.CharField(max_length=120, blank=True, default="")
code = models.CharField(max_length=64, blank=True, default="")
is_confidential = models.BooleanField(default=False)
is_amendment = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class DocumentVersion(models.Model):
document = models.ForeignKey(Document, on_delete=models.CASCADE, related_name="versions")
version = models.PositiveIntegerField()
snapshot_markdown = models.TextField(blank=True, default="")
snapshot_json = models.JSONField(blank=True, null=True)
changed_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True)
changed_at = models.DateTimeField(auto_now_add=True)
class Meta:
unique_together = [("document", "version")]
ordering = ["-version"]
class DocumentLink(models.Model):
document = models.ForeignKey(Document, on_delete=models.CASCADE, related_name="links")
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
object_id = models.CharField(max_length=64)
target = GenericForeignKey("content_type", "object_id")
role = models.CharField(max_length=40, blank=True, default="context") # context|source|result|followup
created_at = models.DateTimeField(auto_now_add=True)
class DocumentAttachment(models.Model):
document = models.ForeignKey(Document, on_delete=models.CASCADE, related_name="attachments")
file = models.FileField(upload_to="documents/%Y/%m/")
title = models.CharField(max_length=255, blank=True, default="")
created_at = models.DateTimeField(auto_now_add=True)
```
**DRF (serializers & viewsets)**
- Endpoints:
- `POST /api/v1/documentation/documents/` (create)
- `GET /api/v1/documentation/documents/?patient=&encounter=&doc_type=`
- `POST /api/v1/documentation/documents/{{id}}/amend/`
- `POST /api/v1/documentation/documents/{{id}}/link/`
- `GET /api/v1/documentation/documents/{{id}}/`
- Add tenant scoping and object-level permissions.
**Services**: `documentation/services.py`
```python
from .models import Document, DocumentVersion, DocumentLink
from django.contrib.contenttypes.models import ContentType
from django.utils import timezone
def create_document_and_link(*, tenant, patient, encounter=None, doc_type:str, title:str,
body_markdown:str="", body_json=None, links=None, author=None, sign=False):
doc = Document.objects.create(
tenant=tenant, patient=patient, encounter=encounter,
doc_type=doc_type, title=title, body_markdown=body_markdown, body_json=body_json or {{}},
authored_by=author, status="draft" if not sign else "final",
signed_at=timezone.now() if sign else None, signed_by=author if sign else None
)
DocumentVersion.objects.create(document=doc, version=1, snapshot_markdown=body_markdown, snapshot_json=body_json or {{}}, changed_by=author)
for ln in links or []:
ct = ContentType.objects.get(app_label=ln["app_label"], model=ln["model"].lower())
DocumentLink.objects.create(document=doc, content_type=ct, object_id=str(ln["pk"]), role=ln.get("role","context"))
return doc
def sign_document(doc: Document, user):
doc.status = "final"; doc.signed_at = timezone.now(); doc.signed_by = user; doc.save(update_fields=["status","signed_at","signed_by"])
return doc
def amend_document(doc: Document, user, *, new_markdown:str="", new_json=None):
nxt = (doc.versions.first().version + 1) if doc.versions.exists() else 2
DocumentVersion.objects.create(document=doc, version=nxt, snapshot_markdown=new_markdown, snapshot_json=new_json or {{}}, changed_by=user)
doc.body_markdown, doc.body_json, doc.is_amendment = new_markdown, (new_json or {{}}), True
doc.save(update_fields=["body_markdown","body_json","updated_at","is_amendment"])
return doc
```
**Refactor each domain to use services:**
- **inpatients**: Admission/Progress notes → `documentation` (doc_type `ADMISSION` / `PROGRESS`).
- **radiology**: Narrative report → `documentation` (doc_type `RADIOLOGY_REPORT`, link to Study as `result`).
- **laboratory**: Narrative report (if needed) → `documentation` (doc_type `LAB_REPORT`, link to Order).
- **operating_theatre**: Anesthesia/Procedure notes → `documentation`.
- **blood_bank**: Transfusion note → `documentation`.
**Delete** local note/report models once the adapters are wired.
---
## 7) Breaking Changes & Reseed Checklist
- [ ] Removed duplicated/non-canonical models
- [ ] Updated FK targets to canonical owners
- [ ] Replaced local notes/reports with `documentation.Document` links
- [ ] Admin & DRF routes updated
- [ ] Seed order: `core` → `inventory` → `emr` → domain catalogs → `billing`
- [ ] Tenancy filters enforced (Managers/Mixins)
- [ ] PHI audit verified (create/update/sign/amend trails)
---
## 8) Smoke Tests (must pass)
1) Create Patient & Encounter (core).
2) Create Admission (inpatients) → create `ADMISSION` note via documentation & link to Admission.
3) Create Lab Order (laboratory) that consumes inventory items via FK; finalize → (optional) `LAB_REPORT`.
4) Create Radiology Study (radiology) → finalize report via documentation (`RADIOLOGY_REPORT`), link as `result`.
5) Raise Invoice (billing) against Encounter; **no clinical duplication** in billing.
6) Verify Patient timeline shows all Documents; each domain detail page shows linked documents.
---
## 9) How to Run
**A) Generate audit report**
```bash
python tools/analyze_models.py \\
--project-root . \\
--apps core accounts blood_bank patients appointments inpatients emr pharmacy laboratory radiology \\
operating_theatre billing inventory hr analytics communications integration quality \\
facility_management insurance_approvals \\
--output-dir ./_refactor_report
```
Review `_refactor_report/modular_refactoring_report.md`.
**B) Implement `documentation/` app**
- Add to `LOCAL_APPS`: `documentation`
- Create models/services as above; make DRF serializers/viewsets.
- Replace local notes/report models with calls to `documentation.services`.
**C) Drop & Reseed**
- Reset DB, run migrations, seed catalogs (inventory items, lab tests, formulary, payers, etc.).
- Run **Smoke Tests**.
---
## 10) App Responsibility Matrix (tailored to your list)
| App | Canonical Ownership |
|---|---|
| `core` | Patient, Encounter, Tenant, Facility, AuditEvent, Attachment, Identifier |
| `accounts` | Users/Auth/RBAC; no clinical/business logic |
| `patients` | PatientProfile & demographics extensions (if used) |
| `appointments` | Appointment, Schedule, Slot |
| `inpatients` | Admission, Transfer, DischargeSummary, Ward, Bed, Stay (**notes via documentation**) |
| `emr` | Clinical documentation scaffolding if needed; avoid storing notes (use `documentation`) |
| `pharmacy` | Drug, Formulary, Prescription, Dispense (**stock via inventory FK**) |
| `laboratory` | LabOrder, LabTest, LabPanel, LabResult, Specimen (**no stock logic**) |
| `radiology` | RadOrder, Study, Series (**narrative report via documentation**) |
| `operating_theatre` | SurgicalCase, AnesthesiaNote (**note content via documentation**) |
| `billing` | Payer, Contract, Claim, Invoice, Payment |
| `inventory` | Item, StockLocation, StockLedger, ReorderRule, UnitOfMeasure, Batch, Lot |
| `hr` | Employee, Shift, Schedule, Timesheet |
| `analytics` | Reporting/aggregations (read-only over canonical stores) |
| `communications` | Notifications, MessageTemplate |
| `integration` | Connectors (FHIR/HL7/ELM/etc.), Mappings, Jobs, EventLog |
| `quality` | Incident, Indicator, AuditFinding, CAPA |
| `facility_management` | Asset, WorkOrder, MaintenancePlan, Ticket |
| `blood_bank` | Donor, Donation, Component, Crossmatch, Transfusion (**no stock logic**) |
| `insurance_approvals` | AuthorizationRequest/Response |
**Golden Rules**
- **Inventory** is the *only* place with stock/ledger logic.
- **Billing** is the *only* place with claims/invoices/payments.
- **Documentation** is the *only* place for free-text notes/reports; everything else **links**.
""").strip()
def main():
parser = argparse.ArgumentParser(
description="Analyze Django apps for model duplication & responsibility leaks",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Example:
python tools/analyze_models.py \\
--project-root . \\
--apps core accounts blood_bank patients appointments inpatients emr pharmacy laboratory radiology \\
operating_theatre billing inventory hr analytics communications integration quality \\
facility_management insurance_approvals \\
--output-dir ./_refactor_report
"""
)
parser.add_argument(
'--project-root',
type=Path,
required=True,
help='Path to Django project root directory'
)
parser.add_argument(
'--apps',
nargs='+',
required=True,
help='List of Django app labels to analyze'
)
parser.add_argument(
'--output-dir',
type=Path,
default=Path('./_refactor_report'),
help='Output directory for generated reports (default: ./_refactor_report)'
)
args = parser.parse_args()
# Validate project root
if not args.project_root.exists():
print(f"ERROR: Project root does not exist: {args.project_root}", file=sys.stderr)
sys.exit(1)
# Create output directory
args.output_dir.mkdir(parents=True, exist_ok=True)
print(f"Analyzing {len(args.apps)} Django apps...")
print(f"Project root: {args.project_root}")
print(f"Output directory: {args.output_dir}")
print()
# Analyze models
model_map = analyze(args.project_root, args.apps)
# Detect overlaps
overlaps = detect_overlaps(model_map)
# Generate responsibility matrix
matrix_rows = make_matrix(model_map)
# Generate markdown report
markdown_report = to_markdown_report(model_map, overlaps, matrix_rows)
# Write outputs
model_map_file = args.output_dir / 'model_map.json'
overlaps_file = args.output_dir / 'overlaps.json'
report_file = args.output_dir / 'modular_refactoring_report.md'
# Write model_map.json
with open(model_map_file, 'w', encoding='utf-8') as f:
json.dump(model_map, f, indent=2, default=str)
print(f"✓ Generated: {model_map_file}")
# Write overlaps.json
with open(overlaps_file, 'w', encoding='utf-8') as f:
json.dump(overlaps, f, indent=2)
print(f"✓ Generated: {overlaps_file}")
# Write markdown report
with open(report_file, 'w', encoding='utf-8') as f:
f.write(markdown_report)
print(f"✓ Generated: {report_file}")
print()
print("=" * 80)
print("SUMMARY")
print("=" * 80)
print(f"Apps scanned: {len(model_map['apps'])}")
print(f"Total models: {sum(len(models) for models in model_map['by_app'].values())}")
print(f"Name collisions: {len(overlaps['name_collisions'])}")
print(f"Field-set similarities: {len(overlaps['fieldset_similar'])}")
print(f"Inventory leaks: {len(overlaps['inventory_leaks'])}")
print(f"Canonical conflicts: {len(overlaps['canonical_conflicts'])}")
print()
print(f"Review the detailed report at: {report_file}")
print("=" * 80)
if __name__ == '__main__':
main()