# jobs/linkedin_service.py
import uuid
import requests
import logging
import time
from urllib.parse import quote, urlencode
from .utils import get_linkedin_config,get_setting
logger = logging.getLogger(__name__)
# Define constants
LINKEDIN_API_VERSION = get_setting('LINKEDIN_API_VERSION', '2.0.0')
LINKEDIN_VERSION = get_setting('LINKEDIN_VERSION', '202301')
class LinkedInService:
def __init__(self):
config = get_linkedin_config()
self.client_id = config['LINKEDIN_CLIENT_ID']
self.client_secret = config['LINKEDIN_CLIENT_SECRET']
self.redirect_uri = config['LINKEDIN_REDIRECT_URI']
self.access_token = None
# Configuration for image processing wait time
self.ASSET_STATUS_TIMEOUT = 15
self.ASSET_STATUS_INTERVAL = 2
# ---------------- AUTHENTICATION & PROFILE ----------------
def get_auth_url(self):
"""Generate LinkedIn OAuth URL"""
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': self.redirect_uri,
'scope': 'w_member_social openid profile',
'state': 'university_ats_linkedin'
}
return f"https://www.linkedin.com/oauth/v2/authorization?{urlencode(params)}"
def get_access_token(self, code):
"""Exchange authorization code for access token"""
url = "https://www.linkedin.com/oauth/v2/accessToken"
data = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.redirect_uri,
'client_id': self.client_id,
'client_secret': self.client_secret
}
try:
response = requests.post(url, data=data, timeout=60)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data.get('access_token')
return self.access_token
except Exception as e:
logger.error(f"Error getting access token: {e}")
raise
def get_user_profile(self):
"""Get user profile information (used to get person URN)"""
if not self.access_token:
raise Exception("No access token available")
url = "https://api.linkedin.com/v2/userinfo"
headers = {'Authorization': f'Bearer {self.access_token}'}
try:
response = requests.get(url, headers=headers, timeout=60)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Error getting user profile: {e}")
raise
# ---------------- ASSET UPLOAD & STATUS ----------------
def get_asset_status(self, asset_urn):
"""Checks the status of a registered asset (image) to ensure it's READY."""
url = f"https://api.linkedin.com/v2/assets/{quote(asset_urn)}"
headers = {
'Authorization': f'Bearer {self.access_token}',
'X-Restli-Protocol-Version': LINKEDIN_API_VERSION,
'LinkedIn-Version': LINKEDIN_VERSION,
}
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
return response.json().get('status')
except Exception as e:
logger.error(f"Error checking asset status for {asset_urn}: {e}")
return "FAILED"
def register_image_upload(self, person_urn):
"""Step 1: Register image upload with LinkedIn, getting the upload URL and asset URN."""
url = "https://api.linkedin.com/v2/assets?action=registerUpload"
headers = {
'Authorization': f'Bearer {self.access_token}',
'Content-Type': 'application/json',
'X-Restli-Protocol-Version': LINKEDIN_API_VERSION,
'LinkedIn-Version': LINKEDIN_VERSION,
}
payload = {
"registerUploadRequest": {
"recipes": ["urn:li:digitalmediaRecipe:feedshare-image"],
"owner": f"urn:li:person:{person_urn}",
"serviceRelationships": [{
"relationshipType": "OWNER",
"identifier": "urn:li:userGeneratedContent"
}]
}
}
response = requests.post(url, headers=headers, json=payload, timeout=30)
response.raise_for_status()
data = response.json()
return {
'upload_url': data['value']['uploadMechanism']['com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest']['uploadUrl'],
'asset': data['value']['asset']
}
def upload_image_to_linkedin(self, upload_url, image_file, asset_urn):
"""Step 2: Upload image file and poll for 'READY' status."""
image_file.open()
image_content = image_file.read()
image_file.seek(0) # Reset pointer after reading
image_file.close()
headers = {
'Authorization': f'Bearer {self.access_token}',
}
response = requests.post(upload_url, headers=headers, data=image_content, timeout=60)
response.raise_for_status()
# --- POLL FOR ASSET STATUS ---
start_time = time.time()
while time.time() - start_time < self.ASSET_STATUS_TIMEOUT:
try:
status = self.get_asset_status(asset_urn)
if status == "READY" or status == "PROCESSING":
if status == "READY":
logger.info(f"Asset {asset_urn} is READY. Proceeding.")
return True
if status == "FAILED":
raise Exception(f"LinkedIn image processing failed for asset {asset_urn}")
logger.info(f"Asset {asset_urn} status: {status}. Waiting...")
time.sleep(self.ASSET_STATUS_INTERVAL)
except Exception as e:
logger.warning(f"Error during asset status check for {asset_urn}: {e}. Retrying.")
time.sleep(self.ASSET_STATUS_INTERVAL * 2)
logger.warning(f"Asset {asset_urn} timed out, but upload succeeded. Forcing post attempt.")
return True
# ---------------- POSTING UTILITIES ----------------
# def clean_html_for_social_post(self, html_content):
# """Converts safe HTML to plain text with basic formatting."""
# if not html_content:
# return ""
# text = html_content
# # 1. Convert Bolding tags to *Markdown*
# text = re.sub(r'(.*?)', r'*\1*', text, flags=re.IGNORECASE)
# text = re.sub(r'(.*?)', r'*\1*', text, flags=re.IGNORECASE)
# # 2. Handle Lists: Convert
tags into a bullet point
# text = re.sub(r'(ul|ol|div)>', '\n', text, flags=re.IGNORECASE)
# text = re.sub(r']*>', 'β’ ', text, flags=re.IGNORECASE)
# text = re.sub(r'', '\n', text, flags=re.IGNORECASE)
# # 3. Handle Paragraphs and Line Breaks
# text = re.sub(r'', '\n\n', text, flags=re.IGNORECASE)
# text = re.sub(r'
', '\n', text, flags=re.IGNORECASE)
# # 4. Strip all remaining, unsupported HTML tags
# clean_text = re.sub(r'<[^>]+>', '', text)
# # 5. Unescape HTML entities
# clean_text = unescape(clean_text)
# # 6. Clean up excessive whitespace/newlines
# clean_text = re.sub(r'(\n\s*){3,}', '\n\n', clean_text).strip()
# return clean_text
# def hashtags_list(self, hash_tags_str):
# """Convert comma-separated hashtags string to list"""
# if not hash_tags_str:
# return ["#HigherEd", "#Hiring", "#UniversityJobs"]
# tags = [tag.strip() for tag in hash_tags_str.split(',') if tag.strip()]
# tags = [tag if tag.startswith('#') else f'#{tag}' for tag in tags]
# if not tags:
# return ["#HigherEd", "#Hiring", "#UniversityJobs"]
# return tags
# def _build_post_message(self, job_posting):
# """
# Constructs the final text message.
# Includes a unique suffix for duplicate content prevention (422 fix).
# """
# message_parts = [
# f"π₯ *Job Alert!* Weβre looking for a talented professional to join our team.",
# f"π **{job_posting.title}** π",
# ]
# if job_posting.department:
# message_parts.append(f"*{job_posting.department}*")
# message_parts.append("\n" + "=" * 25 + "\n")
# # KEY DETAILS SECTION
# details_list = []
# if job_posting.job_type:
# details_list.append(f"πΌ Type: {job_posting.get_job_type_display()}")
# if job_posting.get_location_display() != 'Not specified':
# details_list.append(f"π Location: {job_posting.get_location_display()}")
# if job_posting.workplace_type:
# details_list.append(f"π Workplace: {job_posting.get_workplace_type_display()}")
# if job_posting.salary_range:
# details_list.append(f"π° Salary: {job_posting.salary_range}")
# if details_list:
# message_parts.append("*Key Information*:")
# message_parts.extend(details_list)
# message_parts.append("\n")
# # DESCRIPTION SECTION
# clean_description = self.clean_html_for_social_post(job_posting.description)
# if clean_description:
# message_parts.append(f"π *About the Role:*\n{clean_description}")
# clean_
# # CALL TO ACTION
# if job_posting.application_url:
# message_parts.append(f"\n\n---")
# # CRITICAL: Include the URL explicitly in the text body.
# # When media_category is NONE, LinkedIn often makes these URLs clickable.
# message_parts.append(f"π **APPLY NOW:** {job_posting.application_url}")
# # HASHTAGS
# hashtags = self.hashtags_list(job_posting.hash_tags)
# if job_posting.department:
# dept_hashtag = f"#{job_posting.department.replace(' ', '')}"
# hashtags.insert(0, dept_hashtag)
# message_parts.append("\n" + " ".join(hashtags))
# final_message = "\n".join(message_parts)
# # --- FIX: ADD UNIQUE SUFFIX AND HANDLE LENGTH (422 fix) ---
# unique_suffix = f"\n\n| Ref: {int(time.time())}"
# available_length = MAX_POST_CHARS - len(unique_suffix)
# if len(final_message) > available_length:
# logger.warning("Post message truncated due to character limit.")
# final_message = final_message[:available_length - 3] + "..."
# return final_message + unique_suffix
# ---------------- MAIN POSTING METHODS ----------------
def _send_ugc_post(self, person_urn, job_posting, media_category="NONE", media_list=None):
"""
Private method to handle the final UGC post request.
CRITICAL FIX: Avoids ARTICLE category if not using an image to prevent 402 errors.
"""
message = job_posting.linkedin_post_formated_data
if len(message)>=3000:
message=message[:2900]+"...."
# --- FIX FOR 402: Force NONE if no image is present. ---
if media_category != "IMAGE":
# We explicitly force pure text share to avoid LinkedIn's link crawler
# which triggers the commercial 402 error on job reposts.
media_category = "NONE"
media_list = None
# --------------------------------------------------------
url = "https://api.linkedin.com/v2/ugcPosts"
headers = {
'Authorization': f'Bearer {self.access_token}',
'Content-Type': 'application/json',
'X-Restli-Protocol-Version': LINKEDIN_API_VERSION,
'LinkedIn-Version': LINKEDIN_VERSION,
}
specific_content = {
"com.linkedin.ugc.ShareContent": {
"shareCommentary": {"text": message},
"shareMediaCategory": media_category,
}
}
if media_list and media_category == "IMAGE":
specific_content["com.linkedin.ugc.ShareContent"]["media"] = media_list
payload = {
"author": f"urn:li:person:{person_urn}",
"lifecycleState": "PUBLISHED",
"specificContent": specific_content,
"visibility": {
"com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"
}
}
response = requests.post(url, headers=headers, json=payload, timeout=60)
# Log 402/422 details
if response.status_code in [402, 422]:
logger.error(f"{response.status_code} UGC Post Error Detail: {response.text}")
response.raise_for_status()
post_id = response.headers.get('x-restli-id', '')
post_url = f"https://www.linkedin.com/feed/update/{quote(post_id)}/" if post_id else ""
return {
'success': True,
'post_id': post_id,
'post_url': post_url,
'status_code': response.status_code
}
def create_job_post_with_image(self, job_posting, image_file, person_urn, asset_urn):
"""Creates the final LinkedIn post payload with the image asset."""
if not job_posting.application_url:
raise ValueError("Application URL is required for image link share on LinkedIn.")
# Media list for IMAGE category (retains link details)
# Note: This is an exception where we MUST provide link details for the image card
media_list = [{
"status": "READY",
"media": asset_urn,
"description": {"text": job_posting.title},
"originalUrl": job_posting.application_url,
"title": {"text": "Apply Now"}
}]
return self._send_ugc_post(
person_urn=person_urn,
job_posting=job_posting,
media_category="IMAGE",
media_list=media_list
)
def create_job_post(self, job_posting):
"""Main method to create a job announcement post (Image or Text)."""
if not self.access_token:
raise Exception("Not authenticated with LinkedIn")
try:
profile = self.get_user_profile()
person_urn = profile.get('sub')
if not person_urn:
raise Exception("Could not retrieve LinkedIn user ID")
asset_urn = None
has_image = False
# Check for image and attempt post
try:
image_upload = job_posting.post_images.first().post_image
has_image = image_upload is not None
except Exception:
pass
if has_image:
try:
# Steps 1, 2, 3 for image post
upload_info = self.register_image_upload(person_urn)
asset_urn = upload_info['asset']
self.upload_image_to_linkedin(
upload_info['upload_url'],
image_upload,
asset_urn
)
return self.create_job_post_with_image(
job_posting, image_upload, person_urn, asset_urn
)
except Exception as e:
logger.error(f"Image post failed, falling back to text: {e}")
has_image = False
# === FALLBACK TO PURE TEXT POST (shareMediaCategory: NONE) ===
# The _send_ugc_post method now ensures this is a PURE text post
# to avoid the 402/ARTICLE-related issues.
return self._send_ugc_post(
person_urn=person_urn,
job_posting=job_posting,
media_category="NONE"
)
except Exception as e:
logger.error(f"Error creating LinkedIn post: {e}")
status_code = getattr(getattr(e, 'response', None), 'status_code', 500)
return {
'success': False,
'error': str(e),
'status_code': status_code
}