syndarix/backend/app/services/memory/reflection/service.py
Felipe Cardoso 3edce9cd26 fix(memory): address critical bugs from multi-agent review
Bug Fixes:
- Remove singleton pattern from consolidation/reflection services to
  prevent stale database session bugs (session is now passed per-request)
- Add LRU eviction to MemoryToolService._working dict (max 1000 sessions)
  to prevent unbounded memory growth
- Replace O(n) list.remove() with O(1) OrderedDict.move_to_end() in
  RetrievalCache for better performance under load (see the sketch
  after this list)
- Use deque with maxlen for metrics histograms to prevent unbounded
  memory growth (circular buffer with 10k max samples)
- Use full UUID for checkpoint IDs instead of 8-char prefix to avoid
  collision risk at scale (birthday paradox at ~50k checkpoints)
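
A minimal sketch of the bounded-cache patterns named above (illustrative
only; the class and constant names here are assumptions, not the actual
RetrievalCache/MemoryToolService code):

```python
from collections import OrderedDict, deque

MAX_SESSIONS = 1000   # LRU bound for the working-session dict
MAX_SAMPLES = 10_000  # circular-buffer bound for histogram samples


class LRUDict:
    """OrderedDict-backed LRU map: O(1) touch and O(1) eviction."""

    def __init__(self, max_size: int = MAX_SESSIONS) -> None:
        self._max_size = max_size
        self._data: OrderedDict[str, object] = OrderedDict()

    def get(self, key: str) -> object | None:
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # O(1), replaces O(n) list.remove()
        return self._data[key]

    def put(self, key: str, value: object) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        while len(self._data) > self._max_size:
            self._data.popitem(last=False)  # evict least-recently-used


# Bounded histogram: deque(maxlen=...) drops the oldest sample
# automatically, so memory stays constant under sustained load.
latency_samples: deque[float] = deque(maxlen=MAX_SAMPLES)
```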

Test Updates:
- Update checkpoint test to expect 36-char UUID
- Update reflection singleton tests to expect new factory behavior
- Add reset_memory_reflection() no-op for backwards compatibility

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 18:55:32 +01:00


# app/services/memory/reflection/service.py
"""
Memory Reflection Service.

Analyzes patterns in episodic memory to generate actionable insights.
Implements pattern detection, success/failure analysis, anomaly detection,
and insight generation.
"""

import logging
import statistics
from collections import Counter, defaultdict
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Any
from uuid import UUID, uuid4

from sqlalchemy.ext.asyncio import AsyncSession

from app.services.memory.episodic.memory import EpisodicMemory
from app.services.memory.types import Episode, Outcome

from .types import (
    Anomaly,
    AnomalyType,
    Factor,
    FactorType,
    Insight,
    InsightType,
    Pattern,
    PatternType,
    ReflectionResult,
    TimeRange,
)

logger = logging.getLogger(__name__)


def _utcnow() -> datetime:
    """Get current UTC time as timezone-aware datetime."""
    return datetime.now(UTC)


@dataclass
class ReflectionConfig:
    """Configuration for memory reflection."""

    # Pattern detection thresholds
    min_pattern_occurrences: int = 3
    min_pattern_confidence: float = 0.6
    max_patterns_to_return: int = 20

    # Factor analysis thresholds
    min_sample_size_for_factor: int = 5
    min_correlation_for_factor: float = 0.3
    max_factors_to_return: int = 20

    # Anomaly detection thresholds
    anomaly_std_dev_threshold: float = 2.0
    min_baseline_samples: int = 10
    max_anomalies_to_return: int = 20

    # Insight generation thresholds
    min_insight_confidence: float = 0.5
    max_insights_to_return: int = 10

    # Analysis limits
    max_episodes_to_analyze: int = 1000
    batch_size: int = 100
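
# Illustrative example (not part of the service): thresholds can be
# tightened by passing a custom config to the factory or constructor, e.g.
#
#     config = ReflectionConfig(
#         min_pattern_occurrences=5,
#         min_pattern_confidence=0.75,
#         max_insights_to_return=5,
#     )
#     reflection = MemoryReflection(session, config=config)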


class MemoryReflection:
    """
    Memory Reflection Service.

    Analyzes patterns in agent experiences to generate insights:
    - Pattern detection in episodic memory
    - Success/failure factor analysis
    - Anomaly detection
    - Insights generation

    Performance target: <5s for comprehensive reflection
    """

    def __init__(
        self,
        session: AsyncSession,
        config: ReflectionConfig | None = None,
        embedding_generator: Any | None = None,
    ) -> None:
        """
        Initialize memory reflection service.

        Args:
            session: Database session
            config: Reflection configuration
            embedding_generator: Optional embedding generator for similarity
        """
        self._session = session
        self._config = config or ReflectionConfig()
        self._embedding_generator = embedding_generator
        self._episodic: EpisodicMemory | None = None

    async def _get_episodic(self) -> EpisodicMemory:
        """Get or create episodic memory service."""
        if self._episodic is None:
            self._episodic = await EpisodicMemory.create(
                self._session, self._embedding_generator
            )
        return self._episodic

    # =========================================================================
    # Pattern Detection
    # =========================================================================

    async def analyze_patterns(
        self,
        project_id: UUID,
        time_range: TimeRange,
        agent_instance_id: UUID | None = None,
    ) -> list[Pattern]:
        """
        Detect patterns in episodic memory.

        Analyzes episodes to find:
        - Recurring success patterns
        - Recurring failure patterns
        - Action sequence patterns
        - Context correlations
        - Temporal patterns

        Args:
            project_id: Project to analyze
            time_range: Time range for analysis
            agent_instance_id: Optional filter by agent instance

        Returns:
            List of detected patterns sorted by confidence
        """
        episodic = await self._get_episodic()

        # Get episodes in time range
        episodes = await episodic.get_recent(
            project_id,
            limit=self._config.max_episodes_to_analyze,
            since=time_range.start,
            agent_instance_id=agent_instance_id,
        )

        # Filter to time range
        episodes = [
            e for e in episodes if time_range.start <= e.occurred_at <= time_range.end
        ]
        if not episodes:
            return []

        patterns: list[Pattern] = []

        # Detect different pattern types
        patterns.extend(self._detect_outcome_patterns(episodes))
        patterns.extend(self._detect_action_patterns(episodes))
        patterns.extend(self._detect_task_type_patterns(episodes))
        patterns.extend(self._detect_temporal_patterns(episodes))
        patterns.extend(self._detect_efficiency_patterns(episodes))

        # Sort by confidence and limit
        patterns.sort(key=lambda p: -p.confidence)
        return patterns[: self._config.max_patterns_to_return]
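
    # Illustrative usage (assumed caller code, not part of this service):
    #
    #     time_range = TimeRange.last_days(7)
    #     patterns = await reflection.analyze_patterns(project_id, time_range)
    #     for p in patterns:
    #         print(p.pattern_type, f"{p.confidence:.0%}", p.name)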

    def _detect_outcome_patterns(self, episodes: list[Episode]) -> list[Pattern]:
        """Detect recurring success/failure patterns."""
        patterns: list[Pattern] = []

        # Group by task type and outcome
        task_outcomes: dict[str, dict[Outcome, list[Episode]]] = defaultdict(
            lambda: defaultdict(list)
        )
        for episode in episodes:
            task_outcomes[episode.task_type][episode.outcome].append(episode)

        for task_type, outcomes in task_outcomes.items():
            for outcome, eps in outcomes.items():
                if len(eps) < self._config.min_pattern_occurrences:
                    continue
                total_for_type = sum(len(o) for o in outcomes.values())
                ratio = len(eps) / total_for_type
                if ratio >= self._config.min_pattern_confidence:
                    pattern_type = (
                        PatternType.RECURRING_SUCCESS
                        if outcome == Outcome.SUCCESS
                        else PatternType.RECURRING_FAILURE
                    )
                    patterns.append(
                        Pattern(
                            id=uuid4(),
                            pattern_type=pattern_type,
                            name=f"{outcome.value.title()} pattern for {task_type}",
                            description=(
                                f"Task type '{task_type}' shows {ratio:.0%} "
                                f"{outcome.value} rate ({len(eps)}/{total_for_type})"
                            ),
                            confidence=ratio,
                            occurrence_count=len(eps),
                            episode_ids=[e.id for e in eps],
                            first_seen=min(e.occurred_at for e in eps),
                            last_seen=max(e.occurred_at for e in eps),
                            metadata={
                                "task_type": task_type,
                                "outcome": outcome.value,
                                "total_for_type": total_for_type,
                            },
                        )
                    )
        return patterns

    def _detect_action_patterns(self, episodes: list[Episode]) -> list[Pattern]:
        """Detect recurring action sequence patterns."""
        patterns: list[Pattern] = []

        # Extract action types from episodes
        action_sequences: dict[tuple[str, ...], list[Episode]] = defaultdict(list)
        for episode in episodes:
            if not episode.actions:
                continue
            # Extract action types
            action_types = tuple(
                str(action.get("type", "unknown"))
                for action in episode.actions[:5]  # First 5 actions
            )
            if len(action_types) >= 2:
                action_sequences[action_types].append(episode)

        for sequence, eps in action_sequences.items():
            if len(eps) < self._config.min_pattern_occurrences:
                continue
            confidence = len(eps) / len(episodes)
            if confidence < self._config.min_pattern_confidence:
                continue
            patterns.append(
                Pattern(
                    id=uuid4(),
                    pattern_type=PatternType.ACTION_SEQUENCE,
                    name=f"Action sequence: {' -> '.join(sequence)}",
                    description=(
                        f"Action sequence [{' -> '.join(sequence)}] "
                        f"appears in {len(eps)} episodes ({confidence:.0%})"
                    ),
                    confidence=confidence,
                    occurrence_count=len(eps),
                    episode_ids=[e.id for e in eps],
                    first_seen=min(e.occurred_at for e in eps),
                    last_seen=max(e.occurred_at for e in eps),
                    metadata={
                        "action_sequence": list(sequence),
                        "avg_duration": statistics.mean(
                            e.duration_seconds for e in eps
                        ),
                    },
                )
            )
        return patterns

    def _detect_task_type_patterns(self, episodes: list[Episode]) -> list[Pattern]:
        """Detect context correlation patterns."""
        patterns: list[Pattern] = []

        # Analyze task type correlations with success
        task_stats: dict[str, dict[str, Any]] = defaultdict(
            lambda: {
                "total": 0,
                "success": 0,
                "episodes": [],
                "durations": [],
                "tokens": [],
            }
        )
        for episode in episodes:
            stats = task_stats[episode.task_type]
            stats["total"] += 1
            stats["episodes"].append(episode)
            stats["durations"].append(episode.duration_seconds)
            stats["tokens"].append(episode.tokens_used)
            if episode.outcome == Outcome.SUCCESS:
                stats["success"] += 1

        for task_type, stats in task_stats.items():
            if stats["total"] < self._config.min_pattern_occurrences:
                continue
            success_rate = stats["success"] / stats["total"]
            avg_duration = statistics.mean(stats["durations"])
            avg_tokens = statistics.mean(stats["tokens"])
            patterns.append(
                Pattern(
                    id=uuid4(),
                    pattern_type=PatternType.CONTEXT_CORRELATION,
                    name=f"Task type analysis: {task_type}",
                    description=(
                        f"Task type '{task_type}': {success_rate:.0%} success rate, "
                        f"avg {avg_duration:.1f}s duration, {avg_tokens:.0f} tokens"
                    ),
                    confidence=min(
                        1.0, stats["total"] / 10
                    ),  # Higher sample = higher confidence
                    occurrence_count=stats["total"],
                    episode_ids=[e.id for e in stats["episodes"]],
                    first_seen=min(e.occurred_at for e in stats["episodes"]),
                    last_seen=max(e.occurred_at for e in stats["episodes"]),
                    metadata={
                        "task_type": task_type,
                        "success_rate": success_rate,
                        "avg_duration": avg_duration,
                        "avg_tokens": avg_tokens,
                        "total_episodes": stats["total"],
                    },
                )
            )
        return patterns

    def _detect_temporal_patterns(self, episodes: list[Episode]) -> list[Pattern]:
        """Detect time-based patterns."""
        patterns: list[Pattern] = []
        if len(episodes) < self._config.min_pattern_occurrences:
            return patterns

        # Group by hour of day
        hour_stats: dict[int, dict[str, Any]] = defaultdict(
            lambda: {"total": 0, "success": 0, "episodes": []}
        )
        for episode in episodes:
            hour = episode.occurred_at.hour
            hour_stats[hour]["total"] += 1
            hour_stats[hour]["episodes"].append(episode)
            if episode.outcome == Outcome.SUCCESS:
                hour_stats[hour]["success"] += 1

        # Find peak activity hours
        total_episodes = len(episodes)
        for hour, stats in hour_stats.items():
            if stats["total"] < self._config.min_pattern_occurrences:
                continue
            activity_ratio = stats["total"] / total_episodes
            if activity_ratio >= 0.15:  # At least 15% of activity
                success_rate = (
                    stats["success"] / stats["total"] if stats["total"] > 0 else 0
                )
                patterns.append(
                    Pattern(
                        id=uuid4(),
                        pattern_type=PatternType.TEMPORAL,
                        name=f"Peak activity at {hour:02d}:00",
                        description=(
                            f"Hour {hour:02d}:00 has {activity_ratio:.0%} of activity "
                            f"with {success_rate:.0%} success rate"
                        ),
                        confidence=activity_ratio,
                        occurrence_count=stats["total"],
                        episode_ids=[e.id for e in stats["episodes"]],
                        first_seen=min(e.occurred_at for e in stats["episodes"]),
                        last_seen=max(e.occurred_at for e in stats["episodes"]),
                        metadata={
                            "hour": hour,
                            "activity_ratio": activity_ratio,
                            "success_rate": success_rate,
                        },
                    )
                )
        return patterns

    def _detect_efficiency_patterns(self, episodes: list[Episode]) -> list[Pattern]:
        """Detect efficiency-related patterns."""
        patterns: list[Pattern] = []
        if len(episodes) < self._config.min_pattern_occurrences:
            return patterns

        # Analyze duration efficiency
        successful = [e for e in episodes if e.outcome == Outcome.SUCCESS]
        failed = [e for e in episodes if e.outcome == Outcome.FAILURE]
        if len(successful) >= 3 and len(failed) >= 3:
            avg_success_duration = statistics.mean(
                e.duration_seconds for e in successful
            )
            avg_failure_duration = statistics.mean(e.duration_seconds for e in failed)
            if avg_failure_duration > avg_success_duration * 1.5:
                patterns.append(
                    Pattern(
                        id=uuid4(),
                        pattern_type=PatternType.EFFICIENCY,
                        name="Failures take longer than successes",
                        description=(
                            f"Failed tasks average {avg_failure_duration:.1f}s vs "
                            f"{avg_success_duration:.1f}s for successful tasks "
                            f"({avg_failure_duration / avg_success_duration:.1f}x longer)"
                        ),
                        confidence=0.8,
                        occurrence_count=len(successful) + len(failed),
                        episode_ids=[e.id for e in successful + failed],
                        first_seen=min(e.occurred_at for e in episodes),
                        last_seen=max(e.occurred_at for e in episodes),
                        metadata={
                            "avg_success_duration": avg_success_duration,
                            "avg_failure_duration": avg_failure_duration,
                            "ratio": avg_failure_duration / avg_success_duration,
                        },
                    )
                )

        # Analyze token efficiency
        if len(successful) >= 3:
            avg_tokens = statistics.mean(e.tokens_used for e in successful)
            std_tokens = (
                statistics.stdev(e.tokens_used for e in successful)
                if len(successful) > 1
                else 0
            )
            efficient = [
                e for e in successful if e.tokens_used < avg_tokens - std_tokens
            ]
            if len(efficient) >= self._config.min_pattern_occurrences:
                patterns.append(
                    Pattern(
                        id=uuid4(),
                        pattern_type=PatternType.EFFICIENCY,
                        name="Token-efficient successful episodes",
                        description=(
                            f"{len(efficient)} episodes completed successfully "
                            f"with below-average token usage"
                        ),
                        confidence=len(efficient) / len(successful),
                        occurrence_count=len(efficient),
                        episode_ids=[e.id for e in efficient],
                        first_seen=min(e.occurred_at for e in efficient),
                        last_seen=max(e.occurred_at for e in efficient),
                        metadata={
                            "avg_tokens": avg_tokens,
                            "efficient_avg_tokens": statistics.mean(
                                e.tokens_used for e in efficient
                            ),
                        },
                    )
                )
        return patterns

    # =========================================================================
    # Success/Failure Factor Analysis
    # =========================================================================

    async def identify_success_factors(
        self,
        project_id: UUID,
        task_type: str | None = None,
        time_range: TimeRange | None = None,
        agent_instance_id: UUID | None = None,
    ) -> list[Factor]:
        """
        Identify factors that contribute to success or failure.

        Analyzes episodes to find correlating factors:
        - Action patterns that correlate with success
        - Context factors that affect outcomes
        - Timing-related factors
        - Resource usage patterns

        Args:
            project_id: Project to analyze
            task_type: Optional filter by task type
            time_range: Optional time range (defaults to last 30 days)
            agent_instance_id: Optional filter by agent instance

        Returns:
            List of factors sorted by impact score
        """
        episodic = await self._get_episodic()
        if time_range is None:
            time_range = TimeRange.last_days(30)

        # Get episodes
        if task_type:
            episodes = await episodic.get_by_task_type(
                project_id,
                task_type,
                limit=self._config.max_episodes_to_analyze,
                agent_instance_id=agent_instance_id,
            )
        else:
            episodes = await episodic.get_recent(
                project_id,
                limit=self._config.max_episodes_to_analyze,
                since=time_range.start,
                agent_instance_id=agent_instance_id,
            )

        # Filter to time range
        episodes = [
            e for e in episodes if time_range.start <= e.occurred_at <= time_range.end
        ]
        if len(episodes) < self._config.min_sample_size_for_factor:
            return []

        factors: list[Factor] = []

        # Analyze different factor types
        factors.extend(self._analyze_action_factors(episodes))
        factors.extend(self._analyze_context_factors(episodes))
        factors.extend(self._analyze_timing_factors(episodes))
        factors.extend(self._analyze_resource_factors(episodes))

        # Sort by net impact and limit
        factors.sort(key=lambda f: -abs(f.net_impact))
        return factors[: self._config.max_factors_to_return]
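
    # Illustrative usage (assumed caller code; the task type shown is
    # hypothetical):
    #
    #     factors = await reflection.identify_success_factors(
    #         project_id, task_type="code_review"
    #     )
    #     for f in factors[:3]:
    #         print(f.name, f"{f.correlation:+.0%}")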

    def _analyze_action_factors(self, episodes: list[Episode]) -> list[Factor]:
        """Analyze action-related success factors."""
        factors: list[Factor] = []

        # Count action types in success vs failure
        success_actions: Counter[str] = Counter()
        failure_actions: Counter[str] = Counter()
        action_episodes: dict[str, list[Episode]] = defaultdict(list)
        for episode in episodes:
            for action in episode.actions:
                action_type = str(action.get("type", "unknown"))
                action_episodes[action_type].append(episode)
                if episode.outcome == Outcome.SUCCESS:
                    success_actions[action_type] += 1
                else:
                    failure_actions[action_type] += 1

        # Find differentiating actions
        all_actions = set(success_actions.keys()) | set(failure_actions.keys())
        for action_type in all_actions:
            success_count = success_actions.get(action_type, 0)
            failure_count = failure_actions.get(action_type, 0)
            total = success_count + failure_count
            if total < self._config.min_sample_size_for_factor:
                continue
            success_rate = success_count / total
            baseline_success = sum(
                1 for e in episodes if e.outcome == Outcome.SUCCESS
            ) / len(episodes)
            correlation = success_rate - baseline_success
            if abs(correlation) >= self._config.min_correlation_for_factor:
                eps = action_episodes[action_type]
                positive = [e for e in eps if e.outcome == Outcome.SUCCESS]
                negative = [e for e in eps if e.outcome != Outcome.SUCCESS]
                factors.append(
                    Factor(
                        id=uuid4(),
                        factor_type=FactorType.ACTION,
                        name=f"Action: {action_type}",
                        description=(
                            f"Action '{action_type}' has {success_rate:.0%} success rate "
                            f"vs {baseline_success:.0%} baseline ({correlation:+.0%} difference)"
                        ),
                        impact_score=abs(correlation),
                        correlation=correlation,
                        sample_size=total,
                        positive_examples=[e.id for e in positive[:5]],
                        negative_examples=[e.id for e in negative[:5]],
                        metadata={
                            "action_type": action_type,
                            "success_count": success_count,
                            "failure_count": failure_count,
                            "baseline_success": baseline_success,
                        },
                    )
                )
        return factors

    def _analyze_context_factors(self, episodes: list[Episode]) -> list[Factor]:
        """Analyze context-related success factors."""
        factors: list[Factor] = []

        # Analyze context length correlation
        if len(episodes) >= self._config.min_sample_size_for_factor:
            successful = [e for e in episodes if e.outcome == Outcome.SUCCESS]
            failed = [e for e in episodes if e.outcome == Outcome.FAILURE]
            if len(successful) >= 3 and len(failed) >= 3:
                avg_success_context = statistics.mean(
                    len(e.context_summary) for e in successful
                )
                avg_failure_context = statistics.mean(
                    len(e.context_summary) for e in failed
                )
                context_diff = (avg_success_context - avg_failure_context) / max(
                    avg_success_context, avg_failure_context, 1
                )
                if abs(context_diff) >= self._config.min_correlation_for_factor:
                    factors.append(
                        Factor(
                            id=uuid4(),
                            factor_type=FactorType.CONTEXT,
                            name="Context summary length",
                            description=(
                                f"Successful episodes have {'longer' if context_diff > 0 else 'shorter'} "
                                f"context summaries ({avg_success_context:.0f} vs {avg_failure_context:.0f} chars)"
                            ),
                            impact_score=abs(context_diff),
                            correlation=context_diff,
                            sample_size=len(episodes),
                            positive_examples=[e.id for e in successful[:5]],
                            negative_examples=[e.id for e in failed[:5]],
                            metadata={
                                "avg_success_length": avg_success_context,
                                "avg_failure_length": avg_failure_context,
                            },
                        )
                    )
        return factors

    def _analyze_timing_factors(self, episodes: list[Episode]) -> list[Factor]:
        """Analyze timing-related success factors."""
        factors: list[Factor] = []
        if len(episodes) < self._config.min_sample_size_for_factor:
            return factors

        successful = [e for e in episodes if e.outcome == Outcome.SUCCESS]
        failed = [e for e in episodes if e.outcome == Outcome.FAILURE]
        if len(successful) >= 3 and len(failed) >= 3:
            # Duration analysis
            avg_success_duration = statistics.mean(
                e.duration_seconds for e in successful
            )
            avg_failure_duration = statistics.mean(e.duration_seconds for e in failed)
            if avg_success_duration > 0:
                duration_ratio = avg_failure_duration / avg_success_duration
                if abs(duration_ratio - 1) >= 0.3:  # 30% difference
                    factors.append(
                        Factor(
                            id=uuid4(),
                            factor_type=FactorType.TIMING,
                            name="Task duration",
                            description=(
                                f"Failed tasks take {duration_ratio:.1f}x the time of "
                                f"successful tasks ({avg_failure_duration:.1f}s vs {avg_success_duration:.1f}s)"
                            ),
                            impact_score=abs(1 - duration_ratio) / 2,
                            correlation=1 - duration_ratio,
                            sample_size=len(episodes),
                            positive_examples=[e.id for e in successful[:5]],
                            negative_examples=[e.id for e in failed[:5]],
                            metadata={
                                "avg_success_duration": avg_success_duration,
                                "avg_failure_duration": avg_failure_duration,
                                "ratio": duration_ratio,
                            },
                        )
                    )
        return factors

    def _analyze_resource_factors(self, episodes: list[Episode]) -> list[Factor]:
        """Analyze resource usage success factors."""
        factors: list[Factor] = []
        if len(episodes) < self._config.min_sample_size_for_factor:
            return factors

        successful = [e for e in episodes if e.outcome == Outcome.SUCCESS]
        failed = [e for e in episodes if e.outcome == Outcome.FAILURE]
        if len(successful) >= 3 and len(failed) >= 3:
            # Token usage analysis
            avg_success_tokens = statistics.mean(e.tokens_used for e in successful)
            avg_failure_tokens = statistics.mean(e.tokens_used for e in failed)
            if avg_success_tokens > 0:
                token_ratio = avg_failure_tokens / avg_success_tokens
                if abs(token_ratio - 1) >= 0.2:  # 20% difference
                    factors.append(
                        Factor(
                            id=uuid4(),
                            factor_type=FactorType.RESOURCE,
                            name="Token usage",
                            description=(
                                f"Failed tasks use {token_ratio:.1f}x the tokens of "
                                f"successful tasks ({avg_failure_tokens:.0f} vs {avg_success_tokens:.0f})"
                            ),
                            impact_score=abs(1 - token_ratio) / 2,
                            correlation=1 - token_ratio,
                            sample_size=len(episodes),
                            positive_examples=[e.id for e in successful[:5]],
                            negative_examples=[e.id for e in failed[:5]],
                            metadata={
                                "avg_success_tokens": avg_success_tokens,
                                "avg_failure_tokens": avg_failure_tokens,
                                "ratio": token_ratio,
                            },
                        )
                    )

            # Action count analysis
            avg_success_actions = statistics.mean(len(e.actions) for e in successful)
            avg_failure_actions = statistics.mean(len(e.actions) for e in failed)
            if avg_success_actions > 0:
                action_ratio = avg_failure_actions / avg_success_actions
                if abs(action_ratio - 1) >= 0.3:  # 30% difference
                    factors.append(
                        Factor(
                            id=uuid4(),
                            factor_type=FactorType.RESOURCE,
                            name="Action count",
                            description=(
                                f"Failed tasks have {action_ratio:.1f}x the actions of "
                                f"successful tasks ({avg_failure_actions:.1f} vs {avg_success_actions:.1f})"
                            ),
                            impact_score=abs(1 - action_ratio) / 2,
                            correlation=1 - action_ratio,
                            sample_size=len(episodes),
                            positive_examples=[e.id for e in successful[:5]],
                            negative_examples=[e.id for e in failed[:5]],
                            metadata={
                                "avg_success_actions": avg_success_actions,
                                "avg_failure_actions": avg_failure_actions,
                                "ratio": action_ratio,
                            },
                        )
                    )
        return factors

    # =========================================================================
    # Anomaly Detection
    # =========================================================================

    async def detect_anomalies(
        self,
        project_id: UUID,
        baseline_days: int = 30,
        agent_instance_id: UUID | None = None,
    ) -> list[Anomaly]:
        """
        Detect anomalies compared to baseline behavior.

        Analyzes recent activity against baseline to find:
        - Unusual duration patterns
        - Unexpected outcome distributions
        - Unusual token usage
        - Unusual failure rates

        Args:
            project_id: Project to analyze
            baseline_days: Number of days for baseline calculation
            agent_instance_id: Optional filter by agent instance

        Returns:
            List of anomalies sorted by severity
        """
        episodic = await self._get_episodic()

        # Get baseline data
        baseline_start = _utcnow() - timedelta(days=baseline_days)
        baseline_episodes = await episodic.get_recent(
            project_id,
            limit=self._config.max_episodes_to_analyze,
            since=baseline_start,
            agent_instance_id=agent_instance_id,
        )
        if len(baseline_episodes) < self._config.min_baseline_samples:
            return []

        # Get recent data (last 24 hours)
        recent_start = _utcnow() - timedelta(hours=24)
        recent_episodes = [
            e for e in baseline_episodes if e.occurred_at >= recent_start
        ]
        if not recent_episodes:
            return []

        anomalies: list[Anomaly] = []

        # Detect different anomaly types
        anomalies.extend(
            self._detect_duration_anomalies(baseline_episodes, recent_episodes)
        )
        anomalies.extend(
            self._detect_outcome_anomalies(baseline_episodes, recent_episodes)
        )
        anomalies.extend(
            self._detect_token_anomalies(baseline_episodes, recent_episodes)
        )
        anomalies.extend(
            self._detect_failure_rate_anomalies(baseline_episodes, recent_episodes)
        )

        # Sort by severity and limit
        anomalies.sort(key=lambda a: -a.severity)
        return anomalies[: self._config.max_anomalies_to_return]
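
    # Each per-episode detector below flags values whose z-score
    #     z = |x - mean| / std
    # meets anomaly_std_dev_threshold (default 2.0). For example, with a
    # baseline mean of 10.0s and std of 2.0s, a 16.0s episode scores
    # z = 3.0: it is flagged, with severity min(1.0, 3.0 / 5) = 0.6.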

    def _detect_duration_anomalies(
        self,
        baseline: list[Episode],
        recent: list[Episode],
    ) -> list[Anomaly]:
        """Detect unusual duration patterns."""
        anomalies: list[Anomaly] = []
        if len(baseline) < 2 or not recent:
            return anomalies

        baseline_durations = [e.duration_seconds for e in baseline]
        baseline_mean = statistics.mean(baseline_durations)
        baseline_std = (
            statistics.stdev(baseline_durations) if len(baseline_durations) > 1 else 0
        )
        if baseline_std == 0:
            return anomalies

        for episode in recent:
            z_score = abs(episode.duration_seconds - baseline_mean) / baseline_std
            if z_score >= self._config.anomaly_std_dev_threshold:
                severity = min(1.0, z_score / 5)  # Cap at 5 std devs
                anomalies.append(
                    Anomaly(
                        id=uuid4(),
                        anomaly_type=AnomalyType.UNUSUAL_DURATION,
                        description=(
                            f"Episode {episode.id} has unusual duration: "
                            f"{episode.duration_seconds:.1f}s ({z_score:.1f} std devs from mean)"
                        ),
                        severity=severity,
                        episode_ids=[episode.id],
                        detected_at=_utcnow(),
                        baseline_value=baseline_mean,
                        observed_value=episode.duration_seconds,
                        deviation_factor=z_score,
                        metadata={
                            "baseline_std": baseline_std,
                            "task_type": episode.task_type,
                        },
                    )
                )
        return anomalies

    def _detect_outcome_anomalies(
        self,
        baseline: list[Episode],
        recent: list[Episode],
    ) -> list[Anomaly]:
        """Detect unexpected outcome distributions."""
        anomalies: list[Anomaly] = []
        if not recent:
            return anomalies

        # Calculate baseline success rate per task type
        task_baseline: dict[str, float] = {}
        for task_type in {e.task_type for e in baseline}:
            type_episodes = [e for e in baseline if e.task_type == task_type]
            if type_episodes:
                success_count = sum(
                    1 for e in type_episodes if e.outcome == Outcome.SUCCESS
                )
                task_baseline[task_type] = success_count / len(type_episodes)

        # Check recent outcomes against baseline
        for episode in recent:
            if episode.task_type not in task_baseline:
                continue
            expected_success = task_baseline[episode.task_type]
            # Flag unexpected failures for usually successful tasks
            if episode.outcome == Outcome.FAILURE and expected_success >= 0.8:
                anomalies.append(
                    Anomaly(
                        id=uuid4(),
                        anomaly_type=AnomalyType.UNEXPECTED_OUTCOME,
                        description=(
                            f"Unexpected failure for '{episode.task_type}' "
                            f"(usually {expected_success:.0%} success rate)"
                        ),
                        severity=expected_success,
                        episode_ids=[episode.id],
                        detected_at=_utcnow(),
                        baseline_value=expected_success,
                        observed_value=0.0,
                        deviation_factor=expected_success,
                        metadata={
                            "task_type": episode.task_type,
                            "outcome": episode.outcome.value,
                        },
                    )
                )
        return anomalies

    def _detect_token_anomalies(
        self,
        baseline: list[Episode],
        recent: list[Episode],
    ) -> list[Anomaly]:
        """Detect unusual token usage."""
        anomalies: list[Anomaly] = []
        if len(baseline) < 2 or not recent:
            return anomalies

        baseline_tokens = [e.tokens_used for e in baseline if e.tokens_used > 0]
        if len(baseline_tokens) < 2:
            return anomalies
        baseline_mean = statistics.mean(baseline_tokens)
        baseline_std = statistics.stdev(baseline_tokens)
        if baseline_std == 0:
            return anomalies

        for episode in recent:
            if episode.tokens_used == 0:
                continue
            z_score = abs(episode.tokens_used - baseline_mean) / baseline_std
            if z_score >= self._config.anomaly_std_dev_threshold:
                severity = min(1.0, z_score / 5)
                anomalies.append(
                    Anomaly(
                        id=uuid4(),
                        anomaly_type=AnomalyType.UNUSUAL_TOKEN_USAGE,
                        description=(
                            f"Episode {episode.id} has unusual token usage: "
                            f"{episode.tokens_used} ({z_score:.1f} std devs from mean)"
                        ),
                        severity=severity,
                        episode_ids=[episode.id],
                        detected_at=_utcnow(),
                        baseline_value=baseline_mean,
                        observed_value=float(episode.tokens_used),
                        deviation_factor=z_score,
                        metadata={
                            "baseline_std": baseline_std,
                            "task_type": episode.task_type,
                        },
                    )
                )
        return anomalies

    def _detect_failure_rate_anomalies(
        self,
        baseline: list[Episode],
        recent: list[Episode],
    ) -> list[Anomaly]:
        """Detect unusual failure rate spikes."""
        anomalies: list[Anomaly] = []
        if len(baseline) < 10 or len(recent) < 3:
            return anomalies

        baseline_failure_rate = sum(
            1 for e in baseline if e.outcome == Outcome.FAILURE
        ) / len(baseline)
        recent_failure_rate = sum(
            1 for e in recent if e.outcome == Outcome.FAILURE
        ) / len(recent)

        # Detect significant failure rate increase
        if (
            recent_failure_rate > baseline_failure_rate * 1.5
            and recent_failure_rate > 0.3
        ):
            rate_increase = recent_failure_rate / max(baseline_failure_rate, 0.01)
            anomalies.append(
                Anomaly(
                    id=uuid4(),
                    anomaly_type=AnomalyType.UNUSUAL_FAILURE_RATE,
                    description=(
                        f"Recent failure rate ({recent_failure_rate:.0%}) is "
                        f"{rate_increase:.1f}x the baseline ({baseline_failure_rate:.0%})"
                    ),
                    severity=min(1.0, (rate_increase - 1) / 3),
                    episode_ids=[e.id for e in recent if e.outcome == Outcome.FAILURE],
                    detected_at=_utcnow(),
                    baseline_value=baseline_failure_rate,
                    observed_value=recent_failure_rate,
                    deviation_factor=rate_increase,
                    metadata={
                        "baseline_sample_size": len(baseline),
                        "recent_sample_size": len(recent),
                    },
                )
            )
        return anomalies

    # =========================================================================
    # Insight Generation
    # =========================================================================

    async def generate_insights(
        self,
        project_id: UUID,
        time_range: TimeRange | None = None,
        agent_instance_id: UUID | None = None,
    ) -> list[Insight]:
        """
        Generate actionable insights from reflection analysis.

        Combines patterns, factors, and anomalies to generate:
        - Optimization recommendations
        - Warnings about issues
        - Learning opportunities
        - Trend analysis

        Args:
            project_id: Project to analyze
            time_range: Optional time range (defaults to last 7 days)
            agent_instance_id: Optional filter by agent instance

        Returns:
            List of insights sorted by priority
        """
        if time_range is None:
            time_range = TimeRange.last_days(7)

        # Run all analyses
        patterns = await self.analyze_patterns(
            project_id, time_range, agent_instance_id
        )
        factors = await self.identify_success_factors(
            project_id, None, time_range, agent_instance_id
        )
        anomalies = await self.detect_anomalies(project_id, 30, agent_instance_id)

        insights: list[Insight] = []

        # Generate insights from patterns
        insights.extend(self._insights_from_patterns(patterns))
        # Generate insights from factors
        insights.extend(self._insights_from_factors(factors))
        # Generate insights from anomalies
        insights.extend(self._insights_from_anomalies(anomalies))
        # Generate cross-cutting insights
        insights.extend(self._generate_cross_insights(patterns, factors, anomalies))

        # Filter by confidence and sort by priority
        insights = [
            i for i in insights if i.confidence >= self._config.min_insight_confidence
        ]
        insights.sort(key=lambda i: -i.priority)
        return insights[: self._config.max_insights_to_return]

    def _insights_from_patterns(self, patterns: list[Pattern]) -> list[Insight]:
        """Generate insights from detected patterns."""
        insights: list[Insight] = []
        for pattern in patterns:
            if pattern.pattern_type == PatternType.RECURRING_FAILURE:
                insights.append(
                    Insight(
                        id=uuid4(),
                        insight_type=InsightType.WARNING,
                        title="Recurring failure pattern detected",
                        description=pattern.description,
                        priority=0.8,
                        confidence=pattern.confidence,
                        source_patterns=[pattern.id],
                        source_factors=[],
                        source_anomalies=[],
                        recommended_actions=[
                            "Review failing episodes for task type in pattern",
                            "Consider updating procedures or knowledge base",
                            "Add specific error handling for common failure modes",
                        ],
                        generated_at=_utcnow(),
                        metadata=pattern.metadata,
                    )
                )
            elif pattern.pattern_type == PatternType.RECURRING_SUCCESS:
                insights.append(
                    Insight(
                        id=uuid4(),
                        insight_type=InsightType.LEARNING,
                        title="Successful pattern identified",
                        description=pattern.description,
                        priority=0.6,
                        confidence=pattern.confidence,
                        source_patterns=[pattern.id],
                        source_factors=[],
                        source_anomalies=[],
                        recommended_actions=[
                            "Document this successful approach",
                            "Consider converting to a reusable procedure",
                            "Share learnings across similar task types",
                        ],
                        generated_at=_utcnow(),
                        metadata=pattern.metadata,
                    )
                )
            elif pattern.pattern_type == PatternType.EFFICIENCY:
                insights.append(
                    Insight(
                        id=uuid4(),
                        insight_type=InsightType.OPTIMIZATION,
                        title="Efficiency opportunity detected",
                        description=pattern.description,
                        priority=0.5,
                        confidence=pattern.confidence,
                        source_patterns=[pattern.id],
                        source_factors=[],
                        source_anomalies=[],
                        recommended_actions=[
                            "Analyze efficient episodes for best practices",
                            "Apply efficient approaches to similar tasks",
                        ],
                        generated_at=_utcnow(),
                        metadata=pattern.metadata,
                    )
                )
        return insights

    def _insights_from_factors(self, factors: list[Factor]) -> list[Insight]:
        """Generate insights from success/failure factors."""
        insights: list[Insight] = []
        positive_factors = [f for f in factors if f.correlation > 0]
        negative_factors = [f for f in factors if f.correlation < 0]

        if positive_factors:
            top_positive = sorted(positive_factors, key=lambda f: -f.net_impact)[:3]
            insights.append(
                Insight(
                    id=uuid4(),
                    insight_type=InsightType.RECOMMENDATION,
                    title="Key success factors identified",
                    description=(
                        "Top success factors: "
                        + ", ".join(f.name for f in top_positive)
                    ),
                    priority=0.7,
                    confidence=statistics.mean(f.correlation for f in top_positive),
                    source_patterns=[],
                    source_factors=[f.id for f in top_positive],
                    source_anomalies=[],
                    recommended_actions=[f"Reinforce: {f.name}" for f in top_positive],
                    generated_at=_utcnow(),
                    metadata={
                        "factors": [f.to_dict() for f in top_positive],
                    },
                )
            )

        if negative_factors:
            top_negative = sorted(negative_factors, key=lambda f: f.net_impact)[:3]
            insights.append(
                Insight(
                    id=uuid4(),
                    insight_type=InsightType.WARNING,
                    title="Factors correlating with failure",
                    description=(
                        "Risky factors: " + ", ".join(f.name for f in top_negative)
                    ),
                    priority=0.75,
                    confidence=statistics.mean(
                        abs(f.correlation) for f in top_negative
                    ),
                    source_patterns=[],
                    source_factors=[f.id for f in top_negative],
                    source_anomalies=[],
                    recommended_actions=[f"Mitigate: {f.name}" for f in top_negative],
                    generated_at=_utcnow(),
                    metadata={
                        "factors": [f.to_dict() for f in top_negative],
                    },
                )
            )
        return insights

    def _insights_from_anomalies(self, anomalies: list[Anomaly]) -> list[Insight]:
        """Generate insights from detected anomalies."""
        insights: list[Insight] = []
        critical_anomalies = [a for a in anomalies if a.is_critical]
        if critical_anomalies:
            insights.append(
                Insight(
                    id=uuid4(),
                    insight_type=InsightType.WARNING,
                    title=f"{len(critical_anomalies)} critical anomalies detected",
                    description=(
                        "Critical issues: "
                        + "; ".join(a.description for a in critical_anomalies[:3])
                    ),
                    priority=0.95,
                    confidence=0.9,
                    source_patterns=[],
                    source_factors=[],
                    source_anomalies=[a.id for a in critical_anomalies],
                    recommended_actions=[
                        "Investigate critical anomalies immediately",
                        "Review affected episodes for root cause",
                        "Consider pausing affected task types",
                    ],
                    generated_at=_utcnow(),
                    metadata={
                        "anomaly_count": len(critical_anomalies),
                        "anomalies": [a.to_dict() for a in critical_anomalies[:5]],
                    },
                )
            )

        failure_rate_anomalies = [
            a for a in anomalies if a.anomaly_type == AnomalyType.UNUSUAL_FAILURE_RATE
        ]
        if failure_rate_anomalies:
            for anomaly in failure_rate_anomalies:
                insights.append(
                    Insight(
                        id=uuid4(),
                        insight_type=InsightType.WARNING,
                        title="Failure rate spike detected",
                        description=anomaly.description,
                        priority=0.85,
                        confidence=0.8,
                        source_patterns=[],
                        source_factors=[],
                        source_anomalies=[anomaly.id],
                        recommended_actions=[
                            "Review recent failed episodes",
                            "Check for system changes or issues",
                            "Consider rolling back recent changes",
                        ],
                        generated_at=_utcnow(),
                        metadata=anomaly.metadata,
                    )
                )
        return insights

    def _generate_cross_insights(
        self,
        patterns: list[Pattern],
        factors: list[Factor],
        anomalies: list[Anomaly],
    ) -> list[Insight]:
        """Generate insights from combinations of patterns, factors, and anomalies."""
        insights: list[Insight] = []

        # Trend insight: overall health
        total_items = len(patterns) + len(factors) + len(anomalies)
        if total_items > 0:
            warning_count = (
                len(
                    [
                        p
                        for p in patterns
                        if p.pattern_type == PatternType.RECURRING_FAILURE
                    ]
                )
                + len([a for a in anomalies if a.is_critical])
                + len([f for f in factors if f.correlation < -0.3])
            )
            health_score = 1.0 - (warning_count / max(total_items, 1))
            insights.append(
                Insight(
                    id=uuid4(),
                    insight_type=InsightType.TREND,
                    title=f"Overall health score: {health_score:.0%}",
                    description=(
                        f"Based on {len(patterns)} patterns, {len(factors)} factors, "
                        f"and {len(anomalies)} anomalies. "
                        f"Found {warning_count} warning indicators."
                    ),
                    priority=0.6,
                    confidence=min(
                        1.0, total_items / 20
                    ),  # Higher sample = higher confidence
                    source_patterns=[p.id for p in patterns[:5]],
                    source_factors=[f.id for f in factors[:5]],
                    source_anomalies=[a.id for a in anomalies[:5]],
                    recommended_actions=(
                        ["Continue current practices"]
                        if health_score > 0.7
                        else [
                            "Review warnings and address issues",
                            "Focus on improvement areas",
                        ]
                    ),
                    generated_at=_utcnow(),
                    metadata={
                        "health_score": health_score,
                        "pattern_count": len(patterns),
                        "factor_count": len(factors),
                        "anomaly_count": len(anomalies),
                        "warning_count": warning_count,
                    },
                )
            )
        return insights

    # =========================================================================
    # Comprehensive Reflection
    # =========================================================================

    async def reflect(
        self,
        project_id: UUID,
        time_range: TimeRange | None = None,
        agent_instance_id: UUID | None = None,
    ) -> ReflectionResult:
        """
        Run comprehensive memory reflection.

        Performs all reflection operations and returns a combined result:
        - Pattern detection
        - Success/failure factor analysis
        - Anomaly detection
        - Insight generation

        Args:
            project_id: Project to reflect on
            time_range: Optional time range (defaults to last 7 days)
            agent_instance_id: Optional filter by agent instance

        Returns:
            ReflectionResult with all findings
        """
        start_time = _utcnow()
        if time_range is None:
            time_range = TimeRange.last_days(7)

        # Get episode count for metadata
        episodic = await self._get_episodic()
        episodes = await episodic.get_recent(
            project_id,
            limit=self._config.max_episodes_to_analyze,
            since=time_range.start,
            agent_instance_id=agent_instance_id,
        )
        episodes_in_range = [
            e for e in episodes if time_range.start <= e.occurred_at <= time_range.end
        ]

        # Run all analyses
        patterns = await self.analyze_patterns(
            project_id, time_range, agent_instance_id
        )
        factors = await self.identify_success_factors(
            project_id, None, time_range, agent_instance_id
        )
        anomalies = await self.detect_anomalies(project_id, 30, agent_instance_id)
        insights = await self.generate_insights(
            project_id, time_range, agent_instance_id
        )

        duration = (_utcnow() - start_time).total_seconds()
        logger.info(
            f"Memory reflection completed for project {project_id}: "
            f"{len(patterns)} patterns, {len(factors)} factors, "
            f"{len(anomalies)} anomalies, {len(insights)} insights "
            f"({duration:.2f}s)"
        )
        return ReflectionResult(
            patterns=patterns,
            factors=factors,
            anomalies=anomalies,
            insights=insights,
            time_range=time_range,
            episodes_analyzed=len(episodes_in_range),
            analysis_duration_seconds=duration,
        )


# Factory function - no singleton to avoid stale session issues
async def get_memory_reflection(
    session: AsyncSession,
    config: ReflectionConfig | None = None,
) -> MemoryReflection:
    """
    Create a memory reflection service for the given session.

    Note: This creates a new instance each time to avoid stale session issues.
    The service is lightweight and safe to recreate per-request.

    Args:
        session: Database session (must be active)
        config: Optional configuration

    Returns:
        MemoryReflection instance
    """
    return MemoryReflection(session=session, config=config)


async def reset_memory_reflection() -> None:
    """No-op for backwards compatibility (singleton pattern removed)."""
    return
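
# Illustrative per-request usage (assumed caller code; `get_session` is a
# hypothetical dependency that yields an active AsyncSession):
#
#     async def run_reflection(project_id: UUID) -> None:
#         async with get_session() as session:
#             reflection = await get_memory_reflection(session)
#             result = await reflection.reflect(project_id)
#             for insight in result.insights:
#                 logger.info("Insight %.2f: %s", insight.priority, insight.title)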