HH/apps/social/scrapers/twitter.py
2026-01-12 12:27:29 +03:00

195 lines
6.9 KiB
Python

"""
Twitter/X comment scraper using Twitter API v2 via Tweepy.
"""
import logging
from typing import Any, Dict, List, Optional

import tweepy

from .base import BaseScraper
class TwitterScraper(BaseScraper):
    """
    Scraper for Twitter/X comments (replies) using Twitter API v2.

    Extracts replies to tweets from a specified user, using Tweepy's
    ``Client`` with ``wait_on_rate_limit=True`` so the client sleeps
    through rate-limit windows instead of raising immediately.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize Twitter scraper.

        Args:
            config: Dictionary with 'bearer_token' and optionally 'username'.

        Raises:
            ValueError: If 'bearer_token' is missing or empty.
        """
        super().__init__(config)
        # FIX: assign the logger BEFORE the username warning below.
        # Previously the logger was assigned last in __init__, so the
        # warning path could raise AttributeError if BaseScraper did not
        # already provide self.logger.
        self.logger = logging.getLogger(self.__class__.__name__)
        self.bearer_token = config.get('bearer_token')
        if not self.bearer_token:
            raise ValueError(
                "Twitter bearer token is required. "
                "Set TWITTER_BEARER_TOKEN in your .env file."
            )
        self.default_username = config.get('username', 'elonmusk')
        if not config.get('username'):
            self.logger.warning(
                "Twitter username not provided. "
                "Set TWITTER_USERNAME in your .env file to specify which account to scrape."
            )
        self.client = tweepy.Client(
            bearer_token=self.bearer_token,
            wait_on_rate_limit=True
        )

    def scrape_comments(
        self,
        username: Optional[str] = None,
        max_tweets: int = 50,
        max_replies_per_tweet: int = 100,
        **kwargs
    ) -> List[Dict[str, Any]]:
        """
        Scrape replies (comments) from a Twitter/X user's tweets.

        Args:
            username: Twitter username to scrape (uses default from config
                if not provided).
            max_tweets: Maximum number of tweets to fetch.
            max_replies_per_tweet: Maximum replies per tweet.

        Returns:
            List of standardized comment dictionaries. Returns whatever was
            collected so far on NotFound/Forbidden/TooManyRequests errors;
            any other exception is re-raised.
        """
        username = username or self.default_username
        if not username:
            raise ValueError("Username is required")
        all_comments = []
        self.logger.info(f"Starting Twitter comment extraction for @{username}")
        try:
            # Resolve the username to a numeric user ID (required by
            # get_users_tweets).
            user = self.client.get_user(username=username)
            if not user.data:
                self.logger.error(f"User @{username} not found")
                return all_comments
            user_id = user.data.id
            self.logger.info(f"Found user ID: {user_id}")
            # Fetch the user's tweets page by page; flatten() yields
            # individual tweets up to max_tweets total.
            tweet_count = 0
            for tweet in tweepy.Paginator(
                self.client.get_users_tweets,
                id=user_id,
                max_results=100  # API maximum per page
            ).flatten(limit=max_tweets):
                tweet_count += 1
                self.logger.info(f"Processing tweet {tweet_count}/{max_tweets} (ID: {tweet.id})")
                # Search for replies to this tweet
                replies = self._get_tweet_replies(tweet.id, max_replies_per_tweet)
                for reply in replies:
                    comment = self._extract_comment(tweet, reply)
                    if comment:
                        all_comments.append(comment)
                self.logger.info(f"  - Found {len(replies)} replies for this tweet")
            self.logger.info(f"Completed Twitter scraping. Total comments: {len(all_comments)}")
            return all_comments
        except tweepy.errors.NotFound:
            self.logger.error(f"User @{username} not found or account is private")
            return all_comments
        except tweepy.errors.Forbidden:
            self.logger.error(f"Access forbidden for @{username}. Check API permissions.")
            return all_comments
        except tweepy.errors.TooManyRequests:
            # Can still occur despite wait_on_rate_limit (e.g. monthly caps).
            self.logger.error("Twitter API rate limit exceeded")
            return all_comments
        except Exception as e:
            self.logger.error(f"Error scraping Twitter: {e}")
            raise

    def _get_tweet_replies(self, tweet_id: str, max_replies: int) -> List[Any]:
        """
        Get replies for a specific tweet.

        Uses the recent-search endpoint with a ``conversation_id:`` query,
        so only replies from the recent-search window (typically the last
        7 days on standard access) are returned.

        Args:
            tweet_id: Original tweet ID.
            max_replies: Maximum number of replies to fetch.

        Returns:
            List of tweepy reply tweet objects (empty on error — errors are
            logged as warnings, never raised, so one bad tweet does not
            abort the whole scrape).
        """
        replies = []
        # All replies in a thread share the root tweet's conversation_id;
        # "is:reply" filters out the root tweet itself.
        query = f"conversation_id:{tweet_id} is:reply"
        try:
            for reply in tweepy.Paginator(
                self.client.search_recent_tweets,
                query=query,
                tweet_fields=['author_id', 'created_at', 'text'],
                max_results=100  # API maximum per page
            ).flatten(limit=max_replies):
                replies.append(reply)
        except Exception as e:
            self.logger.warning(f"Error fetching replies for tweet {tweet_id}: {e}")
        return replies

    def _extract_comment(self, original_tweet: Any, reply_tweet: Any) -> Optional[Dict[str, Any]]:
        """
        Extract and standardize a reply (comment) from Twitter API response.

        Args:
            original_tweet: Original tweet object (tweepy Tweet — accessed
                via attributes, not dict keys).
            reply_tweet: Reply tweet object (tweepy Tweet) with
                ``author_id``, ``created_at`` and ``text`` populated.

        Returns:
            Standardized comment dictionary, or None if extraction failed.
        """
        try:
            # Extract reply data
            reply_id = str(reply_tweet.id)
            reply_text = reply_tweet.text
            reply_author_id = str(reply_tweet.author_id)
            reply_created_at = reply_tweet.created_at
            # Extract original tweet data
            original_tweet_id = str(original_tweet.id)
            # Build Twitter URL. NOTE(review): the '/x/' path segment is a
            # placeholder handle; Twitter redirects status URLs by ID, but
            # confirm this resolves as intended.
            twitter_url = f"https://twitter.com/x/status/{original_tweet_id}"
            comment_data = {
                'comment_id': reply_id,
                'comments': reply_text,
                # author_id (numeric ID), not the handle — resolving the
                # handle would require an extra users lookup call.
                'author': reply_author_id,
                'published_at': self._parse_timestamp(reply_created_at.isoformat()),
                'like_count': 0,  # Twitter API v2 doesn't provide like count for replies in basic query
                'reply_count': 0,  # Would need additional API call
                'post_id': original_tweet_id,
                'media_url': twitter_url,
                'raw_data': {
                    'original_tweet_id': original_tweet_id,
                    'original_tweet_text': original_tweet.text,
                    'reply_id': reply_id,
                    'reply_author_id': reply_author_id,
                    'reply_text': reply_text,
                    'reply_at': reply_created_at.isoformat()
                }
            }
            return self._standardize_comment(comment_data)
        except Exception as e:
            self.logger.error(f"Error extracting Twitter comment: {e}")
            return None