#!/usr/bin/env python
"""
Test script to verify Celery Beat configuration is correct.
"""
import os
|
|
import sys
|
|
import django
|
|
|
|
# Point Django at the project's settings module (setdefault leaves any
# DJANGO_SETTINGS_MODULE already present in the environment untouched), then
# initialise Django. This runs before the Celery imports that follow.
# NOTE(review): presumably `config.celery` needs configured settings at
# import time, which is why setup happens first - confirm.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
django.setup()
|
|
|
|
from celery.schedules import crontab
|
|
from config.celery import app
|
|
|
|
def test_celery_config():
    """Validate every entry of the Celery Beat schedule and print a report.

    Reads ``app.conf.beat_schedule`` and, for each entry, prints the target
    task and a description of its schedule. A schedule is accepted when it
    is a number of seconds, a ``datetime.timedelta`` or a ``crontab``
    instance. A plain ``dict``, any other type, or a missing ``'task'`` /
    ``'schedule'`` key marks the configuration as invalid.

    Returns:
        int: ``0`` when every entry is valid (including an empty schedule),
        ``1`` otherwise. The value is intended as a process exit status.
    """
    # Local import so this block does not depend on the module's import list.
    from datetime import timedelta

    print("Testing Celery Beat configuration...")
    print("-" * 60)

    # Treat an unset/None schedule as "no scheduled tasks" instead of crashing.
    beat_schedule = app.conf.beat_schedule or {}
    print(f"Total scheduled tasks: {len(beat_schedule)}")
    print()

    all_valid = True
    for task_name, task_config in beat_schedule.items():
        print(f"Task: {task_name}")

        # Report missing keys as configuration errors rather than letting a
        # KeyError abort the whole check.
        missing = [key for key in ('task', 'schedule') if key not in task_config]
        if missing:
            print(f" - ❌ ERROR: Missing required key(s): {', '.join(missing)}")
            all_valid = False
            print()
            continue

        print(f" - Target: {task_config['task']}")
        schedule = task_config['schedule']

        # Check if schedule is properly configured
        if isinstance(schedule, (int, float)):
            print(f" - Schedule: {schedule} seconds")
        elif isinstance(schedule, timedelta):
            # Celery also accepts a timedelta as an interval schedule.
            print(f" - Schedule: {schedule.total_seconds()} seconds")
        elif isinstance(schedule, crontab):
            hour = _crontab_field(schedule, "hour")
            minute = _crontab_field(schedule, "minute")
            print(f" - Schedule: Crontab(hour={hour}, minute={minute})")
        elif isinstance(schedule, dict):
            print(" - ❌ ERROR: Schedule is a dict (should be crontab object)")
            print(f" Dict contents: {schedule}")
            all_valid = False
        else:
            print(f" - ⚠️ WARNING: Unknown schedule type: {type(schedule)}")
            all_valid = False

        print()

    print("-" * 60)
    if not all_valid:
        print("❌ Some Celery schedules are misconfigured!")
        return 1

    print("✅ All Celery schedules are properly configured!")
    print("\nYou can now run:")
    # NOTE(review): the app object is imported from `config.celery`, yet this
    # hint names `PX360` - confirm which module `-A` should point at.
    print(" celery -A PX360 worker --beat --loglevel=info")
    return 0


def _crontab_field(cron, name):
    """Return a printable value for the crontab field *name* (e.g. "hour").

    Prefers the original spec stored on ``_orig_<name>`` and falls back to
    the public attribute holding the expanded values. Returns ``"?"`` when
    neither attribute is present.
    """
    # NOTE(review): `_orig_<name>` is a private Celery attribute; the public
    # fallback keeps this working if it is absent - confirm against the
    # installed Celery version.
    for attr in (f"_orig_{name}", name):
        value = getattr(cron, attr, None)
        if value is not None:
            return value
    return "?"
|
|
|
|
if __name__ == '__main__':
    # Run the check and hand its result to the shell as the exit status
    # (0 = all schedules valid, 1 = at least one misconfigured).
    exit_status = test_celery_config()
    sys.exit(exit_status)
|