274 lines
10 KiB
Python
274 lines
10 KiB
Python
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional
|
|
from django.utils import timezone
|
|
from django.core.exceptions import ValidationError
|
|
from django.http import HttpRequest
|
|
from .models import Source, JobPosting, IntegrationLog
|
|
from .serializers import JobPostingSerializer
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ERPIntegrationService:
|
|
"""
|
|
Service to handle integration between external ERP system and ATS
|
|
"""
|
|
|
|
def __init__(self, source: Source):
|
|
self.source = source
|
|
self.logger = logging.getLogger(f'{__name__}.{source.name}')
|
|
|
|
def validate_request(self, request: HttpRequest) -> tuple[bool, str]:
|
|
"""
|
|
Validate the incoming request from ERP system
|
|
Returns: (is_valid, error_message)
|
|
"""
|
|
|
|
# Check if source is active
|
|
if not self.source.is_active:
|
|
return False, "Source is not active"
|
|
|
|
# Check if trusted IPs are configured and validate request IP
|
|
if self.source.trusted_ips:
|
|
client_ip = self.get_client_ip(request)
|
|
trusted_ips = [ip.strip() for ip in self.source.trusted_ips.split(',')]
|
|
|
|
if client_ip not in trusted_ips:
|
|
self.logger.warning(f"Request from untrusted IP: {client_ip}")
|
|
return False, f"Request from untrusted IP: {client_ip}"
|
|
|
|
# Check API key if provided
|
|
if self.source.api_key:
|
|
api_key = request.headers.get('X-API-Key') or request.GET.get('api_key')
|
|
if not api_key or api_key != self.source.api_key:
|
|
self.logger.warning("Invalid or missing API key")
|
|
return False, "Invalid or missing API key"
|
|
|
|
return True, ""
|
|
|
|
def log_integration_request(self, request: HttpRequest, action: str, **kwargs):
|
|
"""
|
|
Log the integration request/response
|
|
"""
|
|
IntegrationLog.objects.create(
|
|
source=self.source,
|
|
action=action,
|
|
endpoint=request.path,
|
|
method=request.method,
|
|
request_data=self.get_request_data(request),
|
|
ip_address=self.get_client_ip(request),
|
|
user_agent=request.META.get('HTTP_USER_AGENT', ''),
|
|
**kwargs
|
|
)
|
|
|
|
def create_job_from_erp(self, request_data: Dict[str, Any]) -> tuple[Optional[JobPosting], str]:
|
|
"""
|
|
Create a JobPosting from ERP request data
|
|
Returns: (job, error_message)
|
|
"""
|
|
try:
|
|
# Map ERP fields to JobPosting fields
|
|
job_data = {
|
|
'internal_job_id': request_data.get('job_id', '').strip(),
|
|
'title': request_data.get('title', '').strip(),
|
|
'department': request_data.get('department', '').strip(),
|
|
'job_type': self.map_job_type(request_data.get('job_type', 'FULL_TIME')),
|
|
'workplace_type': self.map_workplace_type(request_data.get('workplace_type', 'ON_SITE')),
|
|
'location_city': request_data.get('location_city', '').strip(),
|
|
'location_state': request_data.get('location_state', '').strip(),
|
|
'location_country': request_data.get('location_country', 'United States').strip(),
|
|
'description': request_data.get('description', '').strip(),
|
|
'qualifications': request_data.get('qualifications', '').strip(),
|
|
'salary_range': request_data.get('salary_range', '').strip(),
|
|
'benefits': request_data.get('benefits', '').strip(),
|
|
'application_url': request_data.get('application_url', '').strip(),
|
|
'application_deadline': self.parse_date(request_data.get('application_deadline')),
|
|
'application_instructions': request_data.get('application_instructions', '').strip(),
|
|
'created_by': f'ERP Integration: {self.source.name}',
|
|
'status': 'DRAFT' if request_data.get('auto_publish', False) else 'DRAFT',
|
|
'source': self.source
|
|
}
|
|
|
|
# Validate required fields
|
|
if not job_data['title']:
|
|
return None, "Job title is required"
|
|
|
|
|
|
# Create the job
|
|
job = JobPosting(**job_data)
|
|
job.save()
|
|
|
|
self.logger.info(f"Created job {job.internal_job_id} from ERP integration")
|
|
return job, ""
|
|
|
|
except Exception as e:
|
|
error_msg = f"Error creating job from ERP: {str(e)}"
|
|
self.logger.error(error_msg)
|
|
return None, error_msg
|
|
|
|
def update_job_from_erp(self, job_id: str, request_data: Dict[str, Any]) -> tuple[Optional[JobPosting], str]:
|
|
"""
|
|
Update an existing JobPosting from ERP request data
|
|
Returns: (job, error_message)
|
|
"""
|
|
try:
|
|
job = JobPosting.objects.get(internal_job_id=job_id)
|
|
|
|
# Update fields from ERP data
|
|
updatable_fields = [
|
|
'title', 'department', 'job_type', 'workplace_type',
|
|
'location_city', 'location_state', 'location_country',
|
|
'description', 'qualifications', 'salary_range', 'benefits',
|
|
'application_url', 'application_deadline', 'application_instructions',
|
|
'status'
|
|
]
|
|
|
|
for field in updatable_fields:
|
|
if field in request_data:
|
|
value = request_data[field]
|
|
|
|
# Special handling for date fields
|
|
if field == 'application_deadline':
|
|
value = self.parse_date(value)
|
|
|
|
setattr(job, field, value)
|
|
|
|
# Update source if provided
|
|
if 'source_id' in request_data:
|
|
try:
|
|
source = Source.objects.get(id=request_data['source_id'])
|
|
job.source = source
|
|
except Source.DoesNotExist:
|
|
pass
|
|
|
|
job.save()
|
|
|
|
self.logger.info(f"Updated job {job.internal_job_id} from ERP integration")
|
|
return job, ""
|
|
|
|
except JobPosting.DoesNotExist:
|
|
return None, f"Job with ID {job_id} not found"
|
|
except Exception as e:
|
|
error_msg = f"Error updating job from ERP: {str(e)}"
|
|
self.logger.error(error_msg)
|
|
return None, error_msg
|
|
|
|
def validate_erp_data(self, data: Dict[str, Any]) -> tuple[bool, str]:
|
|
"""
|
|
Validate ERP request data structure
|
|
Returns: (is_valid, error_message)
|
|
"""
|
|
required_fields = ['title']
|
|
|
|
for field in required_fields:
|
|
if field not in data or not data[field]:
|
|
return False, f"Required field '{field}' is missing or empty"
|
|
|
|
# Validate URL format
|
|
if data.get('application_url'):
|
|
from django.core.validators import URLValidator
|
|
from django.core.exceptions import ValidationError as DjangoValidationError
|
|
|
|
try:
|
|
URLValidator()(data['application_url'])
|
|
except DjangoValidationError:
|
|
return False, "Invalid application URL format"
|
|
|
|
# Validate job type
|
|
if 'job_type' in data and data['job_type']:
|
|
valid_job_types = dict(JobPosting.JOB_TYPES)
|
|
if data['job_type'] not in valid_job_types:
|
|
return False, f"Invalid job type: {data['job_type']}"
|
|
|
|
# Validate workplace type
|
|
if 'workplace_type' in data and data['workplace_type']:
|
|
valid_workplace_types = dict(JobPosting.WORKPLACE_TYPES)
|
|
if data['workplace_type'] not in valid_workplace_types:
|
|
return False, f"Invalid workplace type: {data['workplace_type']}"
|
|
|
|
return True, ""
|
|
|
|
# Helper methods
|
|
def get_client_ip(self, request: HttpRequest) -> str:
|
|
"""Get the client IP address from request"""
|
|
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
|
|
if x_forwarded_for:
|
|
ip = x_forwarded_for.split(',')[0]
|
|
else:
|
|
ip = request.META.get('REMOTE_ADDR')
|
|
return ip
|
|
|
|
def get_request_data(self, request: HttpRequest) -> Dict[str, Any]:
|
|
"""Get request data from request object"""
|
|
if request.method == 'GET':
|
|
return dict(request.GET)
|
|
elif request.method in ['POST', 'PUT', 'PATCH']:
|
|
try:
|
|
if request.content_type == 'application/json':
|
|
return json.loads(request.body.decode('utf-8'))
|
|
else:
|
|
return dict(request.POST)
|
|
except:
|
|
return {}
|
|
return {}
|
|
|
|
def parse_date(self, date_str: str) -> Optional[datetime.date]:
|
|
"""Parse date string from ERP"""
|
|
if not date_str:
|
|
return None
|
|
|
|
try:
|
|
# Try different date formats
|
|
date_formats = [
|
|
'%Y-%m-%d',
|
|
'%m/%d/%Y',
|
|
'%d/%m/%Y',
|
|
'%Y-%m-%d %H:%M:%S',
|
|
'%m/%d/%Y %H:%M:%S',
|
|
'%d/%m/%Y %H:%M:%S'
|
|
]
|
|
|
|
for fmt in date_formats:
|
|
try:
|
|
dt = datetime.strptime(date_str, fmt)
|
|
if fmt.endswith('%H:%M:%S'):
|
|
return dt.date()
|
|
return dt.date()
|
|
except ValueError:
|
|
continue
|
|
|
|
# If no format matches, try to parse with dateutil
|
|
from dateutil import parser
|
|
dt = parser.parse(date_str)
|
|
return dt.date()
|
|
|
|
except Exception as e:
|
|
self.logger.warning(f"Could not parse date '{date_str}': {str(e)}")
|
|
return None
|
|
|
|
def map_job_type(self, erp_job_type: str) -> str:
|
|
"""Map ERP job type to ATS job type"""
|
|
mapping = {
|
|
'full-time': 'FULL_TIME',
|
|
'part-time': 'PART_TIME',
|
|
'contract': 'CONTRACT',
|
|
'internship': 'INTERNSHIP',
|
|
'faculty': 'FACULTY',
|
|
'temporary': 'TEMPORARY',
|
|
}
|
|
|
|
return mapping.get(erp_job_type.lower(), 'FULL_TIME')
|
|
|
|
def map_workplace_type(self, erp_workplace_type: str) -> str:
|
|
"""Map ERP workplace type to ATS workplace type"""
|
|
mapping = {
|
|
'onsite': 'ON_SITE',
|
|
'on-site': 'ON_SITE',
|
|
'remote': 'REMOTE',
|
|
'hybrid': 'HYBRID',
|
|
}
|
|
|
|
return mapping.get(erp_workplace_type.lower(), 'ON_SITE')
|