hospital-management/hr_data.py
Marwan Alwali 4d06ca4b5e update
2025-09-20 14:26:19 +03:00

821 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# scripts/hr_data_generator.py
import os
import django
# Set up Django environment
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hospital_management.settings')
django.setup()
import random
from datetime import datetime, timedelta, date, time
from decimal import Decimal
from django.utils import timezone as django_timezone
from hr.models import Employee, Department, Schedule, ScheduleAssignment, TimeEntry, PerformanceReview, TrainingRecord
from accounts.models import User
from core.models import Tenant
# ----------------------------
# Saudi-specific data constants
# ----------------------------
# Given names drawn for male employees and for every father/grandfather name.
SAUDI_FIRST_NAMES_MALE = [
    'Mohammed', 'Ahmed', 'Abdullah', 'Khalid', 'Omar',
    'Ali', 'Hassan', 'Ibrahim', 'Yousef', 'Fahd',
    'Faisal', 'Saud', 'Nasser', 'Abdulaziz', 'Abdulrahman',
    'Majid', 'Saeed', 'Waleed', 'Tariq', 'Mansour',
    'Sultan', 'Bandar', 'Turki', 'Nawaf', 'Rayan',
    'Ziad', 'Adel', 'Salman', 'Fares', 'Amjad',
]

# Given names drawn for female employees.
SAUDI_FIRST_NAMES_FEMALE = [
    'Fatima', 'Aisha', 'Khadija', 'Maryam', 'Zahra',
    'Noor', 'Layla', 'Sara', 'Hala', 'Noura',
    'Reem', 'Lina', 'Dina', 'Rana', 'Jana',
    'Maya', 'Amal', 'Hanan', 'Widad', 'Nada',
    'Rawan', 'Ghada', 'Samar', 'Hind', 'Munira',
    'Rahma', 'Najla', 'Dalal', 'Abeer', 'Manal',
]

# Family names; each carries the "Al-" prefix.
SAUDI_LAST_NAMES = [
    'Al-Rashid', 'Al-Otaibi', 'Al-Dosari', 'Al-Harbi',
    'Al-Zahrani', 'Al-Ghamdi', 'Al-Qahtani', 'Al-Maliki',
    'Al-Shammari', 'Al-Mutairi', 'Al-Subai', 'Al-Shamsi',
    'Al-Faisal', 'Al-Saud', 'Al-Thani', 'Al-Fahd',
    'Al-Sultan', 'Al-Nasser', 'Al-Mansour', 'Al-Khalil',
    'Al-Ibrahim', 'Al-Hassan', 'Al-Ali', 'Al-Omar',
    'Al-Ahmed', 'Al-Mohammed',
]

# Cities used for generated employee addresses.
SAUDI_CITIES = [
    'Riyadh', 'Jeddah', 'Mecca', 'Medina', 'Dammam',
    'Khobar', 'Dhahran', 'Taif', 'Tabuk', 'Buraidah',
    'Khamis Mushait', 'Hail', 'Hofuf', 'Najran', 'Jazan',
    'Yanbu', 'Abha', 'Arar', 'Sakaka', 'Qatif',
]

# Department seed rows as (code, name, description) tuples.
SAUDI_DEPARTMENTS = [
    # Patient-facing and diagnostic services
    ('EMERGENCY', 'Emergency Department', 'Emergency medical services'),
    ('ICU', 'Intensive Care Unit', 'Critical care services'),
    ('CARDIOLOGY', 'Cardiology Department', 'Heart and cardiovascular care'),
    ('SURGERY', 'General Surgery', 'Surgical services'),
    ('ORTHOPEDICS', 'Orthopedics Department', 'Bone and joint care'),
    ('PEDIATRICS', 'Pediatrics Department', 'Children healthcare'),
    ('OBSTETRICS', 'Obstetrics & Gynecology', 'Women and maternity care'),
    ('RADIOLOGY', 'Radiology Department', 'Medical imaging services'),
    ('LABORATORY', 'Laboratory Services', 'Diagnostic testing'),
    ('PHARMACY', 'Pharmacy Department', 'Medication services'),
    ('NURSING', 'Nursing Services', 'Patient care services'),
    # Back-office functions
    ('ADMINISTRATION', 'Administration', 'Hospital administration'),
    ('FINANCE', 'Finance Department', 'Financial management'),
    ('HR', 'Human Resources', 'Staff management'),
    ('IT', 'Information Technology', 'Technology services'),
    # Facility services
    ('MAINTENANCE', 'Maintenance Services', 'Facility maintenance'),
    ('SECURITY', 'Security Department', 'Hospital security'),
    ('HOUSEKEEPING', 'Housekeeping Services', 'Cleaning services'),
    ('FOOD_SERVICE', 'Food Services', 'Dietary services'),
    # Everything else
    ('SOCIAL_WORK', 'Social Work', 'Patient social services'),
]

# Training programme names sampled for reviews and training records.
SAUDI_TRAINING_PROGRAMS = [
    'Basic Life Support (BLS)',
    'Advanced Cardiac Life Support (ACLS)',
    'Pediatric Advanced Life Support (PALS)',
    'Infection Control',
    'Patient Safety',
    'Fire Safety',
    'Emergency Procedures',
    'HIPAA Compliance',
    'Cultural Sensitivity',
    'Arabic Language',
    'Islamic Healthcare Ethics',
    'Medication Administration',
    'Wound Care Management',
    'Electronic Health Records',
    'Quality Improvement',
    'Customer Service Excellence',
]
# ----------------------------
# Helpers
# ----------------------------
def e164_ksa_mobile() -> str:
    """Build a random Saudi mobile number in E.164 form.

    The result is ``+9665`` followed by eight digits whose first digit is
    never zero; per the original note this is meant to satisfy
    ``Employee.e164_ksa_regex``.
    """
    subscriber_digits = random.randint(10000000, 99999999)
    return "+9665" + str(subscriber_digits)
def ensure_departments(tenant):
    """
    Make sure every entry of SAUDI_DEPARTMENTS exists as a Department row.

    Whether departments belong to a tenant is detected from the model: when
    Department has a ``tenant`` field, lookups and inserts are limited to
    ``tenant``; otherwise all Department rows are used. Codes that already
    exist are left untouched. A failed insert is printed and skipped.

    Returns the matching departments as a list, re-read after the inserts.
    """
    # Codes missing from this table fall back to 'ANCILLARY'.
    type_by_code = {
        **dict.fromkeys(
            ['EMERGENCY', 'ICU', 'CARDIOLOGY', 'SURGERY', 'ORTHOPEDICS',
             'PEDIATRICS', 'OBSTETRICS', 'RADIOLOGY', 'LABORATORY', 'PHARMACY', 'NURSING'],
            'CLINICAL'),
        **dict.fromkeys(['ADMINISTRATION', 'FINANCE', 'HR', 'IT'], 'ADMINISTRATIVE'),
        **dict.fromkeys(['MAINTENANCE', 'SECURITY', 'HOUSEKEEPING', 'FOOD_SERVICE'], 'SUPPORT'),
    }
    scoped = 'tenant' in {field.name for field in Department._meta.fields}

    def current_departments():
        # Fresh queryset on every call so the final read sees new rows.
        if scoped:
            return Department.objects.filter(tenant=tenant)
        return Department.objects.all()

    known_codes = set(current_departments().values_list('code', flat=True))

    for dept_code, dept_name, dept_desc in SAUDI_DEPARTMENTS:
        if dept_code in known_codes:
            continue

        budget = Decimal(str(random.randint(500000, 5000000)))
        building = random.choice(['Building A', 'Building B', 'Main Building'])
        floor = random.randint(1, 5)
        fields = {
            'code': dept_code,
            'name': dept_name,
            'description': dept_desc,
            'department_type': type_by_code.get(dept_code, 'ANCILLARY'),
            'annual_budget': budget,
            'cost_center': f"CC-{dept_code}",
            'location': f"{building}, Floor {floor}",
            'is_active': True,
            'notes': f"Primary {dept_name.lower()} for {tenant.name}",
            'created_at': django_timezone.now() - timedelta(days=random.randint(30, 365)),
            'updated_at': django_timezone.now() - timedelta(days=random.randint(0, 30)),
        }
        if scoped:
            fields['tenant'] = tenant

        try:
            Department.objects.create(**fields)
        except Exception as e:
            print(f"Error creating department {dept_code} for {tenant.name}: {e}")

    # Return all for this tenant (or global)
    ready = current_departments()
    print(f"Departments ready for {tenant.name}: {ready.count()}")
    return list(ready)
def tenant_scoped_unique_username(tenant, base_username: str) -> str:
    """Return ``base_username`` or the first numbered variant free in ``tenant``.

    The plain name is tried first, then ``<base>2``, ``<base>3`` and so on,
    until no User with that tenant and username exists.
    """
    candidate = base_username
    next_suffix = 2
    while True:
        taken = User.objects.filter(tenant=tenant, username=candidate).exists()
        if not taken:
            return candidate
        candidate = f"{base_username}{next_suffix}"
        next_suffix += 1
def pick_job_title_for_department(department) -> str:
    """Pick a random job title that fits the department's type.

    ``department.department_type`` selects the pool: 'CLINICAL' and 'SUPPORT'
    have their own lists; any other value (including a missing or empty type)
    uses the administrative list.
    """
    clinical_titles = [
        'Consultant Physician', 'Senior Physician', 'Physician', 'Resident Physician', 'Intern',
        'Chief Nurse', 'Nurse Manager', 'Senior Nurse', 'Staff Nurse',
        'Nurse Practitioner', 'Clinical Nurse Specialist', 'Charge Nurse',
        'Pharmacist', 'Clinical Pharmacist', 'Pharmacy Technician',
        'Radiologist', 'Radiology Technician', 'Medical Technologist',
        'Laboratory Technician', 'Physical Therapist', 'Respiratory Therapist',
    ]
    support_titles = [
        'Security Officer', 'Security Guard', 'Maintenance Technician',
        'Housekeeping Supervisor', 'Housekeeper', 'Food Service Manager',
        'Cook', 'Kitchen Assistant', 'Transport Aide', 'Receptionist',
    ]
    administrative_titles = [
        'Chief Executive Officer', 'Chief Operating Officer', 'Administrator',
        'Assistant Administrator', 'Department Manager', 'Supervisor',
        'Administrative Assistant', 'Secretary', 'Clerk', 'Coordinator',
    ]

    kind = getattr(department, 'department_type', 'ADMINISTRATIVE') or 'ADMINISTRATIVE'
    if kind == 'CLINICAL':
        pool = clinical_titles
    elif kind == 'SUPPORT':
        pool = support_titles
    else:
        pool = administrative_titles
    return random.choice(pool)
def infer_role_from_title(job_title: str) -> str:
    """Map a job title to an ``Employee.Role`` member by keyword.

    Matching is case-insensitive and the first rule that hits wins, so the
    order below matters: 'physician' is tested before the nurse keywords,
    'nurse practitioner' before plain 'nurse', and the clinical keywords
    before the management ones. Titles that match nothing are CLERICAL.
    """
    title = (job_title or '').lower()

    if 'physician' in title:
        role_name = 'PHYSICIAN'
    elif 'nurse practitioner' in title:
        role_name = 'NURSE_PRACTITIONER'
    elif 'nurse' in title:
        role_name = 'NURSE'
    elif 'pharmac' in title:
        role_name = 'PHARMACIST'
    elif 'radiolog' in title:
        # A radiology title is either a technician/technologist or a radiologist.
        role_name = 'RAD_TECH' if 'techn' in title else 'RADIOLOGIST'
    elif 'laborator' in title:
        role_name = 'LAB_TECH'
    elif any(keyword in title for keyword in ['chief', 'director', 'manager', 'admin']):
        role_name = 'ADMIN'
    else:
        role_name = 'CLERICAL'

    # Resolved last so only the selected member is ever looked up.
    return getattr(Employee.Role, role_name)
# ----------------------------
# Employee creation / update
# ----------------------------
def _username_stem(first_name, last_name):
    """Return ``first.family`` in lower case with the leading ``Al-`` removed.

    Only the family-name prefix is dropped; an ``al`` inside the name is kept
    (``Al-Khalil`` -> ``khalil``, not ``khil``).
    """
    family = last_name.lower()
    if family.startswith('al-'):
        family = family[len('al-'):]
    return f"{first_name.lower()}.{family.replace('-', '')}"


def _fill_missing_employee_fields(emp, user, depts, preferred_gender=None):
    """Fill the blank HR fields of ``emp`` in place; the caller saves.

    Values that are already set are kept, except ``user_timezone`` which is
    always forced to 'Asia/Riyadh'. ``preferred_gender`` is used instead of a
    random draw when the profile has no gender yet.
    """
    today = django_timezone.now().date()

    # Basic personal
    if not emp.first_name:
        emp.first_name = user.first_name or emp.first_name
    if not emp.last_name:
        emp.last_name = user.last_name or emp.last_name
    if not emp.email:
        emp.email = user.email

    # Demographics
    if not emp.gender:
        emp.gender = preferred_gender or random.choice(
            [Employee.Gender.MALE, Employee.Gender.FEMALE, Employee.Gender.OTHER]
        )
    if not emp.marital_status:
        emp.marital_status = random.choice([
            Employee.MaritalStatus.SINGLE, Employee.MaritalStatus.MARRIED,
            Employee.MaritalStatus.DIVORCED, Employee.MaritalStatus.WIDOWED,
            Employee.MaritalStatus.SEPARATED, Employee.MaritalStatus.OTHER,
        ])
    if not emp.date_of_birth:
        # 22-55 years old
        emp.date_of_birth = today - timedelta(days=random.randint(22 * 365, 55 * 365))

    # Contact E.164 (both are mobiles by model design)
    if not emp.phone:
        emp.phone = e164_ksa_mobile()
    if not emp.mobile_phone:
        emp.mobile_phone = e164_ksa_mobile()

    # Address
    if not emp.address_line_1:
        street = random.choice(['King Fahd Rd', 'Prince Sultan St', 'Olaya St'])
        emp.address_line_1 = f"{random.randint(1, 999)} {street}"
    if not emp.city:
        emp.city = random.choice(SAUDI_CITIES)
    if not emp.postal_code:
        emp.postal_code = f"{random.randint(10000, 99999)}"
    if not emp.country:
        emp.country = 'Saudi Arabia'

    # Org
    if not emp.department:
        emp.department = random.choice(depts)
    if not emp.job_title:
        emp.job_title = pick_job_title_for_department(emp.department)

    # Role (derive from job title if not set)
    if not emp.role or emp.role == Employee.Role.GUEST:
        emp.role = infer_role_from_title(emp.job_title)

    # Employment
    if not emp.employment_type:
        emp.employment_type = random.choice([
            Employee.EmploymentType.FULL_TIME, Employee.EmploymentType.PART_TIME,
            Employee.EmploymentType.CONTRACT, Employee.EmploymentType.TEMPORARY,
            Employee.EmploymentType.INTERN, Employee.EmploymentType.VOLUNTEER,
            Employee.EmploymentType.PER_DIEM, Employee.EmploymentType.CONSULTANT,
        ])
    if not emp.hire_date:
        emp.hire_date = today - timedelta(days=random.randint(30, 2000))
    if not emp.employment_status:
        emp.employment_status = random.choices(
            [Employee.EmploymentStatus.ACTIVE, Employee.EmploymentStatus.INACTIVE,
             Employee.EmploymentStatus.LEAVE],
            weights=[85, 10, 5],
        )[0]

    # TERMINATED is never drawn above, but a pre-existing profile may carry it.
    if emp.employment_status == Employee.EmploymentStatus.TERMINATED and not emp.termination_date:
        emp.termination_date = emp.hire_date + timedelta(days=random.randint(30, 1000))

    # Licensure (optional by role/title)
    jt_lower = (emp.job_title or '').lower()
    licensed = any(k in jt_lower for k in ['physician', 'nurse', 'pharmacist', 'radiolog'])
    if not emp.license_number and licensed:
        emp.license_number = f"LIC-{random.randint(100000, 999999)}"
        emp.license_expiry_date = today + timedelta(days=random.randint(180, 1095))
        emp.license_state = random.choice(
            ['Riyadh Province', 'Makkah Province', 'Eastern Province', 'Asir Province']
        )
    if 'physician' in jt_lower and not emp.npi_number:
        emp.npi_number = f"SA{random.randint(1000000, 9999999)}"

    # Preferences
    emp.user_timezone = 'Asia/Riyadh'
    if not emp.language:
        emp.language = random.choice(['ar', 'en', 'ar_SA'])
    if not emp.theme:
        emp.theme = random.choice([Employee.Theme.LIGHT, Employee.Theme.DARK, Employee.Theme.AUTO])


def create_or_update_saudi_employees(tenants, departments_by_tenant, employees_per_tenant=150):
    """
    Ensure each tenant has ~employees_per_tenant Employees.

    - Existing Users that expose ``employee_profile`` have their blank
      Employee fields filled in.
    - If more are needed, additional Users are created. The matching Employee
      is expected to be created by a signal defined elsewhere; users that end
      up without a profile are skipped.

    Args:
        tenants: tenants to process.
        departments_by_tenant: mapping of tenant -> list of departments.
            Tenants that are missing or have no departments are skipped.
        employees_per_tenant: target number of employee profiles per tenant.

    Returns:
        List of every Employee that was saved, across all tenants.
    """
    all_employees = []
    for tenant in tenants:
        depts = departments_by_tenant.get(tenant)
        if not depts:
            print(f"No departments for {tenant.name}, skipping employees...")
            continue

        # Existing users -> employees
        tenant_users = list(User.objects.filter(tenant=tenant).order_by('user_id'))
        num_existing = sum(1 for u in tenant_users if hasattr(u, 'employee_profile'))

        # Gender drawn for each new user, so the profile matches the first name.
        gender_by_user_pk = {}
        email_domain = tenant.name.lower().replace(' ', '').replace('-', '')

        # Create more users if we need more employees
        for _ in range(max(0, employees_per_tenant - num_existing)):
            gender = random.choice([Employee.Gender.MALE, Employee.Gender.FEMALE])
            if gender == Employee.Gender.MALE:
                first = random.choice(SAUDI_FIRST_NAMES_MALE)
            else:
                first = random.choice(SAUDI_FIRST_NAMES_FEMALE)
            father = random.choice(SAUDI_FIRST_NAMES_MALE)
            grandfather = random.choice(SAUDI_FIRST_NAMES_MALE)
            last = random.choice(SAUDI_LAST_NAMES)

            username = tenant_scoped_unique_username(tenant, _username_stem(first, last))
            u = User.objects.create(
                tenant=tenant,
                username=username,
                email=f"{username}@{email_domain}.sa",
                first_name=first,
                father_name=father,
                grandfather_name=grandfather,
                last_name=last,
                is_active=True,
            )
            # Fixed password: this is seed data only.
            u.set_password('Hospital@123')
            u.save()
            gender_by_user_pk[u.pk] = gender
            tenant_users.append(u)  # signal creates Employee

        # Now (re)populate employee HR data
        for u in tenant_users:
            emp = getattr(u, 'employee_profile', None)
            if not emp:
                continue
            _fill_missing_employee_fields(emp, u, depts, gender_by_user_pk.get(u.pk))
            emp.save()
            all_employees.append(emp)

        ready = sum(1 for e in all_employees if e.tenant == tenant)
        print(f"Employees ready for {tenant.name}: {ready}")
    return all_employees
# ----------------------------
# Scheduling
# ----------------------------
def create_saudi_schedules(employees, schedules_per_employee=2):
    """Create work schedules for active employees based on department type.

    - CLINICAL staff get ``schedules_per_employee`` schedules, each randomly a
      12-hour day or night shift covering all seven days; only the first one
      is marked active.
    - ADMINISTRATIVE, SUPPORT and ANCILLARY staff get one active schedule with
      Sunday-Thursday office hours (Friday/Saturday off).

    Employees without a department, or not ACTIVE, are skipped. A failed
    insert is printed and does not stop the run.

    Returns:
        List of the Schedule objects that were created.
    """
    schedules = []
    week = ('sunday', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday')
    schedule_patterns = {
        'DAY_SHIFT': {day: {'start': '07:00', 'end': '19:00'} for day in week},
        'NIGHT_SHIFT': {day: {'start': '19:00', 'end': '07:00'} for day in week},
        'ADMIN_HOURS': {
            'sunday': {'start': '08:00', 'end': '17:00'},
            'monday': {'start': '08:00', 'end': '17:00'},
            'tuesday': {'start': '08:00', 'end': '17:00'},
            'wednesday': {'start': '08:00', 'end': '17:00'},
            'thursday': {'start': '08:00', 'end': '16:00'},  # short day before the weekend
            'friday': 'off',
            'saturday': 'off',
        },
    }

    def is_active_in(emp, dept_types):
        return bool(
            emp.department
            and getattr(emp.department, 'department_type', '') in dept_types
            and emp.employment_status == Employee.EmploymentStatus.ACTIVE
        )

    clinical = [e for e in employees if is_active_in(e, ('CLINICAL',))]
    # ANCILLARY is included because ensure_departments() assigns it to codes
    # outside the other three groups; without it those employees get no schedule.
    admin = [e for e in employees if is_active_in(e, ('ADMINISTRATIVE', 'SUPPORT', 'ANCILLARY'))]

    # Clinical staff: mix of day/night
    for emp in clinical:
        for i in range(schedules_per_employee):
            pattern_name = random.choice(['DAY_SHIFT', 'NIGHT_SHIFT'])
            label = pattern_name.replace('_', ' ')
            effective_date = django_timezone.now().date() - timedelta(days=random.randint(0, 180))
            # Roughly half of the schedules are open-ended.
            if random.choice([True, False]):
                end_date = effective_date + timedelta(days=random.randint(90, 365))
            else:
                end_date = None
            try:
                s = Schedule.objects.create(
                    employee=emp,
                    name=f"{emp.get_full_name()} - {label.title()}",
                    description=f"{label.title()} schedule for {emp.job_title}",
                    schedule_type=random.choice(['REGULAR', 'ROTATING']),
                    effective_date=effective_date,
                    end_date=end_date,
                    schedule_pattern=schedule_patterns[pattern_name],
                    is_active=(i == 0),  # first one active
                    notes=f"Standard {label.lower()} for clinical staff",
                    created_at=django_timezone.now() - timedelta(days=random.randint(1, 90)),
                    updated_at=django_timezone.now() - timedelta(days=random.randint(0, 30)),
                )
                schedules.append(s)
            except Exception as e:
                # Best-effort seeding: report and carry on with the next schedule.
                print(f"Error creating schedule for {emp.get_full_name()}: {e}")

    # Admin/support/ancillary: admin hours
    for emp in admin:
        try:
            s = Schedule.objects.create(
                employee=emp,
                name=f"{emp.get_full_name()} - Administrative Hours",
                description="Standard administrative working hours",
                schedule_type='REGULAR',
                effective_date=django_timezone.now().date() - timedelta(days=random.randint(0, 180)),
                end_date=None,
                schedule_pattern=schedule_patterns['ADMIN_HOURS'],
                is_active=True,
                notes="Standard administrative schedule",
                created_at=django_timezone.now() - timedelta(days=random.randint(1, 90)),
                updated_at=django_timezone.now() - timedelta(days=random.randint(0, 30)),
            )
            schedules.append(s)
        except Exception as e:
            print(f"Error creating schedule for {emp.get_full_name()}: {e}")

    print(f"Created {len(schedules)} employee schedules")
    return schedules
def create_schedule_assignments(schedules, days_back=30):
    """Create one assignment per working day for each active schedule.

    For every active schedule, the ``days_back`` days before today are
    walked. A day yields an assignment when it lies inside the schedule's
    ``effective_date``/``end_date`` window and the schedule pattern has
    start/end times for that weekday ('off' or missing days are skipped).

    Args:
        schedules: Schedule objects; inactive ones are ignored.
        days_back: number of past days to cover (today itself is excluded).

    Returns:
        List of the ScheduleAssignment objects that were created.
    """
    # Indexed by date.weekday() (Monday == 0); avoids locale-dependent strftime('%A').
    weekday_keys = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday')
    assignments = []

    for s in schedules:
        if not s.is_active:
            continue

        today = django_timezone.now().date()
        window_start = today - timedelta(days=days_back)
        pattern = s.schedule_pattern or {}
        effective_from = getattr(s, 'effective_date', None)
        effective_to = getattr(s, 'end_date', None)
        department = s.employee.department
        location = department.location if department else None

        for day_offset in range(days_back):
            assignment_date = window_start + timedelta(days=day_offset)

            # Do not place shifts outside the period the schedule is valid for.
            if effective_from and assignment_date < effective_from:
                continue
            if effective_to and assignment_date > effective_to:
                continue

            day_pat = pattern.get(weekday_keys[assignment_date.weekday()])
            if not day_pat or day_pat == 'off':
                continue
            try:
                start_time = datetime.strptime(day_pat['start'], '%H:%M').time()
                end_time = datetime.strptime(day_pat['end'], '%H:%M').time()
            except (KeyError, TypeError, ValueError):
                # Malformed day entry: skip the day rather than abort the run.
                continue

            # Infer shift type from hours: an end at or before the start wraps past midnight.
            shift_type = 'NIGHT' if end_time <= start_time else 'DAY'

            if assignment_date < today:
                status = random.choices(['COMPLETED', 'NO_SHOW', 'CANCELLED'], weights=[90, 5, 5])[0]
            else:
                status = random.choices(['SCHEDULED', 'CONFIRMED'], weights=[70, 30])[0]

            try:
                a = ScheduleAssignment.objects.create(
                    schedule=s,
                    assignment_date=assignment_date,
                    start_time=start_time,
                    end_time=end_time,
                    shift_type=shift_type,
                    department=department,
                    location=location,
                    status=status,
                    break_minutes=15 if shift_type == 'DAY' else 30,
                    lunch_minutes=30 if shift_type == 'DAY' else 0,
                    notes=f"{shift_type.title()} shift assignment",
                    created_at=django_timezone.now() - timedelta(days=random.randint(0, 7)),
                    updated_at=django_timezone.now() - timedelta(days=random.randint(0, 3)),
                )
                assignments.append(a)
            except Exception as e:
                # Best-effort seeding: report and carry on with the next day.
                print(f"Error creating assignment for {s.employee.get_full_name()}: {e}")

    print(f"Created {len(assignments)} schedule assignments")
    return assignments
def create_time_entries(employees, days_back=30):
    """Create time entries for ACTIVE employees that have a hire date.

    For each of the ``days_back`` days before today an employee may get one
    entry:

    - days before the employee's hire date are skipped;
    - ADMINISTRATIVE staff never work Friday/Saturday;
    - each remaining day is worked with probability 0.85 (CLINICAL) or 0.90;
    - CLINICAL staff get one of three 8-hour shifts, everyone else 08:00-17:00
      (08:00-12:00 on Fridays); clock times vary by up to 15 minutes.

    Clock, break and lunch times are made timezone-aware whenever
    ``timezone.now()`` itself is aware, so they match the other timestamps.

    Returns:
        List of the TimeEntry objects that were created.
    """
    entries = []
    active_emps = [
        e for e in employees
        if e.employment_status == Employee.EmploymentStatus.ACTIVE and e.hire_date
    ]
    clinical_shifts = [
        (time(7, 0), time(15, 0)),
        (time(15, 0), time(23, 0)),
        (time(23, 0), time(7, 0)),
    ]

    for emp in active_emps:
        now = django_timezone.now()
        # now() is aware exactly when time zone support is switched on.
        use_tz = django_timezone.is_aware(now)
        department = emp.department
        dept_type = getattr(department, 'department_type', '') if department else ''
        location = department.location if department else None
        start_date = now.date() - timedelta(days=days_back)

        for d in range(days_back):
            work_date = start_date + timedelta(days=d)
            if work_date < emp.hire_date:
                continue
            weekday = work_date.weekday()  # Monday == 0, so 4 = Friday, 5 = Saturday
            # Skip admin weekend days (Fri/Sat)
            if dept_type == 'ADMINISTRATIVE' and weekday in (4, 5):
                continue
            work_probability = 0.85 if dept_type == 'CLINICAL' else 0.90
            if random.random() > work_probability:
                continue

            # Typical shift windows
            if dept_type == 'CLINICAL':
                start_t, end_t = random.choice(clinical_shifts)
            else:
                start_t = time(8, 0)
                end_t = time(17, 0) if weekday != 4 else time(12, 0)

            cin_var = random.randint(-15, 15)
            cout_var = random.randint(-15, 15)
            clock_in = datetime.combine(work_date, start_t) + timedelta(minutes=cin_var)
            clock_out = datetime.combine(work_date, end_t) + timedelta(minutes=cout_var)
            if end_t < start_t:
                # The overnight shift ends on the following calendar day.
                clock_out += timedelta(days=1)
            if use_tz:
                clock_in = django_timezone.make_aware(clock_in)
                clock_out = django_timezone.make_aware(clock_out)

            break_start = clock_in + timedelta(hours=2, minutes=random.randint(0, 60))
            break_end = break_start + timedelta(minutes=15)
            lunch_start = clock_in + timedelta(hours=4, minutes=random.randint(0, 60))
            lunch_end = lunch_start + timedelta(minutes=30)

            entry_type = random.choices(['REGULAR', 'OVERTIME', 'HOLIDAY'], weights=[85, 10, 5])[0]
            status = random.choices(['APPROVED', 'SUBMITTED', 'DRAFT'], weights=[70, 20, 10])[0]
            try:
                e = TimeEntry.objects.create(
                    employee=emp,
                    work_date=work_date,
                    clock_in_time=clock_in,
                    clock_out_time=clock_out,
                    break_start_time=break_start,
                    break_end_time=break_end,
                    lunch_start_time=lunch_start,
                    lunch_end_time=lunch_end,
                    entry_type=entry_type,
                    department=department,
                    location=location,
                    status=status,
                    notes=f"{entry_type.title()} work shift",
                    created_at=django_timezone.now() - timedelta(days=random.randint(0, 7)),
                    updated_at=django_timezone.now() - timedelta(days=random.randint(0, 3)),
                )
                entries.append(e)
            except Exception as ex:
                # Best-effort seeding: report and carry on with the next day.
                print(f"Error creating time entry for {emp.get_full_name()}: {ex}")

    print(f"Created {len(entries)} time entries")
    return entries
def create_performance_reviews(employees):
    """Create 1-2 performance reviews for employees with >= 6 months service.

    Only ACTIVE employees hired at least 180 days ago are reviewed. Each
    review is dated within the last 90 days; its period ends 30 days before
    the review date and starts 180-360 days before it, but never before the
    employee's hire date.

    Returns:
        List of the PerformanceReview objects that were created.
    """
    reviews = []
    today = django_timezone.now().date()
    eligible = [
        e for e in employees
        if e.employment_status == Employee.EmploymentStatus.ACTIVE
        and e.hire_date
        and (today - e.hire_date).days >= 180
    ]
    competency_areas = [
        'Clinical Skills', 'Communication', 'Teamwork', 'Professionalism',
        'Quality of Work', 'Productivity', 'Problem Solving', 'Initiative',
        'Reliability', 'Cultural Competency', 'Patient Care', 'Safety Compliance',
    ]
    rating_scale = [2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0]
    rating_weights = [5, 10, 20, 25, 25, 10, 5]

    for emp in eligible:
        for _ in range(random.randint(1, 2)):
            review_date = today - timedelta(days=random.randint(0, 90))
            period_end = review_date - timedelta(days=30)
            # A review cannot cover time before the employee was hired.
            period_start = max(
                review_date - timedelta(days=random.randint(180, 360)),
                emp.hire_date,
            )
            review_type = random.choices(['ANNUAL', 'PROBATIONARY', 'MID_YEAR'], weights=[60, 20, 20])[0]
            overall = Decimal(str(random.choices(rating_scale, weights=rating_weights)[0]))
            # NOTE(review): plain floats keep this mapping encodable by the
            # stdlib JSON encoder, which rejects Decimal. This presumes
            # competency_ratings is a JSON-backed field; confirm against the model.
            ratings = {
                area: random.choices(rating_scale, weights=rating_weights)[0]
                for area in random.sample(competency_areas, random.randint(6, 10))
            }
            status = random.choices(['COMPLETED', 'ACKNOWLEDGED', 'IN_PROGRESS'], weights=[60, 30, 10])[0]
            acknowledged = status == 'ACKNOWLEDGED'
            try:
                r = PerformanceReview.objects.create(
                    employee=emp,
                    review_period_start=period_start,
                    review_period_end=period_end,
                    review_date=review_date,
                    review_type=review_type,
                    overall_rating=overall,
                    competency_ratings=ratings,
                    goals_achieved=f"{emp.first_name} contributed to department objectives.",
                    goals_not_achieved=("Needs improvement in documentation timeliness."
                                        if overall < Decimal('4') else None),
                    future_goals="Maintain high standards of patient care; grow leadership skills.",
                    strengths=f"Strong {random.choice(['clinical skills', 'communication', 'teamwork'])}.",
                    areas_for_improvement=("Leadership and mentoring junior staff."
                                           if overall < Decimal('4.5') else None),
                    development_plan="Advanced training and leadership opportunities recommended.",
                    training_recommendations=f"Suggested training: {random.choice(SAUDI_TRAINING_PROGRAMS)}",
                    employee_comments=("Appreciate the feedback and committed to improvement."
                                       if acknowledged else None),
                    employee_signature_date=(review_date + timedelta(days=random.randint(1, 14))
                                             if acknowledged else None),
                    status=status,
                    notes=f"{review_type.title()} performance review completed",
                    created_at=django_timezone.now() - timedelta(days=random.randint(1, 60)),
                    updated_at=django_timezone.now() - timedelta(days=random.randint(0, 30)),
                )
                reviews.append(r)
            except Exception as ex:
                # Best-effort seeding: report and carry on with the next review.
                print(f"Error creating performance review for {emp.get_full_name()}: {ex}")

    print(f"Created {len(reviews)} performance reviews")
    return reviews
def create_training_records(employees):
    """Create training records for ACTIVE employees (role-aware).

    Every employee gets the common mandatory courses, plus courses chosen by
    department type and job title, plus 2-5 programmes sampled from
    SAUDI_TRAINING_PROGRAMS. Duplicates are dropped while keeping first-seen
    order, so a seeded run is reproducible.

    Returns:
        List of the TrainingRecord objects that were created.
    """
    records = []
    active = [
        e for e in employees
        if e.employment_status == Employee.EmploymentStatus.ACTIVE and e.hire_date
    ]
    training_by_role = {
        'ALL': ['Orientation', 'Fire Safety', 'Emergency Procedures', 'HIPAA Compliance', 'Patient Safety'],
        'CLINICAL': ['Basic Life Support (BLS)', 'Infection Control', 'Medication Administration',
                     'Wound Care Management'],
        'PHYSICIAN': ['Advanced Cardiac Life Support (ACLS)', 'Pediatric Advanced Life Support (PALS)'],
        'NURSE': ['IV Therapy', 'Pain Management', 'Electronic Health Records'],
        'ADMIN': ['Customer Service Excellence', 'Quality Improvement', 'Arabic Language'],
        'SUPPORT': ['Safety Training', 'Equipment Operation', 'Cultural Sensitivity'],
    }

    for emp in active:
        mandatory = list(training_by_role['ALL'])
        dtype = getattr(emp.department, 'department_type', '') if emp.department else ''
        if dtype == 'CLINICAL':
            mandatory += training_by_role['CLINICAL']
            jt = (emp.job_title or '').lower()
            if 'physician' in jt:
                mandatory += training_by_role['PHYSICIAN']
            elif 'nurse' in jt:
                mandatory += training_by_role['NURSE']
        elif dtype == 'ADMINISTRATIVE':
            mandatory += training_by_role['ADMIN']
        else:
            mandatory += training_by_role['SUPPORT']

        extras = random.sample(SAUDI_TRAINING_PROGRAMS, random.randint(2, 5))
        # dict.fromkeys de-duplicates while preserving order (a set would not).
        all_training = list(dict.fromkeys(mandatory + extras))
        days_since_hire = max(1, (django_timezone.now().date() - emp.hire_date).days)

        for tname in all_training:
            training_date = emp.hire_date + timedelta(days=random.randint(0, days_since_hire))
            completion_date = training_date + timedelta(days=random.randint(0, 7))

            if tname == 'Orientation':
                ttype = 'ORIENTATION'
            elif tname in ['Fire Safety', 'Emergency Procedures', 'HIPAA Compliance', 'Patient Safety']:
                ttype = 'MANDATORY'
            elif 'Certification' in tname or any(k in tname for k in ['BLS', 'ACLS', 'PALS']):
                ttype = 'CERTIFICATION'
            elif tname in ['Safety Training', 'Infection Control']:
                ttype = 'SAFETY'
            else:
                ttype = 'CONTINUING_ED'

            if ttype == 'CERTIFICATION':
                duration = Decimal(str(random.randint(8, 16)))
                expiry_date = completion_date + timedelta(days=random.randint(365, 730))
            elif ttype == 'MANDATORY':
                duration = Decimal(str(random.randint(2, 8)))
                expiry_date = None
            else:
                duration = Decimal(str(random.randint(1, 4)))
                expiry_date = None

            status = random.choices(['COMPLETED', 'IN_PROGRESS', 'SCHEDULED'], weights=[80, 15, 5])[0]
            completed = status == 'COMPLETED'
            score = Decimal(str(random.randint(75, 100))) if completed else None
            passed = score is not None and score >= 70

            # Each status gets its own wording; SCHEDULED used to be labelled "in progress".
            if completed:
                notes = f"{tname} training completed successfully"
            elif status == 'IN_PROGRESS':
                notes = f"{tname} training in progress"
            else:
                notes = f"{tname} training scheduled"

            try:
                rec = TrainingRecord.objects.create(
                    employee=emp,
                    # NOTE(review): the kwargs below were already disabled in the
                    # original, presumably because they do not match the current
                    # TrainingRecord model; confirm before re-enabling.
                    # ttype, training_date and expiry_date are kept for them.
                    # training_name=tname,
                    # training_description=f"Comprehensive {tname.lower()} training program",
                    # training_type=ttype,
                    # training_provider=random.choice([
                    #     'Saudi Healthcare Training Institute',
                    #     'Ministry of Health Training Center',
                    #     'King Fahd Medical Training Academy',
                    #     'Internal Training Department'
                    # ]),
                    # instructor=f"Dr. {random.choice(SAUDI_FIRST_NAMES_MALE + SAUDI_FIRST_NAMES_FEMALE)} {random.choice(SAUDI_LAST_NAMES)}",
                    # training_date=training_date,
                    completion_date=completion_date if completed else None,
                    # expiry_date=expiry_date,
                    # duration_hours=duration,
                    credits_earned=duration if completed else Decimal('0.00'),
                    status=status,
                    score=score,
                    passed=passed,
                    # certificate_number=(f"CERT-{random.randint(100000, 999999)}"
                    #                     if status == 'COMPLETED' and ttype == 'CERTIFICATION' else None),
                    # certification_body=('Saudi Healthcare Certification Board' if ttype == 'CERTIFICATION' else None),
                    # cost=Decimal(str(random.randint(500, 5000))),
                    notes=notes,
                    created_at=django_timezone.now() - timedelta(days=random.randint(1, 30)),
                    updated_at=django_timezone.now() - timedelta(days=random.randint(0, 15)),
                )
                records.append(rec)
            except Exception as ex:
                # Best-effort seeding: report and carry on with the next record.
                print(f"Error creating training record for {emp.get_full_name()}: {ex}")

    print(f"Created {len(records)} training records")
    return records
# ----------------------------
# Orchestration
# ----------------------------
def _seniority_rank(job_title):
    """Rank a job title by the seniority keyword it contains.

    3 = Chief/Director, 2 = Head/Manager, 1 = Senior, 0 = none of these.
    Matching is case-sensitive.
    """
    title = job_title or ''
    if any(k in title for k in ('Chief', 'Director')):
        return 3
    if any(k in title for k in ('Head', 'Manager')):
        return 2
    if 'Senior' in title:
        return 1
    return 0


def main():
    """Generate HR seed data for every tenant and print a summary.

    Steps: departments, employees, department heads, supervisors, schedules,
    schedule assignments, time entries and performance reviews. Training
    record generation is currently disabled.

    Returns:
        None when no tenant exists; otherwise a dict with the keys
        'departments_by_tenant', 'employees', 'schedules', 'assignments',
        'time_entries' and 'reviews'.
    """
    print("Starting Saudi Healthcare HR Data Generation...")
    tenants = list(Tenant.objects.all())
    if not tenants:
        print("❌ No tenants found. Please run the core/tenant data generator first.")
        return

    # 1) Departments
    print("\n1. Creating Saudi Hospital Departments...")
    departments_by_tenant = {t: ensure_departments(t) for t in tenants}

    # 2) Employees (create missing Users -> Employees via signal, then populate HR fields)
    print("\n2. Creating/Updating Saudi Hospital Employees...")
    employees = create_or_update_saudi_employees(tenants, departments_by_tenant, employees_per_tenant=120)

    # Index active employees by department once, so steps 3 and 4 do not
    # rescan the whole employee list for every department/employee.
    active_by_dept = {}
    for e in employees:
        if e.employment_status == Employee.EmploymentStatus.ACTIVE:
            active_by_dept.setdefault(e.department_id, []).append(e)

    # 3) Department heads (simple heuristic: Chief/Director/Head/Manager titles)
    print("\n3. Assigning Department Heads...")
    for tenant in tenants:
        for dept in departments_by_tenant[tenant]:
            if getattr(dept, 'department_head_id', None):
                continue
            senior = [e for e in active_by_dept.get(dept.id, []) if _seniority_rank(e.job_title) >= 2]
            if not senior:
                continue
            dept.department_head = random.choice(senior)
            try:
                dept.save(update_fields=['department_head'])
            except Exception as ex:
                print(f"Could not set head for {dept.name}: {ex}")

    # 4) Supervisors (intra-dept)
    print("\n4. Assigning Supervisors...")
    for e in employees:
        if getattr(e, 'supervisor_id', None) or e.employment_status != Employee.EmploymentStatus.ACTIVE:
            continue
        own_rank = _seniority_rank(e.job_title)
        if own_rank >= 3:
            # Chiefs and directors are left without a supervisor.
            continue
        # Requiring a strictly higher rank keeps the chain acyclic: two
        # "Senior" colleagues can no longer end up supervising each other.
        candidates = [
            p for p in active_by_dept.get(e.department_id, [])
            if p.id != e.id and _seniority_rank(p.job_title) > own_rank
        ]
        if not candidates:
            continue
        e.supervisor = random.choice(candidates)
        try:
            e.save(update_fields=['supervisor'])
        except Exception as ex:
            print(f"Could not set supervisor for {e.get_full_name()}: {ex}")

    # 5) Schedules
    print("\n5. Creating Employee Work Schedules...")
    schedules = create_saudi_schedules(employees, schedules_per_employee=2)

    # 6) Schedule assignments
    print("\n6. Creating Schedule Assignments...")
    assignments = create_schedule_assignments(schedules, days_back=30)

    # 7) Time entries
    print("\n7. Creating Employee Time Entries...")
    time_entries = create_time_entries(employees, days_back=30)

    # 8) Performance reviews
    print("\n8. Creating Performance Reviews...")
    reviews = create_performance_reviews(employees)

    # 9) Training records: the call is disabled, so say so instead of
    # announcing work that does not happen.
    print("\n9. Skipping Training Records (generation is disabled)...")
    # training_records = create_training_records(employees)

    print("\n✅ Saudi Healthcare HR Data Generation Complete!")
    print("📊 Summary:")
    print(f" - Tenants: {len(tenants)}")
    print(f" - Departments: {sum(len(v) for v in departments_by_tenant.values())}")
    print(f" - Employees: {len(employees)}")
    print(f" - Schedules: {len(schedules)}")
    print(f" - Schedule Assignments: {len(assignments)}")
    print(f" - Time Entries: {len(time_entries)}")
    print(f" - Performance Reviews: {len(reviews)}")
    # print(f" - Training Records: {len(training_records)}")

    # Distribution summaries
    dept_counts = {}
    for e in employees:
        if e.department:
            # A blank type is grouped under a label so sorting/formatting cannot fail.
            dept_type = e.department.department_type or 'UNKNOWN'
            dept_counts[dept_type] = dept_counts.get(dept_type, 0) + 1
    print("\n🏥 Employee Distribution by Department Type:")
    for dept_type, count in sorted(dept_counts.items()):
        print(f" - {dept_type.title()}: {count}")

    status_counts = {}
    for e in employees:
        status_counts[e.employment_status] = status_counts.get(e.employment_status, 0) + 1
    print("\n👥 Employee Status Distribution:")
    for status, count in sorted(status_counts.items()):
        print(f" - {status.title()}: {count}")

    return {
        'departments_by_tenant': departments_by_tenant,
        'employees': employees,
        'schedules': schedules,
        'assignments': assignments,
        'time_entries': time_entries,
        'reviews': reviews,
        # 'training_records': training_records,
    }


if __name__ == "__main__":
    main()