HH/apps/social/tests/test_scraping.py
#!/usr/bin/env python
"""
Test script for social media comment scraper.
Tests both manual scraping and Celery tasks.
"""
import os
import sys
import django
from datetime import datetime, timedelta
# Setup Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
django.setup()
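# App modules are imported only after django.setup() so the app registry and models are ready.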
from apps.social.services import CommentService
from apps.social.models import SocialMediaComment
from apps.social import tasks
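# NOTE: Tests 3 and 4 queue Celery tasks with .delay(); without a running worker
# they will stay PENDING (see the prerequisites printed in main()).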


def print_separator(title=""):
    """Print a visual separator."""
    print("\n" + "=" * 70)
    if title:
        print(f" {title}")
    print("=" * 70)
    print()


def test_manual_scraping():
    """Test manual scraping from all platforms."""
    print_separator("TEST 1: MANUAL SCRAPING")
    try:
        service = CommentService()
        # Test YouTube
        print("1. Testing YouTube scraping...")
        youtube_comments = service.scrape_youtube(save_to_db=True)
        print(f" ✓ Fetched {len(youtube_comments)} YouTube comments")
        print(" Note: Run again to see new vs updated counts")
        # Test Facebook
        print("\n2. Testing Facebook scraping...")
        try:
            facebook_comments = service.scrape_facebook(save_to_db=True)
            print(f" ✓ Fetched {len(facebook_comments)} Facebook comments")
        except Exception as e:
            print(f" ✗ Facebook scraping failed: {e}")
        # Test Instagram
        print("\n3. Testing Instagram scraping...")
        try:
            instagram_comments = service.scrape_instagram(save_to_db=True)
            print(f" ✓ Fetched {len(instagram_comments)} Instagram comments")
        except Exception as e:
            print(f" ✗ Instagram scraping failed: {e}")
        # Verify database
        print("\n4. Verifying database...")
        total_comments = SocialMediaComment.objects.count()
        youtube_count = SocialMediaComment.objects.filter(platform='youtube').count()
        facebook_count = SocialMediaComment.objects.filter(platform='facebook').count()
        instagram_count = SocialMediaComment.objects.filter(platform='instagram').count()
        print(f" Total comments in database: {total_comments}")
        print(f" - YouTube: {youtube_count}")
        print(f" - Facebook: {facebook_count}")
        print(f" - Instagram: {instagram_count}")
        # Show sample comment
        if total_comments > 0:
            latest = SocialMediaComment.objects.order_by('-scraped_at').first()
            print("\n Latest comment:")
            print(f" Platform: {latest.platform}")
            print(f" Author: {latest.author}")
            print(f" Comment: {latest.comments[:100]}...")
            print(f" Likes: {latest.like_count}")
        print("\n ✓ Manual scraping test completed successfully!")
        print(" Check logs for new vs updated comment counts")
        return True
    except Exception as e:
        print(f"\n ✗ Error in manual scraping test: {e}")
        import traceback
        traceback.print_exc()
        return False


def test_single_platform():
    """Test scraping a single platform (YouTube)."""
    print_separator("TEST 2: SINGLE PLATFORM SCRAPING")
    try:
        service = CommentService()
        print("Scraping YouTube only...")
        print("Running TWICE to test duplicate prevention...")
        # First run
        print("\nFirst run (initial scrape):")
        comments1 = service.scrape_youtube(save_to_db=True)
        print(f"✓ Fetched {len(comments1)} comments")
        # Second run (should show duplicates)
        print("\nSecond run (duplicate prevention):")
        comments2 = service.scrape_youtube(save_to_db=True)
        print(f"✓ Fetched {len(comments2)} comments")
        print(" Check logs above - should show '0 new, X updated'")
        return True
    except Exception as e:
        print(f"✗ Error: {e}")
        return False


def test_celery_task():
    """Test creating and running a Celery task."""
    print_separator("TEST 3: CELERY TASK EXECUTION")
    try:
        print("1. Creating a Celery task for YouTube scraping...")
        # Queue the task using .delay()
        result = tasks.scrape_youtube_comments.delay()
        print(f" ✓ Task queued with ID: {result.id}")
        print(f" Task status: {result.status}")
        # Wait for task to complete (with timeout)
        print("\n2. Waiting for task to complete (up to 30 seconds)...")
        import time
        timeout = 30
        elapsed = 0
        while not result.ready() and elapsed < timeout:
            time.sleep(2)
            elapsed += 2
            print(f" Waiting... ({elapsed}s)")
        if result.ready():
            if result.successful():
                task_result = result.get()
                print("\n3. Task completed successfully!")
                print(f" ✓ Task result: {task_result}")
                if isinstance(task_result, dict):
                    total = task_result.get('total', 0)
                    comments = task_result.get('comments', [])
                    print(f" ✓ Total comments scraped: {total}")
                elif isinstance(task_result, list):
                    print(f" ✓ Comments scraped: {len(task_result)}")
                print("\n ✓ Celery task test completed successfully!")
                return True
            else:
                print("\n ✗ Task failed!")
                print(f" Error: {result.result}")
                return False
        else:
            print(f"\n ⚠ Task did not complete within {timeout} seconds")
            print(f" Task status: {result.status}")
            print(" This is normal if Celery worker is not running")
            print(" Start Celery worker: celery -A config worker --loglevel=info")
            return False
    except Exception as e:
        print(f" ✗ Error in Celery task test: {e}")
        import traceback
        traceback.print_exc()
        return False


def test_celery_all_platforms_task():
    """Test Celery task for scraping all platforms."""
    print_separator("TEST 4: CELERY ALL PLATFORMS TASK")
    try:
        print("1. Creating a Celery task for scraping all platforms...")
        # Queue the task
        result = tasks.scrape_all_platforms.delay()
        print(f" ✓ Task queued with ID: {result.id}")
        # Check task status without waiting (as this takes longer)
        print(f"\n2. Task status: {result.status}")
        if result.ready():
            if result.successful():
                task_result = result.get()
                print(" ✓ Task completed successfully!")
                print(f" ✓ Result: {task_result}")
            else:
                print(f" ✗ Task failed: {result.result}")
        else:
            print(" Task is still running (or worker not started)")
            print(" This task scrapes all platforms and may take longer")
            print(" Check Celery logs for progress")
        print("\n ✓ All platforms task queued successfully!")
        return True
    except Exception as e:
        print(f" ✗ Error: {e}")
        import traceback
        traceback.print_exc()
        return False


def show_celery_info():
    """Show Celery task information."""
    print_separator("CELERY INFORMATION")
    try:
        print("\nChecking Celery configuration...")
        # Try to get task info (this requires Celery to be running)
        from celery import current_app
        # Show registered tasks
        registered_tasks = current_app.tasks
        print(f"\nRegistered tasks: {len(registered_tasks)}")
        # Show comment scraper tasks
        scraper_tasks = [t for t in registered_tasks.keys() if 'tasks' in t.lower()]
        if scraper_tasks:
            print("\nScraper tasks:")
            for task_name in sorted(scraper_tasks):
                print(f"{task_name}")
        # Show beat schedules
        schedules = current_app.conf.beat_schedule
        if schedules:
            print(f"\nCelery Beat schedules: {len(schedules)}")
            for name, config in schedules.items():
                task = config.get('task', 'N/A')
                schedule = config.get('schedule', 'N/A')
                print(f"{name}")
                print(f" Task: {task}")
                print(f" Schedule: {schedule}")
    except Exception as e:
        print(f"Error getting Celery info: {e}")
        print(" This is normal if Celery is not running")
        print(" Start Celery: celery -A config worker --beat --loglevel=info")


def show_latest_comments():
    """Show latest comments from database."""
    print_separator("LATEST COMMENTS IN DATABASE")
    try:
        comments = SocialMediaComment.objects.order_by('-scraped_at')[:10]
        if not comments.exists():
            print("No comments found in database.")
            return
        for i, comment in enumerate(comments, 1):
            print(f"\n{i}. Platform: {comment.platform.upper()}")
            print(f" Author: {comment.author or 'Anonymous'}")
            print(f" Comment: {comment.comments[:80]}{'...' if len(comment.comments) > 80 else ''}")
            print(f" Likes: {comment.like_count} | Scraped: {comment.scraped_at}")
    except Exception as e:
        print(f"Error fetching comments: {e}")


def main():
    """Run all tests."""
    print("\n" + "=" * 70)
    print(" SOCIAL MEDIA COMMENT SCRAPER - TEST SUITE (CELERY)")
    print("=" * 70)
    print("\nThis script will test the scraper functionality with Celery.")
    print("Make sure you have:")
    print(" 1. Configured your .env file with API keys")
    print(" 2. Run database migrations: python manage.py migrate")
    print(" 3. (Optional) Redis running: sudo systemctl start redis")
    print(" 4. (Optional) Celery worker running: celery -A config worker --loglevel=info")
    input("\nPress Enter to start testing...")
    # Run tests
    results = {
        'Manual Scraping': test_manual_scraping(),
        'Single Platform': test_single_platform(),
        'Celery Task': test_celery_task(),
        'All Platforms Task': test_celery_all_platforms_task(),
    }
    # Show Celery info
    show_celery_info()
    # Show latest comments
    show_latest_comments()
    # Summary
    print_separator("TEST SUMMARY")
    passed = sum(1 for v in results.values() if v)
    total = len(results)
    for test_name, passed_test in results.items():
        status = "✓ PASSED" if passed_test else "✗ FAILED"
        print(f"{status}: {test_name}")
    print(f"\nTotal: {passed}/{total} tests passed")
    print_separator()
    print("Testing complete!")
    print("\nNext steps:")
    print(" - View comments in Django Admin: http://localhost:8000/admin/")
    print(" - Check logs: tail -f logs/commentscraper.log")
    print(" - Start Celery worker: celery -A config worker --loglevel=info")
    print(" - Start Celery Beat: celery -A config beat --loglevel=info")
    print(" - Or run both: celery -A config worker --beat --loglevel=info")
    print(" - View Celery schedules: python -c 'from config.celery import app; print(app.conf.beat_schedule)'")
    print()


if __name__ == '__main__':
    main()