HH/apps/complaints/services/duplicate_detection.py
2026-02-25 04:47:05 +03:00

244 lines
8.8 KiB
Python

"""
Complaint duplicate detection service
Identifies potential duplicate complaints based on:
- Patient name similarity
- Incident date proximity
- Description text similarity
- Category match
"""
from difflib import SequenceMatcher
from django.utils import timezone
from datetime import timedelta
from django.db.models import Q
class ComplaintDuplicateDetector:
    """
    Detect potential duplicate complaints.

    Uses fuzzy matching (patient-name similarity, incident-date proximity,
    description text similarity, and category match) to identify complaints
    that may be duplicates of each other, helping prevent fragmentation and
    redundant work.
    """

    def __init__(self, complaint_data, threshold=0.75):
        """
        Initialize detector with complaint data.

        Args:
            complaint_data: Dict with complaint fields (patient_name,
                description, incident_date, category_id, ...)
            threshold: Similarity threshold (0.0-1.0) for considering duplicates
        """
        self.data = complaint_data
        self.threshold = threshold

    def find_duplicates(self, hospital_id, days=30, exclude_id=None):
        """
        Find potential duplicate complaints from the last N days.

        Args:
            hospital_id: Hospital to search within
            days: Number of days to look back
            exclude_id: Complaint ID to exclude (useful when editing)

        Returns:
            List of dicts with complaint, score, score_percentage, reasons and
            is_likely_duplicate, sorted by score descending.
        """
        from apps.complaints.models import Complaint

        cutoff = timezone.now() - timedelta(days=days)
        # Only still-actionable complaints are duplicate candidates;
        # closed/cancelled ones are excluded by the status filter.
        candidates = Complaint.objects.filter(
            hospital_id=hospital_id,
            created_at__gte=cutoff,
            status__in=['open', 'in_progress', 'partially_resolved'],
        )
        if exclude_id:
            candidates = candidates.exclude(id=exclude_id)

        # Fetch only the fields the scoring code reads. When only() is
        # combined with select_related(), the traversed relation (and the
        # related field we access, category.name_en) must be listed
        # explicitly -- a relation cannot be both deferred and traversed.
        # select_related('patient') was dropped: no patient field is read.
        candidates = candidates.select_related('category').only(
            'id', 'patient_name', 'description', 'created_at',
            'incident_date', 'category', 'category__name_en',
        )

        duplicates = []
        for complaint in candidates:
            score, reasons = self._calculate_similarity(complaint)
            if score >= self.threshold:
                duplicates.append({
                    'complaint': complaint,
                    'score': score,
                    'score_percentage': int(score * 100),
                    'reasons': reasons,
                    'is_likely_duplicate': score >= 0.85,  # 85%+ is very likely duplicate
                })

        # Sort by score descending so the strongest match comes first.
        return sorted(duplicates, key=lambda x: x['score'], reverse=True)

    def _calculate_similarity(self, complaint):
        """
        Calculate similarity score between the new complaint and an existing one.

        Returns:
            Tuple of (score in [0, 1], list of human-readable match reasons)
        """
        score = 0.0
        reasons = []

        # Relative weights of each signal; they sum to 1.0 so the final
        # score stays in [0, 1].
        weights = {
            'patient': 0.30,      # Patient name match
            'date': 0.20,         # Incident date proximity
            'description': 0.35,  # Description text similarity
            'category': 0.15,     # Category match
        }

        # Patient name match (30%) -- all-or-nothing contribution.
        patient_match, patient_reason = self._match_patient(complaint)
        if patient_match:
            score += weights['patient']
            reasons.append(patient_reason)

        # Incident date proximity (20%) -- all-or-nothing contribution.
        date_match, date_reason = self._match_date(complaint)
        if date_match:
            score += weights['date']
            reasons.append(date_reason)

        # Description similarity (35%) -- graded contribution.
        desc_score, desc_reason = self._text_similarity(complaint)
        score += desc_score * weights['description']
        # Only append a real reason: _text_similarity returns None for
        # similarity <= 0.7, and the old `desc_score > 0.5` guard let a
        # bare None leak into the user-facing reasons list.
        if desc_reason is not None:
            reasons.append(desc_reason)

        # Category match (15%) -- all-or-nothing contribution.
        category_match, category_reason = self._match_category(complaint)
        if category_match:
            score += weights['category']
            reasons.append(category_reason)

        return score, reasons

    def _match_patient(self, complaint):
        """Return (matched, reason) based on normalized patient-name similarity."""
        new_name = self._normalize_name(self.data.get('patient_name', ''))
        existing_name = self._normalize_name(getattr(complaint, 'patient_name', ''))
        if not new_name or not existing_name:
            return False, None
        # Exact match after normalization.
        if new_name == existing_name:
            return True, f"Patient name matches: {complaint.patient_name}"
        # Fuzzy match (80%+ similar) tolerates typos and spelling variants.
        similarity = SequenceMatcher(None, new_name, existing_name).ratio()
        if similarity >= 0.80:
            return True, f"Patient name similar ({int(similarity*100)}% match)"
        return False, None

    def _normalize_name(self, name):
        """Normalize a name for comparison: uppercase, collapse whitespace,
        and strip a single leading honorific (MR/MRS/MS/DR)."""
        if not name:
            return ''
        # Convert to uppercase, collapse runs of whitespace.
        name = ' '.join(str(name).upper().split())
        # Strip only a LEADING honorific. Longer prefixes are checked first
        # so 'MRS.' is not mangled by the shorter 'MR.' (the previous
        # str.replace() approach turned 'MRS. JONES' into 'S. JONES' and
        # also removed honorific substrings anywhere in the name).
        for prefix in ('MRS.', 'MRS ', 'MS.', 'MS ', 'MR.', 'MR ', 'DR.', 'DR '):
            if name.startswith(prefix):
                name = name[len(prefix):]
                break
        return name.strip()

    def _match_date(self, complaint):
        """Return (matched, reason) when incident dates are within 3 days."""
        new_date = self.data.get('incident_date')
        existing_date = getattr(complaint, 'incident_date', None)
        if not new_date or not existing_date:
            return False, None
        # abs() is applied to the timedelta itself (not to .days): negative
        # timedeltas normalize days toward minus infinity, so abs((a-b).days)
        # over-counts partial-day differences when datetimes are involved.
        date_diff = abs(new_date - existing_date).days
        if date_diff == 0:
            return True, f"Same incident date: {existing_date}"
        elif date_diff <= 3:
            return True, f"Incident date within 3 days ({existing_date})"
        return False, None

    def _text_similarity(self, complaint):
        """Return (similarity in [0, 1], reason or None) for the descriptions."""
        new_desc = self._normalize_text(self.data.get('description', ''))
        existing_desc = self._normalize_text(getattr(complaint, 'description', ''))
        if not new_desc or not existing_desc:
            return 0.0, None

        # Character-level similarity of the normalized texts.
        similarity = SequenceMatcher(None, new_desc, existing_desc).ratio()

        # Word-overlap boost: SequenceMatcher is order-sensitive, so two
        # descriptions sharing most keywords in a different order still score.
        new_words = set(new_desc.split())
        existing_words = set(existing_desc.split())
        if new_words and existing_words:
            word_overlap = len(new_words & existing_words) / min(len(new_words), len(existing_words))
            if word_overlap > 0.5:
                similarity = max(similarity, word_overlap * 0.8)

        reason = f"Description {int(similarity*100)}% similar" if similarity > 0.7 else None
        return similarity, reason

    def _normalize_text(self, text):
        """Normalize free text for comparison: lowercase, collapse whitespace,
        drop English stop words and words of length <= 2."""
        if not text:
            return ''
        # Lowercase, collapse runs of whitespace.
        text = ' '.join(str(text).lower().split())
        # Remove common function words that carry no matching signal.
        stop_words = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been',
                      'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will',
                      'would', 'could', 'should', 'may', 'might', 'must', 'shall'}
        words = [w for w in text.split() if w not in stop_words and len(w) > 2]
        return ' '.join(words)

    def _match_category(self, complaint):
        """Return (matched, reason) when both complaints share a category."""
        new_category_id = self.data.get('category_id')
        existing_category_id = getattr(complaint, 'category_id', None)
        if not new_category_id or not existing_category_id:
            return False, None
        if new_category_id == existing_category_id:
            # Include the category name for display when it is available.
            category = getattr(complaint, 'category', None)
            if category:
                return True, f"Same category: {category.name_en}"
            return True, "Same category"
        return False, None
def check_for_duplicates(complaint_data, hospital_id, exclude_id=None,
                         days=30, threshold=0.75):
    """
    Convenience function to check for duplicate complaints.

    Args:
        complaint_data: Dict with complaint fields
        hospital_id: Hospital ID to search within
        exclude_id: Optional complaint ID to exclude (useful when editing)
        days: Look-back window in days (previously stuck at the detector's
            default of 30; now forwarded to find_duplicates)
        threshold: Similarity threshold (0.0-1.0), forwarded to the detector
            (previously stuck at the detector's default of 0.75)

    Returns:
        List of potential duplicates with scores, sorted by score descending
    """
    detector = ComplaintDuplicateDetector(complaint_data, threshold=threshold)
    return detector.find_duplicates(hospital_id, days=days, exclude_id=exclude_id)