2026-02-12 15:09:48 +03:00

267 lines
10 KiB
Python

import requests
import time
import base64
import hashlib
import secrets
import datetime
from urllib.parse import urlencode
from django.conf import settings
from django.utils import timezone
from apps.social.utils.x import XConfig
import logging
logger = logging.getLogger(__name__)
class XAPIError(Exception):
pass
class XRateLimitError(Exception):
"""Custom exception to signal Celery to retry with a countdown."""
def __init__(self, reset_at_timestamp):
self.reset_at = reset_at_timestamp
super().__init__(f"Rate limit hit. Retry after {reset_at_timestamp}")
class XService:
TWITTER_EPOCH = 1288834974657
# --- AUTHENTICATION ---
@staticmethod
def _get_auth_header():
"""
Creates the Authorization header using Basic Auth.
Required for Confidential Clients (Web Apps).
"""
auth_str = f"{settings.X_CLIENT_ID}:{settings.X_CLIENT_SECRET}"
encoded_auth = base64.b64encode(auth_str.encode()).decode()
return {"Authorization": f"Basic {encoded_auth}", "Content-Type": "application/x-www-form-urlencoded"}
@staticmethod
def generate_pkce_pair():
code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).decode('utf-8').replace('=', '')
code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode('utf-8')).digest()).decode('utf-8').replace('=', '')
return code_verifier, code_challenge
@staticmethod
def generate_auth_params():
code_verifier, code_challenge = XService.generate_pkce_pair()
state = secrets.token_urlsafe(32)
return code_verifier, code_challenge, state
@staticmethod
def get_auth_url(code_challenge, state):
# Note: PKCE uses client_id in URL, but token swap uses Basic Auth
params = {
"response_type": "code",
"client_id": settings.X_CLIENT_ID,
"redirect_uri": settings.X_REDIRECT_URI,
"scope": " ".join(XConfig.SCOPES),
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256"
}
return f"{XConfig.AUTH_URL}?{urlencode(params)}"
@staticmethod
def exchange_code_for_token(code, code_verifier):
# FIX: Use Basic Auth Header, not body client_id
headers = XService._get_auth_header()
payload = {
"code": code,
"grant_type": "authorization_code",
"redirect_uri": settings.X_REDIRECT_URI,
"code_verifier": code_verifier,
}
res = requests.post(XConfig.TOKEN_URL, headers=headers, data=payload)
data = res.json()
if 'error' in data:
raise XAPIError(data.get('error_description'))
expires_in = data.get('expires_in', 7200)
return {
"access_token": data['access_token'],
"refresh_token": data['refresh_token'],
"expires_at": timezone.now() + datetime.timedelta(seconds=expires_in)
}
@staticmethod
def refresh_tokens(account):
# FIX: Use Basic Auth Header
headers = XService._get_auth_header()
payload = {
"grant_type": "refresh_token",
"refresh_token": account.refresh_token,
}
res = requests.post(XConfig.TOKEN_URL, headers=headers, data=payload)
data = res.json()
if 'error' in data:
account.is_active = False
account.save()
raise XAPIError(data.get('error_description'))
account.access_token = data['access_token']
if 'refresh_token' in data:
account.refresh_token = data['refresh_token']
account.expires_at = timezone.now() + datetime.timedelta(seconds=data.get('expires_in', 7200))
account.save()
@staticmethod
def get_valid_token(account):
if account.expires_at <= timezone.now() + datetime.timedelta(minutes=5):
XService.refresh_tokens(account)
return account.access_token
@staticmethod
def _make_request(endpoint, account, method="GET", payload=None):
token = XService.get_valid_token(account)
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
url = f"{XConfig.BASE_URL}/{endpoint}"
try:
if method == "GET":
response = requests.get(url, headers=headers, params=payload)
else:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 429:
# FIX: Raise specific exception for Celery to handle, don't block worker
reset_header = response.headers.get('x-rate-limit-reset')
if reset_header:
reset_time = int(reset_header)
raise XRateLimitError(reset_time)
else:
# Fallback if header missing
raise XRateLimitError(int(time.time()) + 60)
if response.status_code >= 400:
raise XAPIError(f"API Error {response.status_code}: {response.text}")
return response.json()
except requests.exceptions.RequestException as e:
raise XAPIError(f"Network Error: {str(e)}")
# --- DATA FETCHING HELPERS ---
@staticmethod
def _datetime_to_snowflake_id(dt):
if not dt: return None
timestamp_ms = int(dt.timestamp() * 1000)
snowflake = (timestamp_ms - XService.TWITTER_EPOCH) << 22
return str(snowflake)
@staticmethod
def _datetime_to_iso_string(dt):
if not dt: return None
if dt.tzinfo: dt = dt.astimezone(datetime.timezone.utc)
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
@staticmethod
def _attach_expansions(data, response):
"""
Maps users from 'includes' to their respective tweets in 'data'.
This allows the tasks to simply access r_data['author'].
"""
users = {u['id']: u for u in response.get('includes', {}).get('users', [])}
media = {m['media_key']: m for m in response.get('includes', {}).get('media', [])}
for item in data:
item['author'] = users.get(item.get('author_id'))
item['media_objects'] = [media[k] for k in item.get('attachments', {}).get('media_keys', []) if k in media]
return data
@staticmethod
def get_user_tweets(account):
tweets = []
next_token = None
while True:
# FIX: Added expansions
params = {
"tweet.fields": "created_at,conversation_id,author_id,attachments,public_metrics",
"expansions": "author_id,attachments.media_keys",
"user.fields": "username,name",
"media.fields": "type,url,preview_image_url,alt_text",
"max_results": 100,
"exclude": "retweets"
}
if next_token: params['pagination_token'] = next_token
data = XService._make_request(f"users/{account.platform_id}/tweets", account, "GET", params=params)
raw_tweets = data.get('data', [])
# Attach author/media data
enriched_tweets = XService._attach_expansions(raw_tweets, data)
tweets.extend(enriched_tweets)
next_token = data.get('meta', {}).get('next_token')
if not next_token: break
time.sleep(0.5) # Small politeness delay between pages
return tweets
@staticmethod
def fetch_tweet_replies(account, conversation_id, since_datetime=None, owner_id=None):
use_enterprise = getattr(settings, 'X_USE_ENTERPRISE', False)
endpoint = XConfig.SEARCH_RECENT_URL if not use_enterprise else XConfig.SEARCH_ALL_URL
# Note: Free Tier (Basic) does not support Search.
# If this returns 403 on Basic Tier, you are on Free Tier which cannot search replies.
next_token = None
replies = []
while True:
query = f"conversation_id:{conversation_id}"
if owner_id: query += f" to:{owner_id}"
query += " -is:retweet"
# FIX: Added expansions
params = {
"query": query,
"tweet.fields": "created_at,author_id,text,referenced_tweets,in_reply_to_user_id",
"expansions": "author_id",
"user.fields": "username,name",
"max_results": 100
}
if since_datetime:
if use_enterprise:
params['start_time'] = XService._datetime_to_iso_string(since_datetime)
else:
params['since_id'] = XService._datetime_to_snowflake_id(since_datetime)
if next_token: params['pagination_token'] = next_token
try:
data = XService._make_request(endpoint, account, "GET", params=params)
raw_replies = data.get('data', [])
if not raw_replies: break
# Attach authors so tasks can read .get('author')
enriched_replies = XService._attach_expansions(raw_replies, data)
replies.extend(enriched_replies)
next_token = data.get('meta', {}).get('next_token')
if not next_token: break
time.sleep(0.5) # Politeness delay
except XRateLimitError as e:
# Re-raise this specific exception for the task to handle
raise e
except XAPIError as e:
if "403" in str(e):
# Free tier limitation: Cannot use search endpoint
logger.warning(f"Search API Forbidden. Account might be on Free Tier.")
break # Stop task on other errors to prevent loops
return replies
@staticmethod
def post_reply(account, tweet_id, text):
payload = {"text": text, "reply": {"in_reply_to_tweet_id": tweet_id}}
return XService._make_request("tweets", account, "POST", payload=payload)