feat(memory): implement metrics and observability (#100)

Add comprehensive metrics collector for memory system with:
- Counter metrics: operations, retrievals, cache hits/misses, consolidations,
  episodes recorded, patterns/anomalies/insights detected
- Gauge metrics: item counts, memory size, cache size, procedure success rates,
  active sessions, pending consolidations
- Histogram metrics: working memory latency, retrieval latency, consolidation
  duration, embedding latency
- Prometheus format export
- Summary and cache stats helpers

31 tests covering all metric types, singleton pattern, and edge cases.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-05 11:00:53 +01:00
parent 997cfaa03a
commit 57680c3772
4 changed files with 1029 additions and 0 deletions

View File

@@ -0,0 +1,18 @@
# app/services/memory/metrics/__init__.py
"""Memory Metrics module."""
from .collector import (
MemoryMetrics,
get_memory_metrics,
record_memory_operation,
record_retrieval,
reset_memory_metrics,
)
__all__ = [
"MemoryMetrics",
"get_memory_metrics",
"record_memory_operation",
"record_retrieval",
"reset_memory_metrics",
]

View File

@@ -0,0 +1,539 @@
# app/services/memory/metrics/collector.py
"""
Memory Metrics Collector
Collects and exposes metrics for the memory system.
"""
import asyncio
import logging
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any
logger = logging.getLogger(__name__)
class MetricType(str, Enum):
"""Types of metrics."""
COUNTER = "counter"
GAUGE = "gauge"
HISTOGRAM = "histogram"
@dataclass
class MetricValue:
"""A single metric value."""
name: str
metric_type: MetricType
value: float
labels: dict[str, str] = field(default_factory=dict)
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
@dataclass
class HistogramBucket:
"""Histogram bucket for distribution metrics."""
le: float # Less than or equal
count: int = 0
class MemoryMetrics:
"""
Collects memory system metrics.
Metrics tracked:
- Memory operations (get/set/delete by type and scope)
- Retrieval operations and latencies
- Memory item counts by type
- Consolidation operations and durations
- Cache hit/miss rates
- Procedure success rates
- Embedding operations
"""
def __init__(self) -> None:
"""Initialize MemoryMetrics."""
self._counters: dict[str, Counter[str]] = defaultdict(Counter)
self._gauges: dict[str, dict[str, float]] = defaultdict(dict)
self._histograms: dict[str, list[float]] = defaultdict(list)
self._histogram_buckets: dict[str, list[HistogramBucket]] = {}
self._lock = asyncio.Lock()
# Initialize histogram buckets
self._init_histogram_buckets()
def _init_histogram_buckets(self) -> None:
"""Initialize histogram buckets for latency metrics."""
# Fast operations (working memory)
fast_buckets = [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, float("inf")]
# Normal operations (retrieval)
normal_buckets = [0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, float("inf")]
# Slow operations (consolidation)
slow_buckets = [0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, float("inf")]
self._histogram_buckets["memory_working_latency_seconds"] = [
HistogramBucket(le=b) for b in fast_buckets
]
self._histogram_buckets["memory_retrieval_latency_seconds"] = [
HistogramBucket(le=b) for b in normal_buckets
]
self._histogram_buckets["memory_consolidation_duration_seconds"] = [
HistogramBucket(le=b) for b in slow_buckets
]
self._histogram_buckets["memory_embedding_latency_seconds"] = [
HistogramBucket(le=b) for b in normal_buckets
]
# Counter methods - Operations
async def inc_operations(
self,
operation: str,
memory_type: str,
scope: str | None = None,
success: bool = True,
) -> None:
"""Increment memory operation counter."""
async with self._lock:
labels = f"operation={operation},memory_type={memory_type}"
if scope:
labels += f",scope={scope}"
labels += f",success={str(success).lower()}"
self._counters["memory_operations_total"][labels] += 1
async def inc_retrieval(
self,
memory_type: str,
strategy: str,
results_count: int,
) -> None:
"""Increment retrieval counter."""
async with self._lock:
labels = f"memory_type={memory_type},strategy={strategy}"
self._counters["memory_retrievals_total"][labels] += 1
# Track result counts as a separate metric
self._counters["memory_retrieval_results_total"][labels] += results_count
async def inc_cache_hit(self, cache_type: str) -> None:
"""Increment cache hit counter."""
async with self._lock:
labels = f"cache_type={cache_type}"
self._counters["memory_cache_hits_total"][labels] += 1
async def inc_cache_miss(self, cache_type: str) -> None:
"""Increment cache miss counter."""
async with self._lock:
labels = f"cache_type={cache_type}"
self._counters["memory_cache_misses_total"][labels] += 1
async def inc_consolidation(
self,
consolidation_type: str,
success: bool = True,
) -> None:
"""Increment consolidation counter."""
async with self._lock:
labels = f"type={consolidation_type},success={str(success).lower()}"
self._counters["memory_consolidations_total"][labels] += 1
async def inc_procedure_execution(
self,
procedure_id: str | None = None,
success: bool = True,
) -> None:
"""Increment procedure execution counter."""
async with self._lock:
labels = f"success={str(success).lower()}"
self._counters["memory_procedure_executions_total"][labels] += 1
async def inc_embeddings_generated(self, memory_type: str) -> None:
"""Increment embeddings generated counter."""
async with self._lock:
labels = f"memory_type={memory_type}"
self._counters["memory_embeddings_generated_total"][labels] += 1
async def inc_fact_reinforcements(self) -> None:
"""Increment fact reinforcement counter."""
async with self._lock:
self._counters["memory_fact_reinforcements_total"][""] += 1
async def inc_episodes_recorded(self, outcome: str) -> None:
"""Increment episodes recorded counter."""
async with self._lock:
labels = f"outcome={outcome}"
self._counters["memory_episodes_recorded_total"][labels] += 1
async def inc_anomalies_detected(self, anomaly_type: str) -> None:
"""Increment anomaly detection counter."""
async with self._lock:
labels = f"anomaly_type={anomaly_type}"
self._counters["memory_anomalies_detected_total"][labels] += 1
async def inc_patterns_detected(self, pattern_type: str) -> None:
"""Increment pattern detection counter."""
async with self._lock:
labels = f"pattern_type={pattern_type}"
self._counters["memory_patterns_detected_total"][labels] += 1
async def inc_insights_generated(self, insight_type: str) -> None:
"""Increment insight generation counter."""
async with self._lock:
labels = f"insight_type={insight_type}"
self._counters["memory_insights_generated_total"][labels] += 1
# Gauge methods
async def set_memory_items_count(
self,
memory_type: str,
scope: str,
count: int,
) -> None:
"""Set memory item count gauge."""
async with self._lock:
labels = f"memory_type={memory_type},scope={scope}"
self._gauges["memory_items_count"][labels] = float(count)
async def set_memory_size_bytes(
self,
memory_type: str,
scope: str,
size_bytes: int,
) -> None:
"""Set memory size gauge in bytes."""
async with self._lock:
labels = f"memory_type={memory_type},scope={scope}"
self._gauges["memory_size_bytes"][labels] = float(size_bytes)
async def set_cache_size(self, cache_type: str, size: int) -> None:
"""Set cache size gauge."""
async with self._lock:
labels = f"cache_type={cache_type}"
self._gauges["memory_cache_size"][labels] = float(size)
async def set_procedure_success_rate(
self,
procedure_name: str,
rate: float,
) -> None:
"""Set procedure success rate gauge (0-1)."""
async with self._lock:
labels = f"procedure_name={procedure_name}"
self._gauges["memory_procedure_success_rate"][labels] = rate
async def set_active_sessions(self, count: int) -> None:
"""Set active working memory sessions gauge."""
async with self._lock:
self._gauges["memory_active_sessions"][""] = float(count)
async def set_pending_consolidations(self, count: int) -> None:
"""Set pending consolidations gauge."""
async with self._lock:
self._gauges["memory_pending_consolidations"][""] = float(count)
# Histogram methods
async def observe_working_latency(self, latency_seconds: float) -> None:
"""Observe working memory operation latency."""
async with self._lock:
self._observe_histogram("memory_working_latency_seconds", latency_seconds)
async def observe_retrieval_latency(self, latency_seconds: float) -> None:
"""Observe retrieval latency."""
async with self._lock:
self._observe_histogram("memory_retrieval_latency_seconds", latency_seconds)
async def observe_consolidation_duration(self, duration_seconds: float) -> None:
"""Observe consolidation duration."""
async with self._lock:
self._observe_histogram(
"memory_consolidation_duration_seconds", duration_seconds
)
async def observe_embedding_latency(self, latency_seconds: float) -> None:
"""Observe embedding generation latency."""
async with self._lock:
self._observe_histogram("memory_embedding_latency_seconds", latency_seconds)
def _observe_histogram(self, name: str, value: float) -> None:
"""Record a value in a histogram."""
self._histograms[name].append(value)
# Update buckets
if name in self._histogram_buckets:
for bucket in self._histogram_buckets[name]:
if value <= bucket.le:
bucket.count += 1
# Export methods
async def get_all_metrics(self) -> list[MetricValue]:
"""Get all metrics as MetricValue objects."""
metrics: list[MetricValue] = []
async with self._lock:
# Export counters
for name, counter in self._counters.items():
for labels_str, value in counter.items():
labels = self._parse_labels(labels_str)
metrics.append(
MetricValue(
name=name,
metric_type=MetricType.COUNTER,
value=float(value),
labels=labels,
)
)
# Export gauges
for name, gauge_dict in self._gauges.items():
for labels_str, gauge_value in gauge_dict.items():
gauge_labels = self._parse_labels(labels_str)
metrics.append(
MetricValue(
name=name,
metric_type=MetricType.GAUGE,
value=gauge_value,
labels=gauge_labels,
)
)
# Export histogram summaries
for name, values in self._histograms.items():
if values:
metrics.append(
MetricValue(
name=f"{name}_count",
metric_type=MetricType.COUNTER,
value=float(len(values)),
)
)
metrics.append(
MetricValue(
name=f"{name}_sum",
metric_type=MetricType.COUNTER,
value=sum(values),
)
)
return metrics
async def get_prometheus_format(self) -> str:
"""Export metrics in Prometheus text format."""
lines: list[str] = []
async with self._lock:
# Export counters
for name, counter in self._counters.items():
lines.append(f"# TYPE {name} counter")
for labels_str, value in counter.items():
if labels_str:
lines.append(f"{name}{{{labels_str}}} {value}")
else:
lines.append(f"{name} {value}")
# Export gauges
for name, gauge_dict in self._gauges.items():
lines.append(f"# TYPE {name} gauge")
for labels_str, gauge_value in gauge_dict.items():
if labels_str:
lines.append(f"{name}{{{labels_str}}} {gauge_value}")
else:
lines.append(f"{name} {gauge_value}")
# Export histograms
for name, buckets in self._histogram_buckets.items():
lines.append(f"# TYPE {name} histogram")
for bucket in buckets:
le_str = "+Inf" if bucket.le == float("inf") else str(bucket.le)
lines.append(f'{name}_bucket{{le="{le_str}"}} {bucket.count}')
if name in self._histograms:
values = self._histograms[name]
lines.append(f"{name}_count {len(values)}")
lines.append(f"{name}_sum {sum(values)}")
return "\n".join(lines)
async def get_summary(self) -> dict[str, Any]:
"""Get a summary of key metrics."""
async with self._lock:
total_operations = sum(self._counters["memory_operations_total"].values())
successful_operations = sum(
v
for k, v in self._counters["memory_operations_total"].items()
if "success=true" in k
)
total_retrievals = sum(self._counters["memory_retrievals_total"].values())
total_cache_hits = sum(self._counters["memory_cache_hits_total"].values())
total_cache_misses = sum(
self._counters["memory_cache_misses_total"].values()
)
cache_hit_rate = (
total_cache_hits / (total_cache_hits + total_cache_misses)
if (total_cache_hits + total_cache_misses) > 0
else 0.0
)
total_consolidations = sum(
self._counters["memory_consolidations_total"].values()
)
total_episodes = sum(
self._counters["memory_episodes_recorded_total"].values()
)
# Calculate average latencies
retrieval_latencies = self._histograms.get(
"memory_retrieval_latency_seconds", []
)
avg_retrieval_latency = (
sum(retrieval_latencies) / len(retrieval_latencies)
if retrieval_latencies
else 0.0
)
return {
"total_operations": total_operations,
"successful_operations": successful_operations,
"operation_success_rate": (
successful_operations / total_operations
if total_operations > 0
else 1.0
),
"total_retrievals": total_retrievals,
"cache_hit_rate": cache_hit_rate,
"total_consolidations": total_consolidations,
"total_episodes_recorded": total_episodes,
"avg_retrieval_latency_ms": avg_retrieval_latency * 1000,
"patterns_detected": sum(
self._counters["memory_patterns_detected_total"].values()
),
"insights_generated": sum(
self._counters["memory_insights_generated_total"].values()
),
"anomalies_detected": sum(
self._counters["memory_anomalies_detected_total"].values()
),
"active_sessions": self._gauges.get("memory_active_sessions", {}).get(
"", 0
),
"pending_consolidations": self._gauges.get(
"memory_pending_consolidations", {}
).get("", 0),
}
async def get_cache_stats(self) -> dict[str, Any]:
"""Get detailed cache statistics."""
async with self._lock:
stats: dict[str, Any] = {}
# Get hits/misses by cache type
for labels_str, hits in self._counters["memory_cache_hits_total"].items():
cache_type = self._parse_labels(labels_str).get(
"cache_type", "unknown"
)
if cache_type not in stats:
stats[cache_type] = {"hits": 0, "misses": 0}
stats[cache_type]["hits"] = hits
for labels_str, misses in self._counters[
"memory_cache_misses_total"
].items():
cache_type = self._parse_labels(labels_str).get(
"cache_type", "unknown"
)
if cache_type not in stats:
stats[cache_type] = {"hits": 0, "misses": 0}
stats[cache_type]["misses"] = misses
# Calculate hit rates
for data in stats.values():
total = data["hits"] + data["misses"]
data["hit_rate"] = data["hits"] / total if total > 0 else 0.0
data["total"] = total
return stats
async def reset(self) -> None:
"""Reset all metrics."""
async with self._lock:
self._counters.clear()
self._gauges.clear()
self._histograms.clear()
self._init_histogram_buckets()
def _parse_labels(self, labels_str: str) -> dict[str, str]:
"""Parse labels string into dictionary."""
if not labels_str:
return {}
labels = {}
for pair in labels_str.split(","):
if "=" in pair:
key, value = pair.split("=", 1)
labels[key.strip()] = value.strip()
return labels
# Singleton instance
_metrics: MemoryMetrics | None = None
_lock = asyncio.Lock()
async def get_memory_metrics() -> MemoryMetrics:
"""Get the singleton MemoryMetrics instance."""
global _metrics
async with _lock:
if _metrics is None:
_metrics = MemoryMetrics()
return _metrics
def reset_memory_metrics() -> None:
"""Reset the singleton instance (for testing)."""
global _metrics
_metrics = None
# Convenience functions
async def record_memory_operation(
operation: str,
memory_type: str,
scope: str | None = None,
success: bool = True,
latency_ms: float | None = None,
) -> None:
"""Record a memory operation."""
metrics = await get_memory_metrics()
await metrics.inc_operations(operation, memory_type, scope, success)
if latency_ms is not None and memory_type == "working":
await metrics.observe_working_latency(latency_ms / 1000)
async def record_retrieval(
memory_type: str,
strategy: str,
results_count: int,
latency_ms: float,
) -> None:
"""Record a retrieval operation."""
metrics = await get_memory_metrics()
await metrics.inc_retrieval(memory_type, strategy, results_count)
await metrics.observe_retrieval_latency(latency_ms / 1000)