"""
LinkedIn comment scraper using the LinkedIn Marketing API.
"""
import logging
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import requests

from .base import BaseScraper


class LinkedInScraper(BaseScraper):
"""
Scraper for LinkedIn comments using LinkedIn Marketing API.
Extracts comments from organization posts.
"""
    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the LinkedIn scraper.

        Args:
            config: Dictionary with 'access_token' and 'organization_id'.
        """
        super().__init__(config)
        self.access_token = config.get('access_token')
        if not self.access_token:
            raise ValueError(
                "LinkedIn access token is required. "
                "Set LINKEDIN_ACCESS_TOKEN in your .env file."
            )
        self.org_id = config.get('organization_id')
        if not self.org_id:
            raise ValueError(
                "LinkedIn organization ID is required. "
                "Set LINKEDIN_ORGANIZATION_ID in your .env file."
            )
        # Versioned REST API: LinkedIn-Version is a YYYYMM string.
        self.api_version = config.get('api_version', '202401')
        self.headers = {
            'Authorization': f'Bearer {self.access_token}',
            'LinkedIn-Version': self.api_version,
            'X-Restli-Protocol-Version': '2.0.0',
            'Content-Type': 'application/json',
        }
        self.base_url = "https://api.linkedin.com/rest"
        self.logger = logging.getLogger(self.__class__.__name__)

    def scrape_comments(
        self,
        organization_id: Optional[str] = None,
        max_posts: int = 50,
        max_comments_per_post: int = 100,
        **kwargs
    ) -> List[Dict[str, Any]]:
        """
        Scrape comments from LinkedIn organization posts.

        Args:
            organization_id: LinkedIn organization URN
                (e.g. 'urn:li:organization:1234567'). Defaults to the
                configured organization.
            max_posts: Maximum number of posts to scrape.
            max_comments_per_post: Maximum comments to fetch per post.

        Returns:
            List of standardized comment dictionaries.
        """
        organization_id = organization_id or self.org_id
        if not organization_id:
            raise ValueError("Organization ID is required")
        all_comments = []
        self.logger.info(f"Starting LinkedIn comment extraction for {organization_id}")
        try:
            # Get all posts for the organization
            posts = self._get_all_page_posts(organization_id)
            self.logger.info(f"Found {len(posts)} posts")
            # Limit posts if needed
            if max_posts and len(posts) > max_posts:
                posts = posts[:max_posts]
                self.logger.info(f"Limited to {max_posts} posts")
            # Extract comments from each post
            for i, post_urn in enumerate(posts, 1):
                self.logger.info(f"Processing post {i}/{len(posts)}: {post_urn}")
                try:
                    comments = self._get_comments_for_post(
                        post_urn,
                        max_comments=max_comments_per_post
                    )
                    for comment in comments:
                        standardized = self._extract_comment(post_urn, comment)
                        if standardized:
                            all_comments.append(standardized)
                    self.logger.info(f" - Found {len(comments)} comments")
                except Exception as e:
                    self.logger.warning(f"Error processing post {post_urn}: {e}")
                    continue
            self.logger.info(f"Completed LinkedIn scraping. Total comments: {len(all_comments)}")
            return all_comments
        except Exception as e:
            self.logger.error(f"Error scraping LinkedIn: {e}")
            raise

    def _get_all_page_posts(self, org_urn: str, count: int = 50) -> List[str]:
        """
        Retrieve all post URNs for the organization.

        Args:
            org_urn: Organization URN.
            count: Number of posts per request.

        Returns:
            List of post URNs.
        """
        posts = []
        start = 0
        while True:
            # Finder query for posts by author. Passing the URN via `params`
            # lets requests percent-encode it, which the Rest.li endpoints
            # expect for URNs in query strings.
            url = f"{self.base_url}/posts"
            params = {'q': 'author', 'author': org_urn, 'count': count, 'start': start}
            try:
                response = requests.get(url, headers=self.headers, params=params, timeout=30)
                response.raise_for_status()
                data = response.json()
                if not data.get('elements'):
                    break
                posts.extend(item['id'] for item in data['elements'])
                start += count
                self.logger.debug(f"Retrieved {len(data['elements'])} posts (total: {len(posts)})")
            except requests.exceptions.RequestException as e:
                self.logger.error(f"Error fetching posts: {e}")
                break
        return posts
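
    # A sketch of the Rest.li collection response the loop above assumes; the
    # shape is inferred from the fields this method reads (a 'paging' block is
    # typically present as well but is not used here):
    #
    #   {"elements": [{"id": "urn:li:share:7123456789"}, ...],
    #    "paging": {"start": 0, "count": 50}}
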
    def _get_comments_for_post(self, post_urn: str, max_comments: int = 100) -> List[Dict[str, Any]]:
        """
        Retrieve comments for a specific post URN.

        Args:
            post_urn: Post URN.
            max_comments: Maximum number of comments to fetch.

        Returns:
            List of raw comment objects.
        """
        comments: List[Dict[str, Any]] = []
        start = 0
        count = 100
        while len(comments) < max_comments:
            # Social Actions API for comments; the URN is percent-encoded
            # because Rest.li expects encoded URNs in the path.
            url = f"{self.base_url}/socialActions/{quote(post_urn, safe='')}/comments"
            try:
                response = requests.get(
                    url,
                    headers=self.headers,
                    params={'count': count, 'start': start},
                    timeout=30,
                )
                response.raise_for_status()
                data = response.json()
                if not data.get('elements'):
                    break
                comments.extend(data['elements'])
                start += count
            except requests.exceptions.RequestException as e:
                self.logger.warning(f"Error fetching comments for post {post_urn}: {e}")
                break
        return comments[:max_comments]
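
    # The comment objects returned above are assumed (from the fields read in
    # _extract_comment below) to carry at least:
    #
    #   {"id": "...", "actor": "urn:li:person:...",
    #    "message": {"text": "..."}, "created": {"time": 1705312345000},
    #    "socialActions": [...]}
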
    def _extract_comment(self, post_urn: str, comment: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Extract and standardize a comment from a LinkedIn API response.

        Args:
            post_urn: Post URN.
            comment: Comment object from the LinkedIn API.

        Returns:
            Standardized comment dictionary, or None if extraction fails.
        """
        try:
            # Extract comment data
            comment_id = comment.get('id', '')
            message = comment.get('message', {})
            comment_text = message.get('text', '')
            actor = comment.get('actor', '')
            # Extract author information. The actor is usually a URN string;
            # fall back to a dict in case a resolved profile is embedded.
            author_id = ''
            author_name = ''
            if isinstance(actor, str):
                author_id = actor
            elif isinstance(actor, dict):
                author_id = actor.get('id', '')
                # Join only the non-empty parts so a missing name yields ''
                # (not ' ', which would shadow author_id below).
                author_name = ' '.join(
                    part for part in (actor.get('firstName', ''), actor.get('lastName', ''))
                    if part
                )
            # Extract created time (epoch milliseconds)
            created_time = comment.get('created', {}).get('time', '')
            # Extract social actions (likes)
            social_actions = comment.get('socialActions', [])
            like_count = 0
            for action in social_actions:
                if action.get('actionType') == 'LIKE':
                    like_count = action.get('actorCount', 0)
                    break
            # Build a LinkedIn URL. Only activity URNs map to a public feed
            # URL this way; other URN types pass through unchanged.
            linkedin_url = post_urn.replace('urn:li:activity:', 'https://www.linkedin.com/feed/update/')
            comment_data = {
                'comment_id': comment_id,
                'comments': comment_text,
                'author': author_name or author_id,
                'published_at': self._parse_timestamp(created_time) if created_time else None,
                'like_count': like_count,
                'reply_count': 0,  # The LinkedIn API does not expose reply counts directly
                'post_id': post_urn,
                'media_url': linkedin_url,
                'raw_data': {
                    'post_urn': post_urn,
                    'comment_id': comment_id,
                    'comment_text': comment_text,
                    'author_id': author_id,
                    'author_name': author_name,
                    'created_time': created_time,
                    'like_count': like_count,
                    'full_comment': comment
                }
            }
            return self._standardize_comment(comment_data)
        except Exception as e:
            self.logger.error(f"Error extracting LinkedIn comment: {e}")
            return None
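

if __name__ == '__main__':
    # Minimal usage sketch; run as a module from the project root so the
    # relative import resolves. Assumes LINKEDIN_ACCESS_TOKEN holds a valid
    # token, LINKEDIN_ORGANIZATION_ID holds the full organization URN
    # (e.g. 'urn:li:organization:1234567'), and BaseScraper accepts this
    # config dict unchanged.
    import os

    logging.basicConfig(level=logging.INFO)
    scraper = LinkedInScraper({
        'access_token': os.environ['LINKEDIN_ACCESS_TOKEN'],
        'organization_id': os.environ['LINKEDIN_ORGANIZATION_ID'],
    })
    results = scraper.scrape_comments(max_posts=10, max_comments_per_post=50)
    print(f"Fetched {len(results)} comments")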