343 lines
11 KiB
Python
343 lines
11 KiB
Python
"""
|
|
Celery scheduled tasks for social media comment scraping and analysis.
|
|
"""
|
|
import logging
|
|
from celery import shared_task
|
|
from celery.schedules import crontab
|
|
from django.utils import timezone
|
|
from datetime import timedelta
|
|
from django.conf import settings
|
|
|
|
from .services import CommentService
|
|
from .services.analysis_service import AnalysisService
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Analysis settings
|
|
ANALYSIS_BATCH_SIZE = 10 # Number of comments to analyze per batch
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@shared_task
|
|
def scrape_all_platforms():
|
|
"""
|
|
Scheduled task to scrape all configured social media platforms.
|
|
This task is scheduled using Celery Beat.
|
|
|
|
After scraping, automatically queues analysis for pending comments.
|
|
|
|
Usage: Schedule this task to run at regular intervals (e.g., daily, hourly)
|
|
|
|
Returns:
|
|
Dictionary with results from each platform
|
|
"""
|
|
logger.info("Starting scheduled scrape for all platforms")
|
|
|
|
try:
|
|
service = CommentService()
|
|
results = service.scrape_and_save()
|
|
|
|
logger.info(f"Completed scheduled scrape. Results: {results}")
|
|
|
|
# Automatically queue analysis for pending comments
|
|
analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE)
|
|
logger.info("Queued analysis task for pending comments")
|
|
|
|
return results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in scheduled scrape task: {e}")
|
|
raise
|
|
|
|
|
|
@shared_task
|
|
def scrape_youtube_comments(channel_id: str = None):
|
|
"""
|
|
Scheduled task to scrape YouTube comments.
|
|
|
|
Args:
|
|
channel_id: Optional YouTube channel ID (uses default from settings if not provided)
|
|
|
|
Returns:
|
|
Dictionary with 'total' and 'comments'
|
|
"""
|
|
logger.info("Starting scheduled YouTube scrape")
|
|
|
|
try:
|
|
service = CommentService()
|
|
result = service.scrape_youtube(channel_id=channel_id, save_to_db=True)
|
|
|
|
logger.info(f"Completed YouTube scrape. Total comments: {len(result)}")
|
|
|
|
# Automatically queue analysis for pending comments
|
|
analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE)
|
|
logger.info("Queued analysis task for pending comments")
|
|
|
|
return {'total': len(result), 'comments': result}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in YouTube scrape task: {e}")
|
|
raise
|
|
|
|
|
|
@shared_task
|
|
def scrape_facebook_comments(page_id: str = None):
|
|
"""
|
|
Scheduled task to scrape Facebook comments.
|
|
|
|
Args:
|
|
page_id: Optional Facebook page ID (uses default from settings if not provided)
|
|
|
|
Returns:
|
|
Dictionary with 'total' and 'comments'
|
|
"""
|
|
logger.info("Starting scheduled Facebook scrape")
|
|
|
|
try:
|
|
service = CommentService()
|
|
result = service.scrape_facebook(page_id=page_id, save_to_db=True)
|
|
|
|
logger.info(f"Completed Facebook scrape. Total comments: {len(result)}")
|
|
|
|
# Automatically queue analysis for pending comments
|
|
analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE)
|
|
logger.info("Queued analysis task for pending comments")
|
|
|
|
return {'total': len(result), 'comments': result}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in Facebook scrape task: {e}")
|
|
raise
|
|
|
|
|
|
@shared_task
|
|
def scrape_instagram_comments(account_id: str = None):
|
|
"""
|
|
Scheduled task to scrape Instagram comments.
|
|
|
|
Args:
|
|
account_id: Optional Instagram account ID (uses default from settings if not provided)
|
|
|
|
Returns:
|
|
Dictionary with 'total' and 'comments'
|
|
"""
|
|
logger.info("Starting scheduled Instagram scrape")
|
|
|
|
try:
|
|
service = CommentService()
|
|
result = service.scrape_instagram(account_id=account_id, save_to_db=True)
|
|
|
|
logger.info(f"Completed Instagram scrape. Total comments: {len(result)}")
|
|
|
|
# Automatically queue analysis for pending comments
|
|
analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE)
|
|
logger.info("Queued analysis task for pending comments")
|
|
|
|
return {'total': len(result), 'comments': result}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in Instagram scrape task: {e}")
|
|
raise
|
|
|
|
|
|
@shared_task
|
|
def scrape_twitter_comments(username: str = None):
|
|
"""
|
|
Scheduled task to scrape Twitter/X comments (replies).
|
|
|
|
Args:
|
|
username: Optional Twitter username (uses default from settings if not provided)
|
|
|
|
Returns:
|
|
Dictionary with 'total' and 'comments'
|
|
"""
|
|
logger.info("Starting scheduled Twitter/X scrape")
|
|
|
|
try:
|
|
service = CommentService()
|
|
result = service.scrape_twitter(username=username, save_to_db=True)
|
|
|
|
logger.info(f"Completed Twitter/X scrape. Total comments: {len(result)}")
|
|
|
|
# Automatically queue analysis for pending comments
|
|
analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE)
|
|
logger.info("Queued analysis task for pending comments")
|
|
|
|
return {'total': len(result), 'comments': result}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in Twitter/X scrape task: {e}")
|
|
raise
|
|
|
|
|
|
@shared_task
|
|
def scrape_linkedin_comments(organization_id: str = None):
|
|
"""
|
|
Scheduled task to scrape LinkedIn comments from organization posts.
|
|
|
|
Args:
|
|
organization_id: Optional LinkedIn organization URN (uses default from settings if not provided)
|
|
|
|
Returns:
|
|
Dictionary with 'total' and 'comments'
|
|
"""
|
|
logger.info("Starting scheduled LinkedIn scrape")
|
|
|
|
try:
|
|
service = CommentService()
|
|
result = service.scrape_linkedin(organization_id=organization_id, save_to_db=True)
|
|
|
|
logger.info(f"Completed LinkedIn scrape. Total comments: {len(result)}")
|
|
|
|
# Automatically queue analysis for pending comments
|
|
analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE)
|
|
logger.info("Queued analysis task for pending comments")
|
|
|
|
return {'total': len(result), 'comments': result}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in LinkedIn scrape task: {e}")
|
|
raise
|
|
|
|
|
|
@shared_task
|
|
def scrape_google_reviews(location_names: list = None):
|
|
"""
|
|
Scheduled task to scrape Google Reviews from business locations.
|
|
|
|
Args:
|
|
location_names: Optional list of location names to scrape (uses all locations if not provided)
|
|
|
|
Returns:
|
|
Dictionary with 'total' and 'reviews'
|
|
"""
|
|
logger.info("Starting scheduled Google Reviews scrape")
|
|
|
|
try:
|
|
service = CommentService()
|
|
result = service.scrape_google_reviews(location_names=location_names, save_to_db=True)
|
|
|
|
logger.info(f"Completed Google Reviews scrape. Total reviews: {len(result)}")
|
|
|
|
# Automatically queue analysis for pending comments
|
|
analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE)
|
|
logger.info("Queued analysis task for pending comments")
|
|
|
|
return {'total': len(result), 'reviews': result}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in Google Reviews scrape task: {e}")
|
|
raise
|
|
|
|
|
|
# ============================================================================
|
|
# AI Analysis Tasks
|
|
# ============================================================================
|
|
|
|
@shared_task
|
|
def analyze_pending_comments(limit: int = 100):
|
|
"""
|
|
Scheduled task to analyze all pending (unanalyzed) comments.
|
|
|
|
Args:
|
|
limit: Maximum number of comments to analyze in one run
|
|
|
|
Returns:
|
|
Dictionary with analysis statistics
|
|
"""
|
|
if not getattr(settings, 'ANALYSIS_ENABLED', True):
|
|
logger.info("Comment analysis is disabled")
|
|
return {'success': False, 'message': 'Analysis disabled'}
|
|
|
|
logger.info("Starting scheduled comment analysis")
|
|
|
|
try:
|
|
service = AnalysisService()
|
|
results = service.analyze_pending_comments(limit=limit)
|
|
|
|
logger.info(f"Completed comment analysis. Results: {results}")
|
|
|
|
# Check if there are more pending comments and queue another batch if needed
|
|
from .models import SocialMediaComment
|
|
pending_count = SocialMediaComment.objects.filter(
|
|
ai_analysis__isnull=True
|
|
).count() + SocialMediaComment.objects.filter(
|
|
ai_analysis={}
|
|
).count()
|
|
|
|
# FIXED: Queue if ANY pending comments remain (not just >= batch size)
|
|
if pending_count > 0:
|
|
logger.info(f" - Found {pending_count} pending comments, queuing next batch")
|
|
# Use min() to ensure we don't exceed batch size
|
|
batch_size = min(pending_count, ANALYSIS_BATCH_SIZE)
|
|
analyze_pending_comments.delay(limit=batch_size)
|
|
|
|
return results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in comment analysis task: {e}", exc_info=True)
|
|
raise
|
|
|
|
|
|
@shared_task
|
|
def analyze_recent_comments(hours: int = 24, limit: int = 100):
|
|
"""
|
|
Scheduled task to analyze comments scraped in the last N hours.
|
|
|
|
Args:
|
|
hours: Number of hours to look back
|
|
limit: Maximum number of comments to analyze
|
|
|
|
Returns:
|
|
Dictionary with analysis statistics
|
|
"""
|
|
if not getattr(settings, 'ANALYSIS_ENABLED', True):
|
|
logger.info("Comment analysis is disabled")
|
|
return {'success': False, 'message': 'Analysis disabled'}
|
|
|
|
logger.info(f"Starting analysis for comments from last {hours} hours")
|
|
|
|
try:
|
|
service = AnalysisService()
|
|
results = service.analyze_recent_comments(hours=hours, limit=limit)
|
|
|
|
logger.info(f"Completed recent comment analysis. Results: {results}")
|
|
return results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in recent comment analysis task: {e}", exc_info=True)
|
|
raise
|
|
|
|
|
|
@shared_task
|
|
def analyze_platform_comments(platform: str, limit: int = 100):
|
|
"""
|
|
Scheduled task to analyze comments from a specific platform.
|
|
|
|
Args:
|
|
platform: Platform name (e.g., 'youtube', 'facebook', 'instagram')
|
|
limit: Maximum number of comments to analyze
|
|
|
|
Returns:
|
|
Dictionary with analysis statistics
|
|
"""
|
|
if not getattr(settings, 'ANALYSIS_ENABLED', True):
|
|
logger.info("Comment analysis is disabled")
|
|
return {'success': False, 'message': 'Analysis disabled'}
|
|
|
|
logger.info(f"Starting analysis for {platform} comments")
|
|
|
|
try:
|
|
service = AnalysisService()
|
|
results = service.analyze_comments_by_platform(platform=platform, limit=limit)
|
|
|
|
logger.info(f"Completed {platform} comment analysis. Results: {results}")
|
|
return results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in {platform} comment analysis task: {e}", exc_info=True)
|
|
raise
|