# app/services/memory/procedural/matching.py
"""
Procedure Matching.

Provides utilities for matching procedures to contexts,
ranking procedures by relevance, and suggesting procedures.
"""

import logging
import re
from dataclasses import dataclass, field
from typing import Any, ClassVar

from app.services.memory.types import Procedure

logger = logging.getLogger(__name__)


@dataclass
class MatchResult:
    """Result of a procedure match."""

    # The procedure that matched.
    procedure: Procedure
    # Text score plus success-rate boost, capped at 1.0 by the matcher.
    score: float
    # Human-readable evidence: raw query terms plus tagged entries such as
    # "name:<name>", "trigger:<prefix>", "task_type:<type>", "pattern_match".
    matched_terms: list[str] = field(default_factory=list)
    match_type: str = "keyword"  # keyword, semantic, pattern

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary.

        Returns:
            A flat dict with the procedure's id (stringified), name and
            success rate alongside the match score, terms and type.
        """
        return {
            "procedure_id": str(self.procedure.id),
            "procedure_name": self.procedure.name,
            "score": self.score,
            "matched_terms": self.matched_terms,
            "match_type": self.match_type,
            "success_rate": self.procedure.success_rate,
        }


@dataclass
class MatchContext:
    """Context for procedure matching."""

    # Free-text query the procedures are scored against.
    query: str
    # Optional task type; earns a bonus when found in a trigger or name.
    task_type: str | None = None
    # Carried along for callers; not read by the matcher in this file.
    project_id: Any | None = None
    agent_type_id: Any | None = None
    # Maximum number of results returned by ProcedureMatcher.match.
    max_results: int = 5
    # Minimum text score (before the success-rate boost) to keep a result.
    min_score: float = 0.3
    # When set, procedures with a lower success rate are dropped.
    require_success_rate: float | None = None


class ProcedureMatcher:
    """
    Matches procedures to contexts using multiple strategies.

    Matching strategies:
    - Keyword matching on trigger pattern and name
    - Pattern-based matching using regex
    - Success rate weighting

    In production, this would be augmented with vector similarity search.
    """

    # Common task-related keywords for boosting
    TASK_KEYWORDS: ClassVar[set[str]] = {
        "create",
        "update",
        "delete",
        "fix",
        "implement",
        "add",
        "remove",
        "refactor",
        "test",
        "deploy",
        "configure",
        "setup",
        "build",
        "debug",
        "optimize",
    }

    # A trigger is only treated as a regex if it contains one of these.
    _REGEX_METACHARS: ClassVar[frozenset[str]] = frozenset(r"\.*+?[]{}|()^$")

    def __init__(self) -> None:
        """Initialize the matcher."""
        # Maps a trigger pattern to its compiled regex, or to None when the
        # trigger is plain text or fails to compile (negative results are
        # cached too, so a bad pattern is only examined once).
        self._compiled_patterns: dict[str, re.Pattern[str] | None] = {}

    def match(
        self,
        procedures: list[Procedure],
        context: MatchContext,
    ) -> list[MatchResult]:
        """
        Match procedures against a context.

        A procedure is kept when its text score reaches ``context.min_score``
        and, if ``context.require_success_rate`` is set, its success rate is
        at least that value. The reported score is the text score plus
        20% of the procedure's success rate, capped at 1.0.

        Args:
            procedures: List of procedures to match
            context: Matching context

        Returns:
            List of match results, sorted by score (highest first) and
            truncated to ``context.max_results`` entries
        """
        results: list[MatchResult] = []
        query_terms = self._extract_terms(context.query)
        query_lower = context.query.lower()
        required_rate = context.require_success_rate

        for procedure in procedures:
            score, matched = self._calculate_match_score(
                procedure=procedure,
                query_terms=query_terms,
                query_lower=query_lower,
                context=context,
            )
            # The threshold applies to the text score, before any boost.
            if score < context.min_score:
                continue
            if required_rate is not None and procedure.success_rate < required_rate:
                continue

            # Boost score based on success rate
            success_boost = procedure.success_rate * 0.2
            results.append(
                MatchResult(
                    procedure=procedure,
                    score=min(1.0, score + success_boost),
                    matched_terms=matched,
                    match_type="keyword",
                )
            )

        # Sort by score descending (stable: ties keep input order)
        results.sort(key=lambda r: r.score, reverse=True)
        # Clamp so a negative limit yields no results instead of slicing
        # from the end of the list.
        return results[: max(0, context.max_results)]

    def _extract_terms(self, text: str) -> list[str]:
        """Extract searchable terms from text.

        Lowercases the text, replaces every character that is not a word
        character, whitespace or hyphen with a space, splits on whitespace
        and drops terms shorter than two characters.
        """
        clean = re.sub(r"[^\w\s-]", " ", text.lower())
        return [term for term in clean.split() if len(term) >= 2]

    def _calculate_match_score(
        self,
        procedure: Procedure,
        query_terms: list[str],
        query_lower: str,
        context: MatchContext,
    ) -> tuple[float, list[str]]:
        """
        Calculate match score between procedure and query.

        Scoring (summed, then capped at 1.0):
        - +0.5 if the name contains the query or vice versa
        - +0.4 if the trigger contains the query or vice versa
        - per distinct query term: +0.1 if found in the trigger, else
          +0.08 if found in the name; +0.05 extra when the matched term
          is one of TASK_KEYWORDS
        - +0.3 if the context's task type appears in the trigger or name
        - +0.25 if the trigger, compiled as a regex, matches the query

        Containment checks are plain substring tests, not whole-word
        matches.

        Args:
            procedure: Procedure being scored
            query_terms: Terms extracted from the query
            query_lower: Lowercased query text
            context: Matching context (supplies the optional task type)

        Returns:
            Tuple of (score, matched_terms)
        """
        score = 0.0
        matched: list[str] = []

        trigger_lower = procedure.trigger_pattern.lower()
        name_lower = procedure.name.lower()

        # "" is a substring of every string, so an empty query, name or
        # trigger must not count as a match.
        if query_lower and name_lower and (
            name_lower in query_lower or query_lower in name_lower
        ):
            score += 0.5
            matched.append(f"name:{procedure.name}")

        if query_lower and trigger_lower and (
            trigger_lower in query_lower or query_lower in trigger_lower
        ):
            score += 0.4
            matched.append(f"trigger:{procedure.trigger_pattern[:30]}")

        # Term-by-term matching. De-duplicate (keeping first-seen order) so
        # repeating a word in the query cannot inflate the score.
        for term in dict.fromkeys(query_terms):
            if term in trigger_lower:
                score += 0.1
            elif term in name_lower:
                score += 0.08
            else:
                continue
            matched.append(term)
            # Boost for task keywords (only reached when the term matched)
            if term in self.TASK_KEYWORDS:
                score += 0.05

        # Task type match if provided
        if context.task_type:
            task_type_lower = context.task_type.lower()
            if task_type_lower in trigger_lower or task_type_lower in name_lower:
                score += 0.3
                matched.append(f"task_type:{context.task_type}")

        # Regex pattern matching on trigger. Compile the ORIGINAL trigger:
        # lowercasing a regex would turn \S into \s, \W into \w, \D into \d
        # and so invert its meaning. Case-insensitivity comes from the
        # IGNORECASE flag applied at compile time.
        if query_lower:
            pattern = self._get_or_compile_pattern(procedure.trigger_pattern)
            if pattern is not None and pattern.search(query_lower):
                score += 0.25
                matched.append("pattern_match")

        return min(1.0, score), matched

    def _get_or_compile_pattern(self, pattern: str) -> re.Pattern[str] | None:
        """Get or compile a regex pattern with caching.

        Args:
            pattern: Trigger text that may or may not be a regex

        Returns:
            The compiled, case-insensitive pattern, or None when the text
            contains no regex metacharacters or is not a valid regex.
        """
        if pattern in self._compiled_patterns:
            return self._compiled_patterns[pattern]

        compiled: re.Pattern[str] | None = None
        # Only compile if it looks like a regex pattern
        if not self._REGEX_METACHARS.isdisjoint(pattern):
            try:
                compiled = re.compile(pattern, re.IGNORECASE)
            except re.error as exc:
                # Best effort: an invalid trigger simply gets no regex bonus.
                logger.debug("Skipping invalid trigger regex %r: %s", pattern, exc)

        self._compiled_patterns[pattern] = compiled
        return compiled

    def rank_by_relevance(
        self,
        procedures: list[Procedure],
        task_type: str,
    ) -> list[Procedure]:
        """
        Rank procedures by relevance to a task type.

        The task type is used both as the query text and as the context's
        task type; no procedure is filtered out (minimum score 0.0, result
        limit equal to the number of procedures).

        Args:
            procedures: Procedures to rank
            task_type: Task type for relevance

        Returns:
            Procedures sorted by relevance
        """
        context = MatchContext(
            query=task_type,
            task_type=task_type,
            min_score=0.0,
            max_results=len(procedures),
        )
        return [result.procedure for result in self.match(procedures, context)]

    def suggest_procedures(
        self,
        procedures: list[Procedure],
        query: str,
        min_success_rate: float = 0.5,
        max_suggestions: int = 3,
    ) -> list[MatchResult]:
        """
        Suggest the best procedures for a query.

        Only suggests procedures with sufficient success rate. Uses a
        minimum text score of 0.2.

        Args:
            procedures: Available procedures
            query: Query/context
            min_success_rate: Minimum success rate to suggest
            max_suggestions: Maximum suggestions

        Returns:
            List of procedure suggestions
        """
        context = MatchContext(
            query=query,
            max_results=max_suggestions,
            min_score=0.2,
            require_success_rate=min_success_rate,
        )
        return self.match(procedures, context)


# Singleton matcher instance
_matcher: ProcedureMatcher | None = None


def get_procedure_matcher() -> ProcedureMatcher:
    """Get the singleton procedure matcher instance.

    The instance is created on first call and reused afterwards.
    """
    global _matcher
    if _matcher is None:
        _matcher = ProcedureMatcher()
    return _matcher