401 lines
15 KiB
Python

"""
Management command to import staff data from CSV file
CSV Format:
Staff ID,Name,Location,Department,Section,Subsection,AlHammadi Job Title,Country,Gender,Manager
Example:
4,ABDULAZIZ SALEH ALHAMMADI,Nuzha,Senior Management Offices,COO Office,,Chief Operating Officer,Saudi Arabia,Male,2 - MOHAMMAD SALEH AL HAMMADI
"""
import csv
import os
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from apps.organizations.models import Hospital, Department, Staff
# Lookup table: department name as written in the CSV -> standard department code.
DEPARTMENT_MAPPING = {
    # These names all resolve to the same code.
    **dict.fromkeys(
        (
            'Senior Management Offices',
            'Human Resource',
            'Human Resource ',  # variant with a trailing space
            'Corporate Administration',
            'Corporate Administration ',  # variant with a trailing space
        ),
        'ADM-005',
    ),
    'Emergency': 'EMR-001',
    'Outpatient': 'OUT-002',
    'Inpatient': 'INP-003',
    'Diagnostics': 'DIA-004',
    'Administration': 'ADM-005',
}
class Command(BaseCommand):
    """Import staff records for one hospital from a CSV file.

    The import runs in two passes:

    1. Every parsed CSV row is turned into a new ``Staff`` record or used to
       update the record that already has the same ``employee_id``.
    2. The ``Manager`` column ("ID - Name") is resolved to a ``Staff`` record
       and stored in ``report_to``.

    Failures are reported per row and counted; they do not abort the run.
    """

    help = 'Import staff data from CSV file'

    # Columns that every row is read from; "Phone" and "Manager" are optional.
    REQUIRED_COLUMNS = (
        'Staff ID', 'Name', 'Location', 'Department',
        'Section', 'Subsection', 'AlHammadi Job Title',
        'Country', 'Gender',
    )

    def add_arguments(self, parser):
        """Register the positional CSV path and the import option flags."""
        parser.add_argument(
            'csv_file',
            type=str,
            help='Path to CSV file to import'
        )
        parser.add_argument(
            '--hospital-code',
            type=str,
            required=True,
            help='Hospital code to assign staff to'
        )
        parser.add_argument(
            '--staff-type',
            type=str,
            default='admin',
            choices=['physician', 'nurse', 'admin', 'other'],
            help='Staff type to assign (default: admin)'
        )
        parser.add_argument(
            '--skip-existing',
            action='store_true',
            help='Skip staff with existing employee_id'
        )
        parser.add_argument(
            '--update-existing',
            action='store_true',
            help='Update existing staff records'
        )
        parser.add_argument(
            '--create-users',
            action='store_true',
            help='Create user accounts for imported staff'
        )
        parser.add_argument(
            '--dry-run',
            action='store_true',
            help='Preview without making changes'
        )

    def handle(self, *args, **options):
        """Run the import.

        Raises:
            CommandError: if the CSV path is not a file or no hospital has
                the given code.
        """
        csv_file_path = options['csv_file']
        hospital_code = options['hospital_code']
        staff_type = options['staff_type']
        skip_existing = options['skip_existing']
        update_existing = options['update_existing']
        # NOTE(review): in the code visible here --create-users is only echoed
        # in the configuration summary; nothing acts on it yet.
        create_users = options['create_users']
        dry_run = options['dry_run']

        self.stdout.write(f"\n{'='*60}")
        self.stdout.write("Staff CSV Import Command")
        self.stdout.write(f"{'='*60}\n")

        # isfile (not exists) so that a directory path is rejected up front.
        if not os.path.isfile(csv_file_path):
            raise CommandError(f"CSV file not found: {csv_file_path}")

        try:
            hospital = Hospital.objects.get(code=hospital_code)
        except Hospital.DoesNotExist:
            raise CommandError(f"Hospital with code '{hospital_code}' not found")
        self.stdout.write(
            self.style.SUCCESS(f"✓ Found hospital: {hospital.name} ({hospital.code})")
        )

        # Evaluate once; the list is scanned for every row by create/update.
        departments = list(
            Department.objects.filter(hospital=hospital, status='active')
        )
        self.stdout.write(
            self.style.SUCCESS(f"✓ Found {len(departments)} departments in hospital")
        )

        self.stdout.write("\nConfiguration:")
        self.stdout.write(f" CSV file: {csv_file_path}")
        self.stdout.write(f" Hospital: {hospital.name}")
        self.stdout.write(f" Staff type: {staff_type}")
        self.stdout.write(f" Skip existing: {skip_existing}")
        self.stdout.write(f" Update existing: {update_existing}")
        self.stdout.write(f" Create user accounts: {create_users}")
        self.stdout.write(f" Dry run: {dry_run}")

        self.stdout.write("\nReading CSV file...")
        staff_data = self.parse_csv(csv_file_path)
        if not staff_data:
            self.stdout.write(self.style.WARNING("No valid staff data found in CSV"))
            return
        self.stdout.write(
            self.style.SUCCESS(f"✓ Found {len(staff_data)} staff records in CSV")
        )

        stats = {
            'created': 0,
            'updated': 0,
            'skipped': 0,
            'errors': 0,
            'manager_links': 0
        }
        staff_mapping = {}  # employee_id -> Staff object handled in this run

        with transaction.atomic():
            # First pass: create/update all staff records.
            self._import_rows(
                staff_data, hospital, departments, staff_type,
                skip_existing=skip_existing,
                update_existing=update_existing,
                dry_run=dry_run,
                stats=stats,
                staff_mapping=staff_mapping,
            )
            # Second pass: link managers now that every record exists.
            self.stdout.write("\nLinking manager relationships...")
            self._link_managers(
                staff_data, staff_mapping, dry_run=dry_run, stats=stats
            )

        self._write_summary(stats, dry_run)

    def _import_rows(self, staff_data, hospital, departments, staff_type, *,
                     skip_existing, update_existing, dry_run, stats,
                     staff_mapping):
        """First pass: create or update one Staff record per parsed row.

        ``stats`` and ``staff_mapping`` are updated in place. When both
        ``skip_existing`` and ``update_existing`` are set, skipping wins.
        """
        for idx, row in enumerate(staff_data, 1):
            try:
                # Nested atomic() = savepoint: a database error on this row is
                # rolled back on its own, leaving the outer transaction usable
                # for the remaining rows.
                with transaction.atomic():
                    existing_staff = Staff.objects.filter(
                        employee_id=row['staff_id']
                    ).first()

                    if existing_staff and skip_existing:
                        self.stdout.write(
                            f" [{idx}] ⊘ Skipped: {row['name']} (already exists)"
                        )
                        stats['skipped'] += 1
                        continue

                    if existing_staff and not update_existing:
                        self.stdout.write(
                            self.style.ERROR(
                                f" [{idx}] ✗ Staff already exists: {row['name']} (use --update-existing to update)"
                            )
                        )
                        stats['errors'] += 1
                        continue

                    if existing_staff:
                        self.update_staff(
                            existing_staff, row, hospital, departments, staff_type
                        )
                        staff, verb, counter = existing_staff, 'Updated', 'updated'
                    else:
                        staff = self.create_staff(
                            row, hospital, departments, staff_type
                        )
                        verb, counter = 'Created', 'created'

                    if not dry_run:
                        staff.save()

                staff_mapping[row['staff_id']] = staff
                self.stdout.write(
                    self.style.SUCCESS(f" [{idx}] ✓ {verb}: {row['name']}")
                )
                stats[counter] += 1
            except Exception as e:  # per-row boundary: report and carry on
                self.stdout.write(
                    self.style.ERROR(
                        f" [{idx}] ✗ Failed to process {row['name']}: {str(e)}"
                    )
                )
                stats['errors'] += 1

    def _link_managers(self, staff_data, staff_mapping, *, dry_run, stats):
        """Second pass: set ``report_to`` from each row's manager ID.

        Only staff created or updated in this run are re-linked. The manager
        is looked up among those first and then in the database, so managers
        that were skipped or are not part of the CSV can still be linked.
        """
        for idx, row in enumerate(staff_data, 1):
            manager_id = row['manager_id']
            if not manager_id:
                continue
            staff = staff_mapping.get(row['staff_id'])
            if not staff:
                continue
            try:
                manager = staff_mapping.get(manager_id)
                if manager is None:
                    manager = Staff.objects.filter(
                        employee_id=manager_id
                    ).first()
                if manager is None:
                    self.stdout.write(
                        self.style.WARNING(
                            f" [{idx}] ⚠ Manager not found: {manager_id} for {row['name']}"
                        )
                    )
                    continue
                if staff.report_to != manager:
                    staff.report_to = manager
                    if not dry_run:
                        # Savepoint, for the same reason as in the first pass.
                        with transaction.atomic():
                            staff.save()
                    stats['manager_links'] += 1
                    self.stdout.write(
                        self.style.SUCCESS(
                            f" [{idx}] ✓ Linked {row['name']} → {manager.get_full_name()}"
                        )
                    )
            except Exception as e:  # per-row boundary: report and carry on
                self.stdout.write(
                    self.style.ERROR(
                        f" [{idx}] ✗ Failed to link manager for {row['name']}: {str(e)}"
                    )
                )
                stats['errors'] += 1

    def _write_summary(self, stats, dry_run):
        """Print the counters collected during the run and a closing status."""
        self.stdout.write("\n" + "="*60)
        self.stdout.write("Import Summary:")
        self.stdout.write(f" Staff records created: {stats['created']}")
        self.stdout.write(f" Staff records updated: {stats['updated']}")
        self.stdout.write(f" Staff records skipped: {stats['skipped']}")
        self.stdout.write(f" Manager relationships linked: {stats['manager_links']}")
        self.stdout.write(f" Errors: {stats['errors']}")
        self.stdout.write("="*60 + "\n")
        if dry_run:
            self.stdout.write(self.style.WARNING("DRY RUN: No changes were made\n"))
        elif stats['errors']:
            # Do not claim success when some rows failed.
            self.stdout.write(
                self.style.WARNING(
                    f"Import completed with {stats['errors']} error(s)\n"
                )
            )
        else:
            self.stdout.write(self.style.SUCCESS("Import completed successfully!\n"))

    @staticmethod
    def _cell(row, column):
        """Return the stripped text of ``row[column]``, or '' if absent.

        ``csv.DictReader`` fills cells missing from a short row with ``None``;
        those are treated the same as an empty cell.
        """
        value = row.get(column)
        return value.strip() if isinstance(value, str) else ''

    def _parse_row(self, row):
        """Convert one ``csv.DictReader`` row into a staff data dictionary.

        Raises:
            ValueError: if the row has no Staff ID.
        """
        cell = self._cell

        staff_id = cell(row, 'Staff ID')
        if not staff_id:
            raise ValueError("missing Staff ID")

        # Manager field is "ID - Name"; the name part is optional.
        manager_id = None
        manager_name = None
        manager_raw = cell(row, 'Manager')
        if manager_raw:
            id_part, separator, name_part = manager_raw.partition('-')
            manager_id = id_part.strip() or None
            if separator:
                manager_name = name_part.strip()

        # First word is the first name, everything after it the last name.
        name = cell(row, 'Name')
        name_parts = name.split(None, 1)
        first_name = name_parts[0] if name_parts else name
        last_name = name_parts[1] if len(name_parts) > 1 else ''

        # Unknown departments fall back to the Administration code.
        dept_name = cell(row, 'Department')
        dept_code = DEPARTMENT_MAPPING.get(dept_name) or 'ADM-005'

        return {
            'staff_id': staff_id,
            'name': name,
            'first_name': first_name,
            'last_name': last_name,
            'location': cell(row, 'Location'),
            'department': dept_name,
            'department_code': dept_code,
            'section': cell(row, 'Section'),
            'subsection': cell(row, 'Subsection'),
            'job_title': cell(row, 'AlHammadi Job Title'),
            'country': cell(row, 'Country'),
            'gender': cell(row, 'Gender').lower(),
            'phone': cell(row, 'Phone'),  # optional column
            'manager_id': manager_id,
            'manager_name': manager_name,
        }

    def parse_csv(self, csv_file_path):
        """Parse the CSV file and return a list of staff data dictionaries.

        Header names are stripped of surrounding whitespace before use and a
        leading UTF-8 byte-order mark is ignored. Rows that cannot be parsed
        are reported and skipped. Returns an empty list when the file cannot
        be read, has no header, or lacks a required column.
        """
        staff_data = []
        try:
            # utf-8-sig drops a BOM if present; newline='' is what the csv
            # module expects so quoted embedded newlines are handled correctly.
            with open(csv_file_path, 'r', encoding='utf-8-sig', newline='') as csvfile:
                reader = csv.DictReader(csvfile)

                actual_columns = reader.fieldnames
                if not actual_columns:
                    self.stdout.write(self.style.ERROR("CSV file is empty or has no headers"))
                    return []

                # Apply the normalised names so rows are keyed by them.
                reader.fieldnames = [col.strip() for col in actual_columns]

                missing = [
                    col for col in self.REQUIRED_COLUMNS
                    if col not in reader.fieldnames
                ]
                if missing:
                    self.stdout.write(
                        self.style.ERROR(
                            f"CSV file is missing required column(s): {', '.join(missing)}"
                        )
                    )
                    return []

                for row_idx, row in enumerate(reader, 1):
                    try:
                        staff_data.append(self._parse_row(row))
                    except (KeyError, ValueError, AttributeError) as e:
                        self.stdout.write(
                            self.style.WARNING(f"Skipping row {row_idx}: {str(e)}")
                        )
        except (OSError, UnicodeError, csv.Error) as e:
            self.stdout.write(self.style.ERROR(f"Error reading CSV file: {str(e)}"))
            return []
        return staff_data

    @staticmethod
    def _find_department(departments, department_code):
        """Return the first department whose ``code`` matches, else ``None``."""
        return next(
            (dept for dept in departments if dept.code == department_code),
            None,
        )

    def create_staff(self, row, hospital, departments, staff_type):
        """Build (but do not save) a new Staff record from a parsed CSV row.

        ``department`` is left as ``None`` when no entry in ``departments``
        has the row's department code.
        """
        return Staff(
            employee_id=row['staff_id'],
            name=row['name'],  # original name from CSV
            first_name=row['first_name'],
            last_name=row['last_name'],
            first_name_ar='',
            last_name_ar='',
            staff_type=staff_type,
            job_title=row['job_title'],
            license_number=None,
            specialization=row['job_title'],  # job title doubles as specialization
            email='',
            phone=row.get('phone', ''),  # optional in the CSV
            hospital=hospital,
            department=self._find_department(departments, row['department_code']),
            country=row['country'],
            location=row['location'],
            gender=row['gender'],
            department_name=row['department'],
            section=row['section'],
            subsection=row['subsection'],
            report_to=None,  # linked in the second pass
            status='active'
        )

    def update_staff(self, staff, row, hospital, departments, staff_type):
        """Copy the parsed CSV row onto an existing Staff record (no save).

        ``report_to`` is not touched here; it is set in the second pass.
        """
        staff.name = row['name']  # original name from CSV
        staff.first_name = row['first_name']
        staff.last_name = row['last_name']
        staff.staff_type = staff_type
        staff.job_title = row['job_title']
        staff.specialization = row['job_title']
        staff.phone = row.get('phone', '')  # optional in the CSV
        staff.hospital = hospital
        staff.department = self._find_department(departments, row['department_code'])
        staff.country = row['country']
        staff.location = row['location']
        staff.gender = row['gender']
        staff.department_name = row['department']
        staff.section = row['section']
        staff.subsection = row['subsection']