kaauh_ats/recruitment/erp_integration_service.py
2025-10-07 13:44:00 +03:00

272 lines
10 KiB
Python

import json
import logging
from datetime import datetime
from typing import Dict, Any, Optional
from django.utils import timezone
from django.core.exceptions import ValidationError
from django.http import HttpRequest
from .models import Source, JobPosting, IntegrationLog
from .serializers import JobPostingSerializer
logger = logging.getLogger(__name__)
class ERPIntegrationService:
"""
Service to handle integration between external ERP system and ATS
"""
def __init__(self, source: Source):
self.source = source
self.logger = logging.getLogger(f'{__name__}.{source.name}')
def validate_request(self, request: HttpRequest) -> tuple[bool, str]:
"""
Validate the incoming request from ERP system
Returns: (is_valid, error_message)
"""
# Check if source is active
if not self.source.is_active:
return False, "Source is not active"
# Check if trusted IPs are configured and validate request IP
if self.source.trusted_ips:
client_ip = self.get_client_ip(request)
trusted_ips = [ip.strip() for ip in self.source.trusted_ips.split(',')]
if client_ip not in trusted_ips:
self.logger.warning(f"Request from untrusted IP: {client_ip}")
return False, f"Request from untrusted IP: {client_ip}"
# Check API key if provided
if self.source.api_key:
api_key = request.headers.get('X-API-Key') or request.GET.get('api_key')
if not api_key or api_key != self.source.api_key:
self.logger.warning("Invalid or missing API key")
return False, "Invalid or missing API key"
return True, ""
def log_integration_request(self, request: HttpRequest, action: str, **kwargs):
"""
Log the integration request/response
"""
IntegrationLog.objects.create(
source=self.source,
action=action,
endpoint=request.path,
method=request.method,
request_data=self.get_request_data(request),
ip_address=self.get_client_ip(request),
user_agent=request.META.get('HTTP_USER_AGENT', ''),
**kwargs
)
def create_job_from_erp(self, request_data: Dict[str, Any]) -> tuple[Optional[JobPosting], str]:
"""
Create a JobPosting from ERP request data
Returns: (job, error_message)
"""
try:
# Map ERP fields to JobPosting fields
job_data = {
'title': request_data.get('title', '').strip(),
'department': request_data.get('department', '').strip(),
'job_type': self.map_job_type(request_data.get('job_type', 'FULL_TIME')),
'workplace_type': self.map_workplace_type(request_data.get('workplace_type', 'ON_SITE')),
'location_city': request_data.get('location_city', '').strip(),
'location_state': request_data.get('location_state', '').strip(),
'location_country': request_data.get('location_country', 'United States').strip(),
'description': request_data.get('description', '').strip(),
'qualifications': request_data.get('qualifications', '').strip(),
'salary_range': request_data.get('salary_range', '').strip(),
'benefits': request_data.get('benefits', '').strip(),
'application_url': request_data.get('application_url', '').strip(),
'application_deadline': self.parse_date(request_data.get('application_deadline')),
'application_instructions': request_data.get('application_instructions', '').strip(),
'created_by': f'ERP Integration: {self.source.name}',
'status': 'DRAFT' if request_data.get('auto_publish', False) else 'DRAFT',
'source': self.source
}
# Validate required fields
if not job_data['title']:
return None, "Job title is required"
# Create the job
job = JobPosting(**job_data)
job.save()
self.logger.info(f"Created job {job.internal_job_id} from ERP integration")
return job, ""
except Exception as e:
error_msg = f"Error creating job from ERP: {str(e)}"
self.logger.error(error_msg)
return None, error_msg
def update_job_from_erp(self, job_id: str, request_data: Dict[str, Any]) -> tuple[Optional[JobPosting], str]:
"""
Update an existing JobPosting from ERP request data
Returns: (job, error_message)
"""
try:
job = JobPosting.objects.get(internal_job_id=job_id)
# Update fields from ERP data
updatable_fields = [
'title', 'department', 'job_type', 'workplace_type',
'location_city', 'location_state', 'location_country',
'description', 'qualifications', 'salary_range', 'benefits',
'application_url', 'application_deadline', 'application_instructions',
'status'
]
for field in updatable_fields:
if field in request_data:
value = request_data[field]
# Special handling for date fields
if field == 'application_deadline':
value = self.parse_date(value)
setattr(job, field, value)
# Update source if provided
if 'source_id' in request_data:
try:
source = Source.objects.get(id=request_data['source_id'])
job.source = source
except Source.DoesNotExist:
pass
job.save()
self.logger.info(f"Updated job {job.internal_job_id} from ERP integration")
return job, ""
except JobPosting.DoesNotExist:
return None, f"Job with ID {job_id} not found"
except Exception as e:
error_msg = f"Error updating job from ERP: {str(e)}"
self.logger.error(error_msg)
return None, error_msg
def validate_erp_data(self, data: Dict[str, Any]) -> tuple[bool, str]:
"""
Validate ERP request data structure
Returns: (is_valid, error_message)
"""
required_fields = ['title']
for field in required_fields:
if field not in data or not data[field]:
return False, f"Required field '{field}' is missing or empty"
# Validate URL format
if data.get('application_url'):
from django.core.validators import URLValidator
from django.core.exceptions import ValidationError as DjangoValidationError
try:
URLValidator()(data['application_url'])
except DjangoValidationError:
return False, "Invalid application URL format"
# Validate job type
if 'job_type' in data and data['job_type']:
valid_job_types = dict(JobPosting.JOB_TYPES)
if data['job_type'] not in valid_job_types:
return False, f"Invalid job type: {data['job_type']}"
# Validate workplace type
if 'workplace_type' in data and data['workplace_type']:
valid_workplace_types = dict(JobPosting.WORKPLACE_TYPES)
if data['workplace_type'] not in valid_workplace_types:
return False, f"Invalid workplace type: {data['workplace_type']}"
return True, ""
# Helper methods
def get_client_ip(self, request: HttpRequest) -> str:
"""Get the client IP address from request"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
def get_request_data(self, request: HttpRequest) -> Dict[str, Any]:
"""Get request data from request object"""
if request.method == 'GET':
return dict(request.GET)
elif request.method in ['POST', 'PUT', 'PATCH']:
try:
if request.content_type == 'application/json':
return json.loads(request.body.decode('utf-8'))
else:
return dict(request.POST)
except:
return {}
return {}
def parse_date(self, date_str: str) -> Optional[datetime.date]:
"""Parse date string from ERP"""
if not date_str:
return None
try:
# Try different date formats
date_formats = [
'%Y-%m-%d',
'%m/%d/%Y',
'%d/%m/%Y',
'%Y-%m-%d %H:%M:%S',
'%m/%d/%Y %H:%M:%S',
'%d/%m/%Y %H:%M:%S'
]
for fmt in date_formats:
try:
dt = datetime.strptime(date_str, fmt)
if fmt.endswith('%H:%M:%S'):
return dt.date()
return dt.date()
except ValueError:
continue
# If no format matches, try to parse with dateutil
from dateutil import parser
dt = parser.parse(date_str)
return dt.date()
except Exception as e:
self.logger.warning(f"Could not parse date '{date_str}': {str(e)}")
return None
def map_job_type(self, erp_job_type: str) -> str:
"""Map ERP job type to ATS job type"""
mapping = {
'full-time': 'FULL_TIME',
'part-time': 'PART_TIME',
'contract': 'CONTRACT',
'internship': 'INTERNSHIP',
'faculty': 'FACULTY',
'temporary': 'TEMPORARY',
}
return mapping.get(erp_job_type.lower(), 'FULL_TIME')
def map_workplace_type(self, erp_workplace_type: str) -> str:
"""Map ERP workplace type to ATS workplace type"""
mapping = {
'onsite': 'ON_SITE',
'on-site': 'ON_SITE',
'remote': 'REMOTE',
'hybrid': 'HYBRID',
}
return mapping.get(erp_workplace_type.lower(), 'ON_SITE')