"""
Complaint duplicate detection service.

Identifies potential duplicate complaints based on:

- Patient name similarity
- Incident date proximity
- Description text similarity
- Category match
"""
|
|
from difflib import SequenceMatcher
|
|
from django.utils import timezone
|
|
from datetime import timedelta
|
|
from django.db.models import Q
|
|
|
|
|
|
class ComplaintDuplicateDetector:
    """
    Detect potential duplicate complaints.

    Uses fuzzy matching to identify complaints that may be duplicates
    of each other, helping prevent fragmentation and redundant work.

    The overall score is a weighted sum of four factors — patient name
    (0.30), incident date (0.20), description text (0.35) and category
    (0.15) — see ``_calculate_similarity``.
    """

    def __init__(self, complaint_data, threshold=0.75):
        """
        Initialize detector with complaint data.

        Args:
            complaint_data: Dict with complaint fields (patient_name,
                description, incident_date, category_id, ...).
            threshold: Similarity threshold (0.0-1.0) for considering duplicates.
        """
        self.data = complaint_data
        self.threshold = threshold

    def find_duplicates(self, hospital_id, days=30, exclude_id=None):
        """
        Find potential duplicate complaints from last N days.

        Args:
            hospital_id: Hospital to search within
            days: Number of days to look back
            exclude_id: Complaint ID to exclude (useful when editing)

        Returns:
            List of dicts with complaint, score, score_percentage, reasons
            and is_likely_duplicate, sorted by score descending.
        """
        from apps.complaints.models import Complaint

        cutoff = timezone.now() - timedelta(days=days)

        # Get candidate complaints (exclude closed/cancelled)
        candidates = Complaint.objects.filter(
            hospital_id=hospital_id,
            created_at__gte=cutoff,
            status__in=['open', 'in_progress', 'partially_resolved']
        )

        # `is not None` rather than truthiness, so an id of 0 (if it ever
        # occurred) would still be excluded.
        if exclude_id is not None:
            candidates = candidates.exclude(id=exclude_id)

        # Fetch the category in the same query: _match_category reads
        # complaint.category.name_en.
        # FIX: the previous select_related('patient', 'category') combined
        # with an .only() list that omitted 'patient' deferred a relation
        # that select_related traverses, which Django rejects with a
        # FieldError ("Field ... cannot be both deferred and traversed
        # using select_related at the same time"). 'patient' was never
        # accessed, so the relation (and the conflicting .only()) is dropped.
        candidates = candidates.select_related('category')

        duplicates = []
        for complaint in candidates:
            score, reasons = self._calculate_similarity(complaint)
            if score >= self.threshold:
                duplicates.append({
                    'complaint': complaint,
                    'score': score,
                    'score_percentage': int(score * 100),
                    'reasons': reasons,
                    'is_likely_duplicate': score >= 0.85  # 85%+ is very likely duplicate
                })

        # Sort by score descending
        return sorted(duplicates, key=lambda x: x['score'], reverse=True)

    def _calculate_similarity(self, complaint):
        """
        Calculate similarity score between new complaint and existing one.

        Returns:
            Tuple of (score, list of match reasons). Score is in [0, 1].
        """
        score = 0.0
        reasons = []

        # Weights for different factors (sum to 1.0)
        weights = {
            'patient': 0.30,      # Patient name match
            'date': 0.20,         # Incident date match
            'description': 0.35,  # Description text similarity
            'category': 0.15,     # Category match
        }

        # Patient name match (30%) — all-or-nothing
        patient_match, patient_reason = self._match_patient(complaint)
        if patient_match:
            score += weights['patient']
            reasons.append(patient_reason)

        # Date match (20%) — all-or-nothing
        date_match, date_reason = self._match_date(complaint)
        if date_match:
            score += weights['date']
            reasons.append(date_reason)

        # Description similarity (35%) — graded; always contributes its
        # fraction, but is only *reported* as a reason above 0.5
        desc_score, desc_reason = self._text_similarity(complaint)
        score += desc_score * weights['description']
        if desc_score > 0.5:
            reasons.append(desc_reason)

        # Category match (15%) — all-or-nothing
        category_match, category_reason = self._match_category(complaint)
        if category_match:
            score += weights['category']
            reasons.append(category_reason)

        return score, reasons

    def _match_patient(self, complaint):
        """Check if patient names match (exact after normalization, or
        >= 80% SequenceMatcher ratio). Returns (matched, reason)."""
        new_name = self._normalize_name(self.data.get('patient_name', ''))
        existing_name = self._normalize_name(getattr(complaint, 'patient_name', ''))

        if not new_name or not existing_name:
            return False, None

        # Exact match
        if new_name == existing_name:
            return True, f"Patient name matches: {complaint.patient_name}"

        # Fuzzy match (80%+ similar)
        similarity = SequenceMatcher(None, new_name, existing_name).ratio()
        if similarity >= 0.80:
            return True, f"Patient name similar ({int(similarity*100)}% match)"

        return False, None

    def _normalize_name(self, name):
        """Normalize a name for comparison: uppercase, collapse whitespace,
        and strip leading honorifics (MR/MRS/MS/DR, with or without dot)."""
        if not name:
            return ''
        # Convert to uppercase, remove extra spaces
        name = ' '.join(str(name).upper().split())
        # FIX: only strip honorifics at the START of the name. The old
        # str.replace(prefix, '') deleted the token anywhere in the string,
        # mangling names such as 'AMR SALAH' -> 'ASALAH'.
        for prefix in ('MR.', 'MRS.', 'MS.', 'DR.', 'MR ', 'MRS ', 'MS ', 'DR '):
            if name.startswith(prefix):
                # lstrip so stacked titles ("MR. DR. X") are also handled
                name = name[len(prefix):].lstrip()
        return name.strip()

    def _match_date(self, complaint):
        """Check if incident dates are within 3 days of each other.
        Returns (matched, reason).

        NOTE(review): assumes both values are comparable date/datetime
        objects (subtraction yielding a timedelta) — confirm callers never
        pass raw strings.
        """
        new_date = self.data.get('incident_date')
        existing_date = getattr(complaint, 'incident_date', None)

        if not new_date or not existing_date:
            return False, None

        # Check if dates are the same or within 3 days
        date_diff = abs((new_date - existing_date).days)
        if date_diff == 0:
            return True, f"Same incident date: {existing_date}"
        elif date_diff <= 3:
            return True, f"Incident date within 3 days ({existing_date})"

        return False, None

    def _text_similarity(self, complaint):
        """Calculate text similarity between descriptions.

        Returns (similarity in [0, 1], reason or None). The reason is only
        populated above 0.7 similarity.
        """
        new_desc = self._normalize_text(self.data.get('description', ''))
        existing_desc = self._normalize_text(getattr(complaint, 'description', ''))

        if not new_desc or not existing_desc:
            return 0.0, None

        # Use SequenceMatcher for text similarity
        similarity = SequenceMatcher(None, new_desc, existing_desc).ratio()

        # Also check for keyword overlap: SequenceMatcher is order-sensitive,
        # so reworded descriptions with the same vocabulary can score low.
        new_words = set(new_desc.split())
        existing_words = set(existing_desc.split())
        if new_words and existing_words:
            word_overlap = len(new_words & existing_words) / min(len(new_words), len(existing_words))
            # Boost score if there's significant word overlap (dampened by
            # 0.8 so overlap alone can never claim a perfect match)
            if word_overlap > 0.5:
                similarity = max(similarity, word_overlap * 0.8)

        reason = None
        if similarity > 0.7:
            reason = f"Description {int(similarity*100)}% similar"

        return similarity, reason

    def _normalize_text(self, text):
        """Normalize free text for comparison: lowercase, collapse
        whitespace, drop stop words and words of <= 2 characters."""
        if not text:
            return ''
        # Lowercase, remove extra spaces
        text = ' '.join(str(text).lower().split())
        # Remove common words
        stop_words = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been',
                      'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will',
                      'would', 'could', 'should', 'may', 'might', 'must', 'shall'}
        words = [w for w in text.split() if w not in stop_words and len(w) > 2]
        return ' '.join(words)

    def _match_category(self, complaint):
        """Check if categories match by id. Returns (matched, reason)."""
        new_category_id = self.data.get('category_id')
        existing_category_id = getattr(complaint, 'category_id', None)

        if not new_category_id or not existing_category_id:
            return False, None

        if new_category_id == existing_category_id:
            # Get category name for display
            category_name = getattr(complaint, 'category', None)
            if category_name:
                return True, f"Same category: {category_name.name_en}"
            return True, "Same category"

        return False, None
|
|
|
|
|
|
def check_for_duplicates(complaint_data, hospital_id, exclude_id=None):
    """
    Convenience wrapper around ComplaintDuplicateDetector.

    Builds a detector (default threshold) for *complaint_data* and runs
    the duplicate search within one hospital.

    Args:
        complaint_data: Dict with complaint fields
        hospital_id: Hospital ID to search within
        exclude_id: Optional complaint ID to exclude

    Returns:
        List of potential duplicates with scores
    """
    return ComplaintDuplicateDetector(complaint_data).find_duplicates(
        hospital_id, exclude_id=exclude_id
    )
|