""" Radiology services for DICOM generation and management. """ import os import numpy as np from datetime import datetime, date from typing import Optional, Dict, Any, Tuple from django.conf import settings from django.utils import timezone from pydicom import Dataset, FileMetaDataset from pydicom.uid import ( ExplicitVRLittleEndian, ImplicitVRLittleEndian, CTImageStorage, MRImageStorage, ComputedRadiographyImageStorage, DigitalXRayImageStorageForPresentation, UltrasoundImageStorage, generate_uid ) from .models import DICOMImage, ImagingSeries, ImagingStudy class DICOMGenerator: """ Service class for generating DICOM files from DICOMImage model data. """ # DICOM SOP Class UIDs for different modalities SOP_CLASS_UIDS = { 'CT': CTImageStorage, 'MR': MRImageStorage, 'CR': ComputedRadiographyImageStorage, 'DX': DigitalXRayImageStorageForPresentation, 'US': UltrasoundImageStorage, 'XA': '1.2.840.10008.5.1.4.1.1.12.1', # X-Ray Angiography 'RF': '1.2.840.10008.5.1.4.1.1.12.2', # Radiofluoroscopy 'MG': '1.2.840.10008.5.1.4.1.1.1.2', # Mammography 'NM': '1.2.840.10008.5.1.4.1.1.20', # Nuclear Medicine 'PT': '1.2.840.10008.5.1.4.1.1.128', # PET } def __init__(self): """Initialize DICOM generator.""" self.media_root = getattr(settings, 'MEDIA_ROOT', 'media') self.dicom_storage_path = os.path.join(self.media_root, 'dicom') # Ensure DICOM storage directory exists os.makedirs(self.dicom_storage_path, exist_ok=True) def generate_dicom_from_model(self, dicom_image: DICOMImage, pixel_data: Optional[np.ndarray] = None) -> str: """ Generate a DICOM file from a DICOMImage model instance. Args: dicom_image: DICOMImage model instance pixel_data: Optional numpy array for pixel data. If None, generates synthetic data. Returns: str: Path to the generated DICOM file """ # Create DICOM dataset ds = Dataset() ds.file_meta = FileMetaDataset() # Set transfer syntax ds.file_meta.TransferSyntaxUID = ExplicitVRLittleEndian ds.file_meta.MediaStorageSOPClassUID = dicom_image.sop_class_uid or self._get_sop_class_uid(dicom_image.series.modality) ds.file_meta.MediaStorageSOPInstanceUID = dicom_image.sop_instance_uid ds.file_meta.ImplementationClassUID = generate_uid() ds.file_meta.ImplementationVersionName = "HospitalMgmt_v4.0" # Patient Information patient = dicom_image.patient ds.PatientName = f"{patient.last_name}^{patient.first_name}" ds.PatientID = str(patient.patient_id) ds.PatientBirthDate = patient.date_of_birth.strftime('%Y%m%d') if patient.date_of_birth else '' ds.PatientSex = patient.gender if hasattr(patient, 'gender') else 'O' # Study Information study = dicom_image.study ds.StudyInstanceUID = study.study_instance_uid ds.StudyDate = study.study_datetime.strftime('%Y%m%d') ds.StudyTime = study.study_datetime.strftime('%H%M%S.%f')[:-3] ds.StudyDescription = study.study_description ds.AccessionNumber = study.accession_number ds.ReferringPhysicianName = f"{study.referring_physician.last_name}^{study.referring_physician.first_name}" # Series Information series = dicom_image.series ds.SeriesInstanceUID = series.series_instance_uid ds.SeriesNumber = str(series.series_number) ds.SeriesDescription = series.series_description or '' ds.SeriesDate = series.series_datetime.strftime('%Y%m%d') ds.SeriesTime = series.series_datetime.strftime('%H%M%S.%f')[:-3] ds.Modality = series.modality ds.ProtocolName = series.protocol_name or '' # Instance Information ds.SOPClassUID = dicom_image.sop_class_uid or self._get_sop_class_uid(series.modality) ds.SOPInstanceUID = dicom_image.sop_instance_uid ds.InstanceNumber = str(dicom_image.instance_number) # Image Information if dicom_image.content_date: ds.ContentDate = dicom_image.content_date.strftime('%Y%m%d') if dicom_image.content_time: ds.ContentTime = dicom_image.content_time.strftime('%H%M%S.%f')[:-3] if dicom_image.acquisition_datetime: ds.AcquisitionDate = dicom_image.acquisition_datetime.strftime('%Y%m%d') ds.AcquisitionTime = dicom_image.acquisition_datetime.strftime('%H%M%S.%f')[:-3] # Image Type if dicom_image.image_type: ds.ImageType = dicom_image.image_type.split('\\') else: ds.ImageType = ['ORIGINAL', 'PRIMARY'] # Equipment Information if study.manufacturer: ds.Manufacturer = study.manufacturer if study.model_name: ds.ManufacturerModelName = study.model_name if study.station_name: ds.StationName = study.station_name # Image Dimensions and Pixel Data ds.Rows = dicom_image.rows ds.Columns = dicom_image.columns ds.BitsAllocated = dicom_image.bits_allocated ds.BitsStored = dicom_image.bits_stored ds.HighBit = dicom_image.bits_stored - 1 ds.PixelRepresentation = 0 # Unsigned by default ds.SamplesPerPixel = 1 ds.PhotometricInterpretation = "MONOCHROME2" # Image Position and Orientation if dicom_image.image_position: ds.ImagePositionPatient = [float(x) for x in dicom_image.image_position.split('\\')] if dicom_image.image_orientation: ds.ImageOrientationPatient = [float(x) for x in dicom_image.image_orientation.split('\\')] if dicom_image.slice_location is not None: ds.SliceLocation = str(dicom_image.slice_location) # Pixel Spacing if series.pixel_spacing: ds.PixelSpacing = [float(x) for x in series.pixel_spacing.split('\\')] if series.slice_thickness: ds.SliceThickness = str(series.slice_thickness) if series.spacing_between_slices: ds.SpacingBetweenSlices = str(series.spacing_between_slices) # Window/Level Settings if dicom_image.window_center is not None: ds.WindowCenter = str(dicom_image.window_center) if dicom_image.window_width is not None: ds.WindowWidth = str(dicom_image.window_width) # Patient Position if series.patient_position: ds.PatientPosition = series.patient_position # Technical Parameters if study.kvp: ds.KVP = str(study.kvp) if study.exposure_time: ds.ExposureTime = str(study.exposure_time) # Contrast Information if series.contrast_agent: ds.ContrastBolusAgent = series.contrast_agent if series.contrast_route: ds.ContrastBolusRoute = series.contrast_route # Generate or use provided pixel data if pixel_data is None: pixel_data = self._generate_synthetic_pixel_data( dicom_image.rows, dicom_image.columns, dicom_image.bits_allocated ) # Set pixel data ds.PixelData = pixel_data.tobytes() if dicom_image.bits_allocated <= 8: ds["PixelData"].VR = "OB" else: ds["PixelData"].VR = "OW" # Transfer Syntax if dicom_image.transfer_syntax_uid: ds.file_meta.TransferSyntaxUID = dicom_image.transfer_syntax_uid # Generate file path file_path = self._generate_file_path(dicom_image) # Save DICOM file ds.save_as(file_path, enforce_file_format=True) # Update model with file information dicom_image.file_path = file_path dicom_image.file_size = os.path.getsize(file_path) dicom_image.save(update_fields=['file_path', 'file_size']) return file_path def create_dicom_from_data(self, study_data: Dict[str, Any], series_data: Dict[str, Any], image_data: Dict[str, Any], pixel_data: Optional[np.ndarray] = None) -> str: """ Create a DICOM file from raw data dictionaries. Args: study_data: Study-level DICOM data series_data: Series-level DICOM data image_data: Image-level DICOM data pixel_data: Optional numpy array for pixel data Returns: str: Path to the generated DICOM file """ ds = Dataset() ds.file_meta = FileMetaDataset() # File Meta Information ds.file_meta.TransferSyntaxUID = ExplicitVRLittleEndian ds.file_meta.MediaStorageSOPClassUID = image_data.get('sop_class_uid', CTImageStorage) ds.file_meta.MediaStorageSOPInstanceUID = image_data.get('sop_instance_uid', generate_uid()) ds.file_meta.ImplementationClassUID = generate_uid() ds.file_meta.ImplementationVersionName = "HospitalMgmt_v4.0" # Patient Information ds.PatientName = study_data.get('patient_name', 'Anonymous^Patient') ds.PatientID = study_data.get('patient_id', 'UNKNOWN') ds.PatientBirthDate = study_data.get('patient_birth_date', '') ds.PatientSex = study_data.get('patient_sex', 'O') # Study Information ds.StudyInstanceUID = study_data.get('study_instance_uid', generate_uid()) ds.StudyDate = study_data.get('study_date', datetime.now().strftime('%Y%m%d')) ds.StudyTime = study_data.get('study_time', datetime.now().strftime('%H%M%S')) ds.StudyDescription = study_data.get('study_description', '') ds.AccessionNumber = study_data.get('accession_number', '') # Series Information ds.SeriesInstanceUID = series_data.get('series_instance_uid', generate_uid()) ds.SeriesNumber = str(series_data.get('series_number', 1)) ds.SeriesDescription = series_data.get('series_description', '') ds.Modality = series_data.get('modality', 'CT') # Instance Information ds.SOPClassUID = image_data.get('sop_class_uid', self._get_sop_class_uid(ds.Modality)) ds.SOPInstanceUID = image_data.get('sop_instance_uid', generate_uid()) ds.InstanceNumber = str(image_data.get('instance_number', 1)) # Image Dimensions rows = image_data.get('rows', 512) columns = image_data.get('columns', 512) bits_allocated = image_data.get('bits_allocated', 16) ds.Rows = rows ds.Columns = columns ds.BitsAllocated = bits_allocated ds.BitsStored = image_data.get('bits_stored', bits_allocated) ds.HighBit = ds.BitsStored - 1 ds.PixelRepresentation = 0 # Unsigned ds.SamplesPerPixel = 1 ds.PhotometricInterpretation = "MONOCHROME2" # Generate pixel data if not provided if pixel_data is None: pixel_data = self._generate_synthetic_pixel_data(rows, columns, bits_allocated) # Set pixel data ds.PixelData = pixel_data.tobytes() if bits_allocated <= 8: ds["PixelData"].VR = "OB" else: ds["PixelData"].VR = "OW" # Optional fields if 'image_position' in image_data: ds.ImagePositionPatient = image_data['image_position'] if 'image_orientation' in image_data: ds.ImageOrientationPatient = image_data['image_orientation'] if 'slice_location' in image_data: ds.SliceLocation = str(image_data['slice_location']) if 'window_center' in image_data: ds.WindowCenter = str(image_data['window_center']) if 'window_width' in image_data: ds.WindowWidth = str(image_data['window_width']) # Generate file path filename = f"{ds.SOPInstanceUID}.dcm" file_path = os.path.join(self.dicom_storage_path, filename) # Save DICOM file ds.save_as(file_path, enforce_file_format=True) return file_path def generate_series_dicoms(self, series: ImagingSeries, num_images: int = 10) -> list: """ Generate multiple DICOM images for a series. Args: series: ImagingSeries model instance num_images: Number of images to generate Returns: list: List of file paths for generated DICOM files """ file_paths = [] for i in range(num_images): # Create DICOMImage instance dicom_image = DICOMImage( series=series, sop_instance_uid=generate_uid(), instance_number=i + 1, sop_class_uid=self._get_sop_class_uid(series.modality), rows=512, columns=512, bits_allocated=16, bits_stored=16, file_path='', # Will be set by generate_dicom_from_model file_size=0, # Will be set by generate_dicom_from_model ) # Add slice-specific data if series.slice_thickness: dicom_image.slice_location = i * series.slice_thickness # Set image position for axial slices if series.modality in ['CT', 'MR']: dicom_image.image_position = f"0\\0\\{i * (series.slice_thickness or 5)}" dicom_image.image_orientation = "1\\0\\0\\0\\1\\0" # Save the model instance dicom_image.save() # Generate DICOM file file_path = self.generate_dicom_from_model(dicom_image) file_paths.append(file_path) # Update series metrics series.number_of_instances = num_images series.save(update_fields=['number_of_instances']) return file_paths def _get_sop_class_uid(self, modality: str) -> str: """Get appropriate SOP Class UID for modality.""" return self.SOP_CLASS_UIDS.get(modality, CTImageStorage) def _generate_file_path(self, dicom_image: DICOMImage) -> str: """Generate file path for DICOM image.""" study = dicom_image.study series = dicom_image.series # Create directory structure: tenant/patient/study/series/ path_parts = [ self.dicom_storage_path, str(study.tenant.id), str(study.patient.patient_id), study.accession_number, f"series_{series.series_number:03d}" ] directory = os.path.join(*path_parts) os.makedirs(directory, exist_ok=True) # Generate filename filename = f"image_{dicom_image.instance_number:04d}_{dicom_image.sop_instance_uid}.dcm" return os.path.join(directory, filename) def _generate_synthetic_pixel_data(self, rows: int, columns: int, bits_allocated: int) -> np.ndarray: """ Generate synthetic pixel data for testing purposes. Args: rows: Number of rows columns: Number of columns bits_allocated: Bits allocated per pixel Returns: np.ndarray: Synthetic pixel data """ if bits_allocated <= 8: dtype = np.uint8 max_value = 255 elif bits_allocated <= 16: dtype = np.uint16 max_value = 65535 else: dtype = np.uint32 max_value = 4294967295 # Create a simple pattern - circle in center y, x = np.ogrid[:rows, :columns] center_y, center_x = rows // 2, columns // 2 radius = min(rows, columns) // 4 # Create circular pattern mask = (x - center_x) ** 2 + (y - center_y) ** 2 <= radius ** 2 # Generate base noise pixel_data = np.random.randint(0, max_value // 4, (rows, columns), dtype=dtype) # Add circle pattern pixel_data[mask] = max_value // 2 # Add some anatomical-like structures if bits_allocated > 8: # Add gradient for depth gradient = np.linspace(0, max_value // 8, columns) pixel_data += gradient.astype(dtype) return pixel_data def create_test_dicom_series(self, study: ImagingStudy, modality: str = 'CT', num_images: int = 20) -> ImagingSeries: """ Create a test DICOM series with synthetic data. Args: study: ImagingStudy instance modality: Imaging modality num_images: Number of images to generate Returns: ImagingSeries: Created series with DICOM files """ # Create series series = ImagingSeries.objects.create( study=study, series_instance_uid=generate_uid(), series_number=1, modality=modality, series_description=f"Test {modality} Series", series_datetime=timezone.now(), slice_thickness=5.0 if modality in ['CT', 'MR'] else None, spacing_between_slices=5.0 if modality in ['CT', 'MR'] else None, pixel_spacing="0.5\\0.5" if modality in ['CT', 'MR'] else None, ) # Generate DICOM files file_paths = self.generate_series_dicoms(series, num_images) return series class DICOMValidator: """ Service class for validating DICOM files and data. """ @staticmethod def validate_dicom_file(file_path: str) -> Dict[str, Any]: """ Validate a DICOM file and extract metadata. Args: file_path: Path to DICOM file Returns: dict: Validation results and metadata """ try: from pydicom import dcmread ds = dcmread(file_path) return { 'valid': True, 'sop_instance_uid': str(ds.SOPInstanceUID), 'sop_class_uid': str(ds.SOPClassUID), 'study_instance_uid': str(ds.StudyInstanceUID), 'series_instance_uid': str(ds.SeriesInstanceUID), 'instance_number': int(ds.InstanceNumber), 'modality': str(ds.Modality), 'rows': int(ds.Rows), 'columns': int(ds.Columns), 'bits_allocated': int(ds.BitsAllocated), 'patient_id': str(ds.PatientID), 'study_date': str(ds.StudyDate), 'series_number': int(ds.SeriesNumber), 'file_size': os.path.getsize(file_path), 'errors': [] } except Exception as e: return { 'valid': False, 'errors': [str(e)], 'file_size': os.path.getsize(file_path) if os.path.exists(file_path) else 0 } @staticmethod def validate_dicom_image_model(dicom_image: DICOMImage) -> Dict[str, Any]: """ Validate DICOMImage model data for DICOM generation. Args: dicom_image: DICOMImage model instance Returns: dict: Validation results """ errors = [] warnings = [] # Required fields if not dicom_image.sop_instance_uid: errors.append("SOP Instance UID is required") if not dicom_image.series: errors.append("Series relationship is required") if dicom_image.rows <= 0: errors.append("Rows must be positive") if dicom_image.columns <= 0: errors.append("Columns must be positive") if dicom_image.bits_allocated not in [8, 16, 32]: errors.append("Bits allocated must be 8, 16, or 32") if dicom_image.bits_stored > dicom_image.bits_allocated: errors.append("Bits stored cannot exceed bits allocated") # Warnings for missing optional fields if not dicom_image.image_position: warnings.append("Image position not specified") if not dicom_image.image_orientation: warnings.append("Image orientation not specified") if dicom_image.window_center is None or dicom_image.window_width is None: warnings.append("Window/Level settings not specified") return { 'valid': len(errors) == 0, 'errors': errors, 'warnings': warnings } class DICOMUtilities: """ Utility functions for DICOM operations. """ @staticmethod def generate_study_uid() -> str: """Generate a new Study Instance UID.""" return generate_uid() @staticmethod def generate_series_uid() -> str: """Generate a new Series Instance UID.""" return generate_uid() @staticmethod def generate_instance_uid() -> str: """Generate a new SOP Instance UID.""" return generate_uid() @staticmethod def parse_dicom_date(date_str: str) -> Optional[date]: """Parse DICOM date string (YYYYMMDD) to Python date.""" try: return datetime.strptime(date_str, '%Y%m%d').date() except (ValueError, TypeError): return None @staticmethod def parse_dicom_time(time_str: str) -> Optional[datetime]: """Parse DICOM time string (HHMMSS.ffffff) to Python time.""" try: # Handle various time formats if '.' in time_str: return datetime.strptime(time_str, '%H%M%S.%f').time() else: return datetime.strptime(time_str, '%H%M%S').time() except (ValueError, TypeError): return None @staticmethod def format_dicom_date(date_obj: date) -> str: """Format Python date to DICOM date string.""" return date_obj.strftime('%Y%m%d') @staticmethod def format_dicom_time(time_obj: datetime) -> str: """Format Python datetime to DICOM time string.""" return time_obj.strftime('%H%M%S.%f')[:-3] @staticmethod def calculate_file_size_mb(file_path: str) -> float: """Calculate file size in MB.""" if os.path.exists(file_path): return round(os.path.getsize(file_path) / (1024 * 1024), 2) return 0.0 @staticmethod def get_modality_description(modality: str) -> str: """Get human-readable description for modality.""" descriptions = { 'CT': 'Computed Tomography', 'MR': 'Magnetic Resonance', 'CR': 'Computed Radiography', 'DX': 'Digital Radiography', 'US': 'Ultrasound', 'XA': 'X-Ray Angiography', 'RF': 'Radiofluoroscopy', 'MG': 'Mammography', 'NM': 'Nuclear Medicine', 'PT': 'Positron Emission Tomography', } return descriptions.get(modality, modality)