# jobs/linkedin_service.py
import logging
import time
import uuid
from urllib.parse import quote, urlencode

import requests

from .utils import get_linkedin_config, get_setting

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Values sent on LinkedIn API calls; each can be overridden through
# get_setting and falls back to the literal given here.
# Sent as the 'X-Restli-Protocol-Version' request header.
LINKEDIN_API_VERSION = get_setting('LINKEDIN_API_VERSION', '2.0.0')
# Sent as the 'LinkedIn-Version' request header.
LINKEDIN_VERSION = get_setting('LINKEDIN_VERSION', '202301')
class LinkedInService:
    """Client for LinkedIn OAuth sign-in and job announcement posts.

    Typical flow: send the user to ``get_auth_url()``, exchange the returned
    code with ``get_access_token()``, then call ``create_job_post()``.
    The access token is held on the instance only.
    """

    # Configuration for image processing wait time (seconds).
    ASSET_STATUS_TIMEOUT = 15
    ASSET_STATUS_INTERVAL = 2

    # Value sent as the OAuth ``state`` when the caller does not supply one.
    DEFAULT_OAUTH_STATE = 'university_ats_linkedin'

    # Post text of this length or longer is cut down before sending.
    MAX_MESSAGE_CHARS = 3000
    TRUNCATED_MESSAGE_CHARS = 2900

    def __init__(self):
        """Load client credentials from ``get_linkedin_config()``."""
        config = get_linkedin_config()
        self.client_id = config['LINKEDIN_CLIENT_ID']
        self.client_secret = config['LINKEDIN_CLIENT_SECRET']
        self.redirect_uri = config['LINKEDIN_REDIRECT_URI']
        self.access_token = None

    # ---------------- INTERNAL HELPERS ----------------

    def _api_headers(self, json_body=False):
        """Return the headers shared by the api.linkedin.com calls.

        Args:
            json_body: when true, also declare a JSON request body.
        """
        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'X-Restli-Protocol-Version': LINKEDIN_API_VERSION,
            'LinkedIn-Version': LINKEDIN_VERSION,
        }
        if json_body:
            headers['Content-Type'] = 'application/json'
        return headers

    @staticmethod
    def _asset_id(asset_urn):
        """Return the last ':'-separated segment of *asset_urn*.

        A value without ':' is returned unchanged.
        """
        return asset_urn.rsplit(':', 1)[-1]

    @classmethod
    def _truncate_message(cls, message):
        """Shorten *message* when it reaches ``MAX_MESSAGE_CHARS``.

        Text at or over the limit is cut to ``TRUNCATED_MESSAGE_CHARS``
        characters and suffixed with "...."; shorter text is returned as is.
        """
        if len(message) >= cls.MAX_MESSAGE_CHARS:
            return message[:cls.TRUNCATED_MESSAGE_CHARS] + "...."
        return message

    # ---------------- AUTHENTICATION & PROFILE ----------------

    def get_auth_url(self, state=None):
        """Generate the LinkedIn OAuth authorization URL.

        Args:
            state: opaque value echoed back on the redirect. Pass an
                unguessable per-session value and verify it in the callback
                to protect against CSRF. Defaults to ``DEFAULT_OAUTH_STATE``
                for backward compatibility.

        Returns:
            The full authorization URL as a string.
        """
        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'scope': 'w_member_social openid profile',
            'state': self.DEFAULT_OAUTH_STATE if state is None else state,
        }
        return f"https://www.linkedin.com/oauth/v2/authorization?{urlencode(params)}"

    def get_access_token(self, code):
        """Exchange an authorization code for an access token.

        The token is stored on ``self.access_token`` and returned.

        Raises:
            Exception: any error from the HTTP call is logged and re-raised.
        """
        url = "https://www.linkedin.com/oauth/v2/accessToken"
        data = {
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': self.redirect_uri,
            'client_id': self.client_id,
            'client_secret': self.client_secret,
        }

        try:
            response = requests.post(url, data=data, timeout=60)
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data.get('access_token')
            return self.access_token
        except Exception as e:
            logger.error(f"Error getting access token: {e}")
            raise

    def get_user_profile(self):
        """Fetch the userinfo document (used to get the person URN).

        Returns:
            The decoded JSON response.

        Raises:
            Exception: if no access token is set, or the request fails
                (request errors are logged and re-raised).
        """
        if not self.access_token:
            raise Exception("No access token available")

        url = "https://api.linkedin.com/v2/userinfo"
        headers = {'Authorization': f'Bearer {self.access_token}'}

        try:
            response = requests.get(url, headers=headers, timeout=60)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Error getting user profile: {e}")
            raise

    # ---------------- ASSET UPLOAD & STATUS ----------------

    def get_asset_status(self, asset_urn):
        """Return the ``status`` field of a registered asset.

        Never raises: any error (including a malformed *asset_urn*) is
        logged and reported as the string "FAILED".
        """
        try:
            # NOTE(review): LinkedIn's Assets API examples address the asset
            # by its bare id (last URN segment), not the full URN - confirm
            # against the API reference.
            asset_id = self._asset_id(asset_urn)
            url = f"https://api.linkedin.com/v2/assets/{quote(asset_id)}"
            response = requests.get(url, headers=self._api_headers(), timeout=10)
            response.raise_for_status()
            # NOTE(review): verify which status values this endpoint really
            # returns; the poller only recognises READY/PROCESSING/FAILED.
            return response.json().get('status')
        except Exception as e:
            logger.error(f"Error checking asset status for {asset_urn}: {e}")
            return "FAILED"

    def register_image_upload(self, person_urn):
        """Step 1: register an image upload with LinkedIn.

        Args:
            person_urn: member id; it is prefixed with ``urn:li:person:``.

        Returns:
            dict with ``upload_url`` and ``asset`` taken from the response.

        Raises:
            requests.HTTPError: on a non-success response.
            KeyError: if the response lacks the expected fields.
        """
        url = "https://api.linkedin.com/v2/assets?action=registerUpload"
        payload = {
            "registerUploadRequest": {
                "recipes": ["urn:li:digitalmediaRecipe:feedshare-image"],
                "owner": f"urn:li:person:{person_urn}",
                "serviceRelationships": [{
                    "relationshipType": "OWNER",
                    "identifier": "urn:li:userGeneratedContent"
                }]
            }
        }

        response = requests.post(
            url, headers=self._api_headers(json_body=True), json=payload, timeout=30
        )
        response.raise_for_status()

        value = response.json()['value']
        upload_request = value['uploadMechanism'][
            'com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest'
        ]
        return {
            'upload_url': upload_request['uploadUrl'],
            'asset': value['asset']
        }

    def upload_image_to_linkedin(self, upload_url, image_file, asset_urn):
        """Step 2: upload the image bytes, then poll the asset status.

        Args:
            upload_url: URL returned by ``register_image_upload``.
            image_file: object exposing ``open/read/seek/close``.
            asset_urn: asset identifier returned by ``register_image_upload``.

        Returns:
            True. A timeout while polling is deliberately not treated as an
            error: the upload succeeded, so the post is still attempted.

        Raises:
            requests.HTTPError: if the upload request itself is rejected.
        """
        image_file.open()
        try:
            image_content = image_file.read()
            image_file.seek(0)  # Reset pointer after reading
        finally:
            # Close even if read() fails so the handle is never leaked.
            image_file.close()

        headers = {'Authorization': f'Bearer {self.access_token}'}
        response = requests.post(upload_url, headers=headers, data=image_content, timeout=60)
        response.raise_for_status()

        # --- POLL FOR ASSET STATUS ---
        # monotonic() keeps the timeout correct across wall-clock changes.
        deadline = time.monotonic() + self.ASSET_STATUS_TIMEOUT
        while time.monotonic() < deadline:
            status = self.get_asset_status(asset_urn)

            # NOTE(review): PROCESSING is treated as good enough to proceed;
            # confirm this is intended.
            if status in ("READY", "PROCESSING"):
                if status == "READY":
                    logger.info(f"Asset {asset_urn} is READY. Proceeding.")
                return True

            if status == "FAILED":
                # get_asset_status also reports request errors as FAILED, so
                # this is retried (with a longer pause) rather than raised.
                logger.warning(
                    f"Error during asset status check for {asset_urn}: "
                    f"LinkedIn image processing failed for asset {asset_urn}. Retrying."
                )
                time.sleep(self.ASSET_STATUS_INTERVAL * 2)
                continue

            logger.info(f"Asset {asset_urn} status: {status}. Waiting...")
            time.sleep(self.ASSET_STATUS_INTERVAL)

        logger.warning(f"Asset {asset_urn} timed out, but upload succeeded. Forcing post attempt.")
        return True

    # ---------------- POSTING UTILITIES ----------------

    # NOTE: the post text is not assembled in this class; _send_ugc_post
    # reads it from job_posting.linkedin_post_formated_data.

    # ---------------- MAIN POSTING METHODS ----------------

    def _send_ugc_post(self, person_urn, job_posting, media_category="NONE", media_list=None):
        """Send the final UGC post request.

        Any category other than "IMAGE" is forced to "NONE" and its media
        dropped, so the post goes out as pure text.

        Returns:
            dict with ``success``, ``post_id``, ``post_url``, ``status_code``.

        Raises:
            requests.HTTPError: on a non-success response (402/422 bodies
                are logged first).
        """
        message = self._truncate_message(job_posting.linkedin_post_formated_data)

        # --- FIX FOR 402: Force NONE if no image is present. ---
        if media_category != "IMAGE":
            # We explicitly force pure text share to avoid LinkedIn's link crawler
            # which triggers the commercial 402 error on job reposts.
            media_category = "NONE"
            media_list = None

        share_content = {
            "shareCommentary": {"text": message},
            "shareMediaCategory": media_category,
        }
        if media_list and media_category == "IMAGE":
            share_content["media"] = media_list

        payload = {
            "author": f"urn:li:person:{person_urn}",
            "lifecycleState": "PUBLISHED",
            "specificContent": {"com.linkedin.ugc.ShareContent": share_content},
            "visibility": {
                "com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"
            }
        }

        response = requests.post(
            "https://api.linkedin.com/v2/ugcPosts",
            headers=self._api_headers(json_body=True),
            json=payload,
            timeout=60,
        )

        # Log 402/422 details
        if response.status_code in [402, 422]:
            logger.error(f"{response.status_code} UGC Post Error Detail: {response.text}")

        response.raise_for_status()

        post_id = response.headers.get('x-restli-id', '')
        post_url = f"https://www.linkedin.com/feed/update/{quote(post_id)}/" if post_id else ""

        return {
            'success': True,
            'post_id': post_id,
            'post_url': post_url,
            'status_code': response.status_code
        }

    def create_job_post_with_image(self, job_posting, image_file, person_urn, asset_urn):
        """Create the LinkedIn post that carries the uploaded image asset.

        ``image_file`` is accepted for interface compatibility and not used.

        Raises:
            ValueError: if the job posting has no application URL.
        """
        if not job_posting.application_url:
            raise ValueError("Application URL is required for image link share on LinkedIn.")

        # Media list for IMAGE category (retains link details)
        media_list = [{
            "status": "READY",
            "media": asset_urn,
            "description": {"text": job_posting.title},
            "originalUrl": job_posting.application_url,
            "title": {"text": "Apply Now"}
        }]

        return self._send_ugc_post(
            person_urn=person_urn,
            job_posting=job_posting,
            media_category="IMAGE",
            media_list=media_list
        )

    def create_job_post(self, job_posting):
        """Create a job announcement post (image post, else text post).

        Returns:
            The success dict from ``_send_ugc_post``, or on failure a dict
            with ``success`` False, ``error`` and ``status_code`` (500 when
            the error carries no HTTP response).

        Raises:
            Exception: only when no access token is set.
        """
        if not self.access_token:
            raise Exception("Not authenticated with LinkedIn")

        try:
            profile = self.get_user_profile()
            person_urn = profile.get('sub')
            if not person_urn:
                raise Exception("Could not retrieve LinkedIn user ID")

            # Look up the first attached image, if any. Best effort: a
            # lookup problem only means the post goes out as text.
            image_upload = None
            try:
                first_image = job_posting.post_images.first()
                if first_image is not None:
                    image_upload = first_image.post_image
            except Exception as e:
                logger.debug(f"Could not read post image, posting text only: {e}")

            if image_upload is not None:
                try:
                    # Steps 1, 2, 3 for image post
                    upload_info = self.register_image_upload(person_urn)
                    asset_urn = upload_info['asset']
                    self.upload_image_to_linkedin(
                        upload_info['upload_url'],
                        image_upload,
                        asset_urn
                    )
                    return self.create_job_post_with_image(
                        job_posting, image_upload, person_urn, asset_urn
                    )
                except Exception as e:
                    logger.error(f"Image post failed, falling back to text: {e}")

            # === FALLBACK TO PURE TEXT POST (shareMediaCategory: NONE) ===
            return self._send_ugc_post(
                person_urn=person_urn,
                job_posting=job_posting,
                media_category="NONE"
            )

        except Exception as e:
            logger.error(f"Error creating LinkedIn post: {e}")
            status_code = getattr(getattr(e, 'response', None), 'status_code', 500)
            return {
                'success': False,
                'error': str(e),
                'status_code': status_code
            }