# jobs/linkedin_service.py
import uuid
import re
from html import unescape
from urllib.parse import quote, urlencode
import requests
import logging
from django.conf import settings
import time
import random
from django.utils import timezone
# Sent as the ``X-Restli-Protocol-Version`` header on LinkedIn API calls.
LINKEDIN_API_VERSION = '2.0.0'
# Sent as the ``LinkedIn-Version`` header (YYYYMM string) on LinkedIn API calls.
LINKEDIN_VERSION = '202409'
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
class LinkedInService:
    """Thin client for posting job announcements to LinkedIn on behalf of a member.

    Covers the OAuth code exchange, profile lookup (to obtain the member id),
    the register/upload/poll image flow, and creation of UGC posts (text-only
    or with a single image).
    """

    # Max time (seconds) to wait for LinkedIn to finish processing an image.
    ASSET_STATUS_TIMEOUT = 15
    # Delay (seconds) between asset status checks.
    ASSET_STATUS_INTERVAL = 2
    # Legacy fixed OAuth ``state`` value, kept as the default for compatibility.
    DEFAULT_OAUTH_STATE = 'university_ats_linkedin'
    # Hashtags used when a job posting supplies none.
    DEFAULT_HASHTAGS = ("#HigherEd", "#Hiring", "#UniversityJobs")

    def __init__(self):
        """Read the OAuth client configuration from Django settings."""
        self.client_id = settings.LINKEDIN_CLIENT_ID
        self.client_secret = settings.LINKEDIN_CLIENT_SECRET
        self.redirect_uri = settings.LINKEDIN_REDIRECT_URI
        self.access_token = None

    # --- INTERNAL HELPERS ---

    def _api_headers(self, with_json_body=False):
        """Return the headers shared by the versioned LinkedIn API calls."""
        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'X-Restli-Protocol-Version': LINKEDIN_API_VERSION,
            'LinkedIn-Version': LINKEDIN_VERSION,
        }
        if with_json_body:
            headers['Content-Type'] = 'application/json'
        return headers

    @staticmethod
    def _first_post_image(job_posting):
        """Return the first attached post image, or None when there is none.

        Any error while reading the relation is logged and treated as
        "no image" so that posting can still fall back to text-only.
        """
        try:
            first_image = job_posting.post_images.first()
        except Exception as e:
            logger.warning("Could not read post images, posting text only: %s", e)
            return None
        # An empty file field is falsy, so truthiness (not ``is not None``)
        # is what tells us a file is really attached.
        return getattr(first_image, 'post_image', None) or None

    # --- AUTHENTICATION & PROFILE ---

    def get_auth_url(self, state=None):
        """Generate the LinkedIn OAuth authorization URL.

        Args:
            state: Optional OAuth ``state`` value. Pass a per-session random
                token and verify it in the callback for CSRF protection.
                When omitted, the legacy fixed value is used.

        Returns:
            The full authorization URL as a string.
        """
        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'scope': 'w_member_social openid profile',
            'state': state if state is not None else self.DEFAULT_OAUTH_STATE,
        }
        return f"https://www.linkedin.com/oauth/v2/authorization?{urlencode(params)}"

    def get_access_token(self, code):
        """Exchange an authorization code for an access token.

        The token is stored on ``self.access_token`` and returned; it is
        ``None`` if the response carries no ``access_token`` field.

        Raises:
            Exception: whatever the HTTP call or JSON decoding raised
                (logged, then re-raised).
        """
        url = "https://www.linkedin.com/oauth/v2/accessToken"
        data = {
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': self.redirect_uri,
            'client_id': self.client_id,
            'client_secret': self.client_secret,
        }
        try:
            response = requests.post(url, data=data, timeout=60)
            response.raise_for_status()
            token_data = response.json()
        except Exception as e:
            logger.error("Error getting access token: %s", e)
            raise
        self.access_token = token_data.get('access_token')
        return self.access_token

    def get_user_profile(self):
        """Fetch the member's userinfo document (its ``sub`` is the person id).

        Raises:
            Exception: if no access token is set, or if the request fails
                (logged, then re-raised).
        """
        if not self.access_token:
            raise Exception("No access token available")
        url = "https://api.linkedin.com/v2/userinfo"
        headers = {'Authorization': f'Bearer {self.access_token}'}
        try:
            response = requests.get(url, headers=headers, timeout=60)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error("Error getting user profile: %s", e)
            raise

    # --- ASSET UPLOAD & STATUS ---

    def get_asset_status(self, asset_urn):
        """Return the ``status`` field of a registered asset.

        Never raises: any request or decoding error is logged and reported
        as the string ``"FAILED"``, so callers cannot tell a transport error
        from a genuine processing failure.
        """
        # NOTE(review): confirm that this endpoint accepts the full URN and
        # that its top-level ``status`` field ever equals "READY".
        url = f"https://api.linkedin.com/v2/assets/{quote(asset_urn)}"
        try:
            response = requests.get(url, headers=self._api_headers(), timeout=10)
            response.raise_for_status()
            return response.json().get('status')
        except Exception as e:
            logger.error("Error checking asset status for %s: %s", asset_urn, e)
            return "FAILED"

    def register_image_upload(self, person_urn):
        """Step 1: register an image upload for the given member id.

        Returns:
            dict with ``upload_url`` (where to send the bytes) and ``asset``
            (the asset URN to reference in the post).

        Raises:
            requests.HTTPError: on a non-2xx response.
            KeyError: if the response lacks the expected fields.
        """
        url = "https://api.linkedin.com/v2/assets?action=registerUpload"
        payload = {
            "registerUploadRequest": {
                "recipes": ["urn:li:digitalmediaRecipe:feedshare-image"],
                "owner": f"urn:li:person:{person_urn}",
                "serviceRelationships": [{
                    "relationshipType": "OWNER",
                    "identifier": "urn:li:userGeneratedContent",
                }],
            }
        }
        response = requests.post(
            url,
            headers=self._api_headers(with_json_body=True),
            json=payload,
            timeout=30,
        )
        response.raise_for_status()
        value = response.json()['value']
        mechanism = value['uploadMechanism'][
            'com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest'
        ]
        return {
            'upload_url': mechanism['uploadUrl'],
            'asset': value['asset'],
        }

    def upload_image_to_linkedin(self, upload_url, image_file, asset_urn):
        """Step 2: upload the image bytes, then poll until the asset is READY.

        Polling is best-effort: a "FAILED" status is retried with a longer
        back-off (``get_asset_status`` also reports transport errors that
        way), and when the timeout expires the method still returns True so
        the caller attempts the post.

        Returns:
            True, once the asset is READY or the polling window has elapsed.

        Raises:
            requests.HTTPError: if the upload request itself is rejected.
        """
        image_file.open()
        try:
            image_content = image_file.read()
        finally:
            # Close the file even when reading fails.
            image_file.close()

        headers = {'Authorization': f'Bearer {self.access_token}'}
        response = requests.post(upload_url, headers=headers, data=image_content, timeout=60)
        response.raise_for_status()

        # Monotonic clock: the deadline is immune to wall-clock adjustments.
        deadline = time.monotonic() + self.ASSET_STATUS_TIMEOUT
        while time.monotonic() < deadline:
            status = self.get_asset_status(asset_urn)
            if status == "READY":
                logger.info("Asset %s is READY. Proceeding.", asset_urn)
                return True
            if status == "FAILED":
                logger.warning(
                    "Error during asset status check for %s: "
                    "LinkedIn image processing failed for asset %s. Retrying.",
                    asset_urn, asset_urn,
                )
                time.sleep(self.ASSET_STATUS_INTERVAL * 2)
                continue
            logger.info("Asset %s status: %s. Waiting...", asset_urn, status)
            time.sleep(self.ASSET_STATUS_INTERVAL)

        # Timed out: the upload itself succeeded, so still attempt the post.
        logger.warning(
            "Asset %s timed out, but upload succeeded. Forcing post attempt.", asset_urn
        )
        return True

    # --- POSTING LOGIC ---

    def clean_html_for_social_post(self, html_content):
        """Convert simple HTML to plain text suitable for a social post.

        Bold tags become ``*text*``, list items become bullet lines,
        paragraphs and ``<br>`` become newlines, every other tag is dropped,
        HTML entities are unescaped and runs of blank lines are collapsed.

        Returns:
            The cleaned text, or "" for empty/None input.
        """
        if not html_content:
            return ""
        flags = re.IGNORECASE
        text = html_content
        # 1. Bold tags -> *text* (DOTALL so bold spans may cross line breaks).
        text = re.sub(
            r'<(strong|b)(?:\s[^>]*)?>(.*?)</\1\s*>',
            r'*\2*',
            text,
            flags=flags | re.DOTALL,
        )
        # 2. Lists: closing containers end the line, <li> becomes a bullet.
        text = re.sub(r'</(ul|ol|div)\s*>', '\n', text, flags=flags)
        text = re.sub(r'<li(?:\s[^>]*)?>', '\u2022 ', text, flags=flags)
        text = re.sub(r'</li\s*>', '\n', text, flags=flags)
        # 3. Paragraphs and line breaks.
        text = re.sub(r'</p\s*>', '\n\n', text, flags=flags)
        text = re.sub(r'<br\s*/?>', '\n', text, flags=flags)
        # 4. Strip all remaining, unsupported HTML tags.
        clean_text = re.sub(r'<[^>]+>', '', text)
        # 5. Unescape HTML entities.
        clean_text = unescape(clean_text)
        # 6. Collapse three or more consecutive line breaks into one blank line.
        return re.sub(r'(\n\s*){3,}', '\n\n', clean_text).strip()

    def hashtags_list(self, hash_tags_str):
        """Convert a comma-separated hashtag string into a list of ``#tags``.

        Blank entries are dropped and a leading ``#`` is added where missing.
        Falls back to the default hashtags when nothing usable is supplied.
        """
        if not hash_tags_str:
            return list(self.DEFAULT_HASHTAGS)
        stripped = (tag.strip() for tag in hash_tags_str.split(','))
        tags = [tag if tag.startswith('#') else f'#{tag}' for tag in stripped if tag]
        return tags or list(self.DEFAULT_HASHTAGS)

    def _build_post_message(self, job_posting):
        """Build the formatted text of the job announcement.

        Sections, in order: headline and title, optional department, key
        details, cleaned description, call to action, hashtags.
        """
        # Emoji are written as escapes so the strings survive re-encoding.
        # NOTE(review): the title, location, workplace, description and
        # apply-now glyphs were unrecoverable in the source and are best
        # guesses - confirm against the intended design.
        message_parts = [
            "\U0001F525 *Job Alert!* We\u2019re looking for a talented professional to join our team.",
            f"\U0001F449 **{job_posting.title}** \U0001F448",
        ]
        if job_posting.department:
            message_parts.append(f"*{job_posting.department}*")
        message_parts.append("\n" + "=" * 25 + "\n")

        # KEY DETAILS SECTION
        details_list = []
        if job_posting.job_type:
            details_list.append(f"\U0001F4BC Type: {job_posting.get_job_type_display()}")
        location = job_posting.get_location_display()
        if location != 'Not specified':
            details_list.append(f"\U0001F4CD Location: {location}")
        if job_posting.workplace_type:
            details_list.append(
                f"\U0001F310 Workplace: {job_posting.get_workplace_type_display()}"
            )
        if job_posting.salary_range:
            details_list.append(f"\U0001F4B0 Salary: {job_posting.salary_range}")
        if details_list:
            message_parts.append("*Key Information*:")
            message_parts.extend(details_list)
            message_parts.append("\n")

        # DESCRIPTION SECTION
        clean_description = self.clean_html_for_social_post(job_posting.description)
        if clean_description:
            message_parts.append(f"\U0001F4DD *About the Role:*\n{clean_description}")

        # CALL TO ACTION
        if job_posting.application_url:
            message_parts.append("\n\n---")
            message_parts.append(f"\U0001F517 **APPLY NOW:** {job_posting.application_url}")

        # HASHTAGS (department tag first, with spaces removed)
        hashtags = self.hashtags_list(job_posting.hash_tags)
        if job_posting.department:
            hashtags.insert(0, f"#{job_posting.department.replace(' ', '')}")
        message_parts.append("\n" + " ".join(hashtags))
        return "\n".join(message_parts)

    def _send_ugc_post(self, person_urn, job_posting, media_category="NONE", media_list=None):
        """Send the UGC post request (text-only or with media).

        Args:
            person_urn: Member id; it is prefixed with ``urn:li:person:``.
            job_posting: Object the post text is built from.
            media_category: Value for ``shareMediaCategory``.
            media_list: Optional media entries to attach.

        Returns:
            dict with ``success``, ``post_id``, ``post_url``, ``status_code``.

        Raises:
            requests.HTTPError: on a non-2xx response.
        """
        share_content = {
            "shareCommentary": {"text": self._build_post_message(job_posting)},
            "shareMediaCategory": media_category,
        }
        if media_list:
            share_content["media"] = media_list
        payload = {
            "author": f"urn:li:person:{person_urn}",
            "lifecycleState": "PUBLISHED",
            "specificContent": {"com.linkedin.ugc.ShareContent": share_content},
            "visibility": {
                "com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC",
            },
        }
        response = requests.post(
            "https://api.linkedin.com/v2/ugcPosts",
            headers=self._api_headers(with_json_body=True),
            json=payload,
            timeout=60,
        )
        response.raise_for_status()
        # The id of the created post is returned in a response header.
        post_id = response.headers.get('x-restli-id', '')
        post_url = f"https://www.linkedin.com/feed/update/{quote(post_id)}/" if post_id else ""
        return {
            'success': True,
            'post_id': post_id,
            'post_url': post_url,
            'status_code': response.status_code,
        }

    def create_job_post_with_image(self, job_posting, image_file, person_urn, asset_urn):
        """Step 3: create the post that references an uploaded image asset.

        ``image_file`` is accepted for interface compatibility but not used;
        the image is referenced through ``asset_urn``.
        """
        media_entry = {
            "status": "READY",
            "media": asset_urn,
            "description": {"text": job_posting.title},
            "title": {"text": "Apply Now"},
        }
        # Only send originalUrl when there is one, rather than a null/empty URL.
        if job_posting.application_url:
            media_entry["originalUrl"] = job_posting.application_url
        return self._send_ugc_post(
            person_urn=person_urn,
            job_posting=job_posting,
            media_category="IMAGE",
            media_list=[media_entry],
        )

    def create_job_post(self, job_posting):
        """Create a job announcement post, with an image when one is attached.

        If the image flow fails at any step, a text-only post is sent instead.

        Returns:
            On success, the dict returned by ``_send_ugc_post``. On failure,
            a dict with ``success`` False, ``error`` and ``status_code`` (the
            HTTP status when the error carries a response, else 500).

        Raises:
            Exception: if no access token is set.
        """
        if not self.access_token:
            raise Exception("Not authenticated with LinkedIn")
        try:
            profile = self.get_user_profile()
            person_urn = profile.get('sub')
            if not person_urn:
                raise Exception("Could not retrieve LinkedIn user ID")

            image_upload = self._first_post_image(job_posting)
            if image_upload:
                try:
                    # Step 1: register the upload.
                    upload_info = self.register_image_upload(person_urn)
                    asset_urn = upload_info['asset']
                    # Step 2: upload and wait for the asset to be ready.
                    self.upload_image_to_linkedin(
                        upload_info['upload_url'], image_upload, asset_urn
                    )
                    # Step 3: create the post with the image.
                    return self.create_job_post_with_image(
                        job_posting, image_upload, person_urn, asset_urn
                    )
                except Exception as e:
                    logger.error("Image post failed, falling back to text: %s", e)

            # Fallback: pure text post (shareMediaCategory: NONE).
            return self._send_ugc_post(
                person_urn=person_urn,
                job_posting=job_posting,
                media_category="NONE",
            )
        except Exception as e:
            logger.error("Error creating LinkedIn post: %s", e)
            # requests exceptions carry a response (possibly None).
            failed_response = getattr(e, 'response', None)
            return {
                'success': False,
                'error': str(e),
                'status_code': getattr(failed_response, 'status_code', 500),
            }