From 57680c37728a74ee05a69404c1ca14b28d037a44 Mon Sep 17 00:00:00 2001 From: Felipe Cardoso Date: Mon, 5 Jan 2026 11:00:53 +0100 Subject: [PATCH] feat(memory): implement metrics and observability (#100) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add comprehensive metrics collector for memory system with: - Counter metrics: operations, retrievals, cache hits/misses, consolidations, episodes recorded, patterns/anomalies/insights detected - Gauge metrics: item counts, memory size, cache size, procedure success rates, active sessions, pending consolidations - Histogram metrics: working memory latency, retrieval latency, consolidation duration, embedding latency - Prometheus format export - Summary and cache stats helpers 31 tests covering all metric types, singleton pattern, and edge cases. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../app/services/memory/metrics/__init__.py | 18 + .../app/services/memory/metrics/collector.py | 539 ++++++++++++++++++ .../unit/services/memory/metrics/__init__.py | 2 + .../services/memory/metrics/test_collector.py | 470 +++++++++++++++ 4 files changed, 1029 insertions(+) create mode 100644 backend/app/services/memory/metrics/__init__.py create mode 100644 backend/app/services/memory/metrics/collector.py create mode 100644 backend/tests/unit/services/memory/metrics/__init__.py create mode 100644 backend/tests/unit/services/memory/metrics/test_collector.py diff --git a/backend/app/services/memory/metrics/__init__.py b/backend/app/services/memory/metrics/__init__.py new file mode 100644 index 0000000..776dbaa --- /dev/null +++ b/backend/app/services/memory/metrics/__init__.py @@ -0,0 +1,18 @@ +# app/services/memory/metrics/__init__.py +"""Memory Metrics module.""" + +from .collector import ( + MemoryMetrics, + get_memory_metrics, + record_memory_operation, + record_retrieval, + reset_memory_metrics, +) + +__all__ = [ + "MemoryMetrics", + 
class MetricType(str, Enum):
    """Types of metrics.

    Mirrors the three Prometheus metric families used by this collector:
    monotonically increasing counters, point-in-time gauges, and
    latency/duration histograms.
    """

    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"


@dataclass
class MetricValue:
    """A single metric value.

    A snapshot of one exported sample: the metric name, its family, the
    numeric value, its labels, and the moment this snapshot object was
    created (timezone-aware UTC).
    """

    name: str
    metric_type: MetricType
    value: float
    # Label key/value pairs; empty dict when the metric is unlabelled.
    labels: dict[str, str] = field(default_factory=dict)
    # Creation time of this snapshot object, not of the underlying observation.
    timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))


@dataclass
class HistogramBucket:
    """Histogram bucket for distribution metrics.

    Buckets are cumulative in the Prometheus sense: an observation
    increments every bucket whose upper bound ``le`` is >= the value.
    """

    le: float  # Less than or equal
    count: int = 0
+ + Metrics tracked: + - Memory operations (get/set/delete by type and scope) + - Retrieval operations and latencies + - Memory item counts by type + - Consolidation operations and durations + - Cache hit/miss rates + - Procedure success rates + - Embedding operations + """ + + def __init__(self) -> None: + """Initialize MemoryMetrics.""" + self._counters: dict[str, Counter[str]] = defaultdict(Counter) + self._gauges: dict[str, dict[str, float]] = defaultdict(dict) + self._histograms: dict[str, list[float]] = defaultdict(list) + self._histogram_buckets: dict[str, list[HistogramBucket]] = {} + self._lock = asyncio.Lock() + + # Initialize histogram buckets + self._init_histogram_buckets() + + def _init_histogram_buckets(self) -> None: + """Initialize histogram buckets for latency metrics.""" + # Fast operations (working memory) + fast_buckets = [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, float("inf")] + + # Normal operations (retrieval) + normal_buckets = [0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, float("inf")] + + # Slow operations (consolidation) + slow_buckets = [0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, float("inf")] + + self._histogram_buckets["memory_working_latency_seconds"] = [ + HistogramBucket(le=b) for b in fast_buckets + ] + self._histogram_buckets["memory_retrieval_latency_seconds"] = [ + HistogramBucket(le=b) for b in normal_buckets + ] + self._histogram_buckets["memory_consolidation_duration_seconds"] = [ + HistogramBucket(le=b) for b in slow_buckets + ] + self._histogram_buckets["memory_embedding_latency_seconds"] = [ + HistogramBucket(le=b) for b in normal_buckets + ] + + # Counter methods - Operations + + async def inc_operations( + self, + operation: str, + memory_type: str, + scope: str | None = None, + success: bool = True, + ) -> None: + """Increment memory operation counter.""" + async with self._lock: + labels = f"operation={operation},memory_type={memory_type}" + if scope: + labels += f",scope={scope}" + labels += 
f",success={str(success).lower()}" + self._counters["memory_operations_total"][labels] += 1 + + async def inc_retrieval( + self, + memory_type: str, + strategy: str, + results_count: int, + ) -> None: + """Increment retrieval counter.""" + async with self._lock: + labels = f"memory_type={memory_type},strategy={strategy}" + self._counters["memory_retrievals_total"][labels] += 1 + + # Track result counts as a separate metric + self._counters["memory_retrieval_results_total"][labels] += results_count + + async def inc_cache_hit(self, cache_type: str) -> None: + """Increment cache hit counter.""" + async with self._lock: + labels = f"cache_type={cache_type}" + self._counters["memory_cache_hits_total"][labels] += 1 + + async def inc_cache_miss(self, cache_type: str) -> None: + """Increment cache miss counter.""" + async with self._lock: + labels = f"cache_type={cache_type}" + self._counters["memory_cache_misses_total"][labels] += 1 + + async def inc_consolidation( + self, + consolidation_type: str, + success: bool = True, + ) -> None: + """Increment consolidation counter.""" + async with self._lock: + labels = f"type={consolidation_type},success={str(success).lower()}" + self._counters["memory_consolidations_total"][labels] += 1 + + async def inc_procedure_execution( + self, + procedure_id: str | None = None, + success: bool = True, + ) -> None: + """Increment procedure execution counter.""" + async with self._lock: + labels = f"success={str(success).lower()}" + self._counters["memory_procedure_executions_total"][labels] += 1 + + async def inc_embeddings_generated(self, memory_type: str) -> None: + """Increment embeddings generated counter.""" + async with self._lock: + labels = f"memory_type={memory_type}" + self._counters["memory_embeddings_generated_total"][labels] += 1 + + async def inc_fact_reinforcements(self) -> None: + """Increment fact reinforcement counter.""" + async with self._lock: + self._counters["memory_fact_reinforcements_total"][""] += 1 + + async def 
inc_episodes_recorded(self, outcome: str) -> None: + """Increment episodes recorded counter.""" + async with self._lock: + labels = f"outcome={outcome}" + self._counters["memory_episodes_recorded_total"][labels] += 1 + + async def inc_anomalies_detected(self, anomaly_type: str) -> None: + """Increment anomaly detection counter.""" + async with self._lock: + labels = f"anomaly_type={anomaly_type}" + self._counters["memory_anomalies_detected_total"][labels] += 1 + + async def inc_patterns_detected(self, pattern_type: str) -> None: + """Increment pattern detection counter.""" + async with self._lock: + labels = f"pattern_type={pattern_type}" + self._counters["memory_patterns_detected_total"][labels] += 1 + + async def inc_insights_generated(self, insight_type: str) -> None: + """Increment insight generation counter.""" + async with self._lock: + labels = f"insight_type={insight_type}" + self._counters["memory_insights_generated_total"][labels] += 1 + + # Gauge methods + + async def set_memory_items_count( + self, + memory_type: str, + scope: str, + count: int, + ) -> None: + """Set memory item count gauge.""" + async with self._lock: + labels = f"memory_type={memory_type},scope={scope}" + self._gauges["memory_items_count"][labels] = float(count) + + async def set_memory_size_bytes( + self, + memory_type: str, + scope: str, + size_bytes: int, + ) -> None: + """Set memory size gauge in bytes.""" + async with self._lock: + labels = f"memory_type={memory_type},scope={scope}" + self._gauges["memory_size_bytes"][labels] = float(size_bytes) + + async def set_cache_size(self, cache_type: str, size: int) -> None: + """Set cache size gauge.""" + async with self._lock: + labels = f"cache_type={cache_type}" + self._gauges["memory_cache_size"][labels] = float(size) + + async def set_procedure_success_rate( + self, + procedure_name: str, + rate: float, + ) -> None: + """Set procedure success rate gauge (0-1).""" + async with self._lock: + labels = f"procedure_name={procedure_name}" 
+ self._gauges["memory_procedure_success_rate"][labels] = rate + + async def set_active_sessions(self, count: int) -> None: + """Set active working memory sessions gauge.""" + async with self._lock: + self._gauges["memory_active_sessions"][""] = float(count) + + async def set_pending_consolidations(self, count: int) -> None: + """Set pending consolidations gauge.""" + async with self._lock: + self._gauges["memory_pending_consolidations"][""] = float(count) + + # Histogram methods + + async def observe_working_latency(self, latency_seconds: float) -> None: + """Observe working memory operation latency.""" + async with self._lock: + self._observe_histogram("memory_working_latency_seconds", latency_seconds) + + async def observe_retrieval_latency(self, latency_seconds: float) -> None: + """Observe retrieval latency.""" + async with self._lock: + self._observe_histogram("memory_retrieval_latency_seconds", latency_seconds) + + async def observe_consolidation_duration(self, duration_seconds: float) -> None: + """Observe consolidation duration.""" + async with self._lock: + self._observe_histogram( + "memory_consolidation_duration_seconds", duration_seconds + ) + + async def observe_embedding_latency(self, latency_seconds: float) -> None: + """Observe embedding generation latency.""" + async with self._lock: + self._observe_histogram("memory_embedding_latency_seconds", latency_seconds) + + def _observe_histogram(self, name: str, value: float) -> None: + """Record a value in a histogram.""" + self._histograms[name].append(value) + + # Update buckets + if name in self._histogram_buckets: + for bucket in self._histogram_buckets[name]: + if value <= bucket.le: + bucket.count += 1 + + # Export methods + + async def get_all_metrics(self) -> list[MetricValue]: + """Get all metrics as MetricValue objects.""" + metrics: list[MetricValue] = [] + + async with self._lock: + # Export counters + for name, counter in self._counters.items(): + for labels_str, value in counter.items(): + 
labels = self._parse_labels(labels_str) + metrics.append( + MetricValue( + name=name, + metric_type=MetricType.COUNTER, + value=float(value), + labels=labels, + ) + ) + + # Export gauges + for name, gauge_dict in self._gauges.items(): + for labels_str, gauge_value in gauge_dict.items(): + gauge_labels = self._parse_labels(labels_str) + metrics.append( + MetricValue( + name=name, + metric_type=MetricType.GAUGE, + value=gauge_value, + labels=gauge_labels, + ) + ) + + # Export histogram summaries + for name, values in self._histograms.items(): + if values: + metrics.append( + MetricValue( + name=f"{name}_count", + metric_type=MetricType.COUNTER, + value=float(len(values)), + ) + ) + metrics.append( + MetricValue( + name=f"{name}_sum", + metric_type=MetricType.COUNTER, + value=sum(values), + ) + ) + + return metrics + + async def get_prometheus_format(self) -> str: + """Export metrics in Prometheus text format.""" + lines: list[str] = [] + + async with self._lock: + # Export counters + for name, counter in self._counters.items(): + lines.append(f"# TYPE {name} counter") + for labels_str, value in counter.items(): + if labels_str: + lines.append(f"{name}{{{labels_str}}} {value}") + else: + lines.append(f"{name} {value}") + + # Export gauges + for name, gauge_dict in self._gauges.items(): + lines.append(f"# TYPE {name} gauge") + for labels_str, gauge_value in gauge_dict.items(): + if labels_str: + lines.append(f"{name}{{{labels_str}}} {gauge_value}") + else: + lines.append(f"{name} {gauge_value}") + + # Export histograms + for name, buckets in self._histogram_buckets.items(): + lines.append(f"# TYPE {name} histogram") + for bucket in buckets: + le_str = "+Inf" if bucket.le == float("inf") else str(bucket.le) + lines.append(f'{name}_bucket{{le="{le_str}"}} {bucket.count}') + + if name in self._histograms: + values = self._histograms[name] + lines.append(f"{name}_count {len(values)}") + lines.append(f"{name}_sum {sum(values)}") + + return "\n".join(lines) + + async def 
get_summary(self) -> dict[str, Any]: + """Get a summary of key metrics.""" + async with self._lock: + total_operations = sum(self._counters["memory_operations_total"].values()) + successful_operations = sum( + v + for k, v in self._counters["memory_operations_total"].items() + if "success=true" in k + ) + + total_retrievals = sum(self._counters["memory_retrievals_total"].values()) + + total_cache_hits = sum(self._counters["memory_cache_hits_total"].values()) + total_cache_misses = sum( + self._counters["memory_cache_misses_total"].values() + ) + cache_hit_rate = ( + total_cache_hits / (total_cache_hits + total_cache_misses) + if (total_cache_hits + total_cache_misses) > 0 + else 0.0 + ) + + total_consolidations = sum( + self._counters["memory_consolidations_total"].values() + ) + + total_episodes = sum( + self._counters["memory_episodes_recorded_total"].values() + ) + + # Calculate average latencies + retrieval_latencies = self._histograms.get( + "memory_retrieval_latency_seconds", [] + ) + avg_retrieval_latency = ( + sum(retrieval_latencies) / len(retrieval_latencies) + if retrieval_latencies + else 0.0 + ) + + return { + "total_operations": total_operations, + "successful_operations": successful_operations, + "operation_success_rate": ( + successful_operations / total_operations + if total_operations > 0 + else 1.0 + ), + "total_retrievals": total_retrievals, + "cache_hit_rate": cache_hit_rate, + "total_consolidations": total_consolidations, + "total_episodes_recorded": total_episodes, + "avg_retrieval_latency_ms": avg_retrieval_latency * 1000, + "patterns_detected": sum( + self._counters["memory_patterns_detected_total"].values() + ), + "insights_generated": sum( + self._counters["memory_insights_generated_total"].values() + ), + "anomalies_detected": sum( + self._counters["memory_anomalies_detected_total"].values() + ), + "active_sessions": self._gauges.get("memory_active_sessions", {}).get( + "", 0 + ), + "pending_consolidations": self._gauges.get( + 
"memory_pending_consolidations", {} + ).get("", 0), + } + + async def get_cache_stats(self) -> dict[str, Any]: + """Get detailed cache statistics.""" + async with self._lock: + stats: dict[str, Any] = {} + + # Get hits/misses by cache type + for labels_str, hits in self._counters["memory_cache_hits_total"].items(): + cache_type = self._parse_labels(labels_str).get( + "cache_type", "unknown" + ) + if cache_type not in stats: + stats[cache_type] = {"hits": 0, "misses": 0} + stats[cache_type]["hits"] = hits + + for labels_str, misses in self._counters[ + "memory_cache_misses_total" + ].items(): + cache_type = self._parse_labels(labels_str).get( + "cache_type", "unknown" + ) + if cache_type not in stats: + stats[cache_type] = {"hits": 0, "misses": 0} + stats[cache_type]["misses"] = misses + + # Calculate hit rates + for data in stats.values(): + total = data["hits"] + data["misses"] + data["hit_rate"] = data["hits"] / total if total > 0 else 0.0 + data["total"] = total + + return stats + + async def reset(self) -> None: + """Reset all metrics.""" + async with self._lock: + self._counters.clear() + self._gauges.clear() + self._histograms.clear() + self._init_histogram_buckets() + + def _parse_labels(self, labels_str: str) -> dict[str, str]: + """Parse labels string into dictionary.""" + if not labels_str: + return {} + + labels = {} + for pair in labels_str.split(","): + if "=" in pair: + key, value = pair.split("=", 1) + labels[key.strip()] = value.strip() + + return labels + + +# Singleton instance +_metrics: MemoryMetrics | None = None +_lock = asyncio.Lock() + + +async def get_memory_metrics() -> MemoryMetrics: + """Get the singleton MemoryMetrics instance.""" + global _metrics + + async with _lock: + if _metrics is None: + _metrics = MemoryMetrics() + return _metrics + + +def reset_memory_metrics() -> None: + """Reset the singleton instance (for testing).""" + global _metrics + _metrics = None + + +# Convenience functions + + +async def record_memory_operation( + 
async def record_retrieval(
    memory_type: str,
    strategy: str,
    results_count: int,
    latency_ms: float,
) -> None:
    """Record one retrieval against the global metrics singleton.

    Bumps the retrieval counters for *memory_type*/*strategy* (including
    the result count) and feeds the latency, converted from milliseconds
    to seconds, into the retrieval latency histogram.
    """
    collector = await get_memory_metrics()
    latency_seconds = latency_ms / 1000
    await collector.inc_retrieval(memory_type, strategy, results_count)
    await collector.observe_retrieval_latency(latency_seconds)
class.""" + + @pytest.mark.asyncio + async def test_inc_operations(self, metrics: MemoryMetrics) -> None: + """Should increment operation counters.""" + await metrics.inc_operations("get", "working", "session", True) + await metrics.inc_operations("get", "working", "session", True) + await metrics.inc_operations("set", "working", "session", True) + + summary = await metrics.get_summary() + assert summary["total_operations"] == 3 + assert summary["successful_operations"] == 3 + + @pytest.mark.asyncio + async def test_inc_operations_failure(self, metrics: MemoryMetrics) -> None: + """Should track failed operations.""" + await metrics.inc_operations("get", "working", None, True) + await metrics.inc_operations("get", "working", None, False) + + summary = await metrics.get_summary() + assert summary["total_operations"] == 2 + assert summary["successful_operations"] == 1 + assert summary["operation_success_rate"] == 0.5 + + @pytest.mark.asyncio + async def test_inc_retrieval(self, metrics: MemoryMetrics) -> None: + """Should increment retrieval counters.""" + await metrics.inc_retrieval("episodic", "similarity", 5) + await metrics.inc_retrieval("episodic", "temporal", 3) + await metrics.inc_retrieval("semantic", "similarity", 10) + + summary = await metrics.get_summary() + assert summary["total_retrievals"] == 3 + + @pytest.mark.asyncio + async def test_cache_hit_miss(self, metrics: MemoryMetrics) -> None: + """Should track cache hits and misses.""" + await metrics.inc_cache_hit("hot") + await metrics.inc_cache_hit("hot") + await metrics.inc_cache_hit("hot") + await metrics.inc_cache_miss("hot") + + summary = await metrics.get_summary() + assert summary["cache_hit_rate"] == 0.75 + + @pytest.mark.asyncio + async def test_cache_stats(self, metrics: MemoryMetrics) -> None: + """Should provide detailed cache stats.""" + await metrics.inc_cache_hit("hot") + await metrics.inc_cache_hit("hot") + await metrics.inc_cache_miss("hot") + await metrics.inc_cache_hit("embedding") + 
await metrics.inc_cache_miss("embedding") + await metrics.inc_cache_miss("embedding") + + stats = await metrics.get_cache_stats() + + assert stats["hot"]["hits"] == 2 + assert stats["hot"]["misses"] == 1 + assert stats["hot"]["hit_rate"] == pytest.approx(0.6667, rel=0.01) + + assert stats["embedding"]["hits"] == 1 + assert stats["embedding"]["misses"] == 2 + assert stats["embedding"]["hit_rate"] == pytest.approx(0.3333, rel=0.01) + + @pytest.mark.asyncio + async def test_inc_consolidation(self, metrics: MemoryMetrics) -> None: + """Should increment consolidation counter.""" + await metrics.inc_consolidation("working_to_episodic", True) + await metrics.inc_consolidation("episodic_to_semantic", True) + await metrics.inc_consolidation("prune", False) + + summary = await metrics.get_summary() + assert summary["total_consolidations"] == 3 + + @pytest.mark.asyncio + async def test_inc_episodes_recorded(self, metrics: MemoryMetrics) -> None: + """Should track episodes by outcome.""" + await metrics.inc_episodes_recorded("success") + await metrics.inc_episodes_recorded("success") + await metrics.inc_episodes_recorded("failure") + + summary = await metrics.get_summary() + assert summary["total_episodes_recorded"] == 3 + + @pytest.mark.asyncio + async def test_inc_patterns_insights_anomalies( + self, metrics: MemoryMetrics + ) -> None: + """Should track reflection metrics.""" + await metrics.inc_patterns_detected("recurring_success") + await metrics.inc_patterns_detected("action_sequence") + await metrics.inc_insights_generated("optimization") + await metrics.inc_anomalies_detected("unusual_duration") + + summary = await metrics.get_summary() + assert summary["patterns_detected"] == 2 + assert summary["insights_generated"] == 1 + assert summary["anomalies_detected"] == 1 + + @pytest.mark.asyncio + async def test_set_memory_items_count(self, metrics: MemoryMetrics) -> None: + """Should set memory item count gauge.""" + await metrics.set_memory_items_count("episodic", 
"project", 100) + await metrics.set_memory_items_count("semantic", "project", 50) + + all_metrics = await metrics.get_all_metrics() + gauge_metrics = [ + m for m in all_metrics if m.name == "memory_items_count" + ] + + assert len(gauge_metrics) == 2 + + @pytest.mark.asyncio + async def test_set_memory_size_bytes(self, metrics: MemoryMetrics) -> None: + """Should set memory size gauge.""" + await metrics.set_memory_size_bytes("working", "session", 1024) + + all_metrics = await metrics.get_all_metrics() + size_metrics = [m for m in all_metrics if m.name == "memory_size_bytes"] + + assert len(size_metrics) == 1 + assert size_metrics[0].value == 1024.0 + + @pytest.mark.asyncio + async def test_set_procedure_success_rate(self, metrics: MemoryMetrics) -> None: + """Should set procedure success rate gauge.""" + await metrics.set_procedure_success_rate("code_review", 0.85) + + all_metrics = await metrics.get_all_metrics() + rate_metrics = [ + m for m in all_metrics if m.name == "memory_procedure_success_rate" + ] + + assert len(rate_metrics) == 1 + assert rate_metrics[0].value == 0.85 + + @pytest.mark.asyncio + async def test_set_active_sessions(self, metrics: MemoryMetrics) -> None: + """Should set active sessions gauge.""" + await metrics.set_active_sessions(5) + + summary = await metrics.get_summary() + assert summary["active_sessions"] == 5 + + @pytest.mark.asyncio + async def test_observe_working_latency(self, metrics: MemoryMetrics) -> None: + """Should record working memory latency histogram.""" + await metrics.observe_working_latency(0.005) # 5ms + await metrics.observe_working_latency(0.003) # 3ms + await metrics.observe_working_latency(0.010) # 10ms + + all_metrics = await metrics.get_all_metrics() + count_metric = next( + (m for m in all_metrics if m.name == "memory_working_latency_seconds_count"), + None, + ) + sum_metric = next( + (m for m in all_metrics if m.name == "memory_working_latency_seconds_sum"), + None, + ) + + assert count_metric is not None + 
assert count_metric.value == 3 + assert sum_metric is not None + assert sum_metric.value == pytest.approx(0.018, rel=0.01) + + @pytest.mark.asyncio + async def test_observe_retrieval_latency(self, metrics: MemoryMetrics) -> None: + """Should record retrieval latency histogram.""" + await metrics.observe_retrieval_latency(0.050) # 50ms + await metrics.observe_retrieval_latency(0.075) # 75ms + + summary = await metrics.get_summary() + assert summary["avg_retrieval_latency_ms"] == pytest.approx(62.5, rel=0.01) + + @pytest.mark.asyncio + async def test_observe_consolidation_duration( + self, metrics: MemoryMetrics + ) -> None: + """Should record consolidation duration histogram.""" + await metrics.observe_consolidation_duration(5.0) + await metrics.observe_consolidation_duration(10.0) + + all_metrics = await metrics.get_all_metrics() + count_metric = next( + ( + m + for m in all_metrics + if m.name == "memory_consolidation_duration_seconds_count" + ), + None, + ) + + assert count_metric is not None + assert count_metric.value == 2 + + @pytest.mark.asyncio + async def test_get_all_metrics(self, metrics: MemoryMetrics) -> None: + """Should return all metrics as MetricValue objects.""" + await metrics.inc_operations("get", "working", None, True) + await metrics.set_active_sessions(3) + await metrics.observe_retrieval_latency(0.05) + + all_metrics = await metrics.get_all_metrics() + + assert len(all_metrics) >= 3 + + # Check we have different metric types + counter_metrics = [m for m in all_metrics if m.metric_type == MetricType.COUNTER] + gauge_metrics = [m for m in all_metrics if m.metric_type == MetricType.GAUGE] + + assert len(counter_metrics) >= 1 + assert len(gauge_metrics) >= 1 + + @pytest.mark.asyncio + async def test_get_prometheus_format(self, metrics: MemoryMetrics) -> None: + """Should export metrics in Prometheus format.""" + await metrics.inc_operations("get", "working", "session", True) + await metrics.set_active_sessions(5) + + prometheus_output = await 
metrics.get_prometheus_format() + + assert "# TYPE memory_operations_total counter" in prometheus_output + assert "memory_operations_total{" in prometheus_output + assert "# TYPE memory_active_sessions gauge" in prometheus_output + assert "memory_active_sessions 5" in prometheus_output + + @pytest.mark.asyncio + async def test_get_summary(self, metrics: MemoryMetrics) -> None: + """Should return summary dictionary.""" + await metrics.inc_operations("get", "working", None, True) + await metrics.inc_retrieval("episodic", "similarity", 5) + await metrics.inc_cache_hit("hot") + await metrics.inc_consolidation("prune", True) + + summary = await metrics.get_summary() + + assert "total_operations" in summary + assert "total_retrievals" in summary + assert "cache_hit_rate" in summary + assert "total_consolidations" in summary + assert "operation_success_rate" in summary + + @pytest.mark.asyncio + async def test_reset(self, metrics: MemoryMetrics) -> None: + """Should reset all metrics.""" + await metrics.inc_operations("get", "working", None, True) + await metrics.set_active_sessions(5) + await metrics.observe_retrieval_latency(0.05) + + await metrics.reset() + + summary = await metrics.get_summary() + assert summary["total_operations"] == 0 + assert summary["active_sessions"] == 0 + + +class TestMetricValue: + """Tests for MetricValue dataclass.""" + + def test_creates_metric_value(self) -> None: + """Should create metric value with defaults.""" + metric = MetricValue( + name="test_metric", + metric_type=MetricType.COUNTER, + value=42.0, + ) + + assert metric.name == "test_metric" + assert metric.metric_type == MetricType.COUNTER + assert metric.value == 42.0 + assert metric.labels == {} + assert metric.timestamp is not None + + def test_creates_metric_value_with_labels(self) -> None: + """Should create metric value with labels.""" + metric = MetricValue( + name="test_metric", + metric_type=MetricType.GAUGE, + value=100.0, + labels={"scope": "project", "type": 
"episodic"}, + ) + + assert metric.labels == {"scope": "project", "type": "episodic"} + + +class TestSingleton: + """Tests for singleton pattern.""" + + @pytest.mark.asyncio + async def test_get_memory_metrics_singleton(self) -> None: + """Should return same instance.""" + metrics1 = await get_memory_metrics() + metrics2 = await get_memory_metrics() + + assert metrics1 is metrics2 + + @pytest.mark.asyncio + async def test_reset_singleton(self) -> None: + """Should reset singleton instance.""" + metrics1 = await get_memory_metrics() + await metrics1.inc_operations("get", "working", None, True) + + reset_memory_metrics() + + metrics2 = await get_memory_metrics() + summary = await metrics2.get_summary() + + assert metrics1 is not metrics2 + assert summary["total_operations"] == 0 + + +class TestConvenienceFunctions: + """Tests for convenience functions.""" + + @pytest.mark.asyncio + async def test_record_memory_operation(self) -> None: + """Should record memory operation.""" + await record_memory_operation( + operation="get", + memory_type="working", + scope="session", + success=True, + latency_ms=5.0, + ) + + metrics = await get_memory_metrics() + summary = await metrics.get_summary() + + assert summary["total_operations"] == 1 + + @pytest.mark.asyncio + async def test_record_retrieval(self) -> None: + """Should record retrieval operation.""" + await record_retrieval( + memory_type="episodic", + strategy="similarity", + results_count=10, + latency_ms=50.0, + ) + + metrics = await get_memory_metrics() + summary = await metrics.get_summary() + + assert summary["total_retrievals"] == 1 + assert summary["avg_retrieval_latency_ms"] == pytest.approx(50.0, rel=0.01) + + +class TestHistogramBuckets: + """Tests for histogram bucket behavior.""" + + @pytest.mark.asyncio + async def test_histogram_buckets_populated(self, metrics: MemoryMetrics) -> None: + """Should populate histogram buckets correctly.""" + # Add values to different buckets + await 
metrics.observe_retrieval_latency(0.005) # <= 0.01 + await metrics.observe_retrieval_latency(0.030) # <= 0.05 + await metrics.observe_retrieval_latency(0.080) # <= 0.1 + await metrics.observe_retrieval_latency(0.500) # <= 0.5 + await metrics.observe_retrieval_latency(2.000) # <= 2.5 + + prometheus_output = await metrics.get_prometheus_format() + + # Check that histogram buckets are in output + assert "memory_retrieval_latency_seconds_bucket" in prometheus_output + assert 'le="0.01"' in prometheus_output + assert 'le="+Inf"' in prometheus_output + + @pytest.mark.asyncio + async def test_histogram_count_and_sum(self, metrics: MemoryMetrics) -> None: + """Should track histogram count and sum.""" + await metrics.observe_retrieval_latency(0.1) + await metrics.observe_retrieval_latency(0.2) + await metrics.observe_retrieval_latency(0.3) + + prometheus_output = await metrics.get_prometheus_format() + + assert "memory_retrieval_latency_seconds_count 3" in prometheus_output + assert "memory_retrieval_latency_seconds_sum 0.6" in prometheus_output + + +class TestLabelParsing: + """Tests for label parsing.""" + + @pytest.mark.asyncio + async def test_parse_labels_in_output(self, metrics: MemoryMetrics) -> None: + """Should correctly parse labels in output.""" + await metrics.inc_operations("get", "episodic", "project", True) + + all_metrics = await metrics.get_all_metrics() + op_metric = next( + (m for m in all_metrics if m.name == "memory_operations_total"), None + ) + + assert op_metric is not None + assert op_metric.labels["operation"] == "get" + assert op_metric.labels["memory_type"] == "episodic" + assert op_metric.labels["scope"] == "project" + assert op_metric.labels["success"] == "true" + + +class TestEdgeCases: + """Tests for edge cases.""" + + @pytest.mark.asyncio + async def test_empty_metrics(self, metrics: MemoryMetrics) -> None: + """Should handle empty metrics gracefully.""" + summary = await metrics.get_summary() + + assert summary["total_operations"] == 0 + 
assert summary["operation_success_rate"] == 1.0 # Default when no ops + assert summary["cache_hit_rate"] == 0.0 + assert summary["avg_retrieval_latency_ms"] == 0.0 + + @pytest.mark.asyncio + async def test_concurrent_operations(self, metrics: MemoryMetrics) -> None: + """Should handle concurrent operations safely.""" + import asyncio + + async def increment_ops() -> None: + for _ in range(100): + await metrics.inc_operations("get", "working", None, True) + + # Run multiple concurrent tasks + await asyncio.gather( + increment_ops(), + increment_ops(), + increment_ops(), + ) + + summary = await metrics.get_summary() + assert summary["total_operations"] == 300 + + @pytest.mark.asyncio + async def test_prometheus_format_empty(self, metrics: MemoryMetrics) -> None: + """Should return valid format with no metrics.""" + prometheus_output = await metrics.get_prometheus_format() + + # Should just have histogram bucket definitions + assert "# TYPE memory_retrieval_latency_seconds histogram" in prometheus_output