# HH/apps/social/tasks.py
"""
Celery scheduled tasks for social media comment scraping and analysis.
"""
import logging
from datetime import timedelta
from typing import Optional

from celery import shared_task
from celery.schedules import crontab
from django.conf import settings
from django.utils import timezone

from .services import CommentService
from .services.analysis_service import AnalysisService

logger = logging.getLogger(__name__)

# Analysis settings
ANALYSIS_BATCH_SIZE = 10  # Number of comments to analyze per batch


@shared_task
def scrape_all_platforms():
    """
    Scheduled task to scrape all configured social media platforms.

    This task is scheduled using Celery Beat. After scraping, it
    automatically queues analysis for pending comments.

    Usage: Schedule this task to run at regular intervals (e.g., daily, hourly).

    Returns:
        Dictionary with results from each platform.
    """
    logger.info("Starting scheduled scrape for all platforms")
    try:
        service = CommentService()
        results = service.scrape_and_save()
        logger.info(f"Completed scheduled scrape. Results: {results}")

        # Automatically queue analysis for pending comments
        analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE)
        logger.info("Queued analysis task for pending comments")

        return results
    except Exception as e:
        logger.error(f"Error in scheduled scrape task: {e}")
        raise
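

# The scrape tasks in this module are meant to be driven by Celery Beat (hence
# the crontab import above). A minimal sketch of a beat configuration follows;
# it belongs in settings.py or celery.py, and the schedule names, timings, and
# dotted task paths below are assumptions, not part of this project's config:
#
#     CELERY_BEAT_SCHEDULE = {
#         'scrape-all-platforms-nightly': {
#             'task': 'apps.social.tasks.scrape_all_platforms',
#             'schedule': crontab(hour=2, minute=0),  # every day at 02:00
#         },
#         'analyze-pending-hourly': {
#             'task': 'apps.social.tasks.analyze_pending_comments',
#             'schedule': crontab(minute=0),  # at the top of every hour
#             'kwargs': {'limit': 100},
#         },
#     }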


@shared_task
def scrape_youtube_comments(channel_id: Optional[str] = None):
    """
    Scheduled task to scrape YouTube comments.

    Args:
        channel_id: Optional YouTube channel ID (uses the default from
            settings if not provided).

    Returns:
        Dictionary with 'total' and 'comments'.
    """
    logger.info("Starting scheduled YouTube scrape")
    try:
        service = CommentService()
        result = service.scrape_youtube(channel_id=channel_id, save_to_db=True)
        logger.info(f"Completed YouTube scrape. Total comments: {len(result)}")

        # Automatically queue analysis for pending comments
        analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE)
        logger.info("Queued analysis task for pending comments")

        return {'total': len(result), 'comments': result}
    except Exception as e:
        logger.error(f"Error in YouTube scrape task: {e}")
        raise
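

# Any of the platform tasks can also be triggered ad hoc, e.g. from a Django
# shell or a management command. The channel ID here is a placeholder, not a
# real default:
#
#     scrape_youtube_comments.delay(channel_id="UCxxxxxxxxxxxxxxxxxxxxxx")
#
# Called with no argument, each task falls back to the account configured in
# settings.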


@shared_task
def scrape_facebook_comments(page_id: Optional[str] = None):
    """
    Scheduled task to scrape Facebook comments.

    Args:
        page_id: Optional Facebook page ID (uses the default from settings
            if not provided).

    Returns:
        Dictionary with 'total' and 'comments'.
    """
    logger.info("Starting scheduled Facebook scrape")
    try:
        service = CommentService()
        result = service.scrape_facebook(page_id=page_id, save_to_db=True)
        logger.info(f"Completed Facebook scrape. Total comments: {len(result)}")

        # Automatically queue analysis for pending comments
        analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE)
        logger.info("Queued analysis task for pending comments")

        return {'total': len(result), 'comments': result}
    except Exception as e:
        logger.error(f"Error in Facebook scrape task: {e}")
        raise


@shared_task
def scrape_instagram_comments(account_id: Optional[str] = None):
    """
    Scheduled task to scrape Instagram comments.

    Args:
        account_id: Optional Instagram account ID (uses the default from
            settings if not provided).

    Returns:
        Dictionary with 'total' and 'comments'.
    """
    logger.info("Starting scheduled Instagram scrape")
    try:
        service = CommentService()
        result = service.scrape_instagram(account_id=account_id, save_to_db=True)
        logger.info(f"Completed Instagram scrape. Total comments: {len(result)}")

        # Automatically queue analysis for pending comments
        analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE)
        logger.info("Queued analysis task for pending comments")

        return {'total': len(result), 'comments': result}
    except Exception as e:
        logger.error(f"Error in Instagram scrape task: {e}")
        raise


@shared_task
def scrape_twitter_comments(username: Optional[str] = None):
    """
    Scheduled task to scrape Twitter/X comments (replies).

    Args:
        username: Optional Twitter username (uses the default from settings
            if not provided).

    Returns:
        Dictionary with 'total' and 'comments'.
    """
    logger.info("Starting scheduled Twitter/X scrape")
    try:
        service = CommentService()
        result = service.scrape_twitter(username=username, save_to_db=True)
        logger.info(f"Completed Twitter/X scrape. Total comments: {len(result)}")

        # Automatically queue analysis for pending comments
        analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE)
        logger.info("Queued analysis task for pending comments")

        return {'total': len(result), 'comments': result}
    except Exception as e:
        logger.error(f"Error in Twitter/X scrape task: {e}")
        raise


@shared_task
def scrape_linkedin_comments(organization_id: Optional[str] = None):
    """
    Scheduled task to scrape LinkedIn comments from organization posts.

    Args:
        organization_id: Optional LinkedIn organization URN (uses the default
            from settings if not provided).

    Returns:
        Dictionary with 'total' and 'comments'.
    """
    logger.info("Starting scheduled LinkedIn scrape")
    try:
        service = CommentService()
        result = service.scrape_linkedin(organization_id=organization_id, save_to_db=True)
        logger.info(f"Completed LinkedIn scrape. Total comments: {len(result)}")

        # Automatically queue analysis for pending comments
        analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE)
        logger.info("Queued analysis task for pending comments")

        return {'total': len(result), 'comments': result}
    except Exception as e:
        logger.error(f"Error in LinkedIn scrape task: {e}")
        raise


@shared_task
def scrape_google_reviews(location_names: Optional[list] = None):
    """
    Scheduled task to scrape Google Reviews from business locations.

    Args:
        location_names: Optional list of location names to scrape (uses all
            locations if not provided).

    Returns:
        Dictionary with 'total' and 'reviews'.
    """
    logger.info("Starting scheduled Google Reviews scrape")
    try:
        service = CommentService()
        result = service.scrape_google_reviews(location_names=location_names, save_to_db=True)
        logger.info(f"Completed Google Reviews scrape. Total reviews: {len(result)}")

        # Automatically queue analysis for pending comments
        analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE)
        logger.info("Queued analysis task for pending comments")

        return {'total': len(result), 'reviews': result}
    except Exception as e:
        logger.error(f"Error in Google Reviews scrape task: {e}")
        raise


# ============================================================================
# AI Analysis Tasks
# ============================================================================

@shared_task
def analyze_pending_comments(limit: int = 100):
    """
    Scheduled task to analyze all pending (unanalyzed) comments.

    Args:
        limit: Maximum number of comments to analyze in one run.

    Returns:
        Dictionary with analysis statistics.
    """
    if not getattr(settings, 'ANALYSIS_ENABLED', True):
        logger.info("Comment analysis is disabled")
        return {'success': False, 'message': 'Analysis disabled'}

    logger.info("Starting scheduled comment analysis")
    try:
        service = AnalysisService()
        results = service.analyze_pending_comments(limit=limit)
        logger.info(f"Completed comment analysis. Results: {results}")

        # Check whether pending comments remain and queue another batch if so.
        # A comment is pending when its ai_analysis field is NULL or an empty
        # dict; a single query with Q objects covers both cases.
        from django.db.models import Q

        from .models import SocialMediaComment

        pending_count = SocialMediaComment.objects.filter(
            Q(ai_analysis__isnull=True) | Q(ai_analysis={})
        ).count()

        # Queue another batch if any pending comments remain, capped at the
        # configured batch size.
        if pending_count > 0:
            logger.info(f"Found {pending_count} pending comments, queuing next batch")
            analyze_pending_comments.delay(limit=min(pending_count, ANALYSIS_BATCH_SIZE))

        return results
    except Exception as e:
        logger.error(f"Error in comment analysis task: {e}", exc_info=True)
        raise
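

# A retry-enabled variant, sketched for the case where the analysis backend
# is rate-limited or flaky. Illustrative only: the task name, retry count,
# and delay are assumptions, not part of the original module.
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def analyze_pending_comments_with_retry(self, limit: int = 100):
    """Sketch: run pending-comment analysis, retrying on transient errors."""
    try:
        return AnalysisService().analyze_pending_comments(limit=limit)
    except Exception as exc:
        # Re-queue this task up to max_retries times, waiting
        # default_retry_delay seconds between attempts.
        raise self.retry(exc=exc)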


@shared_task
def analyze_recent_comments(hours: int = 24, limit: int = 100):
    """
    Scheduled task to analyze comments scraped in the last N hours.

    Args:
        hours: Number of hours to look back.
        limit: Maximum number of comments to analyze.

    Returns:
        Dictionary with analysis statistics.
    """
    if not getattr(settings, 'ANALYSIS_ENABLED', True):
        logger.info("Comment analysis is disabled")
        return {'success': False, 'message': 'Analysis disabled'}

    logger.info(f"Starting analysis for comments from last {hours} hours")
    try:
        service = AnalysisService()
        results = service.analyze_recent_comments(hours=hours, limit=limit)
        logger.info(f"Completed recent comment analysis. Results: {results}")
        return results
    except Exception as e:
        logger.error(f"Error in recent comment analysis task: {e}", exc_info=True)
        raise
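

# Chaining sketch (illustrative only): run a platform scrape and then analyze
# just the fresh comments. The immutable signature (.si) ignores the scrape
# task's return value and simply sequences the two steps:
#
#     from celery import chain
#
#     chain(
#         scrape_youtube_comments.s(),
#         analyze_recent_comments.si(hours=1, limit=100),
#     ).apply_async()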


@shared_task
def analyze_platform_comments(platform: str, limit: int = 100):
    """
    Scheduled task to analyze comments from a specific platform.

    Args:
        platform: Platform name (e.g., 'youtube', 'facebook', 'instagram').
        limit: Maximum number of comments to analyze.

    Returns:
        Dictionary with analysis statistics.
    """
    if not getattr(settings, 'ANALYSIS_ENABLED', True):
        logger.info("Comment analysis is disabled")
        return {'success': False, 'message': 'Analysis disabled'}

    logger.info(f"Starting analysis for {platform} comments")
    try:
        service = AnalysisService()
        results = service.analyze_comments_by_platform(platform=platform, limit=limit)
        logger.info(f"Completed {platform} comment analysis. Results: {results}")
        return results
    except Exception as e:
        logger.error(f"Error in {platform} comment analysis task: {e}", exc_info=True)
        raise
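

# Fan-out sketch: queue one per-platform analysis task for every platform at
# once using celery.group. Illustrative only; the helper name and platform
# list are assumptions derived from the scrapers defined above.
def analyze_all_platforms(limit: int = 100):
    """Sketch: queue analyze_platform_comments for each known platform."""
    from celery import group

    platforms = ['youtube', 'facebook', 'instagram', 'twitter', 'linkedin']
    job = group(
        analyze_platform_comments.s(platform=p, limit=limit) for p in platforms
    )
    # apply_async() sends all signatures to the broker at once and returns a
    # GroupResult handle for tracking completion.
    return job.apply_async()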