# app/services/memory/metrics/collector.py """ Memory Metrics Collector Collects and exposes metrics for the memory system. """ import asyncio import logging from collections import Counter, defaultdict, deque from dataclasses import dataclass, field from datetime import UTC, datetime from enum import Enum from typing import Any logger = logging.getLogger(__name__) class MetricType(str, Enum): """Types of metrics.""" COUNTER = "counter" GAUGE = "gauge" HISTOGRAM = "histogram" @dataclass class MetricValue: """A single metric value.""" name: str metric_type: MetricType value: float labels: dict[str, str] = field(default_factory=dict) timestamp: datetime = field(default_factory=lambda: datetime.now(UTC)) @dataclass class HistogramBucket: """Histogram bucket for distribution metrics.""" le: float # Less than or equal count: int = 0 class MemoryMetrics: """ Collects memory system metrics. Metrics tracked: - Memory operations (get/set/delete by type and scope) - Retrieval operations and latencies - Memory item counts by type - Consolidation operations and durations - Cache hit/miss rates - Procedure success rates - Embedding operations """ # Maximum samples to keep in histogram (circular buffer) MAX_HISTOGRAM_SAMPLES = 10000 def __init__(self) -> None: """Initialize MemoryMetrics.""" self._counters: dict[str, Counter[str]] = defaultdict(Counter) self._gauges: dict[str, dict[str, float]] = defaultdict(dict) # Use deque with maxlen for bounded memory (circular buffer) self._histograms: dict[str, deque[float]] = defaultdict( lambda: deque(maxlen=self.MAX_HISTOGRAM_SAMPLES) ) self._histogram_buckets: dict[str, list[HistogramBucket]] = {} self._lock = asyncio.Lock() # Initialize histogram buckets self._init_histogram_buckets() def _init_histogram_buckets(self) -> None: """Initialize histogram buckets for latency metrics.""" # Fast operations (working memory) fast_buckets = [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, float("inf")] # Normal operations (retrieval) normal_buckets = [0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, float("inf")] # Slow operations (consolidation) slow_buckets = [0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, float("inf")] self._histogram_buckets["memory_working_latency_seconds"] = [ HistogramBucket(le=b) for b in fast_buckets ] self._histogram_buckets["memory_retrieval_latency_seconds"] = [ HistogramBucket(le=b) for b in normal_buckets ] self._histogram_buckets["memory_consolidation_duration_seconds"] = [ HistogramBucket(le=b) for b in slow_buckets ] self._histogram_buckets["memory_embedding_latency_seconds"] = [ HistogramBucket(le=b) for b in normal_buckets ] # Counter methods - Operations async def inc_operations( self, operation: str, memory_type: str, scope: str | None = None, success: bool = True, ) -> None: """Increment memory operation counter.""" async with self._lock: labels = f"operation={operation},memory_type={memory_type}" if scope: labels += f",scope={scope}" labels += f",success={str(success).lower()}" self._counters["memory_operations_total"][labels] += 1 async def inc_retrieval( self, memory_type: str, strategy: str, results_count: int, ) -> None: """Increment retrieval counter.""" async with self._lock: labels = f"memory_type={memory_type},strategy={strategy}" self._counters["memory_retrievals_total"][labels] += 1 # Track result counts as a separate metric self._counters["memory_retrieval_results_total"][labels] += results_count async def inc_cache_hit(self, cache_type: str) -> None: """Increment cache hit counter.""" async with self._lock: labels = f"cache_type={cache_type}" self._counters["memory_cache_hits_total"][labels] += 1 async def inc_cache_miss(self, cache_type: str) -> None: """Increment cache miss counter.""" async with self._lock: labels = f"cache_type={cache_type}" self._counters["memory_cache_misses_total"][labels] += 1 async def inc_consolidation( self, consolidation_type: str, success: bool = True, ) -> None: """Increment consolidation counter.""" async with self._lock: labels = f"type={consolidation_type},success={str(success).lower()}" self._counters["memory_consolidations_total"][labels] += 1 async def inc_procedure_execution( self, procedure_id: str | None = None, success: bool = True, ) -> None: """Increment procedure execution counter.""" async with self._lock: labels = f"success={str(success).lower()}" self._counters["memory_procedure_executions_total"][labels] += 1 async def inc_embeddings_generated(self, memory_type: str) -> None: """Increment embeddings generated counter.""" async with self._lock: labels = f"memory_type={memory_type}" self._counters["memory_embeddings_generated_total"][labels] += 1 async def inc_fact_reinforcements(self) -> None: """Increment fact reinforcement counter.""" async with self._lock: self._counters["memory_fact_reinforcements_total"][""] += 1 async def inc_episodes_recorded(self, outcome: str) -> None: """Increment episodes recorded counter.""" async with self._lock: labels = f"outcome={outcome}" self._counters["memory_episodes_recorded_total"][labels] += 1 async def inc_anomalies_detected(self, anomaly_type: str) -> None: """Increment anomaly detection counter.""" async with self._lock: labels = f"anomaly_type={anomaly_type}" self._counters["memory_anomalies_detected_total"][labels] += 1 async def inc_patterns_detected(self, pattern_type: str) -> None: """Increment pattern detection counter.""" async with self._lock: labels = f"pattern_type={pattern_type}" self._counters["memory_patterns_detected_total"][labels] += 1 async def inc_insights_generated(self, insight_type: str) -> None: """Increment insight generation counter.""" async with self._lock: labels = f"insight_type={insight_type}" self._counters["memory_insights_generated_total"][labels] += 1 # Gauge methods async def set_memory_items_count( self, memory_type: str, scope: str, count: int, ) -> None: """Set memory item count gauge.""" async with self._lock: labels = f"memory_type={memory_type},scope={scope}" self._gauges["memory_items_count"][labels] = float(count) async def set_memory_size_bytes( self, memory_type: str, scope: str, size_bytes: int, ) -> None: """Set memory size gauge in bytes.""" async with self._lock: labels = f"memory_type={memory_type},scope={scope}" self._gauges["memory_size_bytes"][labels] = float(size_bytes) async def set_cache_size(self, cache_type: str, size: int) -> None: """Set cache size gauge.""" async with self._lock: labels = f"cache_type={cache_type}" self._gauges["memory_cache_size"][labels] = float(size) async def set_procedure_success_rate( self, procedure_name: str, rate: float, ) -> None: """Set procedure success rate gauge (0-1).""" async with self._lock: labels = f"procedure_name={procedure_name}" self._gauges["memory_procedure_success_rate"][labels] = rate async def set_active_sessions(self, count: int) -> None: """Set active working memory sessions gauge.""" async with self._lock: self._gauges["memory_active_sessions"][""] = float(count) async def set_pending_consolidations(self, count: int) -> None: """Set pending consolidations gauge.""" async with self._lock: self._gauges["memory_pending_consolidations"][""] = float(count) # Histogram methods async def observe_working_latency(self, latency_seconds: float) -> None: """Observe working memory operation latency.""" async with self._lock: self._observe_histogram("memory_working_latency_seconds", latency_seconds) async def observe_retrieval_latency(self, latency_seconds: float) -> None: """Observe retrieval latency.""" async with self._lock: self._observe_histogram("memory_retrieval_latency_seconds", latency_seconds) async def observe_consolidation_duration(self, duration_seconds: float) -> None: """Observe consolidation duration.""" async with self._lock: self._observe_histogram( "memory_consolidation_duration_seconds", duration_seconds ) async def observe_embedding_latency(self, latency_seconds: float) -> None: """Observe embedding generation latency.""" async with self._lock: self._observe_histogram("memory_embedding_latency_seconds", latency_seconds) def _observe_histogram(self, name: str, value: float) -> None: """Record a value in a histogram.""" self._histograms[name].append(value) # Update buckets if name in self._histogram_buckets: for bucket in self._histogram_buckets[name]: if value <= bucket.le: bucket.count += 1 # Export methods async def get_all_metrics(self) -> list[MetricValue]: """Get all metrics as MetricValue objects.""" metrics: list[MetricValue] = [] async with self._lock: # Export counters for name, counter in self._counters.items(): for labels_str, value in counter.items(): labels = self._parse_labels(labels_str) metrics.append( MetricValue( name=name, metric_type=MetricType.COUNTER, value=float(value), labels=labels, ) ) # Export gauges for name, gauge_dict in self._gauges.items(): for labels_str, gauge_value in gauge_dict.items(): gauge_labels = self._parse_labels(labels_str) metrics.append( MetricValue( name=name, metric_type=MetricType.GAUGE, value=gauge_value, labels=gauge_labels, ) ) # Export histogram summaries for name, values in self._histograms.items(): if values: metrics.append( MetricValue( name=f"{name}_count", metric_type=MetricType.COUNTER, value=float(len(values)), ) ) metrics.append( MetricValue( name=f"{name}_sum", metric_type=MetricType.COUNTER, value=sum(values), ) ) return metrics async def get_prometheus_format(self) -> str: """Export metrics in Prometheus text format.""" lines: list[str] = [] async with self._lock: # Export counters for name, counter in self._counters.items(): lines.append(f"# TYPE {name} counter") for labels_str, value in counter.items(): if labels_str: lines.append(f"{name}{{{labels_str}}} {value}") else: lines.append(f"{name} {value}") # Export gauges for name, gauge_dict in self._gauges.items(): lines.append(f"# TYPE {name} gauge") for labels_str, gauge_value in gauge_dict.items(): if labels_str: lines.append(f"{name}{{{labels_str}}} {gauge_value}") else: lines.append(f"{name} {gauge_value}") # Export histograms for name, buckets in self._histogram_buckets.items(): lines.append(f"# TYPE {name} histogram") for bucket in buckets: le_str = "+Inf" if bucket.le == float("inf") else str(bucket.le) lines.append(f'{name}_bucket{{le="{le_str}"}} {bucket.count}') if name in self._histograms: values = self._histograms[name] lines.append(f"{name}_count {len(values)}") lines.append(f"{name}_sum {sum(values)}") return "\n".join(lines) async def get_summary(self) -> dict[str, Any]: """Get a summary of key metrics.""" async with self._lock: total_operations = sum(self._counters["memory_operations_total"].values()) successful_operations = sum( v for k, v in self._counters["memory_operations_total"].items() if "success=true" in k ) total_retrievals = sum(self._counters["memory_retrievals_total"].values()) total_cache_hits = sum(self._counters["memory_cache_hits_total"].values()) total_cache_misses = sum( self._counters["memory_cache_misses_total"].values() ) cache_hit_rate = ( total_cache_hits / (total_cache_hits + total_cache_misses) if (total_cache_hits + total_cache_misses) > 0 else 0.0 ) total_consolidations = sum( self._counters["memory_consolidations_total"].values() ) total_episodes = sum( self._counters["memory_episodes_recorded_total"].values() ) # Calculate average latencies retrieval_latencies = list( self._histograms.get("memory_retrieval_latency_seconds", deque()) ) avg_retrieval_latency = ( sum(retrieval_latencies) / len(retrieval_latencies) if retrieval_latencies else 0.0 ) return { "total_operations": total_operations, "successful_operations": successful_operations, "operation_success_rate": ( successful_operations / total_operations if total_operations > 0 else 1.0 ), "total_retrievals": total_retrievals, "cache_hit_rate": cache_hit_rate, "total_consolidations": total_consolidations, "total_episodes_recorded": total_episodes, "avg_retrieval_latency_ms": avg_retrieval_latency * 1000, "patterns_detected": sum( self._counters["memory_patterns_detected_total"].values() ), "insights_generated": sum( self._counters["memory_insights_generated_total"].values() ), "anomalies_detected": sum( self._counters["memory_anomalies_detected_total"].values() ), "active_sessions": self._gauges.get("memory_active_sessions", {}).get( "", 0 ), "pending_consolidations": self._gauges.get( "memory_pending_consolidations", {} ).get("", 0), } async def get_cache_stats(self) -> dict[str, Any]: """Get detailed cache statistics.""" async with self._lock: stats: dict[str, Any] = {} # Get hits/misses by cache type for labels_str, hits in self._counters["memory_cache_hits_total"].items(): cache_type = self._parse_labels(labels_str).get("cache_type", "unknown") if cache_type not in stats: stats[cache_type] = {"hits": 0, "misses": 0} stats[cache_type]["hits"] = hits for labels_str, misses in self._counters[ "memory_cache_misses_total" ].items(): cache_type = self._parse_labels(labels_str).get("cache_type", "unknown") if cache_type not in stats: stats[cache_type] = {"hits": 0, "misses": 0} stats[cache_type]["misses"] = misses # Calculate hit rates for data in stats.values(): total = data["hits"] + data["misses"] data["hit_rate"] = data["hits"] / total if total > 0 else 0.0 data["total"] = total return stats async def reset(self) -> None: """Reset all metrics.""" async with self._lock: self._counters.clear() self._gauges.clear() self._histograms.clear() self._init_histogram_buckets() def _parse_labels(self, labels_str: str) -> dict[str, str]: """Parse labels string into dictionary.""" if not labels_str: return {} labels = {} for pair in labels_str.split(","): if "=" in pair: key, value = pair.split("=", 1) labels[key.strip()] = value.strip() return labels # Singleton instance _metrics: MemoryMetrics | None = None _lock = asyncio.Lock() async def get_memory_metrics() -> MemoryMetrics: """Get the singleton MemoryMetrics instance.""" global _metrics async with _lock: if _metrics is None: _metrics = MemoryMetrics() return _metrics async def reset_memory_metrics() -> None: """Reset the singleton instance (for testing).""" global _metrics async with _lock: _metrics = None # Convenience functions async def record_memory_operation( operation: str, memory_type: str, scope: str | None = None, success: bool = True, latency_ms: float | None = None, ) -> None: """Record a memory operation.""" metrics = await get_memory_metrics() await metrics.inc_operations(operation, memory_type, scope, success) if latency_ms is not None and memory_type == "working": await metrics.observe_working_latency(latency_ms / 1000) async def record_retrieval( memory_type: str, strategy: str, results_count: int, latency_ms: float, ) -> None: """Record a retrieval operation.""" metrics = await get_memory_metrics() await metrics.inc_retrieval(memory_type, strategy, results_count) await metrics.observe_retrieval_latency(latency_ms / 1000)