Marwan Alwali ab2c4a36c5 update
2025-10-02 10:13:03 +03:00

625 lines
23 KiB
Python

"""
Radiology services for DICOM generation and management.
"""
import os
from datetime import date, datetime, time
from typing import Any, Dict, Optional, Tuple

import numpy as np
from django.conf import settings
from django.utils import timezone
from pydicom import Dataset, FileMetaDataset
from pydicom.uid import (
    CTImageStorage,
    ComputedRadiographyImageStorage,
    DigitalXRayImageStorageForPresentation,
    ExplicitVRLittleEndian,
    ImplicitVRLittleEndian,
    MRImageStorage,
    UltrasoundImageStorage,
    generate_uid,
)

from .models import DICOMImage, ImagingSeries, ImagingStudy
class DICOMGenerator:
    """
    Service class for generating DICOM files from DICOMImage model data.

    Files are written below ``<MEDIA_ROOT>/dicom``. Every generated image is
    single-sample, unsigned, MONOCHROME2, and its pixel data is stored
    uncompressed in little-endian byte order.
    """

    # DICOM SOP Class UIDs for different modalities
    SOP_CLASS_UIDS = {
        'CT': CTImageStorage,
        'MR': MRImageStorage,
        'CR': ComputedRadiographyImageStorage,
        'DX': DigitalXRayImageStorageForPresentation,
        'US': UltrasoundImageStorage,
        'XA': '1.2.840.10008.5.1.4.1.1.12.1',  # X-Ray Angiography
        'RF': '1.2.840.10008.5.1.4.1.1.12.2',  # Radiofluoroscopy
        'MG': '1.2.840.10008.5.1.4.1.1.1.2',  # Mammography
        'NM': '1.2.840.10008.5.1.4.1.1.20',  # Nuclear Medicine
        'PT': '1.2.840.10008.5.1.4.1.1.128',  # PET
    }

    # Version name written to the File Meta Information of every file.
    IMPLEMENTATION_VERSION_NAME = "HospitalMgmt_v4.0"

    # The Implementation Class UID identifies the writing software, not the
    # individual file, so it is derived once (deterministically, from the
    # version name) instead of being regenerated randomly for each file.
    IMPLEMENTATION_CLASS_UID = generate_uid(entropy_srcs=[IMPLEMENTATION_VERSION_NAME])

    # Pixel data is always written as raw, native bytes, so only these
    # transfer syntaxes describe the produced file truthfully.
    UNCOMPRESSED_TRANSFER_SYNTAXES = (ExplicitVRLittleEndian, ImplicitVRLittleEndian)

    def __init__(self):
        """Initialize DICOM generator and make sure the storage directory exists."""
        self.media_root = getattr(settings, 'MEDIA_ROOT', 'media')
        self.dicom_storage_path = os.path.join(self.media_root, 'dicom')
        # Ensure DICOM storage directory exists
        os.makedirs(self.dicom_storage_path, exist_ok=True)

    def generate_dicom_from_model(self, dicom_image: DICOMImage, pixel_data: Optional[np.ndarray] = None) -> str:
        """
        Generate a DICOM file from a DICOMImage model instance.

        The file is written to the path built by ``_generate_file_path`` and
        the model's ``file_path`` / ``file_size`` fields are updated and saved.

        Args:
            dicom_image: DICOMImage model instance
            pixel_data: Optional numpy array for pixel data. If None, generates synthetic data.

        Returns:
            str: Path to the generated DICOM file

        Raises:
            ValueError: If ``pixel_data`` does not contain rows * columns values.
        """
        # NOTE(review): ``patient`` and ``study`` are read straight off the
        # image; presumably they are shortcuts defined on the model - confirm.
        patient = dicom_image.patient
        study = dicom_image.study
        series = dicom_image.series

        sop_class_uid = dicom_image.sop_class_uid or self._get_sop_class_uid(series.modality)
        ds = self._new_dataset(sop_class_uid, dicom_image.sop_instance_uid)
        ds.InstanceNumber = str(dicom_image.instance_number)

        # Patient Information
        ds.PatientName = f"{patient.last_name}^{patient.first_name}"
        ds.PatientID = str(patient.patient_id)
        ds.PatientBirthDate = self._dicom_date(patient.date_of_birth) if patient.date_of_birth else ''
        ds.PatientSex = patient.gender if hasattr(patient, 'gender') else 'O'

        # Study Information
        ds.StudyInstanceUID = study.study_instance_uid
        ds.StudyDate = self._dicom_date(study.study_datetime)
        ds.StudyTime = self._dicom_time(study.study_datetime)
        ds.StudyDescription = study.study_description
        ds.AccessionNumber = study.accession_number
        # NOTE(review): assumes every study has a referring physician - confirm.
        ds.ReferringPhysicianName = (
            f"{study.referring_physician.last_name}^{study.referring_physician.first_name}"
        )

        # Series Information
        ds.SeriesInstanceUID = series.series_instance_uid
        ds.SeriesNumber = str(series.series_number)
        ds.SeriesDescription = series.series_description or ''
        ds.SeriesDate = self._dicom_date(series.series_datetime)
        ds.SeriesTime = self._dicom_time(series.series_datetime)
        ds.Modality = series.modality
        ds.ProtocolName = series.protocol_name or ''

        # Image Information
        if dicom_image.content_date:
            ds.ContentDate = self._dicom_date(dicom_image.content_date)
        if dicom_image.content_time:
            ds.ContentTime = self._dicom_time(dicom_image.content_time)
        if dicom_image.acquisition_datetime:
            ds.AcquisitionDate = self._dicom_date(dicom_image.acquisition_datetime)
            ds.AcquisitionTime = self._dicom_time(dicom_image.acquisition_datetime)

        # Image Type (stored on the model as a backslash-separated string)
        if dicom_image.image_type:
            ds.ImageType = dicom_image.image_type.split('\\')
        else:
            ds.ImageType = ['ORIGINAL', 'PRIMARY']

        # Equipment Information
        if study.manufacturer:
            ds.Manufacturer = study.manufacturer
        if study.model_name:
            ds.ManufacturerModelName = study.model_name
        if study.station_name:
            ds.StationName = study.station_name

        # Image Position and Orientation
        if dicom_image.image_position:
            ds.ImagePositionPatient = self._split_floats(dicom_image.image_position)
        if dicom_image.image_orientation:
            ds.ImageOrientationPatient = self._split_floats(dicom_image.image_orientation)
        if dicom_image.slice_location is not None:
            ds.SliceLocation = str(dicom_image.slice_location)

        # Pixel Spacing
        if series.pixel_spacing:
            ds.PixelSpacing = self._split_floats(series.pixel_spacing)
        if series.slice_thickness:
            ds.SliceThickness = str(series.slice_thickness)
        if series.spacing_between_slices:
            ds.SpacingBetweenSlices = str(series.spacing_between_slices)

        # Window/Level Settings
        if dicom_image.window_center is not None:
            ds.WindowCenter = str(dicom_image.window_center)
        if dicom_image.window_width is not None:
            ds.WindowWidth = str(dicom_image.window_width)

        # Patient Position
        if series.patient_position:
            ds.PatientPosition = series.patient_position

        # Technical Parameters
        if study.kvp:
            ds.KVP = str(study.kvp)
        if study.exposure_time:
            ds.ExposureTime = str(study.exposure_time)

        # Contrast Information
        if series.contrast_agent:
            ds.ContrastBolusAgent = series.contrast_agent
        if series.contrast_route:
            ds.ContrastBolusRoute = series.contrast_route

        # Image Dimensions and Pixel Data
        self._set_image_pixel_module(
            ds,
            rows=dicom_image.rows,
            columns=dicom_image.columns,
            bits_allocated=dicom_image.bits_allocated,
            bits_stored=dicom_image.bits_stored,
            pixel_data=pixel_data,
        )

        # Transfer Syntax: honour the model's value only when it matches the
        # raw bytes actually written. Labelling uncompressed data with any
        # other syntax would produce a file that readers cannot decode, so
        # everything else keeps the Explicit VR Little Endian default.
        requested_syntax = dicom_image.transfer_syntax_uid
        if requested_syntax in self.UNCOMPRESSED_TRANSFER_SYNTAXES:
            ds.file_meta.TransferSyntaxUID = requested_syntax

        # Generate file path and save DICOM file
        file_path = self._generate_file_path(dicom_image)
        ds.save_as(file_path, enforce_file_format=True)

        # Update model with file information
        dicom_image.file_path = file_path
        dicom_image.file_size = os.path.getsize(file_path)
        dicom_image.save(update_fields=['file_path', 'file_size'])
        return file_path

    def create_dicom_from_data(self,
                               study_data: Dict[str, Any],
                               series_data: Dict[str, Any],
                               image_data: Dict[str, Any],
                               pixel_data: Optional[np.ndarray] = None) -> str:
        """
        Create a DICOM file from raw data dictionaries.

        Missing keys fall back to defaults (anonymous patient, current date
        and time, freshly generated UIDs, 512x512 16-bit image). The file is
        written directly into the DICOM storage directory and named after
        its SOP Instance UID.

        Args:
            study_data: Study-level DICOM data
            series_data: Series-level DICOM data
            image_data: Image-level DICOM data
            pixel_data: Optional numpy array for pixel data

        Returns:
            str: Path to the generated DICOM file

        Raises:
            ValueError: If ``pixel_data`` does not contain rows * columns values.
        """
        modality = series_data.get('modality', 'CT')

        # Resolve each identifier exactly once so the File Meta Information
        # and the main dataset always carry the same SOP Class / Instance UID.
        sop_class_uid = image_data.get('sop_class_uid') or self._get_sop_class_uid(modality)
        sop_instance_uid = image_data.get('sop_instance_uid') or generate_uid()
        ds = self._new_dataset(sop_class_uid, sop_instance_uid)
        ds.InstanceNumber = str(image_data.get('instance_number', 1))

        # Patient Information
        ds.PatientName = study_data.get('patient_name', 'Anonymous^Patient')
        ds.PatientID = study_data.get('patient_id', 'UNKNOWN')
        ds.PatientBirthDate = study_data.get('patient_birth_date', '')
        ds.PatientSex = study_data.get('patient_sex', 'O')

        # Study Information (one clock reading so date and time agree)
        now = datetime.now()
        ds.StudyInstanceUID = study_data.get('study_instance_uid', generate_uid())
        ds.StudyDate = study_data.get('study_date', now.strftime('%Y%m%d'))
        ds.StudyTime = study_data.get('study_time', now.strftime('%H%M%S'))
        ds.StudyDescription = study_data.get('study_description', '')
        ds.AccessionNumber = study_data.get('accession_number', '')

        # Series Information
        ds.SeriesInstanceUID = series_data.get('series_instance_uid', generate_uid())
        ds.SeriesNumber = str(series_data.get('series_number', 1))
        ds.SeriesDescription = series_data.get('series_description', '')
        ds.Modality = modality

        # Image Dimensions and Pixel Data
        bits_allocated = image_data.get('bits_allocated', 16)
        self._set_image_pixel_module(
            ds,
            rows=image_data.get('rows', 512),
            columns=image_data.get('columns', 512),
            bits_allocated=bits_allocated,
            bits_stored=image_data.get('bits_stored', bits_allocated),
            pixel_data=pixel_data,
        )

        # Optional fields
        if 'image_position' in image_data:
            ds.ImagePositionPatient = image_data['image_position']
        if 'image_orientation' in image_data:
            ds.ImageOrientationPatient = image_data['image_orientation']
        if 'slice_location' in image_data:
            ds.SliceLocation = str(image_data['slice_location'])
        if 'window_center' in image_data:
            ds.WindowCenter = str(image_data['window_center'])
        if 'window_width' in image_data:
            ds.WindowWidth = str(image_data['window_width'])

        # Generate file path and save DICOM file
        file_path = os.path.join(self.dicom_storage_path, f"{sop_instance_uid}.dcm")
        ds.save_as(file_path, enforce_file_format=True)
        return file_path

    def generate_series_dicoms(self, series: ImagingSeries, num_images: int = 10) -> list:
        """
        Generate multiple DICOM images for a series.

        For each image a DICOMImage row (512x512, 16 bit) is saved and a file
        with synthetic pixel data is written. Afterwards the series'
        ``number_of_instances`` is set to the number of images generated here.

        Args:
            series: ImagingSeries model instance
            num_images: Number of images to generate

        Returns:
            list: List of file paths for generated DICOM files
        """
        # NOTE(review): instance numbers always start at 1 and the counter is
        # overwritten, so this presumably expects a series without images.
        file_paths = []
        for index in range(num_images):
            # Create DICOMImage instance
            dicom_image = DICOMImage(
                series=series,
                sop_instance_uid=generate_uid(),
                instance_number=index + 1,
                sop_class_uid=self._get_sop_class_uid(series.modality),
                rows=512,
                columns=512,
                bits_allocated=16,
                bits_stored=16,
                file_path='',  # Will be set by generate_dicom_from_model
                file_size=0,  # Will be set by generate_dicom_from_model
            )

            # Add slice-specific data
            if series.slice_thickness:
                dicom_image.slice_location = index * series.slice_thickness

            # Set image position for axial slices
            if series.modality in ['CT', 'MR']:
                dicom_image.image_position = f"0\\0\\{index * (series.slice_thickness or 5)}"
                dicom_image.image_orientation = "1\\0\\0\\0\\1\\0"

            # Save the model instance, then generate its DICOM file
            dicom_image.save()
            file_paths.append(self.generate_dicom_from_model(dicom_image))

        # Update series metrics with what was actually produced; this differs
        # from num_images only when a non-positive count was requested.
        series.number_of_instances = len(file_paths)
        series.save(update_fields=['number_of_instances'])
        return file_paths

    def _get_sop_class_uid(self, modality: str) -> str:
        """Get appropriate SOP Class UID for modality (CT Image Storage if unknown)."""
        return self.SOP_CLASS_UIDS.get(modality, CTImageStorage)

    def _generate_file_path(self, dicom_image: DICOMImage) -> str:
        """
        Generate file path for DICOM image, creating the directory if needed.

        Args:
            dicom_image: DICOMImage model instance

        Returns:
            str: ``<storage>/<tenant id>/<patient id>/<accession>/series_NNN/image_NNNN_<uid>.dcm``
        """
        study = dicom_image.study
        series = dicom_image.series

        # Create directory structure: tenant/patient/study/series/
        directory = os.path.join(
            self.dicom_storage_path,
            str(study.tenant.id),
            str(study.patient.patient_id),
            study.accession_number,
            f"series_{series.series_number:03d}",
        )
        os.makedirs(directory, exist_ok=True)

        # Generate filename
        filename = f"image_{dicom_image.instance_number:04d}_{dicom_image.sop_instance_uid}.dcm"
        return os.path.join(directory, filename)

    def _generate_synthetic_pixel_data(self, rows: int, columns: int, bits_allocated: int) -> np.ndarray:
        """
        Generate synthetic pixel data for testing purposes.

        The image is random noise (below a quarter of the value range) with a
        filled circle at half range in the centre; images deeper than 8 bits
        additionally get a left-to-right gradient. The noise is unseeded, so
        the result differs between calls.

        Args:
            rows: Number of rows
            columns: Number of columns
            bits_allocated: Bits allocated per pixel

        Returns:
            np.ndarray: Synthetic pixel data of shape (rows, columns)
        """
        dtype, max_value = self._pixel_format(bits_allocated)

        # Create a simple pattern - circle in center
        y, x = np.ogrid[:rows, :columns]
        center_y, center_x = rows // 2, columns // 2
        radius = min(rows, columns) // 4
        circle = (x - center_x) ** 2 + (y - center_y) ** 2 <= radius ** 2

        # Generate base noise, then stamp the circle on top of it
        pixel_data = np.random.randint(0, max_value // 4, (rows, columns), dtype=dtype)
        pixel_data[circle] = max_value // 2

        # Add some anatomical-like structures
        if bits_allocated > 8:
            # Gradient for depth; circle (max/2) + gradient (max/8) stays
            # inside the dtype's range, so the in-place add cannot wrap.
            gradient = np.linspace(0, max_value // 8, columns)
            pixel_data += gradient.astype(dtype)
        return pixel_data

    def create_test_dicom_series(self,
                                 study: ImagingStudy,
                                 modality: str = 'CT',
                                 num_images: int = 20) -> ImagingSeries:
        """
        Create a test DICOM series with synthetic data.

        Args:
            study: ImagingStudy instance
            modality: Imaging modality
            num_images: Number of images to generate

        Returns:
            ImagingSeries: Created series with DICOM files
        """
        # Slice geometry is only filled in for the cross-sectional modalities.
        is_cross_sectional = modality in ['CT', 'MR']

        # Create series
        series = ImagingSeries.objects.create(
            study=study,
            series_instance_uid=generate_uid(),
            series_number=1,
            modality=modality,
            series_description=f"Test {modality} Series",
            series_datetime=timezone.now(),
            slice_thickness=5.0 if is_cross_sectional else None,
            spacing_between_slices=5.0 if is_cross_sectional else None,
            pixel_spacing="0.5\\0.5" if is_cross_sectional else None,
        )

        # Generate DICOM files
        self.generate_series_dicoms(series, num_images)
        return series

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _new_dataset(self, sop_class_uid, sop_instance_uid) -> Dataset:
        """Create a dataset whose file meta and body share the given SOP UIDs."""
        ds = Dataset()
        ds.file_meta = FileMetaDataset()
        ds.file_meta.TransferSyntaxUID = ExplicitVRLittleEndian
        ds.file_meta.MediaStorageSOPClassUID = sop_class_uid
        ds.file_meta.MediaStorageSOPInstanceUID = sop_instance_uid
        ds.file_meta.ImplementationClassUID = self.IMPLEMENTATION_CLASS_UID
        ds.file_meta.ImplementationVersionName = self.IMPLEMENTATION_VERSION_NAME
        ds.SOPClassUID = sop_class_uid
        ds.SOPInstanceUID = sop_instance_uid
        return ds

    def _set_image_pixel_module(self, ds: Dataset, rows: int, columns: int,
                                bits_allocated: int, bits_stored: int,
                                pixel_data: Optional[np.ndarray]) -> None:
        """Fill in image dimensions, pixel format and pixel data on ``ds``."""
        ds.Rows = rows
        ds.Columns = columns
        ds.BitsAllocated = bits_allocated
        ds.BitsStored = bits_stored
        ds.HighBit = bits_stored - 1
        ds.PixelRepresentation = 0  # Unsigned
        ds.SamplesPerPixel = 1
        ds.PhotometricInterpretation = "MONOCHROME2"

        # Generate pixel data if not provided
        if pixel_data is None:
            pixel_data = self._generate_synthetic_pixel_data(rows, columns, bits_allocated)
        ds.PixelData = self._encode_pixel_data(pixel_data, rows, columns, bits_allocated)
        ds["PixelData"].VR = "OB" if bits_allocated <= 8 else "OW"

    @staticmethod
    def _pixel_format(bits_allocated: int) -> Tuple[Any, int]:
        """Return the unsigned numpy dtype and its maximum value for a bit depth."""
        if bits_allocated <= 8:
            return np.uint8, 255
        if bits_allocated <= 16:
            return np.uint16, 65535
        return np.uint32, 4294967295

    @staticmethod
    def _encode_pixel_data(pixel_data: np.ndarray, rows: int, columns: int, bits_allocated: int) -> bytes:
        """
        Serialize pixel data to the bytes announced by the image attributes.

        Raises:
            ValueError: If the array does not hold exactly rows * columns values.
        """
        pixels = np.asarray(pixel_data)
        if pixels.size != rows * columns:
            raise ValueError(
                f"pixel_data has {pixels.size} values but Rows x Columns is "
                f"{rows} x {columns}"
            )
        dtype, _ = DICOMGenerator._pixel_format(bits_allocated)
        # Force the element size implied by BitsAllocated and little-endian
        # byte order; same-width integer input keeps its exact bytes.
        little_endian = np.dtype(dtype).newbyteorder('<')
        return pixels.astype(little_endian, copy=False).tobytes()

    @staticmethod
    def _dicom_date(value) -> str:
        """Format a date/datetime as a DICOM date string (YYYYMMDD)."""
        return value.strftime('%Y%m%d')

    @staticmethod
    def _dicom_time(value) -> str:
        """Format a time/datetime as a DICOM time string (HHMMSS.fff)."""
        # %f yields microseconds; dropping three digits keeps milliseconds.
        return value.strftime('%H%M%S.%f')[:-3]

    @staticmethod
    def _split_floats(value: str) -> list:
        """Parse a backslash-separated multi-value string into floats."""
        return [float(part) for part in value.split('\\')]
class DICOMValidator:
    """
    Service class for validating DICOM files and data.
    """

    @staticmethod
    def validate_dicom_file(file_path: str) -> Dict[str, Any]:
        """
        Validate a DICOM file and extract metadata.

        The file counts as valid when it can be read and every attribute
        listed in the result is present and convertible. This method never
        raises for a path string; any failure is reported in ``errors``.

        Args:
            file_path: Path to DICOM file

        Returns:
            dict: Validation results and metadata. On failure only ``valid``,
            ``errors`` and ``file_size`` are present.
        """
        try:
            from pydicom import dcmread
            ds = dcmread(file_path)
            return {
                'valid': True,
                'sop_instance_uid': str(ds.SOPInstanceUID),
                'sop_class_uid': str(ds.SOPClassUID),
                'study_instance_uid': str(ds.StudyInstanceUID),
                'series_instance_uid': str(ds.SeriesInstanceUID),
                'instance_number': int(ds.InstanceNumber),
                'modality': str(ds.Modality),
                'rows': int(ds.Rows),
                'columns': int(ds.Columns),
                'bits_allocated': int(ds.BitsAllocated),
                'patient_id': str(ds.PatientID),
                'study_date': str(ds.StudyDate),
                'series_number': int(ds.SeriesNumber),
                'file_size': os.path.getsize(file_path),
                'errors': []
            }
        except Exception as e:
            # Deliberately broad: unreadable files, missing attributes and
            # unconvertible values all mean "not valid" to the caller.
            try:
                file_size = os.path.getsize(file_path)
            except OSError:
                # Missing or inaccessible file (it may also vanish between
                # the read attempt and this size lookup).
                file_size = 0
            return {
                'valid': False,
                'errors': [str(e)],
                'file_size': file_size
            }

    @staticmethod
    def validate_dicom_image_model(dicom_image: "DICOMImage") -> Dict[str, Any]:
        """
        Validate DICOMImage model data for DICOM generation.

        Unset (``None``) numeric fields are reported as validation errors
        instead of raising ``TypeError``.

        Args:
            dicom_image: DICOMImage model instance

        Returns:
            dict: ``valid`` (True when there are no errors), ``errors`` and
            ``warnings`` (both lists of messages)
        """
        errors = []
        warnings = []

        # Required fields
        if not dicom_image.sop_instance_uid:
            errors.append("SOP Instance UID is required")

        # getattr with a default also absorbs an AttributeError raised by the
        # accessor. NOTE(review): Django's RelatedObjectDoesNotExist derives
        # from AttributeError, so an unset foreign key is presumably covered
        # too - confirm against the model definition.
        if not getattr(dicom_image, 'series', None):
            errors.append("Series relationship is required")

        rows = dicom_image.rows
        if rows is None or rows <= 0:
            errors.append("Rows must be positive")

        columns = dicom_image.columns
        if columns is None or columns <= 0:
            errors.append("Columns must be positive")

        bits_allocated = dicom_image.bits_allocated
        if bits_allocated not in (8, 16, 32):
            errors.append("Bits allocated must be 8, 16, or 32")

        bits_stored = dicom_image.bits_stored
        if bits_stored is None or bits_stored <= 0:
            # The generator derives HighBit as bits_stored - 1, which must
            # not become negative.
            errors.append("Bits stored must be positive")
        elif bits_allocated is not None and bits_stored > bits_allocated:
            errors.append("Bits stored cannot exceed bits allocated")

        # Warnings for missing optional fields
        if not dicom_image.image_position:
            warnings.append("Image position not specified")
        if not dicom_image.image_orientation:
            warnings.append("Image orientation not specified")
        if dicom_image.window_center is None or dicom_image.window_width is None:
            warnings.append("Window/Level settings not specified")

        return {
            'valid': not errors,
            'errors': errors,
            'warnings': warnings
        }
class DICOMUtilities:
    """
    Utility functions for DICOM operations.
    """

    # Human-readable names for the modality codes known to this module.
    MODALITY_DESCRIPTIONS = {
        'CT': 'Computed Tomography',
        'MR': 'Magnetic Resonance',
        'CR': 'Computed Radiography',
        'DX': 'Digital Radiography',
        'US': 'Ultrasound',
        'XA': 'X-Ray Angiography',
        'RF': 'Radiofluoroscopy',
        'MG': 'Mammography',
        'NM': 'Nuclear Medicine',
        'PT': 'Positron Emission Tomography',
    }

    # strptime format for each permitted length of the whole-seconds part
    # of a DICOM time value (HH, HHMM, HHMMSS).
    _TIME_FORMATS = {2: '%H', 4: '%H%M', 6: '%H%M%S'}

    @staticmethod
    def generate_study_uid() -> str:
        """Generate a new Study Instance UID."""
        return generate_uid()

    @staticmethod
    def generate_series_uid() -> str:
        """Generate a new Series Instance UID."""
        return generate_uid()

    @staticmethod
    def generate_instance_uid() -> str:
        """Generate a new SOP Instance UID."""
        return generate_uid()

    @staticmethod
    def parse_dicom_date(date_str: str) -> Optional[date]:
        """
        Parse DICOM date string (YYYYMMDD) to Python date.

        Surrounding padding spaces are ignored. Anything that is not exactly
        eight digits forming a real calendar date yields None.

        Args:
            date_str: DICOM date value

        Returns:
            The parsed date, or None if the value is missing or malformed
        """
        try:
            value = date_str.strip()
            # strptime alone is lenient about digit counts ('2025102' would
            # be read as 2025-10-02), so the length is checked explicitly.
            if len(value) != 8 or not (value.isascii() and value.isdigit()):
                return None
            return datetime.strptime(value, '%Y%m%d').date()
        except (ValueError, TypeError, AttributeError):
            return None

    @staticmethod
    def parse_dicom_time(time_str: str) -> Optional[time]:
        """
        Parse DICOM time string to Python time.

        Accepted forms are HH, HHMM, HHMMSS and HHMMSS.F with one to six
        fractional digits; omitted components are zero. Surrounding padding
        spaces are ignored.

        Args:
            time_str: DICOM time value

        Returns:
            The parsed time, or None if the value is missing or malformed
        """
        try:
            whole, dot, fraction = time_str.strip().partition('.')
            if not (whole.isascii() and whole.isdigit()):
                return None
            # Selecting the format by length prevents strptime from guessing
            # field widths (it would read '1230' as 12:03:00).
            time_format = DICOMUtilities._TIME_FORMATS.get(len(whole))
            if time_format is None:
                return None

            microsecond = 0
            if dot:
                # A fraction is only meaningful after full HHMMSS.
                valid_fraction = (
                    len(whole) == 6
                    and 1 <= len(fraction) <= 6
                    and fraction.isascii()
                    and fraction.isdigit()
                )
                if not valid_fraction:
                    return None
                # '5' means 0.5 s: right-pad to microsecond resolution.
                microsecond = int(fraction.ljust(6, '0'))

            parsed = datetime.strptime(whole, time_format)
            return parsed.replace(microsecond=microsecond).time()
        except (ValueError, TypeError, AttributeError):
            return None

    @staticmethod
    def format_dicom_date(date_obj: date) -> str:
        """Format Python date to DICOM date string (YYYYMMDD)."""
        return date_obj.strftime('%Y%m%d')

    @staticmethod
    def format_dicom_time(time_obj: datetime) -> str:
        """Format Python datetime to DICOM time string (HHMMSS.fff)."""
        # %f yields six microsecond digits; dropping three keeps milliseconds.
        return time_obj.strftime('%H%M%S.%f')[:-3]

    @staticmethod
    def calculate_file_size_mb(file_path: str) -> float:
        """
        Calculate file size in MB.

        Args:
            file_path: Path of the file to measure

        Returns:
            Size in mebibytes rounded to two decimals, or 0.0 if the file
            does not exist or cannot be accessed
        """
        try:
            size_in_bytes = os.path.getsize(file_path)
        except OSError:
            return 0.0
        return round(size_in_bytes / (1024 * 1024), 2)

    @staticmethod
    def get_modality_description(modality: str) -> str:
        """Get human-readable description for modality (the code itself if unknown)."""
        return DICOMUtilities.MODALITY_DESCRIPTIONS.get(modality, modality)