agdar/core/management/commands/generate_test_data.py
Marwan Alwali a04817ef6e update
2025-11-02 19:25:08 +03:00

2275 lines
100 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Management command to generate Saudi-influenced test data for all apps.
This command creates realistic test data with Saudi cultural context including:
- Arabic names (both English and Arabic)
- Saudi phone numbers and national IDs
- Saudi cities and addresses
- SAR currency
- Saudi healthcare context
"""
import random
import secrets
from datetime import date, datetime, timedelta
from decimal import Decimal
from django.core.management.base import BaseCommand
from django.db import transaction
from django.utils import timezone
from faker import Faker
# Import all models
from core.models import (
Tenant, User, Patient, Clinic, File, SubFile, Consent, ConsentTemplate, Attachment, AuditLog,
SettingTemplate, TenantSetting
)
from appointments.models import (
Provider, Room, Schedule, Appointment, AppointmentReminder, AppointmentConfirmation
)
from finance.models import (
Service, Package, Payer, Invoice, InvoiceLineItem, Payment, PackagePurchase, CSID
)
from notifications.models import (
MessageTemplate, Message, NotificationPreference, MessageLog
)
from referrals.models import Referral, ReferralAutoRule
from integrations.models import (
ExternalOrder, NphiesMessage, NphiesEncounterLink, EInvoice, ZatcaCredential,
PayerContract
)
from medical.models import (
MedicalConsultation, MedicationPlan, MedicalFollowUp,
ConsultationResponse, ConsultationFeedback
)
from nursing.models import NursingEncounter, GrowthChart, VitalSignsAlert
from aba.models import ABAConsult, ABABehavior, ABAGoal, ABASession, ABASkillTarget
from ot.models import OTConsult, OTSession, OTTargetSkill, OTProgressReport
from slp.models import SLPConsult, SLPAssessment, SLPIntervention, SLPTarget, SLPProgressReport
from hr.models import Attendance, Schedule as HRSchedule, Holiday, LeaveRequest, LeaveBalance
from documents.models import DocumentTemplate, ClinicalNote, NoteAddendum, NoteAuditLog
# Saudi-specific data
# Static lookup tables used by SaudiDataGenerator and Command below.
# Each *_EN name list and its *_AR counterpart have the same length and the
# same ordering: entry i of the Arabic list is the Arabic spelling of entry i
# of the English list.
# Male given names, English transliteration.
SAUDI_MALE_NAMES_EN = [
'Mohammed', 'Abdullah', 'Fahad', 'Sultan', 'Khalid', 'Faisal', 'Saud', 'Turki',
'Abdulaziz', 'Abdulrahman', 'Omar', 'Ali', 'Hassan', 'Hussein', 'Saleh', 'Saad',
'Nasser', 'Mansour', 'Bandar', 'Nawaf', 'Talal', 'Waleed', 'Yazeed', 'Ziyad'
]
# Male given names, Arabic script (same order as SAUDI_MALE_NAMES_EN).
SAUDI_MALE_NAMES_AR = [
'محمد', 'عبدالله', 'فهد', 'سلطان', 'خالد', 'فيصل', 'سعود', 'تركي',
'عبدالعزيز', 'عبدالرحمن', 'عمر', 'علي', 'حسن', 'حسين', 'صالح', 'سعد',
'ناصر', 'منصور', 'بندر', 'نواف', 'طلال', 'وليد', 'يزيد', 'زياد'
]
# Female given names, English transliteration.
SAUDI_FEMALE_NAMES_EN = [
'Noura', 'Fatima', 'Sarah', 'Maha', 'Hessa', 'Lama', 'Reem', 'Amal',
'Hind', 'Latifa', 'Munira', 'Abeer', 'Hanan', 'Jawaher', 'Lulwa', 'Mona',
'Nada', 'Rania', 'Salma', 'Wafa', 'Basma', 'Dalal', 'Ghada', 'Hayat'
]
# Female given names, Arabic script (same order as SAUDI_FEMALE_NAMES_EN).
SAUDI_FEMALE_NAMES_AR = [
'نورة', 'فاطمة', 'سارة', 'مها', 'حصة', 'لمى', 'ريم', 'أمل',
'هند', 'لطيفة', 'منيرة', 'عبير', 'حنان', 'جواهر', 'لولوة', 'منى',
'ندى', 'رانيا', 'سلمى', 'وفاء', 'بسمة', 'دلال', 'غادة', 'حياة'
]
# Family names, English transliteration.
SAUDI_FAMILY_NAMES_EN = [
'Al-Otaibi', 'Al-Ghamdi', 'Al-Qahtani', 'Al-Harbi', 'Al-Zahrani', 'Al-Shammari',
'Al-Dosari', 'Al-Mutairi', 'Al-Maliki', 'Al-Subai', 'Al-Rashidi', 'Al-Ahmadi',
'Al-Anzi', 'Al-Juaid', 'Al-Shahrani', 'Al-Yami', 'Al-Balawi', 'Al-Shamrani',
'Al-Dawsari', 'Al-Subaie', 'Al-Harthi', 'Al-Omari', 'Al-Shehri', 'Al-Asiri'
]
# Family names, Arabic script (same order as SAUDI_FAMILY_NAMES_EN). Two
# spellings appear twice because the English list holds two transliterations
# of each (Al-Dosari/Al-Dawsari and Al-Subai/Al-Subaie).
SAUDI_FAMILY_NAMES_AR = [
'العتيبي', 'الغامدي', 'القحطاني', 'الحربي', 'الزهراني', 'الشمري',
'الدوسري', 'المطيري', 'المالكي', 'السبيعي', 'الرشيدي', 'الأحمدي',
'العنزي', 'الجعيد', 'الشهراني', 'اليامي', 'البلوي', 'الشمراني',
'الدوسري', 'السبيعي', 'الحارثي', 'العمري', 'الشهري', 'العسيري'
]
# City names used for generated addresses.
SAUDI_CITIES = [
'Riyadh', 'Jeddah', 'Mecca', 'Medina', 'Dammam', 'Khobar', 'Dhahran',
'Taif', 'Buraidah', 'Tabuk', 'Khamis Mushait', 'Hail', 'Najran', 'Jubail',
'Abha', 'Yanbu', 'Al-Kharj', 'Al-Hasa', 'Qatif', 'Arar'
]
# District names used only when the generated city is 'Riyadh'.
RIYADH_DISTRICTS = [
'Al-Olaya', 'Al-Malaz', 'Al-Naseem', 'Al-Rawdah', 'Al-Muruj', 'Al-Nakheel',
'Al-Sahafa', 'Al-Yasmin', 'Al-Narjis', 'Al-Aqiq', 'Granada', 'Hittin',
'King Fahd', 'Al-Wurud', 'Al-Hamra', 'Al-Sulimaniyah'
]
# Insurance company names (not referenced in the visible part of this file;
# presumably used when payers are generated further down).
SAUDI_INSURANCE_COMPANIES = [
'Bupa Arabia', 'Tawuniya', 'Medgulf', 'Malath', 'Al-Rajhi Takaful',
'Saico', 'Walaa', 'Solidarity', 'Gulf Union', 'Salama'
]
# Two-digit prefixes that saudi_phone() places between '+966' and the
# seven random subscriber digits.
SAUDI_MOBILE_PREFIXES = ['50', '53', '54', '55', '56', '57', '58', '59']
class SaudiDataGenerator:
    """Helper class to generate Saudi-specific data.

    Numbers and names are drawn with the ``random`` module; street names and
    non-Riyadh district words come from an English Faker instance.
    """

    def __init__(self):
        """Create the Faker instances (mixed, Arabic-only and English-only)."""
        self.fake = Faker(['ar_SA', 'en_US'])
        self.fake_ar = Faker('ar_SA')
        self.fake_en = Faker('en_US')

    @staticmethod
    def _random_digits(length):
        """Return ``length`` random decimal digits as a string."""
        return ''.join(str(random.randint(0, 9)) for _ in range(length))

    @staticmethod
    def _pick_name_pair(names_en, names_ar):
        """Pick one name and return its ``(english, arabic)`` spellings.

        The English and Arabic lists are index-aligned, so drawing a single
        zipped entry guarantees both spellings denote the same name. Drawing
        from each list independently would pair unrelated names.
        """
        return random.choice(list(zip(names_en, names_ar)))

    def saudi_phone(self):
        """Generate Saudi mobile number (``+966`` + prefix + 7 digits)."""
        prefix = random.choice(SAUDI_MOBILE_PREFIXES)
        return f'+966{prefix}{self._random_digits(7)}'

    def saudi_national_id(self):
        """Generate Saudi national ID (10 digits)."""
        first_digit = random.choice(['1', '2'])  # 1 for Saudi, 2 for resident
        return f'{first_digit}{self._random_digits(9)}'

    def saudi_vat_number(self):
        """Generate Saudi VAT number (15 digits, starts and ends with 3)."""
        return f'3{self._random_digits(13)}3'

    def saudi_name(self, sex='M'):
        """Generate a four-part Saudi name in English and Arabic.

        Args:
            sex: ``'M'`` selects a male first name; any other value selects
                a female first name.

        Returns:
            dict with keys ``first_*``, ``father_*``, ``grandfather_*`` and
            ``last_*`` (``*`` being ``en`` or ``ar``). For each part the two
            spellings refer to the same name.
        """
        if sex == 'M':
            first_en, first_ar = self._pick_name_pair(
                SAUDI_MALE_NAMES_EN, SAUDI_MALE_NAMES_AR)
        else:
            first_en, first_ar = self._pick_name_pair(
                SAUDI_FEMALE_NAMES_EN, SAUDI_FEMALE_NAMES_AR)
        # Father and grandfather names are typically male names
        father_en, father_ar = self._pick_name_pair(
            SAUDI_MALE_NAMES_EN, SAUDI_MALE_NAMES_AR)
        grandfather_en, grandfather_ar = self._pick_name_pair(
            SAUDI_MALE_NAMES_EN, SAUDI_MALE_NAMES_AR)
        last_en, last_ar = self._pick_name_pair(
            SAUDI_FAMILY_NAMES_EN, SAUDI_FAMILY_NAMES_AR)
        return {
            'first_en': first_en,
            'first_ar': first_ar,
            'father_en': father_en,
            'father_ar': father_ar,
            'grandfather_en': grandfather_en,
            'grandfather_ar': grandfather_ar,
            'last_en': last_en,
            'last_ar': last_ar,
        }

    def saudi_address(self):
        """Generate Saudi address.

        Returns:
            dict with ``address`` (building, street, district), ``city`` and
            a 5-digit ``postal_code``.
        """
        city = random.choice(SAUDI_CITIES)
        if city == 'Riyadh':
            district = random.choice(RIYADH_DISTRICTS)
        else:
            # No district table for other cities: fabricate an "Al-<Word>" name.
            district = f'Al-{self.fake_en.word().title()}'
        street = f'{self.fake_en.street_name()}'
        building = random.randint(1, 9999)
        postal_code = self._random_digits(5)
        return {
            'address': f'Building {building}, {street}, {district}',
            'city': city,
            'postal_code': postal_code,
        }
class Command(BaseCommand):
help = 'Generate Saudi-influenced test data for all apps'
def add_arguments(self, parser):
    """Register the command-line options of this command.

    Adds the integer options ``--tenants``, ``--patients`` and
    ``--appointments`` (defaults 1, 50 and 100) and the boolean flag
    ``--clear``.
    """
    integer_options = (
        ('--tenants', 1, 'Number of tenants to create (default: 1)'),
        ('--patients', 50, 'Number of patients per tenant (default: 50)'),
        ('--appointments', 100, 'Number of appointments per tenant (default: 100)'),
    )
    for flag, default, help_text in integer_options:
        parser.add_argument(flag, type=int, default=default, help=help_text)
    parser.add_argument(
        '--clear',
        action='store_true',
        help='Clear existing data before generating new data',
    )
def __init__(self, *args, **kwargs):
    """Initialise the base command, then attach the counters and generator."""
    super().__init__(*args, **kwargs)
    # Tally of created records per kind, filled in by the generate_* methods.
    self.created_counts = {}
    # Source of Saudi-specific random values (names, phones, addresses, ...).
    self.generator = SaudiDataGenerator()
def handle(self, *args, **options):
"""Run the full test-data generation pipeline.

If ``options['clear']`` is set, existing data is cleared first. The
superuser, the global setting templates and then, for every generated
tenant, each data layer are created in dependency order. The generation
steps are wrapped in ``transaction.atomic()``. Any exception is reported
on stdout and re-raised.

Reads ``options['clear']``, ``options['tenants']``,
``options['patients']`` and ``options['appointments']``.
"""
self.stdout.write(self.style.SUCCESS('Starting Saudi-influenced test data generation...'))
# Clearing happens before the atomic block is opened.
if options['clear']:
self.stdout.write(self.style.WARNING('Clearing existing data...'))
self.clear_data()
try:
with transaction.atomic():
# Create superuser if it doesn't exist
self.create_superuser()
# Generate setting templates first (global, not tenant-specific)
self.generate_setting_templates()
# Generate data in order of dependencies
tenants = self.generate_tenants(options['tenants'])
for tenant in tenants:
self.stdout.write(f'\nGenerating data for tenant: {tenant.name}')
# Generate tenant settings
self.generate_tenant_settings(tenant)
# Generate consent templates
self.generate_consent_templates(tenant)
# Foundation layer
users = self.generate_users(tenant)
clinics = self.generate_clinics(tenant)
# Patient layer
patients = self.generate_patients(tenant, options['patients'])
# Scheduling layer
providers = self.generate_providers(tenant, users, clinics)
rooms = self.generate_rooms(tenant, clinics)
# NOTE: the returned schedules are not used again in this method.
schedules = self.generate_schedules(providers)
appointments = self.generate_appointments(
tenant, patients, providers, clinics, rooms, options['appointments']
)
# Clinical layer
self.generate_clinical_data(tenant, patients, appointments, providers, users)
# Financial layer
services = self.generate_services(tenant, clinics)
packages = self.generate_packages(tenant, services)
self.generate_financial_data(tenant, patients, appointments, services, packages)
# Communication layer
self.generate_communication_data(tenant, patients, appointments)
# Integration layer
self.generate_integration_data(tenant, patients, appointments, clinics, users)
# HR layer
self.generate_hr_data(tenant, users)
# Documents layer
self.generate_documents_data(tenant, patients, appointments, users)
self.print_summary()
self.stdout.write(self.style.SUCCESS('\n✓ Test data generation completed successfully!'))
except Exception as e:
# Report the failure, then re-raise so the caller still sees the error.
self.stdout.write(self.style.ERROR(f'\n✗ Error generating data: {str(e)}'))
raise
def create_superuser(self):
    """Create the ``admin`` superuser if it doesn't exist.

    The password is read from the ``DJANGO_SUPERUSER_PASSWORD`` environment
    variable. When that variable is unset or empty, a random password is
    generated and printed once, so no credential is kept in source control.
    """
    import os  # local import: only this method needs it

    username = 'admin'
    email = 'marwan@tenhal.sa'
    if User.objects.filter(username=username).exists():
        self.stdout.write(f' Superuser "{username}" already exists, skipping...')
        return
    password = os.environ.get('DJANGO_SUPERUSER_PASSWORD')
    password_was_generated = not password
    if password_was_generated:
        password = secrets.token_urlsafe(16)
    User.objects.create_superuser(
        username=username,
        email=email,
        password=password,
        first_name='Marwan',
        last_name='Alwali',
        phone_number='+966501234567',
        timezone='Asia/Riyadh',
        email_verified=True,
        preferences={
            'language': 'en',
            'notifications_enabled': True,
            'theme': 'light'
        }
    )
    self.stdout.write(self.style.SUCCESS(f' ✓ Created superuser: {username} ({email})'))
    if password_was_generated:
        # Shown once; it cannot be recovered afterwards, only reset.
        self.stdout.write(self.style.WARNING(
            f' Generated superuser password: {password} '
            '(set DJANGO_SUPERUSER_PASSWORD to choose your own)'
        ))
    self.created_counts['superuser'] = 1
def clear_data(self):
    """Delete all generated records, keeping superusers and setting templates.

    Each model in the table below is counted, emptied and reported, in the
    listed (reverse dependency) order. Non-superuser users and all tenants
    are removed last. SettingTemplate rows are global configuration and are
    left in place.
    """
    # NOTE(review): the HR models and ConsentTemplate imported by this file
    # are not listed; presumably they go away via cascade when users and
    # tenants are deleted - confirm.
    deletion_order = (
        # Documents
        NoteAuditLog, NoteAddendum, ClinicalNote, DocumentTemplate,
        # Therapy disciplines
        SLPProgressReport, SLPTarget, SLPIntervention, SLPAssessment, SLPConsult,
        OTProgressReport, OTTargetSkill, OTSession, OTConsult,
        ABASkillTarget, ABASession, ABAGoal, ABABehavior, ABAConsult,
        # Nursing and medical
        VitalSignsAlert, GrowthChart, NursingEncounter,
        ConsultationFeedback, ConsultationResponse, MedicalFollowUp, MedicationPlan, MedicalConsultation,
        # Integrations and referrals
        PayerContract, ZatcaCredential, EInvoice, NphiesEncounterLink, NphiesMessage, ExternalOrder,
        ReferralAutoRule, Referral,
        # Notifications
        MessageLog, Message, NotificationPreference, MessageTemplate,
        # Finance
        CSID, PackagePurchase, Payment, InvoiceLineItem, Invoice, Payer, Package, Service,
        # Appointments
        AppointmentConfirmation, AppointmentReminder, Appointment, Schedule, Room, Provider,
        # Core
        AuditLog, Attachment, Consent, SubFile, File, Patient, Clinic,
        TenantSetting,
    )
    for model_cls in deletion_order:
        removed = model_cls.objects.count()
        model_cls.objects.all().delete()
        self.stdout.write(f' Cleared {removed} {model_cls.__name__} records')
    # Superusers survive the purge; every tenant is removed.
    User.objects.filter(is_superuser=False).delete()
    Tenant.objects.all().delete()
def generate_setting_templates(self):
    """Seed the global SettingTemplate rows unless some already exist.

    Six required templates are created: two clinic-name fields, the VAT
    registration number and three address fields.
    """
    if SettingTemplate.objects.exists():
        self.stdout.write(' Setting templates already exist, skipping...')
        return

    category = SettingTemplate.Category
    data_type = SettingTemplate.DataType

    def spec(key, cat, label_en, label_ar, dtype, order, **extra):
        """Assemble the keyword arguments for one required template."""
        return {
            'key': key,
            'category': cat,
            'label_en': label_en,
            'label_ar': label_ar,
            'data_type': dtype,
            'is_required': True,
            'order': order,
            **extra,
        }

    templates = [
        # Basic information
        spec('basic_clinic_name_en', category.BASIC,
             'Clinic Name (English)', 'اسم العيادة (إنجليزي)', data_type.STRING, 1),
        spec('basic_clinic_name_ar', category.BASIC,
             'Clinic Name (Arabic)', 'اسم العيادة (عربي)', data_type.STRING, 2),
        # VAT registration
        spec('vat_registration_number', category.VAT,
             'VAT Registration Number', 'رقم التسجيل الضريبي', data_type.STRING, 1,
             validation_regex=r'^3\d{13}3$',
             help_text_en='15 digits, must start and end with 3',
             help_text_ar='15 رقم، يجب أن يبدأ وينتهي بـ 3'),
        # Address information
        spec('address_street', category.ADDRESS,
             'Street Address', 'عنوان الشارع', data_type.TEXT, 1),
        spec('address_city', category.ADDRESS,
             'City', 'المدينة', data_type.STRING, 2,
             default_value='Riyadh'),
        spec('address_postal_code', category.ADDRESS,
             'Postal Code', 'الرمز البريدي', data_type.STRING, 3),
    ]
    for fields in templates:
        SettingTemplate.objects.create(**fields)
    self.stdout.write(f' Created {len(templates)} setting templates')
def generate_tenant_settings(self, tenant):
    """Create one TenantSetting per active SettingTemplate for ``tenant``.

    Known template keys take their value from the tenant record; any other
    template falls back to its own ``default_value``.
    """
    # Template key -> callable producing the value. Callables keep the
    # attribute access lazy, so only the matching source is evaluated.
    value_sources = {
        'basic_clinic_name_en': lambda: tenant.name,
        'basic_clinic_name_ar': lambda: tenant.settings.get('name_ar', ''),
        'vat_registration_number': lambda: tenant.vat_number,
        'address_street': lambda: tenant.address,
        'address_city': lambda: tenant.city,
        'address_postal_code': lambda: tenant.postal_code,
    }
    active_templates = SettingTemplate.objects.filter(is_active=True)
    for tmpl in active_templates:
        source = value_sources.get(tmpl.key)
        setting_value = source() if source is not None else tmpl.default_value
        TenantSetting.objects.create(
            tenant=tenant,
            template=tmpl,
            value=setting_value,
        )
    self.stdout.write(f' Created {active_templates.count()} tenant settings')
def generate_consent_templates(self, tenant):
"""Create the consent templates for ``tenant`` and return them as a list.

Four templates are created, one per consent type (general treatment,
service-specific, photo/video, data sharing), each with an English and an
Arabic title and body at version 1. The bodies are plain (non-f) string
literals, so placeholders such as ``{patient_name}`` and ``{date}`` are
stored verbatim. Also adds the number created to
``self.created_counts['consent_templates']``.
"""
templates_data = [
# General treatment consent
{
'consent_type': ConsentTemplate.ConsentType.GENERAL_TREATMENT,
'title_en': 'General Treatment Consent',
'title_ar': 'موافقة العلاج العامة',
'content_en': '''GENERAL TREATMENT CONSENT FORM
Patient Name: {patient_name}
Medical Record Number: {patient_mrn}
Date of Birth: {patient_dob}
Age: {patient_age}
Date: {date}
I, the undersigned, hereby consent to receive medical treatment and therapeutic services at {clinic_name}.
I understand that:
1. The treatment plan will be explained to me by the healthcare provider
2. I have the right to ask questions about my treatment
3. I can refuse treatment at any time
4. My medical information will be kept confidential
5. Treatment may involve some risks and side effects
I acknowledge that I have read and understood this consent form, and I voluntarily agree to the proposed treatment.
Signature: ___________________________
Name: {patient_name}
Date: {date}''',
'content_ar': '''نموذج موافقة العلاج العامة
اسم المريض: {patient_name}
رقم السجل الطبي: {patient_mrn}
تاريخ الميلاد: {patient_dob}
العمر: {patient_age}
التاريخ: {date}
أنا الموقع أدناه، أوافق بموجب هذا على تلقي العلاج الطبي والخدمات العلاجية في {clinic_name}.
أفهم أن:
١. سيتم شرح خطة العلاج لي من قبل مقدم الرعاية الصحية
٢. لدي الحق في طرح الأسئلة حول علاجي
٣. يمكنني رفض العلاج في أي وقت
٤. ستبقى معلوماتي الطبية سرية
٥. قد ينطوي العلاج على بعض المخاطر والآثار الجانبية
أقر بأنني قرأت وفهمت نموذج الموافقة هذا، وأوافق طوعاً على العلاج المقترح.
التوقيع: ___________________________
الاسم: {patient_name}
التاريخ: {date}''',
'version': 1
},
# Service-specific consent (OT, SLP, ABA and other interventions)
{
'consent_type': ConsentTemplate.ConsentType.SERVICE_SPECIFIC,
'title_en': 'Service-Specific Consent',
'title_ar': 'موافقة خاصة بالخدمة',
'content_en': '''SERVICE-SPECIFIC CONSENT FORM
Patient Name: {patient_name}
Medical Record Number: {patient_mrn}
Date: {date}
I consent to receive the following specific services:
- Occupational Therapy
- Speech-Language Therapy
- ABA Therapy
- Other therapeutic interventions as recommended
I understand that:
1. Each service has specific goals and objectives
2. Progress will be monitored and reported regularly
3. Services may be adjusted based on patient progress
4. I will be informed of any changes to the treatment plan
I give my consent for the healthcare team to provide these services.
Signature: ___________________________
Name: {patient_name}
Date: {date}''',
'content_ar': '''نموذج موافقة خاصة بالخدمة
اسم المريض: {patient_name}
رقم السجل الطبي: {patient_mrn}
التاريخ: {date}
أوافق على تلقي الخدمات المحددة التالية:
- العلاج الوظيفي
- علاج النطق واللغة
- علاج ABA
- تدخلات علاجية أخرى حسب التوصية
أفهم أن:
١. كل خدمة لها أهداف وغايات محددة
٢. سيتم مراقبة التقدم والإبلاغ عنه بانتظام
٣. قد يتم تعديل الخدمات بناءً على تقدم المريض
٤. سأبلغ بأي تغييرات في خطة العلاج
أعطي موافقتي لفريق الرعاية الصحية لتقديم هذه الخدمات.
التوقيع: ___________________________
الاسم: {patient_name}
التاريخ: {date}''',
'version': 1
},
# Photo/video consent
{
'consent_type': ConsentTemplate.ConsentType.PHOTO_VIDEO,
'title_en': 'Photo/Video Consent',
'title_ar': 'موافقة التصوير الفوتوغرافي/الفيديو',
'content_en': '''PHOTO/VIDEO CONSENT FORM
Patient Name: {patient_name}
Medical Record Number: {patient_mrn}
Date: {date}
I consent to the use of photographs and/or video recordings of the patient for:
☐ Medical documentation and treatment planning
☐ Educational purposes (training healthcare professionals)
☐ Research purposes (with patient identity protected)
☐ Marketing and promotional materials (with patient identity protected)
I understand that:
1. Photos/videos will be stored securely
2. Patient identity will be protected when used for educational/research purposes
3. I can withdraw this consent at any time
4. Photos/videos will not be shared without proper authorization
Signature: ___________________________
Name: {patient_name}
Date: {date}''',
'content_ar': '''نموذج موافقة التصوير الفوتوغرافي/الفيديو
اسم المريض: {patient_name}
رقم السجل الطبي: {patient_mrn}
التاريخ: {date}
أوافق على استخدام الصور الفوتوغرافية و/أو تسجيلات الفيديو للمريض لـ:
☐ التوثيق الطبي وتخطيط العلاج
☐ الأغراض التعليمية (تدريب المهنيين الصحيين)
☐ أغراض البحث (مع حماية هوية المريض)
☐ المواد التسويقية والترويجية (مع حماية هوية المريض)
أفهم أن:
١. سيتم تخزين الصور/الفيديوهات بشكل آمن
٢. ستتم حماية هوية المريض عند استخدامها للأغراض التعليمية/البحثية
٣. يمكنني سحب هذه الموافقة في أي وقت
٤. لن تتم مشاركة الصور/الفيديوهات بدون تصريح مناسب
التوقيع: ___________________________
الاسم: {patient_name}
التاريخ: {date}''',
'version': 1
},
# Data-sharing consent
{
'consent_type': ConsentTemplate.ConsentType.DATA_SHARING,
'title_en': 'Data Sharing Consent',
'title_ar': 'موافقة مشاركة البيانات',
'content_en': '''DATA SHARING CONSENT FORM
Patient Name: {patient_name}
Medical Record Number: {patient_mrn}
Date: {date}
I consent to the sharing of patient medical information with:
☐ Other healthcare providers involved in patient care
☐ Insurance companies for billing purposes
☐ Educational institutions (with patient identity protected)
☐ Research institutions (with patient identity protected)
I understand that:
1. Data will be shared only for legitimate healthcare purposes
2. Patient confidentiality will be maintained
3. Data will be shared in compliance with privacy regulations
4. I can limit or withdraw this consent at any time
Signature: ___________________________
Name: {patient_name}
Date: {date}''',
'content_ar': '''نموذج موافقة مشاركة البيانات
اسم المريض: {patient_name}
رقم السجل الطبي: {patient_mrn}
التاريخ: {date}
أوافق على مشاركة المعلومات الطبية للمريض مع:
☐ مقدمي الرعاية الصحية الآخرين المشاركين في رعاية المريض
☐ شركات التأمين لأغراض الفوترة
☐ المؤسسات التعليمية (مع حماية هوية المريض)
☐ مؤسسات البحث (مع حماية هوية المريض)
أفهم أن:
١. سيتم مشاركة البيانات فقط لأغراض الرعاية الصحية المشروعة
٢. سيتم الحفاظ على سرية المريض
٣. سيتم مشاركة البيانات وفقاً لأنظمة الخصوصية
٤. يمكنني تقييد أو سحب هذه الموافقة في أي وقت
التوقيع: ___________________________
الاسم: {patient_name}
التاريخ: {date}''',
'version': 1
}
]
# Persist each definition for this tenant, collecting the created rows.
templates = []
for template_data in templates_data:
template = ConsentTemplate.objects.create(
tenant=tenant,
**template_data
)
templates.append(template)
# Accumulate across tenants: the key may already hold an earlier tenant's count.
self.created_counts.setdefault('consent_templates', 0)
self.created_counts['consent_templates'] += len(templates)
self.stdout.write(f' Created {len(templates)} consent templates')
return templates
def generate_tenants(self, count):
    """Generate tenant organizations.

    Only the predefined organisations listed below can be created. When
    ``count`` exceeds that number, a warning is printed and the list is
    used in full, rather than silently creating fewer tenants than asked.

    Args:
        count: number of tenants requested; zero or a negative value
            creates none.

    Returns:
        list of the created Tenant instances.
    """
    tenant_names = [
        ('Agdar Center', 'مركز أقدر', 'AGDAR'),
        ('Al-Noor Medical Center', 'مركز النور الطبي', 'ALNOOR'),
        ('Shifa Healthcare', 'شفاء للرعاية الصحية', 'SHIFA'),
    ]
    if count > len(tenant_names):
        self.stdout.write(self.style.WARNING(
            f' Requested {count} tenants but only {len(tenant_names)} are '
            f'predefined; creating {len(tenant_names)}'
        ))
    tenants = []
    # max(..., 0) keeps a negative count from slicing from the end of the list.
    for name_en, name_ar, code in tenant_names[:max(count, 0)]:
        address_data = self.generator.saudi_address()
        tenant = Tenant.objects.create(
            name=name_en,
            code=code,
            vat_number=self.generator.saudi_vat_number(),
            address=address_data['address'],
            city=address_data['city'],
            postal_code=address_data['postal_code'],
            country_code='SA',
            is_active=True,
            settings={
                'name_ar': name_ar,
                'currency': 'SAR',
                'timezone': 'Asia/Riyadh',
                'language': 'ar'
            }
        )
        tenants.append(tenant)
        self.stdout.write(f' Created tenant: {tenant.name}')
    self.created_counts['tenants'] = len(tenants)
    return tenants
def generate_users(self, tenant):
    """Create the staff accounts of ``tenant`` and return them as a list.

    A fixed head-count is created per role (see the table below). Each
    account gets a random Saudi name, phone number and employee id, and the
    password ``test123``.
    """
    headcount_by_role = (
        (User.Role.ADMIN, 2),
        (User.Role.DOCTOR, 3),
        (User.Role.NURSE, 3),
        (User.Role.OT, 2),
        (User.Role.SLP, 2),
        (User.Role.ABA, 2),
        (User.Role.FRONT_DESK, 2),
        (User.Role.FINANCE, 2),
    )
    created = []
    for role, headcount in headcount_by_role:
        for seq in range(1, headcount + 1):
            sex = random.choice(['M', 'F'])
            name = self.generator.saudi_name(sex)
            first, last = name['first_en'], name['last_en']
            # The per-role sequence number keeps usernames distinct within a role.
            username = f"{first.lower()}.{last.lower()}.{role.lower()}{seq}"
            account = User.objects.create_user(
                username=username,
                email=f"{username}@{tenant.code.lower()}.sa",
                password='test123',
                tenant=tenant,
                role=role,
                first_name=first,
                last_name=last,
                phone_number=self.generator.saudi_phone(),
                employee_id=f"E{random.randint(1000, 9999)}",
                bio=f"Healthcare professional specializing in {role}",
                timezone='Asia/Riyadh',
                email_verified=random.choice([True, False]),
                preferences={
                    'language': random.choice(['en', 'ar']),
                    'notifications_enabled': True,
                    'theme': 'light',
                },
            )
            created.append(account)
    self.created_counts['users'] = self.created_counts.get('users', 0) + len(created)
    self.stdout.write(f' Created {len(created)} users')
    return created
def generate_clinics(self, tenant):
    """Create the five departments (clinics) of ``tenant`` and return them."""
    # (English name, Arabic name, specialty, short code)
    department_specs = (
        ('Medical Department', 'القسم الطبي', Clinic.Specialty.MEDICAL, 'MED'),
        ('Nursing Department', 'قسم التمريض', Clinic.Specialty.NURSING, 'NUR'),
        ('ABA Therapy', 'علاج ABA', Clinic.Specialty.ABA, 'ABA'),
        ('Occupational Therapy', 'العلاج الوظيفي', Clinic.Specialty.OT, 'OT'),
        ('Speech-Language Pathology', 'علاج النطق واللغة', Clinic.Specialty.SLP, 'SLP'),
    )
    clinics = [
        Clinic.objects.create(
            tenant=tenant,
            name_en=english,
            name_ar=arabic,
            specialty=specialty,
            code=short_code,
            is_active=True,
        )
        for english, arabic, specialty, short_code in department_specs
    ]
    self.created_counts['clinics'] = self.created_counts.get('clinics', 0) + len(clinics)
    self.stdout.write(f' Created {len(clinics)} clinics')
    return clinics
def generate_patients(self, tenant, count):
    """Generate patients with Saudi demographics.

    Creates ``count`` patients for ``tenant``, each with a
    NotificationPreference. Ages are weighted towards children: 60% aged
    2-12, 25% aged 13-18 and 15% aged 19-60.

    The caregiver's given name follows the chosen relationship: a father
    reuses the patient's own father name, a mother gets a female name, and
    a guardian may be either. The name is not derived from the patient's
    sex, which would yield combinations such as a female-named "Father".

    Returns:
        list of the created Patient instances.
    """
    patients = []
    for i in range(count):
        sex = random.choice(['M', 'F'])
        name = self.generator.saudi_name(sex)
        address = self.generator.saudi_address()
        # Generate realistic age distribution (more children for therapy center)
        youngest, oldest = random.choices(
            [(2, 12), (13, 18), (19, 60)],
            weights=[60, 25, 15]
        )[0]
        age_years = random.randint(youngest, oldest)
        dob = date.today() - timedelta(days=age_years * 365 + random.randint(0, 364))
        # Generate MRN: zero-padded running number within this call
        mrn = str(i + 1).zfill(6)
        # Pick the relationship first so the caregiver's name can match it.
        relationship = random.choice(['Father', 'Mother', 'Guardian'])
        if relationship == 'Father':
            caregiver_first = name['father_en']
        elif relationship == 'Mother':
            caregiver_first = random.choice(SAUDI_FEMALE_NAMES_EN)
        else:
            caregiver_first = random.choice(SAUDI_MALE_NAMES_EN + SAUDI_FEMALE_NAMES_EN)
        patient = Patient.objects.create(
            tenant=tenant,
            mrn=mrn,
            national_id=self.generator.saudi_national_id(),
            first_name_en=name['first_en'],
            father_name_en=name['father_en'],
            grandfather_name_en=name['grandfather_en'],
            last_name_en=name['last_en'],
            first_name_ar=name['first_ar'],
            father_name_ar=name['father_ar'],
            grandfather_name_ar=name['grandfather_ar'],
            last_name_ar=name['last_ar'],
            date_of_birth=dob,
            sex=sex,
            phone=self.generator.saudi_phone(),
            email=f"{name['first_en'].lower()}.{name['last_en'].lower()}@example.sa",
            caregiver_name=f"{caregiver_first} {name['last_en']}",
            caregiver_phone=self.generator.saudi_phone(),
            caregiver_relationship=relationship,
            address=address['address'],
            city=address['city'],
            postal_code=address['postal_code'],
            emergency_contact=f"Emergency: {self.generator.saudi_phone()}"
        )
        # Create notification preferences
        NotificationPreference.objects.create(
            patient=patient,
            sms_enabled=True,
            whatsapp_enabled=random.choice([True, False]),
            email_enabled=random.choice([True, False]),
            preferred_language=random.choice(['en', 'ar']),
            preferred_channel=random.choice([
                Message.Channel.SMS,
                Message.Channel.WHATSAPP
            ])
        )
        # Note: File is automatically created by signal, no need to create it here
        patients.append(patient)
    self.created_counts.setdefault('patients', 0)
    self.created_counts['patients'] += len(patients)
    self.stdout.write(f' Created {len(patients)} patients')
    return patients
def generate_providers(self, tenant, users, clinics):
    """Create a Provider profile for every clinical user and return them.

    Clinical users are doctors, nurses, OT, SLP and ABA staff. Each
    provider is linked to the clinics whose specialty matches the user's
    role, when there are any.
    """
    clinical_roles = [
        User.Role.DOCTOR, User.Role.NURSE, User.Role.OT,
        User.Role.SLP, User.Role.ABA,
    ]
    # Map user role to clinic specialty
    role_to_specialty = {
        User.Role.DOCTOR: Clinic.Specialty.MEDICAL,
        User.Role.NURSE: Clinic.Specialty.NURSING,
        User.Role.OT: Clinic.Specialty.OT,
        User.Role.SLP: Clinic.Specialty.SLP,
        User.Role.ABA: Clinic.Specialty.ABA,
    }
    created = []
    for user in users:
        if user.role not in clinical_roles:
            continue
        wanted_specialty = role_to_specialty.get(user.role)
        matching_clinics = [c for c in clinics if c.specialty == wanted_specialty]
        provider = Provider.objects.create(
            tenant=tenant,
            user=user,
            is_available=True,
            max_daily_appointments=random.randint(15, 25),
        )
        if matching_clinics:
            provider.specialties.set(matching_clinics)
        created.append(provider)
    self.created_counts['providers'] = self.created_counts.get('providers', 0) + len(created)
    self.stdout.write(f' Created {len(created)} providers')
    return created
def generate_rooms(self, tenant, clinics):
    """Create between two and four rooms per clinic and return all of them.

    Rooms are named "<clinic name> Room <n>" and numbered
    "<clinic code>-<nn>", with ``n`` starting at 1 for each clinic.
    """
    created = []
    for clinic in clinics:
        room_total = random.randint(2, 4)
        for number in range(1, room_total + 1):
            created.append(Room.objects.create(
                tenant=tenant,
                name=f"{clinic.name_en} Room {number}",
                room_number=f"{clinic.code}-{number:02d}",
                clinic=clinic,
                is_available=True,
            ))
    self.created_counts['rooms'] = self.created_counts.get('rooms', 0) + len(created)
    self.stdout.write(f' Created {len(created)} rooms')
    return created
def generate_schedules(self, providers):
    """Create two daily shifts on days 0-4 for every provider.

    Each provider gets a morning (08:00-12:00) and an afternoon
    (15:00-19:00) schedule with 30-minute slots for ``day_of_week`` 0 to 4.
    Returns the list of created Schedule instances.
    """
    # Morning shift, then afternoon shift (after prayer break).
    shifts = (('08:00', '12:00'), ('15:00', '19:00'))
    created = []
    for provider in providers:
        # Saudi work week: the original author treats 0-4 as Sunday-Thursday.
        # NOTE(review): confirm Schedule.day_of_week really uses 0 = Sunday.
        for weekday in range(5):
            for opens, closes in shifts:
                created.append(Schedule.objects.create(
                    provider=provider,
                    day_of_week=weekday,
                    start_time=opens,
                    end_time=closes,
                    slot_duration=30,
                    is_active=True,
                ))
    self.created_counts['schedules'] = self.created_counts.get('schedules', 0) + len(created)
    self.stdout.write(f' Created {len(created)} schedules')
    return created
def generate_appointments(self, tenant, patients, providers, clinics, rooms, count):
    """Generate appointments with varied statuses.

    Dates span the past 90 days to 30 days ahead, weighted towards the
    past, and Fridays and Saturdays are moved forward to the next working
    day. The status depends on whether the date is past, today or future.

    Appointments need at least one patient and one provider. When either
    list is empty, a warning is printed and an empty list is returned
    instead of failing inside ``random.choice``. A clinic without rooms
    produces an appointment with ``room=None``.

    Args:
        tenant: owning tenant.
        patients: candidate patients.
        providers: candidate providers.
        clinics: fallback clinics for providers without specialties.
        rooms: all rooms; filtered per appointment by clinic.
        count: number of appointments to create.

    Returns:
        list of the created Appointment instances.
    """
    appointments = []
    self.created_counts.setdefault('appointments', 0)
    if count > 0 and not (patients and providers):
        self.stdout.write(self.style.WARNING(
            ' No patients or providers available, skipping appointments'
        ))
        return appointments
    # Generate appointments over the past 3 months and next month
    today = date.today()
    start_date = today - timedelta(days=90)
    end_date = today + timedelta(days=30)
    window_days = (end_date - start_date).days
    # Loop-invariant: past offsets (< 90) are three times as likely as future ones.
    day_weights = [1.5 if offset < 90 else 0.5 for offset in range(window_days)]
    for i in range(count):
        patient = random.choice(patients)
        provider = random.choice(providers)
        # One query instead of exists() + all(); fall back to any clinic.
        provider_clinics = list(provider.specialties.all())
        clinic = random.choice(provider_clinics or clinics)
        candidate_rooms = [r for r in rooms if r.clinic == clinic]
        room = random.choice(candidate_rooms) if candidate_rooms else None
        # Generate appointment date (weighted towards recent dates)
        days_offset = random.choices(range(window_days), weights=day_weights)[0]
        appt_date = start_date + timedelta(days=days_offset)
        # Skip Fridays and Saturdays (Saudi weekend)
        while appt_date.weekday() in (4, 5):  # Friday=4, Saturday=5
            appt_date += timedelta(days=1)
        # Generate time slot
        hour = random.choice([8, 9, 10, 11, 15, 16, 17, 18])
        minute = random.choice([0, 30])
        appt_time = f"{hour:02d}:{minute:02d}"
        # Determine status based on date
        if appt_date < today:
            status = random.choices(
                [Appointment.Status.COMPLETED, Appointment.Status.NO_SHOW, Appointment.Status.CANCELLED],
                weights=[75, 15, 10]
            )[0]
        elif appt_date == today:
            status = random.choices(
                [Appointment.Status.CONFIRMED, Appointment.Status.ARRIVED, Appointment.Status.IN_PROGRESS],
                weights=[40, 35, 25]
            )[0]
        else:
            status = random.choices(
                [Appointment.Status.BOOKED, Appointment.Status.CONFIRMED],
                weights=[30, 70]
            )[0]
        is_cancelled = status == Appointment.Status.CANCELLED
        appointment = Appointment.objects.create(
            tenant=tenant,
            appointment_number=f"A{str(i+1).zfill(6)}",
            patient=patient,
            clinic=clinic,
            provider=provider,
            room=room,
            service_type=random.choice([
                'Initial Consultation', 'Follow-up', 'Therapy Session',
                'Assessment', 'Progress Review'
            ]),
            scheduled_date=appt_date,
            scheduled_time=appt_time,
            duration=random.choice([30, 45, 60]),
            status=status,
            # Cancelled appointments are never cleared or verified.
            finance_cleared=False if is_cancelled else random.choice([True, False]),
            consent_verified=False if is_cancelled else random.choice([True, False]),
            notes=random.choice(['', 'First visit', 'Regular session', 'Parent requested'])
        )
        # Add timestamps for completed appointments
        # NOTE(review): NO_SHOW also receives arrival_at, as in the original;
        # confirm that is intended for a patient who did not show up.
        if status in (Appointment.Status.COMPLETED, Appointment.Status.NO_SHOW):
            appt_datetime = datetime(
                appt_date.year, appt_date.month, appt_date.day, hour, minute
            )
            appointment.arrival_at = timezone.make_aware(appt_datetime)
            if status == Appointment.Status.COMPLETED:
                appointment.start_at = appointment.arrival_at + timedelta(minutes=5)
                appointment.end_at = appointment.start_at + timedelta(minutes=appointment.duration)
            appointment.save()
        appointments.append(appointment)
    self.created_counts['appointments'] += len(appointments)
    self.stdout.write(f' Created {len(appointments)} appointments')
    return appointments
def generate_clinical_data(self, tenant, patients, appointments, providers, users):
    """Generate clinical records for various specialties.

    Only appointments whose status is ``Appointment.Status.COMPLETED`` are
    used as the basis for clinical records; when there are none the method
    returns without creating anything. Each specialty (medical, nursing,
    ABA, OT, SLP) is produced by its own private helper, and a specialty is
    skipped when no provider with the matching user role exists.

    Args:
        tenant: Tenant assigned to the generated records.
        patients: Patients considered for growth-chart generation.
        appointments: Appointments to filter for completed ones.
        providers: Provider objects; ``provider.user.role`` selects the specialty.
        users: Users eligible as consultation responders and feedback authors.
    """
    completed_appointments = [
        a for a in appointments if a.status == Appointment.Status.COMPLETED
    ]
    if not completed_appointments:
        return

    # The helpers run in a fixed order so the sequence of random draws is stable.
    self._generate_medical_records(tenant, completed_appointments, providers, users)
    self._generate_nursing_records(tenant, patients, completed_appointments, providers)
    self._generate_aba_records(tenant, completed_appointments, providers)
    self._generate_ot_records(tenant, completed_appointments, providers)
    self._generate_slp_records(tenant, completed_appointments, providers)

    self.stdout.write(' Created clinical records for all specialties')

def _generate_medical_records(self, tenant, completed_appointments, providers, users):
    """Create medical consultations with medication plans, responses, feedback and follow-ups."""
    medical_providers = [p for p in providers if p.user.role == User.Role.DOCTOR]
    if not medical_providers:
        return

    consultations = []
    for _ in range(min(20, len(completed_appointments))):
        # Appointments are drawn with replacement, so the same one can come up twice;
        # skip it when it already has a consultation attached.
        appt = random.choice(completed_appointments)
        if not hasattr(appt, 'medical_consultations') or not appt.medical_consultations.exists():
            consultation = MedicalConsultation.objects.create(
                tenant=tenant,
                patient=appt.patient,
                appointment=appt,
                consultation_date=appt.scheduled_date,
                provider=random.choice(medical_providers).user,
                chief_complaint=random.choice([
                    'Developmental delay concerns',
                    'Speech and language difficulties',
                    'Behavioral issues',
                    'Motor skill challenges'
                ]),
                clinical_summary='Patient evaluated and treatment plan discussed.',
                recommendations='Continue therapy sessions as scheduled.'
            )
            consultations.append(consultation)

    # Medication plans for a subset of the consultations
    for consultation in random.sample(consultations, min(10, len(consultations))):
        MedicationPlan.objects.create(
            consultation=consultation,
            drug_name=random.choice(['Risperidone', 'Melatonin', 'Omega-3', 'Vitamin D']),
            dose=random.choice(['2.5mg', '5mg', '10mg', '500mg']),
            frequency=random.choice(list(MedicationPlan.Frequency.choices))[0],
            compliance=random.choice(list(MedicationPlan.Compliance.choices))[0],
            gains='Patient showing improvement',
            side_effects='',
            target_behavior='',
            improved=random.choice([True, False])
        )

    # Responses from other disciplines. Response types without an entry in the
    # map fall back to the DOCTOR role.
    responder_role_map = {
        'OT': User.Role.OT,
        'SLP': User.Role.SLP,
        'ABA': User.Role.ABA,
        'NURSING': User.Role.NURSE,
    }
    for consultation in random.sample(consultations, min(8, len(consultations))):
        response_type = random.choice(list(ConsultationResponse.ResponseType.choices))[0]
        responder_role = responder_role_map.get(response_type, User.Role.DOCTOR)
        potential_responders = [u for u in users if u.role == responder_role]
        # No response is created when nobody holds the required role.
        if potential_responders:
            ConsultationResponse.objects.create(
                tenant=tenant,
                consultation=consultation,
                response_type=response_type,
                responder=random.choice(potential_responders),
                response_date=consultation.consultation_date + timedelta(days=random.randint(1, 7)),
                assessment='Patient evaluated from ' + response_type + ' perspective',
                recommendations='Continue coordinated care',
                follow_up_needed=random.choice([True, False])
            )

    # Feedback: FAMILY feedback records a submitter name, every other type a user.
    for consultation in random.sample(consultations, min(12, len(consultations))):
        feedback_type = random.choice(list(ConsultationFeedback.FeedbackType.choices))[0]
        if feedback_type == 'FAMILY':
            ConsultationFeedback.objects.create(
                consultation=consultation,
                feedback_type=feedback_type,
                submitted_by_name=consultation.patient.caregiver_name or 'Family Member',
                feedback_date=consultation.consultation_date + timedelta(days=random.randint(1, 14)),
                satisfaction_rating=random.randint(3, 5),
                communication_rating=random.randint(3, 5),
                care_quality_rating=random.randint(3, 5),
                comments='We are satisfied with the care provided',
                concerns='',
                suggestions=''
            )
        else:
            ConsultationFeedback.objects.create(
                consultation=consultation,
                feedback_type=feedback_type,
                submitted_by=random.choice(users),
                feedback_date=consultation.consultation_date + timedelta(days=random.randint(1, 7)),
                satisfaction_rating=random.randint(3, 5),
                communication_rating=random.randint(3, 5),
                care_quality_rating=random.randint(3, 5),
                comments='Good interdisciplinary collaboration',
                concerns='',
                suggestions=''
            )

    # Follow-ups for a subset of the consultations
    for consultation in random.sample(consultations, min(8, len(consultations))):
        follow_up_date = consultation.consultation_date + timedelta(days=random.randint(14, 60))
        # Map each of the first three comma-separated complaints to a random outcome.
        previous_complaints_status = {}
        if consultation.chief_complaint:
            complaints = consultation.chief_complaint.split(',')[:3]
            for complaint in complaints:
                previous_complaints_status[complaint.strip()] = random.choice(['RESOLVED', 'STATIC', 'WORSE'])
        MedicalFollowUp.objects.create(
            tenant=tenant,
            patient=consultation.patient,
            previous_consultation=consultation,
            followup_date=follow_up_date,
            previous_complaints_status=previous_complaints_status,
            new_complaints='Follow-up assessment',
            assessment='Patient progress reviewed',
            recommendations='Continue current treatment plan',
            provider=consultation.provider,
            family_satisfaction=random.choice(list(MedicalFollowUp.FamilySatisfaction.choices))[0]
        )

    self.created_counts.setdefault('medical_consultations', 0)
    self.created_counts['medical_consultations'] += len(consultations)

def _generate_nursing_records(self, tenant, patients, completed_appointments, providers):
    """Create nursing encounters, growth charts for patients aged 18 or under, and vital-sign alerts."""
    nursing_providers = [p for p in providers if p.user.role == User.Role.NURSE]
    if not nursing_providers:
        return

    encounters = []
    for _ in range(min(30, len(completed_appointments))):
        appt = random.choice(completed_appointments)
        encounter = NursingEncounter.objects.create(
            tenant=tenant,
            patient=appt.patient,
            appointment=appt,
            encounter_date=appt.scheduled_date,
            filled_by=random.choice(nursing_providers).user,
            height_cm=Decimal(str(random.uniform(80, 180))),
            weight_kg=Decimal(str(random.uniform(15, 80))),
            hr_bpm=random.randint(60, 100),
            bp_systolic=random.randint(90, 130),
            bp_diastolic=random.randint(60, 85),
            temperature=Decimal(str(random.uniform(36.5, 37.5))),
            spo2=random.randint(95, 100)
        )
        encounters.append(encounter)

    # Imported once here instead of on every loop iteration.
    from dateutil.relativedelta import relativedelta

    # Growth charts for patients aged 18 or under that have at least one encounter
    pediatric_patients = [p for p in patients if p.age <= 18]
    for patient in random.sample(pediatric_patients, min(15, len(pediatric_patients))):
        patient_encounters = [e for e in encounters if e.patient == patient]
        if patient_encounters:
            encounter = random.choice(patient_encounters)
            # Age in whole months at the time of the encounter
            age_delta = relativedelta(encounter.encounter_date, patient.date_of_birth)
            age_months = age_delta.years * 12 + age_delta.months
            GrowthChart.objects.create(
                patient=patient,
                nursing_encounter=encounter,
                measurement_date=encounter.encounter_date,
                age_months=age_months,
                height_cm=encounter.height_cm,
                weight_kg=encounter.weight_kg,
                # Head circumference is only filled in for patients aged 5 or under.
                head_circumference_cm=Decimal(str(random.uniform(40, 55))) if patient.age <= 5 else None,
                percentile_height=Decimal(str(random.uniform(10, 90))),
                percentile_weight=Decimal(str(random.uniform(10, 90)))
            )

    # Vital signs alerts for abnormal readings.
    # NOTE(review): with the ranges generated above (hr_bpm at most 100,
    # bp_systolic at most 130) neither condition below can be true, so no
    # alert is created unless the model alters these values on save - confirm
    # whether the ranges or the thresholds should change.
    for encounter in random.sample(encounters, min(5, len(encounters))):
        if encounter.hr_bpm and encounter.hr_bpm > 100:
            VitalSignsAlert.objects.create(
                tenant=tenant,
                nursing_encounter=encounter,
                vital_sign='Heart Rate',
                value=f'{encounter.hr_bpm} bpm',
                severity=random.choice(list(VitalSignsAlert.Severity.choices))[0],
                status=random.choice(list(VitalSignsAlert.Status.choices))[0],
                acknowledged_by=random.choice(nursing_providers).user if random.choice([True, False]) else None,
                notes='Elevated heart rate detected'
            )
        elif encounter.bp_systolic and encounter.bp_systolic > 130:
            VitalSignsAlert.objects.create(
                tenant=tenant,
                nursing_encounter=encounter,
                vital_sign='Blood Pressure',
                value=f'{encounter.bp_systolic}/{encounter.bp_diastolic} mmHg',
                severity=random.choice(list(VitalSignsAlert.Severity.choices))[0],
                status=random.choice(list(VitalSignsAlert.Status.choices))[0],
                acknowledged_by=random.choice(nursing_providers).user if random.choice([True, False]) else None,
                notes='Elevated blood pressure detected'
            )

    self.created_counts.setdefault('nursing_encounters', 0)
    self.created_counts['nursing_encounters'] += len(encounters)

def _generate_aba_records(self, tenant, completed_appointments, providers):
    """Create ABA consults (with behaviors and goals) and ABA sessions (with skill targets)."""
    aba_providers = [p for p in providers if p.user.role == User.Role.ABA]
    if not aba_providers:
        return

    consults = []
    for _ in range(min(15, len(completed_appointments))):
        appt = random.choice(completed_appointments)
        if appt.patient.age <= 18:  # ABA typically for children
            consult = ABAConsult.objects.create(
                tenant=tenant,
                patient=appt.patient,
                appointment=appt,
                consultation_date=appt.scheduled_date,
                provider=random.choice(aba_providers).user,
                reason_of_referral=random.choice(list(ABAConsult.ReasonOfReferral.choices))[0],
                parental_concern='Concerns about behavioral development',
                recommendations='Continue ABA therapy sessions'
            )
            consults.append(consult)

    # One to three behaviors per consult
    behavior_types = ['Aggression', 'Self-injury', 'Tantrums', 'Non-compliance', 'Stereotypy']
    for consult in consults:
        for i in range(random.randint(1, 3)):
            ABABehavior.objects.create(
                consult=consult,
                behavior_description=f'Observed {random.choice(behavior_types).lower()} behavior',
                frequency=random.choice(list(ABABehavior.Frequency.choices))[0],
                duration=random.choice(['5-10 minutes', '10-30 minutes', '30+ minutes']),
                intensity=random.choice(list(ABABehavior.Intensity.choices))[0],
                antecedents_likely='Various triggers observed',
                antecedents_least_likely='Structured activities',
                consequences='Intervention applied',
                order=i
            )

    # One or two goals per consult
    goal_descriptions = [
        'Improve communication skills',
        'Enhance social interaction',
        'Develop self-care abilities',
        'Increase academic engagement',
        'Improve play skills'
    ]
    for consult in consults:
        for _ in range(random.randint(1, 2)):
            ABAGoal.objects.create(
                consult=consult,
                goal_description=random.choice(goal_descriptions),
                target_date=consult.consultation_date + timedelta(days=random.randint(30, 90)),
                status=random.choice(list(ABAGoal.Status.choices))[0],
                progress_notes='Progress being monitored'
            )

    # ABA sessions
    sessions = []
    for _ in range(min(20, len(completed_appointments))):
        appt = random.choice(completed_appointments)
        if appt.patient.age <= 18:  # ABA typically for children
            session = ABASession.objects.create(
                tenant=tenant,
                patient=appt.patient,
                appointment=appt,
                session_date=appt.scheduled_date,
                provider=random.choice(aba_providers).user,
                session_type=random.choice(list(ABASession.SessionType.choices))[0],
                duration_minutes=random.choice([30, 45, 60]),
                engagement_level=random.randint(1, 5),
                cooperation_level=random.randint(1, 5),
                target_behaviors='Reduce tantrums, Increase eye contact, Improve turn-taking',
                interventions_used='Positive reinforcement, Visual schedules, Token economy',
                activities_performed='Structured play activities, social skills practice',
                behavioral_observations='Patient showed good engagement during session',
                progress_notes='Patient making progress on targeted behaviors',
                recommendations='Continue current intervention strategies',
                home_program='Practice turn-taking during family activities'
            )
            sessions.append(session)

    # Two to four skill targets per session
    skill_names = [
        'Follows one-step directions',
        'Makes eye contact when called',
        'Shares toys with peers',
        'Waits for turn',
        'Uses words to request',
        'Identifies emotions',
        'Completes simple tasks',
        'Responds to name'
    ]
    for session in sessions:
        for i in range(random.randint(2, 4)):
            trials_total = random.randint(5, 20)
            trials_correct = random.randint(0, trials_total)
            # Mastery level is derived from the percentage of correct trials.
            success_rate = (trials_correct / trials_total * 100) if trials_total > 0 else 0
            if success_rate == 0:
                mastery_level = 'NOT_STARTED'
            elif success_rate <= 25:
                mastery_level = 'EMERGING'
            elif success_rate <= 50:
                mastery_level = 'DEVELOPING'
            elif success_rate <= 75:
                mastery_level = 'PROGRESSING'
            else:
                mastery_level = 'MASTERED'
            ABASkillTarget.objects.create(
                session=session,
                skill_name=random.choice(skill_names),
                mastery_level=mastery_level,
                trials_correct=trials_correct,
                trials_total=trials_total,
                notes='Progress tracked during session',
                order=i
            )

    self.created_counts.setdefault('aba_consults', 0)
    self.created_counts['aba_consults'] += len(consults)
    self.created_counts.setdefault('aba_sessions', 0)
    self.created_counts['aba_sessions'] += len(sessions)

def _generate_ot_records(self, tenant, completed_appointments, providers):
    """Create OT consults, OT sessions with target skills, and per-patient progress reports."""
    ot_providers = [p for p in providers if p.user.role == User.Role.OT]
    if not ot_providers:
        return

    consults = []
    for _ in range(min(12, len(completed_appointments))):
        appt = random.choice(completed_appointments)
        consult = OTConsult.objects.create(
            tenant=tenant,
            patient=appt.patient,
            appointment=appt,
            consultation_date=appt.scheduled_date,
            provider=random.choice(ot_providers).user,
            reasons='Motor skill development concerns, sensory processing difficulties',
            top_difficulty_areas='Fine motor skills, gross motor coordination, self-care activities',
            recommendation=random.choice(list(OTConsult.Recommendation.choices))[0],
            recommendation_notes='Weekly OT sessions recommended'
        )
        consults.append(consult)

    sessions = []
    for _ in range(min(15, len(completed_appointments))):
        appt = random.choice(completed_appointments)
        session = OTSession.objects.create(
            tenant=tenant,
            patient=appt.patient,
            appointment=appt,
            session_date=appt.scheduled_date,
            provider=random.choice(ot_providers).user,
            session_type=random.choice(list(OTSession.SessionType.choices))[0],
            cooperative_level=random.randint(1, 4),
            distraction_tolerance=random.randint(1, 4),
            observations='Patient showed good progress',
            recommendations='Continue current treatment plan'
        )
        sessions.append(session)

    # One to three target skills per session
    skill_names = [
        'Pencil grasp', 'Scissor skills', 'Ball throwing', 'Balance beam walking',
        'Sensory tolerance', 'Dressing skills', 'Feeding independence', 'Visual tracking'
    ]
    for session in sessions:
        for i in range(random.randint(1, 3)):
            OTTargetSkill.objects.create(
                session=session,
                skill_name=random.choice(skill_names),
                score=random.randint(0, 10),
                notes='Progress noted during session',
                order=i
            )

    # Progress reports: de-duplicate patients before sampling so that a patient
    # with several sessions gets at most one report.
    unique_patients = list(dict.fromkeys(s.patient for s in sessions))
    for patient in random.sample(unique_patients, min(8, len(unique_patients))):
        patient_sessions = [s for s in sessions if s.patient == patient]
        OTProgressReport.objects.create(
            tenant=tenant,
            patient=patient,
            report_date=patient_sessions[-1].session_date,
            provider=patient_sessions[-1].provider,
            sessions_scheduled=len(patient_sessions) + random.randint(0, 2),
            sessions_attended=len(patient_sessions),
            goals_progress='Patient showing improvement in targeted skills',
            overall_progress='Patient has shown consistent progress',
            recommendations='Continue current treatment plan',
            continue_treatment=True
        )

    self.created_counts.setdefault('ot_consults', 0)
    self.created_counts['ot_consults'] += len(consults)
    self.created_counts.setdefault('ot_sessions', 0)
    self.created_counts['ot_sessions'] += len(sessions)

def _generate_slp_records(self, tenant, completed_appointments, providers):
    """Create SLP consults, assessments, interventions with targets, and per-patient progress reports."""
    slp_providers = [p for p in providers if p.user.role == User.Role.SLP]
    if not slp_providers:
        return

    consults = []
    for _ in range(min(12, len(completed_appointments))):
        appt = random.choice(completed_appointments)
        consult = SLPConsult.objects.create(
            tenant=tenant,
            patient=appt.patient,
            appointment=appt,
            consultation_date=appt.scheduled_date,
            provider=random.choice(slp_providers).user,
            consult_variant=random.choice(list(SLPConsult.ConsultVariant.choices))[0],
            primary_concern='Speech and language delay concerns',
            suspected_areas='Articulation, language comprehension',
            type_of_service=random.choice(list(SLPConsult.ServiceType.choices))[0],
            recommendations='Speech therapy sessions recommended'
        )
        consults.append(consult)

    # One assessment per consult, reusing the consult's patient, appointment and provider
    for consult in consults:
        SLPAssessment.objects.create(
            tenant=tenant,
            patient=consult.patient,
            appointment=consult.appointment,
            assessment_date=consult.consultation_date,
            provider=consult.provider,
            diagnosis_statement='Speech and language disorder',
            case_history='Patient history documented',
            clinical_summary='Assessment findings documented',
            recommendations='Therapy plan developed'
        )

    interventions = []
    for _ in range(min(15, len(completed_appointments))):
        appt = random.choice(completed_appointments)
        intervention = SLPIntervention.objects.create(
            tenant=tenant,
            patient=appt.patient,
            appointment=appt,
            session_number=random.randint(1, 20),
            session_date=appt.scheduled_date,
            session_time=appt.scheduled_time,
            provider=random.choice(slp_providers).user
        )
        interventions.append(intervention)

    for intervention in interventions:
        # range(1, 2) or range(1, 3): one or two targets, numbered from 1
        for target_num in range(1, random.randint(2, 3)):
            SLPTarget.objects.create(
                intervention=intervention,
                target_number=target_num,
                subjective='Patient engaged during session',
                objective='Achieved 70% accuracy on target',
                assessment='Good progress noted',
                plan='Continue current approach',
                prompt_strategies='Visual cues, verbal prompts'
            )

    # Progress reports: de-duplicate patients before sampling so that a patient
    # with several interventions gets at most one report.
    unique_patients = list(dict.fromkeys(i.patient for i in interventions))
    for patient in random.sample(unique_patients, min(8, len(unique_patients))):
        patient_interventions = [i for i in interventions if i.patient == patient]
        SLPProgressReport.objects.create(
            tenant=tenant,
            patient=patient,
            report_date=patient_interventions[-1].session_date,
            provider=patient_interventions[-1].provider,
            sessions_scheduled=len(patient_interventions) + random.randint(0, 2),
            sessions_attended=len(patient_interventions),
            final_diagnosis='Speech and language disorder',
            objectives_progress='Patient showing improvement in targeted areas',
            overall_progress='Patient has made progress in therapy',
            recommendations='Continue speech therapy sessions',
            package_sessions_count=random.choice([10, 20, 30])
        )

    self.created_counts.setdefault('slp_consults', 0)
    self.created_counts['slp_consults'] += len(consults)
    self.created_counts.setdefault('slp_interventions', 0)
    self.created_counts['slp_interventions'] += len(interventions)
def generate_services(self, tenant, clinics):
    """Create the fixed catalogue of billable services for every clinic.

    Each clinic receives one Service per catalogue entry. The service code is
    the clinic code followed by the first six characters of the upper-cased
    English name with spaces removed.

    Args:
        tenant: Tenant assigned to each created service.
        clinics: Clinics to create services for.

    Returns:
        List of the created Service objects, in creation order.
    """
    # (English name, Arabic name, base price, duration in minutes)
    catalogue = (
        ('Initial Consultation', 'استشارة أولية', 300, 60),
        ('Follow-up Visit', 'زيارة متابعة', 200, 30),
        ('Therapy Session', 'جلسة علاجية', 250, 45),
        ('Assessment', 'تقييم', 400, 90),
        ('Progress Review', 'مراجعة التقدم', 150, 30),
    )

    created = []
    for clinic in clinics:
        for english, arabic, amount, minutes in catalogue:
            short_code = english.replace(' ', '').upper()[:6]
            created.append(
                Service.objects.create(
                    tenant=tenant,
                    code=f"{clinic.code}-{short_code}",
                    name_en=f"{clinic.name_en} - {english}",
                    name_ar=f"{clinic.name_ar} - {arabic}",
                    clinic=clinic,
                    base_price=amount,
                    duration_minutes=minutes,
                    is_active=True,
                )
            )

    how_many = len(created)
    self.created_counts.setdefault('services', 0)
    self.created_counts['services'] += how_many
    self.stdout.write(f' Created {how_many} services')
    return created
def generate_packages(self, tenant, services):
    """Generate service packages.

    Services are grouped by their clinic's code. Every clinic with at least
    two services gets one 10-session package linked to the first two services
    of that clinic (in the order they appear in ``services``).

    Args:
        tenant: Tenant assigned to each created package.
        services: Service objects to group by clinic.

    Returns:
        List of the created Package objects.
    """
    # Group services by clinic code. ``sorted`` is stable, so services keep
    # their relative order within a clinic and clinics are visited by code.
    services_by_clinic = {}
    for service in sorted(services, key=lambda s: s.clinic.code):
        services_by_clinic.setdefault(service.clinic.code, []).append(service)

    packages = []
    for clinic_services in services_by_clinic.values():
        if len(clinic_services) < 2:
            continue
        clinic = clinic_services[0].clinic
        package = Package.objects.create(
            tenant=tenant,
            name_en=f"{clinic.name_en} - 10 Session Package",
            name_ar=f"{clinic.name_ar} - باقة 10 جلسات",
            total_sessions=10,
            price=2000,  # Discounted from 2500
            validity_days=90,
            is_active=True
        )
        package.services.set(clinic_services[:2])
        packages.append(package)

    self.created_counts.setdefault('packages', 0)
    self.created_counts['packages'] += len(packages)
    self.stdout.write(f' Created {len(packages)} packages')
    return packages
def generate_financial_data(self, tenant, patients, appointments, services, packages):
    """Generate payers, a CSID record, invoices, payments, and package purchases.

    Args:
        tenant: Tenant assigned to the generated records.
        patients: Patients to sample for payers and package purchases.
        appointments: Appointments; only COMPLETED ones are invoiced.
        services: Services to bill; an invoice uses a service of the
            appointment's clinic.
        packages: Packages available for purchase; may be empty, in which
            case no purchases are created.
    """
    # Create payers for some patients
    for patient in random.sample(patients, min(30, len(patients))):
        payer_type = random.choice([Payer.PayerType.SELF, Payer.PayerType.INSURANCE])
        if payer_type == Payer.PayerType.INSURANCE:
            Payer.objects.create(
                tenant=tenant,
                patient=patient,
                name=random.choice(SAUDI_INSURANCE_COMPANIES),
                payer_type=payer_type,
                policy_number=f"P{random.randint(100000, 999999)}",
                coverage_percentage=random.choice([50, 70, 80, 100]),
                is_active=True
            )
        else:
            Payer.objects.create(
                tenant=tenant,
                patient=patient,
                name='Self Pay',
                payer_type=payer_type,
                coverage_percentage=0,
                is_active=True
            )

    # Create CSID records for tenant (placeholder certificate and secret values)
    CSID.objects.create(
        tenant=tenant,
        csid_type=CSID.CSIDType.PRODUCTION,
        certificate='SAMPLE_CERTIFICATE_BASE64',
        secret='SAMPLE_SECRET',
        egs_serial_number=f'1-{tenant.code}|2-EGS|3-{random.randint(1000, 9999)}',
        common_name=f'{tenant.name} EGS Unit',
        organization_unit=tenant.name,
        issue_date=timezone.now() - timedelta(days=30),
        expiry_date=timezone.now() + timedelta(days=335),
        status=CSID.Status.ACTIVE
    )

    # Create invoices for completed appointments
    completed_appointments = [a for a in appointments if a.status == Appointment.Status.COMPLETED]
    invoice_counter = 1
    for appt in random.sample(completed_appointments, min(50, len(completed_appointments))):
        clinic_services = [s for s in services if s.clinic == appt.clinic]
        if not clinic_services:
            # random.choice() raises IndexError on an empty list; a clinic
            # without services simply has nothing to invoice.
            continue
        service = random.choice(clinic_services)

        # National IDs starting with '1' get a SIMPLIFIED invoice with no tax;
        # all others get a STANDARD invoice with 15% tax.
        # NOTE(review): presumably a leading '1' marks Saudi nationals - confirm.
        tax_exempt = appt.patient.national_id.startswith('1')
        if tax_exempt:
            invoice_type = Invoice.InvoiceType.SIMPLIFIED
            tax = Decimal('0')
            total = service.base_price
        else:
            invoice_type = Invoice.InvoiceType.STANDARD
            tax = service.base_price * Decimal('0.15')
            total = service.base_price * Decimal('1.15')

        invoice = Invoice.objects.create(
            tenant=tenant,
            invoice_number=f"I{tenant.code}{random.randint(100000, 999999)}",
            invoice_counter=invoice_counter,
            invoice_type=invoice_type,
            patient=appt.patient,
            appointment=appt,
            issue_date=appt.scheduled_date,
            due_date=appt.scheduled_date + timedelta(days=30),
            subtotal=service.base_price,
            tax=tax,
            total=total,
            status=random.choice([
                Invoice.Status.PAID,
                Invoice.Status.ISSUED,
                Invoice.Status.PARTIALLY_PAID
            ])
        )
        invoice_counter += 1

        # Create line item
        InvoiceLineItem.objects.create(
            invoice=invoice,
            service=service,
            description=service.name_en,
            quantity=1,
            unit_price=service.base_price,
            total=service.base_price
        )

        # Create payment if paid: the full total for PAID, half of it for PARTIALLY_PAID
        if invoice.status in [Invoice.Status.PAID, Invoice.Status.PARTIALLY_PAID]:
            payment_amount = invoice.total if invoice.status == Invoice.Status.PAID else invoice.total * Decimal('0.5')
            Payment.objects.create(
                tenant=tenant,
                invoice=invoice,
                payment_date=timezone.now(),
                amount=payment_amount,
                method=random.choice(list(Payment.PaymentMethod.choices))[0],
                status=Payment.Status.COMPLETED,
                transaction_id=f"T{random.randint(100000, 999999)}"
            )

    # Create package purchases for some patients (skipped when no packages exist)
    if packages:
        for patient in random.sample(patients, min(15, len(patients))):
            package = random.choice(packages)
            purchase_date = date.today() - timedelta(days=random.randint(0, 60))
            sessions_used = random.randint(0, package.total_sessions)
            PackagePurchase.objects.create(
                tenant=tenant,
                patient=patient,
                package=package,
                purchase_date=purchase_date,
                expiry_date=purchase_date + timedelta(days=package.validity_days),
                total_sessions=package.total_sessions,
                sessions_used=sessions_used,
                status=random.choice(list(PackagePurchase.Status.choices))[0]
            )

    self.stdout.write(' Created financial records including CSID')
def generate_communication_data(self, tenant, patients, appointments):
    """Create message templates, SMS messages with logs, reminders and confirmations.

    Messages and reminders are created for a sample of appointments scheduled
    today or later; confirmations are created for a sample of appointments
    whose status is CONFIRMED.
    """
    def pick(choices):
        # Random member of a Django ``choices`` sequence, reduced to its stored value.
        return random.choice(list(choices))[0]

    placeholder_names = ['patient_name', 'appointment_date', 'appointment_time']
    template_rows = (
        (
            'appointment_reminder',
            'Appointment Reminder',
            MessageTemplate.Channel.SMS,
            'Dear {patient_name}, this is a reminder for your appointment on {appointment_date} at {appointment_time}.',
            'عزيزي {patient_name}، هذا تذكير بموعدك في {appointment_date} الساعة {appointment_time}.',
        ),
        (
            'appointment_confirmation',
            'Appointment Confirmation',
            MessageTemplate.Channel.SMS,
            'Your appointment has been confirmed for {appointment_date} at {appointment_time}.',
            'تم تأكيد موعدك في {appointment_date} الساعة {appointment_time}.',
        ),
    )
    for row in template_rows:
        template_code, template_name, template_channel, english_body, arabic_body = row
        MessageTemplate.objects.create(
            tenant=tenant,
            code=template_code,
            name=template_name,
            channel=template_channel,
            body_en=english_body,
            body_ar=arabic_body,
            variables=list(placeholder_names),
            is_active=True,
        )

    # Appointments scheduled for today or later
    upcoming = [appt for appt in appointments if appt.scheduled_date >= date.today()]

    # SMS messages, each with a single log entry
    message_batch = random.sample(upcoming, min(20, len(upcoming)))
    for upcoming_appt in message_batch:
        sms = Message.objects.create(
            tenant=tenant,
            channel=Message.Channel.SMS,
            recipient=str(upcoming_appt.patient.phone),
            body=f"Reminder: Your appointment is on {upcoming_appt.scheduled_date} at {upcoming_appt.scheduled_time}",
            status=random.choice([Message.Status.SENT, Message.Status.DELIVERED]),
            sent_at=timezone.now(),
        )
        MessageLog.objects.create(
            message=sms,
            event_type=pick(MessageLog.EventType.choices),
            details={'status': 'sent', 'provider': 'SMS Gateway'},
        )

    # Reminders; the random draws below happen in the same order as the fields are listed
    reminder_batch = random.sample(upcoming, min(30, len(upcoming)))
    for upcoming_appt in reminder_batch:
        kind = pick(AppointmentReminder.ReminderType.choices)
        due_at = timezone.now() - timedelta(hours=random.randint(1, 48))
        if random.choice([True, False]):
            delivered_at = timezone.now()
        else:
            delivered_at = None
        AppointmentReminder.objects.create(
            appointment=upcoming_appt,
            reminder_type=kind,
            scheduled_for=due_at,
            sent_at=delivered_at,
            status=pick(AppointmentReminder.Status.choices),
        )

    # Confirmation records for appointments already in CONFIRMED status
    confirmed = [appt for appt in appointments if appt.status == Appointment.Status.CONFIRMED]
    for confirmed_appt in random.sample(confirmed, min(25, len(confirmed))):
        AppointmentConfirmation.objects.create(
            appointment=confirmed_appt,
            token=secrets.token_urlsafe(32),
            status=AppointmentConfirmation.Status.CONFIRMED,
            confirmation_method=pick(AppointmentConfirmation.ConfirmationMethod.choices),
            confirmed_at=timezone.now() - timedelta(hours=random.randint(1, 72)),
            expires_at=timezone.now() + timedelta(days=7),
            sent_at=timezone.now() - timedelta(hours=random.randint(24, 96)),
        )

    self.stdout.write(' Created communication records')
def generate_integration_data(self, tenant, patients, appointments, clinics, users):
    """Generate referrals, integration records, and core models.

    Creates referrals and referral auto-rules between the tenant's clinics,
    patient consents, audit-log entries on patient records, external orders,
    and payer contracts for the first five insurance companies.

    Args:
        tenant: Tenant assigned to the generated records.
        patients: Patients used for consents and audit logs.
        appointments: Appointments; COMPLETED ones drive referrals and orders.
        clinics: Clinics that each get a referral auto-rule.
        users: Users recorded as the actor on audit-log entries.
    """
    completed_appointments = [a for a in appointments if a.status == Appointment.Status.COMPLETED]

    # Fetch the tenant's clinics once instead of re-querying in every loop
    # iteration; referral targets are picked from this list.
    tenant_clinics = list(Clinic.objects.filter(tenant=tenant))

    # Create referrals
    for _ in range(min(10, len(completed_appointments))):
        appt = random.choice(completed_appointments)
        from_clinic = appt.clinic
        to_clinics = [c for c in tenant_clinics if c.id != from_clinic.id]
        # A referral needs a different clinic to point at.
        if to_clinics:
            to_clinic = random.choice(to_clinics)
            Referral.objects.create(
                tenant=tenant,
                patient=appt.patient,
                from_clinic=from_clinic,
                from_discipline=from_clinic.specialty,
                from_provider=appt.provider.user,
                to_clinic=to_clinic,
                to_discipline=to_clinic.specialty,
                reason='Patient requires additional specialized care',
                urgency=random.choice(list(Referral.Urgency.choices))[0],
                status=random.choice(list(Referral.Status.choices))[0]
            )

    # Create referral auto rules
    for clinic in clinics:
        to_clinics = [c for c in tenant_clinics if c.id != clinic.id]
        if to_clinics:
            ReferralAutoRule.objects.create(
                tenant=tenant,
                name=f"Auto-refer from {clinic.name_en}",
                description=f'Automatic referral rule for {clinic.name_en}',
                trigger_clinic=clinic,
                target_clinic=random.choice(to_clinics),
                trigger_keywords=['autism', 'delay', 'disorder'],
                urgency=random.choice(list(Referral.Urgency.choices))[0],
                auto_create=random.choice([True, False]),
                is_active=random.choice([True, False])
            )

    # Create consents for patients (one or two per sampled patient)
    for patient in random.sample(patients, min(25, len(patients))):
        for _ in range(random.randint(1, 2)):
            Consent.objects.create(
                tenant=tenant,
                patient=patient,
                consent_type=random.choice(list(Consent.ConsentType.choices))[0],
                content_text='Standard consent form content',
                signed_by_name=patient.caregiver_name or patient.full_name_en,
                signed_by_relationship=patient.caregiver_relationship or 'Self',
                signed_at=timezone.now() - timedelta(days=random.randint(0, 180)),
                signature_method=random.choice(list(Consent.SignatureMethod.choices))[0],
                is_active=True
            )

    # Create audit logs for various actions using GenericForeignKey.
    # random.choice() raises IndexError on an empty list, so skip this step
    # when there is no patient or no user to reference.
    if patients and users:
        from django.contrib.contenttypes.models import ContentType
        patient_ct = ContentType.objects.get_for_model(Patient)
        for _ in range(50):
            AuditLog.objects.create(
                tenant=tenant,
                content_type=patient_ct,
                object_id=random.choice(patients).id,
                action=random.choice(list(AuditLog.Action.choices))[0],
                user=random.choice(users),
                changes={'action': 'performed on patient record'},
                ip_address=f'192.168.1.{random.randint(1, 255)}'
            )

    # Create some external orders (lab/radiology)
    for _ in range(min(10, len(completed_appointments))):
        appt = random.choice(completed_appointments)
        ExternalOrder.objects.create(
            tenant=tenant,
            patient=appt.patient,
            order_type=random.choice(list(ExternalOrder.OrderType.choices))[0],
            order_details={
                'tests': ['CBC', 'Metabolic Panel'] if random.choice([True, False]) else ['X-Ray Chest'],
                'notes': 'Routine screening'
            },
            status=random.choice(list(ExternalOrder.Status.choices))[0],
            ordered_by=appt.provider.user,
            ordered_at=timezone.now() - timedelta(days=random.randint(1, 30))
        )

    # Create PayerContract for insurance companies (placeholder credentials and endpoints)
    for insurance_company in SAUDI_INSURANCE_COMPANIES[:5]:
        PayerContract.objects.create(
            tenant=tenant,
            payer_code=f"INS{random.randint(1000, 9999)}",
            payer_name=insurance_company,
            credentials={'api_key': 'SAMPLE_KEY', 'api_secret': 'SAMPLE_SECRET'},
            endpoints={
                'eligibility': 'https://api.example.com/eligibility',
                'claims': 'https://api.example.com/claims'
            },
            supports_eligibility=True,
            supports_prior_auth=True,
            supports_claims=True,
            is_active=True
        )

    self.stdout.write(' Created integration and core records including PayerContract')
def generate_hr_data(self, tenant, users):
    """Generate HR data including attendance, schedules, holidays, leave requests, and leave balances.

    Args:
        tenant: Tenant assigned to every record created here.
        users: Sequence of users treated as employees. It is iterated several
            times and passed to ``random.sample``, so it must be a real
            sequence (e.g. a list), not a one-shot iterator.

    Side effects:
        Creates HRSchedule, Attendance, Holiday, LeaveRequest and LeaveBalance
        rows, adds the number created to ``self.created_counts`` and writes a
        one-line confirmation to ``self.stdout``.
    """
    # Function-scope import, done once instead of on every attendance row.
    from datetime import time

    # Read the clock once so every derived date refers to the same day even
    # if the command happens to run across midnight.
    today = date.today()
    current_year = today.year

    # ------------------------------------------------------------------
    # Weekly schedules (Saudi work week: Sunday to Thursday)
    # ------------------------------------------------------------------
    hr_schedules = []
    for user in users:
        for day in ('SUN', 'MON', 'TUE', 'WED', 'THU'):
            hr_schedule = HRSchedule.objects.create(
                tenant=tenant,
                employee=user,
                day_of_week=day,
                start_time='08:00',
                end_time='17:00',
                is_active=True
            )
            hr_schedules.append(hr_schedule)
    self.created_counts.setdefault('hr_schedules', 0)
    self.created_counts['hr_schedules'] += len(hr_schedules)

    # ------------------------------------------------------------------
    # Attendance records for the past 30 days
    # ------------------------------------------------------------------
    attendance_statuses = [
        Attendance.Status.PRESENT, Attendance.Status.LATE, Attendance.Status.ABSENT,
        Attendance.Status.HALF_DAY, Attendance.Status.LEAVE,
    ]
    attendance_weights = [70, 10, 5, 10, 5]
    # Statuses for which the employee never clocked in.
    not_at_work = (Attendance.Status.ABSENT, Attendance.Status.LEAVE)
    # date.weekday(): Monday is 0, so 4 and 5 are Friday and Saturday (Saudi weekend).
    weekend_days = (4, 5)

    attendances = []
    for user in users:
        for days_ago in range(30):
            attendance_date = today - timedelta(days=days_ago)
            if attendance_date.weekday() in weekend_days:
                continue

            status = random.choices(attendance_statuses, weights=attendance_weights)[0]

            if status in not_at_work:
                # For ABSENT or LEAVE status, no check-in/check-out times
                attendance = Attendance.objects.create(
                    tenant=tenant,
                    employee=user,
                    date=attendance_date,
                    status=status,
                    notes='Absent' if status == Attendance.Status.ABSENT else 'On approved leave'
                )
            else:
                # Determine check-in time based on status
                if status == Attendance.Status.LATE:
                    check_in_hour = random.choice([8, 9])
                    check_in_minute = random.choice([15, 30, 45])
                else:
                    check_in_hour = 8
                    check_in_minute = random.choice([0, 5, 10])

                # Determine check-out time based on status
                if status == Attendance.Status.HALF_DAY:
                    check_out_hour = random.choice([12, 13])
                    check_out_minute = random.choice([0, 30])
                else:
                    check_out_hour = random.choice([16, 17, 18])
                    check_out_minute = random.choice([0, 15, 30, 45])

                # hours_worked is automatically calculated in the model's save method
                attendance = Attendance.objects.create(
                    tenant=tenant,
                    employee=user,
                    date=attendance_date,
                    check_in=time(check_in_hour, check_in_minute),
                    check_out=time(check_out_hour, check_out_minute),
                    status=status,
                    notes='' if status == Attendance.Status.PRESENT else f'{status} - recorded'
                )
            attendances.append(attendance)
    self.created_counts.setdefault('attendances', 0)
    self.created_counts['attendances'] += len(attendances)

    # ------------------------------------------------------------------
    # Holidays: two fixed-date recurring days plus two Eid placeholders at
    # random offsets from today
    # ------------------------------------------------------------------
    holidays_data = [
        ('Saudi National Day', date(current_year, 9, 23), True),
        ('Founding Day', date(current_year, 2, 22), True),
        ('Eid Al-Fitr', today + timedelta(days=random.randint(30, 90)), False),
        ('Eid Al-Adha', today + timedelta(days=random.randint(120, 180)), False),
    ]
    holidays = []
    for name, holiday_date, is_recurring in holidays_data:
        holiday = Holiday.objects.create(
            tenant=tenant,
            name=name,
            date=holiday_date,
            is_recurring=is_recurring,
            description=f'{name} - Public holiday'
        )
        holidays.append(holiday)
    self.created_counts.setdefault('holidays', 0)
    self.created_counts['holidays'] += len(holidays)

    # ------------------------------------------------------------------
    # Leave requests for up to 10 randomly chosen employees
    # ------------------------------------------------------------------
    # The reviewer pool does not change per request, so build it once.
    admin_users = [u for u in users if u.role == User.Role.ADMIN]
    reviewed_statuses = (LeaveRequest.Status.APPROVED, LeaveRequest.Status.REJECTED)

    leave_requests = []
    for user in random.sample(users, min(10, len(users))):
        for _ in range(random.randint(1, 3)):
            leave_type = random.choice(list(LeaveRequest.LeaveType.choices))[0]
            start_date = today + timedelta(days=random.randint(-60, 60))
            days_requested = random.randint(1, 7)
            # Inclusive range: a 1-day request starts and ends on the same date.
            end_date = start_date + timedelta(days=days_requested - 1)

            # Requests that already started have been decided; upcoming ones
            # are either still pending or already approved.
            if start_date < today:
                status = random.choice([
                    LeaveRequest.Status.APPROVED,
                    LeaveRequest.Status.REJECTED
                ])
            else:
                status = random.choice([
                    LeaveRequest.Status.PENDING,
                    LeaveRequest.Status.APPROVED
                ])

            leave_request = LeaveRequest.objects.create(
                tenant=tenant,
                employee=user,
                leave_type=leave_type,
                start_date=start_date,
                end_date=end_date,
                days_requested=days_requested,
                reason=f'Request for {leave_type.lower()} leave',
                status=status
            )

            # Add reviewer info if approved or rejected (skipped when the
            # user list contains no admin to act as reviewer).
            if status in reviewed_statuses and admin_users:
                leave_request.reviewed_by = random.choice(admin_users)
                leave_request.reviewed_at = timezone.now() - timedelta(days=random.randint(1, 5))
                leave_request.reviewer_comments = (
                    'Approved' if status == LeaveRequest.Status.APPROVED
                    else 'Not approved at this time'
                )
                leave_request.save()
            leave_requests.append(leave_request)
    self.created_counts.setdefault('leave_requests', 0)
    self.created_counts['leave_requests'] += len(leave_requests)

    # ------------------------------------------------------------------
    # Leave balances (annual + sick) for every employee
    # ------------------------------------------------------------------
    balance_specs = [
        # (leave type, yearly entitlement, upper bound for days already used)
        (LeaveRequest.LeaveType.ANNUAL, Decimal('21.0'), 10),  # Standard Saudi annual leave
        (LeaveRequest.LeaveType.SICK, Decimal('15.0'), 5),
    ]
    leave_balances = []
    for user in users:
        for balance_type, total_days, max_used in balance_specs:
            # Half-day steps rather than a raw float: random.uniform yields
            # ~15 fractional digits, which is not a meaningful day count and
            # would leave any rounding to the storage layer.
            used_days = Decimal(random.randint(0, max_used * 2)) / 2
            balance = LeaveBalance.objects.create(
                tenant=tenant,
                employee=user,
                year=current_year,
                leave_type=balance_type,
                total_days=total_days,
                used_days=used_days
            )
            leave_balances.append(balance)
    self.created_counts.setdefault('leave_balances', 0)
    self.created_counts['leave_balances'] += len(leave_balances)

    self.stdout.write(' Created HR records (schedules, attendance, holidays, leave requests, leave balances)')
def generate_documents_data(self, tenant, patients, appointments, users):
    """Generate document templates and clinical notes with audit trail.

    Args:
        tenant: Unused by this method.
        patients: Unused by this method; each note takes its patient from the
            appointment it is written for.
        appointments: Appointments to draw from; only those whose status is
            ``Appointment.Status.COMPLETED`` receive a note (at most 30).
        users: Pool of template creators and note viewers. Must be non-empty,
            because ``random.choice`` raises ``IndexError`` on an empty sequence.

    Side effects:
        Creates DocumentTemplate, ClinicalNote, NoteAddendum and NoteAuditLog
        rows, adds the template/note/addendum counts to
        ``self.created_counts`` and writes a one-line confirmation to
        ``self.stdout``.
    """
    def random_lan_ip():
        # Sample private-range address for audit entries.
        return f'192.168.1.{random.randint(1, 255)}'

    # Generate document templates
    templates_data = [
        {
            'name': 'Medical Progress Note Template',
            'category': 'medical',
            'description': 'Standard template for medical progress notes',
            'content': '''MEDICAL PROGRESS NOTE
Patient: {{patient_name}}
MRN: {{patient_mrn}}
Date: {{date}}
Provider: {{provider_name}}
Chief Complaint:
{{chief_complaint}}
Assessment:
{{assessment}}
Plan:
{{plan}}
Provider Signature: _______________
Date: {{date}}'''
        },
        {
            'name': 'Therapy Session Note Template',
            'category': 'progress',
            'description': 'Template for therapy session documentation',
            'content': '''THERAPY SESSION NOTE
Patient: {{patient_name}}
Session Date: {{date}}
Therapist: {{provider_name}}
Session Type: {{session_type}}
Goals Addressed:
{{goals}}
Activities:
{{activities}}
Patient Response:
{{response}}
Recommendations:
{{recommendations}}
Therapist Signature: _______________'''
        },
        {
            'name': 'Assessment Report Template',
            'category': 'assessment',
            'description': 'Comprehensive assessment report template',
            'content': '''ASSESSMENT REPORT
Patient Information:
Name: {{patient_name}}
DOB: {{patient_dob}}
Age: {{patient_age}}
Assessment Date: {{date}}
Reason for Referral:
{{referral_reason}}
Assessment Findings:
{{findings}}
Recommendations:
{{recommendations}}
Assessor: {{provider_name}}
Signature: _______________
Date: {{date}}'''
        },
    ]
    templates = []
    for template_data in templates_data:
        template = DocumentTemplate.objects.create(
            name=template_data['name'],
            category=template_data['category'],
            description=template_data['description'],
            content=template_data['content'],
            is_active=True,
            created_by=random.choice(users)
        )
        templates.append(template)
    self.created_counts.setdefault('document_templates', 0)
    self.created_counts['document_templates'] += len(templates)

    # Generate clinical notes for completed appointments
    today = date.today()
    completed_appointments = [a for a in appointments if a.status == Appointment.Status.COMPLETED]
    notes = []
    for appt in random.sample(completed_appointments, min(30, len(completed_appointments))):
        # Randomly choose to use a template or not
        template = random.choice(templates) if random.choice([True, False]) else None

        # Age of the appointment in days. Clamp at zero: a completed
        # appointment dated in the future would otherwise give a negative
        # value, making random.randint(0, days_since) below raise ValueError
        # and pushing the "created" audit timestamp into the future.
        days_since = max(0, (today - appt.scheduled_date).days)

        # Determine status based on how old the appointment is
        if days_since > 7:
            status = random.choice(['draft', 'final', 'final', 'final'])  # More likely to be final
        else:
            status = random.choice(['draft', 'draft', 'final'])  # More likely to be draft

        # Drawn in the same order the note body uses them.
        session_summary = random.choice([
            "Good progress noted during session.",
            "Patient engaged well with activities.",
            "Continued work on established goals.",
            "Patient showed improvement in targeted areas."
        ])
        observation = random.choice([
            "Patient was cooperative and attentive.",
            "Patient demonstrated good effort throughout session.",
            "Patient required minimal prompting.",
            "Patient showed enthusiasm for activities."
        ])

        note = ClinicalNote.objects.create(
            patient=appt.patient,
            template=template,
            title=f"{appt.service_type} - {appt.scheduled_date}",
            content=f'''Patient: {appt.patient.full_name_en}
Date: {appt.scheduled_date}
Provider: {appt.provider.user.get_full_name()}
Session Summary:
Patient attended {appt.service_type.lower()} session.
{session_summary}
Observations:
{observation}
Plan:
Continue current treatment plan. Next session scheduled.''',
            status=status,
            author=appt.provider.user
        )

        # If finalized, add finalization details. The offset is at most
        # days_since, so finalization never precedes the creation entry below.
        if status == 'final':
            note.finalized_at = timezone.now() - timedelta(days=random.randint(0, days_since))
            note.finalized_by = appt.provider.user
            note.save()
        notes.append(note)

        # Create audit log for note creation
        NoteAuditLog.objects.create(
            note=note,
            action='created',
            user=appt.provider.user,
            timestamp=timezone.now() - timedelta(days=days_since),
            ip_address=random_lan_ip()
        )

        # If finalized, add finalization audit log
        if status == 'final':
            NoteAuditLog.objects.create(
                note=note,
                action='finalized',
                user=note.finalized_by,
                timestamp=note.finalized_at,
                ip_address=random_lan_ip()
            )
    self.created_counts.setdefault('clinical_notes', 0)
    self.created_counts['clinical_notes'] += len(notes)

    # Generate addendums for some finalized notes
    finalized_notes = [n for n in notes if n.status == 'final']
    addendums = []
    for note in random.sample(finalized_notes, min(5, len(finalized_notes))):
        addendum = NoteAddendum.objects.create(
            note=note,
            content=random.choice([
                'Additional information: Patient contacted after session to report continued progress at home.',
                'Correction: Session duration was 45 minutes, not 30 minutes as initially documented.',
                'Update: Follow-up appointment scheduled for next week.',
                'Addendum: Parent reported improvement in targeted behaviors since last session.'
            ]),
            reason=random.choice([
                'Additional information',
                'Correction',
                'Update',
                'Clarification'
            ]),
            author=note.author
        )
        addendums.append(addendum)

        # Update note status to amended
        note.status = 'amended'
        note.save()

        # Create audit log for addendum
        NoteAuditLog.objects.create(
            note=note,
            action='amended',
            user=addendum.author,
            timestamp=timezone.now(),
            ip_address=random_lan_ip(),
            changes={'addendum_id': addendum.id, 'reason': addendum.reason}
        )
    self.created_counts.setdefault('note_addendums', 0)
    self.created_counts['note_addendums'] += len(addendums)

    # Generate view audit logs for random notes (1-3 views each, up to 20 notes)
    for note in random.sample(notes, min(20, len(notes))):
        for _ in range(random.randint(1, 3)):
            NoteAuditLog.objects.create(
                note=note,
                action='viewed',
                user=random.choice(users),
                timestamp=timezone.now() - timedelta(days=random.randint(0, 30)),
                ip_address=random_lan_ip()
            )

    self.stdout.write(' Created documents data (templates, notes, addendums, audit logs)')
def print_summary(self):
    """Write a banner, then one line per ``self.created_counts`` entry in key order."""
    divider = '=' * 60
    emit = self.stdout.write

    emit('\n' + divider)
    emit(self.style.SUCCESS('DATA GENERATION SUMMARY'))
    emit(divider)

    # Keys are unique, so ordering by key alone matches ordering the items.
    for label in sorted(self.created_counts):
        total = self.created_counts[label]
        emit(f' {label.title()}: {total}')

    emit(divider)