"""
|
|
Service class for managing social media comment scraping and database operations.
|
|
"""
|
|
import logging
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import datetime
|
|
from django.conf import settings
|
|
|
|
from ..models import SocialMediaComment
|
|
from ..scrapers import YouTubeScraper, FacebookScraper, InstagramScraper, TwitterScraper, LinkedInScraper, GoogleReviewsScraper
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CommentService:
|
|
"""
|
|
Service class to manage scraping from all social media platforms
|
|
and saving comments to the database.
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""Initialize the comment service."""
|
|
self.scrapers = {}
|
|
self._initialize_scrapers()
|
|
|
|
def _initialize_scrapers(self):
|
|
"""Initialize all platform scrapers with configuration from settings."""
|
|
# YouTube scraper
|
|
youtube_config = {
|
|
'api_key': getattr(settings, 'YOUTUBE_API_KEY', None),
|
|
'channel_id': getattr(settings, 'YOUTUBE_CHANNEL_ID', None),
|
|
}
|
|
if youtube_config['api_key']:
|
|
self.scrapers['youtube'] = YouTubeScraper(youtube_config)
|
|
|
|
# Facebook scraper
|
|
facebook_config = {
|
|
'access_token': getattr(settings, 'FACEBOOK_ACCESS_TOKEN', None),
|
|
'page_id': getattr(settings, 'FACEBOOK_PAGE_ID', None),
|
|
}
|
|
if facebook_config['access_token']:
|
|
self.scrapers['facebook'] = FacebookScraper(facebook_config)
|
|
|
|
# Instagram scraper
|
|
instagram_config = {
|
|
'access_token': getattr(settings, 'INSTAGRAM_ACCESS_TOKEN', None),
|
|
'account_id': getattr(settings, 'INSTAGRAM_ACCOUNT_ID', None),
|
|
}
|
|
if instagram_config['access_token']:
|
|
self.scrapers['instagram'] = InstagramScraper(instagram_config)
|
|
|
|
# Twitter/X scraper
|
|
twitter_config = {
|
|
'bearer_token': getattr(settings, 'TWITTER_BEARER_TOKEN', None),
|
|
'username': getattr(settings, 'TWITTER_USERNAME', None),
|
|
}
|
|
if twitter_config['bearer_token']:
|
|
self.scrapers['twitter'] = TwitterScraper(twitter_config)
|
|
|
|
# LinkedIn scraper
|
|
linkedin_config = {
|
|
'access_token': getattr(settings, 'LINKEDIN_ACCESS_TOKEN', None),
|
|
'organization_id': getattr(settings, 'LINKEDIN_ORGANIZATION_ID', None),
|
|
}
|
|
if linkedin_config['access_token']:
|
|
self.scrapers['linkedin'] = LinkedInScraper(linkedin_config)
|
|
|
|
# Google Reviews scraper (requires credentials)
|
|
google_reviews_config = {
|
|
'credentials_file': getattr(settings, 'GOOGLE_CREDENTIALS_FILE', None),
|
|
'token_file': getattr(settings, 'GOOGLE_TOKEN_FILE', 'token.json'),
|
|
'locations': getattr(settings, 'GOOGLE_LOCATIONS', None),
|
|
}
|
|
if google_reviews_config['credentials_file']:
|
|
            try:
                self.scrapers['google_reviews'] = GoogleReviewsScraper(google_reviews_config)
            except Exception as e:
                logger.warning(f"Google Reviews scraper not initialized: {e}")
                logger.info("Google Reviews will be skipped. See GOOGLE_REVIEWS_INTEGRATION_GUIDE.md for setup.")

        logger.info(f"Initialized scrapers: {list(self.scrapers.keys())}")
    def scrape_and_save(
        self,
        platforms: Optional[List[str]] = None,
        platform_id: Optional[str] = None
    ) -> Dict[str, Dict[str, int]]:
        """
        Scrape comments from specified platforms and save to database.

        Args:
            platforms: List of platforms to scrape (e.g., ['youtube', 'facebook']).
                If None, scrape all available platforms.
            platform_id: Optional platform-specific ID (channel_id, page_id, account_id).

        Returns:
            Dictionary with platform names as keys and dictionaries containing:
            - 'new': Number of new comments added
            - 'updated': Number of existing comments updated
        """
        if platforms is None:
            platforms = list(self.scrapers.keys())

        results = {}

        for platform in platforms:
            if platform not in self.scrapers:
                logger.warning(f"Scraper for {platform} not initialized")
                results[platform] = {'new': 0, 'updated': 0}
                continue

            try:
                logger.info(f"Starting scraping for {platform}")
                comments = self.scrapers[platform].scrape_comments(platform_id=platform_id)
                save_result = self._save_comments(platform, comments)
                results[platform] = save_result
                logger.info(f"From {platform}: {save_result['new']} new, {save_result['updated']} updated comments")
            except Exception as e:
                logger.error(f"Error scraping {platform}: {e}")
                results[platform] = {'new': 0, 'updated': 0}

        return results
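
    # Usage sketch (illustrative; assumes at least one set of credentials
    # above is configured):
    #
    #     service = CommentService()
    #     results = service.scrape_and_save(platforms=['youtube', 'facebook'])
    #     for platform, counts in results.items():
    #         print(f"{platform}: {counts['new']} new, {counts['updated']} updated")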
    def scrape_youtube(
        self,
        channel_id: Optional[str] = None,
        save_to_db: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Scrape comments from YouTube.

        Args:
            channel_id: YouTube channel ID
            save_to_db: If True, save comments to database

        Returns:
            List of scraped comments
        """
        if 'youtube' not in self.scrapers:
            raise ValueError("YouTube scraper not initialized")

        comments = self.scrapers['youtube'].scrape_comments(channel_id=channel_id)

        if save_to_db:
            self._save_comments('youtube', comments)

        return comments
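
    # The per-platform helpers below follow the same pattern as scrape_youtube:
    # raise if the scraper is missing, scrape, optionally persist. Illustrative
    # call, e.g. to inspect results without touching the database:
    #
    #     comments = service.scrape_youtube(channel_id='<channel-id>', save_to_db=False)
    #     print(f"Scraped {len(comments)} YouTube comments")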
    def scrape_facebook(
        self,
        page_id: Optional[str] = None,
        save_to_db: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Scrape comments from Facebook.

        Args:
            page_id: Facebook page ID
            save_to_db: If True, save comments to database

        Returns:
            List of scraped comments
        """
        if 'facebook' not in self.scrapers:
            raise ValueError("Facebook scraper not initialized")

        comments = self.scrapers['facebook'].scrape_comments(page_id=page_id)

        if save_to_db:
            self._save_comments('facebook', comments)

        return comments

    def scrape_instagram(
        self,
        account_id: Optional[str] = None,
        save_to_db: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Scrape comments from Instagram.

        Args:
            account_id: Instagram account ID
            save_to_db: If True, save comments to database

        Returns:
            List of scraped comments
        """
        if 'instagram' not in self.scrapers:
            raise ValueError("Instagram scraper not initialized")

        comments = self.scrapers['instagram'].scrape_comments(account_id=account_id)

        if save_to_db:
            self._save_comments('instagram', comments)

        return comments

    def scrape_twitter(
        self,
        username: Optional[str] = None,
        save_to_db: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Scrape comments (replies) from Twitter/X.

        Args:
            username: Twitter username
            save_to_db: If True, save comments to database

        Returns:
            List of scraped comments
        """
        if 'twitter' not in self.scrapers:
            raise ValueError("Twitter scraper not initialized")

        comments = self.scrapers['twitter'].scrape_comments(username=username)

        if save_to_db:
            self._save_comments('twitter', comments)

        return comments

    def scrape_linkedin(
        self,
        organization_id: Optional[str] = None,
        save_to_db: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Scrape comments from LinkedIn organization posts.

        Args:
            organization_id: LinkedIn organization URN (e.g., 'urn:li:organization:1234567')
            save_to_db: If True, save comments to database

        Returns:
            List of scraped comments
        """
        if 'linkedin' not in self.scrapers:
            raise ValueError("LinkedIn scraper not initialized")

        comments = self.scrapers['linkedin'].scrape_comments(organization_id=organization_id)

        if save_to_db:
            self._save_comments('linkedin', comments)

        return comments

    def scrape_google_reviews(
        self,
        location_names: Optional[List[str]] = None,
        save_to_db: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Scrape Google Reviews from business locations.

        Args:
            location_names: Optional list of location names to scrape (uses all locations if None)
            save_to_db: If True, save comments to database

        Returns:
            List of scraped reviews
        """
        if 'google_reviews' not in self.scrapers:
            raise ValueError("Google Reviews scraper not initialized")

        comments = self.scrapers['google_reviews'].scrape_comments(location_names=location_names)

        if save_to_db:
            self._save_comments('google_reviews', comments)

        return comments
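
    # Unlike the other helpers, this one takes a list of location names rather
    # than a single ID; the expected name format is defined by
    # GoogleReviewsScraper, not here. Illustrative call:
    #
    #     reviews = service.scrape_google_reviews(location_names=['<location-name>'])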
    def _save_comments(self, platform: str, comments: List[Dict[str, Any]]) -> Dict[str, int]:
        """
        Save comments to database using get_or_create to prevent duplicates.
        Updates existing comments with fresh data (likes, etc.).

        Args:
            platform: Platform name
            comments: List of comment dictionaries

        Returns:
            Dictionary with:
            - 'new': Number of new comments added
            - 'updated': Number of existing comments updated
        """
        new_count = 0
        updated_count = 0

        for comment_data in comments:
            try:
                # Parse the published_at timestamp, tolerating a trailing 'Z'
                # (datetime.fromisoformat rejects it before Python 3.11).
                published_at = None
                if comment_data.get('published_at'):
                    try:
                        published_at = datetime.fromisoformat(
                            comment_data['published_at'].replace('Z', '+00:00')
                        )
                    except (ValueError, AttributeError):
                        pass

                # Prepare default values
                defaults = {
                    'comments': comment_data.get('comments', ''),
                    'author': comment_data.get('author', ''),
                    'post_id': comment_data.get('post_id'),
                    'media_url': comment_data.get('media_url'),
                    'like_count': comment_data.get('like_count', 0),
                    'reply_count': comment_data.get('reply_count', 0),
                    'rating': comment_data.get('rating'),
                    'published_at': published_at,
                    'raw_data': comment_data.get('raw_data', {})
                }

                # Use get_or_create to prevent duplicates
                comment, created = SocialMediaComment.objects.get_or_create(
                    platform=platform,
                    comment_id=comment_data['comment_id'],
                    defaults=defaults
                )

                if created:
                    new_count += 1
                    logger.debug(f"New comment added: {comment_data['comment_id']}")
                else:
                    # Comment already exists; refresh it with the latest data,
                    # keeping the stored timestamp when parsing failed.
                    for field, value in defaults.items():
                        if field == 'published_at' and value is None:
                            continue
                        setattr(comment, field, value)
                    comment.save()
                    updated_count += 1
                    logger.debug(f"Comment updated: {comment_data['comment_id']}")

            except Exception as e:
                logger.error(f"Error saving comment {comment_data.get('comment_id')}: {e}")

        logger.info(f"Saved comments for {platform}: {new_count} new, {updated_count} updated")
        return {'new': new_count, 'updated': updated_count}
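
    # Shape of the dicts _save_comments expects, inferred from the keys read
    # above; only 'comment_id' is required, everything else has a fallback.
    # The values shown are illustrative:
    #
    #     {
    #         'comment_id': '<platform-unique-id>',
    #         'comments': 'Great post!',
    #         'author': 'Jane Doe',
    #         'post_id': '<post-id>',
    #         'media_url': 'https://...',
    #         'like_count': 3,
    #         'reply_count': 0,
    #         'rating': None,  # e.g. a star rating for reviews
    #         'published_at': '2024-01-01T12:00:00Z',  # ISO 8601 string
    #         'raw_data': {...},  # original API payload
    #     }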
    def get_latest_comments(self, platform: Optional[str] = None, limit: int = 100) -> List[SocialMediaComment]:
        """
        Get latest comments from database.

        Args:
            platform: Filter by platform (optional)
            limit: Maximum number of comments to return

        Returns:
            List of SocialMediaComment objects
        """
        queryset = SocialMediaComment.objects.all()

        if platform:
            queryset = queryset.filter(platform=platform)

        return list(queryset.order_by('-published_at')[:limit])
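
    # Illustrative read-back (assumes comments have already been scraped):
    #
    #     latest = service.get_latest_comments(platform='youtube', limit=10)
    #     for c in latest:
    #         print(c.author, c.published_at)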