"""
|
|
Twitter/X comment scraper using Twitter API v2 via Tweepy.
|
|
"""
|
|
import logging
from typing import Any, Dict, List, Optional

import tweepy

from .base import BaseScraper
|
|
|
|
|
|
class TwitterScraper(BaseScraper):
    """
    Scraper for Twitter/X comments (replies) using Twitter API v2.

    Extracts replies to tweets from a specified user by walking the user's
    recent timeline and searching each tweet's conversation thread.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize Twitter scraper.

        Args:
            config: Dictionary with 'bearer_token' and optionally 'username'

        Raises:
            ValueError: If 'bearer_token' is missing from config.
        """
        super().__init__(config)

        # FIX: bind the logger *before* any logging call below. Previously it
        # was assigned last, so the "username not provided" warning depended on
        # BaseScraper having set self.logger — an AttributeError otherwise.
        self.logger = logging.getLogger(self.__class__.__name__)

        self.bearer_token = config.get('bearer_token')
        if not self.bearer_token:
            raise ValueError(
                "Twitter bearer token is required. "
                "Set TWITTER_BEARER_TOKEN in your .env file."
            )

        self.default_username = config.get('username', 'elonmusk')
        if not config.get('username'):
            self.logger.warning(
                "Twitter username not provided. "
                "Set TWITTER_USERNAME in your .env file to specify which account to scrape."
            )

        # wait_on_rate_limit makes tweepy sleep through HTTP 429 responses
        # instead of raising TooManyRequests immediately.
        self.client = tweepy.Client(
            bearer_token=self.bearer_token,
            wait_on_rate_limit=True
        )

    def scrape_comments(
        self,
        username: Optional[str] = None,
        max_tweets: int = 50,
        max_replies_per_tweet: int = 100,
        **kwargs
    ) -> List[Dict[str, Any]]:
        """
        Scrape replies (comments) from a Twitter/X user's tweets.

        Args:
            username: Twitter username to scrape (uses default from config if not provided)
            max_tweets: Maximum number of tweets to fetch
            max_replies_per_tweet: Maximum replies per tweet

        Returns:
            List of standardized comment dictionaries

        Raises:
            ValueError: If no username is available from argument or config.
        """
        username = username or self.default_username
        if not username:
            raise ValueError("Username is required")

        all_comments = []

        self.logger.info(f"Starting Twitter comment extraction for @{username}")

        try:
            # Resolve the username to a numeric user ID, which the timeline
            # endpoint requires.
            user = self.client.get_user(username=username)
            if not user.data:
                self.logger.error(f"User @{username} not found")
                return all_comments

            user_id = user.data.id
            self.logger.info(f"Found user ID: {user_id}")

            # Fetch tweets and their replies
            tweet_count = 0
            for tweet in tweepy.Paginator(
                self.client.get_users_tweets,
                id=user_id,
                max_results=100  # per-page cap of the timeline endpoint
            ).flatten(limit=max_tweets):

                tweet_count += 1
                self.logger.info(f"Processing tweet {tweet_count}/{max_tweets} (ID: {tweet.id})")

                # Search for replies to this tweet
                replies = self._get_tweet_replies(tweet.id, max_replies_per_tweet)

                for reply in replies:
                    comment = self._extract_comment(tweet, reply)
                    if comment:
                        all_comments.append(comment)

                self.logger.info(f" - Found {len(replies)} replies for this tweet")

            self.logger.info(f"Completed Twitter scraping. Total comments: {len(all_comments)}")
            return all_comments

        except tweepy.errors.NotFound:
            self.logger.error(f"User @{username} not found or account is private")
            return all_comments
        except tweepy.errors.Forbidden:
            self.logger.error(f"Access forbidden for @{username}. Check API permissions.")
            return all_comments
        except tweepy.errors.TooManyRequests:
            # Can still surface despite wait_on_rate_limit (e.g. monthly caps).
            self.logger.error("Twitter API rate limit exceeded")
            return all_comments
        except Exception as e:
            self.logger.error(f"Error scraping Twitter: {e}")
            raise

    def _get_tweet_replies(self, tweet_id: str, max_replies: int) -> List[Dict[str, Any]]:
        """
        Get replies for a specific tweet.

        Args:
            tweet_id: Original tweet ID
            max_replies: Maximum number of replies to fetch

        Returns:
            List of reply tweet objects
        """
        replies = []

        # Search for replies using conversation_id.
        # NOTE(review): search_recent_tweets only covers roughly the last
        # 7 days, so replies to older tweets will not be found.
        query = f"conversation_id:{tweet_id} is:reply"

        try:
            for reply in tweepy.Paginator(
                self.client.search_recent_tweets,
                query=query,
                # public_metrics lets _extract_comment report real like/reply
                # counts instead of hard-coded zeros.
                tweet_fields=['author_id', 'created_at', 'text', 'public_metrics'],
                max_results=100
            ).flatten(limit=max_replies):
                replies.append(reply)
        except Exception as e:
            # Best-effort: a failed conversation lookup must not abort the
            # whole scrape — log and return whatever was collected.
            self.logger.warning(f"Error fetching replies for tweet {tweet_id}: {e}")

        return replies

    def _extract_comment(
        self,
        original_tweet: Any,
        reply_tweet: Any
    ) -> Optional[Dict[str, Any]]:
        """
        Extract and standardize a reply (comment) from Twitter API response.

        Args:
            original_tweet: Original tweet object (tweepy Tweet)
            reply_tweet: Reply tweet object (tweepy Tweet)

        Returns:
            Standardized comment dictionary, or None if extraction failed.
        """
        try:
            # Extract reply data
            reply_id = str(reply_tweet.id)
            reply_text = reply_tweet.text
            reply_author_id = str(reply_tweet.author_id)
            reply_created_at = reply_tweet.created_at

            # Engagement counts come from public_metrics when the reply was
            # fetched with that field; fall back to zeros otherwise.
            metrics = getattr(reply_tweet, 'public_metrics', None) or {}

            # Extract original tweet data
            original_tweet_id = str(original_tweet.id)

            # Build Twitter URL
            twitter_url = f"https://twitter.com/x/status/{original_tweet_id}"

            comment_data = {
                'comment_id': reply_id,
                'comments': reply_text,
                'author': reply_author_id,
                'published_at': self._parse_timestamp(reply_created_at.isoformat()),
                'like_count': metrics.get('like_count', 0),
                'reply_count': metrics.get('reply_count', 0),
                'post_id': original_tweet_id,
                'media_url': twitter_url,
                'raw_data': {
                    'original_tweet_id': original_tweet_id,
                    'original_tweet_text': original_tweet.text,
                    'reply_id': reply_id,
                    'reply_author_id': reply_author_id,
                    'reply_text': reply_text,
                    'reply_at': reply_created_at.isoformat()
                }
            }

            return self._standardize_comment(comment_data)

        except Exception as e:
            self.logger.error(f"Error extracting Twitter comment: {e}")
            return None
|