Marwan Alwali 263292f6be update
2025-11-04 00:50:06 +03:00

2243 lines
102 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# scripts/hr_data_generator.py
"""
Comprehensive HR Data Generator for Hospital Management System
Generates realistic Saudi healthcare HR data including:
- Employees with complete profiles
- Departments with organizational structure
- Work schedules and assignments
- Time tracking entries
- Performance reviews
- Complete training management system
"""
import os
import django
# Set up Django environment
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hospital_management.settings')
try:
django.setup()
except Exception as e:
print(f"Django setup failed: {e}")
print("Please run this script using: python manage.py shell -c \"exec(open('hr_data.py').read())\"")
exit(1)
import random
from datetime import datetime, timedelta, date, time
from decimal import Decimal
from django.utils import timezone as django_timezone
from django.core.exceptions import ValidationError
from hr.models import (
Employee, Department, Schedule, ScheduleAssignment, TimeEntry, PerformanceReview,
TrainingPrograms, TrainingSession, TrainingRecord, ProgramModule, ProgramPrerequisite,
TrainingAttendance, TrainingAssessment, TrainingCertificates,
LeaveType, LeaveBalance, LeaveRequest, LeaveApproval, LeaveDelegate,
SalaryInformation, SalaryAdjustment, DocumentRequest, DocumentTemplate
)
from accounts.models import User
from core.models import Tenant
# ----------------------------
# Saudi-specific data constants
# ----------------------------
SAUDI_FIRST_NAMES_MALE = [
'Mohammed', 'Ahmed', 'Abdullah', 'Khalid', 'Omar', 'Ali', 'Hassan', 'Ibrahim',
'Yousef', 'Fahd', 'Faisal', 'Saud', 'Nasser', 'Abdulaziz', 'Abdulrahman',
'Majid', 'Saeed', 'Waleed', 'Tariq', 'Mansour', 'Sultan', 'Bandar',
'Turki', 'Nawaf', 'Rayan', 'Ziad', 'Adel', 'Salman', 'Fares', 'Amjad'
]
SAUDI_FIRST_NAMES_FEMALE = [
'Fatima', 'Aisha', 'Khadija', 'Maryam', 'Zahra', 'Noor', 'Layla', 'Sara',
'Hala', 'Noura', 'Reem', 'Lina', 'Dina', 'Rana', 'Jana', 'Maya',
'Amal', 'Hanan', 'Widad', 'Nada', 'Rawan', 'Ghada', 'Samar', 'Hind',
'Munira', 'Rahma', 'Najla', 'Dalal', 'Abeer', 'Manal'
]
SAUDI_LAST_NAMES = [
'Al-Rashid', 'Al-Otaibi', 'Al-Dosari', 'Al-Harbi', 'Al-Zahrani', 'Al-Ghamdi',
'Al-Qahtani', 'Al-Maliki', 'Al-Shammari', 'Al-Mutairi', 'Al-Subai', 'Al-Shamsi',
'Al-Faisal', 'Al-Saud', 'Al-Thani', 'Al-Fahd', 'Al-Sultan', 'Al-Nasser',
'Al-Mansour', 'Al-Khalil', 'Al-Ibrahim', 'Al-Hassan', 'Al-Ali', 'Al-Omar',
'Al-Ahmed', 'Al-Mohammed'
]
SAUDI_CITIES = [
'Riyadh', 'Jeddah', 'Mecca', 'Medina', 'Dammam', 'Khobar', 'Dhahran',
'Taif', 'Tabuk', 'Buraidah', 'Khamis Mushait', 'Hail', 'Hofuf', 'Najran',
'Jazan', 'Yanbu', 'Abha', 'Arar', 'Sakaka', 'Qatif'
]
SAUDI_DEPARTMENTS = [
('EMERGENCY', 'Emergency Department', 'Emergency medical services'),
('ICU', 'Intensive Care Unit', 'Critical care services'),
('CARDIOLOGY', 'Cardiology Department', 'Heart and cardiovascular care'),
('SURGERY', 'General Surgery', 'Surgical services'),
('ORTHOPEDICS', 'Orthopedics Department', 'Bone and joint care'),
('PEDIATRICS', 'Pediatrics Department', 'Children healthcare'),
('OBSTETRICS', 'Obstetrics & Gynecology', 'Women and maternity care'),
('RADIOLOGY', 'Radiology Department', 'Medical imaging services'),
('LABORATORY', 'Laboratory Services', 'Diagnostic testing'),
('PHARMACY', 'Pharmacy Department', 'Medication services'),
('NURSING', 'Nursing Services', 'Patient care services'),
('ADMINISTRATION', 'Administration', 'Hospital administration'),
('FINANCE', 'Finance Department', 'Financial management'),
('HR', 'Human Resources', 'Staff management'),
('IT', 'Information Technology', 'Technology services'),
('MAINTENANCE', 'Maintenance Services', 'Facility maintenance'),
('SECURITY', 'Security Department', 'Hospital security'),
('HOUSEKEEPING', 'Housekeeping Services', 'Cleaning services'),
('FOOD_SERVICE', 'Food Services', 'Dietary services'),
('SOCIAL_WORK', 'Social Work', 'Patient social services')
]
SAUDI_TRAINING_PROGRAMS = [
'Basic Life Support (BLS)', 'Advanced Cardiac Life Support (ACLS)',
'Pediatric Advanced Life Support (PALS)', 'Infection Control',
'Patient Safety', 'Fire Safety', 'Emergency Procedures',
'HIPAA Compliance', 'Cultural Sensitivity', 'Arabic Language',
'Islamic Healthcare Ethics', 'Medication Administration',
'Wound Care Management', 'Electronic Health Records',
'Quality Improvement', 'Customer Service Excellence'
]
# ----------------------------
# Helpers
# ----------------------------
def e164_ksa_mobile() -> str:
"""Return +9665XXXXXXXX to satisfy Employee.e164_ksa_regex."""
return f"+9665{random.randint(10000000, 99999999)}"
def ensure_departments(tenant):
"""
Ensure Department objects exist for this tenant; return a list.
If Department is global (no tenant field), operate globally.
"""
dept_fields = {f.name for f in Department._meta.fields}
is_tenant_scoped = 'tenant' in dept_fields
qset = Department.objects.filter(tenant=tenant) if is_tenant_scoped else Department.objects.all()
existing_codes = set(qset.values_list('code', flat=True))
created = []
for code, name, desc in SAUDI_DEPARTMENTS:
if code in existing_codes:
continue
kwargs = dict(code=code, name=name, description=desc,
department_type=('CLINICAL' if code in
['EMERGENCY', 'ICU', 'CARDIOLOGY', 'SURGERY', 'ORTHOPEDICS',
'PEDIATRICS', 'OBSTETRICS', 'RADIOLOGY', 'LABORATORY', 'PHARMACY', 'NURSING']
else 'ADMINISTRATIVE' if code in ['ADMINISTRATION', 'FINANCE', 'HR', 'IT']
else 'SUPPORT' if code in ['MAINTENANCE', 'SECURITY', 'HOUSEKEEPING', 'FOOD_SERVICE']
else 'ANCILLARY'),
annual_budget=Decimal(str(random.randint(500000, 5000000))),
cost_center=f"CC-{code}",
location=f"{random.choice(['Building A', 'Building B', 'Main Building'])}, Floor {random.randint(1, 5)}",
is_active=True,
notes=f"Primary {name.lower()} for {tenant.name}",
created_at=django_timezone.now() - timedelta(days=random.randint(30, 365)),
updated_at=django_timezone.now() - timedelta(days=random.randint(0, 30)))
if is_tenant_scoped:
kwargs['tenant'] = tenant
try:
created.append(Department.objects.create(**kwargs))
except Exception as e:
print(f"Error creating department {code} for {tenant.name}: {e}")
# Return all for this tenant (or global)
qset = Department.objects.filter(tenant=tenant) if is_tenant_scoped else Department.objects.all()
print(f"Departments ready for {tenant.name}: {qset.count()}")
return list(qset)
def tenant_scoped_unique_username(tenant, base_username: str) -> str:
"""Make username unique within a tenant (AUTH is tenant-scoped)."""
uname = base_username
i = 1
while User.objects.filter(tenant=tenant, username=uname).exists():
i += 1
uname = f"{base_username}{i}"
return uname
def pick_job_title_for_department(department) -> str:
dtype = getattr(department, 'department_type', 'ADMINISTRATIVE') or 'ADMINISTRATIVE'
if dtype == 'CLINICAL':
return random.choice([
'Consultant Physician', 'Senior Physician', 'Physician', 'Resident Physician', 'Intern',
'Chief Nurse', 'Nurse Manager', 'Senior Nurse', 'Staff Nurse',
'Nurse Practitioner', 'Clinical Nurse Specialist', 'Charge Nurse',
'Pharmacist', 'Clinical Pharmacist', 'Pharmacy Technician',
'Radiologist', 'Radiology Technician', 'Medical Technologist',
'Laboratory Technician', 'Physical Therapist', 'Respiratory Therapist'
])
if dtype == 'SUPPORT':
return random.choice([
'Security Officer', 'Security Guard', 'Maintenance Technician',
'Housekeeping Supervisor', 'Housekeeper', 'Food Service Manager',
'Cook', 'Kitchen Assistant', 'Transport Aide', 'Receptionist'
])
return random.choice([
'Chief Executive Officer', 'Chief Operating Officer', 'Administrator',
'Assistant Administrator', 'Department Manager', 'Supervisor',
'Administrative Assistant', 'Secretary', 'Clerk', 'Coordinator'
])
def infer_role_from_title(job_title: str) -> str:
jt = (job_title or '').lower()
if 'physician' in jt:
return Employee.Role.PHYSICIAN
if 'nurse practitioner' in jt:
return Employee.Role.NURSE_PRACTITIONER
if 'nurse' in jt:
return Employee.Role.NURSE
if 'pharmac' in jt:
return Employee.Role.PHARMACIST
if 'radiolog' in jt and 'techn' not in jt:
return Employee.Role.RADIOLOGIST
if 'radiolog' in jt and 'techn' in jt:
return Employee.Role.RAD_TECH
if 'laborator' in jt:
return Employee.Role.LAB_TECH
if any(k in jt for k in ['chief', 'director', 'manager', 'admin']):
return Employee.Role.ADMIN
return Employee.Role.CLERICAL
# ----------------------------
# Employee creation / update
# ----------------------------
def create_or_update_saudi_employees(tenants, departments_by_tenant, employees_per_tenant=150):
"""
Ensure each tenant has ~employees_per_tenant Employees.
- For existing Users (with Employee via signal), update their Employee fields.
- If we still need more, create additional Users (signal creates Employee), then update.
"""
all_employees = []
for tenant in tenants:
depts = departments_by_tenant[tenant]
if not depts:
print(f"No departments for {tenant.name}, skipping employees...")
continue
# Existing users -> employees
tenant_users = list(User.objects.filter(tenant=tenant).order_by('user_id'))
employees_existing = [u.employee_profile for u in tenant_users if hasattr(u, 'employee_profile')]
num_existing = len(employees_existing)
# Create more users if we need more employees
to_create = max(0, employees_per_tenant - num_existing)
print(f"Creating {to_create} new employees for {tenant.name} (existing: {num_existing})")
for i in range(to_create):
try:
gender = random.choice([Employee.Gender.MALE, Employee.Gender.FEMALE])
first = random.choice(SAUDI_FIRST_NAMES_MALE if gender == Employee.Gender.MALE else SAUDI_FIRST_NAMES_FEMALE)
father = random.choice(SAUDI_FIRST_NAMES_MALE)
grandfather = random.choice(SAUDI_FIRST_NAMES_MALE)
last = random.choice(SAUDI_LAST_NAMES)
base_username = f"{first.lower()}.{last.lower().replace('-', '').replace('al', '')}"
username = tenant_scoped_unique_username(tenant, base_username)
email = f"{username}@{tenant.name.lower().replace(' ', '').replace('-', '')}.sa"
u = User.objects.create(
tenant=tenant,
username=username,
email=email,
first_name=first,
last_name=last,
is_active=True,
)
u.set_password('Hospital@123')
u.save()
tenant_users.append(u) # signal creates Employee
if (i + 1) % 50 == 0:
print(f" Created {i + 1}/{to_create} users for {tenant.name}")
except Exception as e:
print(f"Error creating user {i+1} for {tenant.name}: {e}")
continue
# Now (re)populate employee HR data
print(f"Updating employee profiles for {tenant.name}...")
for i, u in enumerate(tenant_users):
try:
emp = getattr(u, 'employee_profile', None)
if not emp:
continue
# Basic personal
if not emp.first_name:
emp.first_name = u.first_name or emp.first_name
if not emp.last_name:
emp.last_name = u.last_name or emp.last_name
if not emp.email:
emp.email = u.email
# Add father and grandfather names
if not emp.father_name:
emp.father_name = random.choice(SAUDI_FIRST_NAMES_MALE)
if not emp.grandfather_name:
emp.grandfather_name = random.choice(SAUDI_FIRST_NAMES_MALE)
# ID information
if not emp.identification_number:
emp.identification_number = f"{random.randint(1000000000, 9999999999)}"
emp.id_type = random.choice([Employee.IdNumberTypes.NATIONAL_ID, Employee.IdNumberTypes.IQAMA])
# Demographics
if not emp.gender:
emp.gender = random.choice([Employee.Gender.MALE, Employee.Gender.FEMALE])
if not emp.marital_status:
emp.marital_status = random.choice([
Employee.MaritalStatus.SINGLE, Employee.MaritalStatus.MARRIED,
Employee.MaritalStatus.DIVORCED, Employee.MaritalStatus.WIDOWED,
Employee.MaritalStatus.SEPARATED
])
if not emp.date_of_birth:
# 2255 years old
emp.date_of_birth = (django_timezone.now().date()
- timedelta(days=random.randint(22*365, 55*365)))
# Contact E.164 (both are mobiles by model design)
if not emp.phone:
emp.phone = e164_ksa_mobile()
if not emp.mobile_phone:
emp.mobile_phone = e164_ksa_mobile()
# Address
if not emp.address_line_1:
emp.address_line_1 = f"{random.randint(1, 999)} {random.choice(['King Fahd Rd', 'Prince Sultan St', 'Olaya St', 'King Abdul Aziz Rd', 'Tahlia St'])}"
if not emp.address_line_2:
if random.choice([True, False]):
emp.address_line_2 = f"Apt {random.randint(1, 50)}"
if not emp.city:
emp.city = random.choice(SAUDI_CITIES)
if not emp.postal_code:
emp.postal_code = f"{random.randint(10000, 99999)}"
if not emp.country:
emp.country = 'Saudi Arabia'
# Org
if not emp.department:
emp.department = random.choice(depts)
if not emp.job_title:
emp.job_title = pick_job_title_for_department(emp.department)
# Role (derive from job title if not set)
if not emp.role or emp.role == Employee.Role.GUEST:
emp.role = infer_role_from_title(emp.job_title)
# Employment
if not emp.employment_type:
emp.employment_type = random.choices([
Employee.EmploymentType.FULL_TIME, Employee.EmploymentType.PART_TIME,
Employee.EmploymentType.CONTRACT, Employee.EmploymentType.TEMPORARY,
Employee.EmploymentType.INTERN, Employee.EmploymentType.CONSULTANT
], weights=[70, 15, 8, 4, 2, 1])[0]
if not emp.hire_date:
emp.hire_date = django_timezone.now().date() - timedelta(days=random.randint(30, 2000))
if not emp.employment_status:
emp.employment_status = random.choices(
[Employee.EmploymentStatus.ACTIVE, Employee.EmploymentStatus.INACTIVE, Employee.EmploymentStatus.LEAVE],
weights=[85, 10, 5]
)[0]
# If terminated, set a termination_date after hire_date
if emp.employment_status == Employee.EmploymentStatus.TERMINATED and not emp.termination_date:
emp.termination_date = emp.hire_date + timedelta(days=random.randint(30, 1000))
# Compensation
if not emp.hourly_rate and emp.employment_type in [Employee.EmploymentType.PART_TIME, Employee.EmploymentType.PER_DIEM]:
emp.hourly_rate = Decimal(str(random.randint(50, 300)))
if not emp.annual_salary and emp.employment_type == Employee.EmploymentType.FULL_TIME:
emp.annual_salary = Decimal(str(random.randint(60000, 500000)))
if not emp.fte_percentage:
if emp.employment_type == Employee.EmploymentType.PART_TIME:
emp.fte_percentage = Decimal(str(random.choice([25, 50, 75])))
else:
emp.fte_percentage = Decimal('100.00')
# Licensure (optional by role/title)
jt_lower = (emp.job_title or '').lower()
if not emp.license_number and any(k in jt_lower for k in ['physician', 'nurse', 'pharmacist', 'radiolog']):
emp.license_number = f"LIC-{random.randint(100000, 999999)}"
emp.license_expiry_date = django_timezone.now().date() + timedelta(days=random.randint(180, 1095))
emp.license_state = random.choice(['Riyadh Province', 'Makkah Province', 'Eastern Province', 'Asir Province'])
if 'physician' in jt_lower and not emp.npi_number:
emp.npi_number = f"SA{random.randint(1000000, 9999999)}"
# Emergency contact
if not emp.emergency_contact_name:
contact_gender = random.choice([Employee.Gender.MALE, Employee.Gender.FEMALE])
contact_first = random.choice(SAUDI_FIRST_NAMES_MALE if contact_gender == Employee.Gender.MALE else SAUDI_FIRST_NAMES_FEMALE)
contact_last = random.choice(SAUDI_LAST_NAMES)
emp.emergency_contact_name = f"{contact_first} {contact_last}"
emp.emergency_contact_relationship = random.choice(['Spouse', 'Parent', 'Sibling', 'Child', 'Friend'])
emp.emergency_contact_phone = e164_ksa_mobile()
# Bio for senior staff
if any(k in jt_lower for k in ['chief', 'director', 'manager', 'senior', 'consultant']):
if not emp.bio:
years_exp = random.randint(5, 25)
emp.bio = f"Experienced healthcare professional with {years_exp} years in {emp.department.name if emp.department else 'healthcare'}. Specialized in {random.choice(['patient care', 'clinical excellence', 'team leadership', 'quality improvement'])}."
# Preferences
emp.user_timezone = 'Asia/Riyadh'
if not emp.language:
emp.language = random.choices(['ar', 'en'], weights=[70, 30])[0]
if not emp.theme:
emp.theme = random.choice([Employee.Theme.LIGHT, Employee.Theme.DARK, Employee.Theme.AUTO])
# Approval status for active employees
if emp.employment_status == Employee.EmploymentStatus.ACTIVE:
emp.is_verified = True
emp.is_approved = True
emp.approval_date = emp.hire_date + timedelta(days=random.randint(1, 30))
emp.save()
all_employees.append(emp)
if (i + 1) % 100 == 0:
print(f" Updated {i + 1}/{len(tenant_users)} employee profiles for {tenant.name}")
except Exception as e:
print(f"Error updating employee profile for user {u.username}: {e}")
continue
print(f"Employees ready for {tenant.name}: {len([e for e in all_employees if e.tenant == tenant])}")
return all_employees
# ----------------------------
# Scheduling
# ----------------------------
def create_saudi_schedules(employees, schedules_per_employee=2):
"""Create work schedules for employees based on department type."""
schedules = []
schedule_patterns = {
'DAY_SHIFT': {
'sunday': {'start': '07:00', 'end': '19:00'},
'monday': {'start': '07:00', 'end': '19:00'},
'tuesday': {'start': '07:00', 'end': '19:00'},
'wednesday': {'start': '07:00', 'end': '19:00'},
'thursday': {'start': '07:00', 'end': '19:00'},
'friday': {'start': '07:00', 'end': '19:00'},
'saturday': {'start': '07:00', 'end': '19:00'},
},
'NIGHT_SHIFT': {
'sunday': {'start': '19:00', 'end': '07:00'},
'monday': {'start': '19:00', 'end': '07:00'},
'tuesday': {'start': '19:00', 'end': '07:00'},
'wednesday': {'start': '19:00', 'end': '07:00'},
'thursday': {'start': '19:00', 'end': '07:00'},
'friday': {'start': '19:00', 'end': '07:00'},
'saturday': {'start': '19:00', 'end': '07:00'},
},
'ADMIN_HOURS': {
'sunday': {'start': '08:00', 'end': '17:00'},
'monday': {'start': '08:00', 'end': '17:00'},
'tuesday': {'start': '08:00', 'end': '17:00'},
'wednesday': {'start': '08:00', 'end': '17:00'},
'thursday': {'start': '08:00', 'end': '16:00'},
'friday': 'off',
'saturday': 'off',
}
}
clinical = [e for e in employees if e.department and getattr(e.department, 'department_type', '') == 'CLINICAL' and e.employment_status == Employee.EmploymentStatus.ACTIVE]
admin = [e for e in employees if e.department and getattr(e.department, 'department_type', '') in ['ADMINISTRATIVE', 'SUPPORT'] and e.employment_status == Employee.EmploymentStatus.ACTIVE]
# Clinical staff: mix of day/night
for emp in clinical:
for i in range(schedules_per_employee):
pattern_name = random.choice(['DAY_SHIFT', 'NIGHT_SHIFT'])
effective_date = django_timezone.now().date() - timedelta(days=random.randint(0, 180))
end_date = effective_date + timedelta(days=random.randint(90, 365)) if random.choice([True, False]) else None
try:
s = Schedule.objects.create(
employee=emp,
name=f"{emp.get_full_name()} - {pattern_name.replace('_', ' ').title()}",
description=f"{pattern_name.replace('_', ' ').title()} schedule for {emp.job_title}",
schedule_type=random.choice(['REGULAR', 'ROTATING']),
effective_date=effective_date,
end_date=end_date,
schedule_pattern=schedule_patterns[pattern_name],
is_active=(i == 0), # first one active
notes=f"Standard {pattern_name.replace('_', ' ').lower()} for clinical staff",
created_at=django_timezone.now() - timedelta(days=random.randint(1, 90)),
updated_at=django_timezone.now() - timedelta(days=random.randint(0, 30))
)
schedules.append(s)
except Exception as e:
print(f"Error creating schedule for {emp.get_full_name()}: {e}")
# Admin/support: admin hours
for emp in admin:
try:
s = Schedule.objects.create(
employee=emp,
name=f"{emp.get_full_name()} - Administrative Hours",
description="Standard administrative working hours",
schedule_type='REGULAR',
effective_date=django_timezone.now().date() - timedelta(days=random.randint(0, 180)),
end_date=None,
schedule_pattern=schedule_patterns['ADMIN_HOURS'],
is_active=True,
notes="Standard administrative schedule",
created_at=django_timezone.now() - timedelta(days=random.randint(1, 90)),
updated_at=django_timezone.now() - timedelta(days=random.randint(0, 30))
)
schedules.append(s)
except Exception as e:
print(f"Error creating schedule for {emp.get_full_name()}: {e}")
print(f"Created {len(schedules)} employee schedules")
return schedules
def create_schedule_assignments(schedules, days_back=30):
"""Create specific schedule assignments for active schedules."""
assignments = []
for s in schedules:
if not s.is_active:
continue
start_date = django_timezone.now().date() - timedelta(days=days_back)
for day_offset in range(days_back):
assignment_date = start_date + timedelta(days=day_offset)
weekday = assignment_date.strftime('%A').lower()
day_pat = s.schedule_pattern.get(weekday)
if not day_pat or day_pat == 'off':
continue
try:
start_time = datetime.strptime(day_pat['start'], '%H:%M').time()
end_time = datetime.strptime(day_pat['end'], '%H:%M').time()
except Exception:
continue
# Infer shift type from hours
shift_type = 'NIGHT' if end_time <= start_time else 'DAY'
status = random.choices(
['COMPLETED', 'NO_SHOW', 'CANCELLED'] if assignment_date < django_timezone.now().date() else ['SCHEDULED', 'CONFIRMED'],
weights=[90, 5, 5] if assignment_date < django_timezone.now().date() else [70, 30]
)[0]
try:
a = ScheduleAssignment.objects.create(
schedule=s,
assignment_date=assignment_date,
start_time=start_time,
end_time=end_time,
shift_type=shift_type,
department=s.employee.department,
location=s.employee.department.location if s.employee.department else None,
status=status,
break_minutes=15 if shift_type == 'DAY' else 30,
lunch_minutes=30 if shift_type == 'DAY' else 0,
notes=f"{shift_type.title()} shift assignment",
created_at=django_timezone.now() - timedelta(days=random.randint(0, 7)),
updated_at=django_timezone.now() - timedelta(days=random.randint(0, 3))
)
assignments.append(a)
except Exception as e:
print(f"Error creating assignment for {s.employee.get_full_name()}: {e}")
print(f"Created {len(assignments)} schedule assignments")
return assignments
def create_time_entries(employees, days_back=30):
"""Create time entries for ACTIVE employees."""
entries = []
active_emps = [e for e in employees if e.employment_status == Employee.EmploymentStatus.ACTIVE and e.hire_date]
for emp in active_emps:
start_date = django_timezone.now().date() - timedelta(days=days_back)
for d in range(days_back):
work_date = start_date + timedelta(days=d)
# Skip admin weekend days (Fri/Sat)
if emp.department and getattr(emp.department, 'department_type', '') == 'ADMINISTRATIVE' and work_date.weekday() in [4, 5]:
continue
work_probability = 0.85 if emp.department and getattr(emp.department, 'department_type', '') == 'CLINICAL' else 0.90
if random.random() > work_probability:
continue
# Typical shift windows
if emp.department and getattr(emp.department, 'department_type', '') == 'CLINICAL':
shift_opts = [
(time(7, 0), time(15, 0)),
(time(15, 0), time(23, 0)),
(time(23, 0), time(7, 0)),
]
start_t, end_t = random.choice(shift_opts)
else:
start_t = time(8, 0)
end_t = time(17, 0) if work_date.weekday() != 4 else time(12, 0)
cin_var = random.randint(-15, 15)
cout_var = random.randint(-15, 15)
clock_in = datetime.combine(work_date, start_t) + timedelta(minutes=cin_var)
clock_out = datetime.combine(work_date, end_t) + timedelta(minutes=cout_var)
if end_t < start_t:
clock_out += timedelta(days=1)
break_start = clock_in + timedelta(hours=2, minutes=random.randint(0, 60))
break_end = break_start + timedelta(minutes=15)
lunch_start = clock_in + timedelta(hours=4, minutes=random.randint(0, 60))
lunch_end = lunch_start + timedelta(minutes=30)
entry_type = random.choices(['REGULAR', 'OVERTIME', 'HOLIDAY'], weights=[85, 10, 5])[0]
status = random.choices(['APPROVED', 'SUBMITTED', 'DRAFT'], weights=[70, 20, 10])[0]
try:
e = TimeEntry.objects.create(
employee=emp,
work_date=work_date,
clock_in_time=clock_in,
clock_out_time=clock_out,
break_start_time=break_start,
break_end_time=break_end,
lunch_start_time=lunch_start,
lunch_end_time=lunch_end,
entry_type=entry_type,
department=emp.department,
location=emp.department.location if emp.department else None,
status=status,
notes=f"{entry_type.title()} work shift",
created_at=django_timezone.now() - timedelta(days=random.randint(0, 7)),
updated_at=django_timezone.now() - timedelta(days=random.randint(0, 3))
)
entries.append(e)
except Exception as ex:
print(f"Error creating time entry for {emp.get_full_name()}: {ex}")
print(f"Created {len(entries)} time entries")
return entries
def create_performance_reviews(employees):
"""Create 12 performance reviews for employees with ≥ 6 months service."""
reviews = []
today = django_timezone.now().date()
eligible = [e for e in employees if e.employment_status == Employee.EmploymentStatus.ACTIVE and e.hire_date and (today - e.hire_date).days >= 180]
competency_areas = [
'Clinical Skills', 'Communication', 'Teamwork', 'Professionalism',
'Quality of Work', 'Productivity', 'Problem Solving', 'Initiative',
'Reliability', 'Cultural Competency', 'Patient Care', 'Safety Compliance'
]
for emp in eligible:
for _ in range(random.randint(1, 2)):
review_date = today - timedelta(days=random.randint(0, 90))
period_start = review_date - timedelta(days=random.randint(180, 360))
period_end = review_date - timedelta(days=30)
review_type = random.choices(['ANNUAL', 'PROBATIONARY', 'MID_YEAR'], weights=[60, 20, 20])[0]
overall = Decimal(str(random.choices([2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0],
weights=[5, 10, 20, 25, 25, 10, 5])[0]))
ratings = {c: float(random.choices([2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0],
weights=[5, 10, 20, 25, 25, 10, 5])[0])
for c in random.sample(competency_areas, random.randint(6, 10))}
status = random.choices(['COMPLETED', 'ACKNOWLEDGED', 'IN_PROGRESS'], weights=[60, 30, 10])[0]
try:
r = PerformanceReview.objects.create(
employee=emp,
review_period_start=period_start,
review_period_end=period_end,
review_date=review_date,
review_type=review_type,
overall_rating=overall,
competency_ratings=ratings,
goals_achieved=f"{emp.first_name} contributed to department objectives.",
goals_not_achieved=("Needs improvement in documentation timeliness." if overall < 4 else None),
future_goals="Maintain high standards of patient care; grow leadership skills.",
strengths=f"Strong {random.choice(['clinical skills', 'communication', 'teamwork'])}.",
areas_for_improvement=("Leadership and mentoring junior staff." if overall < 4.5 else None),
development_plan="Advanced training and leadership opportunities recommended.",
training_recommendations=f"Suggested training: {random.choice(SAUDI_TRAINING_PROGRAMS)}",
employee_comments=("Appreciate the feedback and committed to improvement."
if status == 'ACKNOWLEDGED' else None),
employee_signature_date=(review_date + timedelta(days=random.randint(1, 14))
if status == 'ACKNOWLEDGED' else None),
status=status,
notes=f"{review_type.title()} performance review completed",
created_at=django_timezone.now() - timedelta(days=random.randint(1, 60)),
updated_at=django_timezone.now() - timedelta(days=random.randint(0, 30))
)
reviews.append(r)
except Exception as ex:
print(f"Error creating performance review for {emp.get_full_name()}: {ex}")
print(f"Created {len(reviews)} performance reviews")
return reviews
def create_training_programs(tenants, employees):
"""Create training programs for each tenant."""
programs = []
training_data = [
('Orientation', 'ORIENTATION', 'New employee orientation program', 8, False, None),
('Fire Safety', 'SAFETY', 'Fire safety and emergency evacuation procedures', 2, False, None),
('Emergency Procedures', 'MANDATORY', 'Hospital emergency response procedures', 4, False, None),
('HIPAA Compliance', 'COMPLIANCE', 'Patient privacy and data protection training', 3, False, None),
('Patient Safety', 'MANDATORY', 'Patient safety protocols and best practices', 6, False, None),
('Basic Life Support (BLS)', 'CERTIFICATION', 'Basic life support certification', 8, True, 730),
('Advanced Cardiac Life Support (ACLS)', 'CERTIFICATION', 'Advanced cardiac life support certification', 16, True, 730),
('Pediatric Advanced Life Support (PALS)', 'CERTIFICATION', 'Pediatric advanced life support certification', 14, True, 730),
('Infection Control', 'SAFETY', 'Infection prevention and control measures', 4, False, None),
('Medication Administration', 'SKILLS', 'Safe medication administration practices', 6, False, None),
('Wound Care Management', 'SKILLS', 'Advanced wound care techniques', 8, False, None),
('IV Therapy', 'SKILLS', 'Intravenous therapy administration', 6, False, None),
('Pain Management', 'CONTINUING_ED', 'Pain assessment and management strategies', 4, False, None),
('Electronic Health Records', 'TECHNICAL', 'EHR system training and best practices', 4, False, None),
('Customer Service Excellence', 'OTHER', 'Patient and family service excellence', 3, False, None),
('Quality Improvement', 'CONTINUING_ED', 'Healthcare quality improvement methodologies', 6, False, None),
('Arabic Language', 'OTHER', 'Arabic language skills for healthcare', 20, False, None),
('Cultural Sensitivity', 'OTHER', 'Cultural competency in healthcare', 4, False, None),
('Islamic Healthcare Ethics', 'MANDATORY', 'Islamic principles in healthcare practice', 6, False, None),
('Leadership Development', 'LEADERSHIP', 'Leadership skills for healthcare managers', 12, False, None),
]
providers = [
'Saudi Healthcare Training Institute',
'Ministry of Health Training Center',
'King Fahd Medical Training Academy',
'Internal Training Department',
'Saudi Commission for Health Specialties'
]
# Get potential instructors
potential_instructors = [e for e in employees
if e.employment_status == Employee.EmploymentStatus.ACTIVE
and any(k in (e.job_title or '').lower() for k in ['senior', 'chief', 'manager', 'director', 'physician'])]
for tenant in tenants:
tenant_instructors = [e for e in potential_instructors if e.tenant == tenant]
for name, ptype, description, hours, is_certified, validity_days in training_data:
# Set program dates
start_date = django_timezone.now().date() - timedelta(days=random.randint(30, 365))
end_date = start_date + timedelta(days=random.randint(30, 180)) if random.choice([True, False]) else None
try:
# Check if program already exists for this tenant
existing = TrainingPrograms.objects.filter(tenant=tenant, name=name).first()
if existing:
print(f"Training program {name} already exists for {tenant.name}, skipping...")
programs.append(existing)
continue
program = TrainingPrograms.objects.create(
tenant=tenant,
name=name,
description=description,
program_type=ptype,
program_provider=random.choice(providers),
instructor=random.choice(tenant_instructors) if tenant_instructors and random.choice([True, False]) else None,
start_date=start_date,
end_date=end_date,
duration_hours=Decimal(str(hours)),
cost=Decimal(str(random.randint(500, 5000))),
is_certified=is_certified,
validity_days=validity_days,
notify_before_days=30 if is_certified else None,
created_at=django_timezone.now() - timedelta(days=random.randint(30, 365)),
updated_at=django_timezone.now() - timedelta(days=random.randint(0, 30))
)
programs.append(program)
print(f"Created training program: {name} for {tenant.name}")
except Exception as e:
print(f"Error creating training program {name} for {tenant.name}: {e}")
import traceback
traceback.print_exc()
print(f"Created {len(programs)} training programs")
return programs
def create_training_sessions(programs, employees):
"""Create training sessions for programs."""
sessions = []
# Get instructors (employees who could teach)
potential_instructors = [e for e in employees
if e.employment_status == Employee.EmploymentStatus.ACTIVE
and any(k in (e.job_title or '').lower() for k in ['senior', 'chief', 'manager', 'director', 'physician'])]
for program in programs:
# Create 1-3 sessions per program
num_sessions = random.randint(1, 3)
for i in range(num_sessions):
# Schedule sessions in the past and future
days_offset = random.randint(-90, 90)
start_date = django_timezone.now().date() + timedelta(days=days_offset)
# Session duration based on program hours
session_hours = program.duration_hours
if session_hours <= 4:
# Half day session
start_time = time(9, 0) if random.choice([True, False]) else time(14, 0)
end_time = time(13, 0) if start_time == time(9, 0) else time(18, 0)
else:
# Full day or multi-day
start_time = time(9, 0)
end_time = time(17, 0)
if session_hours > 8:
# Multi-day, adjust end date
days_needed = int(session_hours / 8) + (1 if session_hours % 8 > 0 else 0)
start_date = start_date
start_datetime = django_timezone.make_aware(datetime.combine(start_date, start_time))
end_datetime = django_timezone.make_aware(datetime.combine(start_date, end_time))
# Adjust for multi-day sessions
if session_hours > 8:
days_needed = int(session_hours / 8)
end_datetime = end_datetime + timedelta(days=days_needed)
instructor = random.choice(potential_instructors) if potential_instructors else None
try:
session = TrainingSession.objects.create(
program=program,
title=f"{program.name} - Session {i+1}",
instructor=instructor,
delivery_method=random.choice(['IN_PERSON', 'VIRTUAL', 'HYBRID']),
start_at=start_datetime,
end_at=end_datetime,
location=random.choice([
'Training Room A', 'Training Room B', 'Conference Hall',
'Simulation Lab', 'Skills Lab', 'Auditorium'
]),
capacity=random.randint(10, 50),
created_at=django_timezone.now() - timedelta(days=random.randint(1, 30))
)
sessions.append(session)
except Exception as e:
print(f"Error creating session for {program.name}: {e}")
print(f"Created {len(sessions)} training sessions")
return sessions
def create_training_records(employees, sessions):
"""Create training enrollment records for employees."""
records = []
# Define which roles need which training
training_requirements = {
'ALL': ['Orientation', 'Fire Safety', 'Emergency Procedures', 'HIPAA Compliance', 'Patient Safety'],
'CLINICAL': ['Basic Life Support (BLS)', 'Infection Control', 'Medication Administration'],
'PHYSICIAN': ['Advanced Cardiac Life Support (ACLS)', 'Pediatric Advanced Life Support (PALS)'],
'NURSE': ['IV Therapy', 'Pain Management', 'Wound Care Management'],
'ADMIN': ['Customer Service Excellence', 'Quality Improvement', 'Arabic Language'],
'SUPPORT': ['Cultural Sensitivity', 'Customer Service Excellence']
}
active_employees = [e for e in employees if e.employment_status == Employee.EmploymentStatus.ACTIVE]
for emp in active_employees:
# Determine required training based on role and department
required_programs = set(training_requirements['ALL'])
if emp.department:
dtype = getattr(emp.department, 'department_type', '')
if dtype == 'CLINICAL':
required_programs.update(training_requirements['CLINICAL'])
jt = (emp.job_title or '').lower()
if 'physician' in jt:
required_programs.update(training_requirements['PHYSICIAN'])
elif 'nurse' in jt:
required_programs.update(training_requirements['NURSE'])
elif dtype == 'ADMINISTRATIVE':
required_programs.update(training_requirements['ADMIN'])
else:
required_programs.update(training_requirements['SUPPORT'])
# Add some random additional training
all_program_names = [s.program.name for s in sessions]
additional_programs = random.sample([p for p in all_program_names if p not in required_programs],
min(3, len([p for p in all_program_names if p not in required_programs])))
required_programs.update(additional_programs)
# Find sessions for required programs
for program_name in required_programs:
matching_sessions = [s for s in sessions if s.program.name == program_name and s.program.tenant == emp.tenant]
if matching_sessions:
session = random.choice(matching_sessions)
# Determine enrollment status based on session timing
now = django_timezone.now()
if session.start_at < now - timedelta(days=7):
status = random.choices(['COMPLETED', 'NO_SHOW', 'FAILED'], weights=[85, 10, 5])[0]
elif session.start_at < now:
status = random.choices(['IN_PROGRESS', 'COMPLETED'], weights=[30, 70])[0]
else:
status = random.choices(['SCHEDULED', 'WAITLISTED'], weights=[90, 10])[0]
# Generate scores and completion data
score = None
passed = False
completion_date = None
expiry_date = None
credits_earned = Decimal('0.00')
if status == 'COMPLETED':
score = Decimal(str(random.randint(70, 100)))
passed = score >= 70
completion_date = session.end_at.date()
credits_earned = session.program.duration_hours
# Set expiry date for certified programs
if session.program.is_certified and session.program.validity_days:
expiry_date = completion_date + timedelta(days=session.program.validity_days)
elif status == 'FAILED':
score = Decimal(str(random.randint(40, 69)))
passed = False
completion_date = session.end_at.date()
try:
record = TrainingRecord.objects.create(
employee=emp,
program=session.program,
session=session,
started_at=session.start_at if status in ['IN_PROGRESS', 'COMPLETED', 'FAILED', 'NO_SHOW'] else None,
completion_date=completion_date,
expiry_date=expiry_date,
status=status,
credits_earned=credits_earned,
score=score,
passed=passed,
notes=f"{session.program.name} - {status.replace('_', ' ').title()}",
cost_paid=session.program.cost if random.choice([True, False]) else None,
created_at=django_timezone.now() - timedelta(days=random.randint(1, 30)),
updated_at=django_timezone.now() - timedelta(days=random.randint(0, 15))
)
records.append(record)
except Exception as e:
print(f"Error creating training record for {emp.get_full_name()}: {e}")
print(f"Created {len(records)} training records")
return records
def create_program_modules(programs):
"""Create modules for training programs."""
modules = []
# Define module templates for different program types
module_templates = {
'ORIENTATION': [
'Welcome and Introduction',
'Company Policies and Procedures',
'Safety and Security',
'Benefits and HR Information',
'Department Overview',
'Q&A Session'
],
'BLS': [
'Basic Life Support Overview',
'CPR Techniques',
'AED Usage',
'Choking Response',
'Hands-on Practice',
'Assessment and Certification'
],
'ACLS': [
'Advanced Cardiac Life Support Overview',
'Cardiac Arrest Management',
'Arrhythmia Recognition',
'Pharmacology',
'Team Dynamics',
'Simulation Scenarios',
'Written and Practical Exam'
],
'INFECTION_CONTROL': [
'Infection Prevention Principles',
'Hand Hygiene',
'Personal Protective Equipment',
'Isolation Precautions',
'Cleaning and Disinfection'
]
}
for program in programs:
# Determine modules based on program name/type
if 'BLS' in program.name:
module_list = module_templates['BLS']
elif 'ACLS' in program.name:
module_list = module_templates['ACLS']
elif 'Orientation' in program.name:
module_list = module_templates['ORIENTATION']
elif 'Infection Control' in program.name:
module_list = module_templates['INFECTION_CONTROL']
else:
# Generic modules for other programs
module_list = [
'Introduction and Overview',
'Core Concepts',
'Practical Application',
'Assessment'
]
# Create modules for this program
total_hours = float(program.duration_hours)
hours_per_module = total_hours / len(module_list)
for order, title in enumerate(module_list, 1):
try:
module = ProgramModule.objects.create(
program=program,
title=title,
order=order,
hours=Decimal(str(round(hours_per_module, 2)))
)
modules.append(module)
except Exception as e:
print(f"Error creating module {title} for {program.name}: {e}")
print(f"Created {len(modules)} program modules")
return modules
def create_program_prerequisites(programs):
"""Create prerequisites between training programs."""
prerequisites = []
# Define prerequisite relationships
prerequisite_rules = {
'Advanced Cardiac Life Support (ACLS)': ['Basic Life Support (BLS)'],
'Pediatric Advanced Life Support (PALS)': ['Basic Life Support (BLS)'],
'Wound Care Management': ['Infection Control'],
'IV Therapy': ['Medication Administration'],
'Pain Management': ['Medication Administration'],
'Leadership Development': ['Customer Service Excellence']
}
# Group programs by tenant for easier lookup
programs_by_tenant = {}
for program in programs:
if program.tenant not in programs_by_tenant:
programs_by_tenant[program.tenant] = {}
programs_by_tenant[program.tenant][program.name] = program
for tenant, tenant_programs in programs_by_tenant.items():
for program_name, required_program_names in prerequisite_rules.items():
if program_name in tenant_programs:
program = tenant_programs[program_name]
for required_name in required_program_names:
if required_name in tenant_programs:
required_program = tenant_programs[required_name]
try:
prerequisite = ProgramPrerequisite.objects.create(
program=program,
required_program=required_program
)
prerequisites.append(prerequisite)
except Exception as e:
print(f"Error creating prerequisite {required_name} -> {program_name}: {e}")
print(f"Created {len(prerequisites)} program prerequisites")
return prerequisites
def create_training_attendance(training_records):
"""Create attendance records for training sessions."""
attendance_records = []
# Only create attendance for records that have started
started_records = [r for r in training_records if r.started_at and r.status in ['IN_PROGRESS', 'COMPLETED', 'FAILED', 'NO_SHOW']]
for record in started_records:
# Determine attendance status based on training record status
if record.status == 'NO_SHOW':
attendance_status = 'ABSENT'
elif record.status == 'COMPLETED':
attendance_status = random.choices(['PRESENT', 'LATE'], weights=[90, 10])[0]
elif record.status == 'FAILED':
attendance_status = random.choices(['PRESENT', 'LATE', 'ABSENT'], weights=[70, 20, 10])[0]
else: # IN_PROGRESS
attendance_status = random.choices(['PRESENT', 'LATE'], weights=[85, 15])[0]
# Set check-in/out times based on session times and attendance status
session = record.session
checked_in_at = None
checked_out_at = None
if attendance_status in ['PRESENT', 'LATE']:
if attendance_status == 'LATE':
# Late arrival (5-30 minutes after start)
late_minutes = random.randint(5, 30)
checked_in_at = session.start_at + timedelta(minutes=late_minutes)
else:
# On time or early arrival
early_minutes = random.randint(-10, 5)
checked_in_at = session.start_at + timedelta(minutes=early_minutes)
# Check out time (if session is completed)
if record.status in ['COMPLETED', 'FAILED']:
checkout_variance = random.randint(-15, 15)
checked_out_at = session.end_at + timedelta(minutes=checkout_variance)
notes = None
if attendance_status == 'LATE':
notes = f"Arrived {late_minutes} minutes late"
elif attendance_status == 'ABSENT':
notes = "Did not attend session"
elif attendance_status == 'EXCUSED':
notes = "Excused absence"
try:
attendance = TrainingAttendance.objects.create(
enrollment=record,
checked_in_at=checked_in_at,
checked_out_at=checked_out_at,
status=attendance_status,
notes=notes
)
attendance_records.append(attendance)
except Exception as e:
print(f"Error creating attendance for {record.employee.get_full_name()}: {e}")
print(f"Created {len(attendance_records)} attendance records")
return attendance_records
def create_training_assessments(training_records):
"""Create assessments for training records."""
assessments = []
# Only create assessments for completed or failed records
assessed_records = [r for r in training_records if r.status in ['COMPLETED', 'FAILED'] and r.score is not None]
assessment_types = {
'CERTIFICATION': ['Written Exam', 'Practical Assessment'],
'SKILLS': ['Practical Assessment', 'Skills Demonstration'],
'MANDATORY': ['Knowledge Check'],
'SAFETY': ['Safety Quiz'],
'COMPLIANCE': ['Compliance Test'],
'OTHER': ['Final Assessment']
}
for record in assessed_records:
program_type = record.program.program_type
possible_assessments = assessment_types.get(program_type, ['Final Assessment'])
# Create 1-2 assessments per record
num_assessments = random.randint(1, min(2, len(possible_assessments)))
selected_assessments = random.sample(possible_assessments, num_assessments)
for assessment_name in selected_assessments:
# Determine max score based on assessment type
if 'Practical' in assessment_name:
max_score = Decimal('100.00')
elif 'Written' in assessment_name or 'Exam' in assessment_name:
max_score = Decimal('100.00')
else:
max_score = Decimal('50.00')
# Generate score based on training record score
base_score = float(record.score) if record.score else 70
# Add some variance to the assessment score
score_variance = random.randint(-10, 10)
assessment_score = max(0, min(float(max_score), base_score + score_variance))
passed = assessment_score >= (float(max_score) * 0.7) # 70% passing
# Set taken date around the completion date
if record.completion_date:
taken_at = django_timezone.make_aware(
datetime.combine(record.completion_date, time(random.randint(9, 17), random.randint(0, 59)))
)
else:
taken_at = record.session.end_at
try:
assessment = TrainingAssessment.objects.create(
enrollment=record,
name=assessment_name,
max_score=max_score,
score=Decimal(str(assessment_score)),
passed=passed,
taken_at=taken_at,
notes=f"{assessment_name} for {record.program.name}"
)
assessments.append(assessment)
except Exception as e:
print(f"Error creating assessment for {record.employee.get_full_name()}: {e}")
print(f"Created {len(assessments)} training assessments")
return assessments
def create_training_certificates(training_records, employees):
"""Create certificates for completed certified training."""
certificates = []
# Only create certificates for completed certified programs
eligible_records = [r for r in training_records
if r.status == 'COMPLETED' and r.passed and r.program.is_certified]
# Get potential signers (senior staff)
potential_signers = [e for e in employees
if e.employment_status == Employee.EmploymentStatus.ACTIVE
and any(k in (e.job_title or '').lower() for k in ['chief', 'director', 'manager'])]
for record in eligible_records:
# Generate certificate details
certificate_name = f"{record.program.name} Certificate"
certificate_number = f"CERT-{record.program.tenant.id}-{random.randint(100000, 999999)}"
# Determine certification body based on program type
if 'BLS' in record.program.name or 'ACLS' in record.program.name or 'PALS' in record.program.name:
certification_body = 'Saudi Heart Association'
elif record.program.program_type == 'SAFETY':
certification_body = 'Saudi Occupational Safety and Health Administration'
elif record.program.program_type == 'COMPLIANCE':
certification_body = 'Saudi Ministry of Health'
else:
certification_body = 'Saudi Commission for Health Specialties'
# Set expiry date
expiry_date = None
if record.program.validity_days:
expiry_date = record.completion_date + timedelta(days=record.program.validity_days)
# Select signer from same tenant
tenant_signers = [s for s in potential_signers if s.tenant == record.employee.tenant]
signer_employee = random.choice(tenant_signers) if tenant_signers else None
signer_user = signer_employee.user if signer_employee else None
try:
certificate = TrainingCertificates.objects.create(
program=record.program,
employee=record.employee,
enrollment=record,
certificate_name=certificate_name,
certificate_number=certificate_number,
certification_body=certification_body,
expiry_date=expiry_date,
signed_by=signer_user,
created_by=signer_user,
created_at=django_timezone.now() - timedelta(days=random.randint(0, 7)),
updated_at=django_timezone.now() - timedelta(days=random.randint(0, 3))
)
certificates.append(certificate)
print(f"Created certificate {certificate_number} for {record.employee.get_full_name()}")
except Exception as e:
print(f"Error creating certificate for {record.employee.get_full_name()}: {e}")
import traceback
traceback.print_exc()
print(f"Created {len(certificates)} training certificates")
return certificates
# ----------------------------
# Leave Management
# ----------------------------
def create_leave_types(tenants):
"""Create standard leave types for each tenant."""
leave_types = []
# Saudi-specific leave types with configurations
leave_type_configs = [
{
'name': 'Annual Leave',
'code': 'AL',
'description': 'Annual vacation leave',
'is_paid': True,
'requires_approval': True,
'requires_documentation': False,
'accrual_method': 'ANNUAL',
'annual_entitlement': Decimal('21.00'),
'max_carry_over': Decimal('5.00'),
'max_consecutive_days': 21,
'min_notice_days': 7,
'gender_specific': None,
},
{
'name': 'Sick Leave',
'code': 'SL',
'description': 'Medical sick leave',
'is_paid': True,
'requires_approval': True,
'requires_documentation': True,
'accrual_method': 'ANNUAL',
'annual_entitlement': Decimal('10.00'),
'max_carry_over': Decimal('0.00'),
'max_consecutive_days': None,
'min_notice_days': 0,
'gender_specific': None,
},
{
'name': 'Emergency Leave',
'code': 'EL',
'description': 'Emergency personal leave',
'is_paid': True,
'requires_approval': True,
'requires_documentation': False,
'accrual_method': 'ANNUAL',
'annual_entitlement': Decimal('5.00'),
'max_carry_over': Decimal('0.00'),
'max_consecutive_days': 3,
'min_notice_days': 0,
'gender_specific': None,
},
{
'name': 'Maternity Leave',
'code': 'ML',
'description': 'Maternity leave for childbirth',
'is_paid': True,
'requires_approval': True,
'requires_documentation': True,
'accrual_method': 'NONE',
'annual_entitlement': Decimal('70.00'),
'max_carry_over': Decimal('0.00'),
'max_consecutive_days': 70,
'min_notice_days': 30,
'gender_specific': Employee.Gender.FEMALE,
},
{
'name': 'Paternity Leave',
'code': 'PL',
'description': 'Paternity leave for childbirth',
'is_paid': True,
'requires_approval': True,
'requires_documentation': True,
'accrual_method': 'NONE',
'annual_entitlement': Decimal('3.00'),
'max_carry_over': Decimal('0.00'),
'max_consecutive_days': 3,
'min_notice_days': 7,
'gender_specific': Employee.Gender.MALE,
},
{
'name': 'Hajj Leave',
'code': 'HL',
'description': 'Leave for Hajj pilgrimage',
'is_paid': True,
'requires_approval': True,
'requires_documentation': False,
'accrual_method': 'NONE',
'annual_entitlement': Decimal('10.00'),
'max_carry_over': Decimal('0.00'),
'max_consecutive_days': 10,
'min_notice_days': 30,
'gender_specific': None,
},
{
'name': 'Bereavement Leave',
'code': 'BL',
'description': 'Leave for family bereavement',
'is_paid': True,
'requires_approval': True,
'requires_documentation': False,
'accrual_method': 'NONE',
'annual_entitlement': Decimal('3.00'),
'max_carry_over': Decimal('0.00'),
'max_consecutive_days': 3,
'min_notice_days': 0,
'gender_specific': None,
},
]
for tenant in tenants:
for config in leave_type_configs:
try:
# Check if leave type already exists
existing = LeaveType.objects.filter(tenant=tenant, code=config['code']).first()
if existing:
leave_types.append(existing)
continue
leave_type = LeaveType.objects.create(
tenant=tenant,
**config,
is_active=True,
available_for_all=True,
created_at=django_timezone.now() - timedelta(days=random.randint(30, 365)),
updated_at=django_timezone.now() - timedelta(days=random.randint(0, 30))
)
leave_types.append(leave_type)
print(f"Created leave type: {config['name']} for {tenant.name}")
except Exception as e:
print(f"Error creating leave type {config['name']} for {tenant.name}: {e}")
print(f"Created {len(leave_types)} leave types")
return leave_types
def create_leave_balances(employees, leave_types):
"""Create leave balances for all active employees."""
balances = []
current_year = django_timezone.now().year
active_employees = [e for e in employees if e.employment_status == Employee.EmploymentStatus.ACTIVE]
for emp in active_employees:
# Get leave types for this tenant
tenant_leave_types = [lt for lt in leave_types if lt.tenant == emp.tenant]
for leave_type in tenant_leave_types:
# Skip gender-specific leave types if not applicable
if leave_type.gender_specific and leave_type.gender_specific != emp.gender:
continue
try:
# Check if balance already exists
existing = LeaveBalance.objects.filter(
employee=emp,
leave_type=leave_type,
year=current_year
).first()
if existing:
balances.append(existing)
continue
# Calculate entitlement based on hire date and accrual method
if emp.hire_date:
months_employed = ((current_year - emp.hire_date.year) * 12 +
(12 - emp.hire_date.month + 1))
if leave_type.accrual_method == 'PRORATED' and emp.hire_date.year == current_year:
# Pro-rate based on months employed
accrued = (leave_type.annual_entitlement / 12) * min(months_employed, 12)
elif leave_type.accrual_method == 'MONTHLY':
# Monthly accrual
accrued = (leave_type.annual_entitlement / 12) * min(months_employed, 12)
else:
# Annual allocation
accrued = leave_type.annual_entitlement
else:
accrued = leave_type.annual_entitlement
# Opening balance (carry-over from previous year)
opening_balance = Decimal('0.00')
if leave_type.max_carry_over > 0 and random.choice([True, False]):
opening_balance = Decimal(str(random.uniform(0, float(leave_type.max_carry_over))))
# Used balance (some employees have already used leave)
max_used = min(float(accrued + opening_balance), float(leave_type.annual_entitlement))
used = Decimal(str(random.uniform(0, max_used * 0.6))) # Up to 60% used
# Pending balance (some requests pending approval)
max_pending = float(accrued + opening_balance - used)
pending = Decimal(str(random.uniform(0, min(max_pending * 0.2, 5)))) # Up to 20% pending
balance = LeaveBalance.objects.create(
employee=emp,
leave_type=leave_type,
year=current_year,
opening_balance=opening_balance,
accrued=accrued,
used=used,
pending=pending,
adjusted=Decimal('0.00'),
last_accrual_date=django_timezone.now().date(),
created_at=django_timezone.now() - timedelta(days=random.randint(30, 365)),
updated_at=django_timezone.now() - timedelta(days=random.randint(0, 30))
)
balances.append(balance)
except Exception as e:
print(f"Error creating leave balance for {emp.get_full_name()}: {e}")
print(f"Created {len(balances)} leave balances")
return balances
def create_leave_requests(employees, leave_types, balances):
"""Create realistic leave requests for employees."""
requests = []
active_employees = [e for e in employees if e.employment_status == Employee.EmploymentStatus.ACTIVE]
for emp in active_employees:
# Each employee gets 2-5 leave requests (past, current, future)
num_requests = random.randint(2, 5)
# Get employee's balances
emp_balances = [b for b in balances if b.employee == emp]
for _ in range(num_requests):
if not emp_balances:
continue
# Select a leave type with available balance
balance = random.choice(emp_balances)
leave_type = balance.leave_type
# Generate request dates (mix of past, current, future)
days_offset = random.randint(-180, 180)
start_date = django_timezone.now().date() + timedelta(days=days_offset)
# Duration based on leave type
if leave_type.code == 'ML':
duration = random.randint(60, 70)
elif leave_type.code in ['PL', 'BL', 'EL']:
duration = random.randint(1, 3)
elif leave_type.code == 'HL':
duration = random.randint(7, 10)
else:
duration = random.randint(1, min(14, int(balance.available)))
end_date = start_date + timedelta(days=duration - 1)
# Day types (mostly full days)
start_day_type = random.choices(
['FULL_DAY', 'HALF_DAY_AM', 'HALF_DAY_PM'],
weights=[85, 10, 5]
)[0]
end_day_type = 'FULL_DAY' if duration > 1 else start_day_type
# Calculate total days
total_days = Decimal(str(duration))
if start_day_type != 'FULL_DAY':
total_days -= Decimal('0.5')
if end_day_type != 'FULL_DAY' and duration > 1:
total_days -= Decimal('0.5')
# Status based on date
if start_date < django_timezone.now().date() - timedelta(days=30):
status = random.choices(
['APPROVED', 'REJECTED', 'CANCELLED'],
weights=[70, 20, 10]
)[0]
elif start_date < django_timezone.now().date():
status = random.choices(
['APPROVED', 'PENDING', 'REJECTED'],
weights=[80, 15, 5]
)[0]
else:
status = random.choices(
['PENDING', 'APPROVED', 'DRAFT'],
weights=[60, 30, 10]
)[0]
# Reason
reasons = {
'AL': ['Family vacation', 'Personal time off', 'Rest and relaxation', 'Travel plans'],
'SL': ['Medical treatment', 'Doctor appointment', 'Recovery from illness', 'Medical procedure'],
'EL': ['Family emergency', 'Urgent personal matter', 'Unexpected situation'],
'ML': ['Maternity leave for childbirth'],
'PL': ['Paternity leave for childbirth'],
'HL': ['Hajj pilgrimage'],
'BL': ['Family bereavement'],
}
reason = random.choice(reasons.get(leave_type.code, ['Personal leave']))
try:
request = LeaveRequest.objects.create(
employee=emp,
leave_type=leave_type,
start_date=start_date,
end_date=end_date,
start_day_type=start_day_type,
end_day_type=end_day_type,
total_days=total_days,
reason=reason,
contact_number=emp.mobile_phone if random.choice([True, False]) else None,
emergency_contact=emp.emergency_contact_name if random.choice([True, False]) else None,
status=status,
submitted_at=django_timezone.now() - timedelta(days=abs(days_offset) + random.randint(1, 7)) if status != 'DRAFT' else None,
current_approver=emp.supervisor if status == 'PENDING' else None,
final_approver=emp.supervisor if status == 'APPROVED' else None,
approved_at=django_timezone.now() - timedelta(days=abs(days_offset) + random.randint(1, 5)) if status == 'APPROVED' else None,
rejected_at=django_timezone.now() - timedelta(days=abs(days_offset) + random.randint(1, 5)) if status == 'REJECTED' else None,
rejection_reason='Insufficient staffing during requested period' if status == 'REJECTED' else None,
cancelled_at=django_timezone.now() - timedelta(days=random.randint(1, 10)) if status == 'CANCELLED' else None,
cancellation_reason='Personal circumstances changed' if status == 'CANCELLED' else None,
created_at=django_timezone.now() - timedelta(days=abs(days_offset) + random.randint(7, 14)),
updated_at=django_timezone.now() - timedelta(days=random.randint(0, 7))
)
requests.append(request)
except Exception as e:
print(f"Error creating leave request for {emp.get_full_name()}: {e}")
print(f"Created {len(requests)} leave requests")
return requests
def create_leave_approvals(leave_requests, employees):
"""Create approval records for processed leave requests."""
approvals = []
# Only create approvals for non-draft requests
processed_requests = [r for r in leave_requests if r.status != 'DRAFT']
for request in processed_requests:
# Level 1 approval (supervisor)
if request.employee.supervisor:
action = 'APPROVED' if request.status == 'APPROVED' else 'REJECTED' if request.status == 'REJECTED' else 'PENDING'
try:
approval = LeaveApproval.objects.create(
leave_request=request,
level=1,
approver=request.employee.supervisor,
action=action,
comments=f"{'Approved' if action == 'APPROVED' else 'Rejected' if action == 'REJECTED' else 'Pending review'} - Level 1",
action_date=request.approved_at or request.rejected_at if action != 'PENDING' else None,
created_at=request.submitted_at or django_timezone.now(),
updated_at=django_timezone.now()
)
approvals.append(approval)
except Exception as e:
print(f"Error creating approval for request {request.request_id}: {e}")
print(f"Created {len(approvals)} leave approvals")
return approvals
def create_leave_delegations(employees):
"""Create delegation records for supervisors."""
delegations = []
# Get supervisors (employees who have direct reports)
supervisors = [e for e in employees
if e.employment_status == Employee.EmploymentStatus.ACTIVE
and any(k in (e.job_title or '').lower() for k in ['manager', 'director', 'chief', 'head', 'supervisor'])]
for supervisor in supervisors:
# 30% chance of having a delegation
if random.random() > 0.3:
continue
# Find potential delegates (peers or senior staff in same department)
potential_delegates = [e for e in employees
if e.employment_status == Employee.EmploymentStatus.ACTIVE
and e.department == supervisor.department
and e.id != supervisor.id
and any(k in (e.job_title or '').lower() for k in ['senior', 'manager', 'supervisor'])]
if not potential_delegates:
continue
delegate = random.choice(potential_delegates)
# Delegation period (past, current, or future)
days_offset = random.randint(-90, 90)
start_date = django_timezone.now().date() + timedelta(days=days_offset)
duration = random.randint(7, 30)
end_date = start_date + timedelta(days=duration)
# Status
is_active = start_date <= django_timezone.now().date() <= end_date
reasons = [
'Annual leave coverage',
'Business travel',
'Training program attendance',
'Temporary assignment',
'Medical leave coverage'
]
try:
delegation = LeaveDelegate.objects.create(
delegator=supervisor,
delegate=delegate,
start_date=start_date,
end_date=end_date,
reason=random.choice(reasons),
is_active=is_active,
created_at=django_timezone.now() - timedelta(days=abs(days_offset) + random.randint(7, 14)),
updated_at=django_timezone.now() - timedelta(days=random.randint(0, 7))
)
delegations.append(delegation)
except Exception as e:
print(f"Error creating delegation for {supervisor.get_full_name()}: {e}")
print(f"Created {len(delegations)} leave delegations")
return delegations
# ----------------------------
# Salary and Document Management
# ----------------------------
def create_salary_information(employees):
"""Create salary records for active employees."""
salary_records = []
active_employees = [e for e in employees if e.employment_status == Employee.EmploymentStatus.ACTIVE and e.hire_date]
for emp in active_employees:
# Determine salary based on job title and department
jt_lower = (emp.job_title or '').lower()
# Base salary ranges (SAR per month)
if 'chief' in jt_lower or 'director' in jt_lower:
basic_salary = Decimal(str(random.randint(25000, 45000)))
elif 'consultant' in jt_lower or 'senior physician' in jt_lower:
basic_salary = Decimal(str(random.randint(20000, 35000)))
elif 'physician' in jt_lower or 'doctor' in jt_lower:
basic_salary = Decimal(str(random.randint(15000, 25000)))
elif 'manager' in jt_lower:
basic_salary = Decimal(str(random.randint(12000, 22000)))
elif 'pharmacist' in jt_lower or 'radiologist' in jt_lower:
basic_salary = Decimal(str(random.randint(10000, 18000)))
elif 'nurse practitioner' in jt_lower or 'senior nurse' in jt_lower:
basic_salary = Decimal(str(random.randint(9000, 15000)))
elif 'nurse' in jt_lower:
basic_salary = Decimal(str(random.randint(6000, 12000)))
elif 'technician' in jt_lower or 'technologist' in jt_lower:
basic_salary = Decimal(str(random.randint(5000, 10000)))
else:
basic_salary = Decimal(str(random.randint(4000, 8000)))
# Allowances (typical Saudi structure)
housing_allowance = basic_salary * Decimal('0.25') # 25% of basic
transportation_allowance = Decimal(str(random.randint(500, 1500)))
food_allowance = Decimal(str(random.randint(300, 800)))
# Other allowances (JSON field)
other_allowances = {}
if random.choice([True, False]):
other_allowances['mobile_allowance'] = float(random.randint(200, 500))
if 'manager' in jt_lower or 'director' in jt_lower or 'chief' in jt_lower:
other_allowances['management_allowance'] = float(random.randint(2000, 5000))
# Bank details
bank_names = ['Al Rajhi Bank', 'Saudi National Bank', 'Riyad Bank', 'Al Bilad Bank', 'Bank AlJazira']
bank_name = random.choice(bank_names)
account_number = f"{random.randint(1000000000, 9999999999)}"
iban = f"SA{random.randint(10, 99)}{random.randint(10000000000000000000, 99999999999999999999)}"
# Effective date (hire date or later)
effective_date = emp.hire_date
try:
salary = SalaryInformation.objects.create(
employee=emp,
basic_salary=basic_salary,
housing_allowance=housing_allowance,
transportation_allowance=transportation_allowance,
food_allowance=food_allowance,
other_allowances=other_allowances,
currency='SAR',
payment_frequency='MONTHLY',
bank_name=bank_name,
account_number=account_number,
iban=iban,
effective_date=effective_date,
is_active=True,
created_by=emp.user
)
salary_records.append(salary)
except Exception as e:
print(f"Error creating salary for {emp.get_full_name()}: {e}")
print(f"Created {len(salary_records)} salary records")
return salary_records
def create_salary_adjustments(employees, salary_records):
"""Create salary adjustment records for some employees."""
adjustments = []
# 30% of employees get salary adjustments
eligible_employees = random.sample(
[e for e in employees if e.employment_status == Employee.EmploymentStatus.ACTIVE],
int(len([e for e in employees if e.employment_status == Employee.EmploymentStatus.ACTIVE]) * 0.3)
)
for emp in eligible_employees:
# Get employee's salary records
emp_salaries = [s for s in salary_records if s.employee == emp]
if len(emp_salaries) < 1:
continue
# Create a new salary record (adjustment)
previous_salary = emp_salaries[0]
# Adjustment type and percentage
adjustment_type = random.choice([
'ANNUAL_INCREMENT', 'PROMOTION', 'MERIT_INCREASE',
'COST_OF_LIVING', 'MARKET_ADJUSTMENT'
])
if adjustment_type == 'PROMOTION':
increase_pct = Decimal(str(random.uniform(15, 30)))
elif adjustment_type == 'ANNUAL_INCREMENT':
increase_pct = Decimal(str(random.uniform(3, 8)))
elif adjustment_type == 'MERIT_INCREASE':
increase_pct = Decimal(str(random.uniform(5, 15)))
else:
increase_pct = Decimal(str(random.uniform(2, 10)))
# Calculate new salary
new_basic = previous_salary.basic_salary * (Decimal('1') + increase_pct / Decimal('100'))
new_housing = new_basic * Decimal('0.25')
# Adjustment date (3-12 months after hire)
adjustment_date = previous_salary.effective_date + timedelta(days=random.randint(90, 365))
try:
# Create new salary record
new_salary = SalaryInformation.objects.create(
employee=emp,
basic_salary=new_basic,
housing_allowance=new_housing,
transportation_allowance=previous_salary.transportation_allowance,
food_allowance=previous_salary.food_allowance,
other_allowances=previous_salary.other_allowances,
currency=previous_salary.currency,
payment_frequency=previous_salary.payment_frequency,
bank_name=previous_salary.bank_name,
account_number=previous_salary.account_number,
iban=previous_salary.iban,
effective_date=adjustment_date,
is_active=True,
created_by=emp.user
)
# Deactivate previous salary
previous_salary.is_active = False
previous_salary.end_date = adjustment_date
previous_salary.save()
# Create adjustment record
adjustment = SalaryAdjustment.objects.create(
employee=emp,
previous_salary=previous_salary,
new_salary=new_salary,
adjustment_type=adjustment_type,
effective_date=adjustment_date,
adjustment_reason=f"{adjustment_type.replace('_', ' ').title()} - Performance and market review",
approved_by=emp.supervisor.user if emp.supervisor else None,
approval_date=django_timezone.now() - timedelta(days=random.randint(1, 30))
)
adjustments.append(adjustment)
salary_records.append(new_salary)
except Exception as e:
print(f"Error creating salary adjustment for {emp.get_full_name()}: {e}")
print(f"Created {len(adjustments)} salary adjustments")
return adjustments
def create_document_templates(tenants):
"""Create document templates for each tenant."""
templates = []
template_configs = [
{
'name': 'Salary Certificate - English',
'document_type': 'SALARY_CERTIFICATE',
'language': 'EN',
'description': 'Standard salary certificate in English',
'template_content': '''
<h2>SALARY CERTIFICATE</h2>
<p>Date: {{current_date}}</p>
<p>To Whom It May Concern,</p>
<p>This is to certify that <strong>{{employee_name}}</strong> (ID: {{employee_id}}) is employed with {{company_name}} as {{job_title}} in the {{department}} department since {{hire_date}}.</p>
<p>The employee's current monthly salary breakdown is as follows:</p>
<ul>
<li>Basic Salary: {{basic_salary}} {{currency}}</li>
<li>Housing Allowance: {{housing_allowance}} {{currency}}</li>
<li>Transportation Allowance: {{transportation_allowance}} {{currency}}</li>
<li><strong>Total Monthly Salary: {{total_salary}} {{currency}}</strong></li>
</ul>
<p>This certificate is issued upon the employee's request for official purposes.</p>
''',
'header_content': '<div style="text-align: center;"><h1>{{company_name}}</h1><p>{{company_address}}</p></div>',
'footer_content': '<div style="text-align: center; margin-top: 50px;"><p>HR Department</p></div>',
},
{
'name': 'Employment Certificate - English',
'document_type': 'EMPLOYMENT_CERTIFICATE',
'language': 'EN',
'description': 'Standard employment certificate in English',
'template_content': '''
<h2>EMPLOYMENT CERTIFICATE</h2>
<p>Date: {{current_date}}</p>
<p>To Whom It May Concern,</p>
<p>This is to certify that <strong>{{employee_name}}</strong> (ID: {{employee_id}}) has been employed with {{company_name}} since {{hire_date}}.</p>
<p>Current Position: <strong>{{job_title}}</strong></p>
<p>Department: <strong>{{department}}</strong></p>
<p>This certificate is issued upon the employee's request for official purposes.</p>
''',
'header_content': '<div style="text-align: center;"><h1>{{company_name}}</h1><p>{{company_address}}</p></div>',
'footer_content': '<div style="text-align: center; margin-top: 50px;"><p>HR Department</p></div>',
},
]
for tenant in tenants:
for config in template_configs:
try:
# Check if template already exists
existing = DocumentTemplate.objects.filter(
tenant=tenant,
document_type=config['document_type'],
language=config['language']
).first()
if existing:
templates.append(existing)
continue
template = DocumentTemplate.objects.create(
tenant=tenant,
name=config['name'],
description=config['description'],
document_type=config['document_type'],
language=config['language'],
template_content=config['template_content'],
header_content=config['header_content'],
footer_content=config['footer_content'],
is_active=True,
is_default=True,
requires_approval=True,
created_at=django_timezone.now() - timedelta(days=random.randint(30, 365)),
updated_at=django_timezone.now() - timedelta(days=random.randint(0, 30))
)
templates.append(template)
print(f"Created document template: {config['name']} for {tenant.name}")
except Exception as e:
print(f"Error creating document template {config['name']} for {tenant.name}: {e}")
print(f"Created {len(templates)} document templates")
return templates
def create_document_requests(employees, templates):
"""Create document requests for employees."""
requests = []
active_employees = [e for e in employees if e.employment_status == Employee.EmploymentStatus.ACTIVE]
# 40% of employees request documents
requesting_employees = random.sample(active_employees, int(len(active_employees) * 0.4))
for emp in requesting_employees:
# Each employee requests 1-3 documents
num_requests = random.randint(1, 3)
for _ in range(num_requests):
# Select document type
document_type = random.choice([
'SALARY_CERTIFICATE', 'EMPLOYMENT_CERTIFICATE', 'EXPERIENCE_LETTER',
'TO_WHOM_IT_MAY_CONCERN', 'BANK_LETTER'
])
# Purpose
purposes = {
'SALARY_CERTIFICATE': ['Bank loan application', 'Credit card application', 'Visa application'],
'EMPLOYMENT_CERTIFICATE': ['Visa application', 'Embassy requirements', 'Official documentation'],
'EXPERIENCE_LETTER': ['Job application', 'Professional certification', 'Career development'],
'TO_WHOM_IT_MAY_CONCERN': ['General purpose', 'Official requirements'],
'BANK_LETTER': ['Bank account opening', 'Loan application', 'Financial documentation'],
}
purpose = random.choice(purposes.get(document_type, ['Official purpose']))
# Language
language = random.choices(['EN', 'AR', 'BOTH'], weights=[60, 20, 20])[0]
# Delivery method
delivery_method = random.choices(
['EMAIL', 'PICKUP', 'PORTAL'],
weights=[60, 30, 10]
)[0]
# Request date (past or recent)
days_ago = random.randint(1, 90)
requested_date = django_timezone.now() - timedelta(days=days_ago)
# Required by date
required_by_date = None
if random.choice([True, False]):
required_by_date = django_timezone.now().date() + timedelta(days=random.randint(3, 30))
# Status based on request age
if days_ago > 30:
status = random.choices(
['DELIVERED', 'REJECTED', 'CANCELLED'],
weights=[70, 20, 10]
)[0]
elif days_ago > 7:
status = random.choices(
['READY', 'DELIVERED', 'IN_PROGRESS'],
weights=[40, 40, 20]
)[0]
else:
status = random.choices(
['PENDING', 'IN_PROGRESS', 'READY'],
weights=[50, 30, 20]
)[0]
# Include salary for salary-related documents
include_salary = document_type in ['SALARY_CERTIFICATE', 'BANK_LETTER']
try:
request = DocumentRequest.objects.create(
employee=emp,
document_type=document_type,
purpose=purpose,
language=language,
delivery_method=delivery_method,
delivery_email=emp.email if delivery_method == 'EMAIL' else None,
required_by_date=required_by_date,
include_salary=include_salary,
status=status,
processed_by=emp.supervisor if status in ['READY', 'DELIVERED', 'REJECTED'] else None,
processed_date=django_timezone.now() - timedelta(days=random.randint(1, days_ago)) if status in ['READY', 'DELIVERED', 'REJECTED'] else None
)
requests.append(request)
except Exception as e:
print(f"Error creating document request for {emp.get_full_name()}: {e}")
print(f"Created {len(requests)} document requests")
return requests
# ----------------------------
# Orchestration
# ----------------------------
def main():
print("Starting Saudi Healthcare HR Data Generation...")
tenants = list(Tenant.objects.all())
if not tenants:
print("❌ No tenants found. Please run the core/tenant data generator first.")
return
# 1) Departments
print("\n1. Creating Saudi Hospital Departments...")
departments_by_tenant = {t: ensure_departments(t) for t in tenants}
# 2) Employees (create missing Users -> Employees via signal, then populate HR fields)
print("\n2. Creating/Updating Saudi Hospital Employees...")
employees = create_or_update_saudi_employees(tenants, departments_by_tenant, employees_per_tenant=120)
# 3) Department heads (simple heuristic)
print("\n3. Assigning Department Heads...")
for tenant in tenants:
tenant_depts = departments_by_tenant[tenant]
for dept in tenant_depts:
if getattr(dept, 'department_head_id', None):
continue
dept_emps = [e for e in employees if e.department_id == dept.id and e.employment_status == Employee.EmploymentStatus.ACTIVE]
senior = [e for e in dept_emps if any(k in (e.job_title or '') for k in ['Chief', 'Director', 'Head', 'Manager'])]
if senior:
dept.department_head = random.choice(senior)
try:
dept.save(update_fields=['department_head'])
except Exception as ex:
print(f"Could not set head for {dept.name}: {ex}")
# 4) Supervisors (intra-dept)
print("\n4. Assigning Supervisors...")
for e in employees:
if getattr(e, 'supervisor_id', None) or e.employment_status != Employee.EmploymentStatus.ACTIVE:
continue
if any(k in (e.job_title or '') for k in ['Chief', 'Director']):
continue
peers = [p for p in employees
if p.department_id == e.department_id
and p.id != e.id
and p.employment_status == Employee.EmploymentStatus.ACTIVE
and any(k in (p.job_title or '') for k in ['Chief', 'Director', 'Head', 'Manager', 'Senior'])]
if peers:
e.supervisor = random.choice(peers)
try:
e.save(update_fields=['supervisor'])
except Exception as ex:
print(f"Could not set supervisor for {e.get_full_name()}: {ex}")
# 5) Schedules
print("\n5. Creating Employee Work Schedules...")
schedules = create_saudi_schedules(employees, schedules_per_employee=2)
# 6) Schedule assignments
print("\n6. Creating Schedule Assignments...")
assignments = create_schedule_assignments(schedules, days_back=30)
# 7) Time entries
print("\n7. Creating Employee Time Entries...")
time_entries = create_time_entries(employees, days_back=30)
# 8) Performance reviews
print("\n8. Creating Performance Reviews...")
reviews = create_performance_reviews(employees)
# 9) Training programs
print("\n9. Creating Training Programs...")
training_programs = create_training_programs(tenants, employees)
# 10) Program modules
print("\n10. Creating Program Modules...")
program_modules = create_program_modules(training_programs)
# 11) Program prerequisites
print("\n11. Creating Program Prerequisites...")
program_prerequisites = create_program_prerequisites(training_programs)
# 12) Training sessions
print("\n12. Creating Training Sessions...")
training_sessions = create_training_sessions(training_programs, employees)
# 13) Training records
print("\n13. Creating Training Records...")
training_records = create_training_records(employees, training_sessions)
# 14) Training attendance
print("\n14. Creating Training Attendance...")
training_attendance = create_training_attendance(training_records)
# 15) Training assessments
print("\n15. Creating Training Assessments...")
training_assessments = create_training_assessments(training_records)
# 16) Training certificates
print("\n16. Creating Training Certificates...")
training_certificates = create_training_certificates(training_records, employees)
# 17) Leave types
print("\n17. Creating Leave Types...")
leave_types = create_leave_types(tenants)
# 18) Leave balances
print("\n18. Creating Leave Balances...")
leave_balances = create_leave_balances(employees, leave_types)
# 19) Leave requests
print("\n19. Creating Leave Requests...")
leave_requests = create_leave_requests(employees, leave_types, leave_balances)
# 20) Leave approvals
print("\n20. Creating Leave Approvals...")
leave_approvals = create_leave_approvals(leave_requests, employees)
# 21) Leave delegations
print("\n21. Creating Leave Delegations...")
leave_delegations = create_leave_delegations(employees)
# 22) Salary information
print("\n22. Creating Salary Information...")
salary_records = create_salary_information(employees)
# 23) Salary adjustments
print("\n23. Creating Salary Adjustments...")
salary_adjustments = create_salary_adjustments(employees, salary_records)
# 24) Document templates
print("\n24. Creating Document Templates...")
document_templates = create_document_templates(tenants)
# 25) Document requests
print("\n25. Creating Document Requests...")
document_requests = create_document_requests(employees, document_templates)
print(f"\n✅ Saudi Healthcare HR Data Generation Complete!")
print(f"📊 Summary:")
print(f" - Tenants: {len(tenants)}")
print(f" - Departments: {sum(len(v) for v in departments_by_tenant.values())}")
print(f" - Employees: {len(employees)}")
print(f" - Schedules: {len(schedules)}")
print(f" - Schedule Assignments: {len(assignments)}")
print(f" - Time Entries: {len(time_entries)}")
print(f" - Performance Reviews: {len(reviews)}")
print(f" - Training Programs: {len(training_programs)}")
print(f" - Program Modules: {len(program_modules)}")
print(f" - Program Prerequisites: {len(program_prerequisites)}")
print(f" - Training Sessions: {len(training_sessions)}")
print(f" - Training Records: {len(training_records)}")
print(f" - Training Attendance: {len(training_attendance)}")
print(f" - Training Assessments: {len(training_assessments)}")
print(f" - Training Certificates: {len(training_certificates)}")
print(f" - Leave Types: {len(leave_types)}")
print(f" - Leave Balances: {len(leave_balances)}")
print(f" - Leave Requests: {len(leave_requests)}")
print(f" - Leave Approvals: {len(leave_approvals)}")
print(f" - Leave Delegations: {len(leave_delegations)}")
# Distribution summaries
dept_counts = {}
for e in employees:
if e.department:
t = e.department.department_type
dept_counts[t] = dept_counts.get(t, 0) + 1
print(f"\n🏥 Employee Distribution by Department Type:")
for t, c in sorted(dept_counts.items()):
print(f" - {t.title()}: {c}")
status_counts = {}
for e in employees:
status_counts[e.employment_status] = status_counts.get(e.employment_status, 0) + 1
print(f"\n👥 Employee Status Distribution:")
for s, c in sorted(status_counts.items()):
print(f" - {s.title()}: {c}")
# Leave management statistics
leave_status_counts = {}
for lr in leave_requests:
leave_status_counts[lr.status] = leave_status_counts.get(lr.status, 0) + 1
print(f"\n📅 Leave Request Status Distribution:")
for status, count in sorted(leave_status_counts.items()):
print(f" - {status.title()}: {count}")
# Salary and document statistics
print(f"\n💰 Salary & Document Management:")
print(f" - Salary Records: {len(salary_records)}")
print(f" - Salary Adjustments: {len(salary_adjustments)}")
print(f" - Document Templates: {len(document_templates)}")
print(f" - Document Requests: {len(document_requests)}")
# Document request status distribution
doc_status_counts = {}
for dr in document_requests:
doc_status_counts[dr.status] = doc_status_counts.get(dr.status, 0) + 1
print(f"\n📄 Document Request Status Distribution:")
for status, count in sorted(doc_status_counts.items()):
print(f" - {status.title()}: {count}")
return {
'departments_by_tenant': departments_by_tenant,
'employees': employees,
'schedules': schedules,
'assignments': assignments,
'time_entries': time_entries,
'reviews': reviews,
'training_programs': training_programs,
'program_modules': program_modules,
'program_prerequisites': program_prerequisites,
'training_sessions': training_sessions,
'training_records': training_records,
'training_attendance': training_attendance,
'training_assessments': training_assessments,
'training_certificates': training_certificates,
'leave_types': leave_types,
'leave_balances': leave_balances,
'leave_requests': leave_requests,
'leave_approvals': leave_approvals,
'leave_delegations': leave_delegations,
}
if __name__ == "__main__":
main()