#!/usr/bin/env python3
"""
|
||
Analyze specified Django apps for model duplication & responsibility leaks (healthcare-focused).
|
||
- Scans only the provided app labels (no imports, AST-only).
|
||
- Emits: model_map.json, overlaps.json, modular_refactoring_report.md
|
||
|
||
Run:
|
||
python tools/analyze_models.py \
|
||
--project-root . \
|
||
--apps core accounts blood_bank patients appointments inpatients emr pharmacy laboratory radiology \
|
||
operating_theatre billing inventory hr analytics communications integration quality \
|
||
facility_management insurance_approvals \
|
||
--output-dir ./_refactor_report
|
||
"""
|
||
import argparse
import ast
import itertools
import json
import os
import re
import sys
from collections import defaultdict
from pathlib import Path
from textwrap import dedent

# === Tunables (extend as your codebase evolves) ================================

# Keywords to help guess domains by file/model/field names (heuristic only).
# Maps a domain name to a set of lowercase tokens; domain_guess() picks the
# domain whose set shares the most tokens with an app's label, model names and
# field names. The same token may appear under several domains.
DOMAIN_KEYWORDS = {
    "inventory": {"stock","item","sku","warehouse","uom","batch","lot","reorder","ledger","movement"},
    "laboratory": {"lab","specimen","test","result","panel","analyte","accession"},
    "radiology": {"study","series","modality","dicom","report","imaging"},
    "pharmacy": {"rx","prescription","dispense","med","drug","formulary","dose","pharma"},
    "billing": {"invoice","claim","payer","contract","tariff","price","payment","sbs","drg","adrg"},
    "emr": {"note","vital","diagnosis","procedure","encounter","allergy","observation","problem","plan"},
    "hr": {"employee","schedule","shift","roster","timesheet","payroll","leave"},
    "core": {"patient","encounter","tenant","facility","identifier","audit","attachment"},
    "inpatients": {"admission","discharge","ward","bed","stay","transfer"},
    "appointments": {"appointment","slot","schedule","booking"},
    "operating_theatre": {"ot","or","theatre","surgery","anesthesia","case","procedure"},
    "blood_bank": {"donor","bag","component","crossmatch","blood","transfusion"},
    "analytics": {"kpi","metric","dashboard","reporting","aggregate"},
    "communications": {"message","sms","email","notification"},
    "integration": {"hl7","fhir","elm","ocpi","ocpp","webhook","adapter"},
    "quality": {"audit","incident","complaint","indicator","risk","quality"},
    "facility_management": {"asset","ticket","workorder","maintenance","hvac","bme","facility"},
    "insurance_approvals": {"auth","authorization","approval","preauth","payer","policy"},
    "patients": {"patient","demographic","identifier","contact"},
    "accounts": {"user","auth","role","permission","group"},
}

# Canonical ownership proposal tailored to your apps.
# Maps an app label to the model class names it is expected to own;
# detect_overlaps() reports a "canonical conflict" when one of these names is
# defined in an app that is not listed as an owner. A name may have several
# owners (e.g. "Schedule" appears under both appointments and hr).
CANONICAL_OWNERSHIP = {
    "core": {"Patient","Encounter","Facility","Tenant","AuditEvent","Attachment","Identifier"},
    "patients": {"PatientProfile"},  # if you keep a split between core primitives and profile extras
    "appointments": {"Appointment","Schedule","Slot"},
    "inpatients": {"Admission","Transfer","DischargeSummary","Ward","Bed","Stay"},
    "emr": {"ClinicalNote","Allergy","Problem","Procedure","Vital"},
    "pharmacy": {"Drug","Prescription","Dispense","Formulary"},
    "laboratory": {"LabOrder","LabTest","LabPanel","LabResult","Specimen"},
    "radiology": {"RadOrder","Study","Series","RadReport"},
    "operating_theatre": {"SurgicalCase","AnesthesiaNote"},  # narrative note will be centralized
    "billing": {"Payer","Contract","PriceRule","Claim","Invoice","Payment"},
    "inventory": {"Item","StockLocation","StockLedger","ReorderRule","UnitOfMeasure","Batch","Lot"},
    "hr": {"Employee","Shift","Schedule","Timesheet"},
    "analytics": {"Metric","KPI","ReportConfig"},  # read-only aggregates / configs
    "communications": {"MessageTemplate","OutboundMessage","Notification"},
    "integration": {"Endpoint","Channel","Mapping","Job","EventLog"},
    "quality": {"Incident","QualityIndicator","AuditFinding","CAPA"},
    "facility_management": {"Asset","WorkOrder","MaintenancePlan","Ticket"},
    "blood_bank": {"Donor","Donation","Component","Crossmatch","Transfusion"},
    "insurance_approvals": {"AuthorizationRequest","AuthorizationResponse"},
}

# Strong signals of inventory logic leaking elsewhere.
# detect_overlaps() intersects these with the tokens of each model in apps
# whose guessed domain is one of the clinical domains it lists.
INVENTORY_LEAK_TOKENS = {"stock","warehouse","reorder","ledger","uom","sku","onhand","qty","quantity"}

# Names to ignore when comparing (abstract bases, mixins).
# Classes with these names are skipped by analyze() and never reported as
# name collisions.
GENERIC_NAMES = {"BaseModel","TimestampedModel","AbstractBase","CommonModel","NameMixin"}

# Similarity threshold for flagging "these two models look the same".
# Compared against the Jaccard similarity of two models' field sets; pairs at
# or above this value are reported.
FIELDSET_SIMILARITY_THRESHOLD = 0.6
# ==============================================================================

def find_models_py(project_root: Path, app_label: str) -> Path | None:
|
||
parts = app_label.split(".")
|
||
for i in range(len(parts), 0, -1):
|
||
candidate = project_root.joinpath(*parts[:i])
|
||
if candidate.exists():
|
||
break
|
||
else:
|
||
candidate = project_root.joinpath(*parts)
|
||
models_py = candidate / "models.py"
|
||
return models_py if models_py.exists() else None
|
||
|
||
def is_model_class(node: ast.ClassDef) -> bool:
    """Tell whether a class definition directly subclasses something named ``Model``.

    Only the spelling of the direct bases is inspected: a bare ``Model`` name
    or any attribute access ending in ``.Model`` (e.g. ``models.Model``).
    """

    def _spells_model(base: ast.expr) -> bool:
        if isinstance(base, ast.Attribute):
            return base.attr == "Model"
        if isinstance(base, ast.Name):
            return base.id == "Model"
        return False

    return any(_spells_model(base) for base in node.bases)

def extract_fields(node: ast.ClassDef) -> dict:
    """Collect field-like assignments and relation targets from a class body.

    Every class-level ``name = SomeCall(...)`` (plain or annotated assignment
    to a single name) is recorded as a field whose type is the called name
    (``models.CharField(...)`` -> ``"CharField"``); ``"Field"`` is used when
    the callee is neither a name nor an attribute.

    For ``ForeignKey`` / ``OneToOneField`` / ``ManyToManyField`` the related
    model is read from the first positional argument or, failing that, the
    ``to=`` keyword. String literals are taken verbatim; names and attribute
    chains are rendered as dotted strings. Targets that cannot be resolved
    statically are skipped rather than recorded as ``None``, so the returned
    lists contain only strings.

    Returns:
        ``{"fields": {name: type}, "fks": [target, ...], "m2ms": [target, ...]}``
    """
    relation_types = {"ForeignKey", "OneToOneField", "ManyToManyField"}

    def _dotted(expr):
        """Render a Name / Attribute chain as ``a.b.c``; '' if nothing usable."""
        pieces = []
        while isinstance(expr, ast.Attribute):
            pieces.append(expr.attr)
            expr = expr.value
        if isinstance(expr, ast.Name):
            pieces.append(expr.id)
        return ".".join(reversed(pieces))

    fields, fks, m2ms = {}, [], []
    for stmt in node.body:
        if isinstance(stmt, ast.Assign) and len(stmt.targets) == 1:
            target_node, value = stmt.targets[0], stmt.value
        elif isinstance(stmt, ast.AnnAssign) and stmt.value is not None:
            target_node, value = stmt.target, stmt.value
        else:
            continue
        if not isinstance(target_node, ast.Name) or not isinstance(value, ast.Call):
            continue

        func = value.func
        if isinstance(func, ast.Attribute):
            ftype = func.attr
        elif isinstance(func, ast.Name):
            ftype = func.id
        else:
            ftype = None
        fields[target_node.id] = ftype or "Field"

        if ftype not in relation_types:
            continue
        if value.args:
            related = value.args[0]
        else:
            # Django also accepts the related model as the ``to`` keyword.
            related = next((kw.value for kw in value.keywords if kw.arg == "to"), None)
        if related is None:
            continue
        if isinstance(related, ast.Constant) and isinstance(related.value, str):
            target = related.value
        else:
            target = _dotted(related)
        if not target:
            # e.g. a call expression: not statically resolvable, so skip it
            # instead of polluting the result with None.
            continue
        (m2ms if ftype == "ManyToManyField" else fks).append(target)
    return {"fields": fields, "fks": fks, "m2ms": m2ms}

def tokenize_identifiers(*names: str) -> set[str]:
    """Break identifiers into a set of lowercase word tokens.

    Each name is split on underscores and non-word characters. CamelCase
    names are additionally split at case boundaries, so ``"StockLedger"``
    yields ``stock`` and ``ledger`` as well as the unsplit ``stockledger``.
    Without that, model class names could never match single-word keywords.
    The unsplit token is kept so the result is a superset of a plain
    lowercase split. ``None`` and empty names are ignored.
    """
    # Boundaries: lower/digit -> Upper ("labOrder"), and the end of an
    # acronym run followed by a capitalised word ("HL7Message", "DRGCode").
    camel_boundary = re.compile(r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])")
    tokens: set[str] = set()
    for name in names:
        if not name:
            continue
        for variant in (name, camel_boundary.sub("_", name)):
            tokens.update(t for t in re.split(r"[_\W]+", variant.lower()) if t)
    return tokens

def jaccard(a: set, b: set) -> float:
    """Jaccard similarity of two sets: ``|a & b| / |a | b|``.

    Two empty sets are defined to have similarity ``0.0``.
    """
    if a or b:
        shared = a & b
        combined = a | b
        return len(shared) / max(1, len(combined))
    return 0.0

def domain_guess(app_label: str, models_in_app: list[dict]) -> str | None:
    """Guess which domain in ``DOMAIN_KEYWORDS`` an app belongs to.

    If the last component of the app label is itself a known domain name,
    that name is returned directly. Otherwise the label, model names and
    field names are tokenized, and the domain whose keyword set shares the
    most tokens wins; ties keep the domain listed first in
    ``DOMAIN_KEYWORDS``.

    Args:
        app_label: App label, optionally dotted.
        models_in_app: Model dicts with at least ``"name"`` and ``"fields"``.

    Returns:
        The domain name, or ``None`` when no keyword matches at all.
    """
    # The label is authoritative when it names a domain. Scoring by keywords
    # alone would label e.g. a ``pharmacy`` app full of stock fields as
    # "inventory", which then hides it from the clinical-app leak check that
    # is keyed on this guess.
    label = app_label.rsplit(".", 1)[-1].lower()
    if label in DOMAIN_KEYWORDS:
        return label

    toks = tokenize_identifiers(app_label)
    for m in models_in_app:
        toks |= tokenize_identifiers(m["name"])
        for field_name in m["fields"]:
            toks |= tokenize_identifiers(field_name)

    best, score = None, 0
    for domain, keywords in DOMAIN_KEYWORDS.items():
        overlap = len(toks & keywords)
        if overlap > score:
            best, score = domain, overlap
    return best

def analyze(project_root: Path, app_labels: list[str]) -> dict:
    """Parse each app's ``models.py`` and build the model map.

    Files are parsed with ``ast`` only (nothing is imported). Top-level
    classes that pass ``is_model_class`` and are not in ``GENERIC_NAMES`` are
    recorded. Apps without a ``models.py``, with an unreadable or unparsable
    file, or without any model are left out of the result; read/parse
    failures are reported on stderr.

    Args:
        project_root: Directory that contains the app packages.
        app_labels: App labels to scan; a label given twice is scanned once.

    Returns:
        A dict with:
        ``apps``          labels that yielded at least one model, in scan order;
        ``by_app``        ``{label: [model dict, ...]}`` where a model dict has
                          ``name``, ``fields``, ``fks``, ``m2ms``, ``lineno``, ``file``;
        ``app_domains``   ``{label: guessed domain or None}``;
        ``reverse_deps``  ``{"app.Model": {source_app: [referencing model names]}}``.
    """
    model_map = {"apps": [], "by_app": defaultdict(list)}
    for app in app_labels:
        if app in model_map["by_app"]:
            continue  # duplicate label on the command line
        mp = find_models_py(project_root, app)
        if not mp:
            continue
        try:
            src = mp.read_text(encoding="utf-8")
        except (OSError, UnicodeError) as e:
            print(f"ERROR reading {mp}: {e}", file=sys.stderr)
            continue
        try:
            tree = ast.parse(src)
        except (SyntaxError, ValueError) as e:
            # ValueError: some Python versions raise it for NUL bytes in source.
            print(f"SYNTAX ERROR in {mp}: {e}", file=sys.stderr)
            continue

        models = []
        for node in tree.body:
            if isinstance(node, ast.ClassDef) and node.name not in GENERIC_NAMES and is_model_class(node):
                extracted = extract_fields(node)
                models.append({
                    "name": node.name,
                    "fields": extracted["fields"],
                    "fks": extracted["fks"],
                    "m2ms": extracted["m2ms"],
                    "lineno": node.lineno,
                    "file": str(mp),
                })
        if models:
            model_map["apps"].append(app)
            model_map["by_app"][app] = models

    # Index bare model names -> fully-qualified labels, in discovery order,
    # so unqualified relation targets can be resolved.
    qualified_by_name = defaultdict(list)
    for app, models in model_map["by_app"].items():
        for m in models:
            qualified_by_name[m["name"]].append(f"{app}.{m['name']}")

    rev = defaultdict(lambda: defaultdict(list))  # target -> {source_app: [model_names]}
    for app, models in model_map["by_app"].items():
        for m in models:
            for ref in m["fks"] + m["m2ms"]:
                if not ref:
                    continue
                if ref == "self":
                    # Django's spelling for a recursive relation.
                    cand = f"{app}.{m['name']}"
                elif "." in ref:
                    cand = ref
                else:
                    matches = qualified_by_name.get(ref, [])
                    own = f"{app}.{ref}"
                    # A bare name most plausibly refers to the model defined
                    # in the referencing app; only then fall back to the
                    # first app that defines a model with that name.
                    if own in matches:
                        cand = own
                    else:
                        cand = matches[0] if matches else None
                if cand:
                    rev[cand][app].append(m["name"])

    model_map["app_domains"] = {
        app: domain_guess(app, models) for app, models in model_map["by_app"].items()
    }
    model_map["reverse_deps"] = {k: dict(v) for k, v in rev.items()}
    return model_map

def detect_overlaps(model_map: dict) -> dict:
    """Find duplicated or misplaced models across the analyzed apps.

    Args:
        model_map: Result of ``analyze``; ``by_app`` and ``app_domains`` are read.

    Returns:
        A dict of four lists:
        ``name_collisions``     model names defined more than once
                                (names in ``GENERIC_NAMES`` excluded);
        ``fieldset_similar``    pairs of models from *different* apps whose
                                field sets have Jaccard similarity of at least
                                ``FIELDSET_SIMILARITY_THRESHOLD`` (at most 25
                                shared names are listed per pair);
        ``inventory_leaks``     models in apps guessed to be clinical domains
                                whose name/field tokens hit ``INVENTORY_LEAK_TOKENS``;
        ``canonical_conflicts`` models named in ``CANONICAL_OWNERSHIP`` but
                                defined in an app that is not a listed owner.
    """
    overlaps = {"name_collisions": [], "fieldset_similar": [], "inventory_leaks": [], "canonical_conflicts": []}
    entries = [(app, m) for app, models in model_map["by_app"].items() for m in models]

    # --- name collisions ------------------------------------------------------
    by_name = defaultdict(list)
    for app, m in entries:
        by_name[m["name"]].append((app, m))
    for name, occurrences in by_name.items():
        if len(occurrences) > 1 and name not in GENERIC_NAMES:
            overlaps["name_collisions"].append({
                "name": name,
                "occurrences": [
                    {"app": app, "file": m["file"], "lineno": m["lineno"]}
                    for app, m in occurrences
                ],
            })

    # --- fieldset similarities ------------------------------------------------
    def fieldset(m):
        # Relation targets may be None when they could not be resolved
        # statically; drop them, otherwise sorted() below fails comparing
        # None with str.
        relations = (t for t in (*m["fks"], *m["m2ms"]) if t)
        return set(m["fields"]) | set(relations)

    # Compute each model's field set once instead of once per pair.
    profiled = [(app, m, fieldset(m)) for app, m in entries]
    for (aa, ma, fa), (ab, mb, fb) in itertools.combinations(profiled, 2):
        if aa == ab:
            continue
        sim = jaccard(fa, fb)
        if sim >= FIELDSET_SIMILARITY_THRESHOLD:
            overlaps["fieldset_similar"].append({
                "a": {"app": aa, "name": ma["name"], "file": ma["file"]},
                "b": {"app": ab, "name": mb["name"], "file": mb["file"]},
                "similarity": round(sim, 2),
                "shared": sorted(fa & fb)[:25],
            })

    # --- inventory leaks ------------------------------------------------------
    clinical_domains = {"laboratory", "radiology", "pharmacy", "emr", "inpatients", "blood_bank", "operating_theatre"}
    for app, models in model_map["by_app"].items():
        domain = model_map["app_domains"].get(app, "")
        if domain not in clinical_domains:
            continue
        for m in models:
            toks = tokenize_identifiers(m["name"])
            for field_name in m["fields"]:
                toks |= tokenize_identifiers(field_name)
            leaks = toks & INVENTORY_LEAK_TOKENS
            if leaks:
                overlaps["inventory_leaks"].append(
                    {"app": app, "model": m["name"], "file": m["file"], "tokens": sorted(leaks)}
                )

    # --- canonical conflicts --------------------------------------------------
    owners_by_model = {}
    for owner_app, model_names in CANONICAL_OWNERSHIP.items():
        for model_name in model_names:
            owners_by_model.setdefault(model_name, set()).add(owner_app)
    for app, m in entries:
        owners = owners_by_model.get(m["name"])
        if owners and app not in owners:
            overlaps["canonical_conflicts"].append({
                "model": m["name"], "app": app, "expected": sorted(owners), "file": m["file"]
            })
    return overlaps

def make_matrix(model_map: dict) -> list[dict]:
    """Build one responsibility-matrix row per analyzed app.

    Rows are ordered by app label. Each row carries the app, its guessed
    domain (``"unknown"`` when there is no guess), the alphabetically sorted
    capabilities allowed for that domain (empty for unlisted domains), and a
    fixed note.
    """
    # Allowed capabilities per domain (heuristic)
    capabilities_by_domain = {
        "inventory": {"catalog", "stock", "ledger"},
        "laboratory": {"orders", "results", "panels"},
        "radiology": {"orders", "reports", "studies"},
        "pharmacy": {"prescriptions", "dispenses", "formulary"},
        "billing": {"claims", "invoices", "payments"},
        "emr": {"clinical_docs", "observations"},
        "hr": {"staffing", "scheduling"},
        "core": {"patients", "encounters", "tenancy", "audit", "attachments"},
        "inpatients": {"admissions", "transfers", "beds"},
        "appointments": {"booking", "scheduling"},
        "operating_theatre": {"surgical_cases", "anesthesia"},
        "blood_bank": {"donations", "components", "crossmatch", "transfusions"},
        "analytics": {"reporting", "aggregations"},
        "communications": {"notifications", "templates"},
        "integration": {"connectors", "mappings"},
        "quality": {"incidents", "indicators", "audits"},
        "facility_management": {"assets", "workorders", "maintenance"},
        "insurance_approvals": {"authorizations"},
        "patients": {"profiles"},
        "accounts": {"users_auth"},
    }
    matrix = []
    for app_name in sorted(model_map["by_app"]):
        guessed = model_map["app_domains"].get(app_name)
        if not guessed:
            guessed = "unknown"
        allowed = list(capabilities_by_domain.get(guessed, set()))
        allowed.sort()
        matrix.append({
            "app": app_name,
            "domain_guess": guessed,
            "allowed": allowed,
            "note": "Heuristic; adjust if needed.",
        })
    return matrix

def to_markdown_report(model_map: dict, overlaps: dict, matrix_rows: list[dict]) -> str:
    """Render the full Markdown refactoring report.

    The report has a dynamic part (inventory table, model-map JSON, overlap
    listings, responsibility matrix) followed by static guidance sections.

    The static template is dedented *before* the dynamic parts are
    substituted: the substituted tables and JSON blocks contain lines that
    start at column 0, which would otherwise stop ``dedent`` from finding a
    common margin. Table rows are joined with ``"\\n"`` rather than
    ``os.linesep`` so that writing the report in text mode on Windows does
    not produce ``\\r\\r\\n`` line endings.

    Args:
        model_map: Result of ``analyze`` (``by_app`` and ``app_domains`` are read;
            ``reverse_deps`` is left out of the embedded JSON).
        overlaps: Result of ``detect_overlaps``.
        matrix_rows: Result of ``make_matrix``.

    Returns:
        The report text, without leading or trailing whitespace.
    """
    apps = sorted(model_map["by_app"].keys())
    total_models = sum(len(model_map["by_app"][a]) for a in apps)

    inv = ["| App | Domain Guess | # Models | Models |", "|---|---|---:|---|"]
    for app in apps:
        domain = model_map["app_domains"].get(app) or ""
        names = ", ".join(sorted(m["name"] for m in model_map["by_app"][app]))
        inv.append(f"| `{app}` | {domain} | {len(model_map['by_app'][app])} | {names} |")

    mat = ["| App | Domain | Allowed Capabilities | Note |", "|---|---|---|---|"]
    for r in matrix_rows:
        mat.append(f"| `{r['app']}` | {r['domain_guess'] or ''} | {', '.join(r['allowed']) or '—'} | {r['note']} |")

    def block(obj):
        return "```json\n" + json.dumps(obj, indent=2) + "\n```"

    def block_or_none(items):
        return block(items) if items else "_None detected_"

    # by_app may be a defaultdict; embed it as a plain dict and omit reverse_deps.
    embedded_map = {
        k: (dict(v) if k == "by_app" else v)
        for k, v in model_map.items()
        if k != "reverse_deps"
    }

    # Literal braces in the template are doubled because it goes through
    # str.format(); substituted values are inserted verbatim.
    template = dedent("""
        # Modular Refactoring Report (Drop-and-Reseed Friendly)

        **Executive Summary**
        - Apps scanned: **{apps_count}**
        - Total models found: **{total_models}**
        - Name collisions: **{n_collisions}**
        - Field-set similarities: **{n_similar}**
        - Inventory leaks in clinical apps: **{n_leaks}**
        - Canonical ownership conflicts: **{n_conflicts}**

        ---

        ## 1) App & Model Inventory
        {inventory_table}

        ### Model Map (JSON)
        {model_map_json}

        ---

        ## 2) Detected Overlaps & Risks

        ### A) Name Collisions
        {collisions}

        ### B) Field-Set Similarities (possible duplicates) — threshold ≥ {threshold}
        {similar}

        ### C) Inventory Responsibility Leaks in Clinical Apps
        {leaks}

        ### D) Canonical Ownership Conflicts
        {conflicts}

        ---

        ## 3) Responsibility Matrix (Apps × Capabilities)
        {matrix_table}

        ---

        ## 4) Target Architecture (Text Diagram)

        ```
        core ──► (patient, encounter, tenant, audit, attachment)
        │
        ├── emr ──► clinical docs + observations (uses core.Patient/Encounter)
        ├── documentation ──► owns ALL notes/reports (centralized), linkable everywhere
        ├── inventory ──► items/stock/ledger (FK used by lab/pharmacy/blood_bank)
        ├── pharmacy ──► prescriptions/dispenses (FK inventory)
        ├── laboratory ──► orders/tests/results (FK inventory)
        ├── radiology ──► orders/studies/reports (narrative via documentation)
        ├── inpatients ──► admissions/transfers/beds (admission & progress notes via documentation)
        ├── operating_theatre ──► surgical cases/anesthesia (notes via documentation)
        ├── blood_bank ──► donors/components/crossmatch/transfusions (no stock logic)
        ├── billing ──► claims/invoices/payments (FK patient/encounter only)
        ├── appointments ──► booking/scheduling
        ├── hr ──► staffing/scheduling
        ├── insurance_approvals ──► payer authorizations
        ├── facility_management ──► assets/workorders/maintenance
        ├── analytics ──► reporting/aggregations (read-only)
        ├── communications ──► notifications/templates
        └── integration ──► connectors/mappings/jobs
        ```

        **Rules**
        - No stock logic in lab/radiology/pharmacy/blood_bank/inpatients.
        - No billing logic in EMR/clinical apps.
        - PHI stays in `core`, `emr`, and `documentation`.
        - All narrative notes/reports live in `documentation.Document`; domain apps **link**.

        ---

        ## 5) Refactor Plan (Drop & Reseed)

        1. **Delete** duplicated models from non-canonical apps (see overlaps).
        2. **Keep** canonical models in the domain owners defined above.
        3. Replace cross-app copies with **FKs to canonical models** or service calls.
        4. Introduce **service modules** per domain (e.g., `inventory.services.record_stock_movement`).
        5. Standardize model fields: `tenant`, `external_id`, `created_at`, `updated_at`, `created_by`, `updated_by`, `is_active`.
        6. Regenerate seeds per domain (inventory catalog, lab tests, formulary, payer contracts).
        7. Implement centralized documentation (below) and remove local notes/report models.

        ---

        ## 6) Centralized Documentation (New `documentation/` app)

        **Purpose:** One canonical store for all notes & reports; link to any object.

        Create: `documentation/models.py`
        ```python
        from django.conf import settings
        from django.db import models
        from django.contrib.contenttypes.fields import GenericForeignKey
        from django.contrib.contenttypes.models import ContentType
        import uuid

        class NoteType(models.TextChoices):
            ADMISSION = "ADMISSION", "Admission Note"
            PROGRESS = "PROGRESS", "Progress Note"
            DISCHARGE = "DISCHARGE", "Discharge Summary"
            RADIOLOGY_REPORT = "RADIOLOGY_REPORT", "Radiology Report"
            LAB_REPORT = "LAB_REPORT", "Laboratory Narrative"
            PROCEDURE = "PROCEDURE", "Procedure Note"
            ANESTHESIA = "ANESTHESIA", "Anesthesia Note"
            BLOODBANK = "BLOODBANK", "Transfusion Note"
            OTHER = "OTHER", "Other"

        class Document(models.Model):
            id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
            tenant = models.ForeignKey("core.Tenant", on_delete=models.CASCADE, related_name="documents")
            patient = models.ForeignKey("core.Patient", on_delete=models.CASCADE, related_name="documents")
            encounter = models.ForeignKey("core.Encounter", on_delete=models.SET_NULL, null=True, blank=True, related_name="documents")

            doc_type = models.CharField(max_length=40, choices=NoteType.choices)
            title = models.CharField(max_length=255)
            status = models.CharField(max_length=20, default="final") # draft|amended|final|entered-in-error
            authored_at = models.DateTimeField(auto_now_add=True)
            authored_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, related_name="documents_authored")
            signed_at = models.DateTimeField(null=True, blank=True)
            signed_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True, related_name="documents_signed")

            body_markdown = models.TextField(blank=True, default="")
            body_json = models.JSONField(blank=True, null=True)

            fhir_profile = models.CharField(max_length=120, blank=True, default="")
            code = models.CharField(max_length=64, blank=True, default="")

            is_confidential = models.BooleanField(default=False)
            is_amendment = models.BooleanField(default=False)

            created_at = models.DateTimeField(auto_now_add=True)
            updated_at = models.DateTimeField(auto_now=True)

        class DocumentVersion(models.Model):
            document = models.ForeignKey(Document, on_delete=models.CASCADE, related_name="versions")
            version = models.PositiveIntegerField()
            snapshot_markdown = models.TextField(blank=True, default="")
            snapshot_json = models.JSONField(blank=True, null=True)
            changed_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True)
            changed_at = models.DateTimeField(auto_now_add=True)

            class Meta:
                unique_together = [("document", "version")]
                ordering = ["-version"]

        class DocumentLink(models.Model):
            document = models.ForeignKey(Document, on_delete=models.CASCADE, related_name="links")
            content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
            object_id = models.CharField(max_length=64)
            target = GenericForeignKey("content_type", "object_id")
            role = models.CharField(max_length=40, blank=True, default="context") # context|source|result|followup
            created_at = models.DateTimeField(auto_now_add=True)

        class DocumentAttachment(models.Model):
            document = models.ForeignKey(Document, on_delete=models.CASCADE, related_name="attachments")
            file = models.FileField(upload_to="documents/%Y/%m/")
            title = models.CharField(max_length=255, blank=True, default="")
            created_at = models.DateTimeField(auto_now_add=True)
        ```

        **DRF (serializers & viewsets)**
        - Endpoints:
          - `POST /api/v1/documentation/documents/` (create)
          - `GET /api/v1/documentation/documents/?patient=&encounter=&doc_type=`
          - `POST /api/v1/documentation/documents/{{id}}/amend/`
          - `POST /api/v1/documentation/documents/{{id}}/link/`
          - `GET /api/v1/documentation/documents/{{id}}/`
        - Add tenant scoping and object-level permissions.

        **Services**: `documentation/services.py`
        ```python
        from .models import Document, DocumentVersion, DocumentLink
        from django.contrib.contenttypes.models import ContentType
        from django.utils import timezone

        def create_document_and_link(*, tenant, patient, encounter=None, doc_type:str, title:str,
                                     body_markdown:str="", body_json=None, links=None, author=None, sign=False):
            doc = Document.objects.create(
                tenant=tenant, patient=patient, encounter=encounter,
                doc_type=doc_type, title=title, body_markdown=body_markdown, body_json=body_json or {{}},
                authored_by=author, status="draft" if not sign else "final",
                signed_at=timezone.now() if sign else None, signed_by=author if sign else None
            )
            DocumentVersion.objects.create(document=doc, version=1, snapshot_markdown=body_markdown, snapshot_json=body_json or {{}}, changed_by=author)
            for ln in links or []:
                ct = ContentType.objects.get(app_label=ln["app_label"], model=ln["model"].lower())
                DocumentLink.objects.create(document=doc, content_type=ct, object_id=str(ln["pk"]), role=ln.get("role","context"))
            return doc

        def sign_document(doc: Document, user):
            doc.status = "final"; doc.signed_at = timezone.now(); doc.signed_by = user; doc.save(update_fields=["status","signed_at","signed_by"])
            return doc

        def amend_document(doc: Document, user, *, new_markdown:str="", new_json=None):
            nxt = (doc.versions.first().version + 1) if doc.versions.exists() else 2
            DocumentVersion.objects.create(document=doc, version=nxt, snapshot_markdown=new_markdown, snapshot_json=new_json or {{}}, changed_by=user)
            doc.body_markdown, doc.body_json, doc.is_amendment = new_markdown, (new_json or {{}}), True
            doc.save(update_fields=["body_markdown","body_json","updated_at","is_amendment"])
            return doc
        ```

        **Refactor each domain to use services:**
        - **inpatients**: Admission/Progress notes → `documentation` (doc_type `ADMISSION` / `PROGRESS`).
        - **radiology**: Narrative report → `documentation` (doc_type `RADIOLOGY_REPORT`, link to Study as `result`).
        - **laboratory**: Narrative report (if needed) → `documentation` (doc_type `LAB_REPORT`, link to Order).
        - **operating_theatre**: Anesthesia/Procedure notes → `documentation`.
        - **blood_bank**: Transfusion note → `documentation`.

        **Delete** local note/report models once the adapters are wired.

        ---

        ## 7) Breaking Changes & Reseed Checklist

        - [ ] Removed duplicated/non-canonical models
        - [ ] Updated FK targets to canonical owners
        - [ ] Replaced local notes/reports with `documentation.Document` links
        - [ ] Admin & DRF routes updated
        - [ ] Seed order: `core` → `inventory` → `emr` → domain catalogs → `billing`
        - [ ] Tenancy filters enforced (Managers/Mixins)
        - [ ] PHI audit verified (create/update/sign/amend trails)

        ---

        ## 8) Smoke Tests (must pass)

        1) Create Patient & Encounter (core).
        2) Create Admission (inpatients) → create `ADMISSION` note via documentation & link to Admission.
        3) Create Lab Order (laboratory) that consumes inventory items via FK; finalize → (optional) `LAB_REPORT`.
        4) Create Radiology Study (radiology) → finalize report via documentation (`RADIOLOGY_REPORT`), link as `result`.
        5) Raise Invoice (billing) against Encounter; **no clinical duplication** in billing.
        6) Verify Patient timeline shows all Documents; each domain detail page shows linked documents.

        ---

        ## 9) How to Run

        **A) Generate audit report**
        ```bash
        python tools/analyze_models.py \\
          --project-root . \\
          --apps core accounts blood_bank patients appointments inpatients emr pharmacy laboratory radiology \\
                 operating_theatre billing inventory hr analytics communications integration quality \\
                 facility_management insurance_approvals \\
          --output-dir ./_refactor_report
        ```
        Review `_refactor_report/modular_refactoring_report.md`.

        **B) Implement `documentation/` app**
        - Add to `LOCAL_APPS`: `documentation`
        - Create models/services as above; make DRF serializers/viewsets.
        - Replace local notes/report models with calls to `documentation.services`.

        **C) Drop & Reseed**
        - Reset DB, run migrations, seed catalogs (inventory items, lab tests, formulary, payers, etc.).
        - Run **Smoke Tests**.

        ---

        ## 10) App Responsibility Matrix (tailored to your list)

        | App | Canonical Ownership |
        |---|---|
        | `core` | Patient, Encounter, Tenant, Facility, AuditEvent, Attachment, Identifier |
        | `accounts` | Users/Auth/RBAC; no clinical/business logic |
        | `patients` | PatientProfile & demographics extensions (if used) |
        | `appointments` | Appointment, Schedule, Slot |
        | `inpatients` | Admission, Transfer, DischargeSummary, Ward, Bed, Stay (**notes via documentation**) |
        | `emr` | Clinical documentation scaffolding if needed; avoid storing notes (use `documentation`) |
        | `pharmacy` | Drug, Formulary, Prescription, Dispense (**stock via inventory FK**) |
        | `laboratory` | LabOrder, LabTest, LabPanel, LabResult, Specimen (**no stock logic**) |
        | `radiology` | RadOrder, Study, Series (**narrative report via documentation**) |
        | `operating_theatre` | SurgicalCase, AnesthesiaNote (**note content via documentation**) |
        | `billing` | Payer, Contract, Claim, Invoice, Payment |
        | `inventory` | Item, StockLocation, StockLedger, ReorderRule, UnitOfMeasure, Batch, Lot |
        | `hr` | Employee, Shift, Schedule, Timesheet |
        | `analytics` | Reporting/aggregations (read-only over canonical stores) |
        | `communications` | Notifications, MessageTemplate |
        | `integration` | Connectors (FHIR/HL7/ELM/etc.), Mappings, Jobs, EventLog |
        | `quality` | Incident, Indicator, AuditFinding, CAPA |
        | `facility_management` | Asset, WorkOrder, MaintenancePlan, Ticket |
        | `blood_bank` | Donor, Donation, Component, Crossmatch, Transfusion (**no stock logic**) |
        | `insurance_approvals` | AuthorizationRequest/Response |

        **Golden Rules**
        - **Inventory** is the *only* place with stock/ledger logic.
        - **Billing** is the *only* place with claims/invoices/payments.
        - **Documentation** is the *only* place for free-text notes/reports; everything else **links**.
    """)

    return template.format(
        apps_count=len(apps),
        total_models=total_models,
        n_collisions=len(overlaps["name_collisions"]),
        n_similar=len(overlaps["fieldset_similar"]),
        n_leaks=len(overlaps["inventory_leaks"]),
        n_conflicts=len(overlaps["canonical_conflicts"]),
        inventory_table="\n".join(inv),
        model_map_json=block(embedded_map),
        collisions=block_or_none(overlaps["name_collisions"]),
        threshold=FIELDSET_SIMILARITY_THRESHOLD,
        similar=block_or_none(overlaps["fieldset_similar"]),
        leaks=block_or_none(overlaps["inventory_leaks"]),
        conflicts=block_or_none(overlaps["canonical_conflicts"]),
        matrix_table="\n".join(mat),
    ).strip()

def main():
    """Command-line entry point.

    Parses ``--project-root``, ``--apps`` and ``--output-dir``, runs the
    analysis, and writes ``model_map.json``, ``overlaps.json`` and
    ``modular_refactoring_report.md`` into the output directory (created if
    missing). Exits with status 1 when the project root is not a directory.
    Requested apps for which no models were found are listed on stderr
    instead of being dropped silently.
    """
    parser = argparse.ArgumentParser(
        description="Analyze Django apps for model duplication & responsibility leaks",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=dedent("""
            Example:
              python tools/analyze_models.py \\
                --project-root . \\
                --apps core accounts blood_bank patients appointments inpatients emr pharmacy laboratory radiology \\
                       operating_theatre billing inventory hr analytics communications integration quality \\
                       facility_management insurance_approvals \\
                --output-dir ./_refactor_report
        """),
    )
    parser.add_argument(
        '--project-root',
        type=Path,
        required=True,
        help='Path to Django project root directory',
    )
    parser.add_argument(
        '--apps',
        nargs='+',
        required=True,
        help='List of Django app labels to analyze',
    )
    parser.add_argument(
        '--output-dir',
        type=Path,
        default=Path('./_refactor_report'),
        help='Output directory for generated reports (default: ./_refactor_report)',
    )

    args = parser.parse_args()

    # Validate project root: it must be a directory, not merely an existing path.
    if not args.project_root.is_dir():
        print(
            f"ERROR: Project root does not exist or is not a directory: {args.project_root}",
            file=sys.stderr,
        )
        sys.exit(1)

    # Create output directory
    args.output_dir.mkdir(parents=True, exist_ok=True)

    print(f"Analyzing {len(args.apps)} Django apps...")
    print(f"Project root: {args.project_root}")
    print(f"Output directory: {args.output_dir}")
    print()

    model_map = analyze(args.project_root, args.apps)

    # Apps with no models.py, an unreadable/unparsable file, or no model
    # classes are absent from the map; say so rather than silently
    # reporting fewer apps than were requested.
    skipped = [app for app in args.apps if app not in model_map["by_app"]]
    if skipped:
        print(f"WARNING: no models found for: {', '.join(skipped)}", file=sys.stderr)

    overlaps = detect_overlaps(model_map)
    matrix_rows = make_matrix(model_map)
    markdown_report = to_markdown_report(model_map, overlaps, matrix_rows)

    # Write outputs
    model_map_file = args.output_dir / 'model_map.json'
    overlaps_file = args.output_dir / 'overlaps.json'
    report_file = args.output_dir / 'modular_refactoring_report.md'

    with open(model_map_file, 'w', encoding='utf-8') as f:
        json.dump(model_map, f, indent=2, default=str)
    print(f"✓ Generated: {model_map_file}")

    with open(overlaps_file, 'w', encoding='utf-8') as f:
        json.dump(overlaps, f, indent=2)
    print(f"✓ Generated: {overlaps_file}")

    with open(report_file, 'w', encoding='utf-8') as f:
        f.write(markdown_report)
    print(f"✓ Generated: {report_file}")

    print()
    print("=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"Apps scanned: {len(model_map['apps'])}")
    print(f"Total models: {sum(len(models) for models in model_map['by_app'].values())}")
    print(f"Name collisions: {len(overlaps['name_collisions'])}")
    print(f"Field-set similarities: {len(overlaps['fieldset_similar'])}")
    print(f"Inventory leaks: {len(overlaps['inventory_leaks'])}")
    print(f"Canonical conflicts: {len(overlaps['canonical_conflicts'])}")
    print()
    print(f"Review the detailed report at: {report_file}")
    print("=" * 80)

# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()