237 lines
9.0 KiB
Python
237 lines
9.0 KiB
Python
"""
|
|
Management command to update staff is_head field from CSV file
|
|
|
|
CSV Format (includes new is_head column):
|
|
Staff ID,Name,Location,Department,Section,Subsection,AlHammadi Job Title,Country,Gender,is_head,Manager
|
|
|
|
Example:
|
|
4,ABDULAZIZ SALEH ALHAMMADI,Nuzha,Senior Management Offices,COO Office,,Chief Operating Officer,Saudi Arabia,Male,Yes,2 - MOHAMMAD SALEH AL HAMMADI
|
|
"""
|
|
import csv
|
|
import os
|
|
from django.core.management.base import BaseCommand, CommandError
|
|
from django.db import transaction
|
|
|
|
from apps.organizations.models import Staff
|
|
|
|
|
|
class Command(BaseCommand):
|
|
help = 'Update staff is_head field from CSV file'
|
|
|
|
def add_arguments(self, parser):
|
|
parser.add_argument(
|
|
'csv_file',
|
|
type=str,
|
|
help='Path to CSV file with is_head column'
|
|
)
|
|
parser.add_argument(
|
|
'--set-false-for-missing',
|
|
action='store_true',
|
|
help='Set is_head=False for staff not found in CSV'
|
|
)
|
|
parser.add_argument(
|
|
'--dry-run',
|
|
action='store_true',
|
|
help='Preview without making changes'
|
|
)
|
|
|
|
def handle(self, *args, **options):
|
|
csv_file_path = options['csv_file']
|
|
set_false_for_missing = options['set_false_for_missing']
|
|
dry_run = options['dry_run']
|
|
|
|
self.stdout.write(f"\n{'='*60}")
|
|
self.stdout.write("Staff is_head Update Command")
|
|
self.stdout.write(f"{'='*60}\n")
|
|
|
|
# Validate CSV file exists
|
|
if not os.path.exists(csv_file_path):
|
|
raise CommandError(f"CSV file not found: {csv_file_path}")
|
|
|
|
# Display configuration
|
|
self.stdout.write("Configuration:")
|
|
self.stdout.write(f" CSV file: {csv_file_path}")
|
|
self.stdout.write(f" Set false for missing: {set_false_for_missing}")
|
|
self.stdout.write(f" Dry run: {dry_run}")
|
|
|
|
# Read and parse CSV
|
|
self.stdout.write("\nReading CSV file...")
|
|
staff_head_data = self.parse_csv(csv_file_path)
|
|
|
|
if not staff_head_data:
|
|
self.stdout.write(self.style.WARNING("No valid staff data found in CSV"))
|
|
return
|
|
|
|
self.stdout.write(
|
|
self.style.SUCCESS(f"✓ Found {len(staff_head_data)} staff records in CSV")
|
|
)
|
|
|
|
# Get total staff count
|
|
total_staff = Staff.objects.count()
|
|
self.stdout.write(f" Total staff records in database: {total_staff}")
|
|
|
|
# Track statistics
|
|
stats = {
|
|
'updated_to_true': 0,
|
|
'updated_to_false': 0,
|
|
'skipped': 0,
|
|
'errors': 0,
|
|
'not_found': 0
|
|
}
|
|
|
|
# Process updates
|
|
with transaction.atomic():
|
|
for idx, (employee_id, is_head) in enumerate(staff_head_data.items(), 1):
|
|
try:
|
|
# Find staff by employee_id
|
|
staff = Staff.objects.filter(employee_id=employee_id).first()
|
|
|
|
if not staff:
|
|
self.stdout.write(
|
|
self.style.WARNING(
|
|
f" [{idx}] ⚠ Staff not found: {employee_id}"
|
|
)
|
|
)
|
|
stats['not_found'] += 1
|
|
continue
|
|
|
|
# Check if value needs updating
|
|
if staff.is_head == is_head:
|
|
self.stdout.write(
|
|
f" [{idx}] ⊘ Skipped: {staff.name} (already {is_head})"
|
|
)
|
|
stats['skipped'] += 1
|
|
continue
|
|
|
|
# Update is_head field
|
|
old_value = staff.is_head
|
|
staff.is_head = is_head
|
|
|
|
if not dry_run:
|
|
staff.save()
|
|
|
|
# Track update
|
|
if is_head:
|
|
stats['updated_to_true'] += 1
|
|
self.stdout.write(
|
|
self.style.SUCCESS(
|
|
f" [{idx}] ✓ Set is_head=True: {staff.name} ({employee_id})"
|
|
)
|
|
)
|
|
else:
|
|
stats['updated_to_false'] += 1
|
|
self.stdout.write(
|
|
self.style.WARNING(
|
|
f" [{idx}] ✓ Set is_head=False: {staff.name} ({employee_id})"
|
|
)
|
|
)
|
|
|
|
except Exception as e:
|
|
self.stdout.write(
|
|
self.style.ERROR(
|
|
f" [{idx}] ✗ Failed to update {employee_id}: {str(e)}"
|
|
)
|
|
)
|
|
stats['errors'] += 1
|
|
|
|
# Optionally set is_head=False for staff not in CSV
|
|
if set_false_for_missing:
|
|
self.stdout.write("\nSetting is_head=False for staff not in CSV...")
|
|
csv_employee_ids = set(staff_head_data.keys())
|
|
|
|
for staff in Staff.objects.filter(is_head=True):
|
|
if staff.employee_id not in csv_employee_ids:
|
|
if not dry_run:
|
|
staff.is_head = False
|
|
staff.save()
|
|
stats['updated_to_false'] += 1
|
|
self.stdout.write(
|
|
self.style.WARNING(
|
|
f" ✓ Set is_head=False (not in CSV): {staff.name} ({staff.employee_id})"
|
|
)
|
|
)
|
|
|
|
# Summary
|
|
self.stdout.write("\n" + "="*60)
|
|
self.stdout.write("Update Summary:")
|
|
self.stdout.write(f" Updated to is_head=True: {stats['updated_to_true']}")
|
|
self.stdout.write(f" Updated to is_head=False: {stats['updated_to_false']}")
|
|
self.stdout.write(f" Skipped (no change needed): {stats['skipped']}")
|
|
self.stdout.write(f" Not found in database: {stats['not_found']}")
|
|
self.stdout.write(f" Errors: {stats['errors']}")
|
|
self.stdout.write("="*60 + "\n")
|
|
|
|
if dry_run:
|
|
self.stdout.write(self.style.WARNING("DRY RUN: No changes were made\n"))
|
|
else:
|
|
self.stdout.write(self.style.SUCCESS("Update completed successfully!\n"))
|
|
|
|
def parse_csv(self, csv_file_path):
|
|
"""Parse CSV file and return dictionary mapping employee_id to is_head value"""
|
|
staff_head_data = {}
|
|
|
|
try:
|
|
with open(csv_file_path, 'r', encoding='utf-8') as csvfile:
|
|
reader = csv.DictReader(csvfile)
|
|
|
|
# Check if required columns exist
|
|
if not reader.fieldnames:
|
|
self.stdout.write(self.style.ERROR("CSV file is empty or has no headers"))
|
|
return {}
|
|
|
|
# Check for is_head column
|
|
if 'is_head' not in reader.fieldnames:
|
|
self.stdout.write(
|
|
self.style.ERROR("CSV file is missing 'is_head' column")
|
|
)
|
|
return []
|
|
|
|
if 'Staff ID' not in reader.fieldnames:
|
|
self.stdout.write(
|
|
self.style.ERROR("CSV file is missing 'Staff ID' column")
|
|
)
|
|
return {}
|
|
|
|
self.stdout.write("CSV columns found:")
|
|
self.stdout.write(f" {', '.join(reader.fieldnames)}\n")
|
|
|
|
for row_idx, row in enumerate(reader, 1):
|
|
try:
|
|
# Get employee_id
|
|
employee_id = row['Staff ID'].strip()
|
|
if not employee_id:
|
|
self.stdout.write(
|
|
self.style.WARNING(f"Skipping row {row_idx}: Empty Staff ID")
|
|
)
|
|
continue
|
|
|
|
# Parse is_head column
|
|
is_head_str = row['is_head'].strip().lower()
|
|
|
|
# Handle various boolean representations
|
|
is_head = None
|
|
if is_head_str in ['true', 'yes', 'y', '1', 'on']:
|
|
is_head = True
|
|
elif is_head_str in ['false', 'no', 'n', '0', 'off', '']:
|
|
is_head = False
|
|
else:
|
|
self.stdout.write(
|
|
self.style.WARNING(
|
|
f"Skipping row {row_idx}: Invalid is_head value '{is_head_str}' for {employee_id}"
|
|
)
|
|
)
|
|
continue
|
|
|
|
staff_head_data[employee_id] = is_head
|
|
|
|
except Exception as e:
|
|
self.stdout.write(
|
|
self.style.WARNING(f"Skipping row {row_idx}: {str(e)}")
|
|
)
|
|
continue
|
|
|
|
except Exception as e:
|
|
self.stdout.write(self.style.ERROR(f"Error reading CSV file: {str(e)}"))
|
|
return {}
|
|
|
|
return staff_head_data |