#!/usr/bin/env python
"""
Test script for social media comment scraper.

Tests both manual scraping (direct ``CommentService`` calls) and the Celery
tasks defined in ``apps.social.tasks``. The script is interactive: it waits
for Enter before starting and prints a pass/fail summary at the end.
"""
import os
import sys
import time
import traceback
from datetime import datetime, timedelta

import django

# Setup Django. This must happen before any project module that touches
# models is imported, which is why the ``apps.*`` imports come afterwards.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
django.setup()

from apps.social.services import CommentService
from apps.social.models import SocialMediaComment
from apps.social import tasks


def print_separator(title=""):
    """Print a visual separator, optionally framing a section title.

    Args:
        title: Text to show under the rule. When empty, only a single rule
            followed by a blank line is printed.
    """
    print("\n" + "=" * 70)
    if title:
        print(f" {title}")
        print("=" * 70)
    print()


def test_manual_scraping():
    """Scrape every platform through ``CommentService`` and inspect the DB.

    A YouTube failure aborts the whole test, whereas Facebook and Instagram
    failures are reported and tolerated so the remaining steps still run.

    Returns:
        True when the test ran to completion, False if an unexpected
        exception was raised (the traceback is printed).
    """
    print_separator("TEST 1: MANUAL SCRAPING")

    try:
        service = CommentService()

        # Test YouTube
        print("1. Testing YouTube scraping...")
        youtube_comments = service.scrape_youtube(save_to_db=True)
        print(f" ✓ Fetched {len(youtube_comments)} YouTube comments")
        print(" Note: Run again to see new vs updated counts")

        # Test Facebook
        print("\n2. Testing Facebook scraping...")
        try:
            facebook_comments = service.scrape_facebook(save_to_db=True)
            print(f" ✓ Fetched {len(facebook_comments)} Facebook comments")
        except Exception as e:
            print(f" ✗ Facebook scraping failed: {e}")

        # Test Instagram
        print("\n3. Testing Instagram scraping...")
        try:
            instagram_comments = service.scrape_instagram(save_to_db=True)
            print(f" ✓ Fetched {len(instagram_comments)} Instagram comments")
        except Exception as e:
            print(f" ✗ Instagram scraping failed: {e}")

        # Verify database
        print("\n4. Verifying database...")
        total_comments = SocialMediaComment.objects.count()
        youtube_count = SocialMediaComment.objects.filter(platform='youtube').count()
        facebook_count = SocialMediaComment.objects.filter(platform='facebook').count()
        instagram_count = SocialMediaComment.objects.filter(platform='instagram').count()

        print(f" Total comments in database: {total_comments}")
        print(f" - YouTube: {youtube_count}")
        print(f" - Facebook: {facebook_count}")
        print(f" - Instagram: {instagram_count}")

        # Show sample comment. Order explicitly by scrape time so the row
        # really is the latest one; an unordered first() does not promise that.
        latest = SocialMediaComment.objects.order_by('-scraped_at').first()
        if latest is not None:
            print("\n Latest comment:")
            print(f" Platform: {latest.platform}")
            print(f" Author: {latest.author}")
            print(f" Comment: {latest.comments[:100]}...")
            print(f" Likes: {latest.like_count}")

        print("\n ✓ Manual scraping test completed successfully!")
        print(" ℹ Check logs for new vs updated comment counts")
        return True

    except Exception as e:
        print(f"\n ✗ Error in manual scraping test: {e}")
        traceback.print_exc()
        return False


def test_single_platform():
    """Scrape YouTube twice in a row to exercise duplicate prevention.

    The new-versus-updated counts are not checked here; the operator is
    pointed at the log output instead.

    Returns:
        True when both scrapes completed, False if an exception was raised
        (the traceback is printed).
    """
    print_separator("TEST 2: SINGLE PLATFORM SCRAPING")

    try:
        service = CommentService()

        print("Scraping YouTube only...")
        print("Running TWICE to test duplicate prevention...")

        # First run
        print("\nFirst run (initial scrape):")
        comments1 = service.scrape_youtube(save_to_db=True)
        print(f"✓ Fetched {len(comments1)} comments")

        # Second run (should show duplicates)
        print("\nSecond run (duplicate prevention):")
        comments2 = service.scrape_youtube(save_to_db=True)
        print(f"✓ Fetched {len(comments2)} comments")
        print(" Check logs above - should show '0 new, X updated'")

        return True
    except Exception as e:
        print(f"✗ Error: {e}")
        traceback.print_exc()
        return False


def test_celery_task():
    """Queue the YouTube scraping task and wait for its result.

    The task is polled every 2 seconds for up to 30 seconds.

    Returns:
        True when the task finished successfully within the timeout; False
        when it failed, timed out, or an exception was raised.
    """
    print_separator("TEST 3: CELERY TASK EXECUTION")

    try:
        print("1. Creating a Celery task for YouTube scraping...")

        # Queue the task using .delay()
        result = tasks.scrape_youtube_comments.delay()
        print(f" ✓ Task queued with ID: {result.id}")
        print(f" ℹ Task status: {result.status}")

        # Wait for task to complete (with timeout)
        print("\n2. Waiting for task to complete (up to 30 seconds)...")

        timeout = 30
        poll_interval = 2
        elapsed = 0
        while not result.ready() and elapsed < timeout:
            time.sleep(poll_interval)
            elapsed += poll_interval
            print(f" Waiting... ({elapsed}s)")

        if not result.ready():
            print(f"\n ⚠ Task did not complete within {timeout} seconds")
            print(f" ℹ Task status: {result.status}")
            print(" ℹ This is normal if Celery worker is not running")
            print(" ℹ Start Celery worker: celery -A config worker --loglevel=info")
            return False

        if not result.successful():
            print("\n ✗ Task failed!")
            print(f" Error: {result.result}")
            return False

        task_result = result.get()
        print("\n3. Task completed successfully!")
        print(f" ✓ Task result: {task_result}")

        # The task may hand back either a summary dict or a plain list.
        if isinstance(task_result, dict):
            total = task_result.get('total', 0)
            print(f" ✓ Total comments scraped: {total}")
        elif isinstance(task_result, list):
            print(f" ✓ Comments scraped: {len(task_result)}")

        print("\n ✓ Celery task test completed successfully!")
        return True

    except Exception as e:
        print(f" ✗ Error in Celery task test: {e}")
        traceback.print_exc()
        return False


def test_celery_all_platforms_task():
    """Queue the all-platforms scraping task without waiting for it.

    Only the queueing step is treated as the test: the current status is
    reported, and a task that has already failed is printed but does not
    change the return value.

    Returns:
        True when the task was queued, False if an exception was raised.
    """
    print_separator("TEST 4: CELERY ALL PLATFORMS TASK")

    try:
        print("1. Creating a Celery task for scraping all platforms...")

        # Queue the task
        result = tasks.scrape_all_platforms.delay()
        print(f" ✓ Task queued with ID: {result.id}")

        # Check task status without waiting (as this takes longer)
        print(f"\n2. Task status: {result.status}")

        if result.ready():
            if result.successful():
                task_result = result.get()
                print(" ✓ Task completed successfully!")
                print(f" ✓ Result: {task_result}")
            else:
                print(f" ✗ Task failed: {result.result}")
        else:
            print(" ℹ Task is still running (or worker not started)")
            print(" ℹ This task scrapes all platforms and may take longer")
            print(" ℹ Check Celery logs for progress")

        print("\n ✓ All platforms task queued successfully!")
        return True

    except Exception as e:
        print(f" ✗ Error: {e}")
        traceback.print_exc()
        return False


def show_celery_info():
    """Print the registered Celery tasks and the configured beat schedule.

    Purely informational: any failure is reported and swallowed.
    """
    print_separator("CELERY INFORMATION")

    try:
        print("\nChecking Celery configuration...")

        # Imported here so that a missing or broken Celery installation is
        # reported by the handler below instead of breaking the whole script.
        from celery import current_app

        # Show registered tasks
        registered_tasks = current_app.tasks
        print(f"\nRegistered tasks: {len(registered_tasks)}")

        # Show comment scraper tasks (any task whose name contains "tasks")
        scraper_tasks = [t for t in registered_tasks if 'tasks' in t.lower()]
        if scraper_tasks:
            print("\nScraper tasks:")
            for task_name in sorted(scraper_tasks):
                print(f" ✓ {task_name}")

        # Show beat schedules
        schedules = current_app.conf.beat_schedule
        if schedules:
            print(f"\nCelery Beat schedules: {len(schedules)}")
            for name, config in schedules.items():
                task = config.get('task', 'N/A')
                schedule = config.get('schedule', 'N/A')
                print(f" • {name}")
                print(f" Task: {task}")
                print(f" Schedule: {schedule}")

    except Exception as e:
        print(f"Error getting Celery info: {e}")
        print("ℹ This is normal if Celery is not running")
        print("ℹ Start Celery: celery -A config worker --beat --loglevel=info")


def show_latest_comments():
    """Print the ten most recently scraped comments from the database.

    Purely informational: any failure is reported and swallowed.
    """
    print_separator("LATEST COMMENTS IN DATABASE")

    try:
        # Materialise the slice once so the emptiness check and the loop
        # share a single query.
        comments = list(SocialMediaComment.objects.order_by('-scraped_at')[:10])

        if not comments:
            print("No comments found in database.")
            return

        for i, comment in enumerate(comments, 1):
            print(f"\n{i}. Platform: {comment.platform.upper()}")
            print(f" Author: {comment.author or 'Anonymous'}")
            print(f" Comment: {comment.comments[:80]}{'...' if len(comment.comments) > 80 else ''}")
            print(f" Likes: {comment.like_count} | Scraped: {comment.scraped_at}")

    except Exception as e:
        print(f"Error fetching comments: {e}")


def main():
    """Run all tests, then print Celery info, recent comments and a summary.

    Waits for the operator to press Enter before starting.
    """
    print("\n" + "=" * 70)
    print(" SOCIAL MEDIA COMMENT SCRAPER - TEST SUITE (CELERY)")
    print("=" * 70)

    print("\nThis script will test the scraper functionality with Celery.")
    print("Make sure you have:")
    print(" 1. Configured your .env file with API keys")
    print(" 2. Run database migrations: python manage.py migrate")
    print(" 3. (Optional) Redis running: sudo systemctl start redis")
    print(" 4. (Optional) Celery worker running: celery -A config worker --loglevel=info")

    input("\nPress Enter to start testing...")

    # Run tests. The dict literal evaluates its values in order, so the
    # tests execute top to bottom.
    results = {
        'Manual Scraping': test_manual_scraping(),
        'Single Platform': test_single_platform(),
        'Celery Task': test_celery_task(),
        'All Platforms Task': test_celery_all_platforms_task(),
    }

    # Show Celery info
    show_celery_info()

    # Show latest comments
    show_latest_comments()

    # Summary
    print_separator("TEST SUMMARY")

    passed = sum(1 for outcome in results.values() if outcome)
    total = len(results)

    for test_name, passed_test in results.items():
        status = "✓ PASSED" if passed_test else "✗ FAILED"
        print(f"{status}: {test_name}")

    print(f"\nTotal: {passed}/{total} tests passed")

    print_separator()
    print("Testing complete!")
    print("\nNext steps:")
    print(" - View comments in Django Admin: http://localhost:8000/admin/")
    print(" - Check logs: tail -f logs/commentscraper.log")
    print(" - Start Celery worker: celery -A config worker --loglevel=info")
    print(" - Start Celery Beat: celery -A config beat --loglevel=info")
    print(" - Or run both: celery -A config worker --beat --loglevel=info")
    print(" - View Celery schedules: python -c 'from config.celery import app; print(app.conf.beat_schedule)'")
    print()


if __name__ == '__main__':
    main()