380 lines
17 KiB
Python
380 lines
17 KiB
Python
"""
|
|
Unified management command to import staff data from CSV file.
|
|
|
|
This command:
|
|
1. Auto-creates Departments if they don't exist (with Arabic names)
|
|
2. Auto-creates Sections as sub-departments (with Arabic names)
|
|
3. Sets the department ForeignKey properly
|
|
4. Handles is_head flag
|
|
5. Links manager relationships
|
|
6. Handles bilingual (English/Arabic) data
|
|
|
|
CSV Format:
|
|
Staff ID,Name,Name_ar,Manager,Manager_ar,Civil Identity Number,Location,Location_ar,Department,Department_ar,Section,Section_ar,Subsection,Subsection_ar,AlHammadi Job Title,AlHammadi Job Title_ar,Country,Country_ar
|
|
|
|
Example:
|
|
4,ABDULAZIZ SALEH ALHAMMADI,عبدالعزيز صالح محمد الحمادي,2 - MOHAMMAD SALEH AL HAMMADI,2 - محمد صالح محمد الحمادي,1013086457,Nuzha,النزهة,Senior Management Offices, إدارة مكاتب الإدارة العليا ,COO Office,مكتب الرئيس التنفيذي للعمليات والتشغيل,,,Chief Operating Officer,الرئيس التنفيذي للعمليات والتشغيل,Saudi Arabia,المملكة العربية السعودية
|
|
"""
|
|
import csv
|
|
import os
|
|
import uuid
|
|
from django.core.management.base import BaseCommand, CommandError
|
|
from django.db import transaction
|
|
|
|
from apps.organizations.models import Hospital, Department, Staff, StaffSection, StaffSubsection
|
|
|
|
|
|
class Command(BaseCommand):
|
|
help = 'Import staff from CSV with auto-creation of departments and sections (bilingual support)'
|
|
|
|
def add_arguments(self, parser):
|
|
parser.add_argument('csv_file', type=str, help='Path to CSV file')
|
|
parser.add_argument('--hospital-code', type=str, required=True, help='Hospital code')
|
|
parser.add_argument('--staff-type', type=str, default='admin', choices=['physician', 'nurse', 'admin', 'other'])
|
|
parser.add_argument('--update-existing', action='store_true', help='Update existing staff')
|
|
parser.add_argument('--dry-run', action='store_true', help='Preview without changes')
|
|
|
|
def handle(self, *args, **options):
|
|
csv_file = options['csv_file']
|
|
hospital_code = options['hospital_code']
|
|
staff_type = options['staff_type']
|
|
update_existing = options['update_existing']
|
|
dry_run = options['dry_run']
|
|
|
|
if not os.path.exists(csv_file):
|
|
raise CommandError(f"CSV file not found: {csv_file}")
|
|
|
|
try:
|
|
hospital = Hospital.objects.get(code=hospital_code)
|
|
except Hospital.DoesNotExist:
|
|
raise CommandError(f"Hospital '{hospital_code}' not found")
|
|
|
|
self.stdout.write(f"\nImporting staff for: {hospital.name}")
|
|
self.stdout.write(f"CSV: {csv_file}")
|
|
self.stdout.write(f"Dry run: {dry_run}\n")
|
|
|
|
# Parse CSV
|
|
staff_data = self.parse_csv(csv_file)
|
|
self.stdout.write(f"Found {len(staff_data)} records in CSV\n")
|
|
|
|
# Statistics
|
|
stats = {'created': 0, 'updated': 0, 'skipped': 0, 'depts_created': 0, 'sections_created': 0,
|
|
'subsections_created': 0, 'managers_linked': 0, 'errors': 0}
|
|
|
|
# Caches
|
|
dept_cache = {} # {(hospital_id, dept_name): Department}
|
|
section_cache = {} # {(department_id, section_name): StaffSection}
|
|
subsection_cache = {} # {(section_id, subsection_name): StaffSubsection}
|
|
staff_map = {} # employee_id -> Staff
|
|
|
|
with transaction.atomic():
|
|
# Pass 1: Create/update staff
|
|
for idx, row in enumerate(staff_data, 1):
|
|
try:
|
|
# Get or create department (top-level only)
|
|
department = self._get_or_create_department(
|
|
hospital, row['department'], row.get('department_ar', ''),
|
|
dept_cache, dry_run, stats
|
|
)
|
|
|
|
# Get or create section (under department)
|
|
section = self._get_or_create_section(
|
|
department, row['section'], row.get('section_ar', ''),
|
|
section_cache, dry_run, stats
|
|
)
|
|
|
|
# Get or create subsection (under section)
|
|
subsection = self._get_or_create_subsection(
|
|
section, row['subsection'], row.get('subsection_ar', ''),
|
|
subsection_cache, dry_run, stats
|
|
)
|
|
|
|
# Check existing
|
|
existing = Staff.objects.filter(employee_id=row['staff_id']).first()
|
|
|
|
if existing and not update_existing:
|
|
self.stdout.write(f"[{idx}] ⊘ Skipped (exists): {row['name']}")
|
|
stats['skipped'] += 1
|
|
staff_map[row['staff_id']] = existing
|
|
continue
|
|
|
|
if existing:
|
|
self._update_staff(existing, row, hospital, department, section, subsection, staff_type)
|
|
if not dry_run:
|
|
existing.save()
|
|
self.stdout.write(f"[{idx}] ✓ Updated: {row['name']}")
|
|
stats['updated'] += 1
|
|
staff_map[row['staff_id']] = existing
|
|
else:
|
|
staff = self._create_staff(row, hospital, department, section, subsection, staff_type)
|
|
if not dry_run:
|
|
staff.save()
|
|
staff_map[row['staff_id']] = staff
|
|
self.stdout.write(f"[{idx}] ✓ Created: {row['name']}")
|
|
stats['created'] += 1
|
|
|
|
except Exception as e:
|
|
self.stdout.write(self.style.ERROR(f"[{idx}] ✗ Error: {row.get('name', 'Unknown')} - {e}"))
|
|
stats['errors'] += 1
|
|
|
|
# Pass 2: Link managers
|
|
self.stdout.write("\nLinking managers...")
|
|
for row in staff_data:
|
|
if not row.get('manager_id'):
|
|
continue
|
|
staff = staff_map.get(row['staff_id'])
|
|
manager = staff_map.get(row['manager_id'])
|
|
if staff and manager and staff.report_to != manager:
|
|
staff.report_to = manager
|
|
if not dry_run:
|
|
staff.save()
|
|
stats['managers_linked'] += 1
|
|
self.stdout.write(f" ✓ {row['name']} → {manager.name}")
|
|
|
|
# Summary
|
|
self.stdout.write(f"\n{'='*50}")
|
|
self.stdout.write("Summary:")
|
|
self.stdout.write(f" Staff created: {stats['created']}")
|
|
self.stdout.write(f" Staff updated: {stats['updated']}")
|
|
self.stdout.write(f" Staff skipped: {stats['skipped']}")
|
|
self.stdout.write(f" Departments created: {stats['depts_created']}")
|
|
self.stdout.write(f" Sections created: {stats['sections_created']}")
|
|
self.stdout.write(f" Subsections created: {stats.get('subsections_created', 0)}")
|
|
self.stdout.write(f" Managers linked: {stats['managers_linked']}")
|
|
self.stdout.write(f" Errors: {stats['errors']}")
|
|
if dry_run:
|
|
self.stdout.write(self.style.WARNING("\nDRY RUN - No changes made"))
|
|
|
|
def parse_csv(self, csv_file):
|
|
"""Parse CSV and return list of dicts with bilingual support"""
|
|
data = []
|
|
with open(csv_file, 'r', encoding='utf-8') as f:
|
|
reader = csv.DictReader(f)
|
|
for row in reader:
|
|
# Parse manager "ID - Name"
|
|
manager_id = None
|
|
manager_name = ''
|
|
if row.get('Manager', '').strip():
|
|
manager_parts = row['Manager'].split('-', 1)
|
|
manager_id = manager_parts[0].strip()
|
|
manager_name = manager_parts[1].strip() if len(manager_parts) > 1 else ''
|
|
|
|
# Parse name
|
|
name = row.get('Name', '').strip()
|
|
parts = name.split(None, 1)
|
|
|
|
# Parse Arabic name
|
|
name_ar = row.get('Name_ar', '').strip()
|
|
parts_ar = name_ar.split(None, 1) if name_ar else ['', '']
|
|
|
|
data.append({
|
|
'staff_id': row.get('Staff ID', '').strip(),
|
|
'name': name,
|
|
'name_ar': name_ar,
|
|
'first_name': parts[0] if parts else name,
|
|
'last_name': parts[1] if len(parts) > 1 else '',
|
|
'first_name_ar': parts_ar[0] if parts_ar else '',
|
|
'last_name_ar': parts_ar[1] if len(parts_ar) > 1 else '',
|
|
'civil_id': row.get('Civil Identity Number', '').strip(),
|
|
'location': row.get('Location', '').strip(),
|
|
'location_ar': row.get('Location_ar', '').strip(),
|
|
'department': row.get('Department', '').strip(),
|
|
'department_ar': row.get('Department_ar', '').strip(),
|
|
'section': row.get('Section', '').strip(),
|
|
'section_ar': row.get('Section_ar', '').strip(),
|
|
'subsection': row.get('Subsection', '').strip(),
|
|
'subsection_ar': row.get('Subsection_ar', '').strip(),
|
|
'job_title': row.get('AlHammadi Job Title', '').strip(),
|
|
'job_title_ar': row.get('AlHammadi Job Title_ar', '').strip(),
|
|
'country': row.get('Country', '').strip(),
|
|
'country_ar': row.get('Country_ar', '').strip(),
|
|
'gender': row.get('Gender', '').strip().lower() if row.get('Gender') else '',
|
|
'manager_id': manager_id,
|
|
'manager_name': manager_name,
|
|
})
|
|
return data
|
|
|
|
def _get_or_create_department(self, hospital, dept_name, dept_name_ar, cache, dry_run, stats):
|
|
"""Get or create department (top-level only)"""
|
|
if not dept_name:
|
|
return None
|
|
|
|
cache_key = (str(hospital.id), dept_name)
|
|
|
|
if cache_key in cache:
|
|
return cache[cache_key]
|
|
|
|
# Get or create main department (top-level, parent=None)
|
|
dept, created = Department.objects.get_or_create(
|
|
hospital=hospital,
|
|
name__iexact=dept_name,
|
|
parent__isnull=True, # Only match top-level departments
|
|
defaults={
|
|
'name': dept_name,
|
|
'name_ar': dept_name_ar or '',
|
|
'code': str(uuid.uuid4())[:8],
|
|
'status': 'active'
|
|
}
|
|
)
|
|
if created and not dry_run:
|
|
stats['depts_created'] += 1
|
|
self.stdout.write(f" + Created department: {dept_name}")
|
|
elif created and dry_run:
|
|
stats['depts_created'] += 1
|
|
self.stdout.write(f" + Would create department: {dept_name}")
|
|
|
|
# Update Arabic name if empty and we have new data
|
|
if dept.name_ar != dept_name_ar and dept_name_ar:
|
|
dept.name_ar = dept_name_ar
|
|
if not dry_run:
|
|
dept.save()
|
|
|
|
cache[cache_key] = dept
|
|
return dept
|
|
|
|
def _get_or_create_section(self, department, section_name, section_name_ar, cache, dry_run, stats):
|
|
"""Get or create StaffSection within a department"""
|
|
if not section_name or not department:
|
|
return None
|
|
|
|
cache_key = (str(department.id), section_name)
|
|
|
|
if cache_key in cache:
|
|
return cache[cache_key]
|
|
|
|
# If section name is same as department (case-insensitive), skip
|
|
if section_name.lower() == department.name.lower():
|
|
self.stdout.write(f" ! Section name '{section_name}' same as department, skipping section")
|
|
cache[cache_key] = None
|
|
return None
|
|
|
|
# Get or create section
|
|
section, created = StaffSection.objects.get_or_create(
|
|
department=department,
|
|
name__iexact=section_name,
|
|
defaults={
|
|
'name': section_name,
|
|
'name_ar': section_name_ar or '',
|
|
'code': str(uuid.uuid4())[:8],
|
|
'status': 'active'
|
|
}
|
|
)
|
|
if created and not dry_run:
|
|
stats['sections_created'] += 1
|
|
self.stdout.write(f" + Created section: {section_name} (under {department.name})")
|
|
elif created and dry_run:
|
|
stats['sections_created'] += 1
|
|
self.stdout.write(f" + Would create section: {section_name} (under {department.name})")
|
|
|
|
# Update Arabic name if empty and we have new data
|
|
if section.name_ar != section_name_ar and section_name_ar:
|
|
section.name_ar = section_name_ar
|
|
if not dry_run:
|
|
section.save()
|
|
|
|
cache[cache_key] = section
|
|
return section
|
|
|
|
def _get_or_create_subsection(self, section, subsection_name, subsection_name_ar, cache, dry_run, stats):
|
|
"""Get or create StaffSubsection within a section"""
|
|
if not subsection_name or not section:
|
|
return None
|
|
|
|
cache_key = (str(section.id), subsection_name)
|
|
|
|
if cache_key in cache:
|
|
return cache[cache_key]
|
|
|
|
# If subsection name is same as section (case-insensitive), skip
|
|
if subsection_name.lower() == section.name.lower():
|
|
self.stdout.write(f" ! Subsection name '{subsection_name}' same as section, skipping subsection")
|
|
cache[cache_key] = None
|
|
return None
|
|
|
|
# Get or create subsection
|
|
subsection, created = StaffSubsection.objects.get_or_create(
|
|
section=section,
|
|
name__iexact=subsection_name,
|
|
defaults={
|
|
'name': subsection_name,
|
|
'name_ar': subsection_name_ar or '',
|
|
'code': str(uuid.uuid4())[:8],
|
|
'status': 'active'
|
|
}
|
|
)
|
|
if created and not dry_run:
|
|
stats['subsections_created'] = stats.get('subsections_created', 0) + 1
|
|
self.stdout.write(f" + Created subsection: {subsection_name} (under {section.name})")
|
|
elif created and dry_run:
|
|
stats['subsections_created'] = stats.get('subsections_created', 0) + 1
|
|
self.stdout.write(f" + Would create subsection: {subsection_name} (under {section.name})")
|
|
|
|
# Update Arabic name if empty and we have new data
|
|
if subsection.name_ar != subsection_name_ar and subsection_name_ar:
|
|
subsection.name_ar = subsection_name_ar
|
|
if not dry_run:
|
|
subsection.save()
|
|
|
|
cache[cache_key] = subsection
|
|
return subsection
|
|
|
|
def _create_staff(self, row, hospital, department, section, subsection, staff_type):
|
|
"""Create new Staff record with bilingual data"""
|
|
return Staff(
|
|
employee_id=row['staff_id'],
|
|
name=row['name'],
|
|
name_ar=row['name_ar'],
|
|
first_name=row['first_name'],
|
|
last_name=row['last_name'],
|
|
first_name_ar=row['first_name_ar'],
|
|
last_name_ar=row['last_name_ar'],
|
|
civil_id=row['civil_id'],
|
|
staff_type=staff_type,
|
|
job_title=row['job_title'],
|
|
job_title_ar=row['job_title_ar'],
|
|
specialization=row['job_title'],
|
|
hospital=hospital,
|
|
department=department,
|
|
section_fk=section, # ForeignKey to StaffSection
|
|
subsection_fk=subsection, # ForeignKey to StaffSubsection
|
|
department_name=row['department'],
|
|
department_name_ar=row['department_ar'],
|
|
section=row['section'], # Original CSV value
|
|
section_ar=row['section_ar'],
|
|
subsection=row['subsection'], # Original CSV value
|
|
subsection_ar=row['subsection_ar'],
|
|
location=row['location'],
|
|
location_ar=row['location_ar'],
|
|
country=row['country'],
|
|
country_ar=row['country_ar'],
|
|
gender=row['gender'],
|
|
status='active'
|
|
)
|
|
|
|
def _update_staff(self, staff, row, hospital, department, section, subsection, staff_type):
|
|
"""Update existing Staff record with bilingual data"""
|
|
staff.name = row['name']
|
|
staff.name_ar = row['name_ar']
|
|
staff.first_name = row['first_name']
|
|
staff.last_name = row['last_name']
|
|
staff.first_name_ar = row['first_name_ar']
|
|
staff.last_name_ar = row['last_name_ar']
|
|
staff.civil_id = row['civil_id']
|
|
staff.job_title = row['job_title']
|
|
staff.job_title_ar = row['job_title_ar']
|
|
staff.hospital = hospital
|
|
staff.department = department
|
|
staff.section_fk = section # ForeignKey to StaffSection
|
|
staff.subsection_fk = subsection # ForeignKey to StaffSubsection
|
|
staff.department_name = row['department']
|
|
staff.department_name_ar = row['department_ar']
|
|
staff.section = row['section'] # Original CSV value
|
|
staff.section_ar = row['section_ar']
|
|
staff.subsection = row['subsection'] # Original CSV value
|
|
staff.subsection_ar = row['subsection_ar']
|
|
staff.location = row['location']
|
|
staff.location_ar = row['location_ar']
|
|
staff.country = row['country']
|
|
staff.country_ar = row['country_ar']
|
|
staff.gender = row['gender']
|