""" HIS Client Service - Fetches patient data from external HIS systems This service provides a client to fetch patient survey data from Hospital Information Systems (HIS) via their APIs. """ import logging import os from datetime import datetime, timedelta from typing import Dict, List, Optional from urllib.parse import quote import requests from django.utils import timezone from apps.integrations.models import IntegrationConfig, SourceSystem logger = logging.getLogger("apps.integrations") class HISClient: """ Client for fetching patient data from HIS systems. This client connects to external HIS APIs and retrieves patient demographic and visit data for survey processing. """ def __init__(self, config: Optional[IntegrationConfig] = None): """ Initialize HIS client. Args: config: IntegrationConfig instance. If None, will try to load active HIS configuration from database. """ self.config = config or self._get_default_config() self.session = requests.Session() # Load credentials from environment if no config self.username = os.getenv("HIS_API_USERNAME", "") self.password = os.getenv("HIS_API_PASSWORD", "") def _get_default_config(self) -> Optional[IntegrationConfig]: """Get default active HIS configuration from database.""" try: return IntegrationConfig.objects.filter(source_system=SourceSystem.HIS, is_active=True).first() except Exception as e: logger.error(f"Error loading HIS configuration: {e}") return None def _get_headers(self) -> Dict[str, str]: """Get request headers with authentication.""" headers = { "Content-Type": "application/json", "Accept": "application/json", } # Support for API key if configured if self.config and self.config.api_key: headers["X-API-Key"] = self.config.api_key headers["Authorization"] = f"Bearer {self.config.api_key}" return headers def _get_api_url(self) -> Optional[str]: """Get API URL from configuration or environment.""" if not self.config: # Fallback to environment variable return os.getenv("HIS_API_URL", "") return self.config.api_url def _get_auth(self) -> Optional[tuple]: """Get Basic Auth credentials.""" if self.username and self.password: return (self.username, self.password) return None def fetch_patients( self, since: Optional[datetime] = None, patient_type: Optional[str] = None, limit: int = 100 ) -> List[Dict]: """ Fetch patients from HIS system. Args: since: Only fetch patients updated since this datetime patient_type: Filter by patient type (1=Inpatient, 2=OPD, 3=EMS) limit: Maximum number of patients to fetch Returns: List of patient data dictionaries in HIS format """ api_url = self._get_api_url() if not api_url: logger.error("No HIS API URL configured") return [] try: # Build URL with query parameters for the new endpoint format if since: # Calculate end time (5 minutes window since this runs every 5 minutes) end_time = since + timedelta(minutes=5) # Format dates for HIS API: DD-Mon-YYYY HH:MM:SS from_date = self._format_datetime(since) to_date = self._format_datetime(end_time) # Build URL with parameters (SSN=0 and MobileNo=0 to get all) url = f"{api_url}?FromDate={quote(from_date)}&ToDate={quote(to_date)}&SSN=0&MobileNo=0" else: # If no since time, use last 10 minutes as default end_time = timezone.now() start_time = end_time - timedelta(minutes=10) from_date = self._format_datetime(start_time) to_date = self._format_datetime(end_time) url = f"{api_url}?FromDate={quote(from_date)}&ToDate={quote(to_date)}&SSN=0&MobileNo=0" logger.info(f"Fetching patients from HIS: {url}") # Make request with Basic Auth response = self.session.get( url, headers=self._get_headers(), auth=self._get_auth(), timeout=30, verify=True, # Set to False if SSL certificate issues ) response.raise_for_status() # Parse response data = response.json() # Handle different response formats if isinstance(data, list): patients = data elif isinstance(data, dict): patients = data.get("FetchPatientDataTimeStampList", []) # Alternative formats if not patients: patients = data.get("patients", []) if not patients: patients = data.get("data", []) else: patients = [] logger.info(f"Fetched {len(patients)} patients from HIS") return patients except requests.exceptions.RequestException as e: logger.error(f"Error fetching patients from HIS: {e}") return [] except Exception as e: logger.error(f"Unexpected error fetching patients: {e}") return [] # Build query parameters params = {"limit": limit} if since: # Format: DD-Mon-YYYY HH:MM (HIS format) params["since"] = self._format_datetime(since) if patient_type: params["patient_type"] = patient_type # Additional config params from IntegrationConfig if self.config and self.config.config_json: config_params = self.config.config_json.get("fetch_params", {}) params.update(config_params) try: logger.info(f"Fetching patients from HIS: {api_url}") response = self.session.get(api_url, headers=self._get_headers(), params=params, timeout=30) response.raise_for_status() data = response.json() # Handle different response formats if isinstance(data, list): patients = data elif isinstance(data, dict): # Standard HIS format patients = data.get("FetchPatientDataTimeStampList", []) # Alternative formats if not patients: patients = data.get("patients", []) if not patients: patients = data.get("data", []) else: patients = [] logger.info(f"Fetched {len(patients)} patients from HIS") return patients except requests.exceptions.RequestException as e: logger.error(f"Error fetching patients from HIS: {e}") return [] except Exception as e: logger.error(f"Unexpected error fetching patients: {e}") return [] def fetch_discharged_patients(self, since: Optional[datetime] = None, limit: int = 100) -> List[Dict]: """ Fetch discharged patients who need surveys. This is the main method for survey fetching - only discharged patients are eligible for surveys. Args: since: Only fetch patients discharged since this datetime limit: Maximum number of patients to fetch Returns: List of patient data dictionaries in HIS format """ api_url = self._get_api_url() if not api_url: logger.error("No HIS API URL configured") return [] try: # Build URL with query parameters for the new endpoint format if since: # Calculate end time (5 minutes window since this runs every 5 minutes) end_time = since + timedelta(minutes=5) # Format dates for HIS API: DD-Mon-YYYY HH:MM:SS from_date = self._format_datetime(since) to_date = self._format_datetime(end_time) # Build URL with parameters (SSN=0 and MobileNo=0 to get all) url = f"{api_url}?FromDate={quote(from_date)}&ToDate={quote(to_date)}&SSN=0&MobileNo=0" else: # If no since time, use last 10 minutes as default end_time = timezone.now() start_time = end_time - timedelta(minutes=10) from_date = self._format_datetime(start_time) to_date = self._format_datetime(end_time) url = f"{api_url}?FromDate={quote(from_date)}&ToDate={quote(to_date)}&SSN=0&MobileNo=0" logger.info(f"Fetching discharged patients from HIS: {url}") # Make request with Basic Auth response = self.session.get( url, headers=self._get_headers(), auth=self._get_auth(), timeout=30, verify=True, # Set to False if SSL certificate issues ) response.raise_for_status() data = response.json() # Parse response if isinstance(data, list): patients = data elif isinstance(data, dict): patients = data.get("FetchPatientDataTimeStampList", []) if not patients: patients = data.get("patients", []) if not patients: patients = data.get("data", []) else: patients = [] # Filter only discharged patients discharged = [ p for p in patients if p.get("DischargeDate") or (isinstance(p, dict) and p.get("FetchPatientDataTimeStampList", [{}])[0].get("DischargeDate")) ] logger.info(f"Fetched {len(discharged)} discharged patients from HIS") return discharged except requests.exceptions.RequestException as e: logger.error(f"Error fetching discharged patients: {e}") return [] def fetch_patient_visits(self, patient_id: str) -> List[Dict]: """ Fetch visit data for a specific patient. Args: patient_id: Patient ID from HIS Returns: List of visit data dictionaries """ api_url = self._get_api_url() if not api_url: return [] visits_endpoint = f"{api_url.rstrip('/')}/{patient_id}/visits" try: response = self.session.get(visits_endpoint, headers=self._get_headers(), timeout=30) response.raise_for_status() data = response.json() if isinstance(data, list): return data elif isinstance(data, dict): return data.get("FetchPatientDataTimeStampVisitDataList", []) return [] except requests.exceptions.RequestException as e: logger.error(f"Error fetching visits for patient {patient_id}: {e}") return [] def test_connection(self) -> Dict: """ Test connectivity to HIS system. Returns: Dict with 'success' boolean and 'message' string """ api_url = self._get_api_url() if not api_url: return {"success": False, "message": "No API URL configured"} try: # Try to fetch patients for last 1 minute as a test end_time = timezone.now() start_time = end_time - timedelta(minutes=1) from_date = self._format_datetime(start_time) to_date = self._format_datetime(end_time) url = f"{api_url}?FromDate={quote(from_date)}&ToDate={quote(to_date)}&SSN=0&MobileNo=0" response = self.session.get( url, headers=self._get_headers(), auth=self._get_auth(), timeout=10, verify=True ) if response.status_code == 200: return { "success": True, "message": "Successfully connected to HIS", "config_name": self.config.name if self.config else "Default", } else: return {"success": False, "message": f"HIS returned status {response.status_code}"} except requests.exceptions.ConnectionError: return {"success": False, "message": "Could not connect to HIS server"} except requests.exceptions.Timeout: return {"success": False, "message": "Connection to HIS timed out"} except Exception as e: return {"success": False, "message": f"Error: {str(e)}"} @staticmethod def _format_datetime(dt: datetime) -> str: """Format datetime for HIS API (DD-Mon-YYYY HH:MM:SS format).""" months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] return f"{dt.day:02d}-{months[dt.month - 1]}-{dt.year} {dt.hour:02d}:{dt.minute:02d}:{dt.second:02d}" class HISClientFactory: """Factory for creating HIS clients.""" @staticmethod def get_client(hospital_id: Optional[str] = None) -> HISClient: """ Get HIS client for a specific hospital or default. Args: hospital_id: Optional hospital ID to get specific config Returns: HISClient instance """ config = None if hospital_id: # Try to find config for specific hospital try: from apps.organizations.models import Hospital hospital = Hospital.objects.filter(id=hospital_id).first() if hospital: config = IntegrationConfig.objects.filter( source_system=SourceSystem.HIS, is_active=True, config_json__hospital_id=str(hospital.id) ).first() except Exception: pass return HISClient(config) @staticmethod def get_all_active_clients() -> List[HISClient]: """Get all active HIS clients for multi-hospital setups.""" configs = IntegrationConfig.objects.filter(source_system=SourceSystem.HIS, is_active=True) if not configs.exists(): # Return default client with no config return [HISClient()] return [HISClient(config) for config in configs]