from celery import shared_task from django.utils import timezone from django.db import transaction from apps.social.models import SocialAccount, SocialContent, SocialComment from apps.social.services.linkedin import LinkedInService, LinkedInAPIError from apps.social.utils.linkedin import LinkedInConstants import logging import time logger = logging.getLogger(__name__) # ========================================== # INITIAL HISTORICAL SYNC (One-Time) # ========================================== @shared_task(bind=True, max_retries=3) def initial_historical_sync_task(self, account_id, max_comments_per_post=None): """ ONE-TIME initial historical sync when client first connects. Fetches posts and limited historical comments to bootstrap the database. """ if max_comments_per_post is None: max_comments_per_post = LinkedInConstants.INITIAL_SYNC_COMMENT_LIMIT try: account = SocialAccount.objects.select_for_update().get( id=account_id, platform_type='LI' ) # Check token validity if account.is_token_expired(): account.is_active = False account.save() logger.error(f"Account {account.name} token expired") return logger.info(f"Starting initial historical sync for {account.name}") # Fetch all posts (returns full URNs) posts_data = LinkedInService.fetch_posts(account, count=100) logger.info(f"Found {len(posts_data)} posts for {account.name}") total_comments_synced = 0 for idx, post_data in enumerate(posts_data, 1): post_urn = post_data.get('id') if not post_urn: logger.warning(f"Post missing ID, skipping: {post_data}") continue # Extract post data commentary = post_data.get('commentary', '') created_time_ms = post_data.get('created', {}).get('time') created_at = LinkedInService._parse_timestamp(created_time_ms) # Create or update post with transaction.atomic(): post, created = SocialContent.objects.get_or_create( platform_type='LI', content_id=post_urn, defaults={ 'account': account, 'text': commentary, 'created_at': created_at, 'content_data': post_data, 'last_comment_sync_at': None } ) if not created and post.last_comment_sync_at: logger.info(f"Post {post_urn} already synced, skipping") continue logger.info(f"Processing post {idx}/{len(posts_data)}: {post_urn}") # Fetch LIMITED historical comments using full URN comments_data = LinkedInService.fetch_comments_limited( account, post_urn, limit=max_comments_per_post ) logger.info(f"Found {len(comments_data)} comments for post {post_urn}") latest_comment_time = created_at # Save comments in batch with transaction.atomic(): for c_data in comments_data: comment_id = c_data.get('id') if not comment_id: continue c_time_ms = c_data.get('created', {}).get('time') c_time = LinkedInService._parse_timestamp(c_time_ms) actor = c_data.get('actor', 'Unknown') message_text = c_data.get('message', {}).get('text', '') SocialComment.objects.update_or_create( platform_type='LI', comment_id=comment_id, defaults={ 'account': account, 'content': post, 'author_name': actor, 'author_id': actor, 'text': message_text, 'created_at': c_time, 'synced_via_webhook': False, 'comment_data': c_data } ) if c_time > latest_comment_time: latest_comment_time = c_time total_comments_synced += 1 # Update post's last sync timestamp post.last_comment_sync_at = latest_comment_time post.save() # Rate limit protection time.sleep(0.5) # Mark account as synced account.last_synced_at = timezone.now() account.save() logger.info( f"✅ Initial historical sync complete for {account.name}: " f"{len(posts_data)} posts, {total_comments_synced} comments" ) except SocialAccount.DoesNotExist: logger.error(f"Account {account_id} not found") except Exception as e: logger.error(f"Initial historical sync error for account {account_id}: {e}") # Retry with exponential backoff raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries)) # ========================================== # DELTA SYNC FOR NEW COMMENTS (Scheduled) # ========================================== @shared_task(bind=True, max_retries=3) def sync_new_comments_task(self, account_id): """ DELTA SYNC for fetching only NEW comments since last sync. """ try: account = SocialAccount.objects.select_for_update().get( id=account_id, platform_type='LI' ) # Check token validity if account.is_token_expired(): account.is_active = False account.save() logger.error(f"Account {account.name} token expired") return # Check if account has been initially synced if not account.last_synced_at: logger.warning( f"Account {account.name} not initially synced. " f"Run initial_historical_sync_task first." ) return logger.info(f"Starting delta sync for new comments: {account.name}") # Fetch posts posts_data = LinkedInService.fetch_posts(account, count=100) total_new_comments = 0 for post_data in posts_data: post_urn = post_data.get('id') if not post_urn: continue # Get or create post record try: post = SocialContent.objects.get( platform_type='LI', content_id=post_urn ) except SocialContent.DoesNotExist: # This post wasn't in initial sync, create it commentary = post_data.get('commentary', '') created_time_ms = post_data.get('created', {}).get('time') created_at = LinkedInService._parse_timestamp(created_time_ms) post = SocialContent.objects.create( platform_type='LI', content_id=post_urn, account=account, text=commentary, created_at=created_at, content_data=post_data, last_comment_sync_at=None ) # Fetch only NEW comments since last sync using full URN since_timestamp = post.last_comment_sync_at comments_data = LinkedInService.fetch_comments_delta( account, post_urn, since_timestamp=since_timestamp ) if not comments_data: continue logger.info(f"Found {len(comments_data)} new comments for post {post_urn}") latest_comment_time = since_timestamp or timezone.now() # Save new comments with transaction.atomic(): for c_data in comments_data: comment_id = c_data.get('id') if not comment_id: continue c_time_ms = c_data.get('created', {}).get('time') c_time = LinkedInService._parse_timestamp(c_time_ms) actor = c_data.get('actor', 'Unknown') message_text = c_data.get('message', {}).get('text', '') comment, created = SocialComment.objects.update_or_create( platform_type='LI', comment_id=comment_id, defaults={ 'account': account, 'content': post, 'author_name': actor, 'author_id': actor, 'text': message_text, 'created_at': c_time, 'synced_via_webhook': False, 'comment_data': c_data } ) if created: total_new_comments += 1 if c_time > latest_comment_time: latest_comment_time = c_time # Update sync timestamp if latest_comment_time > (post.last_comment_sync_at or timezone.make_aware(timezone.datetime(1970, 1, 1))): post.last_comment_sync_at = latest_comment_time post.save() time.sleep(0.3) # Update account sync time account.last_synced_at = timezone.now() account.save() logger.info( f"✅ Delta sync complete for {account.name}: " f"{total_new_comments} new comments" ) except SocialAccount.DoesNotExist: logger.error(f"Account {account_id} not found") except Exception as e: logger.error(f"Delta sync error for account {account_id}: {e}") raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries)) # ========================================== # WEBHOOK PROCESSING (Real-Time) # ========================================== @shared_task(bind=True, max_retries=3) def process_webhook_comment_task(self, account_id, post_urn, comment_id): """ Process a single comment triggered by webhook (REAL-TIME sync). post_urn: Full URN expected (e.g., urn:li:share:123) """ try: account = SocialAccount.objects.get(id=account_id, platform_type='LI') logger.info(f"Processing webhook comment {comment_id} for post {post_urn}") # Fetch the specific comment (most efficient) comment_data = LinkedInService.fetch_single_comment(account, post_urn, comment_id) if not comment_data: logger.warning( f"Could not fetch comment {comment_id} for post {post_urn}. " f"It may have been deleted." ) return # Get or create the post try: content = SocialContent.objects.get( platform_type='LI', content_id=post_urn ) except SocialContent.DoesNotExist: logger.warning(f"Post {post_urn} not found in database, creating it") # Post doesn't exist, we need to fetch it posts_data = LinkedInService.fetch_posts(account, count=1) matching_post = next((p for p in posts_data if p.get('id') == post_urn), None) if matching_post: commentary = matching_post.get('commentary', '') created_time_ms = matching_post.get('created', {}).get('time') created_at = LinkedInService._parse_timestamp(created_time_ms) content = SocialContent.objects.create( platform_type='LI', content_id=post_urn, account=account, text=commentary, created_at=created_at, content_data=matching_post ) else: logger.error(f"Could not fetch post {post_urn} from API") return # Parse comment data c_time_ms = comment_data.get('created', {}).get('time') c_time = LinkedInService._parse_timestamp(c_time_ms) actor = comment_data.get('actor', 'Unknown') message_text = comment_data.get('message', {}).get('text', '') # Save or update comment with transaction.atomic(): comment, created = SocialComment.objects.update_or_create( platform_type='LI', comment_id=comment_id, defaults={ 'account': account, 'content': content, 'author_name': actor, 'author_id': actor, 'text': message_text, 'created_at': c_time, 'synced_via_webhook': True, 'comment_data': comment_data } ) # Update post's last comment sync time if c_time > (content.last_comment_sync_at or timezone.make_aware(timezone.datetime(1970, 1, 1))): content.last_comment_sync_at = c_time content.save() action = "Created" if created else "Updated" logger.info(f"✅ {action} comment {comment_id} via webhook") except SocialAccount.DoesNotExist: logger.error(f"Account {account_id} not found for webhook processing") except Exception as e: logger.error(f"Webhook processing error: {e}") raise self.retry(exc=e, countdown=30 * (2 ** self.request.retries)) # ========================================== # BULK OPERATIONS # ========================================== @shared_task def sync_all_accounts_task(): """ Wrapper to sync ALL active accounts (for scheduled Celery Beat job). Schedule this with Celery Beat to run every 15-30 minutes. """ accounts = SocialAccount.objects.filter( platform_type='LI', is_active=True, last_synced_at__isnull=False # Only sync accounts that have been initially synced ) logger.info(f"Starting delta sync for {accounts.count()} LinkedIn accounts") for account in accounts: # Queue each account sync as separate task sync_new_comments_task.delay(account.id) logger.info(f"Queued delta sync tasks for {accounts.count()} accounts") # @shared_task # def cleanup_old_comments_task(days_to_keep=90): # """Optional: Clean up very old comments to save database space.""" # cutoff_date = timezone.now() - timezone.timedelta(days=days_to_keep) # deleted_count, _ = SocialComment.objects.filter( # platform_type='LI', # created_at__lt=cutoff_date # ).delete() # logger.info(f"Cleaned up {deleted_count} LinkedIn comments older than {days_to_keep} days")