""" Management command to import staff data from CSV file CSV Format: Staff ID,Name,Location,Department,Section,Subsection,AlHammadi Job Title,Country,Gender,Manager Example: 4,ABDULAZIZ SALEH ALHAMMADI,Nuzha,Senior Management Offices,COO Office,,Chief Operating Officer,Saudi Arabia,Male,2 - MOHAMMAD SALEH AL HAMMADI """ import csv import os from django.core.management.base import BaseCommand, CommandError from django.db import transaction from apps.organizations.models import Hospital, Department, Staff # Map CSV departments to standard department codes DEPARTMENT_MAPPING = { 'Senior Management Offices': 'ADM-005', 'Human Resource': 'ADM-005', 'Human Resource ': 'ADM-005', # With trailing space 'Corporate Administration': 'ADM-005', 'Corporate Administration ': 'ADM-005', # With trailing space 'Emergency': 'EMR-001', 'Outpatient': 'OUT-002', 'Inpatient': 'INP-003', 'Diagnostics': 'DIA-004', 'Administration': 'ADM-005', } class Command(BaseCommand): help = 'Import staff data from CSV file' def add_arguments(self, parser): parser.add_argument( 'csv_file', type=str, help='Path to CSV file to import' ) parser.add_argument( '--hospital-code', type=str, required=True, help='Hospital code to assign staff to' ) parser.add_argument( '--staff-type', type=str, default='admin', choices=['physician', 'nurse', 'admin', 'other'], help='Staff type to assign (default: admin)' ) parser.add_argument( '--skip-existing', action='store_true', help='Skip staff with existing employee_id' ) parser.add_argument( '--update-existing', action='store_true', help='Update existing staff records' ) parser.add_argument( '--create-users', action='store_true', help='Create user accounts for imported staff' ) parser.add_argument( '--dry-run', action='store_true', help='Preview without making changes' ) def handle(self, *args, **options): csv_file_path = options['csv_file'] hospital_code = options['hospital_code'] staff_type = options['staff_type'] skip_existing = options['skip_existing'] update_existing = options['update_existing'] create_users = options['create_users'] dry_run = options['dry_run'] self.stdout.write(f"\n{'='*60}") self.stdout.write("Staff CSV Import Command") self.stdout.write(f"{'='*60}\n") # Validate CSV file exists if not os.path.exists(csv_file_path): raise CommandError(f"CSV file not found: {csv_file_path}") # Get hospital try: hospital = Hospital.objects.get(code=hospital_code) self.stdout.write( self.style.SUCCESS(f"✓ Found hospital: {hospital.name} ({hospital.code})") ) except Hospital.DoesNotExist: raise CommandError(f"Hospital with code '{hospital_code}' not found") # Get departments for this hospital departments = Department.objects.filter(hospital=hospital, status='active') self.stdout.write( self.style.SUCCESS(f"✓ Found {departments.count()} departments in hospital") ) # Display configuration self.stdout.write("\nConfiguration:") self.stdout.write(f" CSV file: {csv_file_path}") self.stdout.write(f" Hospital: {hospital.name}") self.stdout.write(f" Staff type: {staff_type}") self.stdout.write(f" Skip existing: {skip_existing}") self.stdout.write(f" Update existing: {update_existing}") self.stdout.write(f" Create user accounts: {create_users}") self.stdout.write(f" Dry run: {dry_run}") # Read and parse CSV self.stdout.write("\nReading CSV file...") staff_data = self.parse_csv(csv_file_path) if not staff_data: self.stdout.write(self.style.WARNING("No valid staff data found in CSV")) return self.stdout.write( self.style.SUCCESS(f"✓ Found {len(staff_data)} staff records in CSV") ) # Track statistics stats = { 'created': 0, 'updated': 0, 'skipped': 0, 'errors': 0, 'manager_links': 0 } # First pass: Create/update all staff records staff_mapping = {} # Maps employee_id to staff object with transaction.atomic(): for idx, row in enumerate(staff_data, 1): try: # Check if staff already exists existing_staff = Staff.objects.filter( employee_id=row['staff_id'] ).first() if existing_staff: if skip_existing: self.stdout.write( f" [{idx}] ⊘ Skipped: {row['name']} (already exists)" ) stats['skipped'] += 1 continue if not update_existing: self.stdout.write( self.style.ERROR( f" [{idx}] ✗ Staff already exists: {row['name']} (use --update-existing to update)" ) ) stats['errors'] += 1 continue # Update existing staff self.update_staff(existing_staff, row, hospital, departments, staff_type) if not dry_run: existing_staff.save() self.stdout.write( self.style.SUCCESS( f" [{idx}] ✓ Updated: {row['name']}" ) ) stats['updated'] += 1 staff_mapping[row['staff_id']] = existing_staff else: # Create new staff staff = self.create_staff(row, hospital, departments, staff_type) if not dry_run: staff.save() staff_mapping[row['staff_id']] = staff self.stdout.write( self.style.SUCCESS( f" [{idx}] ✓ Created: {row['name']}" ) ) stats['created'] += 1 except Exception as e: self.stdout.write( self.style.ERROR( f" [{idx}] ✗ Failed to process {row['name']}: {str(e)}" ) ) stats['errors'] += 1 # Second pass: Link managers self.stdout.write("\nLinking manager relationships...") for idx, row in enumerate(staff_data, 1): if not row['manager_id']: continue try: staff = staff_mapping.get(row['staff_id']) if not staff: continue manager = staff_mapping.get(row['manager_id']) if manager: if staff.report_to != manager: staff.report_to = manager if not dry_run: staff.save() stats['manager_links'] += 1 self.stdout.write( self.style.SUCCESS( f" [{idx}] ✓ Linked {row['name']} → {manager.get_full_name()}" ) ) else: self.stdout.write( self.style.WARNING( f" [{idx}] ⚠ Manager not found: {row['manager_id']} for {row['name']}" ) ) except Exception as e: self.stdout.write( self.style.ERROR( f" [{idx}] ✗ Failed to link manager for {row['name']}: {str(e)}" ) ) stats['errors'] += 1 # Summary self.stdout.write("\n" + "="*60) self.stdout.write("Import Summary:") self.stdout.write(f" Staff records created: {stats['created']}") self.stdout.write(f" Staff records updated: {stats['updated']}") self.stdout.write(f" Staff records skipped: {stats['skipped']}") self.stdout.write(f" Manager relationships linked: {stats['manager_links']}") self.stdout.write(f" Errors: {stats['errors']}") self.stdout.write("="*60 + "\n") if dry_run: self.stdout.write(self.style.WARNING("DRY RUN: No changes were made\n")) else: self.stdout.write(self.style.SUCCESS("Import completed successfully!\n")) def parse_csv(self, csv_file_path): """Parse CSV file and return list of staff data dictionaries""" staff_data = [] try: with open(csv_file_path, 'r', encoding='utf-8') as csvfile: reader = csv.DictReader(csvfile) # Expected columns (Phone is optional) expected_columns = [ 'Staff ID', 'Name', 'Location', 'Department', 'Section', 'Subsection', 'AlHammadi Job Title', 'Country', 'Gender', 'Phone', 'Manager' ] # Validate columns actual_columns = reader.fieldnames if not actual_columns: self.stdout.write(self.style.ERROR("CSV file is empty or has no headers")) return [] # Normalize column names (remove extra spaces) normalized_columns = [col.strip() for col in actual_columns] for row_idx, row in enumerate(reader, 1): try: # Parse manager field "ID - Name" manager_id = None manager_name = None if row.get('Manager', '').strip(): manager_parts = row['Manager'].split('-', 1) manager_id = manager_parts[0].strip() if len(manager_parts) > 1: manager_name = manager_parts[1].strip() # Parse name into first and last name name = row['Name'].strip() name_parts = name.split(None, 1) # Split on first space first_name = name_parts[0] if name_parts else name last_name = name_parts[1] if len(name_parts) > 1 else '' # Map department to standard department dept_name = row['Department'].strip() dept_code = DEPARTMENT_MAPPING.get(dept_name) if not dept_code: # Default to Administration if not found dept_code = 'ADM-005' # Phone is optional - check if column exists phone = '' if 'Phone' in row: phone = row['Phone'].strip() staff_record = { 'staff_id': row['Staff ID'].strip(), 'name': name, 'first_name': first_name, 'last_name': last_name, 'location': row['Location'].strip(), 'department': dept_name, 'department_code': dept_code, 'section': row['Section'].strip(), 'subsection': row['Subsection'].strip(), 'job_title': row['AlHammadi Job Title'].strip(), 'country': row['Country'].strip(), 'gender': row['Gender'].strip().lower(), 'phone': phone, 'manager_id': manager_id, 'manager_name': manager_name } staff_data.append(staff_record) except Exception as e: self.stdout.write( self.style.WARNING(f"Skipping row {row_idx}: {str(e)}") ) continue except Exception as e: self.stdout.write(self.style.ERROR(f"Error reading CSV file: {str(e)}")) return [] return staff_data def create_staff(self, row, hospital, departments, staff_type): """Create a new Staff record from CSV row""" # Find department department = None for dept in departments: if dept.code == row['department_code']: department = dept break # Create staff record staff = Staff( employee_id=row['staff_id'], name=row['name'], # Store original name from CSV first_name=row['first_name'], last_name=row['last_name'], first_name_ar='', last_name_ar='', staff_type=staff_type, job_title=row['job_title'], license_number=None, specialization=row['job_title'], # Use job title as specialization email='', phone=row.get('phone', ''), # Phone from CSV (optional) hospital=hospital, department=department, country=row['country'], location=row['location'], # Store location from CSV gender=row['gender'], department_name=row['department'], section=row['section'], subsection=row['subsection'], report_to=None, # Will be linked in second pass status='active' ) return staff def update_staff(self, staff, row, hospital, departments, staff_type): """Update existing Staff record from CSV row""" # Find department department = None for dept in departments: if dept.code == row['department_code']: department = dept break # Update fields staff.name = row['name'] # Update original name from CSV staff.first_name = row['first_name'] staff.last_name = row['last_name'] staff.staff_type = staff_type staff.job_title = row['job_title'] staff.specialization = row['job_title'] staff.phone = row.get('phone', '') # Update phone (optional) staff.hospital = hospital staff.department = department staff.country = row['country'] staff.location = row['location'] # Update location staff.gender = row['gender'] staff.department_name = row['department'] staff.section = row['section'] staff.subsection = row['subsection'] # report_to will be updated in second pass