"""
Complaint duplicate detection service

Identifies potential duplicate complaints based on:
- Patient name similarity
- Incident date proximity
- Description text similarity
- Category match
"""
import re
from datetime import date, datetime, timedelta
from difflib import SequenceMatcher

from django.db.models import Q
from django.utils import timezone


class ComplaintDuplicateDetector:
    """
    Detect potential duplicate complaints.

    Uses fuzzy matching to identify complaints that may be duplicates
    of each other, helping prevent fragmentation and redundant work.

    The overall score is a weighted sum of four factors (see ``WEIGHTS``):
    patient name, incident date, and category contribute their full weight
    on a match; description similarity contributes proportionally.
    """

    # Contribution of each factor to the overall score; the weights sum to 1.0.
    WEIGHTS = {
        'patient': 0.30,      # Patient name match
        'date': 0.20,         # Incident date match
        'description': 0.35,  # Description text similarity
        'category': 0.15,     # Category match
    }

    # Score at or above which a candidate is flagged as a very likely duplicate.
    LIKELY_DUPLICATE_SCORE = 0.85

    # Words ignored when comparing description text.
    STOP_WORDS = frozenset({
        'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been',
        'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will',
        'would', 'could', 'should', 'may', 'might', 'must', 'shall',
    })

    # One or more honorifics at the very start of an upper-cased name, each
    # followed by a dot and/or whitespace. Anchoring at the start ensures that
    # letters inside a name (e.g. the "MS " in "WILLIAMS JOHN") are never removed.
    _TITLE_PREFIX_RE = re.compile(r'^(?:(?:MRS|MR|MS|DR)(?:\.\s*|\s+))+')

    def __init__(self, complaint_data, threshold=0.75):
        """
        Initialize detector with complaint data.

        Args:
            complaint_data: Dict with complaint fields. The keys read by this
                class are ``patient_name``, ``incident_date``, ``description``
                and ``category_id``; all are optional.
            threshold: Similarity threshold (0.0-1.0) for considering duplicates
        """
        self.data = complaint_data
        self.threshold = threshold

    def find_duplicates(self, hospital_id, days=30, exclude_id=None):
        """
        Find potential duplicate complaints from last N days.

        Args:
            hospital_id: Hospital to search within
            days: Number of days to look back
            exclude_id: Complaint ID to exclude (useful when editing)

        Returns:
            List of dicts sorted by score (highest first). Each dict has the
            keys ``complaint``, ``score``, ``score_percentage``, ``reasons``
            and ``is_likely_duplicate``.
        """
        # Imported inside the method, as in the original code
        # (presumably to avoid a circular import -- confirm before moving).
        from apps.complaints.models import Complaint

        cutoff = timezone.now() - timedelta(days=days)

        # Get candidate complaints (exclude closed/cancelled)
        candidates = Complaint.objects.filter(
            hospital_id=hospital_id,
            created_at__gte=cutoff,
            status__in=['open', 'in_progress', 'partially_resolved']
        )

        if exclude_id:
            candidates = candidates.exclude(id=exclude_id)

        # Select only needed fields for performance.
        # 'patient' must be listed in only(): Django raises FieldError when a
        # relation requested via select_related() is deferred by only().
        candidates = candidates.select_related('patient', 'category').only(
            'id', 'patient', 'patient_name', 'description', 'created_at',
            'incident_date', 'category_id'
        )

        duplicates = []
        for complaint in candidates:
            score, reasons = self._calculate_similarity(complaint)
            if score >= self.threshold:
                duplicates.append({
                    'complaint': complaint,
                    'score': score,
                    'score_percentage': int(score * 100),
                    'reasons': reasons,
                    # 85%+ is very likely duplicate
                    'is_likely_duplicate': score >= self.LIKELY_DUPLICATE_SCORE,
                })

        # Sort by score descending
        duplicates.sort(key=lambda item: item['score'], reverse=True)
        return duplicates

    def _calculate_similarity(self, complaint):
        """
        Calculate similarity score between new complaint and existing one.

        Args:
            complaint: Existing complaint object to compare against.

        Returns:
            Tuple of (score, list of match reasons). The score is in the
            range 0.0-1.0; the reasons list contains only strings.
        """
        score = 0.0
        reasons = []
        weights = self.WEIGHTS

        # Patient name match (30%)
        patient_match, patient_reason = self._match_patient(complaint)
        if patient_match:
            score += weights['patient']
            reasons.append(patient_reason)

        # Date match (20%)
        date_match, date_reason = self._match_date(complaint)
        if date_match:
            score += weights['date']
            reasons.append(date_reason)

        # Description similarity (35%) always contributes proportionally.
        desc_score, desc_reason = self._text_similarity(complaint)
        score += desc_score * weights['description']
        # _text_similarity only produces a reason for strong matches, so guard
        # against appending None for mid-range scores.
        if desc_score > 0.5 and desc_reason:
            reasons.append(desc_reason)

        # Category match (15%)
        category_match, category_reason = self._match_category(complaint)
        if category_match:
            score += weights['category']
            reasons.append(category_reason)

        return score, reasons

    def _match_patient(self, complaint):
        """
        Check if patient names match.

        Returns:
            Tuple of (matched, reason). ``reason`` is None when not matched
            or when either name is empty after normalization.
        """
        new_name = self._normalize_name(self.data.get('patient_name', ''))
        existing_name = self._normalize_name(getattr(complaint, 'patient_name', ''))

        if not new_name or not existing_name:
            return False, None

        # Exact match
        if new_name == existing_name:
            return True, f"Patient name matches: {complaint.patient_name}"

        # Fuzzy match (80%+ similar)
        similarity = SequenceMatcher(None, new_name, existing_name).ratio()
        if similarity >= 0.80:
            return True, f"Patient name similar ({int(similarity*100)}% match)"

        return False, None

    def _normalize_name(self, name):
        """
        Normalize name for comparison.

        Upper-cases the name, collapses runs of whitespace and strips leading
        honorifics (MR, MRS, MS, DR, with or without a trailing dot).

        Returns:
            The normalized name, or '' for empty input.
        """
        if not name:
            return ''

        # Convert to uppercase, remove extra spaces
        normalized = ' '.join(str(name).upper().split())

        # Strip titles only at the start of the name; a plain substring
        # replace would corrupt names that merely contain those letters.
        return self._TITLE_PREFIX_RE.sub('', normalized).strip()

    @staticmethod
    def _coerce_date(value):
        """
        Convert a date, datetime or ISO-formatted string to a ``date``.

        Returns:
            A ``date``, or None when the value cannot be interpreted.
        """
        # datetime is a subclass of date, so it must be checked first.
        if isinstance(value, datetime):
            return value.date()
        if isinstance(value, date):
            return value
        if isinstance(value, str):
            try:
                # The first 10 characters cover 'YYYY-MM-DD' even when a time
                # part follows.
                return date.fromisoformat(value.strip()[:10])
            except ValueError:
                return None
        return None

    def _match_date(self, complaint):
        """
        Check if incident dates are within 3 days.

        Returns:
            Tuple of (matched, reason). ``reason`` is None when not matched
            or when either date is missing or cannot be interpreted.
        """
        new_date = self.data.get('incident_date')
        existing_date = getattr(complaint, 'incident_date', None)

        if not new_date or not existing_date:
            return False, None

        try:
            delta = new_date - existing_date
        except TypeError:
            # Mixed or non-date types (date vs datetime, naive vs aware,
            # ISO strings): fall back to comparing calendar dates.
            new_day = self._coerce_date(new_date)
            existing_day = self._coerce_date(existing_date)
            if new_day is None or existing_day is None:
                return False, None
            delta = new_day - existing_day

        # Take abs() of the timedelta itself: a negative sub-day difference
        # has .days == -1, so abs(delta.days) would be asymmetric.
        date_diff = abs(delta).days

        if date_diff == 0:
            return True, f"Same incident date: {existing_date}"
        if date_diff <= 3:
            return True, f"Incident date within 3 days ({existing_date})"

        return False, None

    def _text_similarity(self, complaint):
        """
        Calculate text similarity between descriptions.

        Returns:
            Tuple of (similarity, reason). ``similarity`` is 0.0 when either
            description is empty after normalization. ``reason`` is only set
            when the similarity exceeds 0.7, otherwise it is None.
        """
        new_desc = self._normalize_text(self.data.get('description', ''))
        existing_desc = self._normalize_text(getattr(complaint, 'description', ''))

        if not new_desc or not existing_desc:
            return 0.0, None

        # Use SequenceMatcher for text similarity.
        # NOTE(review): the default autojunk heuristic applies to sequences of
        # 200+ items and may lower the ratio for long descriptions.
        similarity = SequenceMatcher(None, new_desc, existing_desc).ratio()

        # Also check for keyword overlap, relative to the shorter description.
        # Both sets are non-empty here because both strings are non-empty.
        new_words = set(new_desc.split())
        existing_words = set(existing_desc.split())
        if new_words and existing_words:
            shared = len(new_words & existing_words)
            word_overlap = shared / min(len(new_words), len(existing_words))
            # Boost score if there's significant word overlap
            if word_overlap > 0.5:
                similarity = max(similarity, word_overlap * 0.8)

        reason = None
        if similarity > 0.7:
            reason = f"Description {int(similarity*100)}% similar"

        return similarity, reason

    def _normalize_text(self, text):
        """
        Normalize text for comparison.

        Lower-cases the text, collapses whitespace and drops stop words and
        words of two characters or fewer.

        Returns:
            The remaining words joined by single spaces, or '' for empty input.
        """
        if not text:
            return ''

        stop_words = self.STOP_WORDS
        return ' '.join(
            word for word in str(text).lower().split()
            if word not in stop_words and len(word) > 2
        )

    def _match_category(self, complaint):
        """
        Check if categories match.

        Returns:
            Tuple of (matched, reason). ``reason`` is None when not matched
            or when either category id is missing.
        """
        new_category_id = self.data.get('category_id')
        existing_category_id = getattr(complaint, 'category_id', None)

        if not new_category_id or not existing_category_id:
            return False, None

        # Also compare string forms so that an id submitted as text
        # (e.g. '5' from a form) still matches the stored value.
        same_category = (
            new_category_id == existing_category_id
            or str(new_category_id) == str(existing_category_id)
        )
        if not same_category:
            return False, None

        # Get category name for display
        category = getattr(complaint, 'category', None)
        if category:
            return True, f"Same category: {category.name_en}"
        return True, "Same category"


def check_for_duplicates(complaint_data, hospital_id, exclude_id=None,
                         days=30, threshold=0.75):
    """
    Convenience function to check for duplicate complaints.

    Args:
        complaint_data: Dict with complaint fields
        hospital_id: Hospital ID to search within
        exclude_id: Optional complaint ID to exclude
        days: Number of days to look back (default 30)
        threshold: Similarity threshold (0.0-1.0) for considering duplicates
            (default 0.75)

    Returns:
        List of potential duplicates with scores
    """
    detector = ComplaintDuplicateDetector(complaint_data, threshold=threshold)
    return detector.find_duplicates(hospital_id, days=days, exclude_id=exclude_id)