HH/apps/social/tests/test_celery.py
2026-01-12 12:27:29 +03:00

199 lines
5.9 KiB
Python

#!/usr/bin/env python
"""
Test script to verify Celery configuration and task registration.
Run this script to test if Celery is properly set up.
"""
import os
import sys
import django
# Setup Django
# Point Django at the project settings unless the environment already names a
# settings module (setdefault leaves an existing value untouched).
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
django.setup()
# NOTE(review): the imports below deliberately come after django.setup();
# presumably apps.social.tasks needs Django's app registry to be loaded before
# it can be imported -- confirm before moving them to the top of the file.
from celery import current_app
from apps.social import tasks
import redis
def test_redis_connection():
    """Ping the local Redis server and report whether it answered.

    The target is fixed at ``localhost:6379``, database 0; it is not read
    from the Celery broker settings.

    Returns:
        bool: ``True`` when the ping succeeds; ``False`` when the connection
        fails or any other exception is raised (the error is printed).
    """
    banner = "=" * 60
    print(banner)
    print("Testing Redis Connection...")
    print(banner)

    try:
        client = redis.Redis(host='localhost', port=6379, db=0)
        client.ping()
        print("✅ Redis is running and accessible")
        return True
    except redis.ConnectionError as exc:
        # Most common failure: the server is simply not running.
        print(f"❌ Redis connection failed: {exc}")
        print(" Make sure Redis server is running: sudo systemctl start redis")
    except Exception as exc:
        print(f"❌ Unexpected error: {exc}")
    return False
def test_celery_config():
    """Print a handful of settings from the current Celery app.

    Reads the broker URL, result backend, timezone and task serializer from
    ``current_app.conf`` and prints each one.

    Returns:
        bool: ``True`` when every setting could be read and printed;
        ``False`` if any exception occurred (the error is printed).
    """
    banner = "=" * 60
    print("\n" + banner)
    print("Testing Celery Configuration...")
    print(banner)

    # (label shown to the user, attribute name on the Celery conf object),
    # listed in the order they are reported.
    reported_settings = (
        ("Broker URL", "broker_url"),
        ("Result Backend", "result_backend"),
        ("Timezone", "timezone"),
        ("Task Serializer", "task_serializer"),
    )

    try:
        for label, attr_name in reported_settings:
            value = getattr(current_app.conf, attr_name)
            print(f"✅ {label}: {value}")
    except Exception as exc:
        print(f"❌ Configuration error: {exc}")
        return False
    return True
def test_task_registration():
    """Check that each expected task name is registered with the Celery app.

    Prints one line per expected task (with a ``NOT REGISTERED`` suffix when
    the name is absent from ``current_app.tasks``) followed by a tally.

    Returns:
        bool: ``True`` only when every expected task name is registered.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("Testing Task Registration...")
    print(banner)

    # Fully qualified names the app is expected to know about.
    expected_tasks = [
        'apps.social.tasks.scrape_youtube_comments',
        'apps.social.tasks.scrape_facebook_comments',
        'apps.social.tasks.scrape_instagram_comments',
        'apps.social.tasks.scrape_all_platforms',
        'apps.social.tasks.analyze_pending_comments',
        'apps.social.tasks.analyze_recent_comments',
        'apps.social.tasks.analyze_platform_comments',
    ]
    known_names = current_app.tasks.keys()

    found = 0
    for name in expected_tasks:
        if name not in known_names:
            print(f"{name} - NOT REGISTERED")
            continue
        print(name)
        found += 1

    expected_count = len(expected_tasks)
    print(f"\nTotal registered: {found}/{expected_count}")
    return found == expected_count
def test_schedules():
    """List the Celery Beat schedule entries configured on the current app.

    For every entry in ``current_app.conf.beat_schedule`` the entry name, its
    ``task`` and its ``schedule`` are printed (``N/A`` when a key is absent).

    Returns:
        bool: ``True`` when at least one schedule entry exists; ``False``
        when the schedule is empty or an exception occurred.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("Testing Celery Beat Schedules...")
    print(banner)

    try:
        beat_entries = current_app.conf.beat_schedule
        if beat_entries:
            print(f"{len(beat_entries)} schedules defined:\n")
            for entry_name, entry in beat_entries.items():
                task_path = entry.get('task', 'N/A')
                timing = entry.get('schedule', 'N/A')
                print(f"{entry_name}")
                print(f" Task: {task_path}")
                print(f" Schedule: {timing}")
                print()
            return True
        else:
            print("❌ No schedules defined")
            return False
    except Exception as exc:
        print(f"❌ Schedule error: {exc}")
        return False
def test_task_import():
    """Check that every expected task function exists on the tasks module.

    Prints one line per expected function name, with a ``NOT FOUND`` suffix
    when the already-imported ``tasks`` module has no attribute of that name.

    Previously this check reported missing functions but still returned
    ``True``, so it could never fail for the very condition it looks for.
    It now fails when any expected function is absent.

    Returns:
        bool: ``True`` only when all expected functions are present;
        ``False`` if at least one is missing or an exception occurred.
    """
    print("\n" + "=" * 60)
    print("Testing Task Module Import...")
    print("=" * 60)

    # Attribute names expected on apps.social.tasks.
    task_functions = [
        'scrape_youtube_comments',
        'scrape_facebook_comments',
        'scrape_instagram_comments',
        'scrape_all_platforms',
        'analyze_pending_comments',
        'analyze_recent_comments',
        'analyze_platform_comments',
    ]

    missing = []
    try:
        for func_name in task_functions:
            if hasattr(tasks, func_name):
                print(f"{func_name}")
            else:
                print(f"{func_name} - NOT FOUND")
                missing.append(func_name)
    except Exception as e:
        print(f"❌ Import error: {e}")
        return False

    # Every name is still reported above; the result reflects whether any
    # were missing.
    return not missing
def main():
    """Run every check in order, print a summary, and return an exit status.

    Returns:
        int: ``0`` when all checks passed, ``1`` otherwise.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("CELERY SETUP TEST")
    print(banner)

    # Checks run in exactly this order; the label is what the summary shows.
    checks = (
        ('Redis Connection', test_redis_connection),
        ('Celery Configuration', test_celery_config),
        ('Task Registration', test_task_registration),
        ('Schedules', test_schedules),
        ('Task Module Import', test_task_import),
    )
    results = {label: check() for label, check in checks}

    # Summary
    print("\n" + banner)
    print("TEST SUMMARY")
    print(banner)

    passed = len([outcome for outcome in results.values() if outcome])
    total = len(results)

    for label, outcome in results.items():
        if outcome:
            status = "✅ PASS"
        else:
            status = "❌ FAIL"
        print(f"{label}: {status}")

    print("\n" + banner)
    print(f"Overall: {passed}/{total} tests passed")
    print(banner)

    if passed != total:
        print("\n⚠️ Some tests failed. Please check the errors above.")
        return 1

    print("\n🎉 All tests passed! Celery is properly configured.")
    next_steps = (
        "\nNext steps:",
        "1. Start Redis (if not running): sudo systemctl start redis",
        "2. Start Celery Worker: celery -A PX360 worker --loglevel=info",
        "3. Start Celery Beat: celery -A PX360 beat --loglevel=info",
        "4. Or run both together: celery -A PX360 worker --beat --loglevel=info",
    )
    for line in next_steps:
        print(line)
    return 0
# Script entry point: run all checks and use main()'s return value
# (0 on success, 1 on failure) as the process exit status.
if __name__ == '__main__':
    sys.exit(main())