"""
Django management command for DICOM utilities and operations.
"""
import json
import os

from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone
from pydicom import dcmread
from pydicom.uid import generate_uid

from radiology.models import DICOMImage, ImagingSeries, ImagingStudy
from radiology.services import DICOMGenerator, DICOMValidator, DICOMUtilities


class Command(BaseCommand):
    """Run one DICOM maintenance operation selected by a command-line flag.

    Exactly one operation runs per invocation; when several flags are given,
    the first match in the order checked by ``handle`` wins.
    """

    help = 'DICOM utilities and operations'

    # (label, dataset attribute) pairs printed by ``_inspect_dicom_file``,
    # in display order: basic information, image dimensions, then UIDs.
    _INSPECT_FIELDS = (
        ('Patient Name', 'PatientName'),
        ('Patient ID', 'PatientID'),
        ('Study Date', 'StudyDate'),
        ('Study Description', 'StudyDescription'),
        ('Modality', 'Modality'),
        ('Series Number', 'SeriesNumber'),
        ('Instance Number', 'InstanceNumber'),
        ('Rows', 'Rows'),
        ('Columns', 'Columns'),
        ('Bits Allocated', 'BitsAllocated'),
        ('Bits Stored', 'BitsStored'),
        ('Study Instance UID', 'StudyInstanceUID'),
        ('Series Instance UID', 'SeriesInstanceUID'),
        ('SOP Instance UID', 'SOPInstanceUID'),
        ('SOP Class UID', 'SOPClassUID'),
    )

    def add_arguments(self, parser):
        """Register the mutually independent operation flags on *parser*."""
        parser.add_argument(
            '--inspect-file',
            type=str,
            help='Inspect a DICOM file and show metadata'
        )
        parser.add_argument(
            '--import-dicom',
            type=str,
            help='Import existing DICOM file into database'
        )
        parser.add_argument(
            '--export-metadata',
            type=str,
            help='Export DICOM metadata to JSON file'
        )
        parser.add_argument(
            '--generate-uids',
            type=int,
            help='Generate specified number of DICOM UIDs'
        )
        parser.add_argument(
            '--cleanup-orphaned',
            action='store_true',
            help='Clean up orphaned DICOM files'
        )
        parser.add_argument(
            '--verify-integrity',
            action='store_true',
            help='Verify integrity of all DICOM files'
        )

    def handle(self, *args, **options):
        """Dispatch to the first operation whose option is set.

        Raises:
            CommandError: if the selected operation raises any exception.
                The original exception is chained as ``__cause__``.
        """
        try:
            if options['inspect_file']:
                self._inspect_dicom_file(options['inspect_file'])
            elif options['import_dicom']:
                self._import_dicom_file(options['import_dicom'])
            elif options['export_metadata']:
                self._export_metadata(options['export_metadata'])
            elif options['generate_uids']:
                self._generate_uids(options['generate_uids'])
            elif options['cleanup_orphaned']:
                self._cleanup_orphaned_files()
            elif options['verify_integrity']:
                self._verify_integrity()
            else:
                self.stdout.write("Please specify an operation. Use --help for options.")
        except Exception as e:
            # Chain the cause so the original traceback is not lost.
            raise CommandError(f'Error in DICOM utilities: {str(e)}') from e

    @staticmethod
    def _int_or_default(value, default):
        """Return ``int(value)``, or *default* when *value* is ``None`` or empty."""
        if value is None or value == '':
            return default
        return int(value)

    @staticmethod
    def _first_float(value):
        """Return *value* as a float, using the first item of a multi-valued element.

        Returns ``None`` when *value* is ``None``, an empty string, or an
        empty sequence.
        """
        if not isinstance(value, (str, bytes)) and hasattr(value, '__iter__'):
            value = next(iter(value), None)
        if value is None or value == '':
            return None
        return float(value)

    @staticmethod
    def _normalize_path(path):
        """Return a canonical form of *path* suitable for equality comparison."""
        return os.path.normcase(os.path.abspath(path))

    def _inspect_dicom_file(self, file_path):
        """Inspect a DICOM file and display metadata.

        Prints the fields listed in ``_INSPECT_FIELDS`` ('N/A' for absent
        attributes), the file size, and the validator's verdict. Read or
        validation errors are reported on stdout rather than raised.

        Raises:
            CommandError: if *file_path* does not exist.
        """
        if not os.path.exists(file_path):
            raise CommandError(f'File not found: {file_path}')

        self.stdout.write(f"Inspecting DICOM file: {file_path}")
        self.stdout.write("-" * 60)

        try:
            ds = dcmread(file_path)

            for label, attribute in self._INSPECT_FIELDS:
                self.stdout.write(f"{label}: {getattr(ds, attribute, 'N/A')}")

            # File information
            file_size = os.path.getsize(file_path)
            self.stdout.write(
                f"File Size: {file_size} bytes ({file_size / (1024*1024):.2f} MB)"
            )

            # Validation
            validation = DICOMValidator.validate_dicom_file(file_path)
            if validation['valid']:
                self.stdout.write(self.style.SUCCESS("✓ File is valid DICOM"))
            else:
                self.stdout.write(
                    self.style.ERROR(f"✗ File validation failed: {validation['errors']}")
                )

        except Exception as e:
            self.stdout.write(self.style.ERROR(f"Error reading DICOM file: {str(e)}"))

    def _import_dicom_file(self, file_path):
        """Import existing DICOM file into database.

        The image is skipped when its SOP Instance UID is already stored or
        when its study is not in the database. The series is created on
        demand. Errors during the import are reported on stdout rather than
        raised.

        Raises:
            CommandError: if *file_path* does not exist.
        """
        if not os.path.exists(file_path):
            raise CommandError(f'File not found: {file_path}')

        self.stdout.write(f"Importing DICOM file: {file_path}")

        try:
            ds = dcmread(file_path)

            # Extract metadata
            study_uid = str(ds.StudyInstanceUID)
            series_uid = str(ds.SeriesInstanceUID)
            sop_uid = str(ds.SOPInstanceUID)

            # Check if image already exists
            if DICOMImage.objects.filter(sop_instance_uid=sop_uid).exists():
                self.stdout.write(f"DICOM image already exists: {sop_uid}")
                return

            # The study must already exist; it is never created here.
            try:
                study = ImagingStudy.objects.get(study_instance_uid=study_uid)
            except ImagingStudy.DoesNotExist:
                self.stdout.write(f"Study not found in database: {study_uid}")
                return

            # Find or create series
            series, created = ImagingSeries.objects.get_or_create(
                series_instance_uid=series_uid,
                defaults={
                    'study': study,
                    'series_number': self._int_or_default(
                        getattr(ds, 'SeriesNumber', None), 1
                    ),
                    'modality': str(getattr(ds, 'Modality', 'CT')),
                    'series_description': str(getattr(ds, 'SeriesDescription', '')),
                    'series_datetime': timezone.now(),
                }
            )

            if created:
                self.stdout.write(f"Created new series: {series_uid}")

            # Create DICOM image record. Integer elements that are present
            # but empty fall back to the same defaults as absent ones.
            DICOMImage.objects.create(
                series=series,
                sop_instance_uid=sop_uid,
                instance_number=self._int_or_default(
                    getattr(ds, 'InstanceNumber', None), 1
                ),
                sop_class_uid=str(getattr(ds, 'SOPClassUID', '')),
                rows=self._int_or_default(getattr(ds, 'Rows', None), 512),
                columns=self._int_or_default(getattr(ds, 'Columns', None), 512),
                bits_allocated=self._int_or_default(
                    getattr(ds, 'BitsAllocated', None), 16
                ),
                bits_stored=self._int_or_default(
                    getattr(ds, 'BitsStored', None), 16
                ),
                file_path=file_path,
                file_size=os.path.getsize(file_path),
                # Window Center/Width may hold several values; store the first.
                window_center=self._first_float(getattr(ds, 'WindowCenter', None)),
                window_width=self._first_float(getattr(ds, 'WindowWidth', None)),
            )

            self.stdout.write(
                self.style.SUCCESS(f"Successfully imported DICOM image: {sop_uid}")
            )

        except Exception as e:
            self.stdout.write(self.style.ERROR(f"Error importing DICOM file: {str(e)}"))

    def _export_metadata(self, output_file):
        """Export DICOM metadata to JSON file.

        Writes one JSON object per ``DICOMImage`` row, with nested series,
        study and patient details, to *output_file*.
        """
        self.stdout.write("Exporting DICOM metadata...")

        metadata = []
        for image in DICOMImage.objects.all():
            image_data = {
                'image_id': str(image.image_id),
                'sop_instance_uid': image.sop_instance_uid,
                'instance_number': image.instance_number,
                'rows': image.rows,
                'columns': image.columns,
                'bits_allocated': image.bits_allocated,
                'file_path': image.file_path,
                'file_size': image.file_size,
                'series': {
                    'series_id': str(image.series.series_id),
                    'series_instance_uid': image.series.series_instance_uid,
                    'series_number': image.series.series_number,
                    'modality': image.series.modality,
                    'series_description': image.series.series_description,
                },
                'study': {
                    'study_id': str(image.study.study_id),
                    'study_instance_uid': image.study.study_instance_uid,
                    'accession_number': image.study.accession_number,
                    'study_description': image.study.study_description,
                },
                'patient': {
                    'patient_id': image.patient.patient_id,
                    'first_name': image.patient.first_name,
                    'last_name': image.patient.last_name,
                }
            }
            metadata.append(image_data)

        # Explicit encoding keeps the output independent of the locale.
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2, default=str)

        self.stdout.write(
            self.style.SUCCESS(
                f"Exported metadata for {len(metadata)} images to {output_file}"
            )
        )

    def _generate_uids(self, count):
        """Generate DICOM UIDs.

        Prints *count* freshly generated UIDs, one per numbered line.
        """
        self.stdout.write(f"Generating {count} DICOM UIDs:")
        self.stdout.write("-" * 60)

        for i in range(count):
            uid = generate_uid()
            self.stdout.write(f"{i+1:3d}: {uid}")

    def _cleanup_orphaned_files(self):
        """Clean up orphaned DICOM files.

        A file is orphaned when it ends in ``.dcm``, lives under the
        generator's storage directory, and no ``DICOMImage.file_path`` refers
        to it. Files are deleted only after interactive confirmation.
        """
        self.stdout.write("Cleaning up orphaned DICOM files...")

        generator = DICOMGenerator()
        dicom_dir = generator.dicom_storage_path

        if not os.path.exists(dicom_dir):
            self.stdout.write("DICOM storage directory does not exist.")
            return

        # Normalize both sides so a path stored in a different spelling
        # (relative, './', doubled separators) is not mistaken for an orphan.
        db_file_paths = {
            self._normalize_path(stored_path)
            for stored_path in DICOMImage.objects.filter(file_path__isnull=False)
            .exclude(file_path='')
            .values_list('file_path', flat=True)
        }

        # Walk the storage directory and keep files the database does not know.
        orphaned_files = []
        for root, _dirs, files in os.walk(dicom_dir):
            for name in files:
                if not name.endswith('.dcm'):
                    continue
                candidate = os.path.join(root, name)
                if self._normalize_path(candidate) not in db_file_paths:
                    orphaned_files.append(candidate)

        if not orphaned_files:
            self.stdout.write("No orphaned DICOM files found.")
            return

        self.stdout.write(f"Found {len(orphaned_files)} orphaned files:")
        for file_path in orphaned_files:
            self.stdout.write(f"  {file_path}")

        # Ask for confirmation; no stdin (e.g. cron) counts as "no".
        try:
            confirm = input("Delete these files? (y/N): ")
        except EOFError:
            confirm = ''

        if confirm.lower() == 'y':
            deleted_count = 0
            for file_path in orphaned_files:
                try:
                    os.remove(file_path)
                    deleted_count += 1
                except OSError as e:
                    self.stdout.write(
                        self.style.ERROR(f"Error deleting {file_path}: {str(e)}")
                    )

            self.stdout.write(
                self.style.SUCCESS(f"Deleted {deleted_count} orphaned files")
            )
        else:
            self.stdout.write("Cleanup cancelled.")

    def _verify_integrity(self):
        """Verify integrity of all DICOM files.

        Each image is classified as missing (no path or no file), invalid
        (validator rejects it) or valid. For valid files, UID and size
        mismatches against the database row are reported as warnings but do
        not change the classification. A summary is printed at the end.
        """
        self.stdout.write("Verifying integrity of all DICOM files...")

        images = DICOMImage.objects.all()
        total_images = images.count()

        if total_images == 0:
            self.stdout.write("No DICOM images found.")
            return

        valid_count = 0
        invalid_count = 0
        missing_count = 0

        for image in images:
            if not image.file_path:
                missing_count += 1
                self.stdout.write(f"⚠ No file path: {image.sop_instance_uid}")
                continue

            if not image.has_dicom_file():
                missing_count += 1
                self.stdout.write(f"⚠ Missing file: {image.file_path}")
                continue

            # Validate file
            validation = DICOMValidator.validate_dicom_file(image.file_path)
            if validation['valid']:
                valid_count += 1

                # Check if metadata matches
                if validation['sop_instance_uid'] != image.sop_instance_uid:
                    self.stdout.write(
                        self.style.WARNING(
                            f"⚠ UID mismatch: {image.sop_instance_uid} vs {validation['sop_instance_uid']}"
                        )
                    )

                if validation['file_size'] != image.file_size:
                    self.stdout.write(
                        self.style.WARNING(
                            f"⚠ Size mismatch: {image.file_size} vs {validation['file_size']}"
                        )
                    )
            else:
                invalid_count += 1
                self.stdout.write(
                    self.style.ERROR(
                        f"✗ Invalid: {image.sop_instance_uid} - {validation['errors']}"
                    )
                )

        self.stdout.write("-" * 60)
        self.stdout.write(f"Total images: {total_images}")
        self.stdout.write(f"Valid files: {valid_count}")
        self.stdout.write(f"Invalid files: {invalid_count}")
        self.stdout.write(f"Missing files: {missing_count}")

        if invalid_count == 0 and missing_count == 0:
            self.stdout.write(self.style.SUCCESS("✓ All DICOM files are valid and present"))
        else:
            self.stdout.write(
                self.style.WARNING(f"⚠ Found {invalid_count + missing_count} issues")
            )