No AI analysis available')
+
+        sentiment = obj.ai_analysis.get('sentiment', {})
+        summary_en = obj.ai_analysis.get('summaries', {}).get('en', '')
+        summary_ar = obj.ai_analysis.get('summaries', {}).get('ar', '')
+        keywords = obj.ai_analysis.get('keywords', {}).get('en', [])
+
+        html = format_html('Classification: {} ({})',
+            sentiment.get('classification', {}).get('en', 'N/A'),
+            sentiment.get('classification', {}).get('ar', 'N/A')
+        )
+        html += format_html('Score: {}',
+            sentiment.get('score', 0)
+        )
+        html += format_html('Confidence: {}',
+            sentiment.get('confidence', 0)
+        )
+
+        if summary_en:
+            html += format_html('{}', summary_en)
+        if summary_ar:
+            html += format_html('{}', summary_ar)
+
+        if keywords:
+            html += format_html('{}
', ', '.join(keywords)) + + return html + ai_analysis_display.short_description = 'AI Analysis' + + def is_analyzed(self, obj): + """ + Display whether comment has been analyzed. + """ + return bool(obj.ai_analysis) + is_analyzed.boolean = True + is_analyzed.short_description = 'Analyzed' + + def trigger_analysis(self, request, queryset): + """ + Admin action to trigger AI analysis for selected comments. + """ + service = AnalysisService() + analyzed = 0 + failed = 0 + + for comment in queryset: + if not comment.ai_analysis: # Only analyze unanalyzed comments + result = service.reanalyze_comment(comment.id) + if result.get('success'): + analyzed += 1 + else: + failed += 1 + + self.message_user( + request, + f'Analysis complete: {analyzed} analyzed, {failed} failed', + level='SUCCESS' if failed == 0 else 'WARNING' + ) + trigger_analysis.short_description = 'Analyze selected comments' diff --git a/apps/social/apps.py b/apps/social/apps.py index 5649ec6..fedb2ff 100644 --- a/apps/social/apps.py +++ b/apps/social/apps.py @@ -1,10 +1,7 @@ -""" -social app configuration -""" from django.apps import AppConfig class SocialConfig(AppConfig): - default_auto_field = 'django.db.models.BigAutoField' name = 'apps.social' - verbose_name = 'Social' + default_auto_field = 'django.db.models.BigAutoField' + verbose_name = 'Social Media' diff --git a/apps/social/migrations/0001_initial.py b/apps/social/migrations/0001_initial.py index 40076f3..e7a7f4e 100644 --- a/apps/social/migrations/0001_initial.py +++ b/apps/social/migrations/0001_initial.py @@ -1,8 +1,5 @@ -# Generated by Django 5.0.14 on 2026-01-08 06:56 +# Generated by Django 6.0 on 2026-01-07 13:55 -import django.db.models.deletion -import uuid -from django.conf import settings from django.db import migrations, models @@ -11,47 +8,31 @@ class Migration(migrations.Migration): initial = True dependencies = [ - ('organizations', '0001_initial'), - ('px_action_center', '0001_initial'), - migrations.swappable_dependency(settings.AUTH_USER_MODEL), ] operations = [ migrations.CreateModel( - name='SocialMention', + name='SocialMediaComment', fields=[ - ('created_at', models.DateTimeField(auto_now_add=True, db_index=True)), - ('updated_at', models.DateTimeField(auto_now=True)), - ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), - ('platform', models.CharField(choices=[('twitter', 'Twitter/X'), ('facebook', 'Facebook'), ('instagram', 'Instagram'), ('linkedin', 'LinkedIn'), ('youtube', 'YouTube'), ('tiktok', 'TikTok'), ('other', 'Other')], db_index=True, max_length=50)), - ('post_url', models.URLField(max_length=1000)), - ('post_id', models.CharField(db_index=True, help_text='Unique post ID from platform', max_length=200, unique=True)), - ('author_username', models.CharField(max_length=200)), - ('author_name', models.CharField(blank=True, max_length=200)), - ('author_followers', models.IntegerField(blank=True, null=True)), - ('content', models.TextField()), - ('content_ar', models.TextField(blank=True, help_text='Arabic translation if applicable')), - ('sentiment', models.CharField(blank=True, choices=[('positive', 'Positive'), ('neutral', 'Neutral'), ('negative', 'Negative')], db_index=True, max_length=20, null=True)), - ('sentiment_score', models.DecimalField(blank=True, decimal_places=2, help_text='Sentiment score (-1 to 1, or 0-100 depending on AI service)', max_digits=5, null=True)), - ('sentiment_analyzed_at', models.DateTimeField(blank=True, null=True)), - ('likes_count', models.IntegerField(default=0)), - 
('shares_count', models.IntegerField(default=0)), - ('comments_count', models.IntegerField(default=0)), - ('posted_at', models.DateTimeField(db_index=True)), - ('collected_at', models.DateTimeField(auto_now_add=True)), - ('responded', models.BooleanField(default=False)), - ('response_text', models.TextField(blank=True)), - ('responded_at', models.DateTimeField(blank=True, null=True)), - ('action_created', models.BooleanField(default=False)), - ('metadata', models.JSONField(blank=True, default=dict)), - ('department', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='social_mentions', to='organizations.department')), - ('hospital', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='social_mentions', to='organizations.hospital')), - ('px_action', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='social_mentions', to='px_action_center.pxaction')), - ('responded_by', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='social_responses', to=settings.AUTH_USER_MODEL)), + ('id', models.BigAutoField(primary_key=True, serialize=False)), + ('platform', models.CharField(choices=[('facebook', 'Facebook'), ('instagram', 'Instagram'), ('youtube', 'YouTube'), ('twitter', 'Twitter/X'), ('linkedin', 'LinkedIn'), ('tiktok', 'TikTok'), ('google', 'Google Reviews')], db_index=True, help_text='Social media platform', max_length=50)), + ('comment_id', models.CharField(db_index=True, help_text='Unique comment ID from the platform', max_length=255)), + ('comments', models.TextField(help_text='Comment text content')), + ('author', models.CharField(blank=True, help_text='Comment author', max_length=255, null=True)), + ('raw_data', models.JSONField(default=dict, help_text='Complete raw data from platform API')), + ('post_id', models.CharField(blank=True, help_text='ID of the post/media', max_length=255, null=True)), + ('media_url', models.URLField(blank=True, help_text='URL to associated media', max_length=500, null=True)), + ('like_count', models.IntegerField(default=0, help_text='Number of likes')), + ('reply_count', models.IntegerField(default=0, help_text='Number of replies')), + ('rating', models.IntegerField(blank=True, db_index=True, help_text='Star rating (1-5) for review platforms like Google Reviews', null=True)), + ('published_at', models.DateTimeField(blank=True, db_index=True, help_text='When the comment was published', null=True)), + ('scraped_at', models.DateTimeField(auto_now_add=True, db_index=True, help_text='When the comment was scraped')), + ('ai_analysis', models.JSONField(blank=True, db_index=True, default=dict, help_text='Complete AI analysis in bilingual format (en/ar) with sentiment, summaries, keywords, topics, entities, and emotions')), ], options={ - 'ordering': ['-posted_at'], - 'indexes': [models.Index(fields=['platform', '-posted_at'], name='social_soci_platfor_b8e20e_idx'), models.Index(fields=['sentiment', '-posted_at'], name='social_soci_sentime_a4e18d_idx'), models.Index(fields=['hospital', 'sentiment', '-posted_at'], name='social_soci_hospita_8b4bde_idx')], + 'ordering': ['-published_at'], + 'indexes': [models.Index(fields=['platform'], name='social_soci_platfor_307afd_idx'), models.Index(fields=['published_at'], name='social_soci_publish_5f2b85_idx'), models.Index(fields=['platform', '-published_at'], name='social_soci_platfor_4f0230_idx'), models.Index(fields=['ai_analysis'], 
name='idx_ai_analysis')], + 'unique_together': {('platform', 'comment_id')}, }, ), ] diff --git a/apps/social/models.py b/apps/social/models.py index 88801c5..e7ef3c9 100644 --- a/apps/social/models.py +++ b/apps/social/models.py @@ -1,138 +1,107 @@ -""" -Social models - Social media monitoring and sentiment analysis - -This module implements social media monitoring that: -- Tracks mentions across platforms -- Analyzes sentiment -- Creates PX actions for negative mentions -- Monitors brand reputation -""" from django.db import models - -from apps.core.models import TimeStampedModel, UUIDModel +from django.utils import timezone class SocialPlatform(models.TextChoices): """Social media platform choices""" - TWITTER = 'twitter', 'Twitter/X' FACEBOOK = 'facebook', 'Facebook' INSTAGRAM = 'instagram', 'Instagram' - LINKEDIN = 'linkedin', 'LinkedIn' YOUTUBE = 'youtube', 'YouTube' + TWITTER = 'twitter', 'Twitter/X' + LINKEDIN = 'linkedin', 'LinkedIn' TIKTOK = 'tiktok', 'TikTok' - OTHER = 'other', 'Other' + GOOGLE = 'google', 'Google Reviews' -class SentimentType(models.TextChoices): - """Sentiment analysis result choices""" - POSITIVE = 'positive', 'Positive' - NEUTRAL = 'neutral', 'Neutral' - NEGATIVE = 'negative', 'Negative' - - -class SocialMention(UUIDModel, TimeStampedModel): +class SocialMediaComment(models.Model): + """ + Model to store social media comments from various platforms with AI analysis. + Stores scraped comments and AI-powered sentiment, keywords, topics, and entity analysis. """ - Social media mention - tracks mentions of hospital/brand. - Negative sentiment triggers PX action creation. - """ - # Platform and source + # --- Core --- + id = models.BigAutoField(primary_key=True) platform = models.CharField( - max_length=50, + max_length=50, choices=SocialPlatform.choices, - db_index=True - ) - post_url = models.URLField(max_length=1000) - post_id = models.CharField( - max_length=200, - unique=True, db_index=True, - help_text="Unique post ID from platform" + help_text="Social media platform" + ) + comment_id = models.CharField( + max_length=255, + db_index=True, + help_text="Unique comment ID from the platform" ) - # Author information - author_username = models.CharField(max_length=200) - author_name = models.CharField(max_length=200, blank=True) - author_followers = models.IntegerField(null=True, blank=True) + # --- Content --- + comments = models.TextField(help_text="Comment text content") + author = models.CharField(max_length=255, null=True, blank=True, help_text="Comment author") - # Content - content = models.TextField() - content_ar = models.TextField(blank=True, help_text="Arabic translation if applicable") + # --- Raw Data --- + raw_data = models.JSONField( + default=dict, + help_text="Complete raw data from platform API" + ) - # Organization - hospital = models.ForeignKey( - 'organizations.Hospital', - on_delete=models.CASCADE, + # --- Metadata --- + post_id = models.CharField( + max_length=255, + null=True, + blank=True, + help_text="ID of the post/media" + ) + media_url = models.URLField( + max_length=500, + null=True, + blank=True, + help_text="URL to associated media" + ) + + # --- Engagement --- + like_count = models.IntegerField(default=0, help_text="Number of likes") + reply_count = models.IntegerField(default=0, help_text="Number of replies") + rating = models.IntegerField( null=True, blank=True, - related_name='social_mentions' + db_index=True, + help_text="Star rating (1-5) for review platforms like Google Reviews" ) - department = models.ForeignKey( - 
'organizations.Department', - on_delete=models.SET_NULL, - null=True, + + # --- Timestamps --- + published_at = models.DateTimeField( + null=True, blank=True, - related_name='social_mentions' + db_index=True, + help_text="When the comment was published" + ) + scraped_at = models.DateTimeField( + auto_now_add=True, + db_index=True, + help_text="When the comment was scraped" ) - # Sentiment analysis - sentiment = models.CharField( - max_length=20, - choices=SentimentType.choices, - null=True, + # --- AI Bilingual Analysis --- + ai_analysis = models.JSONField( + default=dict, blank=True, - db_index=True + db_index=True, + help_text="Complete AI analysis in bilingual format (en/ar) with sentiment, summaries, keywords, topics, entities, and emotions" ) - sentiment_score = models.DecimalField( - max_digits=5, - decimal_places=2, - null=True, - blank=True, - help_text="Sentiment score (-1 to 1, or 0-100 depending on AI service)" - ) - sentiment_analyzed_at = models.DateTimeField(null=True, blank=True) - - # Engagement metrics - likes_count = models.IntegerField(default=0) - shares_count = models.IntegerField(default=0) - comments_count = models.IntegerField(default=0) - - # Timestamps - posted_at = models.DateTimeField(db_index=True) - collected_at = models.DateTimeField(auto_now_add=True) - - # Response tracking - responded = models.BooleanField(default=False) - response_text = models.TextField(blank=True) - responded_at = models.DateTimeField(null=True, blank=True) - responded_by = models.ForeignKey( - 'accounts.User', - on_delete=models.SET_NULL, - null=True, - blank=True, - related_name='social_responses' - ) - - # Action tracking - action_created = models.BooleanField(default=False) - px_action = models.ForeignKey( - 'px_action_center.PXAction', - on_delete=models.SET_NULL, - null=True, - blank=True, - related_name='social_mentions' - ) - - # Metadata - metadata = models.JSONField(default=dict, blank=True) class Meta: - ordering = ['-posted_at'] + ordering = ['-published_at'] + unique_together = ['platform', 'comment_id'] indexes = [ - models.Index(fields=['platform', '-posted_at']), - models.Index(fields=['sentiment', '-posted_at']), - models.Index(fields=['hospital', 'sentiment', '-posted_at']), + models.Index(fields=['platform']), + models.Index(fields=['published_at']), + models.Index(fields=['platform', '-published_at']), + models.Index(fields=['ai_analysis'], name='idx_ai_analysis'), ] def __str__(self): - return f"{self.platform} - {self.author_username} - {self.posted_at.strftime('%Y-%m-%d')}" + return f"{self.platform} - {self.author or 'Anonymous'}" + + @property + def is_analyzed(self): + """Check if comment has been AI analyzed""" + return bool(self.ai_analysis) diff --git a/apps/social/scrapers/__init__.py b/apps/social/scrapers/__init__.py new file mode 100644 index 0000000..cb32ab6 --- /dev/null +++ b/apps/social/scrapers/__init__.py @@ -0,0 +1,13 @@ +""" +Social media scrapers for extracting comments from various platforms. 
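All platform scrapers share the BaseScraper interface, so a caller can pick an implementation by SocialPlatform value. A minimal dispatch sketch under that assumption — SCRAPER_REGISTRY and build_scraper are illustrative helpers, not the actual CommentService wiring:

    # Illustrative helper: map SocialPlatform values to scraper classes.
    from apps.social.scrapers import (
        YouTubeScraper, FacebookScraper, InstagramScraper,
        TwitterScraper, LinkedInScraper, GoogleReviewsScraper,
    )

    SCRAPER_REGISTRY = {
        'youtube': YouTubeScraper,
        'facebook': FacebookScraper,
        'instagram': InstagramScraper,
        'twitter': TwitterScraper,
        'linkedin': LinkedInScraper,
        'google': GoogleReviewsScraper,
    }

    def build_scraper(platform: str, config: dict):
        # Hypothetical factory; raises for platforms without a scraper (e.g. tiktok).
        try:
            return SCRAPER_REGISTRY[platform](config)
        except KeyError:
            raise ValueError(f"Unsupported platform: {platform}")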
+""" + +from .base import BaseScraper +from .youtube import YouTubeScraper +from .facebook import FacebookScraper +from .instagram import InstagramScraper +from .twitter import TwitterScraper +from .linkedin import LinkedInScraper +from .google_reviews import GoogleReviewsScraper + +__all__ = ['BaseScraper', 'YouTubeScraper', 'FacebookScraper', 'InstagramScraper', 'TwitterScraper', 'LinkedInScraper', 'GoogleReviewsScraper'] diff --git a/apps/social/scrapers/base.py b/apps/social/scrapers/base.py new file mode 100644 index 0000000..9f3ed01 --- /dev/null +++ b/apps/social/scrapers/base.py @@ -0,0 +1,86 @@ +""" +Base scraper class for social media platforms. +""" +import logging +from abc import ABC, abstractmethod +from typing import List, Dict, Any +from datetime import datetime + + +class BaseScraper(ABC): + """ + Abstract base class for social media scrapers. + All platform-specific scrapers should inherit from this class. + """ + + def __init__(self, config: Dict[str, Any]): + """ + Initialize the scraper with configuration. + + Args: + config: Dictionary containing platform-specific configuration + """ + self.config = config + self.logger = logging.getLogger(self.__class__.__name__) + + @abstractmethod + def scrape_comments(self, **kwargs) -> List[Dict[str, Any]]: + """ + Scrape comments from the platform. + + Returns: + List of dictionaries containing comment data with standardized fields: + - comment_id: Unique comment ID from the platform + - comments: Comment text + - author: Author name/username + - published_at: Publication timestamp (ISO format) + - like_count: Number of likes + - reply_count: Number of replies + - post_id: ID of the post/media + - media_url: URL to associated media (if applicable) + - raw_data: Complete raw data from platform API + """ + pass + + def _standardize_comment(self, comment_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Standardize comment data format. + Subclasses can override this method to handle platform-specific formatting. + + Args: + comment_data: Raw comment data from platform API + + Returns: + Standardized comment dictionary + """ + return comment_data + + def _parse_timestamp(self, timestamp_str: str) -> str: + """ + Parse platform timestamp to ISO format. + + Args: + timestamp_str: Platform-specific timestamp string + + Returns: + ISO formatted timestamp string + """ + try: + # Try common timestamp formats + for fmt in [ + '%Y-%m-%dT%H:%M:%S%z', + '%Y-%m-%dT%H:%M:%SZ', + '%Y-%m-%d %H:%M:%S', + '%Y-%m-%d', + ]: + try: + dt = datetime.strptime(timestamp_str, fmt) + return dt.isoformat() + except ValueError: + continue + + # If no format matches, return as-is + return timestamp_str + except Exception as e: + self.logger.warning(f"Failed to parse timestamp {timestamp_str}: {e}") + return timestamp_str diff --git a/apps/social/scrapers/facebook.py b/apps/social/scrapers/facebook.py new file mode 100644 index 0000000..3534aa7 --- /dev/null +++ b/apps/social/scrapers/facebook.py @@ -0,0 +1,187 @@ +""" +Facebook comment scraper using Facebook Graph API. +""" +import logging +import requests +from typing import List, Dict, Any + +from .base import BaseScraper + + +class FacebookScraper(BaseScraper): + """ + Scraper for Facebook comments using Facebook Graph API. + Extracts comments from posts. + """ + + BASE_URL = "https://graph.facebook.com/v19.0" + + def __init__(self, config: Dict[str, Any]): + """ + Initialize Facebook scraper. 
+ + Args: + config: Dictionary with 'access_token' and optionally 'page_id' + """ + super().__init__(config) + self.access_token = config.get('access_token') + if not self.access_token: + raise ValueError( + "Facebook access token is required. " + "Set FACEBOOK_ACCESS_TOKEN in your .env file." + ) + + self.page_id = config.get('page_id') + if not self.page_id: + self.logger.warning( + "Facebook page_id not provided. " + "Set FACEBOOK_PAGE_ID in your .env file to specify which page to scrape." + ) + + self.logger = logging.getLogger(self.__class__.__name__) + + def scrape_comments(self, page_id: str = None, **kwargs) -> List[Dict[str, Any]]: + """ + Scrape comments from all posts on a Facebook page. + + Args: + page_id: Facebook page ID to scrape comments from + + Returns: + List of standardized comment dictionaries + """ + page_id = page_id or self.page_id + if not page_id: + raise ValueError("Facebook page ID is required") + + all_comments = [] + + self.logger.info(f"Starting Facebook comment extraction for page: {page_id}") + + # Get all posts from the page + posts = self._fetch_all_posts(page_id) + self.logger.info(f"Found {len(posts)} posts to process") + + # Get comments for each post + for post in posts: + post_id = post['id'] + post_comments = self._fetch_post_comments(post_id, post) + all_comments.extend(post_comments) + self.logger.info(f"Fetched {len(post_comments)} comments for post {post_id}") + + self.logger.info(f"Completed Facebook scraping. Total comments: {len(all_comments)}") + return all_comments + + def _fetch_all_posts(self, page_id: str) -> List[Dict[str, Any]]: + """ + Fetch all posts from a Facebook page. + + Args: + page_id: Facebook page ID + + Returns: + List of post dictionaries + """ + url = f"{self.BASE_URL}/{page_id}/feed" + params = { + 'access_token': self.access_token, + 'fields': 'id,message,created_time,permalink_url' + } + + all_posts = [] + while url: + try: + response = requests.get(url, params=params) + data = response.json() + + if 'error' in data: + self.logger.error(f"Facebook API error: {data['error']['message']}") + break + + all_posts.extend(data.get('data', [])) + + # Check for next page + url = data.get('paging', {}).get('next') + params = {} # Next URL already contains params + + except Exception as e: + self.logger.error(f"Error fetching posts: {e}") + break + + return all_posts + + def _fetch_post_comments(self, post_id: str, post_data: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + Fetch all comments for a specific Facebook post. 
+ + Args: + post_id: Facebook post ID + post_data: Post data dictionary + + Returns: + List of standardized comment dictionaries + """ + url = f"{self.BASE_URL}/{post_id}/comments" + params = { + 'access_token': self.access_token, + 'fields': 'id,message,from,created_time,like_count' + } + + all_comments = [] + while url: + try: + response = requests.get(url, params=params) + data = response.json() + + if 'error' in data: + self.logger.error(f"Facebook API error: {data['error']['message']}") + break + + # Process comments + for comment_data in data.get('data', []): + comment = self._extract_comment(comment_data, post_id, post_data) + if comment: + all_comments.append(comment) + + # Check for next page + url = data.get('paging', {}).get('next') + params = {} # Next URL already contains params + + except Exception as e: + self.logger.error(f"Error fetching comments for post {post_id}: {e}") + break + + return all_comments + + def _extract_comment(self, comment_data: Dict[str, Any], post_id: str, post_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Extract and standardize a Facebook comment. + + Args: + comment_data: Facebook API comment data + post_id: Post ID + post_data: Post data dictionary + + Returns: + Standardized comment dictionary + """ + try: + from_data = comment_data.get('from', {}) + + comment = { + 'comment_id': comment_data['id'], + 'comments': comment_data.get('message', ''), + 'author': from_data.get('name', ''), + 'published_at': self._parse_timestamp(comment_data.get('created_time')), + 'like_count': comment_data.get('like_count', 0), + 'reply_count': 0, # Facebook API doesn't provide reply count easily + 'post_id': post_id, + 'media_url': post_data.get('permalink_url'), + 'raw_data': comment_data + } + + return self._standardize_comment(comment) + + except Exception as e: + self.logger.error(f"Error extracting Facebook comment: {e}") + return None diff --git a/apps/social/scrapers/google_reviews.py b/apps/social/scrapers/google_reviews.py new file mode 100644 index 0000000..8c47bd3 --- /dev/null +++ b/apps/social/scrapers/google_reviews.py @@ -0,0 +1,345 @@ +""" +Google Reviews scraper using Google My Business API. +""" +import os +import json +import logging +from typing import List, Dict, Any, Optional +from pathlib import Path + +try: + from google.oauth2.credentials import Credentials + from google_auth_oauthlib.flow import InstalledAppFlow + from google.auth.transport.requests import Request + from googleapiclient.discovery import build +except ImportError: + raise ImportError( + "Google API client libraries not installed. " + "Install with: pip install google-api-python-client google-auth-oauthlib" + ) + +from .base import BaseScraper + + +class GoogleReviewsScraper(BaseScraper): + """ + Scraper for Google Reviews using Google My Business API. + Extracts reviews from one or multiple locations. + """ + + # OAuth scope for managing Business Profile data + SCOPES = ['https://www.googleapis.com/auth/business.manage'] + + def __init__(self, config: Dict[str, Any]): + """ + Initialize Google Reviews scraper. 
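A construction sketch for the reviews scraper; note the constructor itself authenticates (reusing token.json or running the OAuth flow), so the client secrets file must be available. Paths and the location filter below are placeholders:

    from apps.social.scrapers import GoogleReviewsScraper

    scraper = GoogleReviewsScraper({
        'credentials_file': 'client_secret.json',   # OAuth client secrets
        'token_file': 'token.json',                 # cached user credentials
        'locations': ['locations/1234567890'],      # optional; matched as substrings of location resource names
    })
    reviews = scraper.scrape_comments(max_reviews_per_location=100)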
+ + Args: + config: Dictionary with: + - 'credentials_file': Path to client_secret.json (or None) + - 'token_file': Path to token.json (default: 'token.json') + - 'locations': List of location names to scrape (optional) + - 'account_name': Google account name (optional, will be fetched if not provided) + """ + super().__init__(config) + + self.credentials_file = config.get('credentials_file', 'client_secret.json') + self.token_file = config.get('token_file', 'token.json') + self.locations = config.get('locations', None) # Specific locations to scrape + self.account_name = config.get('account_name', None) + + self.logger = logging.getLogger(self.__class__.__name__) + + # Authenticate and build service + self.service = self._get_authenticated_service() + + def _get_authenticated_service(self): + """ + Get authenticated Google My Business API service. + + Returns: + Authenticated service object + """ + creds = None + + # Load existing credentials from token file + if os.path.exists(self.token_file): + creds = Credentials.from_authorized_user_file(self.token_file, self.SCOPES) + + # If there are no (valid) credentials available, let the user log in + if not creds or not creds.valid: + if creds and creds.expired and creds.refresh_token: + self.logger.info("Refreshing expired credentials...") + creds.refresh(Request()) + else: + # Check if credentials file exists + if not os.path.exists(self.credentials_file): + raise FileNotFoundError( + f"Google Reviews requires '{self.credentials_file}' credentials file. " + "This scraper will be disabled. See GOOGLE_REVIEWS_INTEGRATION_GUIDE.md for setup instructions." + ) + + self.logger.info("Starting OAuth flow...") + flow = InstalledAppFlow.from_client_secrets_file( + self.credentials_file, + self.SCOPES + ) + creds = flow.run_local_server(port=0) + + # Save the credentials for the next run + with open(self.token_file, 'w') as token: + token.write(creds.to_json()) + + self.logger.info(f"Credentials saved to {self.token_file}") + + # Build the service using the My Business v4 discovery document + service = build('mybusiness', 'v4', credentials=creds) + self.logger.info("Successfully authenticated with Google My Business API") + + return service + + def _get_account_name(self) -> str: + """ + Get the account ID from Google My Business. + + Returns: + Account name (e.g., 'accounts/123456789') + """ + if self.account_name: + return self.account_name + + self.logger.info("Fetching account list...") + accounts_resp = self.service.accounts().list().execute() + + if not accounts_resp.get('accounts'): + raise ValueError("No Google My Business accounts found. Please ensure you have admin access.") + + account_name = accounts_resp['accounts'][0]['name'] + self.logger.info(f"Using account: {account_name}") + self.account_name = account_name + + return account_name + + def _get_locations(self, account_name: str) -> List[Dict[str, Any]]: + """ + Get all locations for the account. 
+ + Args: + account_name: Google account name + + Returns: + List of location dictionaries + """ + self.logger.info("Fetching location list...") + locations_resp = self.service.accounts().locations().list(parent=account_name).execute() + locations = locations_resp.get('locations', []) + + if not locations: + raise ValueError(f"No locations found under account {account_name}") + + self.logger.info(f"Found {len(locations)} locations") + + # Filter locations if specific locations are requested + if self.locations: + filtered_locations = [] + for loc in locations: + # Check if location name matches any of the requested locations + if any(req_loc in loc['name'] for req_loc in self.locations): + filtered_locations.append(loc) + self.logger.info(f"Filtered to {len(filtered_locations)} locations") + return filtered_locations + + return locations + + def scrape_comments( + self, + location_names: Optional[List[str]] = None, + max_reviews_per_location: int = 100, + **kwargs + ) -> List[Dict[str, Any]]: + """ + Scrape Google reviews from specified locations. + + Args: + location_names: Optional list of location names to scrape (scrapes all if None) + max_reviews_per_location: Maximum reviews to fetch per location + + Returns: + List of standardized review dictionaries + """ + all_reviews = [] + + try: + # Get account and locations + account_name = self._get_account_name() + locations = self._get_locations(account_name) + + # Apply location filter if provided + if location_names: + filtered_locations = [] + for loc in locations: + if any(req_loc in loc['name'] for req_loc in location_names): + filtered_locations.append(loc) + locations = filtered_locations + if not locations: + self.logger.warning(f"No matching locations found for: {location_names}") + return [] + + # Get location resource names for batch fetching + location_resource_names = [loc['name'] for loc in locations] + + self.logger.info(f"Extracting reviews for {len(location_resource_names)} locations...") + + # Batch fetch reviews for all locations + next_page_token = None + page_num = 0 + + while True: + page_num += 1 + self.logger.info(f"Fetching page {page_num} of reviews...") + + batch_body = { + "locationNames": location_resource_names, + "pageSize": max_reviews_per_location, + "pageToken": next_page_token, + "ignoreRatingOnlyReviews": False + } + + # Official batchGetReviews call + results = self.service.accounts().locations().batchGetReviews( + name=account_name, + body=batch_body + ).execute() + + location_reviews = results.get('locationReviews', []) + + if not location_reviews: + self.logger.info(f"No more reviews found on page {page_num}") + break + + # Process reviews + for loc_review in location_reviews: + review_data = loc_review.get('review', {}) + location_name = loc_review.get('name') + + standardized = self._extract_review(location_name, review_data) + if standardized: + all_reviews.append(standardized) + + self.logger.info(f" - Page {page_num}: {len(location_reviews)} reviews (total: {len(all_reviews)})") + + next_page_token = results.get('nextPageToken') + if not next_page_token: + self.logger.info("All reviews fetched") + break + + self.logger.info(f"Completed Google Reviews scraping. 
Total reviews: {len(all_reviews)}") + + # Log location distribution + location_stats = {} + for review in all_reviews: + location_id = review.get('raw_data', {}).get('location_name', 'unknown') + location_stats[location_id] = location_stats.get(location_id, 0) + 1 + + self.logger.info("Reviews by location:") + for location, count in location_stats.items(): + self.logger.info(f" - {location}: {count} reviews") + + return all_reviews + + except Exception as e: + self.logger.error(f"Error scraping Google Reviews: {e}") + raise + + def _extract_review( + self, + location_name: str, + review_data: Dict[str, Any] + ) -> Optional[Dict[str, Any]]: + """ + Extract and standardize a review from Google My Business API response. + + Args: + location_name: Location resource name + review_data: Review object from Google API + + Returns: + Standardized review dictionary + """ + try: + # Extract review data + review_id = review_data.get('name', '') + reviewer_info = review_data.get('reviewer', {}) + comment = review_data.get('comment', '') + star_rating = review_data.get('starRating') + create_time = review_data.get('createTime') + update_time = review_data.get('updateTime') + + # Extract reviewer information + reviewer_name = reviewer_info.get('displayName', 'Anonymous') + reviewer_id = reviewer_info.get('name', '') + + # Extract review reply + reply_data = review_data.get('reviewReply', {}) + reply_comment = reply_data.get('comment', '') + reply_time = reply_data.get('updateTime', '') + + # Extract location details if available + # We'll get the full location info from the location name + try: + location_info = self.service.accounts().locations().get( + name=location_name + ).execute() + location_address = location_info.get('address', {}) + location_name_display = location_info.get('locationName', '') + location_city = location_address.get('locality', '') + location_country = location_address.get('countryCode', '') + except: + location_info = {} + location_name_display = '' + location_city = '' + location_country = '' + + # Build Google Maps URL for the review + # Extract location ID from resource name (e.g., 'accounts/123/locations/456') + location_id = location_name.split('/')[-1] + google_maps_url = f"https://search.google.com/local/writereview?placeid={location_id}" + + review_dict = { + 'comment_id': review_id, + 'comments': comment, + 'author': reviewer_name, + 'published_at': self._parse_timestamp(create_time) if create_time else None, + 'like_count': 0, # Google reviews don't have like counts + 'reply_count': 1 if reply_comment else 0, + 'post_id': location_name, # Store location name as post_id + 'media_url': google_maps_url, + 'raw_data': { + 'location_name': location_name, + 'location_id': location_id, + 'location_display_name': location_name_display, + 'location_city': location_city, + 'location_country': location_country, + 'location_info': location_info, + 'review_id': review_id, + 'reviewer_id': reviewer_id, + 'reviewer_name': reviewer_name, + 'star_rating': star_rating, + 'comment': comment, + 'create_time': create_time, + 'update_time': update_time, + 'reply_comment': reply_comment, + 'reply_time': reply_time, + 'full_review': review_data + } + } + + # Add rating field for Google Reviews (1-5 stars) + if star_rating: + review_dict['rating'] = int(star_rating) + + return self._standardize_comment(review_dict) + + except Exception as e: + self.logger.error(f"Error extracting Google review: {e}") + return None diff --git a/apps/social/scrapers/instagram.py 
b/apps/social/scrapers/instagram.py new file mode 100644 index 0000000..3acee11 --- /dev/null +++ b/apps/social/scrapers/instagram.py @@ -0,0 +1,187 @@ +""" +Instagram comment scraper using Instagram Graph API. +""" +import logging +import requests +from typing import List, Dict, Any + +from .base import BaseScraper + + +class InstagramScraper(BaseScraper): + """ + Scraper for Instagram comments using Instagram Graph API. + Extracts comments from media posts. + """ + + BASE_URL = "https://graph.facebook.com/v19.0" + + def __init__(self, config: Dict[str, Any]): + """ + Initialize Instagram scraper. + + Args: + config: Dictionary with 'access_token' and optionally 'account_id' + """ + super().__init__(config) + self.access_token = config.get('access_token') + if not self.access_token: + raise ValueError( + "Instagram access token is required. " + "Set INSTAGRAM_ACCESS_TOKEN in your .env file." + ) + + self.account_id = config.get('account_id') + if not self.account_id: + self.logger.warning( + "Instagram account_id not provided. " + "Set INSTAGRAM_ACCOUNT_ID in your .env file to specify which account to scrape." + ) + + self.logger = logging.getLogger(self.__class__.__name__) + + def scrape_comments(self, account_id: str = None, **kwargs) -> List[Dict[str, Any]]: + """ + Scrape comments from all media on an Instagram account. + + Args: + account_id: Instagram account ID to scrape comments from + + Returns: + List of standardized comment dictionaries + """ + account_id = account_id or self.account_id + if not account_id: + raise ValueError("Instagram account ID is required") + + all_comments = [] + + self.logger.info(f"Starting Instagram comment extraction for account: {account_id}") + + # Get all media from the account + media_list = self._fetch_all_media(account_id) + self.logger.info(f"Found {len(media_list)} media items to process") + + # Get comments for each media + for media in media_list: + media_id = media['id'] + media_comments = self._fetch_media_comments(media_id, media) + all_comments.extend(media_comments) + self.logger.info(f"Fetched {len(media_comments)} comments for media {media_id}") + + self.logger.info(f"Completed Instagram scraping. Total comments: {len(all_comments)}") + return all_comments + + def _fetch_all_media(self, account_id: str) -> List[Dict[str, Any]]: + """ + Fetch all media from an Instagram account. + + Args: + account_id: Instagram account ID + + Returns: + List of media dictionaries + """ + url = f"{self.BASE_URL}/{account_id}/media" + params = { + 'access_token': self.access_token, + 'fields': 'id,caption,timestamp,permalink_url,media_type' + } + + all_media = [] + while url: + try: + response = requests.get(url, params=params) + data = response.json() + + if 'error' in data: + self.logger.error(f"Instagram API error: {data['error']['message']}") + break + + all_media.extend(data.get('data', [])) + + # Check for next page + url = data.get('paging', {}).get('next') + params = {} # Next URL already contains params + + except Exception as e: + self.logger.error(f"Error fetching media: {e}") + break + + return all_media + + def _fetch_media_comments(self, media_id: str, media_data: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + Fetch all comments for a specific Instagram media. 
+ + Args: + media_id: Instagram media ID + media_data: Media data dictionary + + Returns: + List of standardized comment dictionaries + """ + url = f"{self.BASE_URL}/{media_id}/comments" + params = { + 'access_token': self.access_token, + 'fields': 'id,text,username,timestamp,like_count' + } + + all_comments = [] + while url: + try: + response = requests.get(url, params=params) + data = response.json() + + if 'error' in data: + self.logger.error(f"Instagram API error: {data['error']['message']}") + break + + # Process comments + for comment_data in data.get('data', []): + comment = self._extract_comment(comment_data, media_id, media_data) + if comment: + all_comments.append(comment) + + # Check for next page + url = data.get('paging', {}).get('next') + params = {} # Next URL already contains params + + except Exception as e: + self.logger.error(f"Error fetching comments for media {media_id}: {e}") + break + + return all_comments + + def _extract_comment(self, comment_data: Dict[str, Any], media_id: str, media_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Extract and standardize an Instagram comment. + + Args: + comment_data: Instagram API comment data + media_id: Media ID + media_data: Media data dictionary + + Returns: + Standardized comment dictionary + """ + try: + caption = media_data.get('caption', '') + + comment = { + 'comment_id': comment_data['id'], + 'comments': comment_data.get('text', ''), + 'author': comment_data.get('username', ''), + 'published_at': self._parse_timestamp(comment_data.get('timestamp')), + 'like_count': comment_data.get('like_count', 0), + 'reply_count': 0, # Instagram API doesn't provide reply count easily + 'post_id': media_id, + 'media_url': media_data.get('permalink_url'), + 'raw_data': comment_data + } + + return self._standardize_comment(comment) + + except Exception as e: + self.logger.error(f"Error extracting Instagram comment: {e}") + return None diff --git a/apps/social/scrapers/linkedin.py b/apps/social/scrapers/linkedin.py new file mode 100644 index 0000000..c5cd4e8 --- /dev/null +++ b/apps/social/scrapers/linkedin.py @@ -0,0 +1,262 @@ +""" +LinkedIn comment scraper using LinkedIn Marketing API. +""" +import logging +from typing import List, Dict, Any +import requests + +from .base import BaseScraper + + +class LinkedInScraper(BaseScraper): + """ + Scraper for LinkedIn comments using LinkedIn Marketing API. + Extracts comments from organization posts. + """ + + def __init__(self, config: Dict[str, Any]): + """ + Initialize LinkedIn scraper. + + Args: + config: Dictionary with 'access_token' and 'organization_id' + """ + super().__init__(config) + self.access_token = config.get('access_token') + if not self.access_token: + raise ValueError( + "LinkedIn access token is required. " + "Set LINKEDIN_ACCESS_TOKEN in your .env file." + ) + + self.org_id = config.get('organization_id') + if not self.org_id: + raise ValueError( + "LinkedIn organization ID is required. " + "Set LINKEDIN_ORGANIZATION_ID in your .env file." 
+ ) + + self.api_version = config.get('api_version', '202401') + self.headers = { + 'Authorization': f'Bearer {self.access_token}', + 'LinkedIn-Version': self.api_version, + 'X-Restli-Protocol-Version': '2.0.0', + 'Content-Type': 'application/json' + } + self.base_url = "https://api.linkedin.com/rest" + self.logger = logging.getLogger(self.__class__.__name__) + + def scrape_comments( + self, + organization_id: str = None, + max_posts: int = 50, + max_comments_per_post: int = 100, + **kwargs + ) -> List[Dict[str, Any]]: + """ + Scrape comments from LinkedIn organization posts. + + Args: + organization_id: LinkedIn organization URN (e.g., 'urn:li:organization:1234567') + max_posts: Maximum number of posts to scrape + max_comments_per_post: Maximum comments to fetch per post + + Returns: + List of standardized comment dictionaries + """ + organization_id = organization_id or self.org_id + if not organization_id: + raise ValueError("Organization ID is required") + + all_comments = [] + + self.logger.info(f"Starting LinkedIn comment extraction for {organization_id}") + + try: + # Get all posts for the organization + posts = self._get_all_page_posts(organization_id) + self.logger.info(f"Found {len(posts)} posts") + + # Limit posts if needed + if max_posts and len(posts) > max_posts: + posts = posts[:max_posts] + self.logger.info(f"Limited to {max_posts} posts") + + # Extract comments from each post + for i, post_urn in enumerate(posts, 1): + self.logger.info(f"Processing post {i}/{len(posts)}: {post_urn}") + + try: + comments = self._get_comments_for_post( + post_urn, + max_comments=max_comments_per_post + ) + + for comment in comments: + standardized = self._extract_comment(post_urn, comment) + if standardized: + all_comments.append(standardized) + + self.logger.info(f" - Found {len(comments)} comments") + + except Exception as e: + self.logger.warning(f"Error processing post {post_urn}: {e}") + continue + + self.logger.info(f"Completed LinkedIn scraping. Total comments: {len(all_comments)}") + return all_comments + + except Exception as e: + self.logger.error(f"Error scraping LinkedIn: {e}") + raise + + def _get_all_page_posts(self, org_urn: str, count: int = 50) -> List[str]: + """ + Retrieves all post URNs for the organization. + + Args: + org_urn: Organization URN + count: Number of posts per request + + Returns: + List of post URNs + """ + posts = [] + start = 0 + + while True: + # Finder query for posts by author + url = f"{self.base_url}/posts?author={org_urn}&q=author&count={count}&start={start}" + + try: + response = requests.get(url, headers=self.headers) + response.raise_for_status() + data = response.json() + + if 'elements' not in data or not data['elements']: + break + + posts.extend([item['id'] for item in data['elements']]) + start += count + + self.logger.debug(f"Retrieved {len(data['elements'])} posts (total: {len(posts)})") + + except requests.exceptions.RequestException as e: + self.logger.error(f"Error fetching posts: {e}") + break + + return posts + + def _get_comments_for_post(self, post_urn: str, max_comments: int = 100) -> List[Dict[str, Any]]: + """ + Retrieves all comments for a specific post URN. 
+ + Args: + post_urn: Post URN + max_comments: Maximum comments to fetch + + Returns: + List of comment objects + """ + comments = [] + start = 0 + count = 100 + + while True: + # Social Actions API for comments + url = f"{self.base_url}/socialActions/{post_urn}/comments?count={count}&start={start}" + + try: + response = requests.get(url, headers=self.headers) + response.raise_for_status() + data = response.json() + + if 'elements' not in data or not data['elements']: + break + + for comment in data['elements']: + comments.append(comment) + + # Check if we've reached the limit + if len(comments) >= max_comments: + return comments[:max_comments] + + start += count + + # Check if we need to stop + if len(comments) >= max_comments: + return comments[:max_comments] + + except requests.exceptions.RequestException as e: + self.logger.warning(f"Error fetching comments for post {post_urn}: {e}") + break + + return comments[:max_comments] + + def _extract_comment(self, post_urn: str, comment: Dict[str, Any]) -> Dict[str, Any]: + """ + Extract and standardize a comment from LinkedIn API response. + + Args: + post_urn: Post URN + comment: Comment object from LinkedIn API + + Returns: + Standardized comment dictionary + """ + try: + # Extract comment data + comment_id = comment.get('id', '') + message = comment.get('message', {}) + comment_text = message.get('text', '') + actor = comment.get('actor', '') + + # Extract author information + author_id = '' + author_name = '' + if isinstance(actor, str): + author_id = actor + elif isinstance(actor, dict): + author_id = actor.get('id', '') + author_name = actor.get('firstName', '') + ' ' + actor.get('lastName', '') + + # Extract created time + created_time = comment.get('created', {}).get('time', '') + + # Extract social actions (likes) + social_actions = comment.get('socialActions', []) + like_count = 0 + for action in social_actions: + if action.get('actionType') == 'LIKE': + like_count = action.get('actorCount', 0) + break + + # Build LinkedIn URL + linkedin_url = post_urn.replace('urn:li:activity:', 'https://www.linkedin.com/feed/update/') + + comment_data = { + 'comment_id': comment_id, + 'comments': comment_text, + 'author': author_name or author_id, + 'published_at': self._parse_timestamp(created_time) if created_time else None, + 'like_count': like_count, + 'reply_count': 0, # LinkedIn API doesn't provide reply count easily + 'post_id': post_urn, + 'media_url': linkedin_url, + 'raw_data': { + 'post_urn': post_urn, + 'comment_id': comment_id, + 'comment_text': comment_text, + 'author_id': author_id, + 'author_name': author_name, + 'created_time': created_time, + 'like_count': like_count, + 'full_comment': comment + } + } + + return self._standardize_comment(comment_data) + + except Exception as e: + self.logger.error(f"Error extracting LinkedIn comment: {e}") + return None diff --git a/apps/social/scrapers/twitter.py b/apps/social/scrapers/twitter.py new file mode 100644 index 0000000..4445ca1 --- /dev/null +++ b/apps/social/scrapers/twitter.py @@ -0,0 +1,194 @@ +""" +Twitter/X comment scraper using Twitter API v2 via Tweepy. +""" +import logging +from typing import List, Dict, Any +import tweepy + +from .base import BaseScraper + + +class TwitterScraper(BaseScraper): + """ + Scraper for Twitter/X comments (replies) using Twitter API v2. + Extracts replies to tweets from a specified user. + """ + + def __init__(self, config: Dict[str, Any]): + """ + Initialize Twitter scraper. 
+ + Args: + config: Dictionary with 'bearer_token' and optionally 'username' + """ + super().__init__(config) + self.bearer_token = config.get('bearer_token') + if not self.bearer_token: + raise ValueError( + "Twitter bearer token is required. " + "Set TWITTER_BEARER_TOKEN in your .env file." + ) + + self.default_username = config.get('username', 'elonmusk') + if not config.get('username'): + self.logger.warning( + "Twitter username not provided. " + "Set TWITTER_USERNAME in your .env file to specify which account to scrape." + ) + + self.client = tweepy.Client( + bearer_token=self.bearer_token, + wait_on_rate_limit=True + ) + self.logger = logging.getLogger(self.__class__.__name__) + + def scrape_comments( + self, + username: str = None, + max_tweets: int = 50, + max_replies_per_tweet: int = 100, + **kwargs + ) -> List[Dict[str, Any]]: + """ + Scrape replies (comments) from a Twitter/X user's tweets. + + Args: + username: Twitter username to scrape (uses default from config if not provided) + max_tweets: Maximum number of tweets to fetch + max_replies_per_tweet: Maximum replies per tweet + + Returns: + List of standardized comment dictionaries + """ + username = username or self.default_username + if not username: + raise ValueError("Username is required") + + all_comments = [] + + self.logger.info(f"Starting Twitter comment extraction for @{username}") + + try: + # Get user ID + user = self.client.get_user(username=username) + if not user.data: + self.logger.error(f"User @{username} not found") + return all_comments + + user_id = user.data.id + self.logger.info(f"Found user ID: {user_id}") + + # Fetch tweets and their replies + tweet_count = 0 + for tweet in tweepy.Paginator( + self.client.get_users_tweets, + id=user_id, + max_results=100 + ).flatten(limit=max_tweets): + + tweet_count += 1 + self.logger.info(f"Processing tweet {tweet_count}/{max_tweets} (ID: {tweet.id})") + + # Search for replies to this tweet + replies = self._get_tweet_replies(tweet.id, max_replies_per_tweet) + + for reply in replies: + comment = self._extract_comment(tweet, reply) + if comment: + all_comments.append(comment) + + self.logger.info(f" - Found {len(replies)} replies for this tweet") + + self.logger.info(f"Completed Twitter scraping. Total comments: {len(all_comments)}") + return all_comments + + except tweepy.errors.NotFound: + self.logger.error(f"User @{username} not found or account is private") + return all_comments + except tweepy.errors.Forbidden: + self.logger.error(f"Access forbidden for @{username}. Check API permissions.") + return all_comments + except tweepy.errors.TooManyRequests: + self.logger.error("Twitter API rate limit exceeded") + return all_comments + except Exception as e: + self.logger.error(f"Error scraping Twitter: {e}") + raise + + def _get_tweet_replies(self, tweet_id: str, max_replies: int) -> List[Dict[str, Any]]: + """ + Get replies for a specific tweet. 
+ + Args: + tweet_id: Original tweet ID + max_replies: Maximum number of replies to fetch + + Returns: + List of reply tweet objects + """ + replies = [] + + # Search for replies using conversation_id + query = f"conversation_id:{tweet_id} is:reply" + + try: + for reply in tweepy.Paginator( + self.client.search_recent_tweets, + query=query, + tweet_fields=['author_id', 'created_at', 'text'], + max_results=100 + ).flatten(limit=max_replies): + replies.append(reply) + except Exception as e: + self.logger.warning(f"Error fetching replies for tweet {tweet_id}: {e}") + + return replies + + def _extract_comment(self, original_tweet: Dict[str, Any], reply_tweet: Dict[str, Any]) -> Dict[str, Any]: + """ + Extract and standardize a reply (comment) from Twitter API response. + + Args: + original_tweet: Original tweet object + reply_tweet: Reply tweet object + + Returns: + Standardized comment dictionary + """ + try: + # Extract reply data + reply_id = str(reply_tweet.id) + reply_text = reply_tweet.text + reply_author_id = str(reply_tweet.author_id) + reply_created_at = reply_tweet.created_at + + # Extract original tweet data + original_tweet_id = str(original_tweet.id) + + # Build Twitter URL + twitter_url = f"https://twitter.com/x/status/{original_tweet_id}" + + comment_data = { + 'comment_id': reply_id, + 'comments': reply_text, + 'author': reply_author_id, + 'published_at': self._parse_timestamp(reply_created_at.isoformat()), + 'like_count': 0, # Twitter API v2 doesn't provide like count for replies in basic query + 'reply_count': 0, # Would need additional API call + 'post_id': original_tweet_id, + 'media_url': twitter_url, + 'raw_data': { + 'original_tweet_id': original_tweet_id, + 'original_tweet_text': original_tweet.text, + 'reply_id': reply_id, + 'reply_author_id': reply_author_id, + 'reply_text': reply_text, + 'reply_at': reply_created_at.isoformat() + } + } + + return self._standardize_comment(comment_data) + + except Exception as e: + self.logger.error(f"Error extracting Twitter comment: {e}") + return None diff --git a/apps/social/scrapers/youtube.py b/apps/social/scrapers/youtube.py new file mode 100644 index 0000000..264f133 --- /dev/null +++ b/apps/social/scrapers/youtube.py @@ -0,0 +1,134 @@ +""" +YouTube comment scraper using YouTube Data API v3. +""" +import logging +from typing import List, Dict, Any +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError + +from .base import BaseScraper + + +class YouTubeScraper(BaseScraper): + """ + Scraper for YouTube comments using YouTube Data API v3. + Extracts top-level comments only (no replies). + """ + + def __init__(self, config: Dict[str, Any]): + """ + Initialize YouTube scraper. + + Args: + config: Dictionary with 'api_key' and optionally 'channel_id' + """ + super().__init__(config) + self.api_key = config.get('api_key') + if not self.api_key: + raise ValueError( + "YouTube API key is required. " + "Set YOUTUBE_API_KEY in your .env file." + ) + + self.channel_id = config.get('channel_id') + if not self.channel_id: + self.logger.warning( + "YouTube channel_id not provided. " + "Set YOUTUBE_CHANNEL_ID in your .env file to specify which channel to scrape." + ) + + self.youtube = build('youtube', 'v3', developerKey=self.api_key) + self.logger = logging.getLogger(self.__class__.__name__) + + def scrape_comments(self, channel_id: str = None, **kwargs) -> List[Dict[str, Any]]: + """ + Scrape top-level comments from a YouTube channel. 
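A usage sketch, assuming YOUTUBE_API_KEY and YOUTUBE_CHANNEL_ID are set (both placeholders); every returned item follows the standardized schema described on BaseScraper.scrape_comments:

    import os
    from apps.social.scrapers import YouTubeScraper

    scraper = YouTubeScraper({
        'api_key': os.environ['YOUTUBE_API_KEY'],
        'channel_id': os.environ['YOUTUBE_CHANNEL_ID'],
    })
    for c in scraper.scrape_comments():
        # Standardized fields shared by all scrapers: comment_id, comments, author,
        # published_at, like_count, reply_count, post_id, media_url, raw_data
        print(c['comment_id'], c['author'], c['comments'][:80])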
+ + Args: + channel_id: YouTube channel ID to scrape comments from + + Returns: + List of standardized comment dictionaries + """ + channel_id = channel_id or self.config.get('channel_id') + if not channel_id: + raise ValueError("Channel ID is required") + + all_comments = [] + next_page_token = None + + self.logger.info(f"Starting YouTube comment extraction for channel: {channel_id}") + + while True: + try: + # Get comment threads (top-level comments only) + request = self.youtube.commentThreads().list( + part="snippet", + allThreadsRelatedToChannelId=channel_id, + maxResults=100, + pageToken=next_page_token, + textFormat="plainText" + ) + response = request.execute() + + # Process each comment thread + for item in response.get('items', []): + comment = self._extract_top_level_comment(item) + if comment: + all_comments.append(comment) + + # Check for more pages + next_page_token = response.get('nextPageToken') + if not next_page_token: + break + + self.logger.info(f"Fetched {len(all_comments)} comments so far...") + + except HttpError as e: + if e.resp.status in [403, 429]: + self.logger.error("YouTube API quota exceeded or access forbidden") + break + else: + self.logger.error(f"YouTube API error: {e}") + break + except Exception as e: + self.logger.error(f"Unexpected error scraping YouTube: {e}") + break + + self.logger.info(f"Completed YouTube scraping. Total comments: {len(all_comments)}") + return all_comments + + def _extract_top_level_comment(self, item: Dict[str, Any]) -> Dict[str, Any]: + """ + Extract and standardize a top-level comment from YouTube API response. + + Args: + item: YouTube API comment thread item + + Returns: + Standardized comment dictionary + """ + try: + top_level_comment = item['snippet']['topLevelComment']['snippet'] + comment_id = item['snippet']['topLevelComment']['id'] + + # Get video ID (post_id) + video_id = item['snippet'].get('videoId') + + comment_data = { + 'comment_id': comment_id, + 'comments': top_level_comment.get('textDisplay', ''), + 'author': top_level_comment.get('authorDisplayName', ''), + 'published_at': self._parse_timestamp(top_level_comment.get('publishedAt')), + 'like_count': top_level_comment.get('likeCount', 0), + 'reply_count': item['snippet'].get('totalReplyCount', 0), + 'post_id': video_id, + 'media_url': f"https://www.youtube.com/watch?v={video_id}" if video_id else None, + 'raw_data': item + } + + return self._standardize_comment(comment_data) + + except Exception as e: + self.logger.error(f"Error extracting YouTube comment: {e}") + return None diff --git a/apps/social/serializers.py b/apps/social/serializers.py new file mode 100644 index 0000000..e09c74e --- /dev/null +++ b/apps/social/serializers.py @@ -0,0 +1,105 @@ +""" +Serializers for Social Media Comments app +""" +from rest_framework import serializers +from .models import SocialMediaComment, SocialPlatform + + +class SocialMediaCommentSerializer(serializers.ModelSerializer): + """Serializer for SocialMediaComment model with bilingual AI analysis""" + + platform_display = serializers.CharField(source='get_platform_display', read_only=True) + is_analyzed = serializers.ReadOnlyField() + sentiment_classification_en = serializers.SerializerMethodField() + sentiment_classification_ar = serializers.SerializerMethodField() + sentiment_score = serializers.SerializerMethodField() + confidence = serializers.SerializerMethodField() + + class Meta: + model = SocialMediaComment + fields = [ + 'id', + 'platform', + 'platform_display', + 'comment_id', + 'comments', + 'author', + 
'raw_data', + 'post_id', + 'media_url', + 'like_count', + 'reply_count', + 'rating', + 'published_at', + 'scraped_at', + 'ai_analysis', + 'is_analyzed', + 'sentiment_classification_en', + 'sentiment_classification_ar', + 'sentiment_score', + 'confidence', + ] + read_only_fields = [ + 'scraped_at', + ] + + def get_sentiment_classification_en(self, obj): + """Get English sentiment classification""" + if not obj.ai_analysis: + return None + return obj.ai_analysis.get('sentiment', {}).get('classification', {}).get('en') + + def get_sentiment_classification_ar(self, obj): + """Get Arabic sentiment classification""" + if not obj.ai_analysis: + return None + return obj.ai_analysis.get('sentiment', {}).get('classification', {}).get('ar') + + def get_sentiment_score(self, obj): + """Get sentiment score""" + if not obj.ai_analysis: + return None + return obj.ai_analysis.get('sentiment', {}).get('score') + + def get_confidence(self, obj): + """Get confidence score""" + if not obj.ai_analysis: + return None + return obj.ai_analysis.get('sentiment', {}).get('confidence') + + def validate_platform(self, value): + """Validate platform choice""" + if value not in SocialPlatform.values: + raise serializers.ValidationError(f"Invalid platform. Must be one of: {', '.join(SocialPlatform.values)}") + return value + + +class SocialMediaCommentListSerializer(serializers.ModelSerializer): + """Lightweight serializer for list views""" + + platform_display = serializers.CharField(source='get_platform_display', read_only=True) + is_analyzed = serializers.ReadOnlyField() + sentiment = serializers.SerializerMethodField() + + class Meta: + model = SocialMediaComment + fields = [ + 'id', + 'platform', + 'platform_display', + 'comment_id', + 'comments', + 'author', + 'like_count', + 'reply_count', + 'rating', + 'published_at', + 'is_analyzed', + 'sentiment', + ] + + def get_sentiment(self, obj): + """Get sentiment classification (English)""" + if not obj.ai_analysis: + return None + return obj.ai_analysis.get('sentiment', {}).get('classification', {}).get('en') diff --git a/apps/social/services/__init__.py b/apps/social/services/__init__.py new file mode 100644 index 0000000..aab045c --- /dev/null +++ b/apps/social/services/__init__.py @@ -0,0 +1,7 @@ +""" +Services for managing social media comment scraping and database operations. +""" + +from .comment_service import CommentService + +__all__ = ['CommentService'] diff --git a/apps/social/services/analysis_service.py b/apps/social/services/analysis_service.py new file mode 100644 index 0000000..54d7735 --- /dev/null +++ b/apps/social/services/analysis_service.py @@ -0,0 +1,364 @@ +""" +Analysis service for orchestrating AI-powered comment analysis. +Coordinates between SocialMediaComment model and OpenRouter service. +""" +import logging +from typing import List, Dict, Any, Optional +from decimal import Decimal +from datetime import datetime, timedelta + +from django.conf import settings +from django.utils import timezone +from django.db import models + +from ..models import SocialMediaComment +from .openrouter_service import OpenRouterService + + +logger = logging.getLogger(__name__) + + +class AnalysisService: + """ + Service for managing AI analysis of social media comments. + Handles batching, filtering, and updating comments with analysis results. 
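The service stores ai_analysis in the bilingual structure that the admin display and serializers read back (sentiment.classification.en/ar, sentiment.score, sentiment.confidence, summaries, keywords, topics, entities, emotions, metadata). A sketch of that shape with illustrative values:

    ai_analysis = {
        'sentiment': {
            'classification': {'en': 'negative', 'ar': 'سلبي'},
            'score': -0.7,        # illustrative; exact scale depends on the prompt/model
            'confidence': 0.92,
        },
        'summaries': {'en': 'Long wait at the clinic.', 'ar': 'انتظار طويل في العيادة.'},
        'keywords': {'en': ['waiting time', 'clinic'], 'ar': ['وقت الانتظار', 'العيادة']},
        'topics': {'en': ['service speed'], 'ar': ['سرعة الخدمة']},
        'entities': [],
        'emotions': {'en': ['frustration'], 'ar': ['إحباط']},
        'metadata': {'analyzed_at': '2026-01-08T06:00:00+00:00'},
    }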
+ """ + + def __init__(self): + """Initialize the analysis service.""" + self.openrouter_service = OpenRouterService() + self.batch_size = getattr(settings, 'ANALYSIS_BATCH_SIZE', 10) + + if not self.openrouter_service.is_configured(): + logger.warning("OpenRouter service not properly configured") + else: + logger.info(f"Analysis service initialized (batch_size: {self.batch_size})") + + def analyze_pending_comments( + self, + limit: Optional[int] = None, + platform: Optional[str] = None, + hours_ago: Optional[int] = None + ) -> Dict[str, Any]: + """ + Analyze comments that haven't been analyzed yet. + + Args: + limit: Maximum number of comments to analyze + platform: Filter by platform (optional) + hours_ago: Only analyze comments scraped in the last N hours + + Returns: + Dictionary with analysis statistics + """ + if not self.openrouter_service.is_configured(): + logger.error("OpenRouter service not configured") + return { + 'success': False, + 'error': 'OpenRouter service not configured', + 'analyzed': 0, + 'failed': 0, + 'skipped': 0 + } + + # Build queryset for unanalyzed comments (check if ai_analysis is empty) + # Using Q() for complex filtering (NULL or empty dict) + from django.db.models import Q + queryset = SocialMediaComment.objects.filter( + Q(ai_analysis__isnull=True) | Q(ai_analysis={}) + ) + + if platform: + queryset = queryset.filter(platform=platform) + + if hours_ago: + cutoff_time = timezone.now() - timedelta(hours=hours_ago) + queryset = queryset.filter(scraped_at__gte=cutoff_time) + + if limit: + queryset = queryset[:limit] + + # Fetch comments + comments = list(queryset) + + if not comments: + logger.info("No pending comments to analyze") + return { + 'success': True, + 'analyzed': 0, + 'failed': 0, + 'skipped': 0, + 'message': 'No pending comments to analyze' + } + + logger.info(f"Found {len(comments)} pending comments to analyze") + + # Process in batches + analyzed_count = 0 + failed_count = 0 + skipped_count = 0 + + for i in range(0, len(comments), self.batch_size): + batch = comments[i:i + self.batch_size] + logger.info(f"Processing batch {i//self.batch_size + 1} ({len(batch)} comments)") + + # Prepare batch for API + batch_data = [ + { + 'id': comment.id, + 'text': comment.comments + } + for comment in batch + ] + + # Analyze batch + result = self.openrouter_service.analyze_comments(batch_data) + + if result.get('success'): + # Update comments with analysis results + for analysis in result.get('analyses', []): + try: + comment_id = analysis.get('comment_id') + comment = SocialMediaComment.objects.get(id=comment_id) + + # Build new bilingual analysis structure + ai_analysis = { + 'sentiment': analysis.get('sentiment', {}), + 'summaries': analysis.get('summaries', {}), + 'keywords': analysis.get('keywords', {}), + 'topics': analysis.get('topics', {}), + 'entities': analysis.get('entities', []), + 'emotions': analysis.get('emotions', {}), + 'metadata': { + **result.get('metadata', {}), + 'analyzed_at': timezone.now().isoformat() + } + } + + # Update with bilingual analysis structure + comment.ai_analysis = ai_analysis + comment.save() + + analyzed_count += 1 + logger.debug(f"Updated comment {comment_id} with bilingual analysis") + + except SocialMediaComment.DoesNotExist: + logger.warning(f"Comment {analysis.get('comment_id')} not found") + failed_count += 1 + except Exception as e: + logger.error(f"Error updating comment {comment_id}: {e}") + failed_count += 1 + else: + error = result.get('error', 'Unknown error') + logger.error(f"Batch analysis failed: {error}") 
+ failed_count += len(batch) + + # Calculate skipped (comments that were analyzed during processing) + skipped_count = len(comments) - analyzed_count - failed_count + + logger.info( + f"Analysis complete: {analyzed_count} analyzed, " + f"{failed_count} failed, {skipped_count} skipped" + ) + + return { + 'success': True, + 'analyzed': analyzed_count, + 'failed': failed_count, + 'skipped': skipped_count, + 'total': len(comments) + } + + def analyze_comments_by_platform(self, platform: str, limit: int = 100) -> Dict[str, Any]: + """ + Analyze comments from a specific platform. + + Args: + platform: Platform name (e.g., 'youtube', 'facebook') + limit: Maximum number of comments to analyze + + Returns: + Dictionary with analysis statistics + """ + logger.info(f"Analyzing comments from platform: {platform}") + return self.analyze_pending_comments(limit=limit, platform=platform) + + def analyze_recent_comments(self, hours: int = 24, limit: int = 100) -> Dict[str, Any]: + """ + Analyze comments scraped in the last N hours. + + Args: + hours: Number of hours to look back + limit: Maximum number of comments to analyze + + Returns: + Dictionary with analysis statistics + """ + logger.info(f"Analyzing comments from last {hours} hours") + return self.analyze_pending_comments(limit=limit, hours_ago=hours) + + def get_analysis_statistics( + self, + platform: Optional[str] = None, + days: int = 30 + ) -> Dict[str, Any]: + """ + Get statistics about analyzed comments using ai_analysis structure. + + Args: + platform: Filter by platform (optional) + days: Number of days to look back + + Returns: + Dictionary with analysis statistics + """ + cutoff_date = timezone.now() - timedelta(days=days) + + queryset = SocialMediaComment.objects.filter( + scraped_at__gte=cutoff_date + ) + + if platform: + queryset = queryset.filter(platform=platform) + + total_comments = queryset.count() + + # Count analyzed comments (those with ai_analysis populated) + analyzed_comments = 0 + sentiment_counts = {'positive': 0, 'negative': 0, 'neutral': 0} + confidence_scores = [] + + for comment in queryset: + if comment.ai_analysis: + analyzed_comments += 1 + sentiment = comment.ai_analysis.get('sentiment', {}).get('classification', {}).get('en', 'neutral') + if sentiment in sentiment_counts: + sentiment_counts[sentiment] += 1 + confidence = comment.ai_analysis.get('sentiment', {}).get('confidence', 0) + if confidence: + confidence_scores.append(confidence) + + # Calculate average confidence + avg_confidence = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0 + + return { + 'total_comments': total_comments, + 'analyzed_comments': analyzed_comments, + 'unanalyzed_comments': total_comments - analyzed_comments, + 'analysis_rate': (analyzed_comments / total_comments * 100) if total_comments > 0 else 0, + 'sentiment_distribution': sentiment_counts, + 'average_confidence': float(avg_confidence), + 'platform': platform or 'all' + } + + def reanalyze_comment(self, comment_id: int) -> Dict[str, Any]: + """ + Re-analyze a specific comment. 
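For a single comment the return shape is flatter than the batch path; a quick illustrative sketch (the comment ID and values shown are made up):

```python
from apps.social.services.analysis_service import AnalysisService

result = AnalysisService().reanalyze_comment(comment_id=123)
# On success: {'success': True, 'comment_id': 123, 'sentiment': 'positive', 'confidence': 0.92}
# On failure: {'success': False, 'error': '...'}
print(result)
```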
+ + Args: + comment_id: ID of the comment to re-analyze + + Returns: + Dictionary with result + """ + try: + comment = SocialMediaComment.objects.get(id=comment_id) + except SocialMediaComment.DoesNotExist: + return { + 'success': False, + 'error': f'Comment {comment_id} not found' + } + + if not self.openrouter_service.is_configured(): + return { + 'success': False, + 'error': 'OpenRouter service not configured' + } + + # Prepare single comment for analysis + batch_data = [{'id': comment.id, 'text': comment.comments}] + + # Analyze + result = self.openrouter_service.analyze_comments(batch_data) + + if result.get('success'): + analysis = result.get('analyses', [{}])[0] if result.get('analyses') else {} + + # Build new bilingual analysis structure + ai_analysis = { + 'sentiment': analysis.get('sentiment', {}), + 'summaries': analysis.get('summaries', {}), + 'keywords': analysis.get('keywords', {}), + 'topics': analysis.get('topics', {}), + 'entities': analysis.get('entities', []), + 'emotions': analysis.get('emotions', {}), + 'metadata': { + **result.get('metadata', {}), + 'analyzed_at': timezone.now().isoformat() + } + } + + # Update comment with bilingual analysis structure + comment.ai_analysis = ai_analysis + comment.save() + + sentiment_en = ai_analysis.get('sentiment', {}).get('classification', {}).get('en') + confidence_val = ai_analysis.get('sentiment', {}).get('confidence', 0) + + return { + 'success': True, + 'comment_id': comment_id, + 'sentiment': sentiment_en, + 'confidence': float(confidence_val) + } + else: + return { + 'success': False, + 'error': result.get('error', 'Unknown error') + } + + def get_top_keywords( + self, + platform: Optional[str] = None, + limit: int = 20, + days: int = 30 + ) -> List[Dict[str, Any]]: + """ + Get most common keywords from analyzed comments using ai_analysis structure. + + Args: + platform: Filter by platform (optional) + limit: Maximum number of keywords to return + days: Number of days to look back + + Returns: + List of keyword dictionaries with 'keyword' and 'count' keys + """ + cutoff_date = timezone.now() - timedelta(days=days) + + queryset = SocialMediaComment.objects.filter( + scraped_at__gte=cutoff_date, + ai_analysis__isnull=False + ).exclude(ai_analysis={}) + + if platform: + queryset = queryset.filter(platform=platform) + + # Count keywords from ai_analysis + keyword_counts = {} + for comment in queryset: + keywords_en = comment.ai_analysis.get('keywords', {}).get('en', []) + for keyword in keywords_en: + keyword_counts[keyword] = keyword_counts.get(keyword, 0) + 1 + + # Sort by count and return top N + sorted_keywords = sorted( + keyword_counts.items(), + key=lambda x: x[1], + reverse=True + )[:limit] + + return [ + {'keyword': keyword, 'count': count} + for keyword, count in sorted_keywords + ] diff --git a/apps/social/services/comment_service.py b/apps/social/services/comment_service.py new file mode 100644 index 0000000..fe105fb --- /dev/null +++ b/apps/social/services/comment_service.py @@ -0,0 +1,366 @@ +""" +Service class for managing social media comment scraping and database operations. 
+""" +import logging +from typing import List, Dict, Any, Optional +from datetime import datetime +from django.conf import settings + +from ..models import SocialMediaComment +from ..scrapers import YouTubeScraper, FacebookScraper, InstagramScraper, TwitterScraper, LinkedInScraper, GoogleReviewsScraper + + +logger = logging.getLogger(__name__) + + +class CommentService: + """ + Service class to manage scraping from all social media platforms + and saving comments to the database. + """ + + def __init__(self): + """Initialize the comment service.""" + self.scrapers = {} + self._initialize_scrapers() + + def _initialize_scrapers(self): + """Initialize all platform scrapers with configuration from settings.""" + # YouTube scraper + youtube_config = { + 'api_key': getattr(settings, 'YOUTUBE_API_KEY', None), + 'channel_id': getattr(settings, 'YOUTUBE_CHANNEL_ID', None), + } + if youtube_config['api_key']: + self.scrapers['youtube'] = YouTubeScraper(youtube_config) + + # Facebook scraper + facebook_config = { + 'access_token': getattr(settings, 'FACEBOOK_ACCESS_TOKEN', None), + 'page_id': getattr(settings, 'FACEBOOK_PAGE_ID', None), + } + if facebook_config['access_token']: + self.scrapers['facebook'] = FacebookScraper(facebook_config) + + # Instagram scraper + instagram_config = { + 'access_token': getattr(settings, 'INSTAGRAM_ACCESS_TOKEN', None), + 'account_id': getattr(settings, 'INSTAGRAM_ACCOUNT_ID', None), + } + if instagram_config['access_token']: + self.scrapers['instagram'] = InstagramScraper(instagram_config) + + # Twitter/X scraper + twitter_config = { + 'bearer_token': getattr(settings, 'TWITTER_BEARER_TOKEN', None), + 'username': getattr(settings, 'TWITTER_USERNAME', None), + } + if twitter_config['bearer_token']: + self.scrapers['twitter'] = TwitterScraper(twitter_config) + + # LinkedIn scraper + linkedin_config = { + 'access_token': getattr(settings, 'LINKEDIN_ACCESS_TOKEN', None), + 'organization_id': getattr(settings, 'LINKEDIN_ORGANIZATION_ID', None), + } + if linkedin_config['access_token']: + self.scrapers['linkedin'] = LinkedInScraper(linkedin_config) + + # Google Reviews scraper (requires credentials) + google_reviews_config = { + 'credentials_file': getattr(settings, 'GOOGLE_CREDENTIALS_FILE', None), + 'token_file': getattr(settings, 'GOOGLE_TOKEN_FILE', 'token.json'), + 'locations': getattr(settings, 'GOOGLE_LOCATIONS', None), + } + if google_reviews_config['credentials_file']: + try: + self.scrapers['google_reviews'] = GoogleReviewsScraper(google_reviews_config) + except (FileNotFoundError, Exception) as e: + logger.warning(f"Google Reviews scraper not initialized: {e}") + logger.info("Google Reviews will be skipped. See GOOGLE_REVIEWS_INTEGRATION_GUIDE.md for setup.") + + logger.info(f"Initialized scrapers: {list(self.scrapers.keys())}") + + def scrape_and_save( + self, + platforms: Optional[List[str]] = None, + platform_id: Optional[str] = None + ) -> Dict[str, Dict[str, int]]: + """ + Scrape comments from specified platforms and save to database. 
+ + Args: + platforms: List of platforms to scrape (e.g., ['youtube', 'facebook']) + If None, scrape all available platforms + platform_id: Optional platform-specific ID (channel_id, page_id, account_id) + + Returns: + Dictionary with platform names as keys and dictionaries containing: + - 'new': Number of new comments added + - 'updated': Number of existing comments updated + """ + if platforms is None: + platforms = list(self.scrapers.keys()) + + results = {} + + for platform in platforms: + if platform not in self.scrapers: + logger.warning(f"Scraper for {platform} not initialized") + results[platform] = {'new': 0, 'updated': 0} + continue + + try: + logger.info(f"Starting scraping for {platform}") + comments = self.scrapers[platform].scrape_comments(platform_id=platform_id) + save_result = self._save_comments(platform, comments) + results[platform] = save_result + logger.info(f"From {platform}: {save_result['new']} new, {save_result['updated']} updated comments") + except Exception as e: + logger.error(f"Error scraping {platform}: {e}") + results[platform] = {'new': 0, 'updated': 0} + + return results + + def scrape_youtube( + self, + channel_id: Optional[str] = None, + save_to_db: bool = True + ) -> List[Dict[str, Any]]: + """ + Scrape comments from YouTube. + + Args: + channel_id: YouTube channel ID + save_to_db: If True, save comments to database + + Returns: + List of scraped comments + """ + if 'youtube' not in self.scrapers: + raise ValueError("YouTube scraper not initialized") + + comments = self.scrapers['youtube'].scrape_comments(channel_id=channel_id) + + if save_to_db: + self._save_comments('youtube', comments) + + return comments + + def scrape_facebook( + self, + page_id: Optional[str] = None, + save_to_db: bool = True + ) -> List[Dict[str, Any]]: + """ + Scrape comments from Facebook. + + Args: + page_id: Facebook page ID + save_to_db: If True, save comments to database + + Returns: + List of scraped comments + """ + if 'facebook' not in self.scrapers: + raise ValueError("Facebook scraper not initialized") + + comments = self.scrapers['facebook'].scrape_comments(page_id=page_id) + + if save_to_db: + self._save_comments('facebook', comments) + + return comments + + def scrape_instagram( + self, + account_id: Optional[str] = None, + save_to_db: bool = True + ) -> List[Dict[str, Any]]: + """ + Scrape comments from Instagram. + + Args: + account_id: Instagram account ID + save_to_db: If True, save comments to database + + Returns: + List of scraped comments + """ + if 'instagram' not in self.scrapers: + raise ValueError("Instagram scraper not initialized") + + comments = self.scrapers['instagram'].scrape_comments(account_id=account_id) + + if save_to_db: + self._save_comments('instagram', comments) + + return comments + + def scrape_twitter( + self, + username: Optional[str] = None, + save_to_db: bool = True + ) -> List[Dict[str, Any]]: + """ + Scrape comments (replies) from Twitter/X. + + Args: + username: Twitter username + save_to_db: If True, save comments to database + + Returns: + List of scraped comments + """ + if 'twitter' not in self.scrapers: + raise ValueError("Twitter scraper not initialized") + + comments = self.scrapers['twitter'].scrape_comments(username=username) + + if save_to_db: + self._save_comments('twitter', comments) + + return comments + + def scrape_linkedin( + self, + organization_id: Optional[str] = None, + save_to_db: bool = True + ) -> List[Dict[str, Any]]: + """ + Scrape comments from LinkedIn organization posts. 
+ + Args: + organization_id: LinkedIn organization URN (e.g., 'urn:li:organization:1234567') + save_to_db: If True, save comments to database + + Returns: + List of scraped comments + """ + if 'linkedin' not in self.scrapers: + raise ValueError("LinkedIn scraper not initialized") + + comments = self.scrapers['linkedin'].scrape_comments(organization_id=organization_id) + + if save_to_db: + self._save_comments('linkedin', comments) + + return comments + + def scrape_google_reviews( + self, + location_names: Optional[List[str]] = None, + save_to_db: bool = True + ) -> List[Dict[str, Any]]: + """ + Scrape Google Reviews from business locations. + + Args: + location_names: Optional list of location names to scrape (uses all locations if None) + save_to_db: If True, save comments to database + + Returns: + List of scraped reviews + """ + if 'google_reviews' not in self.scrapers: + raise ValueError("Google Reviews scraper not initialized") + + comments = self.scrapers['google_reviews'].scrape_comments(location_names=location_names) + + if save_to_db: + self._save_comments('google_reviews', comments) + + return comments + + def _save_comments(self, platform: str, comments: List[Dict[str, Any]]) -> Dict[str, int]: + """ + Save comments to database using get_or_create to prevent duplicates. + Updates existing comments with fresh data (likes, etc.). + + Args: + platform: Platform name + comments: List of comment dictionaries + + Returns: + Dictionary with: + - 'new': Number of new comments added + - 'updated': Number of existing comments updated + """ + new_count = 0 + updated_count = 0 + + for comment_data in comments: + try: + # Parse published_at timestamp + published_at = None + if comment_data.get('published_at'): + try: + published_at = datetime.fromisoformat( + comment_data['published_at'].replace('Z', '+00:00') + ) + except (ValueError, AttributeError): + pass + + # Prepare default values + defaults = { + 'comments': comment_data.get('comments', ''), + 'author': comment_data.get('author', ''), + 'post_id': comment_data.get('post_id'), + 'media_url': comment_data.get('media_url'), + 'like_count': comment_data.get('like_count', 0), + 'reply_count': comment_data.get('reply_count', 0), + 'rating': comment_data.get('rating'), + 'published_at': published_at, + 'raw_data': comment_data.get('raw_data', {}) + } + + # Use get_or_create to prevent duplicates + comment, created = SocialMediaComment.objects.get_or_create( + platform=platform, + comment_id=comment_data['comment_id'], + defaults=defaults + ) + + if created: + # New comment was created + new_count += 1 + logger.debug(f"New comment added: {comment_data['comment_id']}") + else: + # Comment already exists, update it with fresh data + comment.comments = defaults['comments'] + comment.author = defaults['author'] + comment.post_id = defaults['post_id'] + comment.media_url = defaults['media_url'] + comment.like_count = defaults['like_count'] + comment.reply_count = defaults['reply_count'] + comment.rating = defaults['rating'] + if defaults['published_at']: + comment.published_at = defaults['published_at'] + comment.raw_data = defaults['raw_data'] + comment.save() + updated_count += 1 + logger.debug(f"Comment updated: {comment_data['comment_id']}") + + except Exception as e: + logger.error(f"Error saving comment {comment_data.get('comment_id')}: {e}") + + logger.info(f"Saved comments for {platform}: {new_count} new, {updated_count} updated") + return {'new': new_count, 'updated': updated_count} + + def get_latest_comments(self, platform: 
Optional[str] = None, limit: int = 100) -> List[SocialMediaComment]: + """ + Get latest comments from database. + + Args: + platform: Filter by platform (optional) + limit: Maximum number of comments to return + + Returns: + List of SocialMediaComment objects + """ + queryset = SocialMediaComment.objects.all() + + if platform: + queryset = queryset.filter(platform=platform) + + return list(queryset.order_by('-published_at')[:limit]) diff --git a/apps/social/services/openrouter_service.py b/apps/social/services/openrouter_service.py new file mode 100644 index 0000000..e237b12 --- /dev/null +++ b/apps/social/services/openrouter_service.py @@ -0,0 +1,430 @@ +""" +OpenRouter API service for AI-powered comment analysis. +Handles authentication, requests, and response parsing for sentiment analysis, +keyword extraction, topic identification, and entity recognition. +""" +import logging +import json +from typing import Dict, List, Any, Optional +from decimal import Decimal +import httpx + +from django.conf import settings +from django.utils import timezone + + +logger = logging.getLogger(__name__) + + +class OpenRouterService: + """ + Service for interacting with OpenRouter API to analyze comments. + Provides sentiment analysis, keyword extraction, topic identification, and entity recognition. + """ + + DEFAULT_MODEL = "anthropic/claude-3-haiku" + DEFAULT_MAX_TOKENS = 1024 + DEFAULT_TEMPERATURE = 0.1 + + def __init__( + self, + api_key: Optional[str] = None, + model: Optional[str] = None, + timeout: int = 30 + ): + """ + Initialize OpenRouter service. + + Args: + api_key: OpenRouter API key (defaults to settings.OPENROUTER_API_KEY) + model: Model to use (defaults to settings.OPENROUTER_MODEL or DEFAULT_MODEL) + timeout: Request timeout in seconds + """ + self.api_key = api_key or getattr(settings, 'OPENROUTER_API_KEY', None) + self.model = model or getattr(settings, 'OPENROUTER_MODEL', self.DEFAULT_MODEL) + self.timeout = timeout + self.api_url = "https://openrouter.ai/api/v1/chat/completions" + + if not self.api_key: + logger.warning( + "OpenRouter API key not configured. " + "Set OPENROUTER_API_KEY in your .env file." + ) + + logger.info(f"OpenRouter service initialized with model: {self.model}") + + def _build_analysis_prompt(self, comments: List[Dict[str, Any]]) -> str: + """ + Build prompt for batch comment analysis with bilingual output. + + Args: + comments: List of comment dictionaries with 'id' and 'text' keys + + Returns: + Formatted prompt string + """ + comments_text = "\n".join([ + f"Comment {i+1}: {c['text']}" + for i, c in enumerate(comments) + ]) + + # Using regular string instead of f-string to avoid JSON brace escaping issues + prompt = """You are a bilingual AI analyst specializing in social media sentiment analysis. Analyze the following comments and provide a COMPLETE bilingual analysis in BOTH English and Arabic. + +Comments to analyze: +""" + comments_text + """ + +IMPORTANT REQUIREMENTS: +1. ALL analysis MUST be provided in BOTH English and Arabic +2. Use clear, modern Arabic that all Arabic speakers can understand +3. Detect comment's language and provide appropriate translations +4. Maintain accuracy and cultural appropriateness in both languages + +For each comment, provide: + +A. Sentiment Analysis (Bilingual) + - classification: {"en": "positive|neutral|negative", "ar": "إيجابي|محايد|سلبي"} + - score: number from -1.0 to 1.0 + - confidence: number from 0.0 to 1.0 + +B. 
Summaries (Bilingual) + - en: 2-3 sentence English summary of comment's main points and sentiment + - ar: 2-3 sentence Arabic summary (ملخص بالعربية) with the same depth + +C. Keywords (Bilingual - 5-7 each) + - en: list of English keywords + - ar: list of Arabic keywords + +D. Topics (Bilingual - 3-5 each) + - en: list of English topics + - ar: list of Arabic topics + +E. Entities (Bilingual) + - For each entity: {"text": {"en": "...", "ar": "..."}, "type": {"en": "PERSON|ORGANIZATION|LOCATION|BRAND|OTHER", "ar": "شخص|منظمة|موقع|علامة تجارية|أخرى"}} + +F. Emotions + - Provide scores for: joy, anger, sadness, fear, surprise, disgust + - Each emotion: 0.0 to 1.0 + - labels: {"emotion_name": {"en": "English label", "ar": "Arabic label"}} + +Return ONLY valid JSON in this exact format: +{ + "analyses": [ + { + "comment_index": 0, + "sentiment": { + "classification": {"en": "positive", "ar": "إيجابي"}, + "score": 0.85, + "confidence": 0.92 + }, + "summaries": { + "en": "The customer is very satisfied with the excellent service and fast delivery. They praised the staff's professionalism and product quality.", + "ar": "العميل راضٍ جداً عن الخدمة الممتازة والتسليم السريع. أشاد باحترافية الموظفين وجودة المنتج." + }, + "keywords": { + "en": ["excellent service", "fast delivery", "professional", "quality"], + "ar": ["خدمة ممتازة", "تسليم سريع", "احترافي", "جودة"] + }, + "topics": { + "en": ["customer service", "delivery speed", "staff professionalism"], + "ar": ["خدمة العملاء", "سرعة التسليم", "احترافية الموظفين"] + }, + "entities": [ + { + "text": {"en": "Amazon", "ar": "أمازون"}, + "type": {"en": "ORGANIZATION", "ar": "منظمة"} + } + ], + "emotions": { + "joy": 0.9, + "anger": 0.05, + "sadness": 0.0, + "fear": 0.0, + "surprise": 0.15, + "disgust": 0.0, + "labels": { + "joy": {"en": "Joy/Happiness", "ar": "فرح/سعادة"}, + "anger": {"en": "Anger", "ar": "غضب"}, + "sadness": {"en": "Sadness", "ar": "حزن"}, + "fear": {"en": "Fear", "ar": "خوف"}, + "surprise": {"en": "Surprise", "ar": "مفاجأة"}, + "disgust": {"en": "Disgust", "ar": "اشمئزاز"} + } + } + } + ] +} +""" + return prompt + + async def analyze_comments_async(self, comments: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Analyze a batch of comments using OpenRouter API (async). + + Args: + comments: List of comment dictionaries with 'id' and 'text' keys + + Returns: + Dictionary with success status and analysis results + """ + logger.info("=" * 80) + logger.info("STARTING OPENROUTER API ANALYSIS") + logger.info("=" * 80) + + if not self.api_key: + logger.error("API KEY NOT CONFIGURED") + return { + 'success': False, + 'error': 'OpenRouter API key not configured' + } + + logger.info(f"API Key: {self.api_key[:20]}...{self.api_key[-4:]}") + + if not comments: + logger.warning("No comments to analyze") + return { + 'success': True, + 'analyses': [] + } + + try: + logger.info(f"Building prompt for {len(comments)} comments...") + prompt = self._build_analysis_prompt(comments) + logger.info(f"Prompt length: {len(prompt)} characters") + + headers = { + 'Authorization': f'Bearer {self.api_key}', + 'Content-Type': 'application/json', + 'HTTP-Referer': getattr(settings, 'SITE_URL', 'http://localhost'), + 'X-Title': 'Social Media Comment Analyzer' + } + + logger.info(f"Request headers prepared: {list(headers.keys())}") + + payload = { + 'model': self.model, + 'messages': [ + { + 'role': 'system', + 'content': 'You are an expert social media sentiment analyzer. Always respond with valid JSON only.' 
+ }, + { + 'role': 'user', + 'content': prompt + } + ], + 'max_tokens': self.DEFAULT_MAX_TOKENS, + 'temperature': self.DEFAULT_TEMPERATURE + } + + logger.info(f"Request payload prepared:") + logger.info(f" - Model: {payload['model']}") + logger.info(f" - Max tokens: {payload['max_tokens']}") + logger.info(f" - Temperature: {payload['temperature']}") + logger.info(f" - Messages: {len(payload['messages'])}") + logger.info(f" - Payload size: {len(json.dumps(payload))} bytes") + + logger.info("-" * 80) + logger.info("SENDING HTTP REQUEST TO OPENROUTER API") + logger.info("-" * 80) + logger.info(f"URL: {self.api_url}") + logger.info(f"Timeout: {self.timeout}s") + + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.post( + self.api_url, + headers=headers, + json=payload + ) + + logger.info("-" * 80) + logger.info("RESPONSE RECEIVED") + logger.info("-" * 80) + logger.info(f"Status Code: {response.status_code}") + logger.info(f"Status Reason: {response.reason_phrase}") + logger.info(f"HTTP Version: {response.http_version}") + logger.info(f"Headers: {dict(response.headers)}") + + # Get raw response text BEFORE any parsing + raw_content = response.text + logger.info(f"Raw response length: {len(raw_content)} characters") + + # Log first and last parts of response for debugging + logger.debug("-" * 80) + logger.debug("RAW RESPONSE CONTENT (First 500 chars):") + logger.debug(raw_content[:500]) + logger.debug("-" * 80) + logger.debug("RAW RESPONSE CONTENT (Last 500 chars):") + logger.debug(raw_content[-500:] if len(raw_content) > 500 else raw_content) + logger.debug("-" * 80) + + response.raise_for_status() + + logger.info("Response status OK (200), attempting to parse JSON...") + + data = response.json() + logger.info(f"Successfully parsed JSON response") + logger.info(f"Response structure: {list(data.keys()) if isinstance(data, dict) else type(data)}") + + # Extract analysis from response + if 'choices' in data and len(data['choices']) > 0: + logger.info(f"Found {len(data['choices'])} choices in response") + content = data['choices'][0]['message']['content'] + logger.info(f"Content message length: {len(content)} characters") + + # Parse JSON response + try: + # Clean up response in case there's any extra text + logger.info("Cleaning response content...") + content = content.strip() + logger.info(f"After strip: {len(content)} chars") + + # Remove markdown code blocks if present + if content.startswith('```json'): + logger.info("Detected ```json prefix, removing...") + content = content[7:] + elif content.startswith('```'): + logger.info("Detected ``` prefix, removing...") + content = content[3:] + + if content.endswith('```'): + logger.info("Detected ``` suffix, removing...") + content = content[:-3] + + content = content.strip() + logger.info(f"After cleaning: {len(content)} chars") + + logger.debug("-" * 80) + logger.debug("CLEANED CONTENT (First 300 chars):") + logger.debug(content[:300]) + logger.debug("-" * 80) + + logger.info("Attempting to parse JSON...") + analysis_data = json.loads(content) + logger.info("JSON parsed successfully!") + logger.info(f"Analysis data keys: {list(analysis_data.keys()) if isinstance(analysis_data, dict) else type(analysis_data)}") + + if 'analyses' in analysis_data: + logger.info(f"Found {len(analysis_data['analyses'])} analyses") + + # Map comment indices back to IDs + analyses = [] + for idx, analysis in enumerate(analysis_data.get('analyses', [])): + comment_idx = analysis.get('comment_index', 0) + if comment_idx < 
len(comments): + comment_id = comments[comment_idx]['id'] + logger.debug(f" Analysis {idx+1}: comment_index={comment_idx}, comment_id={comment_id}") + analyses.append({ + 'comment_id': comment_id, + **analysis + }) + + # Extract metadata + metadata = { + 'model': self.model, + 'prompt_tokens': data.get('usage', {}).get('prompt_tokens', 0), + 'completion_tokens': data.get('usage', {}).get('completion_tokens', 0), + 'total_tokens': data.get('usage', {}).get('total_tokens', 0), + 'analyzed_at': timezone.now().isoformat() + } + + logger.info(f"Metadata: {metadata}") + logger.info("=" * 80) + logger.info("ANALYSIS COMPLETED SUCCESSFULLY") + logger.info("=" * 80) + + return { + 'success': True, + 'analyses': analyses, + 'metadata': metadata + } + + except json.JSONDecodeError as e: + logger.error("=" * 80) + logger.error("JSON PARSE ERROR") + logger.error("=" * 80) + logger.error(f"Error: {e}") + logger.error(f"Error position: Line {e.lineno}, Column {e.colno}") + logger.error(f"Error message: {e.msg}") + logger.error("-" * 80) + logger.error("FULL CONTENT THAT FAILED TO PARSE:") + logger.error("-" * 80) + logger.error(content) + logger.error("-" * 80) + logger.error("CHARACTER AT ERROR POSITION:") + logger.error("-" * 80) + if hasattr(e, 'pos') and e.pos: + start = max(0, e.pos - 100) + end = min(len(content), e.pos + 100) + logger.error(content[start:end]) + logger.error(f"^ (error at position {e.pos})") + + return { + 'success': False, + 'error': f'Invalid JSON response from API: {str(e)}' + } + else: + logger.error(f"No choices found in response. Response keys: {list(data.keys()) if isinstance(data, dict) else type(data)}") + return { + 'success': False, + 'error': 'No analysis returned from API' + } + + except httpx.HTTPStatusError as e: + logger.error("=" * 80) + logger.error("HTTP STATUS ERROR") + logger.error("=" * 80) + logger.error(f"Status Code: {e.response.status_code}") + logger.error(f"Response Text: {e.response.text}") + return { + 'success': False, + 'error': f'API error: {e.response.status_code} - {str(e)}' + } + except httpx.RequestError as e: + logger.error("=" * 80) + logger.error("HTTP REQUEST ERROR") + logger.error("=" * 80) + logger.error(f"Error: {str(e)}") + return { + 'success': False, + 'error': f'Request failed: {str(e)}' + } + except Exception as e: + logger.error("=" * 80) + logger.error("UNEXPECTED ERROR") + logger.error("=" * 80) + logger.error(f"Error Type: {type(e).__name__}") + logger.error(f"Error Message: {str(e)}") + logger.error("=" * 80) + logger.error("FULL TRACEBACK:", exc_info=True) + logger.error("=" * 80) + return { + 'success': False, + 'error': f'Unexpected error: {str(e)}' + } + + def analyze_comments(self, comments: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Analyze a batch of comments using OpenRouter API (synchronous wrapper). 
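A sketch of the expected input and output shapes for the synchronous wrapper (illustrative; the IDs and comment texts are invented):

```python
from apps.social.services.openrouter_service import OpenRouterService

service = OpenRouterService()
if service.is_configured():
    batch = [
        {'id': 1, 'text': 'Great service, very fast delivery!'},
        {'id': 2, 'text': 'الخدمة كانت بطيئة جداً'},
    ]
    result = service.analyze_comments(batch)
    if result.get('success'):
        for analysis in result['analyses']:
            print(analysis['comment_id'],
                  analysis['sentiment']['classification']['en'])
        print('tokens used:', result['metadata']['total_tokens'])
```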
+ + Args: + comments: List of comment dictionaries with 'id' and 'text' keys + + Returns: + Dictionary with success status and analysis results + """ + import asyncio + + try: + # Run async function in event loop + loop = asyncio.get_event_loop() + except RuntimeError: + # No event loop exists, create new one + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + return loop.run_until_complete(self.analyze_comments_async(comments)) + + def is_configured(self) -> bool: + """Check if service is properly configured.""" + return bool(self.api_key) diff --git a/apps/social/tasks.py b/apps/social/tasks.py new file mode 100644 index 0000000..0450418 --- /dev/null +++ b/apps/social/tasks.py @@ -0,0 +1,342 @@ +""" +Celery scheduled tasks for social media comment scraping and analysis. +""" +import logging +from celery import shared_task +from celery.schedules import crontab +from django.utils import timezone +from datetime import timedelta +from django.conf import settings + +from .services import CommentService +from .services.analysis_service import AnalysisService + + +logger = logging.getLogger(__name__) + +# Analysis settings +ANALYSIS_BATCH_SIZE = 10 # Number of comments to analyze per batch + + +logger = logging.getLogger(__name__) + + +@shared_task +def scrape_all_platforms(): + """ + Scheduled task to scrape all configured social media platforms. + This task is scheduled using Celery Beat. + + After scraping, automatically queues analysis for pending comments. + + Usage: Schedule this task to run at regular intervals (e.g., daily, hourly) + + Returns: + Dictionary with results from each platform + """ + logger.info("Starting scheduled scrape for all platforms") + + try: + service = CommentService() + results = service.scrape_and_save() + + logger.info(f"Completed scheduled scrape. Results: {results}") + + # Automatically queue analysis for pending comments + analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE) + logger.info("Queued analysis task for pending comments") + + return results + + except Exception as e: + logger.error(f"Error in scheduled scrape task: {e}") + raise + + +@shared_task +def scrape_youtube_comments(channel_id: str = None): + """ + Scheduled task to scrape YouTube comments. + + Args: + channel_id: Optional YouTube channel ID (uses default from settings if not provided) + + Returns: + Dictionary with 'total' and 'comments' + """ + logger.info("Starting scheduled YouTube scrape") + + try: + service = CommentService() + result = service.scrape_youtube(channel_id=channel_id, save_to_db=True) + + logger.info(f"Completed YouTube scrape. Total comments: {len(result)}") + + # Automatically queue analysis for pending comments + analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE) + logger.info("Queued analysis task for pending comments") + + return {'total': len(result), 'comments': result} + + except Exception as e: + logger.error(f"Error in YouTube scrape task: {e}") + raise + + +@shared_task +def scrape_facebook_comments(page_id: str = None): + """ + Scheduled task to scrape Facebook comments. + + Args: + page_id: Optional Facebook page ID (uses default from settings if not provided) + + Returns: + Dictionary with 'total' and 'comments' + """ + logger.info("Starting scheduled Facebook scrape") + + try: + service = CommentService() + result = service.scrape_facebook(page_id=page_id, save_to_db=True) + + logger.info(f"Completed Facebook scrape. 
Total comments: {len(result)}") + + # Automatically queue analysis for pending comments + analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE) + logger.info("Queued analysis task for pending comments") + + return {'total': len(result), 'comments': result} + + except Exception as e: + logger.error(f"Error in Facebook scrape task: {e}") + raise + + +@shared_task +def scrape_instagram_comments(account_id: str = None): + """ + Scheduled task to scrape Instagram comments. + + Args: + account_id: Optional Instagram account ID (uses default from settings if not provided) + + Returns: + Dictionary with 'total' and 'comments' + """ + logger.info("Starting scheduled Instagram scrape") + + try: + service = CommentService() + result = service.scrape_instagram(account_id=account_id, save_to_db=True) + + logger.info(f"Completed Instagram scrape. Total comments: {len(result)}") + + # Automatically queue analysis for pending comments + analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE) + logger.info("Queued analysis task for pending comments") + + return {'total': len(result), 'comments': result} + + except Exception as e: + logger.error(f"Error in Instagram scrape task: {e}") + raise + + +@shared_task +def scrape_twitter_comments(username: str = None): + """ + Scheduled task to scrape Twitter/X comments (replies). + + Args: + username: Optional Twitter username (uses default from settings if not provided) + + Returns: + Dictionary with 'total' and 'comments' + """ + logger.info("Starting scheduled Twitter/X scrape") + + try: + service = CommentService() + result = service.scrape_twitter(username=username, save_to_db=True) + + logger.info(f"Completed Twitter/X scrape. Total comments: {len(result)}") + + # Automatically queue analysis for pending comments + analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE) + logger.info("Queued analysis task for pending comments") + + return {'total': len(result), 'comments': result} + + except Exception as e: + logger.error(f"Error in Twitter/X scrape task: {e}") + raise + + +@shared_task +def scrape_linkedin_comments(organization_id: str = None): + """ + Scheduled task to scrape LinkedIn comments from organization posts. + + Args: + organization_id: Optional LinkedIn organization URN (uses default from settings if not provided) + + Returns: + Dictionary with 'total' and 'comments' + """ + logger.info("Starting scheduled LinkedIn scrape") + + try: + service = CommentService() + result = service.scrape_linkedin(organization_id=organization_id, save_to_db=True) + + logger.info(f"Completed LinkedIn scrape. Total comments: {len(result)}") + + # Automatically queue analysis for pending comments + analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE) + logger.info("Queued analysis task for pending comments") + + return {'total': len(result), 'comments': result} + + except Exception as e: + logger.error(f"Error in LinkedIn scrape task: {e}") + raise + + +@shared_task +def scrape_google_reviews(location_names: list = None): + """ + Scheduled task to scrape Google Reviews from business locations. + + Args: + location_names: Optional list of location names to scrape (uses all locations if not provided) + + Returns: + Dictionary with 'total' and 'reviews' + """ + logger.info("Starting scheduled Google Reviews scrape") + + try: + service = CommentService() + result = service.scrape_google_reviews(location_names=location_names, save_to_db=True) + + logger.info(f"Completed Google Reviews scrape. 
Total reviews: {len(result)}") + + # Automatically queue analysis for pending comments + analyze_pending_comments.delay(limit=ANALYSIS_BATCH_SIZE) + logger.info("Queued analysis task for pending comments") + + return {'total': len(result), 'reviews': result} + + except Exception as e: + logger.error(f"Error in Google Reviews scrape task: {e}") + raise + + +# ============================================================================ +# AI Analysis Tasks +# ============================================================================ + +@shared_task +def analyze_pending_comments(limit: int = 100): + """ + Scheduled task to analyze all pending (unanalyzed) comments. + + Args: + limit: Maximum number of comments to analyze in one run + + Returns: + Dictionary with analysis statistics + """ + if not getattr(settings, 'ANALYSIS_ENABLED', True): + logger.info("Comment analysis is disabled") + return {'success': False, 'message': 'Analysis disabled'} + + logger.info("Starting scheduled comment analysis") + + try: + service = AnalysisService() + results = service.analyze_pending_comments(limit=limit) + + logger.info(f"Completed comment analysis. Results: {results}") + + # Check if there are more pending comments and queue another batch if needed + from .models import SocialMediaComment + pending_count = SocialMediaComment.objects.filter( + ai_analysis__isnull=True + ).count() + SocialMediaComment.objects.filter( + ai_analysis={} + ).count() + + # FIXED: Queue if ANY pending comments remain (not just >= batch size) + if pending_count > 0: + logger.info(f" - Found {pending_count} pending comments, queuing next batch") + # Use min() to ensure we don't exceed batch size + batch_size = min(pending_count, ANALYSIS_BATCH_SIZE) + analyze_pending_comments.delay(limit=batch_size) + + return results + + except Exception as e: + logger.error(f"Error in comment analysis task: {e}", exc_info=True) + raise + + +@shared_task +def analyze_recent_comments(hours: int = 24, limit: int = 100): + """ + Scheduled task to analyze comments scraped in the last N hours. + + Args: + hours: Number of hours to look back + limit: Maximum number of comments to analyze + + Returns: + Dictionary with analysis statistics + """ + if not getattr(settings, 'ANALYSIS_ENABLED', True): + logger.info("Comment analysis is disabled") + return {'success': False, 'message': 'Analysis disabled'} + + logger.info(f"Starting analysis for comments from last {hours} hours") + + try: + service = AnalysisService() + results = service.analyze_recent_comments(hours=hours, limit=limit) + + logger.info(f"Completed recent comment analysis. Results: {results}") + return results + + except Exception as e: + logger.error(f"Error in recent comment analysis task: {e}", exc_info=True) + raise + + +@shared_task +def analyze_platform_comments(platform: str, limit: int = 100): + """ + Scheduled task to analyze comments from a specific platform. + + Args: + platform: Platform name (e.g., 'youtube', 'facebook', 'instagram') + limit: Maximum number of comments to analyze + + Returns: + Dictionary with analysis statistics + """ + if not getattr(settings, 'ANALYSIS_ENABLED', True): + logger.info("Comment analysis is disabled") + return {'success': False, 'message': 'Analysis disabled'} + + logger.info(f"Starting analysis for {platform} comments") + + try: + service = AnalysisService() + results = service.analyze_comments_by_platform(platform=platform, limit=limit) + + logger.info(f"Completed {platform} comment analysis. 
Results: {results}") + return results + + except Exception as e: + logger.error(f"Error in {platform} comment analysis task: {e}", exc_info=True) + raise diff --git a/apps/social/templatetags/ACTION_ICONS_README.md b/apps/social/templatetags/ACTION_ICONS_README.md new file mode 100644 index 0000000..b8243cc --- /dev/null +++ b/apps/social/templatetags/ACTION_ICONS_README.md @@ -0,0 +1,163 @@ +# Action Icons Template Tag + +## Overview + +The `action_icons` template tag library provides reusable SVG icons for common UI actions throughout the application. + +## Usage + +### Loading the Library + +```django +{% load action_icons %} +``` + +### Using the action_icon Tag + +**Correct syntax** (simple_tag): +```django +{% action_icon 'create' %} +{% action_icon 'edit' %} +{% action_icon 'delete' %} +{% action_icon 'view' %} +``` + +**Incorrect syntax** (will cause TemplateSyntaxError): +```django +{{ action_icon 'create' }} +``` + +### Available Icons + +| Action Name | Icon | Description | +|-------------|-------|-------------| +| `create` | ➕ Plus sign | Create/add new item | +| `edit` | ✏️ Pencil | Edit existing item | +| `delete` | 🗑️ Trash | Delete item | +| `view` | 👁️ Eye | View details | +| `save` | 💾 Floppy disk | Save changes | +| `cancel` | ✖️ X | Cancel action | +| `back` | ⬅️ Arrow | Go back | +| `download` | ⬇️ Down arrow | Download content | +| `upload` | ⬆️ Up arrow | Upload content | +| `search` | 🔍 Magnifying glass | Search | +| `filter` | 🔽 Lines | Filter results | +| `check` | ✓ Checkmark | Confirm/success | +| `warning` | ⚠️ Triangle | Warning | +| `info` | ℹ️ Circle | Information | +| `refresh` | 🔄 Arrow circle | Refresh/reload | +| `copy` | 📋 Documents | Copy to clipboard | +| `print` | 🖨️ Printer | Print content | +| `export` | ⬇️ Down arrow | Export data | +| `import` | ⬆️ Up arrow | Import data | + +### Custom Size + +```django +{% action_icon 'create' size=20 %} +``` + +Default size is 16x16 pixels. + +## Example Usage + +### In Button Links + +```django + + {% action_icon 'create' %} {% trans "Add Item" %} + +``` + +### In Action Buttons + +```django + + {% action_icon 'edit' %} + +``` + +### In Headers + +```django +{{ source.name_en }}
+{% trans "Are you sure you want to delete this source? This action cannot be undone." %}
+| {% trans "Name (English)" %} | +{{ source.name_en }} | +
|---|---|
| {% trans "Name (Arabic)" %} | +{{ source.name_ar|default:"-" }} | +
| {% trans "Description" %} | +{{ source.description|default:"-"|truncatewords:20 }} | +
| {% trans "Status" %} | ++ {% if source.is_active %} + {% trans "Active" %} + {% else %} + {% trans "Inactive" %} + {% endif %} + | +
| {% trans "Usage Count" %} | ++ {% if usage_count > 0 %} + {{ usage_count }} + {% else %} + 0 + {% endif %} + | +
{% trans "This source has been used in {{ usage_count }} record(s). You cannot delete sources that have usage records." %}
+{% trans "Recommended action:" %} {% trans "Deactivate this source instead by editing it and unchecking the 'Active' checkbox." %}
++ {% if source.is_active %} + {% trans "Active" %} + {% else %} + {% trans "Inactive" %} + {% endif %} +
+| {% trans "Name (English)" %} | +{{ source.name_en }} | +
|---|---|
| {% trans "Name (Arabic)" %} | +{{ source.name_ar|default:"-" }} | +
| {% trans "Description" %} | +{{ source.description|default:"-"|linebreaks }} | +
| {% trans "Status" %} | ++ {% if source.is_active %} + {% trans "Active" %} + {% else %} + {% trans "Inactive" %} + {% endif %} + | +
| {% trans "Created" %} | +{{ source.created_at|date:"Y-m-d H:i" }} | +
| {% trans "Last Updated" %} | +{{ source.updated_at|date:"Y-m-d H:i" }} | +
| {% trans "Date" %} | +{% trans "Content Type" %} | +{% trans "Object ID" %} | +{% trans "Hospital" %} | +{% trans "User" %} | +
|---|---|---|---|---|
| {{ record.created_at|date:"Y-m-d H:i" }} | +{{ record.content_type.model }} |
+ {{ record.object_id|truncatechars:20 }} | +{{ record.hospital.name_en|default:"-" }} | +{{ record.user.get_full_name|default:"-" }} | +
{% trans "No usage records found for this source." %}
+ {% endif %} +| {% trans "User" %} | +{% trans "Email" %} | +{% trans "Status" %} | +{% trans "Permissions" %} | +{% trans "Created" %} | +{% trans "Actions" %} | +
|---|---|---|---|---|---|
| + {{ su.user.get_full_name|default:"-" }} + | +{{ su.user.email }} | ++ {% if su.is_active %} + {% trans "Active" %} + {% else %} + {% trans "Inactive" %} + {% endif %} + | ++ {% if su.can_create_complaints %} + {% trans "Complaints" %} + {% endif %} + {% if su.can_create_inquiries %} + {% trans "Inquiries" %} + {% endif %} + {% if not su.can_create_complaints and not su.can_create_inquiries %} + {% trans "None" %} + {% endif %} + | +{{ su.created_at|date:"Y-m-d" }} | ++ + | +
+ {% trans "No source users assigned yet." %} + + {% trans "Add a source user" %} + + {% trans "to get started." %} +
+{{ source.name_en }}
+ {% endif %} +{% trans "Manage patient experience source channels" %}
+| {% trans "Name (EN)" %} | +{% trans "Name (AR)" %} | +{% trans "Description" %} | +{% trans "Status" %} | +{% trans "Actions" %} | +
|---|---|---|---|---|
| {{ source.name_en }} | +{{ source.name_ar|default:"-" }} | +{{ source.description|default:"-"|truncatewords:10 }} | ++ {% if source.is_active %} + {% trans "Active" %} + {% else %} + {% trans "Inactive" %} + {% endif %} + | ++ + {% action_icon 'view' %} + + {% if request.user.is_px_admin %} + + {% action_icon 'edit' %} + + + {% action_icon 'delete' %} + + {% endif %} + | +
|
+ + + +{% trans "No sources found. Click 'Add Source' to create one." %} + |
+ ||||
+ {{ source.name_en }} +
+{% trans "Are you sure you want to remove the following source user?" %}
+ ++ {% trans "Welcome" %}, {{ request.user.get_full_name }}! + {% trans "You're managing feedback from this source." %} +
+| {% trans "ID" %} | +{% trans "Title" %} | +{% trans "Patient" %} | +{% trans "Category" %} | +{% trans "Status" %} | +{% trans "Priority" %} | +{% trans "Created" %} | +{% trans "Actions" %} | +
|---|---|---|---|---|---|---|---|
{{ complaint.id|slice:":8" }} |
+ {{ complaint.title|truncatewords:8 }} | +{{ complaint.patient.get_full_name }} | +{{ complaint.get_category_display }} | ++ {% if complaint.status == 'open' %} + {% trans "Open" %} + {% elif complaint.status == 'in_progress' %} + {% trans "In Progress" %} + {% elif complaint.status == 'resolved' %} + {% trans "Resolved" %} + {% else %} + {% trans "Closed" %} + {% endif %} + | ++ {% if complaint.priority == 'high' %} + {% trans "High" %} + {% elif complaint.priority == 'medium' %} + {% trans "Medium" %} + {% else %} + {% trans "Low" %} + {% endif %} + | +{{ complaint.created_at|date:"Y-m-d" }} | ++ + {% action_icon 'view' %} + + | +
|
+ + + +{% trans "No complaints found for this source." %} + |
+ |||||||
| {% trans "ID" %} | +{% trans "Subject" %} | +{% trans "Patient" %} | +{% trans "Category" %} | +{% trans "Status" %} | +{% trans "Created" %} | +{% trans "Actions" %} | +
|---|---|---|---|---|---|---|
{{ inquiry.id|slice:":8" }} |
+ {{ inquiry.subject|truncatewords:8 }} | ++ {% if inquiry.patient %} + {{ inquiry.patient.get_full_name }} + {% else %} + {{ inquiry.contact_name|default:"-" }} + {% endif %} + | +{{ inquiry.get_category_display }} | ++ {% if inquiry.status == 'open' %} + {% trans "Open" %} + {% elif inquiry.status == 'in_progress' %} + {% trans "In Progress" %} + {% elif inquiry.status == 'resolved' %} + {% trans "Resolved" %} + {% else %} + {% trans "Closed" %} + {% endif %} + | +{{ inquiry.created_at|date:"Y-m-d" }} | ++ + {% action_icon 'view' %} + + | +
|
+ + + +{% trans "No inquiries found for this source." %} + |
+ ||||||
+ {{ source.name_en }} +
+{{ mention.content }}
- -Sentiment Score
- {% else %} -Not analyzed yet
- {% endif %} -Action created from this mention
- - View Action - -Track social media mentions and sentiment
-@{{ mention.author_username }}
-{{ mention.content|truncatewords:30 }}
- -No mentions found
-{{ comment.ai_analysis.summaries.en }}
+ +{{ comment.ai_analysis.summaries.ar }}
+ +{% trans "Social media insights and trends" %}
+{% trans "No keywords found" %}
+{% trans "No topics found" %}
+| {% trans "Platform" %} | +{% trans "Comments" %} | +{% trans "Avg Sentiment" %} | +{% trans "Total Likes" %} | +{% trans "Total Replies" %} | +{% trans "Actions" %} | +
|---|---|---|---|---|---|
| + + {{ platform.platform_display }} + + | ++ {{ platform.count }} + | ++ + {{ platform.avg_sentiment|floatformat:2 }} + + | ++ + + {{ platform.total_likes }} + + | ++ + + {{ platform.total_replies }} + + | ++ + {% trans "View" %} + + | +
|
+
+ {% trans "No data available" %} + |
+ |||||
{% trans "No entities found" %}
+{{ comment.get_platform_display }}
+{{ comment.comments }}
+ +{{ comment.ai_analysis.sentiment.classification.ar }}
+{% trans "Analysis confidence" %}: {{ comment.ai_analysis.sentiment.confidence|floatformat:2 }}
+{{ comment.ai_analysis.summaries.en }}
+{{ comment.ai_analysis.summaries.ar }}
+{{ comment.raw_data }}
+ {% trans "Track social media mentions and sentiment across all platforms" %}
+{% trans "No comments found" %}
+{% trans "Monitor and analyze" %} {{ platform_display }} {% trans "comments" %}
+@{{ comment.author }}
+ {% endif %} + +{{ comment.comments|truncatewords:30 }}
+ + {% if comment.ai_analysis and comment.ai_analysis.keywords.en %} +{% trans "No comments found for this platform" %}
+