# scripts/seed_saudi_accounts.py
|
|
|
|
import os
|
|
import django
|
|
|
|
# Set up Django environment
|
|
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hospital_management.settings')
|
|
django.setup()
|
|
|
|
import random
|
|
import uuid
|
|
import secrets
|
|
from datetime import timedelta
|
|
from django.contrib.auth.hashers import make_password
|
|
from django.utils import timezone as django_timezone
|
|
from django.db import transaction
|
|
|
|
from accounts.models import User, TwoFactorDevice, SocialAccount, UserSession, PasswordHistory
|
|
from core.models import Tenant
|
|
from hr.models import Employee, Department
|
|
|
|
|
|
# -------------------------------
# Saudi-specific data constants
# -------------------------------

# Male given names. Also the pool for every generated father/grandfather name.
SAUDI_FIRST_NAMES_MALE = [
    'Mohammed', 'Abdullah', 'Ahmed', 'Omar', 'Ali', 'Hassan', 'Khalid', 'Faisal',
    'Saad', 'Fahd', 'Bandar', 'Turki', 'Nasser', 'Saud', 'Abdulrahman',
    'Abdulaziz', 'Salman', 'Waleed', 'Majid', 'Rayan', 'Yazeed', 'Mansour',
    'Osama', 'Tariq', 'Adel', 'Nawaf', 'Sultan', 'Mishaal', 'Badr', 'Ziad'
]

# Female given names.
SAUDI_FIRST_NAMES_FEMALE = [
    'Fatima', 'Aisha', 'Maryam', 'Khadija', 'Sarah', 'Noura', 'Hala', 'Reem',
    'Lina', 'Dana', 'Rana', 'Nada', 'Layla', 'Amira', 'Zahra', 'Yasmin',
    'Dina', 'Noor', 'Rahma', 'Salma', 'Lama', 'Ghada', 'Rania', 'Maha',
    'Wedad', 'Najla', 'Shahd', 'Jood', 'Rand', 'Malak'
]

# Family names; every entry carries the "Al-" prefix.
SAUDI_FAMILY_NAMES = [
    'Al-Rashid', 'Al-Harbi', 'Al-Qahtani', 'Al-Dosari', 'Al-Otaibi', 'Al-Mutairi',
    'Al-Shammari', 'Al-Zahrani', 'Al-Ghamdi', 'Al-Maliki', 'Al-Subai', 'Al-Jubayr',
    'Al-Faisal', 'Al-Saud', 'Al-Shaalan', 'Al-Rajhi', 'Al-Sudairy', 'Al-Ajmi',
    'Al-Anzi', 'Al-Dawsari', 'Al-Shamrani', 'Al-Balawi', 'Al-Juhani', 'Al-Sulami'
]

# City names used for the `city` of generated user sessions.
SAUDI_CITIES = [
    'Riyadh', 'Jeddah', 'Mecca', 'Medina', 'Dammam', 'Khobar', 'Dhahran',
    'Taif', 'Tabuk', 'Buraidah', 'Khamis Mushait', 'Hofuf', 'Mubarraz',
    'Jubail', 'Yanbu', 'Abha', 'Najran', 'Jazan', 'Hail', 'Arar'
]

# Province names used for employee `license_state` and session `region`.
SAUDI_PROVINCES = [
    'Riyadh Province', 'Makkah Province', 'Eastern Province', 'Asir Province',
    'Jazan Province', 'Medina Province', 'Qassim Province', 'Tabuk Province',
    'Hail Province', 'Northern Borders Province', 'Najran Province', 'Al Bahah Province'
]

# Department names seeded by ensure_departments() when none exist yet.
SAUDI_DEPARTMENTS = [
    'Internal Medicine', 'Cardiology', 'Orthopedics', 'Neurology', 'Oncology',
    'Pediatrics', 'Emergency Medicine', 'Radiology', 'Laboratory Medicine',
    'Pharmacy', 'Surgery', 'Obstetrics and Gynecology', 'Dermatology',
    'Ophthalmology', 'ENT', 'Anesthesiology', 'Pathology', 'Psychiatry'
]

# Candidate job titles per role code; pick_job_title() chooses one at random.
SAUDI_JOB_TITLES = {
    'PHYSICIAN': ['Consultant Physician', 'Senior Physician', 'Staff Physician', 'Resident Physician',
                  'Chief Medical Officer'],
    'NURSE': ['Head Nurse', 'Senior Nurse', 'Staff Nurse', 'Charge Nurse', 'Clinical Nurse Specialist'],
    'PHARMACIST': ['Clinical Pharmacist', 'Staff Pharmacist', 'Pharmacy Manager', 'Pharmaceutical Consultant'],
    'ADMIN': ['Medical Director', 'Hospital Administrator', 'Department Manager', 'Operations Manager'],
    'LAB_TECH': ['Senior Lab Technician', 'Medical Laboratory Scientist', 'Lab Supervisor'],
    'RAD_TECH': ['Senior Radiologic Technologist', 'CT Technologist', 'MRI Technologist'],
    'RADIOLOGIST': ['Consultant Radiologist', 'Senior Radiologist', 'Interventional Radiologist'],
    'MEDICAL_ASSISTANT': ['Medical Assistant'],
    'CLERICAL': ['Clerical Staff'],
}

# Prefixes for the fictional licence numbers built by generate_saudi_license().
SAUDI_LICENSE_PREFIXES = ['MOH', 'SCFHS', 'SMLE', 'SFH']

# Share of `users_per_tenant` allotted to each role code.
# The listed fractions add up to 1.00.
ROLE_DISTRIBUTION = {
    'PHYSICIAN': 0.15,
    'NURSE': 0.25,
    'PHARMACIST': 0.08,
    'LAB_TECH': 0.10,
    'RAD_TECH': 0.08,
    'RADIOLOGIST': 0.05,
    'ADMIN': 0.07,
    'MEDICAL_ASSISTANT': 0.12,
    'CLERICAL': 0.10
}
|
|
|
|
|
|
# -------------------------------
|
|
# Helpers
|
|
# -------------------------------
|
|
def ensure_departments(tenant):
    """
    Ensure Department objects exist for this tenant; return a list of them.

    Works with a tenant-scoped Department model (one declaring a ``tenant``
    field) as well as a global one: the model's fields are inspected once
    and every query/insert is scoped accordingly.

    Args:
        tenant: Tenant the departments belong to. Ignored when Department
            has no ``tenant`` field.

    Returns:
        list: The departments already present, or - when there were none -
        the departments found after seeding from SAUDI_DEPARTMENTS.
    """
    # Inspect the model a single time instead of rebuilding the field-name
    # list for every query and for every seeded row.
    is_tenant_scoped = any(f.name == 'tenant' for f in Department._meta.fields)
    scope = {'tenant': tenant} if is_tenant_scoped else {}

    def current_departments():
        # filter() with no keyword arguments is equivalent to all()
        return list(Department.objects.filter(**scope))

    existing = current_departments()
    if existing:
        return existing

    # create seed departments
    Department.objects.bulk_create(
        [Department(name=name, **scope) for name in SAUDI_DEPARTMENTS],
        ignore_conflicts=True,
    )

    # Re-query instead of returning the instances passed to bulk_create:
    # with ignore_conflicts=True they are not guaranteed to carry primary keys.
    return current_departments()
|
|
|
|
|
|
def generate_saudi_mobile_e164():
    """Generate Saudi E.164 mobile: +9665XXXXXXXX

    Returns:
        str: "+9665" followed by exactly eight random decimal digits.
    """
    # Zero-pad so each of the eight trailing digits can be 0-9. Drawing from
    # randint(10000000, 99999999) could never yield a leading 0, which made
    # numbers such as +96650xxxxxxx impossible to generate.
    return f"+9665{random.randint(0, 99999999):08d}"
|
|
|
|
|
|
def generate_saudi_license():
    """Build a fictional licence number shaped like "<PREFIX>-<six digits>".

    The prefix is drawn from SAUDI_LICENSE_PREFIXES; the numeric part is a
    random integer between 100000 and 999999 inclusive.
    """
    issuing_body = random.choice(SAUDI_LICENSE_PREFIXES)
    serial = random.randint(100000, 999999)
    return f"{issuing_body}-{serial}"
|
|
|
|
|
|
def tenant_scoped_unique_username(tenant, base_username: str) -> str:
    """
    Return a username that no User of `tenant` is using yet.

    `base_username` is returned unchanged when it is free. Otherwise the
    numeric suffixes 2, 3, 4, ... are appended in turn until a free name
    is found.
    """
    def is_taken(name):
        return User.objects.filter(tenant=tenant, username=name).exists()

    candidate = base_username
    suffix = 2  # the first alternative tried is "<base>2"
    while is_taken(candidate):
        candidate = f"{base_username}{suffix}"
        suffix += 1
    return candidate
|
|
|
|
|
|
def pick_job_title(role: str) -> str:
    """Return a random job title for `role`.

    Titles come from SAUDI_JOB_TITLES. When the role has no entry (or an
    empty list), the role code itself is turned into a title, e.g.
    "LAB_TECH" -> "Lab Tech".
    """
    candidates = SAUDI_JOB_TITLES.get(role)
    if not candidates:
        # fallback: humanise the role code
        return role.replace('_', ' ').title()
    return random.choice(candidates)
|
|
|
|
|
|
# -------------------------------
|
|
# Generators
|
|
# -------------------------------
|
|
def _username_base(first_name, last_name):
    """Return "<first>.<family>" in lowercase, without the family name's leading "Al-"."""
    family = last_name.lower()
    # Strip only the leading prefix. A blanket replace('al', '') would also
    # eat "al" inside the name ("Al-Maliki" -> "miki", "Al-Faisal" -> "fais").
    if family.startswith('al-'):
        family = family[len('al-'):]
    return f"{first_name.lower()}.{family.replace('-', '')}"


def _fill_employee_profile(user, tenant, role, departments, father_name, grandfather_name):
    """Populate and save the Employee profile attached to `user`."""
    emp: Employee = user.employee_profile  # created by signal
    emp.tenant = tenant  # ensure alignment
    emp.first_name = user.first_name
    emp.father_name = father_name
    emp.grandfather_name = grandfather_name
    emp.last_name = user.last_name

    # Contact (E.164 KSA) - the same number is stored in both phone fields
    mobile = generate_saudi_mobile_e164()
    emp.phone = mobile
    emp.mobile_phone = mobile
    emp.email = user.email

    # Role/Org
    emp.role = role
    emp.department = random.choice(departments) if departments else None
    emp.job_title = pick_job_title(role)

    # License (only some roles)
    if role in ['PHYSICIAN', 'NURSE', 'PHARMACIST', 'RADIOLOGIST']:
        emp.license_number = generate_saudi_license()
        emp.license_state = random.choice(SAUDI_PROVINCES)
        emp.license_expiry_date = django_timezone.now().date() + timedelta(days=random.randint(365, 1095))
        if role == 'PHYSICIAN':
            # fictitious local analogue to NPI
            emp.npi_number = f"SA{random.randint(1000000, 9999999)}"

    # Preferences
    emp.user_timezone = 'Asia/Riyadh'
    emp.language = random.choice(['ar', 'en', 'ar_SA'])
    emp.theme = random.choice([Employee.Theme.LIGHT, Employee.Theme.DARK, Employee.Theme.AUTO])

    # Status / approval (the approver is assigned later, per tenant)
    emp.is_verified = True
    emp.is_approved = True
    emp.approval_date = django_timezone.now() - timedelta(days=random.randint(1, 180))

    emp.save()


def _create_user_with_profile(tenant, role, departments):
    """Create one User holding `role` in `tenant` and fill in its Employee profile."""
    is_male = random.choice([True, False])
    first_name = random.choice(SAUDI_FIRST_NAMES_MALE if is_male else SAUDI_FIRST_NAMES_FEMALE)
    father_name = random.choice(SAUDI_FIRST_NAMES_MALE)
    grandfather_name = random.choice(SAUDI_FIRST_NAMES_MALE)
    last_name = random.choice(SAUDI_FAMILY_NAMES)

    # base username like "mohammed.rashid"
    username = tenant_scoped_unique_username(tenant, _username_base(first_name, last_name))
    email = f"{username}@{tenant.name.lower().replace(' ', '').replace('-', '')}.sa"

    is_admin = role in ['ADMIN', 'SUPER_ADMIN']
    is_superuser = role == 'SUPER_ADMIN'

    now = django_timezone.now()
    joined_days_ago = random.randint(1, 365)
    # A password cannot have been changed before the account existed.
    password_changed_days_ago = random.randint(1, min(90, joined_days_ago))

    # Auth-level fields only
    user = User(
        tenant=tenant,
        username=username,
        email=email,
        first_name=first_name,
        last_name=last_name,
        is_active=True,
        is_staff=is_admin,
        is_superuser=is_superuser,

        # security/session (these live on User by design)
        force_password_change=random.choice([True, False]),
        password_expires_at=now + timedelta(days=random.randint(90, 365)),
        failed_login_attempts=random.randint(0, 2),
        two_factor_enabled=random.choice([True, False]) if role in ['PHYSICIAN', 'ADMIN', 'PHARMACIST'] else False,
        max_concurrent_sessions=random.choice([1, 2, 3, 5]),
        session_timeout_minutes=random.choice([30, 60, 120, 240]),
        last_password_change=now - timedelta(days=password_changed_days_ago),
        date_joined=now - timedelta(days=joined_days_ago),
    )
    # Hash the password before the first save so the row is written once and
    # never exists in the database without a password.
    user.set_password('Hospital@123')
    user.save()

    # Signal should have created Employee; now populate Employee fields
    _fill_employee_profile(user, tenant, role, departments, father_name, grandfather_name)
    return user


def _assign_approver(tenant_users):
    """Record one randomly chosen staff/superuser as approver of every other user."""
    admin_users = [u for u in tenant_users if u.is_staff or u.is_superuser]
    if not admin_users:
        return

    approver = random.choice(admin_users)
    for u in tenant_users:
        if u != approver:
            emp = u.employee_profile
            emp.approved_by = approver
            emp.save(update_fields=['approved_by'])


def create_saudi_users(tenants, users_per_tenant=50):
    """
    Create Users (auth + security), then populate Employee profile.

    Relies on the post_save signal to create Employee automatically.

    Args:
        tenants: Iterable of Tenant objects to seed.
        users_per_tenant: Target head-count per tenant. Every role in
            ROLE_DISTRIBUTION receives max(1, int(users_per_tenant * share))
            users, so the real total can differ slightly from this value.

    Returns:
        list: Every User created, across all tenants.
    """
    all_users = []

    for tenant in tenants:
        departments = ensure_departments(tenant)
        tenant_users = []

        for role, pct in ROLE_DISTRIBUTION.items():
            count = max(1, int(users_per_tenant * pct))
            for _ in range(count):
                user = _create_user_with_profile(tenant, role, departments)
                tenant_users.append(user)
                all_users.append(user)

        # Approval relationships: choose an approver among admins in this tenant
        _assign_approver(tenant_users)

        print(f"Created {len(tenant_users)} users for {tenant.name}")

    return all_users
|
|
|
|
|
|
def create_saudi_two_factor_devices(users):
    """Create between one and three TwoFactorDevice rows for every user with 2FA enabled.

    Users whose `two_factor_enabled` flag is falsy are skipped. Each device
    gets a random type (TOTP, SMS or EMAIL) plus the type-specific field:
    a secret key, the employee's mobile number, or an e-mail address.

    Returns:
        list: The TwoFactorDevice objects created.
    """
    available_types = ['TOTP', 'SMS', 'EMAIL']
    names_by_type = {
        'TOTP': ['Google Authenticator', 'Microsoft Authenticator', 'Authy', 'LastPass Authenticator'],
        'SMS': ['Primary Mobile', 'Work Mobile', 'Emergency Contact'],
        'EMAIL': ['Work Email', 'Personal Email', 'Backup Email']
    }
    created = []

    for account in users:
        if not account.two_factor_enabled:
            continue

        how_many = random.randint(1, 3)
        # The profile may be missing; fall back to None rather than fail
        profile = getattr(account, 'employee_profile', None)

        for _ in range(how_many):
            kind = random.choice(available_types)

            fields = dict(
                user=account,
                device_id=uuid.uuid4(),
                name=random.choice(names_by_type[kind]),
                device_type=kind,
                is_active=True,
                is_verified=True,
                verified_at=django_timezone.now() - timedelta(days=random.randint(1, 30)),
                last_used_at=django_timezone.now() - timedelta(hours=random.randint(1, 168)),
                usage_count=random.randint(5, 100),
                created_at=django_timezone.now() - timedelta(days=random.randint(1, 60)),
            )

            # Exactly one type-specific field is added per device
            if kind == 'TOTP':
                fields['secret_key'] = secrets.token_urlsafe(32)
            elif kind == 'SMS':
                fields['phone_number'] = profile.mobile_phone if profile else None
            elif kind == 'EMAIL':
                if profile and profile.email:
                    fields['email_address'] = profile.email
                else:
                    fields['email_address'] = account.email

            created.append(TwoFactorDevice.objects.create(**fields))

    print(f"Created {len(created)} two-factor devices")
    return created
|
|
|
|
|
|
def create_saudi_social_accounts(users):
    """Attach a SocialAccount from a random provider to roughly a quarter of `users`.

    Returns:
        list: The SocialAccount objects created.
    """
    providers = ['GOOGLE', 'MICROSOFT', 'APPLE', 'LINKEDIN']
    created = []

    for account in users:
        # One True among four options: ~25% chance
        if not random.choice([True, False, False, False]):
            continue

        provider = random.choice(providers)
        slug = provider.lower()

        # Prefer the user's full name, then the employee display name,
        # and only then the bare username.
        display_name = account.get_full_name()
        if not display_name:
            if hasattr(account, 'employee_profile'):
                display_name = account.employee_profile.get_display_name()
            else:
                display_name = account.username

        linked = SocialAccount.objects.create(
            user=account,
            provider=provider,
            provider_id=f"{slug}_{random.randint(100000000, 999999999)}",
            provider_email=account.email,
            display_name=display_name,
            profile_url=f"https://{slug}.com/profile/{account.username}",
            avatar_url=f"https://{slug}.com/avatar/{account.username}.jpg",
            access_token=secrets.token_urlsafe(64),
            refresh_token=secrets.token_urlsafe(64),
            token_expires_at=django_timezone.now() + timedelta(hours=1),
            is_active=True,
            created_at=django_timezone.now() - timedelta(days=random.randint(1, 180)),
            last_login_at=django_timezone.now() - timedelta(hours=random.randint(1, 48)),
        )
        created.append(linked)

    print(f"Created {len(created)} social accounts")
    return created
|
|
|
|
|
|
def create_saudi_user_sessions(users):
    """Create user sessions for Saudi healthcare users

    Every user receives between one and five UserSession rows with random
    client metadata. Timestamps are kept mutually consistent:

    * ``last_activity_at`` is never later than the current time;
    * ``ended_at`` is ``None`` for active sessions, otherwise it is never
      earlier than ``last_activity_at`` and never in the future;
    * ``user_agent`` and ``browser`` describe the same browser.

    NOTE(review): ``expires_at`` is still ``created_at`` plus the user's
    session timeout, so an "active" session may carry an ``expires_at`` in
    the past - confirm how UserSession interprets that field.

    Returns:
        list: The UserSession objects created.
    """
    sessions = []

    # First two octets only; the remaining two are randomised per session
    saudi_ips = [
        '37.99.', '37.200.', '31.9.', '31.173.', '188.161.',
        '185.84.', '188.245.', '217.9.', '82.205.', '5.63.'
    ]
    browsers = [
        'Chrome 120.0.0.0', 'Safari 17.1.2', 'Firefox 121.0.0', 'Edge 120.0.0.0',
        'Chrome Mobile 120.0.0.0', 'Safari Mobile 17.1.2'
    ]
    operating_systems = [
        'Windows 11', 'Windows 10', 'macOS 14.0', 'iOS 17.1.2',
        'Android 14', 'Ubuntu 22.04'
    ]
    device_types = ['DESKTOP', 'MOBILE', 'TABLET']
    login_methods = ['PASSWORD', 'TWO_FACTOR', 'SOCIAL', 'SSO']

    for user in users:
        session_count = random.randint(1, 5)
        timeout_minutes = user.session_timeout_minutes or 30

        for i in range(session_count):
            now = django_timezone.now()

            ip_prefix = random.choice(saudi_ips)
            ip_address = f"{ip_prefix}{random.randint(1, 255)}.{random.randint(1, 255)}"

            session_start = now - timedelta(hours=random.randint(1, 720))
            # Only the first session generated for a user may be active (2 in 3 chance)
            is_active = (i == 0) and random.choice([True, True, False])

            # Pick the browser once so user_agent and browser agree
            browser = random.choice(browsers)

            # Activity cannot happen in the future
            last_activity_at = min(
                session_start + timedelta(minutes=random.randint(1, 480)),
                now,
            )

            if is_active:
                ended_at = None
            else:
                # A closed session ends at or after its last activity,
                # and never later than "now"
                ended_at = session_start + timedelta(hours=random.randint(1, 8))
                ended_at = min(max(ended_at, last_activity_at), now)

            session = UserSession.objects.create(
                user=user,
                session_key=f"session_{secrets.token_urlsafe(20)}",
                session_id=uuid.uuid4(),
                ip_address=ip_address,
                user_agent=f"Mozilla/5.0 (compatible; HospitalSystem/1.0; {browser})",
                device_type=random.choice(device_types),
                browser=browser,
                operating_system=random.choice(operating_systems),
                country='Saudi Arabia',
                region=random.choice(SAUDI_PROVINCES),
                city=random.choice(SAUDI_CITIES),
                is_active=is_active,
                login_method=random.choice(login_methods),
                created_at=session_start,
                last_activity_at=last_activity_at,
                expires_at=session_start + timedelta(minutes=timeout_minutes),
                ended_at=ended_at,
            )
            sessions.append(session)

    print(f"Created {len(sessions)} user sessions")
    return sessions
|
|
|
|
|
|
def create_saudi_password_history(users):
    """Record between one and five PasswordHistory entries for every user.

    Entry number ``i`` (starting at 0) is dated a random number of days in
    the past, drawn from the range 30*i .. 30*(i+1), and stores the hash of
    a password picked from a small fixed sample list.

    Returns:
        list: The PasswordHistory objects created.
    """
    sample_passwords = ['Hospital@123', 'Medical@456', 'Health@789', 'Saudi@2024', 'Secure@Pass']
    entries = []

    for account in users:
        for slot in range(random.randint(1, 5)):
            plaintext = random.choice(sample_passwords)
            oldest_allowed = 30 * (slot + 1)
            newest_allowed = 30 * slot
            entry = PasswordHistory.objects.create(
                user=account,
                password_hash=make_password(plaintext),
                created_at=django_timezone.now() - timedelta(days=random.randint(newest_allowed, oldest_allowed)),
            )
            entries.append(entry)

    print(f"Created {len(entries)} password history entries")
    return entries
|
|
|
|
|
|
# -------------------------------
|
|
# Main
|
|
# -------------------------------
|
|
def main():
    """Run the whole seeding pipeline and print a summary.

    Steps, in order: users (with Employee profiles), two-factor devices,
    social accounts, user sessions, password history. All tenants returned
    by Tenant.objects.all() are seeded.

    Returns:
        dict | None: The created objects keyed by 'users', 'devices',
        'social_accounts', 'sessions' and 'password_history'; None when no
        tenant exists.
    """
    print("Starting Saudi Healthcare Accounts Data Generation...")

    tenants = list(Tenant.objects.all())
    if not tenants:
        print("❌ No tenants found. Please seed core tenants first.")
        return

    results = {}

    print("\n1. Creating Saudi Healthcare Users (with Employee profiles)...")
    users = results['users'] = create_saudi_users(tenants, users_per_tenant=40)

    print("\n2. Creating Two-Factor Authentication Devices...")
    results['devices'] = create_saudi_two_factor_devices(users)

    print("\n3. Creating Social Authentication Accounts...")
    results['social_accounts'] = create_saudi_social_accounts(users)

    print("\n4. Creating User Sessions...")
    results['sessions'] = create_saudi_user_sessions(users)

    print("\n5. Creating Password History...")
    results['password_history'] = create_saudi_password_history(users)

    print("\n✅ Saudi Healthcare Accounts Data Generation Complete!")
    print("📊 Summary:")
    summary_rows = (
        ('Users', 'users'),
        ('Two-Factor Devices', 'devices'),
        ('Social Accounts', 'social_accounts'),
        ('User Sessions', 'sessions'),
        ('Password History Entries', 'password_history'),
    )
    for label, key in summary_rows:
        print(f" - {label}: {len(results[key])}")

    # Tally users per employee role; users without a profile count as UNKNOWN
    role_counts = {}
    for u in users:
        if hasattr(u, 'employee_profile'):
            role = u.employee_profile.role
        else:
            role = 'UNKNOWN'
        role_counts.setdefault(role, 0)
        role_counts[role] += 1

    print("\n👥 User Role Distribution:")
    for role in sorted(role_counts):
        print(f" - {role.replace('_', ' ').title()}: {role_counts[role]}")

    return results
|
|
|
|
|
|
# Run the full seeding pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()