feat(memory): implement memory reflection service (#99)

Add reflection layer for memory system with pattern detection, success/failure
factor analysis, anomaly detection, and insights generation. Enables agents to
learn from past experiences and identify optimization opportunities.

Key components:
- Pattern detection: recurring success/failure, action sequences, temporal, efficiency
- Factor analysis: action, context, timing, resource, preceding state factors
- Anomaly detection: unusual duration, token usage, failure rates, action patterns
- Insight generation: optimization, warning, learning, recommendation, trend insights

Also fixes pre-existing timezone issues in test_types.py (datetime.now() -> datetime.now(UTC)).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-05 04:22:23 +01:00
parent 6954774e36
commit 997cfaa03a
8 changed files with 3125 additions and 4 deletions

View File

@@ -90,6 +90,9 @@ from .types import (
WorkingMemoryItem,
)
# Reflection (lazy import available)
# Import directly: from app.services.memory.reflection import MemoryReflection
__all__ = [
"CheckpointError",
"ConsolidationStatus",

View File

@@ -0,0 +1,38 @@
# app/services/memory/reflection/__init__.py
"""
Memory Reflection Layer.
Analyzes patterns in agent experiences to generate actionable insights.
"""
from .service import (
MemoryReflection,
ReflectionConfig,
get_memory_reflection,
)
from .types import (
Anomaly,
AnomalyType,
Factor,
FactorType,
Insight,
InsightType,
Pattern,
PatternType,
TimeRange,
)
__all__ = [
"Anomaly",
"AnomalyType",
"Factor",
"FactorType",
"Insight",
"InsightType",
"MemoryReflection",
"Pattern",
"PatternType",
"ReflectionConfig",
"TimeRange",
"get_memory_reflection",
]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,305 @@
# app/services/memory/reflection/types.py
"""
Memory Reflection Types.
Type definitions for pattern detection, anomaly detection, and insights.
"""
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any
from uuid import UUID
def _utcnow() -> datetime:
"""Get current UTC time as timezone-aware datetime."""
return datetime.now(UTC)
class PatternType(str, Enum):
"""Types of patterns detected in episodic memory."""
RECURRING_SUCCESS = "recurring_success"
RECURRING_FAILURE = "recurring_failure"
ACTION_SEQUENCE = "action_sequence"
CONTEXT_CORRELATION = "context_correlation"
TEMPORAL = "temporal"
EFFICIENCY = "efficiency"
class FactorType(str, Enum):
"""Types of factors contributing to outcomes."""
ACTION = "action"
CONTEXT = "context"
TIMING = "timing"
RESOURCE = "resource"
PRECEDING_STATE = "preceding_state"
class AnomalyType(str, Enum):
"""Types of anomalies detected."""
UNUSUAL_DURATION = "unusual_duration"
UNEXPECTED_OUTCOME = "unexpected_outcome"
UNUSUAL_TOKEN_USAGE = "unusual_token_usage"
UNUSUAL_FAILURE_RATE = "unusual_failure_rate"
UNUSUAL_ACTION_PATTERN = "unusual_action_pattern"
class InsightType(str, Enum):
"""Types of insights generated."""
OPTIMIZATION = "optimization"
WARNING = "warning"
LEARNING = "learning"
RECOMMENDATION = "recommendation"
TREND = "trend"
@dataclass
class TimeRange:
"""Time range for reflection analysis."""
start: datetime
end: datetime
@classmethod
def last_hours(cls, hours: int = 24) -> "TimeRange":
"""Create time range for last N hours."""
end = _utcnow()
start = datetime(
end.year, end.month, end.day, end.hour, end.minute, end.second,
tzinfo=UTC
) - __import__("datetime").timedelta(hours=hours)
return cls(start=start, end=end)
@classmethod
def last_days(cls, days: int = 7) -> "TimeRange":
"""Create time range for last N days."""
from datetime import timedelta
end = _utcnow()
start = end - timedelta(days=days)
return cls(start=start, end=end)
@property
def duration_hours(self) -> float:
"""Get duration in hours."""
return (self.end - self.start).total_seconds() / 3600
@property
def duration_days(self) -> float:
"""Get duration in days."""
return (self.end - self.start).total_seconds() / 86400
@dataclass
class Pattern:
"""A detected pattern in episodic memory."""
id: UUID
pattern_type: PatternType
name: str
description: str
confidence: float
occurrence_count: int
episode_ids: list[UUID]
first_seen: datetime
last_seen: datetime
metadata: dict[str, Any] = field(default_factory=dict)
@property
def frequency(self) -> float:
"""Calculate pattern frequency per day."""
duration_days = (self.last_seen - self.first_seen).total_seconds() / 86400
if duration_days < 1:
duration_days = 1
return self.occurrence_count / duration_days
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary."""
return {
"id": str(self.id),
"pattern_type": self.pattern_type.value,
"name": self.name,
"description": self.description,
"confidence": self.confidence,
"occurrence_count": self.occurrence_count,
"episode_ids": [str(eid) for eid in self.episode_ids],
"first_seen": self.first_seen.isoformat(),
"last_seen": self.last_seen.isoformat(),
"frequency": self.frequency,
"metadata": self.metadata,
}
@dataclass
class Factor:
"""A factor contributing to success or failure."""
id: UUID
factor_type: FactorType
name: str
description: str
impact_score: float
correlation: float
sample_size: int
positive_examples: list[UUID]
negative_examples: list[UUID]
metadata: dict[str, Any] = field(default_factory=dict)
@property
def net_impact(self) -> float:
"""Calculate net impact considering sample size."""
# Weight impact by sample confidence
confidence_weight = min(1.0, self.sample_size / 20)
return self.impact_score * self.correlation * confidence_weight
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary."""
return {
"id": str(self.id),
"factor_type": self.factor_type.value,
"name": self.name,
"description": self.description,
"impact_score": self.impact_score,
"correlation": self.correlation,
"sample_size": self.sample_size,
"positive_examples": [str(eid) for eid in self.positive_examples],
"negative_examples": [str(eid) for eid in self.negative_examples],
"net_impact": self.net_impact,
"metadata": self.metadata,
}
@dataclass
class Anomaly:
"""An anomaly detected in memory patterns."""
id: UUID
anomaly_type: AnomalyType
description: str
severity: float
episode_ids: list[UUID]
detected_at: datetime
baseline_value: float
observed_value: float
deviation_factor: float
metadata: dict[str, Any] = field(default_factory=dict)
@property
def is_critical(self) -> bool:
"""Check if anomaly is critical (severity > 0.8)."""
return self.severity > 0.8
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary."""
return {
"id": str(self.id),
"anomaly_type": self.anomaly_type.value,
"description": self.description,
"severity": self.severity,
"episode_ids": [str(eid) for eid in self.episode_ids],
"detected_at": self.detected_at.isoformat(),
"baseline_value": self.baseline_value,
"observed_value": self.observed_value,
"deviation_factor": self.deviation_factor,
"is_critical": self.is_critical,
"metadata": self.metadata,
}
@dataclass
class Insight:
"""An actionable insight generated from reflection."""
id: UUID
insight_type: InsightType
title: str
description: str
priority: float
confidence: float
source_patterns: list[UUID]
source_factors: list[UUID]
source_anomalies: list[UUID]
recommended_actions: list[str]
generated_at: datetime
metadata: dict[str, Any] = field(default_factory=dict)
@property
def actionable_score(self) -> float:
"""Calculate how actionable this insight is."""
action_weight = min(1.0, len(self.recommended_actions) / 3)
return self.priority * self.confidence * action_weight
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary."""
return {
"id": str(self.id),
"insight_type": self.insight_type.value,
"title": self.title,
"description": self.description,
"priority": self.priority,
"confidence": self.confidence,
"source_patterns": [str(pid) for pid in self.source_patterns],
"source_factors": [str(fid) for fid in self.source_factors],
"source_anomalies": [str(aid) for aid in self.source_anomalies],
"recommended_actions": self.recommended_actions,
"generated_at": self.generated_at.isoformat(),
"actionable_score": self.actionable_score,
"metadata": self.metadata,
}
@dataclass
class ReflectionResult:
"""Result of a reflection operation."""
patterns: list[Pattern]
factors: list[Factor]
anomalies: list[Anomaly]
insights: list[Insight]
time_range: TimeRange
episodes_analyzed: int
analysis_duration_seconds: float
generated_at: datetime = field(default_factory=_utcnow)
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary."""
return {
"patterns": [p.to_dict() for p in self.patterns],
"factors": [f.to_dict() for f in self.factors],
"anomalies": [a.to_dict() for a in self.anomalies],
"insights": [i.to_dict() for i in self.insights],
"time_range": {
"start": self.time_range.start.isoformat(),
"end": self.time_range.end.isoformat(),
"duration_hours": self.time_range.duration_hours,
},
"episodes_analyzed": self.episodes_analyzed,
"analysis_duration_seconds": self.analysis_duration_seconds,
"generated_at": self.generated_at.isoformat(),
}
@property
def summary(self) -> str:
"""Generate a summary of the reflection results."""
lines = [
f"Reflection Analysis ({self.time_range.duration_days:.1f} days)",
f"Episodes analyzed: {self.episodes_analyzed}",
"",
f"Patterns detected: {len(self.patterns)}",
f"Success/failure factors: {len(self.factors)}",
f"Anomalies found: {len(self.anomalies)}",
f"Insights generated: {len(self.insights)}",
]
if self.insights:
lines.append("")
lines.append("Top insights:")
for insight in sorted(self.insights, key=lambda i: -i.priority)[:3]:
lines.append(f" - [{insight.insight_type.value}] {insight.title}")
return "\n".join(lines)