375 lines
14 KiB
Python
375 lines
14 KiB
Python
"""
|
|
Django management command for generating DICOM files from DICOMImage model data.
|
|
"""
|
|
|
|
import os
|
|
import numpy as np
|
|
from django.core.management.base import BaseCommand, CommandError
|
|
from django.utils import timezone
|
|
from django.db import models
|
|
from django.conf import settings
|
|
from pydicom.uid import generate_uid
|
|
|
|
from radiology.models import DICOMImage, ImagingSeries, ImagingStudy
|
|
from radiology.services import DICOMGenerator, DICOMValidator, DICOMUtilities
|
|
from patients.models import PatientProfile
|
|
from core.models import Tenant
|
|
from accounts.models import User
|
|
|
|
|
|
class Command(BaseCommand):
    """Generate or validate DICOM files for ``DICOMImage`` records.

    Exactly one mode runs per invocation, selected in this order of
    precedence: ``--create-test-data``, ``--validate-only``, ``--image-id``,
    ``--series-id``, ``--study-id``. When none of these is given, files are
    generated for every image whose ``has_dicom_file()`` returns false.
    """

    help = 'Generate DICOM files from DICOMImage model data'

    def add_arguments(self, parser):
        """Register the command-line options on ``parser``."""
        parser.add_argument(
            '--image-id',
            type=str,
            help='Generate DICOM for specific DICOMImage ID'
        )
        parser.add_argument(
            '--series-id',
            type=str,
            help='Generate DICOMs for all images in a series'
        )
        parser.add_argument(
            '--study-id',
            type=str,
            help='Generate DICOMs for all images in a study'
        )
        parser.add_argument(
            '--create-test-data',
            action='store_true',
            help='Create test study with DICOM files'
        )
        parser.add_argument(
            '--modality',
            type=str,
            default='CT',
            choices=['CT', 'MR', 'CR', 'DX', 'US', 'XA', 'RF', 'MG', 'NM', 'PT'],
            help='Modality for test data (default: CT)'
        )
        parser.add_argument(
            '--num-images',
            type=int,
            default=10,
            help='Number of images to generate for test data (default: 10)'
        )
        parser.add_argument(
            '--validate-only',
            action='store_true',
            help='Only validate existing DICOM files without generating new ones'
        )
        # NOTE(review): this flag is accepted but never consulted; generating
        # missing files is already what happens when no other mode is chosen.
        parser.add_argument(
            '--generate-missing',
            action='store_true',
            help='Generate DICOM files for all database records that are missing files'
        )
        parser.add_argument(
            '--batch-size',
            type=int,
            default=100,
            help='Number of files to process in each batch (default: 100)'
        )
        parser.add_argument(
            '--output-dir',
            type=str,
            help='Custom output directory for DICOM files'
        )

    def handle(self, *args, **options):
        """Prepare the output directory and dispatch to the selected mode.

        Raises:
            CommandError: if the selected mode fails. Errors that a helper
                already raised as ``CommandError`` propagate unchanged; any
                other exception is wrapped and chained to its cause.
        """
        generator = DICOMGenerator()

        # An explicit --output-dir wins; otherwise fall back to MEDIA_ROOT/dicom.
        generator.dicom_storage_path = (
            options['output_dir'] or os.path.join(settings.MEDIA_ROOT, 'dicom')
        )

        # Ensure the directory exists
        os.makedirs(generator.dicom_storage_path, exist_ok=True)
        self.stdout.write(f"Using output directory: {generator.dicom_storage_path}")

        try:
            if options['create_test_data']:
                self._create_test_data(generator, options)
            elif options['validate_only']:
                self._validate_existing_dicoms()
            elif options['image_id']:
                self._generate_single_image(generator, options['image_id'])
            elif options['series_id']:
                self._generate_series_images(generator, options['series_id'])
            elif options['study_id']:
                self._generate_study_images(generator, options['study_id'])
            else:
                # .get() keeps direct handle(**kwargs) callers that omit the
                # option working with the parser's default value.
                self._generate_all_missing_dicoms(
                    generator,
                    batch_size=options.get('batch_size', 100),
                )
        except CommandError:
            # Already a user-facing error; wrapping it again would only
            # prefix the message a second time.
            raise
        except Exception as e:
            raise CommandError(f'Error generating DICOM files: {str(e)}') from e

    def _create_test_data(self, generator, options):
        """Create a test tenant, patient, physician and study, then a DICOM series.

        Args:
            generator: object whose ``create_test_dicom_series`` builds the
                series for the new study.
            options: parsed command options; reads ``modality`` and
                ``num_images``.

        Raises:
            CommandError: if ``num_images`` is less than 1.
        """
        from datetime import date

        modality = options['modality']
        num_images = options['num_images']

        # Reject nonsensical counts before any database rows are created.
        if num_images < 1:
            raise CommandError('--num-images must be a positive integer')

        self.stdout.write("Creating test study with DICOM files...")

        # Get or create test tenant
        tenant, created = Tenant.objects.get_or_create(
            name='Test Hospital',
            defaults={
                'display_name': 'Test Hospital',
                'address_line1': '123 Test Street',
                'city': 'Test City',
                'state': 'Test State',
                'postal_code': '12345',
                'phone_number': '+1234567890',
                'email': 'test@hospital.com',
                'is_active': True
            }
        )
        if created:
            self.stdout.write(f"Created test tenant: {tenant.name}")

        # Get or create test patient
        patient, created = PatientProfile.objects.get_or_create(
            mrn='TEST001',
            tenant=tenant,
            defaults={
                'first_name': 'John',
                'last_name': 'Doe',
                'date_of_birth': date(1980, 1, 1),
                'gender': 'MALE'
            }
        )
        if created:
            self.stdout.write(f"Created test patient: {patient.first_name} {patient.last_name}")

        # Get or create test physician
        physician, created = User.objects.get_or_create(
            username='test_physician',
            defaults={
                'tenant': tenant,
                'first_name': 'Dr. Jane',
                'last_name': 'Smith',
                'email': 'physician@test.com'
            }
        )
        if created:
            self.stdout.write(f"Created test physician: {physician.first_name} {physician.last_name}")

        # Create test study (a new one on every run, with a fresh UID)
        study = ImagingStudy.objects.create(
            tenant=tenant,
            study_instance_uid=generate_uid(),
            patient=patient,
            referring_physician=physician,
            modality=modality,
            study_description=f"Test {modality} Study",
            body_part='CHEST',
            study_datetime=timezone.now(),
            status='SCHEDULED',
            priority='ROUTINE',
            manufacturer='Test Manufacturer',
            model_name='Test Model',
            station_name='Test Station'
        )

        self.stdout.write(f"Created test study: {study.accession_number}")

        # Create test series with DICOM files
        series = generator.create_test_dicom_series(
            study=study,
            modality=modality,
            num_images=num_images
        )

        self.stdout.write(
            self.style.SUCCESS(
                f"Successfully created test series with {num_images} DICOM files"
            )
        )
        self.stdout.write(f"Study ID: {study.study_id}")
        self.stdout.write(f"Series ID: {series.series_id}")
        self.stdout.write(f"Accession Number: {study.accession_number}")

    def _validate_existing_dicoms(self):
        """Validate the file of every ``DICOMImage`` that has a non-empty ``file_path``.

        Writes one line per image and a final tally. Images whose
        ``has_dicom_file()`` is false are counted together with invalid ones.
        """
        self.stdout.write("Validating existing DICOM files...")

        images = DICOMImage.objects.filter(file_path__isnull=False).exclude(file_path='')

        if images.count() == 0:
            self.stdout.write("No DICOM images found with file paths.")
            return

        valid_count = 0
        invalid_count = 0

        for image in images:
            if not image.has_dicom_file():
                invalid_count += 1
                self.stdout.write(
                    self.style.WARNING(f"⚠ Missing file: {image.sop_instance_uid}")
                )
                continue

            validation_result = DICOMValidator.validate_dicom_file(image.file_path)
            if validation_result['valid']:
                valid_count += 1
                self.stdout.write(f"✓ Valid: {image.sop_instance_uid}")
            else:
                invalid_count += 1
                self.stdout.write(
                    self.style.ERROR(
                        f"✗ Invalid: {image.sop_instance_uid} - {validation_result['errors']}"
                    )
                )

        self.stdout.write(
            self.style.SUCCESS(
                f"Validation complete: {valid_count} valid, {invalid_count} invalid/missing"
            )
        )

    def _generate_for_images(self, images, *, progress_total=None):
        """Validate and generate a DICOM file for each image, reporting per-image failures.

        A failure on one image (validation or any exception) is written to
        stdout and does not stop the remaining images.

        Args:
            images: iterable of ``DICOMImage`` instances.
            progress_total: when ``None``, a line is written for every
                generated image; otherwise only a progress line is written
                after every tenth generated file, using this value as the
                denominator.

        Returns:
            The number of files generated.
        """
        generated_count = 0
        for image in images:
            try:
                validation = image.validate_for_dicom_generation()
                if not validation['valid']:
                    self.stdout.write(
                        self.style.ERROR(
                            f"✗ Skipped {image.sop_instance_uid}: {validation['errors']}"
                        )
                    )
                    continue

                image.generate_dicom_file()
                generated_count += 1

                if progress_total is None:
                    self.stdout.write(f"✓ Generated: {image.sop_instance_uid}")
                elif generated_count % 10 == 0:
                    self.stdout.write(f"Generated {generated_count}/{progress_total} files...")
            except Exception as e:
                # Deliberately best-effort: report and move on to the next image.
                self.stdout.write(
                    self.style.ERROR(f"✗ Error generating {image.sop_instance_uid}: {str(e)}")
                )
        return generated_count

    def _generate_single_image(self, generator, image_id):
        """Generate the DICOM file for the image identified by ``image_id``.

        Args:
            generator: accepted for signature parity with the other modes.
                NOTE(review): it is not used here; ``image.generate_dicom_file()``
                is called without it, so confirm that ``--output-dir`` is
                honoured by the model.
            image_id: value matched against ``DICOMImage.image_id``.

        Raises:
            CommandError: if no such image exists or pre-generation
                validation reports it as invalid.
        """
        try:
            image = DICOMImage.objects.get(image_id=image_id)
        except DICOMImage.DoesNotExist as exc:
            raise CommandError(f'DICOMImage with ID {image_id} not found') from exc

        self.stdout.write(f"Generating DICOM for image: {image.sop_instance_uid}")

        # Validate before generation
        validation = image.validate_for_dicom_generation()
        if not validation['valid']:
            raise CommandError(f'Validation failed: {validation["errors"]}')

        if validation['warnings']:
            for warning in validation['warnings']:
                self.stdout.write(self.style.WARNING(f"Warning: {warning}"))

        # Generate DICOM file
        file_path = image.generate_dicom_file()

        self.stdout.write(
            self.style.SUCCESS(f"Successfully generated DICOM file: {file_path}")
        )
        self.stdout.write(f"File size: {image.file_size_mb} MB")

    def _generate_series_images(self, generator, series_id):
        """Generate DICOM files for every image in the series ``series_id``.

        Args:
            generator: accepted for signature parity; not used here
                (NOTE(review): see ``_generate_single_image``).
            series_id: value matched against ``ImagingSeries.series_id``.

        Raises:
            CommandError: if no such series exists.
        """
        try:
            series = ImagingSeries.objects.get(series_id=series_id)
        except ImagingSeries.DoesNotExist as exc:
            raise CommandError(f'ImagingSeries with ID {series_id} not found') from exc

        images = series.images.all()
        image_count = images.count()
        if image_count == 0:
            self.stdout.write(f"No images found in series {series.series_instance_uid}")
            return

        self.stdout.write(f"Generating DICOMs for {image_count} images in series...")

        generated_count = self._generate_for_images(images)

        self.stdout.write(
            self.style.SUCCESS(f"Generated {generated_count} DICOM files")
        )

    def _generate_study_images(self, generator, study_id):
        """Generate DICOM files for every image in the study ``study_id``.

        Args:
            generator: accepted for signature parity; not used here
                (NOTE(review): see ``_generate_single_image``).
            study_id: value matched against ``ImagingStudy.study_id``.

        Raises:
            CommandError: if no such study exists.
        """
        try:
            study = ImagingStudy.objects.get(study_id=study_id)
        except ImagingStudy.DoesNotExist as exc:
            raise CommandError(f'ImagingStudy with ID {study_id} not found') from exc

        images = DICOMImage.objects.filter(series__study=study)
        image_count = images.count()
        if image_count == 0:
            self.stdout.write(f"No images found in study {study.accession_number}")
            return

        self.stdout.write(f"Generating DICOMs for {image_count} images in study...")

        generated_count = self._generate_for_images(images)

        self.stdout.write(
            self.style.SUCCESS(f"Generated {generated_count} DICOM files for study {study.accession_number}")
        )

    def _generate_all_missing_dicoms(self, generator, batch_size=100):
        """Generate DICOM files for all images whose ``has_dicom_file()`` is false.

        Args:
            generator: accepted for signature parity; not used here
                (NOTE(review): see ``_generate_single_image``).
            batch_size: number of rows fetched from the database per chunk
                while scanning for images without files.

        Raises:
            CommandError: if ``batch_size`` is not a positive integer.
        """
        if batch_size is None or batch_size < 1:
            raise CommandError('--batch-size must be a positive integer')

        self.stdout.write("Generating DICOM files for all images without files...")

        # Find images that either have no file_path OR the file doesn't exist.
        # Streaming with iterator() avoids caching every DICOMImage row in the
        # queryset; only the images that actually need a file are retained.
        images_without_files = [
            img
            for img in DICOMImage.objects.all().iterator(chunk_size=batch_size)
            if not img.has_dicom_file()
        ]

        total_images = len(images_without_files)
        if total_images == 0:
            self.stdout.write("All DICOM images already have files.")
            return

        self.stdout.write(f"Found {total_images} images without DICOM files")

        generated_count = self._generate_for_images(
            images_without_files,
            progress_total=total_images,
        )

        self.stdout.write(
            self.style.SUCCESS(f"Generated {generated_count} DICOM files")
        )
|