Marwan Alwali 4d06ca4b5e update
2025-09-20 14:26:19 +03:00

174 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# myapp/management/commands/import_icd10.py
import xmlschema
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from emr.models import Icd10
class Command(BaseCommand):
    """Import an ICD-10-CM tabular XML file into the ``Icd10`` model.

    The XML is decoded into nested dicts/lists by ``xmlschema`` using the
    supplied XSD. The command then locates the ``chapter`` entries wherever
    they sit in the decoded tree, flattens the chapter -> section -> diag
    hierarchy into ``Icd10`` rows, bulk-inserts them and finally links every
    row to its parent code.
    """

    help = "Import ICD-10-CM tabular XML into Icd10 model (auto-detects root)."

    def add_arguments(self, parser):
        """Register the ``--xsd``, ``--xml`` and ``--truncate`` options."""
        parser.add_argument("--xsd", required=True, help="Path to icd10cm-tabular-2026.xsd")
        parser.add_argument("--xml", required=True, help="Path to icd10cm-tabular-2026.xml")
        parser.add_argument("--truncate", action="store_true", help="Delete existing Icd10 rows first")

    # ------------------------ helpers ------------------------
    def _as_text(self, val):
        """Return the textual content of a decoded XML value.

        ``None`` is passed through. For a dict (an element decoded together
        with attributes) the first truthy value among the known text keys is
        returned, falling back to ``str(val)``. Anything else is converted
        with ``str``.
        """
        if val is None:
            return None
        if isinstance(val, dict):
            # "$" is the text key used by xmlschema's default converter; the
            # other keys cover alternative converters / hand-built dicts.
            return (
                val.get("#text")
                or val.get("$")
                or val.get("@value")
                or val.get("value")
                or str(val)
            )
        return str(val)

    def _ensure_list(self, maybe_list):
        """Normalize ``None`` -> ``[]``, a list -> itself, anything else -> ``[value]``."""
        if maybe_list is None:
            return []
        if isinstance(maybe_list, list):
            return maybe_list
        return [maybe_list]

    def _find_first_with_key(self, data, key):
        """Depth-first search: return the first dict that directly contains `key`.

        Walks nested dicts and lists; returns ``None`` when no dict in the
        structure has `key`.
        """
        if isinstance(data, dict):
            if key in data:
                return data
            for v in data.values():
                found = self._find_first_with_key(v, key)
                if found is not None:
                    return found
        elif isinstance(data, list):
            for item in data:
                found = self._find_first_with_key(item, key)
                if found is not None:
                    return found
        return None

    def _collect_rows(self, chapters):
        """Build unsaved ``Icd10`` rows and parent links from decoded chapters.

        Expected minimal structure: chapter -> section? -> diag (recursive).

        Args:
            chapters: a single chapter dict or a list of chapter dicts.

        Returns:
            ``(rows, parent_links)`` where ``rows`` is a list of unsaved
            ``Icd10`` instances (``parent`` left as ``None``) and
            ``parent_links`` is a list of ``(child_code, parent_code)`` tuples
            to be resolved once the rows have primary keys.
        """
        rows = []
        parent_links = []

        def import_diag(diag, chapter_name, section_name, parent_code=None):
            code = self._as_text(diag.get("name"))
            desc = self._as_text(diag.get("desc"))
            if not code:
                # A diag without a code cannot be stored; its children are
                # skipped with it.
                return
            children = self._ensure_list(diag.get("diag"))
            # A node counts as a header when it has children but no
            # non-blank description of its own.
            is_header = bool(children) and not (desc and desc.strip())
            rows.append(Icd10(
                code=code,
                description=desc,
                chapter_name=self._as_text(chapter_name),
                section_name=self._as_text(section_name),
                parent=None,  # set later, once primary keys exist
                is_header=is_header,
            ))
            if parent_code:
                parent_links.append((code, parent_code))
            for child in children:
                import_diag(child, chapter_name, section_name, parent_code=code)

        # Normalize chapters to a list
        for ch in self._ensure_list(chapters):
            ch_name = self._as_text(ch.get("name"))
            # Sections may be missing in some packs; diags may be directly under chapter
            sections = self._ensure_list(ch.get("section"))
            if sections:
                for sec in sections:
                    # NOTE(review): sections in the CDC tabular files may carry
                    # only `desc` and an `id` attribute rather than `name`, in
                    # which case this is always None -- confirm against the XSD.
                    sec_name = self._as_text(sec.get("name"))
                    for d in self._ensure_list(sec.get("diag")):
                        import_diag(d, ch_name, sec_name, parent_code=None)
            else:
                # If no sections, look for diags at chapter level
                for d in self._ensure_list(ch.get("diag")):
                    import_diag(d, ch_name, None, parent_code=None)
        return rows, parent_links

    def handle(self, *args, **opts):
        """Run the import.

        Loads the XSD, decodes the XML, collects the rows, then -- inside a
        single transaction -- optionally deletes existing rows, bulk-inserts
        the new ones (conflicting rows are ignored) and links parents.

        Raises:
            CommandError: if the XSD cannot be loaded, the XML cannot be
                decoded, or no ``chapter`` entry can be located.
        """
        xsd_path = opts["xsd"]
        xml_path = opts["xml"]

        try:
            xs = xmlschema.XMLSchema(xsd_path)
        except Exception as e:
            raise CommandError(f"Failed to load XSD: {e}") from e

        try:
            # to_dict() already flattens namespaces into keys; we'll auto-detect paths.
            data = xs.to_dict(xml_path)
        except Exception as e:
            raise CommandError(f"Failed to parse XML: {e}") from e

        # If the root is a single-key dict, unwrap it
        if isinstance(data, dict) and len(data) == 1:
            root = next(iter(data.values()))
        else:
            root = data

        # Find the dict that *contains* the "chapter" key (any depth)
        container_with_chapter = self._find_first_with_key(root, "chapter")
        if not container_with_chapter:
            # Fall back: sometimes structure uses "chapters"
            container_with_chapter = self._find_first_with_key(root, "chapters")
            if container_with_chapter and isinstance(container_with_chapter["chapters"], dict):
                # Normalize "chapters" -> "chapter" if it's nested like {"chapters": {"chapter": [...]}}
                if "chapter" in container_with_chapter["chapters"]:
                    container_with_chapter = container_with_chapter["chapters"]

        if not container_with_chapter or ("chapter" not in container_with_chapter):
            # Give user a quick peek at available top-level keys to debug
            preview_keys = list(root.keys()) if isinstance(root, dict) else type(root)
            raise CommandError(
                "Could not locate 'chapter' anywhere in the parsed XML. "
                f"Top-level preview: {preview_keys}"
            )

        chapters = container_with_chapter.get("chapter")
        if chapters is None:
            raise CommandError("Found container for chapters, but 'chapter' is empty.")

        # Collect rows + parent links BEFORE touching the database, so a
        # malformed file cannot leave the table truncated.
        self.stdout.write("Collecting ICD-10 rows...")
        rows, parent_links = self._collect_rows(chapters)

        BATCH = 1000
        # Truncate, insert and link in one transaction: any failure rolls the
        # table back to its previous contents.
        with transaction.atomic():
            if opts["truncate"]:
                self.stdout.write(self.style.WARNING("Truncating existing Icd10 data..."))
                Icd10.objects.all().delete()

            self.stdout.write(self.style.SUCCESS(f"Collected {len(rows)} rows. Inserting..."))
            for i in range(0, len(rows), BATCH):
                Icd10.objects.bulk_create(rows[i:i + BATCH], ignore_conflicts=True)

            # Link parents
            if parent_links:
                self.stdout.write("Linking parents...")
                # "parent" must be loaded here: reading parent_id on an
                # instance where it is deferred costs one query per row.
                code_to_obj = {
                    o.code: o for o in Icd10.objects.only("id", "code", "parent")
                }
                updates = []
                for child_code, parent_code in parent_links:
                    child = code_to_obj.get(child_code)
                    parent = code_to_obj.get(parent_code)
                    if child and parent and child.parent_id != parent.id:
                        child.parent_id = parent.id
                        updates.append(child)
                for i in range(0, len(updates), BATCH):
                    Icd10.objects.bulk_update(updates[i:i + BATCH], ["parent"])
                self.stdout.write(self.style.SUCCESS(f"Linked {len(updates)} parent relations."))

        self.stdout.write(self.style.SUCCESS("ICD-10 import completed successfully."))