Marwan Alwali 23158e9fbf update
2025-09-08 03:00:23 +03:00

758 lines
26 KiB
Python

# """
# Viewflow workflows for inpatients app.
# Provides admission, transfer, and discharge workflows.
# """
#
# from viewflow import Flow, lock
# from viewflow.base import this, flow_func
# from viewflow.contrib import celery
# from viewflow.decorators import flow_view
# from viewflow.fields import CharField, ModelField
# from viewflow.forms import ModelForm
# from viewflow.views import CreateProcessView, UpdateProcessView
# from viewflow.models import Process, Task
# from django.contrib.auth.models import User
# from django.urls import reverse_lazy
# from django.utils import timezone
# from django.db import transaction
# from django.core.mail import send_mail
#
# from .models import Admission, Transfer, DischargeSummary, Ward, Bed
# from .views import (
# AdmissionRegistrationView, BedAssignmentView, MedicalAssessmentView,
# NursingAssessmentView, AdmissionOrdersView, TransferRequestView,
# TransferApprovalView, TransferExecutionView, DischargeInitiationView,
# DischargePlanningView, DischargeExecutionView
# )
#
#
# class AdmissionProcess(Process):
# """
# Viewflow process model for patient admissions
# """
# admission = ModelField(Admission, help_text='Associated admission record')
#
# # Process status tracking
# registration_completed = models.BooleanField(default=False)
# bed_assigned = models.BooleanField(default=False)
# medical_assessment_completed = models.BooleanField(default=False)
# nursing_assessment_completed = models.BooleanField(default=False)
# orders_written = models.BooleanField(default=False)
# admission_completed = models.BooleanField(default=False)
#
# class Meta:
# verbose_name = 'Admission Process'
# verbose_name_plural = 'Admission Processes'
#
#
# class AdmissionFlow(Flow):
# """
# Hospital Patient Admission Workflow
#
# This flow manages the complete patient admission process from
# initial registration through final admission completion.
# """
#
# process_class = AdmissionProcess
#
# # Flow definition
# start = (
# flow_func(this.start_admission)
# .Next(this.patient_registration)
# )
#
# patient_registration = (
# flow_view(AdmissionRegistrationView)
# .Permission('inpatients.can_register_admissions')
# .Next(this.check_bed_availability)
# )
#
# check_bed_availability = (
# flow_func(this.check_beds)
# .Next(this.bed_assignment)
# )
#
# bed_assignment = (
# flow_view(BedAssignmentView)
# .Permission('inpatients.can_assign_beds')
# .Next(this.parallel_assessments)
# )
#
# parallel_assessments = (
# flow_func(this.start_parallel_assessments)
# .Next(this.medical_assessment)
# .Next(this.nursing_assessment)
# )
#
# medical_assessment = (
# flow_view(MedicalAssessmentView)
# .Permission('inpatients.can_perform_medical_assessment')
# .Next(this.join_assessments)
# )
#
# nursing_assessment = (
# flow_view(NursingAssessmentView)
# .Permission('inpatients.can_perform_nursing_assessment')
# .Next(this.join_assessments)
# )
#
# join_assessments = (
# flow_func(this.join_parallel_assessments)
# .Next(this.write_admission_orders)
# )
#
# write_admission_orders = (
# flow_view(AdmissionOrdersView)
# .Permission('inpatients.can_write_orders')
# .Next(this.finalize_admission)
# )
#
# finalize_admission = (
# flow_func(this.complete_admission)
# .Next(this.end)
# )
#
# end = flow_func(this.end_admission)
#
# # Flow functions
# def start_admission(self, activation):
# """Initialize the admission process"""
# process = activation.process
# admission = process.admission
#
# # Update admission status
# admission.status = 'PENDING'
# admission.save()
#
# # Send notification to registration staff
# self.notify_registration_staff(admission)
#
# def check_beds(self, activation):
# """Check bed availability for the patient"""
# process = activation.process
# admission = process.admission
#
# # Check if beds are available in the requested ward
# available_beds = Bed.objects.filter(
# ward=admission.current_ward,
# status='AVAILABLE'
# ).count()
#
# if available_beds == 0:
# # Handle no beds available - could trigger waiting list
# admission.admission_notes += f"\nNo beds available in {admission.current_ward.name} at {timezone.now()}"
# admission.save()
#
# # Notify bed management
# self.notify_bed_shortage(admission)
#
# def start_parallel_assessments(self, activation):
# """Start parallel medical and nursing assessments"""
# process = activation.process
#
# # Create assessment tasks
# self.create_assessment_tasks(process)
#
# def join_parallel_assessments(self, activation):
# """Wait for both assessments to complete"""
# process = activation.process
#
# # Check if both assessments are completed
# if (process.medical_assessment_completed and
# process.nursing_assessment_completed):
#
# # Notify physician to write orders
# self.notify_physician_for_orders(process.admission)
#
# def complete_admission(self, activation):
# """Finalize the admission process"""
# process = activation.process
# admission = process.admission
#
# # Update admission status
# admission.status = 'ADMITTED'
# admission.save()
#
# # Update bed status
# if admission.current_bed:
# admission.current_bed.status = 'OCCUPIED'
# admission.current_bed.current_patient = admission.patient
# admission.current_bed.current_admission = admission
# admission.current_bed.occupied_since = timezone.now()
# admission.current_bed.save()
#
# # Mark process as completed
# process.admission_completed = True
# process.save()
#
# # Send completion notifications
# self.notify_admission_complete(admission)
#
# def end_admission(self, activation):
# """End the admission workflow"""
# process = activation.process
#
# # Generate admission summary report
# self.generate_admission_summary(process.admission)
#
# # Helper methods
# def notify_registration_staff(self, admission):
# """Notify registration staff of new admission"""
# from django.contrib.auth.models import Group
#
# registration_staff = User.objects.filter(
# groups__name='Registration Staff'
# )
#
# for staff in registration_staff:
# send_mail(
# subject=f'New Admission: {admission.patient.get_full_name()}',
# message=f'A new {admission.get_admission_type_display()} admission has been initiated.',
# from_email='admissions@hospital.com',
# recipient_list=[staff.email],
# fail_silently=True
# )
#
# def notify_bed_shortage(self, admission):
# """Notify bed management of shortage"""
# from django.contrib.auth.models import Group
#
# bed_managers = User.objects.filter(
# groups__name='Bed Management'
# )
#
# for manager in bed_managers:
# send_mail(
# subject=f'Bed Shortage Alert - {admission.current_ward.name}',
# message=f'No beds available for admission of {admission.patient.get_full_name()}.',
# from_email='admissions@hospital.com',
# recipient_list=[manager.email],
# fail_silently=True
# )
#
# def create_assessment_tasks(self, process):
# """Create assessment tasks for medical and nursing staff"""
# # This would create tasks in a task management system
# pass
#
# def notify_physician_for_orders(self, admission):
# """Notify attending physician to write orders"""
# if admission.attending_physician and admission.attending_physician.email:
# send_mail(
# subject=f'Admission Orders Required: {admission.patient.get_full_name()}',
# message=f'Please write admission orders. Both assessments are complete.',
# from_email='admissions@hospital.com',
# recipient_list=[admission.attending_physician.email],
# fail_silently=True
# )
#
# def notify_admission_complete(self, admission):
# """Notify relevant staff that admission is complete"""
# # Notify nursing staff
# nursing_staff = User.objects.filter(
# groups__name='Nursing Staff',
# profile__department=admission.current_ward
# )
#
# for nurse in nursing_staff:
# send_mail(
# subject=f'New Patient Admitted: {admission.patient.get_full_name()}',
# message=f'Patient has been admitted to {admission.current_bed.room_number if admission.current_bed else "TBD"}.',
# from_email='admissions@hospital.com',
# recipient_list=[nurse.email],
# fail_silently=True
# )
#
# def generate_admission_summary(self, admission):
# """Generate admission summary report"""
# # This would generate a comprehensive admission report
# pass
#
#
# class TransferProcess(Process):
# """
# Viewflow process model for patient transfers
# """
# transfer = ModelField(Transfer, help_text='Associated transfer record')
#
# # Process status tracking
# request_approved = models.BooleanField(default=False)
# bed_prepared = models.BooleanField(default=False)
# transport_arranged = models.BooleanField(default=False)
# handoff_completed = models.BooleanField(default=False)
# transfer_completed = models.BooleanField(default=False)
#
# class Meta:
# verbose_name = 'Transfer Process'
# verbose_name_plural = 'Transfer Processes'
#
#
# class TransferFlow(Flow):
# """
# Hospital Patient Transfer Workflow
#
# This flow manages patient transfers between wards, beds, or units.
# """
#
# process_class = TransferProcess
#
# # Flow definition
# start = (
# flow_func(this.start_transfer)
# .Next(this.transfer_request)
# )
#
# transfer_request = (
# flow_view(TransferRequestView)
# .Permission('inpatients.can_request_transfers')
# .Next(this.transfer_approval)
# )
#
# transfer_approval = (
# flow_view(TransferApprovalView)
# .Permission('inpatients.can_approve_transfers')
# .Next(this.prepare_destination)
# )
#
# prepare_destination = (
# flow_func(this.prepare_bed)
# .Next(this.arrange_transport)
# )
#
# arrange_transport = (
# flow_func(this.schedule_transport)
# .Next(this.execute_transfer)
# )
#
# execute_transfer = (
# flow_view(TransferExecutionView)
# .Permission('inpatients.can_execute_transfers')
# .Next(this.complete_transfer)
# )
#
# complete_transfer = (
# flow_func(this.finalize_transfer)
# .Next(this.end)
# )
#
# end = flow_func(this.end_transfer)
#
# # Flow functions
# def start_transfer(self, activation):
# """Initialize the transfer process"""
# process = activation.process
# transfer = process.transfer
#
# # Update transfer status
# transfer.status = 'REQUESTED'
# transfer.save()
#
# # Notify receiving ward
# self.notify_receiving_ward(transfer)
#
# def prepare_bed(self, activation):
# """Prepare destination bed"""
# process = activation.process
# transfer = process.transfer
#
# if transfer.to_bed:
# # Reserve the destination bed
# transfer.to_bed.status = 'RESERVED'
# transfer.to_bed.reserved_until = timezone.now() + timezone.timedelta(hours=2)
# transfer.to_bed.save()
#
# process.bed_prepared = True
# process.save()
#
# def schedule_transport(self, activation):
# """Schedule patient transport"""
# process = activation.process
# transfer = process.transfer
#
# # Auto-schedule transport based on priority
# if transfer.priority in ['EMERGENT', 'STAT']:
# transfer.scheduled_datetime = timezone.now() + timezone.timedelta(minutes=15)
# elif transfer.priority == 'URGENT':
# transfer.scheduled_datetime = timezone.now() + timezone.timedelta(hours=1)
# else:
# transfer.scheduled_datetime = timezone.now() + timezone.timedelta(hours=4)
#
# transfer.save()
#
# process.transport_arranged = True
# process.save()
#
# # Notify transport team
# self.notify_transport_team(transfer)
#
# def finalize_transfer(self, activation):
# """Finalize the transfer process"""
# process = activation.process
# transfer = process.transfer
# admission = transfer.admission
#
# # Update admission location
# admission.current_ward = transfer.to_ward
# admission.current_bed = transfer.to_bed
# admission.save()
#
# # Update bed statuses
# if transfer.from_bed:
# transfer.from_bed.status = 'AVAILABLE'
# transfer.from_bed.current_patient = None
# transfer.from_bed.current_admission = None
# transfer.from_bed.occupied_since = None
# transfer.from_bed.save()
#
# if transfer.to_bed:
# transfer.to_bed.status = 'OCCUPIED'
# transfer.to_bed.current_patient = transfer.patient
# transfer.to_bed.current_admission = admission
# transfer.to_bed.occupied_since = timezone.now()
# transfer.to_bed.save()
#
# # Update transfer status
# transfer.status = 'COMPLETED'
# transfer.actual_datetime = timezone.now()
# transfer.save()
#
# process.transfer_completed = True
# process.save()
#
# # Send completion notifications
# self.notify_transfer_complete(transfer)
#
# def end_transfer(self, activation):
# """End the transfer workflow"""
# process = activation.process
#
# # Generate transfer summary
# self.generate_transfer_summary(process.transfer)
#
# # Helper methods
# def notify_receiving_ward(self, transfer):
# """Notify receiving ward of incoming transfer"""
# receiving_staff = User.objects.filter(
# groups__name='Nursing Staff',
# profile__department=transfer.to_ward
# )
#
# for staff in receiving_staff:
# send_mail(
# subject=f'Incoming Transfer: {transfer.patient.get_full_name()}',
# message=f'Patient transfer requested from {transfer.from_ward.name}.',
# from_email='transfers@hospital.com',
# recipient_list=[staff.email],
# fail_silently=True
# )
#
# def notify_transport_team(self, transfer):
# """Notify transport team of scheduled transfer"""
# transport_staff = User.objects.filter(
# groups__name='Transport Team'
# )
#
# for staff in transport_staff:
# send_mail(
# subject=f'Transport Scheduled: {transfer.patient.get_full_name()}',
# message=f'Transfer scheduled for {transfer.scheduled_datetime}.',
# from_email='transport@hospital.com',
# recipient_list=[staff.email],
# fail_silently=True
# )
#
# def notify_transfer_complete(self, transfer):
# """Notify relevant staff that transfer is complete"""
# # Notify both sending and receiving wards
# all_staff = User.objects.filter(
# groups__name='Nursing Staff',
# profile__department__in=[transfer.from_ward, transfer.to_ward]
# )
#
# for staff in all_staff:
# send_mail(
# subject=f'Transfer Complete: {transfer.patient.get_full_name()}',
# message=f'Patient transfer from {transfer.from_ward.name} to {transfer.to_ward.name} completed.',
# from_email='transfers@hospital.com',
# recipient_list=[staff.email],
# fail_silently=True
# )
#
# def generate_transfer_summary(self, transfer):
# """Generate transfer summary report"""
# # This would generate a comprehensive transfer report
# pass
#
#
# class DischargeProcess(Process):
# """
# Viewflow process model for patient discharges
# """
# admission = ModelField(Admission, help_text='Associated admission record')
# discharge_summary = ModelField(DischargeSummary, null=True, blank=True,
# help_text='Associated discharge summary')
#
# # Process status tracking
# discharge_planning_started = models.BooleanField(default=False)
# discharge_orders_written = models.BooleanField(default=False)
# discharge_summary_completed = models.BooleanField(default=False)
# patient_education_completed = models.BooleanField(default=False)
# medications_reconciled = models.BooleanField(default=False)
# follow_up_scheduled = models.BooleanField(default=False)
# discharge_completed = models.BooleanField(default=False)
#
# class Meta:
# verbose_name = 'Discharge Process'
# verbose_name_plural = 'Discharge Processes'
#
#
# class DischargeFlow(Flow):
# """
# Hospital Patient Discharge Workflow
#
# This flow manages the complete patient discharge process from
# discharge planning through final discharge completion.
# """
#
# process_class = DischargeProcess
#
# # Flow definition
# start = (
# flow_func(this.start_discharge)
# .Next(this.discharge_planning)
# )
#
# discharge_planning = (
# flow_view(DischargePlanningView)
# .Permission('inpatients.can_plan_discharges')
# .Next(this.write_discharge_orders)
# )
#
# write_discharge_orders = (
# flow_view(DischargeOrdersView)
# .Permission('inpatients.can_write_discharge_orders')
# .Next(this.parallel_discharge_tasks)
# )
#
# parallel_discharge_tasks = (
# flow_func(this.start_parallel_discharge_tasks)
# .Next(this.complete_discharge_summary)
# .Next(this.patient_education)
# .Next(this.medication_reconciliation)
# .Next(this.schedule_follow_up)
# )
#
# complete_discharge_summary = (
# flow_view(DischargeDocumentationView)
# .Permission('inpatients.can_complete_discharge_summary')
# .Next(this.join_discharge_tasks)
# )
#
# patient_education = (
# flow_view(PatientEducationView)
# .Permission('inpatients.can_provide_patient_education')
# .Next(this.join_discharge_tasks)
# )
#
# medication_reconciliation = (
# flow_view(MedicationReconciliationView)
# .Permission('inpatients.can_reconcile_medications')
# .Next(this.join_discharge_tasks)
# )
#
# schedule_follow_up = (
# flow_view(FollowUpSchedulingView)
# .Permission('inpatients.can_schedule_follow_up')
# .Next(this.join_discharge_tasks)
# )
#
# join_discharge_tasks = (
# flow_func(this.join_parallel_discharge_tasks)
# .Next(this.execute_discharge)
# )
#
# execute_discharge = (
# flow_view(DischargeExecutionView)
# .Permission('inpatients.can_execute_discharges')
# .Next(this.finalize_discharge)
# )
#
# finalize_discharge = (
# flow_func(this.complete_discharge)
# .Next(this.end)
# )
#
# end = flow_func(this.end_discharge)
#
# # Flow functions
# def start_discharge(self, activation):
# """Initialize the discharge process"""
# process = activation.process
# admission = process.admission
#
# # Update admission discharge planning status
# admission.discharge_planning_started = True
# admission.save()
#
# # Notify discharge planning team
# self.notify_discharge_planning_team(admission)
#
# def start_parallel_discharge_tasks(self, activation):
# """Start parallel discharge preparation tasks"""
# process = activation.process
#
# # Create discharge preparation tasks
# self.create_discharge_tasks(process)
#
# def join_parallel_discharge_tasks(self, activation):
# """Wait for all discharge tasks to complete"""
# process = activation.process
#
# # Check if all discharge tasks are completed
# if (process.discharge_summary_completed and
# process.patient_education_completed and
# process.medications_reconciled and
# process.follow_up_scheduled):
#
# # Notify that patient is ready for discharge
# self.notify_ready_for_discharge(process.admission)
#
# def complete_discharge(self, activation):
# """Finalize the discharge process"""
# process = activation.process
# admission = process.admission
#
# # Update admission status
# admission.status = 'DISCHARGED'
# admission.discharge_datetime = timezone.now()
# admission.save()
#
# # Update bed status
# if admission.current_bed:
# admission.current_bed.status = 'CLEANING'
# admission.current_bed.current_patient = None
# admission.current_bed.current_admission = None
# admission.current_bed.occupied_since = None
# admission.current_bed.save()
#
# # Mark process as completed
# process.discharge_completed = True
# process.save()
#
# # Send completion notifications
# self.notify_discharge_complete(admission)
#
# def end_discharge(self, activation):
# """End the discharge workflow"""
# process = activation.process
#
# # Generate discharge summary report
# self.generate_final_discharge_report(process.admission)
#
# # Helper methods
# def notify_discharge_planning_team(self, admission):
# """Notify discharge planning team"""
# discharge_planners = User.objects.filter(
# groups__name='Discharge Planning'
# )
#
# for planner in discharge_planners:
# send_mail(
# subject=f'Discharge Planning Required: {admission.patient.get_full_name()}',
# message=f'Discharge planning has been initiated.',
# from_email='discharge@hospital.com',
# recipient_list=[planner.email],
# fail_silently=True
# )
#
# def create_discharge_tasks(self, process):
# """Create discharge preparation tasks"""
# # This would create tasks in a task management system
# pass
#
# def notify_ready_for_discharge(self, admission):
# """Notify that patient is ready for discharge"""
# if admission.attending_physician and admission.attending_physician.email:
# send_mail(
# subject=f'Patient Ready for Discharge: {admission.patient.get_full_name()}',
# message=f'All discharge preparations are complete.',
# from_email='discharge@hospital.com',
# recipient_list=[admission.attending_physician.email],
# fail_silently=True
# )
#
# def notify_discharge_complete(self, admission):
# """Notify relevant staff that discharge is complete"""
# # Notify nursing staff and housekeeping
# relevant_staff = User.objects.filter(
# groups__name__in=['Nursing Staff', 'Housekeeping'],
# profile__department=admission.current_ward
# )
#
# for staff in relevant_staff:
# send_mail(
# subject=f'Patient Discharged: {admission.patient.get_full_name()}',
# message=f'Patient has been discharged from {admission.current_bed.room_number if admission.current_bed else "ward"}.',
# from_email='discharge@hospital.com',
# recipient_list=[staff.email],
# fail_silently=True
# )
#
# def generate_final_discharge_report(self, admission):
# """Generate final discharge report"""
# # This would generate a comprehensive discharge report
# pass
#
#
# # Celery tasks for background processing
# @celery.job
# def auto_assign_bed(admission_id, ward_id):
# """Background task to automatically assign available bed"""
# try:
# admission = Admission.objects.get(id=admission_id)
# ward = Ward.objects.get(id=ward_id)
#
# available_bed = Bed.objects.filter(
# ward=ward,
# status='AVAILABLE'
# ).first()
#
# if available_bed:
# admission.current_bed = available_bed
# available_bed.status = 'RESERVED'
# available_bed.save()
# admission.save()
# return True
#
# return False
# except (Admission.DoesNotExist, Ward.DoesNotExist):
# return False
#
#
# @celery.job
# def schedule_discharge_planning(admission_id, days_before_discharge=2):
# """Background task to schedule discharge planning"""
# try:
# admission = Admission.objects.get(id=admission_id)
#
# if admission.estimated_length_of_stay:
# planning_date = admission.admission_datetime + timezone.timedelta(
# days=admission.estimated_length_of_stay - days_before_discharge
# )
#
# # Schedule discharge planning
# # This would integrate with a scheduling system
# return True
#
# return False
# except Admission.DoesNotExist:
# return False
#
#
# @celery.job
# def generate_encounter_number():
# """Generate unique encounter number"""
# from django.utils.crypto import get_random_string
# return f"ENC{timezone.now().strftime('%Y%m%d')}{get_random_string(4, '0123456789')}"
#