10 Commits

Author SHA1 Message Date
Felipe Cardoso
da85a8aba8 fix(memory): prevent entry metadata mutation in vector search
- Create shallow copy of VectorIndexEntry when adding similarity score
- Prevents mutation of cached entries that could corrupt shared state

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 17:39:54 +01:00
Felipe Cardoso
f8bd1011e9 security(memory): escape SQL ILIKE patterns to prevent injection
- Add _escape_like_pattern() helper to escape SQL wildcards (%, _, \)
- Apply escaping in SemanticMemory.search_facts and get_by_entity
- Apply escaping in ProceduralMemory.search and find_best_for_task

Prevents attackers from injecting SQL wildcard patterns through
user-controlled search terms.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 17:39:47 +01:00
Felipe Cardoso
f057c2f0b6 fix(memory): add thread-safe singleton initialization
- Add threading.Lock with double-check locking to ScopeManager
- Add asyncio.Lock with double-check locking to MemoryReflection
- Make reset_memory_metrics async with proper locking
- Update test fixtures to handle async reset functions

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 17:39:39 +01:00
Felipe Cardoso
33ec889fc4 fix(memory): add data integrity constraints to Fact model
- Change source_episode_ids from JSON to JSONB for PostgreSQL consistency
- Add unique constraint for global facts (project_id IS NULL)
- Add CHECK constraint ensuring reinforcement_count >= 1

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 17:39:30 +01:00
Felipe Cardoso
74b8c65741 fix(tests): move memory model tests to avoid import conflicts
Moved tests/unit/models/memory/ to tests/models/memory/ to avoid
Python import path conflicts when pytest collects all tests.

The conflict was caused by tests/models/ and tests/unit/models/ both
having __init__.py files, causing Python to confuse app.models.memory
imports.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 15:45:30 +01:00
Felipe Cardoso
b232298c61 feat(memory): add memory consolidation task and switch source_episode_ids to JSON
- Added `memory_consolidation` to the task list and updated `__all__` in test files.
- Updated `source_episode_ids` in `Fact` model to use JSON for cross-database compatibility.
- Revised related database migrations to use JSONB instead of ARRAY.
- Adjusted test concurrency in Makefile for improved test performance.
2026-01-05 15:38:52 +01:00
Felipe Cardoso
cf6291ac8e style(memory): apply ruff formatting and linting fixes
Auto-fixed linting errors and formatting issues:
- Removed unused imports (F401): pytest, Any, AnalysisType, MemoryType, OutcomeType
- Removed unused variable (F841): hooks variable in test
- Applied consistent formatting across memory service and test files

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 14:07:48 +01:00
Felipe Cardoso
e3fe0439fd docs(memory): add comprehensive memory system documentation (#101)
Add complete documentation for the Agent Memory System including:
- Architecture overview with ASCII diagram
- Memory type descriptions (working, episodic, semantic, procedural)
- Usage examples for all memory operations
- Memory scoping hierarchy explanation
- Consolidation flow documentation
- MCP tools reference
- Reflection capabilities
- Configuration reference table
- Integration with Context Engine
- Metrics reference
- Performance targets
- Troubleshooting guide
- Directory structure

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 11:03:57 +01:00
Felipe Cardoso
57680c3772 feat(memory): implement metrics and observability (#100)
Add comprehensive metrics collector for memory system with:
- Counter metrics: operations, retrievals, cache hits/misses, consolidations,
  episodes recorded, patterns/anomalies/insights detected
- Gauge metrics: item counts, memory size, cache size, procedure success rates,
  active sessions, pending consolidations
- Histogram metrics: working memory latency, retrieval latency, consolidation
  duration, embedding latency
- Prometheus format export
- Summary and cache stats helpers

31 tests covering all metric types, singleton pattern, and edge cases.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 11:00:53 +01:00
Felipe Cardoso
997cfaa03a feat(memory): implement memory reflection service (#99)
Add reflection layer for memory system with pattern detection, success/failure
factor analysis, anomaly detection, and insights generation. Enables agents to
learn from past experiences and identify optimization opportunities.

Key components:
- Pattern detection: recurring success/failure, action sequences, temporal, efficiency
- Factor analysis: action, context, timing, resource, preceding state factors
- Anomaly detection: unusual duration, token usage, failure rates, action patterns
- Insight generation: optimization, warning, learning, recommendation, trend insights

Also fixes pre-existing timezone issues in test_types.py (datetime.now() -> datetime.now(UTC)).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 04:22:23 +01:00
36 changed files with 4943 additions and 143 deletions

View File

@@ -80,7 +80,7 @@ test:
test-cov:
@echo "🧪 Running tests with coverage..."
@IS_TEST=True PYTHONPATH=. uv run pytest --cov=app --cov-report=term-missing --cov-report=html -n 16
@IS_TEST=True PYTHONPATH=. uv run pytest --cov=app --cov-report=term-missing --cov-report=html -n 20
@echo "📊 Coverage report generated in htmlcov/index.html"
# ============================================================================

View File

@@ -247,11 +247,12 @@ def upgrade() -> None:
sa.Column("predicate", sa.String(255), nullable=False),
sa.Column("object", sa.Text(), nullable=False),
sa.Column("confidence", sa.Float(), nullable=False, server_default="0.8"),
# Source episode IDs stored as JSON array of UUID strings for cross-db compatibility
sa.Column(
"source_episode_ids",
postgresql.ARRAY(postgresql.UUID(as_uuid=True)),
postgresql.JSONB(astext_type=sa.Text()),
nullable=False,
server_default="{}",
server_default="[]",
),
sa.Column("first_learned", sa.DateTime(timezone=True), nullable=False),
sa.Column("last_reinforced", sa.DateTime(timezone=True), nullable=False),

View File

@@ -18,10 +18,7 @@ from sqlalchemy import (
Text,
text,
)
from sqlalchemy.dialects.postgresql import (
ARRAY,
UUID as PGUUID,
)
from sqlalchemy.dialects.postgresql import JSONB, UUID as PGUUID
from sqlalchemy.orm import relationship
from app.models.base import Base, TimestampMixin, UUIDMixin
@@ -63,10 +60,8 @@ class Fact(Base, UUIDMixin, TimestampMixin):
# Confidence score (0.0 to 1.0)
confidence = Column(Float, nullable=False, default=0.8, index=True)
# Source tracking: which episodes contributed to this fact
source_episode_ids: Column[list] = Column(
ARRAY(PGUUID(as_uuid=True)), default=list, nullable=False
)
# Source tracking: which episodes contributed to this fact (stored as JSONB array of UUID strings)
source_episode_ids: Column[list] = Column(JSONB, default=list, nullable=False)
# Learning history
first_learned = Column(DateTime(timezone=True), nullable=False)
@@ -90,17 +85,29 @@ class Fact(Base, UUIDMixin, TimestampMixin):
unique=True,
postgresql_where=text("project_id IS NOT NULL"),
),
# Unique constraint on triple for global facts (project_id IS NULL)
Index(
"ix_facts_unique_triple_global",
"subject",
"predicate",
"object",
unique=True,
postgresql_where=text("project_id IS NULL"),
),
# Query patterns
Index("ix_facts_subject_predicate", "subject", "predicate"),
Index("ix_facts_project_subject", "project_id", "subject"),
Index("ix_facts_confidence_time", "confidence", "last_reinforced"),
# For finding facts by entity (subject or object)
Index("ix_facts_subject", "subject"),
# Note: subject already has index=True on Column definition, no need for explicit index
# Data integrity constraints
CheckConstraint(
"confidence >= 0.0 AND confidence <= 1.0",
name="ck_facts_confidence_range",
),
CheckConstraint(
"reinforcement_count >= 1",
name="ck_facts_reinforcement_positive",
),
)
def __repr__(self) -> str:

View File

@@ -344,7 +344,12 @@ class BudgetAllocator:
Rebalanced budget
"""
if prioritize is None:
prioritize = [ContextType.KNOWLEDGE, ContextType.MEMORY, ContextType.TASK, ContextType.SYSTEM]
prioritize = [
ContextType.KNOWLEDGE,
ContextType.MEMORY,
ContextType.TASK,
ContextType.SYSTEM,
]
# Calculate unused tokens per type
unused: dict[str, int] = {}

View File

@@ -90,6 +90,9 @@ from .types import (
WorkingMemoryItem,
)
# Reflection (lazy import available)
# Import directly: from app.services.memory.reflection import MemoryReflection
__all__ = [
"CheckpointError",
"ConsolidationStatus",

View File

@@ -50,7 +50,9 @@ class CacheStats:
"embedding_cache": self.embedding_cache,
"retrieval_cache": self.retrieval_cache,
"overall_hit_rate": self.overall_hit_rate,
"last_cleanup": self.last_cleanup.isoformat() if self.last_cleanup else None,
"last_cleanup": self.last_cleanup.isoformat()
if self.last_cleanup
else None,
"cleanup_count": self.cleanup_count,
}
@@ -104,7 +106,8 @@ class CacheManager:
else:
self._embedding_cache = create_embedding_cache(
max_size=self._settings.cache_max_items,
default_ttl_seconds=self._settings.cache_ttl_seconds * 12, # 1hr for embeddings
default_ttl_seconds=self._settings.cache_ttl_seconds
* 12, # 1hr for embeddings
redis=redis,
)
@@ -271,7 +274,9 @@ class CacheManager:
# Invalidate retrieval cache
if self._retrieval_cache:
uuid_id = UUID(str(memory_id)) if not isinstance(memory_id, UUID) else memory_id
uuid_id = (
UUID(str(memory_id)) if not isinstance(memory_id, UUID) else memory_id
)
count += self._retrieval_cache.invalidate_by_memory(uuid_id)
logger.debug(f"Invalidated {count} cache entries for {memory_type}:{memory_id}")

View File

@@ -405,9 +405,7 @@ class EmbeddingCache:
count = 0
with self._lock:
keys_to_remove = [
k for k, v in self._cache.items() if v.model == model
]
keys_to_remove = [k for k, v in self._cache.items() if v.model == model]
for key in keys_to_remove:
del self._cache[key]
count += 1
@@ -454,9 +452,7 @@ class EmbeddingCache:
Number of entries removed
"""
with self._lock:
keys_to_remove = [
k for k, v in self._cache.items() if v.is_expired()
]
keys_to_remove = [k for k, v in self._cache.items() if v.is_expired()]
for key in keys_to_remove:
del self._cache[key]
self._stats.expirations += 1

View File

@@ -384,9 +384,7 @@ class HotMemoryCache[T]:
Number of entries removed
"""
with self._lock:
keys_to_remove = [
k for k, v in self._cache.items() if v.is_expired()
]
keys_to_remove = [k for k, v in self._cache.items() if v.is_expired()]
for key in keys_to_remove:
del self._cache[key]
self._stats.expirations += 1

View File

@@ -197,10 +197,17 @@ class VectorIndex(MemoryIndex[T]):
results = [(s, e) for s, e in results if e.memory_type == memory_type]
# Store similarity in metadata for the returned entries
# Use a copy of metadata to avoid mutating cached entries
output = []
for similarity, entry in results[:limit]:
entry.metadata["similarity"] = similarity
output.append(entry)
# Create a shallow copy of the entry with updated metadata
entry_with_score = VectorIndexEntry(
memory_id=entry.memory_id,
memory_type=entry.memory_type,
embedding=entry.embedding,
metadata={**entry.metadata, "similarity": similarity},
)
output.append(entry_with_score)
logger.debug(f"Vector search returned {len(output)} results")
return output

View File

@@ -321,10 +321,7 @@ class MemoryContextSource:
min_confidence=min_relevance,
)
return [
MemoryContext.from_semantic_memory(fact, query=query)
for fact in facts
]
return [MemoryContext.from_semantic_memory(fact, query=query) for fact in facts]
async def _fetch_procedural(
self,

View File

@@ -287,7 +287,9 @@ class AgentLifecycleManager:
# Get all current state
all_keys = await working.list_keys()
# Filter out checkpoint keys
state_keys = [k for k in all_keys if not k.startswith(self.CHECKPOINT_PREFIX)]
state_keys = [
k for k in all_keys if not k.startswith(self.CHECKPOINT_PREFIX)
]
state: dict[str, Any] = {}
for key in state_keys:
@@ -483,7 +485,9 @@ class AgentLifecycleManager:
# Gather session state for consolidation
all_keys = await working.list_keys()
state_keys = [k for k in all_keys if not k.startswith(self.CHECKPOINT_PREFIX)]
state_keys = [
k for k in all_keys if not k.startswith(self.CHECKPOINT_PREFIX)
]
session_state: dict[str, Any] = {}
for key in state_keys:
@@ -597,14 +601,16 @@ class AgentLifecycleManager:
for key in all_keys:
if key.startswith(self.CHECKPOINT_PREFIX):
checkpoint_id = key[len(self.CHECKPOINT_PREFIX):]
checkpoint_id = key[len(self.CHECKPOINT_PREFIX) :]
checkpoint = await working.get(key)
if checkpoint:
checkpoints.append({
"checkpoint_id": checkpoint_id,
"timestamp": checkpoint.get("timestamp"),
"keys_count": checkpoint.get("keys_count", 0),
})
checkpoints.append(
{
"checkpoint_id": checkpoint_id,
"timestamp": checkpoint.get("timestamp"),
"keys_count": checkpoint.get("keys_count", 0),
}
)
# Sort by timestamp (newest first)
checkpoints.sort(

View File

@@ -414,12 +414,14 @@ class MemoryToolService:
if args.query.lower() in key.lower():
value = await working.get(key)
if value is not None:
results.append({
"type": "working",
"key": key,
"content": str(value),
"relevance": 1.0,
})
results.append(
{
"type": "working",
"key": key,
"content": str(value),
"relevance": 1.0,
}
)
elif memory_type == MemoryType.EPISODIC:
episodic = await self._get_episodic()
@@ -430,14 +432,18 @@ class MemoryToolService:
agent_instance_id=context.agent_instance_id,
)
for episode in episodes:
results.append({
"type": "episodic",
"id": str(episode.id),
"summary": episode.task_description,
"outcome": episode.outcome.value if episode.outcome else None,
"occurred_at": episode.occurred_at.isoformat(),
"relevance": episode.importance_score,
})
results.append(
{
"type": "episodic",
"id": str(episode.id),
"summary": episode.task_description,
"outcome": episode.outcome.value
if episode.outcome
else None,
"occurred_at": episode.occurred_at.isoformat(),
"relevance": episode.importance_score,
}
)
elif memory_type == MemoryType.SEMANTIC:
semantic = await self._get_semantic()
@@ -448,15 +454,17 @@ class MemoryToolService:
min_confidence=args.min_relevance,
)
for fact in facts:
results.append({
"type": "semantic",
"id": str(fact.id),
"subject": fact.subject,
"predicate": fact.predicate,
"object": fact.object,
"confidence": fact.confidence,
"relevance": fact.confidence,
})
results.append(
{
"type": "semantic",
"id": str(fact.id),
"subject": fact.subject,
"predicate": fact.predicate,
"object": fact.object,
"confidence": fact.confidence,
"relevance": fact.confidence,
}
)
elif memory_type == MemoryType.PROCEDURAL:
procedural = await self._get_procedural()
@@ -467,15 +475,17 @@ class MemoryToolService:
limit=args.limit,
)
for proc in procedures:
results.append({
"type": "procedural",
"id": str(proc.id),
"name": proc.name,
"trigger": proc.trigger_pattern,
"success_rate": proc.success_rate,
"steps_count": len(proc.steps) if proc.steps else 0,
"relevance": proc.success_rate,
})
results.append(
{
"type": "procedural",
"id": str(proc.id),
"name": proc.name,
"trigger": proc.trigger_pattern,
"success_rate": proc.success_rate,
"steps_count": len(proc.steps) if proc.steps else 0,
"relevance": proc.success_rate,
}
)
# Sort by relevance and limit
results.sort(key=lambda x: x.get("relevance", 0), reverse=True)
@@ -601,7 +611,11 @@ class MemoryToolService:
if ep.task_type:
task_types[ep.task_type] = task_types.get(ep.task_type, 0) + 1
if ep.outcome:
outcome_val = ep.outcome.value if hasattr(ep.outcome, "value") else str(ep.outcome)
outcome_val = (
ep.outcome.value
if hasattr(ep.outcome, "value")
else str(ep.outcome)
)
outcomes[outcome_val] = outcomes.get(outcome_val, 0) + 1
# Sort by frequency
@@ -613,11 +627,13 @@ class MemoryToolService:
examples = []
if args.include_examples:
for ep in episodes[: min(3, args.max_items)]:
examples.append({
"summary": ep.task_description,
"task_type": ep.task_type,
"outcome": ep.outcome.value if ep.outcome else None,
})
examples.append(
{
"summary": ep.task_description,
"task_type": ep.task_type,
"outcome": ep.outcome.value if ep.outcome else None,
}
)
return {
"analysis_type": "recent_patterns",
@@ -661,11 +677,13 @@ class MemoryToolService:
examples = []
if args.include_examples:
for ep in successful[: min(3, args.max_items)]:
examples.append({
"summary": ep.task_description,
"task_type": ep.task_type,
"lessons": ep.lessons_learned,
})
examples.append(
{
"summary": ep.task_description,
"task_type": ep.task_type,
"lessons": ep.lessons_learned,
}
)
return {
"analysis_type": "success_factors",
@@ -694,9 +712,7 @@ class MemoryToolService:
failure_by_task[task].append(ep)
# Most common failure types
failure_counts = {
task: len(eps) for task, eps in failure_by_task.items()
}
failure_counts = {task: len(eps) for task, eps in failure_by_task.items()}
top_failures = sorted(failure_counts.items(), key=lambda x: x[1], reverse=True)[
: args.max_items
]
@@ -704,12 +720,14 @@ class MemoryToolService:
examples = []
if args.include_examples:
for ep in failed[: min(3, args.max_items)]:
examples.append({
"summary": ep.task_description,
"task_type": ep.task_type,
"lessons": ep.lessons_learned,
"error": ep.outcome_details,
})
examples.append(
{
"summary": ep.task_description,
"task_type": ep.task_type,
"lessons": ep.lessons_learned,
"error": ep.outcome_details,
}
)
return {
"analysis_type": "failure_patterns",
@@ -794,15 +812,21 @@ class MemoryToolService:
insights = []
if top_tasks:
insights.append(f"Most common task type: {top_tasks[0][0]} ({top_tasks[0][1]} occurrences)")
insights.append(
f"Most common task type: {top_tasks[0][0]} ({top_tasks[0][1]} occurrences)"
)
total = sum(outcome_dist.values())
if total > 0:
success_rate = outcome_dist.get("success", 0) / total
if success_rate > 0.8:
insights.append("High success rate observed - current approach is working well")
insights.append(
"High success rate observed - current approach is working well"
)
elif success_rate < 0.5:
insights.append("Success rate below 50% - consider reviewing procedures")
insights.append(
"Success rate below 50% - consider reviewing procedures"
)
return insights
@@ -839,9 +863,13 @@ class MemoryToolService:
if top_failures:
worst_task, count = top_failures[0]
tips.append(f"'{worst_task}' has most failures ({count}) - needs procedure review")
tips.append(
f"'{worst_task}' has most failures ({count}) - needs procedure review"
)
tips.append("Review lessons_learned from past failures before attempting similar tasks")
tips.append(
"Review lessons_learned from past failures before attempting similar tasks"
)
return tips
@@ -912,7 +940,11 @@ class MemoryToolService:
outcomes = {"success": 0, "failure": 0, "partial": 0, "abandoned": 0}
for ep in recent_episodes:
if ep.outcome:
key = ep.outcome.value if hasattr(ep.outcome, "value") else str(ep.outcome)
key = (
ep.outcome.value
if hasattr(ep.outcome, "value")
else str(ep.outcome)
)
if key in outcomes:
outcomes[key] += 1
@@ -942,7 +974,8 @@ class MemoryToolService:
# Filter by minimum success rate if specified
procedures = [
p for p in all_procedures
p
for p in all_procedures
if args.min_success_rate is None or p.success_rate >= args.min_success_rate
][: args.limit]

View File

@@ -0,0 +1,18 @@
# app/services/memory/metrics/__init__.py
"""Memory Metrics module."""
from .collector import (
MemoryMetrics,
get_memory_metrics,
record_memory_operation,
record_retrieval,
reset_memory_metrics,
)
__all__ = [
"MemoryMetrics",
"get_memory_metrics",
"record_memory_operation",
"record_retrieval",
"reset_memory_metrics",
]

View File

@@ -0,0 +1,536 @@
# app/services/memory/metrics/collector.py
"""
Memory Metrics Collector
Collects and exposes metrics for the memory system.
"""
import asyncio
import logging
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any
logger = logging.getLogger(__name__)
class MetricType(str, Enum):
"""Types of metrics."""
COUNTER = "counter"
GAUGE = "gauge"
HISTOGRAM = "histogram"
@dataclass
class MetricValue:
"""A single metric value."""
name: str
metric_type: MetricType
value: float
labels: dict[str, str] = field(default_factory=dict)
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
@dataclass
class HistogramBucket:
"""Histogram bucket for distribution metrics."""
le: float # Less than or equal
count: int = 0
class MemoryMetrics:
"""
Collects memory system metrics.
Metrics tracked:
- Memory operations (get/set/delete by type and scope)
- Retrieval operations and latencies
- Memory item counts by type
- Consolidation operations and durations
- Cache hit/miss rates
- Procedure success rates
- Embedding operations
"""
def __init__(self) -> None:
"""Initialize MemoryMetrics."""
self._counters: dict[str, Counter[str]] = defaultdict(Counter)
self._gauges: dict[str, dict[str, float]] = defaultdict(dict)
self._histograms: dict[str, list[float]] = defaultdict(list)
self._histogram_buckets: dict[str, list[HistogramBucket]] = {}
self._lock = asyncio.Lock()
# Initialize histogram buckets
self._init_histogram_buckets()
def _init_histogram_buckets(self) -> None:
"""Initialize histogram buckets for latency metrics."""
# Fast operations (working memory)
fast_buckets = [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, float("inf")]
# Normal operations (retrieval)
normal_buckets = [0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, float("inf")]
# Slow operations (consolidation)
slow_buckets = [0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, float("inf")]
self._histogram_buckets["memory_working_latency_seconds"] = [
HistogramBucket(le=b) for b in fast_buckets
]
self._histogram_buckets["memory_retrieval_latency_seconds"] = [
HistogramBucket(le=b) for b in normal_buckets
]
self._histogram_buckets["memory_consolidation_duration_seconds"] = [
HistogramBucket(le=b) for b in slow_buckets
]
self._histogram_buckets["memory_embedding_latency_seconds"] = [
HistogramBucket(le=b) for b in normal_buckets
]
# Counter methods - Operations
async def inc_operations(
self,
operation: str,
memory_type: str,
scope: str | None = None,
success: bool = True,
) -> None:
"""Increment memory operation counter."""
async with self._lock:
labels = f"operation={operation},memory_type={memory_type}"
if scope:
labels += f",scope={scope}"
labels += f",success={str(success).lower()}"
self._counters["memory_operations_total"][labels] += 1
async def inc_retrieval(
self,
memory_type: str,
strategy: str,
results_count: int,
) -> None:
"""Increment retrieval counter."""
async with self._lock:
labels = f"memory_type={memory_type},strategy={strategy}"
self._counters["memory_retrievals_total"][labels] += 1
# Track result counts as a separate metric
self._counters["memory_retrieval_results_total"][labels] += results_count
async def inc_cache_hit(self, cache_type: str) -> None:
"""Increment cache hit counter."""
async with self._lock:
labels = f"cache_type={cache_type}"
self._counters["memory_cache_hits_total"][labels] += 1
async def inc_cache_miss(self, cache_type: str) -> None:
"""Increment cache miss counter."""
async with self._lock:
labels = f"cache_type={cache_type}"
self._counters["memory_cache_misses_total"][labels] += 1
async def inc_consolidation(
self,
consolidation_type: str,
success: bool = True,
) -> None:
"""Increment consolidation counter."""
async with self._lock:
labels = f"type={consolidation_type},success={str(success).lower()}"
self._counters["memory_consolidations_total"][labels] += 1
async def inc_procedure_execution(
self,
procedure_id: str | None = None,
success: bool = True,
) -> None:
"""Increment procedure execution counter."""
async with self._lock:
labels = f"success={str(success).lower()}"
self._counters["memory_procedure_executions_total"][labels] += 1
async def inc_embeddings_generated(self, memory_type: str) -> None:
"""Increment embeddings generated counter."""
async with self._lock:
labels = f"memory_type={memory_type}"
self._counters["memory_embeddings_generated_total"][labels] += 1
async def inc_fact_reinforcements(self) -> None:
"""Increment fact reinforcement counter."""
async with self._lock:
self._counters["memory_fact_reinforcements_total"][""] += 1
async def inc_episodes_recorded(self, outcome: str) -> None:
"""Increment episodes recorded counter."""
async with self._lock:
labels = f"outcome={outcome}"
self._counters["memory_episodes_recorded_total"][labels] += 1
async def inc_anomalies_detected(self, anomaly_type: str) -> None:
"""Increment anomaly detection counter."""
async with self._lock:
labels = f"anomaly_type={anomaly_type}"
self._counters["memory_anomalies_detected_total"][labels] += 1
async def inc_patterns_detected(self, pattern_type: str) -> None:
"""Increment pattern detection counter."""
async with self._lock:
labels = f"pattern_type={pattern_type}"
self._counters["memory_patterns_detected_total"][labels] += 1
async def inc_insights_generated(self, insight_type: str) -> None:
"""Increment insight generation counter."""
async with self._lock:
labels = f"insight_type={insight_type}"
self._counters["memory_insights_generated_total"][labels] += 1
# Gauge methods
async def set_memory_items_count(
self,
memory_type: str,
scope: str,
count: int,
) -> None:
"""Set memory item count gauge."""
async with self._lock:
labels = f"memory_type={memory_type},scope={scope}"
self._gauges["memory_items_count"][labels] = float(count)
async def set_memory_size_bytes(
self,
memory_type: str,
scope: str,
size_bytes: int,
) -> None:
"""Set memory size gauge in bytes."""
async with self._lock:
labels = f"memory_type={memory_type},scope={scope}"
self._gauges["memory_size_bytes"][labels] = float(size_bytes)
async def set_cache_size(self, cache_type: str, size: int) -> None:
"""Set cache size gauge."""
async with self._lock:
labels = f"cache_type={cache_type}"
self._gauges["memory_cache_size"][labels] = float(size)
async def set_procedure_success_rate(
self,
procedure_name: str,
rate: float,
) -> None:
"""Set procedure success rate gauge (0-1)."""
async with self._lock:
labels = f"procedure_name={procedure_name}"
self._gauges["memory_procedure_success_rate"][labels] = rate
async def set_active_sessions(self, count: int) -> None:
"""Set active working memory sessions gauge."""
async with self._lock:
self._gauges["memory_active_sessions"][""] = float(count)
async def set_pending_consolidations(self, count: int) -> None:
"""Set pending consolidations gauge."""
async with self._lock:
self._gauges["memory_pending_consolidations"][""] = float(count)
# Histogram methods
async def observe_working_latency(self, latency_seconds: float) -> None:
"""Observe working memory operation latency."""
async with self._lock:
self._observe_histogram("memory_working_latency_seconds", latency_seconds)
async def observe_retrieval_latency(self, latency_seconds: float) -> None:
"""Observe retrieval latency."""
async with self._lock:
self._observe_histogram("memory_retrieval_latency_seconds", latency_seconds)
async def observe_consolidation_duration(self, duration_seconds: float) -> None:
"""Observe consolidation duration."""
async with self._lock:
self._observe_histogram(
"memory_consolidation_duration_seconds", duration_seconds
)
async def observe_embedding_latency(self, latency_seconds: float) -> None:
"""Observe embedding generation latency."""
async with self._lock:
self._observe_histogram("memory_embedding_latency_seconds", latency_seconds)
def _observe_histogram(self, name: str, value: float) -> None:
"""Record a value in a histogram."""
self._histograms[name].append(value)
# Update buckets
if name in self._histogram_buckets:
for bucket in self._histogram_buckets[name]:
if value <= bucket.le:
bucket.count += 1
# Export methods
async def get_all_metrics(self) -> list[MetricValue]:
"""Get all metrics as MetricValue objects."""
metrics: list[MetricValue] = []
async with self._lock:
# Export counters
for name, counter in self._counters.items():
for labels_str, value in counter.items():
labels = self._parse_labels(labels_str)
metrics.append(
MetricValue(
name=name,
metric_type=MetricType.COUNTER,
value=float(value),
labels=labels,
)
)
# Export gauges
for name, gauge_dict in self._gauges.items():
for labels_str, gauge_value in gauge_dict.items():
gauge_labels = self._parse_labels(labels_str)
metrics.append(
MetricValue(
name=name,
metric_type=MetricType.GAUGE,
value=gauge_value,
labels=gauge_labels,
)
)
# Export histogram summaries
for name, values in self._histograms.items():
if values:
metrics.append(
MetricValue(
name=f"{name}_count",
metric_type=MetricType.COUNTER,
value=float(len(values)),
)
)
metrics.append(
MetricValue(
name=f"{name}_sum",
metric_type=MetricType.COUNTER,
value=sum(values),
)
)
return metrics
async def get_prometheus_format(self) -> str:
"""Export metrics in Prometheus text format."""
lines: list[str] = []
async with self._lock:
# Export counters
for name, counter in self._counters.items():
lines.append(f"# TYPE {name} counter")
for labels_str, value in counter.items():
if labels_str:
lines.append(f"{name}{{{labels_str}}} {value}")
else:
lines.append(f"{name} {value}")
# Export gauges
for name, gauge_dict in self._gauges.items():
lines.append(f"# TYPE {name} gauge")
for labels_str, gauge_value in gauge_dict.items():
if labels_str:
lines.append(f"{name}{{{labels_str}}} {gauge_value}")
else:
lines.append(f"{name} {gauge_value}")
# Export histograms
for name, buckets in self._histogram_buckets.items():
lines.append(f"# TYPE {name} histogram")
for bucket in buckets:
le_str = "+Inf" if bucket.le == float("inf") else str(bucket.le)
lines.append(f'{name}_bucket{{le="{le_str}"}} {bucket.count}')
if name in self._histograms:
values = self._histograms[name]
lines.append(f"{name}_count {len(values)}")
lines.append(f"{name}_sum {sum(values)}")
return "\n".join(lines)
async def get_summary(self) -> dict[str, Any]:
"""Get a summary of key metrics."""
async with self._lock:
total_operations = sum(self._counters["memory_operations_total"].values())
successful_operations = sum(
v
for k, v in self._counters["memory_operations_total"].items()
if "success=true" in k
)
total_retrievals = sum(self._counters["memory_retrievals_total"].values())
total_cache_hits = sum(self._counters["memory_cache_hits_total"].values())
total_cache_misses = sum(
self._counters["memory_cache_misses_total"].values()
)
cache_hit_rate = (
total_cache_hits / (total_cache_hits + total_cache_misses)
if (total_cache_hits + total_cache_misses) > 0
else 0.0
)
total_consolidations = sum(
self._counters["memory_consolidations_total"].values()
)
total_episodes = sum(
self._counters["memory_episodes_recorded_total"].values()
)
# Calculate average latencies
retrieval_latencies = self._histograms.get(
"memory_retrieval_latency_seconds", []
)
avg_retrieval_latency = (
sum(retrieval_latencies) / len(retrieval_latencies)
if retrieval_latencies
else 0.0
)
return {
"total_operations": total_operations,
"successful_operations": successful_operations,
"operation_success_rate": (
successful_operations / total_operations
if total_operations > 0
else 1.0
),
"total_retrievals": total_retrievals,
"cache_hit_rate": cache_hit_rate,
"total_consolidations": total_consolidations,
"total_episodes_recorded": total_episodes,
"avg_retrieval_latency_ms": avg_retrieval_latency * 1000,
"patterns_detected": sum(
self._counters["memory_patterns_detected_total"].values()
),
"insights_generated": sum(
self._counters["memory_insights_generated_total"].values()
),
"anomalies_detected": sum(
self._counters["memory_anomalies_detected_total"].values()
),
"active_sessions": self._gauges.get("memory_active_sessions", {}).get(
"", 0
),
"pending_consolidations": self._gauges.get(
"memory_pending_consolidations", {}
).get("", 0),
}
async def get_cache_stats(self) -> dict[str, Any]:
"""Get detailed cache statistics."""
async with self._lock:
stats: dict[str, Any] = {}
# Get hits/misses by cache type
for labels_str, hits in self._counters["memory_cache_hits_total"].items():
cache_type = self._parse_labels(labels_str).get("cache_type", "unknown")
if cache_type not in stats:
stats[cache_type] = {"hits": 0, "misses": 0}
stats[cache_type]["hits"] = hits
for labels_str, misses in self._counters[
"memory_cache_misses_total"
].items():
cache_type = self._parse_labels(labels_str).get("cache_type", "unknown")
if cache_type not in stats:
stats[cache_type] = {"hits": 0, "misses": 0}
stats[cache_type]["misses"] = misses
# Calculate hit rates
for data in stats.values():
total = data["hits"] + data["misses"]
data["hit_rate"] = data["hits"] / total if total > 0 else 0.0
data["total"] = total
return stats
async def reset(self) -> None:
"""Reset all metrics."""
async with self._lock:
self._counters.clear()
self._gauges.clear()
self._histograms.clear()
self._init_histogram_buckets()
def _parse_labels(self, labels_str: str) -> dict[str, str]:
"""Parse labels string into dictionary."""
if not labels_str:
return {}
labels = {}
for pair in labels_str.split(","):
if "=" in pair:
key, value = pair.split("=", 1)
labels[key.strip()] = value.strip()
return labels
# Singleton instance
_metrics: MemoryMetrics | None = None
_lock = asyncio.Lock()
async def get_memory_metrics() -> MemoryMetrics:
"""Get the singleton MemoryMetrics instance."""
global _metrics
async with _lock:
if _metrics is None:
_metrics = MemoryMetrics()
return _metrics
async def reset_memory_metrics() -> None:
"""Reset the singleton instance (for testing)."""
global _metrics
async with _lock:
_metrics = None
# Convenience functions
async def record_memory_operation(
operation: str,
memory_type: str,
scope: str | None = None,
success: bool = True,
latency_ms: float | None = None,
) -> None:
"""Record a memory operation."""
metrics = await get_memory_metrics()
await metrics.inc_operations(operation, memory_type, scope, success)
if latency_ms is not None and memory_type == "working":
await metrics.observe_working_latency(latency_ms / 1000)
async def record_retrieval(
memory_type: str,
strategy: str,
results_count: int,
latency_ms: float,
) -> None:
"""Record a retrieval operation."""
metrics = await get_memory_metrics()
await metrics.inc_retrieval(memory_type, strategy, results_count)
await metrics.observe_retrieval_latency(latency_ms / 1000)

View File

@@ -22,6 +22,25 @@ from app.services.memory.types import Procedure, ProcedureCreate, RetrievalResul
logger = logging.getLogger(__name__)
def _escape_like_pattern(pattern: str) -> str:
"""
Escape SQL LIKE/ILIKE special characters to prevent pattern injection.
Characters escaped:
- % (matches zero or more characters)
- _ (matches exactly one character)
- \\ (escape character itself)
Args:
pattern: Raw search pattern from user input
Returns:
Escaped pattern safe for use in LIKE/ILIKE queries
"""
# Escape backslash first, then the wildcards
return pattern.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
def _model_to_procedure(model: ProcedureModel) -> Procedure:
"""Convert SQLAlchemy model to Procedure dataclass."""
return Procedure(
@@ -320,7 +339,9 @@ class ProceduralMemory:
if search_terms:
conditions = []
for term in search_terms:
term_pattern = f"%{term}%"
# Escape SQL wildcards to prevent pattern injection
escaped_term = _escape_like_pattern(term)
term_pattern = f"%{escaped_term}%"
conditions.append(
or_(
ProcedureModel.trigger_pattern.ilike(term_pattern),
@@ -368,6 +389,10 @@ class ProceduralMemory:
Returns:
Best matching procedure or None
"""
# Escape SQL wildcards to prevent pattern injection
escaped_task_type = _escape_like_pattern(task_type)
task_type_pattern = f"%{escaped_task_type}%"
# Build query for procedures matching task type
stmt = (
select(ProcedureModel)
@@ -376,8 +401,8 @@ class ProceduralMemory:
(ProcedureModel.success_count + ProcedureModel.failure_count)
>= min_uses,
or_(
ProcedureModel.trigger_pattern.ilike(f"%{task_type}%"),
ProcedureModel.name.ilike(f"%{task_type}%"),
ProcedureModel.trigger_pattern.ilike(task_type_pattern),
ProcedureModel.name.ilike(task_type_pattern),
),
)
)

View File

@@ -0,0 +1,38 @@
# app/services/memory/reflection/__init__.py
"""
Memory Reflection Layer.
Analyzes patterns in agent experiences to generate actionable insights.
"""
from .service import (
MemoryReflection,
ReflectionConfig,
get_memory_reflection,
)
from .types import (
Anomaly,
AnomalyType,
Factor,
FactorType,
Insight,
InsightType,
Pattern,
PatternType,
TimeRange,
)
__all__ = [
"Anomaly",
"AnomalyType",
"Factor",
"FactorType",
"Insight",
"InsightType",
"MemoryReflection",
"Pattern",
"PatternType",
"ReflectionConfig",
"TimeRange",
"get_memory_reflection",
]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,304 @@
# app/services/memory/reflection/types.py
"""
Memory Reflection Types.
Type definitions for pattern detection, anomaly detection, and insights.
"""
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any
from uuid import UUID
def _utcnow() -> datetime:
"""Get current UTC time as timezone-aware datetime."""
return datetime.now(UTC)
class PatternType(str, Enum):
"""Types of patterns detected in episodic memory."""
RECURRING_SUCCESS = "recurring_success"
RECURRING_FAILURE = "recurring_failure"
ACTION_SEQUENCE = "action_sequence"
CONTEXT_CORRELATION = "context_correlation"
TEMPORAL = "temporal"
EFFICIENCY = "efficiency"
class FactorType(str, Enum):
"""Types of factors contributing to outcomes."""
ACTION = "action"
CONTEXT = "context"
TIMING = "timing"
RESOURCE = "resource"
PRECEDING_STATE = "preceding_state"
class AnomalyType(str, Enum):
"""Types of anomalies detected."""
UNUSUAL_DURATION = "unusual_duration"
UNEXPECTED_OUTCOME = "unexpected_outcome"
UNUSUAL_TOKEN_USAGE = "unusual_token_usage"
UNUSUAL_FAILURE_RATE = "unusual_failure_rate"
UNUSUAL_ACTION_PATTERN = "unusual_action_pattern"
class InsightType(str, Enum):
"""Types of insights generated."""
OPTIMIZATION = "optimization"
WARNING = "warning"
LEARNING = "learning"
RECOMMENDATION = "recommendation"
TREND = "trend"
@dataclass
class TimeRange:
"""Time range for reflection analysis."""
start: datetime
end: datetime
@classmethod
def last_hours(cls, hours: int = 24) -> "TimeRange":
"""Create time range for last N hours."""
end = _utcnow()
start = datetime(
end.year, end.month, end.day, end.hour, end.minute, end.second, tzinfo=UTC
) - __import__("datetime").timedelta(hours=hours)
return cls(start=start, end=end)
@classmethod
def last_days(cls, days: int = 7) -> "TimeRange":
"""Create time range for last N days."""
from datetime import timedelta
end = _utcnow()
start = end - timedelta(days=days)
return cls(start=start, end=end)
@property
def duration_hours(self) -> float:
"""Get duration in hours."""
return (self.end - self.start).total_seconds() / 3600
@property
def duration_days(self) -> float:
"""Get duration in days."""
return (self.end - self.start).total_seconds() / 86400
@dataclass
class Pattern:
"""A detected pattern in episodic memory."""
id: UUID
pattern_type: PatternType
name: str
description: str
confidence: float
occurrence_count: int
episode_ids: list[UUID]
first_seen: datetime
last_seen: datetime
metadata: dict[str, Any] = field(default_factory=dict)
@property
def frequency(self) -> float:
"""Calculate pattern frequency per day."""
duration_days = (self.last_seen - self.first_seen).total_seconds() / 86400
if duration_days < 1:
duration_days = 1
return self.occurrence_count / duration_days
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary."""
return {
"id": str(self.id),
"pattern_type": self.pattern_type.value,
"name": self.name,
"description": self.description,
"confidence": self.confidence,
"occurrence_count": self.occurrence_count,
"episode_ids": [str(eid) for eid in self.episode_ids],
"first_seen": self.first_seen.isoformat(),
"last_seen": self.last_seen.isoformat(),
"frequency": self.frequency,
"metadata": self.metadata,
}
@dataclass
class Factor:
"""A factor contributing to success or failure."""
id: UUID
factor_type: FactorType
name: str
description: str
impact_score: float
correlation: float
sample_size: int
positive_examples: list[UUID]
negative_examples: list[UUID]
metadata: dict[str, Any] = field(default_factory=dict)
@property
def net_impact(self) -> float:
"""Calculate net impact considering sample size."""
# Weight impact by sample confidence
confidence_weight = min(1.0, self.sample_size / 20)
return self.impact_score * self.correlation * confidence_weight
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary."""
return {
"id": str(self.id),
"factor_type": self.factor_type.value,
"name": self.name,
"description": self.description,
"impact_score": self.impact_score,
"correlation": self.correlation,
"sample_size": self.sample_size,
"positive_examples": [str(eid) for eid in self.positive_examples],
"negative_examples": [str(eid) for eid in self.negative_examples],
"net_impact": self.net_impact,
"metadata": self.metadata,
}
@dataclass
class Anomaly:
"""An anomaly detected in memory patterns."""
id: UUID
anomaly_type: AnomalyType
description: str
severity: float
episode_ids: list[UUID]
detected_at: datetime
baseline_value: float
observed_value: float
deviation_factor: float
metadata: dict[str, Any] = field(default_factory=dict)
@property
def is_critical(self) -> bool:
"""Check if anomaly is critical (severity > 0.8)."""
return self.severity > 0.8
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary."""
return {
"id": str(self.id),
"anomaly_type": self.anomaly_type.value,
"description": self.description,
"severity": self.severity,
"episode_ids": [str(eid) for eid in self.episode_ids],
"detected_at": self.detected_at.isoformat(),
"baseline_value": self.baseline_value,
"observed_value": self.observed_value,
"deviation_factor": self.deviation_factor,
"is_critical": self.is_critical,
"metadata": self.metadata,
}
@dataclass
class Insight:
"""An actionable insight generated from reflection."""
id: UUID
insight_type: InsightType
title: str
description: str
priority: float
confidence: float
source_patterns: list[UUID]
source_factors: list[UUID]
source_anomalies: list[UUID]
recommended_actions: list[str]
generated_at: datetime
metadata: dict[str, Any] = field(default_factory=dict)
@property
def actionable_score(self) -> float:
"""Calculate how actionable this insight is."""
action_weight = min(1.0, len(self.recommended_actions) / 3)
return self.priority * self.confidence * action_weight
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary."""
return {
"id": str(self.id),
"insight_type": self.insight_type.value,
"title": self.title,
"description": self.description,
"priority": self.priority,
"confidence": self.confidence,
"source_patterns": [str(pid) for pid in self.source_patterns],
"source_factors": [str(fid) for fid in self.source_factors],
"source_anomalies": [str(aid) for aid in self.source_anomalies],
"recommended_actions": self.recommended_actions,
"generated_at": self.generated_at.isoformat(),
"actionable_score": self.actionable_score,
"metadata": self.metadata,
}
@dataclass
class ReflectionResult:
"""Result of a reflection operation."""
patterns: list[Pattern]
factors: list[Factor]
anomalies: list[Anomaly]
insights: list[Insight]
time_range: TimeRange
episodes_analyzed: int
analysis_duration_seconds: float
generated_at: datetime = field(default_factory=_utcnow)
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary."""
return {
"patterns": [p.to_dict() for p in self.patterns],
"factors": [f.to_dict() for f in self.factors],
"anomalies": [a.to_dict() for a in self.anomalies],
"insights": [i.to_dict() for i in self.insights],
"time_range": {
"start": self.time_range.start.isoformat(),
"end": self.time_range.end.isoformat(),
"duration_hours": self.time_range.duration_hours,
},
"episodes_analyzed": self.episodes_analyzed,
"analysis_duration_seconds": self.analysis_duration_seconds,
"generated_at": self.generated_at.isoformat(),
}
@property
def summary(self) -> str:
"""Generate a summary of the reflection results."""
lines = [
f"Reflection Analysis ({self.time_range.duration_days:.1f} days)",
f"Episodes analyzed: {self.episodes_analyzed}",
"",
f"Patterns detected: {len(self.patterns)}",
f"Success/failure factors: {len(self.factors)}",
f"Anomalies found: {len(self.anomalies)}",
f"Insights generated: {len(self.insights)}",
]
if self.insights:
lines.append("")
lines.append("Top insights:")
for insight in sorted(self.insights, key=lambda i: -i.priority)[:3]:
lines.append(f" - [{insight.insight_type.value}] {insight.title}")
return "\n".join(lines)

View File

@@ -7,6 +7,7 @@ Global -> Project -> Agent Type -> Agent Instance -> Session
"""
import logging
import threading
from dataclasses import dataclass, field
from typing import Any, ClassVar
from uuid import UUID
@@ -448,13 +449,24 @@ class ScopeManager:
return False
# Singleton manager instance
# Singleton manager instance with thread-safe initialization
_manager: ScopeManager | None = None
_manager_lock = threading.Lock()
def get_scope_manager() -> ScopeManager:
"""Get the singleton scope manager instance."""
"""Get the singleton scope manager instance (thread-safe)."""
global _manager
if _manager is None:
_manager = ScopeManager()
with _manager_lock:
# Double-check locking pattern
if _manager is None:
_manager = ScopeManager()
return _manager
def reset_scope_manager() -> None:
"""Reset the scope manager singleton (for testing)."""
global _manager
with _manager_lock:
_manager = None

View File

@@ -22,6 +22,25 @@ from app.services.memory.types import Episode, Fact, FactCreate, RetrievalResult
logger = logging.getLogger(__name__)
def _escape_like_pattern(pattern: str) -> str:
"""
Escape SQL LIKE/ILIKE special characters to prevent pattern injection.
Characters escaped:
- % (matches zero or more characters)
- _ (matches exactly one character)
- \\ (escape character itself)
Args:
pattern: Raw search pattern from user input
Returns:
Escaped pattern safe for use in LIKE/ILIKE queries
"""
# Escape backslash first, then the wildcards
return pattern.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
def _model_to_fact(model: FactModel) -> Fact:
"""Convert SQLAlchemy model to Fact dataclass."""
# SQLAlchemy Column types are inferred as Column[T] by mypy, but at runtime
@@ -251,7 +270,9 @@ class SemanticMemory:
if search_terms:
conditions = []
for term in search_terms[:5]: # Limit to 5 terms
term_pattern = f"%{term}%"
# Escape SQL wildcards to prevent pattern injection
escaped_term = _escape_like_pattern(term)
term_pattern = f"%{escaped_term}%"
conditions.append(
or_(
FactModel.subject.ilike(term_pattern),
@@ -295,12 +316,16 @@ class SemanticMemory:
"""
start_time = time.perf_counter()
# Escape SQL wildcards to prevent pattern injection
escaped_entity = _escape_like_pattern(entity)
entity_pattern = f"%{escaped_entity}%"
stmt = (
select(FactModel)
.where(
or_(
FactModel.subject.ilike(f"%{entity}%"),
FactModel.object.ilike(f"%{entity}%"),
FactModel.subject.ilike(entity_pattern),
FactModel.object.ilike(entity_pattern),
)
)
.order_by(desc(FactModel.confidence), desc(FactModel.last_reinforced))

View File

@@ -0,0 +1,507 @@
# Agent Memory System
Comprehensive multi-tier cognitive memory for AI agents, enabling state persistence, experiential learning, and context continuity across sessions.
## Overview
The Agent Memory System implements a cognitive architecture inspired by human memory:
```
+------------------------------------------------------------------+
| Agent Memory System |
+------------------------------------------------------------------+
| |
| +------------------+ +------------------+ |
| | Working Memory |----consolidate---->| Episodic Memory | |
| | (Redis/In-Mem) | | (PostgreSQL) | |
| | | | | |
| | - Current task | | - Past sessions | |
| | - Variables | | - Experiences | |
| | - Scratchpad | | - Outcomes | |
| +------------------+ +--------+---------+ |
| | |
| extract | |
| v |
| +------------------+ +------------------+ |
| |Procedural Memory |<-----learn from----| Semantic Memory | |
| | (PostgreSQL) | | (PostgreSQL + | |
| | | | pgvector) | |
| | - Procedures | | | |
| | - Skills | | - Facts | |
| | - Patterns | | - Entities | |
| +------------------+ | - Relationships | |
| +------------------+ |
+------------------------------------------------------------------+
```
## Memory Types
### Working Memory
Short-term, session-scoped memory for current task state.
**Features:**
- Key-value storage with TTL
- Task state tracking
- Scratchpad for reasoning
- Checkpoint/restore support
- Redis primary with in-memory fallback
**Usage:**
```python
from app.services.memory.working import WorkingMemory
memory = WorkingMemory(scope_context)
await memory.set("key", {"data": "value"}, ttl_seconds=3600)
value = await memory.get("key")
# Task state
await memory.set_task_state(TaskState(task_id="t1", status="running"))
state = await memory.get_task_state()
# Checkpoints
checkpoint_id = await memory.create_checkpoint()
await memory.restore_checkpoint(checkpoint_id)
```
### Episodic Memory
Experiential records of past agent actions and outcomes.
**Features:**
- Records task completions and failures
- Semantic similarity search (pgvector)
- Temporal and outcome-based retrieval
- Importance scoring
- Episode summarization
**Usage:**
```python
from app.services.memory.episodic import EpisodicMemory
memory = EpisodicMemory(session, embedder)
# Record an episode
episode = await memory.record_episode(
project_id=project_id,
episode=EpisodeCreate(
task_type="code_review",
task_description="Review PR #42",
outcome=Outcome.SUCCESS,
actions=[{"type": "analyze", "target": "src/"}],
)
)
# Search similar experiences
similar = await memory.search_similar(
project_id=project_id,
query="debugging memory leak",
limit=5
)
# Get recent episodes
recent = await memory.get_recent(project_id, limit=10)
```
### Semantic Memory
Learned facts and knowledge with confidence scoring.
**Features:**
- Triple format (subject, predicate, object)
- Confidence scoring with decay
- Fact extraction from episodes
- Conflict resolution
- Entity-based retrieval
**Usage:**
```python
from app.services.memory.semantic import SemanticMemory
memory = SemanticMemory(session, embedder)
# Store a fact
fact = await memory.store_fact(
project_id=project_id,
fact=FactCreate(
subject="UserService",
predicate="handles",
object="authentication",
confidence=0.9,
)
)
# Search facts
facts = await memory.search_facts(project_id, "authentication flow")
# Reinforce on repeated learning
await memory.reinforce_fact(fact.id)
```
### Procedural Memory
Learned skills and procedures from successful patterns.
**Features:**
- Procedure recording from task patterns
- Trigger-based matching
- Success rate tracking
- Procedure suggestions
- Step-by-step storage
**Usage:**
```python
from app.services.memory.procedural import ProceduralMemory
memory = ProceduralMemory(session, embedder)
# Record a procedure
procedure = await memory.record_procedure(
project_id=project_id,
procedure=ProcedureCreate(
name="PR Review Process",
trigger_pattern="code review requested",
steps=[
Step(action="fetch_diff"),
Step(action="analyze_changes"),
Step(action="check_tests"),
]
)
)
# Find matching procedures
matches = await memory.find_matching(project_id, "need to review code")
# Record outcomes
await memory.record_outcome(procedure.id, success=True)
```
## Memory Scoping
Memory is organized in a hierarchical scope structure:
```
Global Memory (shared by all)
└── Project Memory (per project)
└── Agent Type Memory (per agent type)
└── Agent Instance Memory (per instance)
└── Session Memory (ephemeral)
```
**Usage:**
```python
from app.services.memory.scoping import ScopeManager, ScopeLevel
manager = ScopeManager(session)
# Get scoped memories with inheritance
memories = await manager.get_scoped_memories(
context=ScopeContext(
project_id=project_id,
agent_type_id=agent_type_id,
agent_instance_id=agent_instance_id,
session_id=session_id,
),
include_inherited=True, # Include parent scopes
)
```
## Memory Consolidation
Automatic background processes transfer and extract knowledge:
```
Working Memory ──> Episodic Memory ──> Semantic Memory
└──> Procedural Memory
```
**Consolidation Types:**
- `working_to_episodic`: Transfer session state to episodes (on session end)
- `episodic_to_semantic`: Extract facts from experiences
- `episodic_to_procedural`: Learn procedures from patterns
- `prune`: Remove low-value memories
**Celery Tasks:**
```python
from app.tasks.memory_consolidation import (
consolidate_session,
run_nightly_consolidation,
prune_old_memories,
)
# Manual consolidation
consolidate_session.delay(session_id)
# Scheduled nightly (3 AM by default)
run_nightly_consolidation.delay()
```
## Memory Retrieval
### Hybrid Retrieval
Combine multiple retrieval strategies:
```python
from app.services.memory.indexing import RetrievalEngine
engine = RetrievalEngine(session, embedder)
# Hybrid search across memory types
results = await engine.retrieve_hybrid(
project_id=project_id,
query="authentication error handling",
memory_types=["episodic", "semantic", "procedural"],
filters={"outcome": "success"},
limit=10,
)
```
### Index Types
- **Vector Index**: Semantic similarity (HNSW/pgvector)
- **Temporal Index**: Time-based retrieval
- **Entity Index**: Entity mention lookup
- **Outcome Index**: Success/failure filtering
## MCP Tools
The memory system exposes MCP tools for agent use:
### `remember`
Store information in memory.
```json
{
"memory_type": "working",
"content": {"key": "value"},
"importance": 0.8,
"ttl_seconds": 3600
}
```
### `recall`
Retrieve from memory.
```json
{
"query": "authentication patterns",
"memory_types": ["episodic", "semantic"],
"limit": 10,
"filters": {"outcome": "success"}
}
```
### `forget`
Remove from memory.
```json
{
"memory_type": "working",
"key": "temp_data"
}
```
### `reflect`
Analyze memory patterns.
```json
{
"analysis_type": "success_factors",
"task_type": "code_review",
"time_range_days": 30
}
```
### `get_memory_stats`
Get memory usage statistics.
### `record_outcome`
Record task success/failure for learning.
## Memory Reflection
Analyze patterns and generate insights from memory:
```python
from app.services.memory.reflection import MemoryReflection, TimeRange
reflection = MemoryReflection(session)
# Detect patterns
patterns = await reflection.analyze_patterns(
project_id=project_id,
time_range=TimeRange.last_days(30),
)
# Identify success factors
factors = await reflection.identify_success_factors(
project_id=project_id,
task_type="code_review",
)
# Detect anomalies
anomalies = await reflection.detect_anomalies(
project_id=project_id,
baseline_days=30,
)
# Generate insights
insights = await reflection.generate_insights(project_id)
# Comprehensive reflection
result = await reflection.reflect(project_id)
print(result.summary)
```
## Configuration
All settings use the `MEM_` environment variable prefix:
| Variable | Default | Description |
|----------|---------|-------------|
| `MEM_WORKING_MEMORY_BACKEND` | `redis` | Backend: `redis` or `memory` |
| `MEM_WORKING_MEMORY_DEFAULT_TTL_SECONDS` | `3600` | Default TTL (1 hour) |
| `MEM_REDIS_URL` | `redis://localhost:6379/0` | Redis connection URL |
| `MEM_EPISODIC_MAX_EPISODES_PER_PROJECT` | `10000` | Max episodes per project |
| `MEM_EPISODIC_RETENTION_DAYS` | `365` | Episode retention period |
| `MEM_SEMANTIC_MAX_FACTS_PER_PROJECT` | `50000` | Max facts per project |
| `MEM_SEMANTIC_CONFIDENCE_DECAY_DAYS` | `90` | Confidence half-life |
| `MEM_EMBEDDING_MODEL` | `text-embedding-3-small` | Embedding model |
| `MEM_EMBEDDING_DIMENSIONS` | `1536` | Vector dimensions |
| `MEM_RETRIEVAL_MIN_SIMILARITY` | `0.5` | Minimum similarity score |
| `MEM_CONSOLIDATION_ENABLED` | `true` | Enable auto-consolidation |
| `MEM_CONSOLIDATION_SCHEDULE_CRON` | `0 3 * * *` | Nightly schedule |
| `MEM_CACHE_ENABLED` | `true` | Enable retrieval caching |
| `MEM_CACHE_TTL_SECONDS` | `300` | Cache TTL (5 minutes) |
See `app/services/memory/config.py` for complete configuration options.
## Integration with Context Engine
Memory integrates with the Context Engine as a context source:
```python
from app.services.memory.integration import MemoryContextSource
# Register as context source
source = MemoryContextSource(memory_manager)
context_engine.register_source(source)
# Memory is automatically included in context assembly
context = await context_engine.assemble_context(
project_id=project_id,
session_id=session_id,
current_task="Review authentication code",
)
```
## Caching
Multi-layer caching for performance:
- **Hot Cache**: Frequently accessed memories (LRU)
- **Retrieval Cache**: Query result caching
- **Embedding Cache**: Pre-computed embeddings
```python
from app.services.memory.cache import CacheManager
cache = CacheManager(settings)
await cache.warm_hot_cache(project_id) # Pre-warm common memories
```
## Metrics
Prometheus-compatible metrics:
| Metric | Type | Labels |
|--------|------|--------|
| `memory_operations_total` | Counter | operation, memory_type, scope, success |
| `memory_retrievals_total` | Counter | memory_type, strategy |
| `memory_cache_hits_total` | Counter | cache_type |
| `memory_retrieval_latency_seconds` | Histogram | - |
| `memory_consolidation_duration_seconds` | Histogram | - |
| `memory_items_count` | Gauge | memory_type, scope |
```python
from app.services.memory.metrics import get_memory_metrics
metrics = await get_memory_metrics()
summary = await metrics.get_summary()
prometheus_output = await metrics.get_prometheus_format()
```
## Performance Targets
| Operation | Target P95 |
|-----------|------------|
| Working memory get/set | < 5ms |
| Episodic memory retrieval | < 100ms |
| Semantic memory search | < 100ms |
| Procedural memory matching | < 50ms |
| Consolidation batch (1000 items) | < 30s |
## Troubleshooting
### Redis Connection Issues
```bash
# Check Redis connectivity
redis-cli ping
# Verify memory settings
MEM_REDIS_URL=redis://localhost:6379/0
```
### Slow Retrieval
1. Check if caching is enabled: `MEM_CACHE_ENABLED=true`
2. Verify HNSW indexes exist on vector columns
3. Monitor `memory_retrieval_latency_seconds` metric
### High Memory Usage
1. Review `MEM_EPISODIC_MAX_EPISODES_PER_PROJECT` limit
2. Ensure pruning is enabled: `MEM_PRUNING_ENABLED=true`
3. Check consolidation is running (cron schedule)
### Embedding Errors
1. Verify LLM Gateway is accessible
2. Check embedding model is valid
3. Review batch size if hitting rate limits
## Directory Structure
```
app/services/memory/
├── __init__.py # Public exports
├── config.py # MemorySettings
├── exceptions.py # Memory-specific errors
├── manager.py # MemoryManager facade
├── types.py # Core types
├── working/ # Working memory
│ ├── memory.py
│ └── storage.py
├── episodic/ # Episodic memory
│ ├── memory.py
│ ├── recorder.py
│ └── retrieval.py
├── semantic/ # Semantic memory
│ ├── memory.py
│ ├── extraction.py
│ └── verification.py
├── procedural/ # Procedural memory
│ ├── memory.py
│ └── matching.py
├── scoping/ # Memory scoping
│ ├── scope.py
│ └── resolver.py
├── indexing/ # Indexing & retrieval
│ ├── index.py
│ └── retrieval.py
├── consolidation/ # Memory consolidation
│ └── service.py
├── reflection/ # Memory reflection
│ ├── service.py
│ └── types.py
├── integration/ # External integrations
│ ├── context_source.py
│ └── lifecycle.py
├── cache/ # Caching layer
│ ├── cache_manager.py
│ ├── hot_cache.py
│ └── embedding_cache.py
├── mcp/ # MCP tools
│ ├── service.py
│ └── tools.py
└── metrics/ # Observability
└── collector.py
```

View File

@@ -304,10 +304,18 @@ class TestTaskModuleExports:
assert hasattr(tasks, "sync")
assert hasattr(tasks, "workflow")
assert hasattr(tasks, "cost")
assert hasattr(tasks, "memory_consolidation")
def test_tasks_all_attribute_is_correct(self):
"""Test that __all__ contains all expected module names."""
from app import tasks
expected_modules = ["agent", "git", "sync", "workflow", "cost"]
expected_modules = [
"agent",
"git",
"sync",
"workflow",
"cost",
"memory_consolidation",
]
assert set(tasks.__all__) == set(expected_modules)

View File

@@ -5,8 +5,6 @@ from datetime import UTC, datetime
from unittest.mock import MagicMock
from uuid import uuid4
import pytest
from app.services.context.types import ContextType
from app.services.context.types.memory import MemoryContext, MemorySubtype

View File

@@ -133,9 +133,7 @@ class TestMemoryContextSource:
)
assert result.by_type["working"] == 2
assert all(
c.memory_subtype == MemorySubtype.WORKING for c in result.contexts
)
assert all(c.memory_subtype == MemorySubtype.WORKING for c in result.contexts)
@patch("app.services.memory.integration.context_source.EpisodicMemory")
async def test_fetch_episodic_memory(
@@ -252,11 +250,10 @@ class TestMemoryContextSource:
context_source: MemoryContextSource,
) -> None:
"""Results should be sorted by relevance score."""
with patch.object(
context_source, "_fetch_episodic"
) as mock_ep, patch.object(
context_source, "_fetch_semantic"
) as mock_sem:
with (
patch.object(context_source, "_fetch_episodic") as mock_ep,
patch.object(context_source, "_fetch_semantic") as mock_sem,
):
# Create contexts with different relevance scores
from app.services.context.types.memory import MemoryContext

View File

@@ -105,6 +105,7 @@ class TestLifecycleHooks:
def test_register_spawn_hook(self, lifecycle_hooks: LifecycleHooks) -> None:
"""Should register spawn hook."""
async def my_hook(event: LifecycleEvent) -> None:
pass
@@ -115,7 +116,7 @@ class TestLifecycleHooks:
def test_register_all_hooks(self, lifecycle_hooks: LifecycleHooks) -> None:
"""Should register hooks for all event types."""
hooks = [
[
lifecycle_hooks.on_spawn(AsyncMock()),
lifecycle_hooks.on_pause(AsyncMock()),
lifecycle_hooks.on_resume(AsyncMock()),

View File

@@ -2,7 +2,6 @@
"""Tests for MemoryToolService."""
from datetime import UTC, datetime
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
from uuid import UUID, uuid4
@@ -14,11 +13,6 @@ from app.services.memory.mcp.service import (
ToolResult,
get_memory_tool_service,
)
from app.services.memory.mcp.tools import (
AnalysisType,
MemoryType,
OutcomeType,
)
from app.services.memory.types import Outcome
pytestmark = pytest.mark.asyncio(loop_scope="function")
@@ -192,7 +186,9 @@ class TestMemoryToolService:
context: ToolContext,
) -> None:
"""Remember should store in episodic memory."""
with patch("app.services.memory.mcp.service.EpisodicMemory") as mock_episodic_cls:
with patch(
"app.services.memory.mcp.service.EpisodicMemory"
) as mock_episodic_cls:
# Setup mock
mock_episode = MagicMock()
mock_episode.id = uuid4()
@@ -260,7 +256,9 @@ class TestMemoryToolService:
context: ToolContext,
) -> None:
"""Remember should store facts in semantic memory."""
with patch("app.services.memory.mcp.service.SemanticMemory") as mock_semantic_cls:
with patch(
"app.services.memory.mcp.service.SemanticMemory"
) as mock_semantic_cls:
mock_fact = MagicMock()
mock_fact.id = uuid4()
@@ -311,7 +309,9 @@ class TestMemoryToolService:
context: ToolContext,
) -> None:
"""Remember should store procedures in procedural memory."""
with patch("app.services.memory.mcp.service.ProceduralMemory") as mock_procedural_cls:
with patch(
"app.services.memory.mcp.service.ProceduralMemory"
) as mock_procedural_cls:
mock_procedure = MagicMock()
mock_procedure.id = uuid4()
@@ -530,15 +530,21 @@ class TestMemoryToolService:
mock_working_cls.for_session = AsyncMock(return_value=mock_working)
mock_episodic = AsyncMock()
mock_episodic.get_recent = AsyncMock(return_value=[MagicMock() for _ in range(10)])
mock_episodic.get_recent = AsyncMock(
return_value=[MagicMock() for _ in range(10)]
)
mock_episodic_cls.create = AsyncMock(return_value=mock_episodic)
mock_semantic = AsyncMock()
mock_semantic.search_facts = AsyncMock(return_value=[MagicMock() for _ in range(5)])
mock_semantic.search_facts = AsyncMock(
return_value=[MagicMock() for _ in range(5)]
)
mock_semantic_cls.create = AsyncMock(return_value=mock_semantic)
mock_procedural = AsyncMock()
mock_procedural.find_matching = AsyncMock(return_value=[MagicMock() for _ in range(3)])
mock_procedural.find_matching = AsyncMock(
return_value=[MagicMock() for _ in range(3)]
)
mock_procedural_cls.create = AsyncMock(return_value=mock_procedural)
result = await service.execute_tool(
@@ -603,8 +609,12 @@ class TestMemoryToolService:
) -> None:
"""Record outcome should store outcome and update procedure."""
with (
patch("app.services.memory.mcp.service.EpisodicMemory") as mock_episodic_cls,
patch("app.services.memory.mcp.service.ProceduralMemory") as mock_procedural_cls,
patch(
"app.services.memory.mcp.service.EpisodicMemory"
) as mock_episodic_cls,
patch(
"app.services.memory.mcp.service.ProceduralMemory"
) as mock_procedural_cls,
):
mock_episode = MagicMock()
mock_episode.id = uuid4()

View File

@@ -358,10 +358,12 @@ class TestMemoryToolDefinition:
)
# Valid args
validated = tool.validate_args({
"memory_type": "working",
"content": "Test content",
})
validated = tool.validate_args(
{
"memory_type": "working",
"content": "Test content",
}
)
assert isinstance(validated, RememberArgs)
# Invalid args
@@ -417,4 +419,6 @@ class TestToolDefinitions:
"""All tool schemas should have properties defined."""
for name, tool in MEMORY_TOOL_DEFINITIONS.items():
schema = tool.to_mcp_format()
assert "properties" in schema["inputSchema"], f"Tool {name} missing properties"
assert "properties" in schema["inputSchema"], (
f"Tool {name} missing properties"
)

View File

@@ -0,0 +1,2 @@
# tests/unit/services/memory/metrics/__init__.py
"""Tests for Memory Metrics."""

View File

@@ -0,0 +1,472 @@
# tests/unit/services/memory/metrics/test_collector.py
"""Tests for Memory Metrics Collector."""
import pytest
from app.services.memory.metrics.collector import (
MemoryMetrics,
MetricType,
MetricValue,
get_memory_metrics,
record_memory_operation,
record_retrieval,
reset_memory_metrics,
)
@pytest.fixture
def metrics() -> MemoryMetrics:
"""Create a fresh metrics instance for each test."""
return MemoryMetrics()
@pytest.fixture(autouse=True)
async def reset_singleton() -> None:
"""Reset singleton before each test."""
await reset_memory_metrics()
class TestMemoryMetrics:
"""Tests for MemoryMetrics class."""
@pytest.mark.asyncio
async def test_inc_operations(self, metrics: MemoryMetrics) -> None:
"""Should increment operation counters."""
await metrics.inc_operations("get", "working", "session", True)
await metrics.inc_operations("get", "working", "session", True)
await metrics.inc_operations("set", "working", "session", True)
summary = await metrics.get_summary()
assert summary["total_operations"] == 3
assert summary["successful_operations"] == 3
@pytest.mark.asyncio
async def test_inc_operations_failure(self, metrics: MemoryMetrics) -> None:
"""Should track failed operations."""
await metrics.inc_operations("get", "working", None, True)
await metrics.inc_operations("get", "working", None, False)
summary = await metrics.get_summary()
assert summary["total_operations"] == 2
assert summary["successful_operations"] == 1
assert summary["operation_success_rate"] == 0.5
@pytest.mark.asyncio
async def test_inc_retrieval(self, metrics: MemoryMetrics) -> None:
"""Should increment retrieval counters."""
await metrics.inc_retrieval("episodic", "similarity", 5)
await metrics.inc_retrieval("episodic", "temporal", 3)
await metrics.inc_retrieval("semantic", "similarity", 10)
summary = await metrics.get_summary()
assert summary["total_retrievals"] == 3
@pytest.mark.asyncio
async def test_cache_hit_miss(self, metrics: MemoryMetrics) -> None:
"""Should track cache hits and misses."""
await metrics.inc_cache_hit("hot")
await metrics.inc_cache_hit("hot")
await metrics.inc_cache_hit("hot")
await metrics.inc_cache_miss("hot")
summary = await metrics.get_summary()
assert summary["cache_hit_rate"] == 0.75
@pytest.mark.asyncio
async def test_cache_stats(self, metrics: MemoryMetrics) -> None:
"""Should provide detailed cache stats."""
await metrics.inc_cache_hit("hot")
await metrics.inc_cache_hit("hot")
await metrics.inc_cache_miss("hot")
await metrics.inc_cache_hit("embedding")
await metrics.inc_cache_miss("embedding")
await metrics.inc_cache_miss("embedding")
stats = await metrics.get_cache_stats()
assert stats["hot"]["hits"] == 2
assert stats["hot"]["misses"] == 1
assert stats["hot"]["hit_rate"] == pytest.approx(0.6667, rel=0.01)
assert stats["embedding"]["hits"] == 1
assert stats["embedding"]["misses"] == 2
assert stats["embedding"]["hit_rate"] == pytest.approx(0.3333, rel=0.01)
@pytest.mark.asyncio
async def test_inc_consolidation(self, metrics: MemoryMetrics) -> None:
"""Should increment consolidation counter."""
await metrics.inc_consolidation("working_to_episodic", True)
await metrics.inc_consolidation("episodic_to_semantic", True)
await metrics.inc_consolidation("prune", False)
summary = await metrics.get_summary()
assert summary["total_consolidations"] == 3
@pytest.mark.asyncio
async def test_inc_episodes_recorded(self, metrics: MemoryMetrics) -> None:
"""Should track episodes by outcome."""
await metrics.inc_episodes_recorded("success")
await metrics.inc_episodes_recorded("success")
await metrics.inc_episodes_recorded("failure")
summary = await metrics.get_summary()
assert summary["total_episodes_recorded"] == 3
@pytest.mark.asyncio
async def test_inc_patterns_insights_anomalies(
self, metrics: MemoryMetrics
) -> None:
"""Should track reflection metrics."""
await metrics.inc_patterns_detected("recurring_success")
await metrics.inc_patterns_detected("action_sequence")
await metrics.inc_insights_generated("optimization")
await metrics.inc_anomalies_detected("unusual_duration")
summary = await metrics.get_summary()
assert summary["patterns_detected"] == 2
assert summary["insights_generated"] == 1
assert summary["anomalies_detected"] == 1
@pytest.mark.asyncio
async def test_set_memory_items_count(self, metrics: MemoryMetrics) -> None:
"""Should set memory item count gauge."""
await metrics.set_memory_items_count("episodic", "project", 100)
await metrics.set_memory_items_count("semantic", "project", 50)
all_metrics = await metrics.get_all_metrics()
gauge_metrics = [m for m in all_metrics if m.name == "memory_items_count"]
assert len(gauge_metrics) == 2
@pytest.mark.asyncio
async def test_set_memory_size_bytes(self, metrics: MemoryMetrics) -> None:
"""Should set memory size gauge."""
await metrics.set_memory_size_bytes("working", "session", 1024)
all_metrics = await metrics.get_all_metrics()
size_metrics = [m for m in all_metrics if m.name == "memory_size_bytes"]
assert len(size_metrics) == 1
assert size_metrics[0].value == 1024.0
@pytest.mark.asyncio
async def test_set_procedure_success_rate(self, metrics: MemoryMetrics) -> None:
"""Should set procedure success rate gauge."""
await metrics.set_procedure_success_rate("code_review", 0.85)
all_metrics = await metrics.get_all_metrics()
rate_metrics = [
m for m in all_metrics if m.name == "memory_procedure_success_rate"
]
assert len(rate_metrics) == 1
assert rate_metrics[0].value == 0.85
@pytest.mark.asyncio
async def test_set_active_sessions(self, metrics: MemoryMetrics) -> None:
"""Should set active sessions gauge."""
await metrics.set_active_sessions(5)
summary = await metrics.get_summary()
assert summary["active_sessions"] == 5
@pytest.mark.asyncio
async def test_observe_working_latency(self, metrics: MemoryMetrics) -> None:
"""Should record working memory latency histogram."""
await metrics.observe_working_latency(0.005) # 5ms
await metrics.observe_working_latency(0.003) # 3ms
await metrics.observe_working_latency(0.010) # 10ms
all_metrics = await metrics.get_all_metrics()
count_metric = next(
(
m
for m in all_metrics
if m.name == "memory_working_latency_seconds_count"
),
None,
)
sum_metric = next(
(m for m in all_metrics if m.name == "memory_working_latency_seconds_sum"),
None,
)
assert count_metric is not None
assert count_metric.value == 3
assert sum_metric is not None
assert sum_metric.value == pytest.approx(0.018, rel=0.01)
@pytest.mark.asyncio
async def test_observe_retrieval_latency(self, metrics: MemoryMetrics) -> None:
"""Should record retrieval latency histogram."""
await metrics.observe_retrieval_latency(0.050) # 50ms
await metrics.observe_retrieval_latency(0.075) # 75ms
summary = await metrics.get_summary()
assert summary["avg_retrieval_latency_ms"] == pytest.approx(62.5, rel=0.01)
@pytest.mark.asyncio
async def test_observe_consolidation_duration(self, metrics: MemoryMetrics) -> None:
"""Should record consolidation duration histogram."""
await metrics.observe_consolidation_duration(5.0)
await metrics.observe_consolidation_duration(10.0)
all_metrics = await metrics.get_all_metrics()
count_metric = next(
(
m
for m in all_metrics
if m.name == "memory_consolidation_duration_seconds_count"
),
None,
)
assert count_metric is not None
assert count_metric.value == 2
@pytest.mark.asyncio
async def test_get_all_metrics(self, metrics: MemoryMetrics) -> None:
"""Should return all metrics as MetricValue objects."""
await metrics.inc_operations("get", "working", None, True)
await metrics.set_active_sessions(3)
await metrics.observe_retrieval_latency(0.05)
all_metrics = await metrics.get_all_metrics()
assert len(all_metrics) >= 3
# Check we have different metric types
counter_metrics = [
m for m in all_metrics if m.metric_type == MetricType.COUNTER
]
gauge_metrics = [m for m in all_metrics if m.metric_type == MetricType.GAUGE]
assert len(counter_metrics) >= 1
assert len(gauge_metrics) >= 1
@pytest.mark.asyncio
async def test_get_prometheus_format(self, metrics: MemoryMetrics) -> None:
"""Should export metrics in Prometheus format."""
await metrics.inc_operations("get", "working", "session", True)
await metrics.set_active_sessions(5)
prometheus_output = await metrics.get_prometheus_format()
assert "# TYPE memory_operations_total counter" in prometheus_output
assert "memory_operations_total{" in prometheus_output
assert "# TYPE memory_active_sessions gauge" in prometheus_output
assert "memory_active_sessions 5" in prometheus_output
@pytest.mark.asyncio
async def test_get_summary(self, metrics: MemoryMetrics) -> None:
"""Should return summary dictionary."""
await metrics.inc_operations("get", "working", None, True)
await metrics.inc_retrieval("episodic", "similarity", 5)
await metrics.inc_cache_hit("hot")
await metrics.inc_consolidation("prune", True)
summary = await metrics.get_summary()
assert "total_operations" in summary
assert "total_retrievals" in summary
assert "cache_hit_rate" in summary
assert "total_consolidations" in summary
assert "operation_success_rate" in summary
@pytest.mark.asyncio
async def test_reset(self, metrics: MemoryMetrics) -> None:
"""Should reset all metrics."""
await metrics.inc_operations("get", "working", None, True)
await metrics.set_active_sessions(5)
await metrics.observe_retrieval_latency(0.05)
await metrics.reset()
summary = await metrics.get_summary()
assert summary["total_operations"] == 0
assert summary["active_sessions"] == 0
class TestMetricValue:
"""Tests for MetricValue dataclass."""
def test_creates_metric_value(self) -> None:
"""Should create metric value with defaults."""
metric = MetricValue(
name="test_metric",
metric_type=MetricType.COUNTER,
value=42.0,
)
assert metric.name == "test_metric"
assert metric.metric_type == MetricType.COUNTER
assert metric.value == 42.0
assert metric.labels == {}
assert metric.timestamp is not None
def test_creates_metric_value_with_labels(self) -> None:
"""Should create metric value with labels."""
metric = MetricValue(
name="test_metric",
metric_type=MetricType.GAUGE,
value=100.0,
labels={"scope": "project", "type": "episodic"},
)
assert metric.labels == {"scope": "project", "type": "episodic"}
class TestSingleton:
"""Tests for singleton pattern."""
@pytest.mark.asyncio
async def test_get_memory_metrics_singleton(self) -> None:
"""Should return same instance."""
metrics1 = await get_memory_metrics()
metrics2 = await get_memory_metrics()
assert metrics1 is metrics2
@pytest.mark.asyncio
async def test_reset_singleton(self) -> None:
"""Should reset singleton instance."""
metrics1 = await get_memory_metrics()
await metrics1.inc_operations("get", "working", None, True)
await reset_memory_metrics()
metrics2 = await get_memory_metrics()
summary = await metrics2.get_summary()
assert metrics1 is not metrics2
assert summary["total_operations"] == 0
class TestConvenienceFunctions:
"""Tests for convenience functions."""
@pytest.mark.asyncio
async def test_record_memory_operation(self) -> None:
"""Should record memory operation."""
await record_memory_operation(
operation="get",
memory_type="working",
scope="session",
success=True,
latency_ms=5.0,
)
metrics = await get_memory_metrics()
summary = await metrics.get_summary()
assert summary["total_operations"] == 1
@pytest.mark.asyncio
async def test_record_retrieval(self) -> None:
"""Should record retrieval operation."""
await record_retrieval(
memory_type="episodic",
strategy="similarity",
results_count=10,
latency_ms=50.0,
)
metrics = await get_memory_metrics()
summary = await metrics.get_summary()
assert summary["total_retrievals"] == 1
assert summary["avg_retrieval_latency_ms"] == pytest.approx(50.0, rel=0.01)
class TestHistogramBuckets:
"""Tests for histogram bucket behavior."""
@pytest.mark.asyncio
async def test_histogram_buckets_populated(self, metrics: MemoryMetrics) -> None:
"""Should populate histogram buckets correctly."""
# Add values to different buckets
await metrics.observe_retrieval_latency(0.005) # <= 0.01
await metrics.observe_retrieval_latency(0.030) # <= 0.05
await metrics.observe_retrieval_latency(0.080) # <= 0.1
await metrics.observe_retrieval_latency(0.500) # <= 0.5
await metrics.observe_retrieval_latency(2.000) # <= 2.5
prometheus_output = await metrics.get_prometheus_format()
# Check that histogram buckets are in output
assert "memory_retrieval_latency_seconds_bucket" in prometheus_output
assert 'le="0.01"' in prometheus_output
assert 'le="+Inf"' in prometheus_output
@pytest.mark.asyncio
async def test_histogram_count_and_sum(self, metrics: MemoryMetrics) -> None:
"""Should track histogram count and sum."""
await metrics.observe_retrieval_latency(0.1)
await metrics.observe_retrieval_latency(0.2)
await metrics.observe_retrieval_latency(0.3)
prometheus_output = await metrics.get_prometheus_format()
assert "memory_retrieval_latency_seconds_count 3" in prometheus_output
assert "memory_retrieval_latency_seconds_sum 0.6" in prometheus_output
class TestLabelParsing:
"""Tests for label parsing."""
@pytest.mark.asyncio
async def test_parse_labels_in_output(self, metrics: MemoryMetrics) -> None:
"""Should correctly parse labels in output."""
await metrics.inc_operations("get", "episodic", "project", True)
all_metrics = await metrics.get_all_metrics()
op_metric = next(
(m for m in all_metrics if m.name == "memory_operations_total"), None
)
assert op_metric is not None
assert op_metric.labels["operation"] == "get"
assert op_metric.labels["memory_type"] == "episodic"
assert op_metric.labels["scope"] == "project"
assert op_metric.labels["success"] == "true"
class TestEdgeCases:
"""Tests for edge cases."""
@pytest.mark.asyncio
async def test_empty_metrics(self, metrics: MemoryMetrics) -> None:
"""Should handle empty metrics gracefully."""
summary = await metrics.get_summary()
assert summary["total_operations"] == 0
assert summary["operation_success_rate"] == 1.0 # Default when no ops
assert summary["cache_hit_rate"] == 0.0
assert summary["avg_retrieval_latency_ms"] == 0.0
@pytest.mark.asyncio
async def test_concurrent_operations(self, metrics: MemoryMetrics) -> None:
"""Should handle concurrent operations safely."""
import asyncio
async def increment_ops() -> None:
for _ in range(100):
await metrics.inc_operations("get", "working", None, True)
# Run multiple concurrent tasks
await asyncio.gather(
increment_ops(),
increment_ops(),
increment_ops(),
)
summary = await metrics.get_summary()
assert summary["total_operations"] == 300
@pytest.mark.asyncio
async def test_prometheus_format_empty(self, metrics: MemoryMetrics) -> None:
"""Should return valid format with no metrics."""
prometheus_output = await metrics.get_prometheus_format()
# Should just have histogram bucket definitions
assert "# TYPE memory_retrieval_latency_seconds histogram" in prometheus_output

View File

@@ -0,0 +1,2 @@
# tests/unit/services/memory/reflection/__init__.py
"""Tests for Memory Reflection."""

View File

@@ -0,0 +1,763 @@
# tests/unit/services/memory/reflection/test_service.py
"""Tests for Memory Reflection service."""
from datetime import UTC, datetime, timedelta
from unittest.mock import AsyncMock, MagicMock
from uuid import uuid4
import pytest
from app.services.memory.reflection.service import (
MemoryReflection,
ReflectionConfig,
get_memory_reflection,
reset_memory_reflection,
)
from app.services.memory.reflection.types import (
AnomalyType,
FactorType,
InsightType,
PatternType,
TimeRange,
)
from app.services.memory.types import Episode, Outcome
pytestmark = pytest.mark.asyncio(loop_scope="function")
def create_mock_episode(
task_type: str = "test_task",
outcome: Outcome = Outcome.SUCCESS,
duration_seconds: float = 60.0,
tokens_used: int = 100,
actions: list | None = None,
occurred_at: datetime | None = None,
context_summary: str = "Test context",
) -> Episode:
"""Create a mock episode for testing."""
return Episode(
id=uuid4(),
project_id=uuid4(),
agent_instance_id=None,
agent_type_id=None,
session_id="session-123",
task_type=task_type,
task_description=f"Test {task_type}",
actions=actions or [{"type": "action1", "content": "test"}],
context_summary=context_summary,
outcome=outcome,
outcome_details="",
duration_seconds=duration_seconds,
tokens_used=tokens_used,
lessons_learned=[],
importance_score=0.5,
embedding=None,
occurred_at=occurred_at or datetime.now(UTC),
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
@pytest.fixture(autouse=True)
async def reset_singleton() -> None:
"""Reset singleton before each test."""
await reset_memory_reflection()
@pytest.fixture
def mock_session() -> MagicMock:
"""Create mock database session."""
return MagicMock()
@pytest.fixture
def config() -> ReflectionConfig:
"""Create test configuration."""
return ReflectionConfig(
min_pattern_occurrences=2,
min_pattern_confidence=0.5,
min_sample_size_for_factor=3,
min_correlation_for_factor=0.2,
min_baseline_samples=5,
anomaly_std_dev_threshold=2.0,
min_insight_confidence=0.1, # Lower for testing
)
@pytest.fixture
def reflection(mock_session: MagicMock, config: ReflectionConfig) -> MemoryReflection:
"""Create reflection service."""
return MemoryReflection(session=mock_session, config=config)
class TestReflectionConfig:
"""Tests for ReflectionConfig."""
def test_default_values(self) -> None:
"""Should have sensible defaults."""
config = ReflectionConfig()
assert config.min_pattern_occurrences == 3
assert config.min_pattern_confidence == 0.6
assert config.min_sample_size_for_factor == 5
assert config.anomaly_std_dev_threshold == 2.0
assert config.max_episodes_to_analyze == 1000
def test_custom_values(self) -> None:
"""Should allow custom values."""
config = ReflectionConfig(
min_pattern_occurrences=5,
min_pattern_confidence=0.8,
)
assert config.min_pattern_occurrences == 5
assert config.min_pattern_confidence == 0.8
class TestPatternDetection:
"""Tests for pattern detection."""
async def test_detect_recurring_success_pattern(
self,
reflection: MemoryReflection,
) -> None:
"""Should detect recurring success patterns."""
project_id = uuid4()
time_range = TimeRange.last_days(7)
# Create episodes with high success rate for a task type
# Ensure timestamps are within time range
now = datetime.now(UTC)
episodes = [
create_mock_episode(
task_type="build",
outcome=Outcome.SUCCESS,
occurred_at=now - timedelta(hours=i),
)
for i in range(8)
] + [
create_mock_episode(
task_type="build",
outcome=Outcome.FAILURE,
occurred_at=now - timedelta(hours=8 + i),
)
for i in range(2)
]
# Mock episodic memory
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=episodes)
reflection._episodic = mock_episodic
patterns = await reflection.analyze_patterns(project_id, time_range)
# Should find recurring success pattern for 'build' task
success_patterns = [
p for p in patterns if p.pattern_type == PatternType.RECURRING_SUCCESS
]
assert len(success_patterns) >= 1
assert any(p.name.find("build") >= 0 for p in success_patterns)
async def test_detect_recurring_failure_pattern(
self,
reflection: MemoryReflection,
) -> None:
"""Should detect recurring failure patterns."""
project_id = uuid4()
time_range = TimeRange.last_days(7)
# Create episodes with high failure rate
# Ensure timestamps are within time range
now = datetime.now(UTC)
episodes = [
create_mock_episode(
task_type="deploy",
outcome=Outcome.FAILURE,
occurred_at=now - timedelta(hours=i),
)
for i in range(7)
] + [
create_mock_episode(
task_type="deploy",
outcome=Outcome.SUCCESS,
occurred_at=now - timedelta(hours=7 + i),
)
for i in range(3)
]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=episodes)
reflection._episodic = mock_episodic
patterns = await reflection.analyze_patterns(project_id, time_range)
failure_patterns = [
p for p in patterns if p.pattern_type == PatternType.RECURRING_FAILURE
]
assert len(failure_patterns) >= 1
async def test_detect_action_sequence_pattern(
self,
reflection: MemoryReflection,
) -> None:
"""Should detect action sequence patterns."""
project_id = uuid4()
time_range = TimeRange.last_days(7)
# Create episodes with same action sequence
# Ensure timestamps are within time range
now = datetime.now(UTC)
actions = [
{"type": "read_file"},
{"type": "analyze"},
{"type": "write_file"},
]
episodes = [
create_mock_episode(
actions=actions,
occurred_at=now - timedelta(hours=i),
)
for i in range(5)
]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=episodes)
reflection._episodic = mock_episodic
patterns = await reflection.analyze_patterns(project_id, time_range)
action_patterns = [
p for p in patterns if p.pattern_type == PatternType.ACTION_SEQUENCE
]
assert len(action_patterns) >= 1
async def test_detect_temporal_pattern(
self,
reflection: MemoryReflection,
) -> None:
"""Should detect temporal patterns."""
project_id = uuid4()
time_range = TimeRange.last_days(7)
# Create episodes concentrated at a specific hour
base_time = datetime.now(UTC).replace(hour=10, minute=0)
episodes = [
create_mock_episode(occurred_at=base_time + timedelta(minutes=i * 5))
for i in range(10)
]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=episodes)
reflection._episodic = mock_episodic
patterns = await reflection.analyze_patterns(project_id, time_range)
# May or may not find temporal patterns depending on thresholds
# Just verify the analysis completes without error
assert isinstance(patterns, list)
async def test_empty_episodes_returns_empty(
self,
reflection: MemoryReflection,
) -> None:
"""Should return empty list when no episodes."""
project_id = uuid4()
time_range = TimeRange.last_days(7)
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=[])
reflection._episodic = mock_episodic
patterns = await reflection.analyze_patterns(project_id, time_range)
assert patterns == []
class TestSuccessFactors:
"""Tests for success factor identification."""
async def test_identify_action_factors(
self,
reflection: MemoryReflection,
) -> None:
"""Should identify action-related success factors."""
project_id = uuid4()
# Create episodes where 'validate' action correlates with success
successful = [
create_mock_episode(
outcome=Outcome.SUCCESS,
actions=[{"type": "validate"}, {"type": "commit"}],
)
for _ in range(5)
]
failed = [
create_mock_episode(
outcome=Outcome.FAILURE,
actions=[{"type": "commit"}], # Missing validate
)
for _ in range(5)
]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=successful + failed)
reflection._episodic = mock_episodic
factors = await reflection.identify_success_factors(project_id)
action_factors = [f for f in factors if f.factor_type == FactorType.ACTION]
assert len(action_factors) >= 0 # May or may not find based on thresholds
async def test_identify_timing_factors(
self,
reflection: MemoryReflection,
) -> None:
"""Should identify timing-related factors."""
project_id = uuid4()
# Successful tasks are faster
successful = [
create_mock_episode(outcome=Outcome.SUCCESS, duration_seconds=30.0)
for _ in range(5)
]
# Failed tasks take longer
failed = [
create_mock_episode(outcome=Outcome.FAILURE, duration_seconds=120.0)
for _ in range(5)
]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=successful + failed)
reflection._episodic = mock_episodic
factors = await reflection.identify_success_factors(project_id)
timing_factors = [f for f in factors if f.factor_type == FactorType.TIMING]
assert len(timing_factors) >= 1
async def test_identify_resource_factors(
self,
reflection: MemoryReflection,
) -> None:
"""Should identify resource usage factors."""
project_id = uuid4()
# Successful tasks use fewer tokens
successful = [
create_mock_episode(outcome=Outcome.SUCCESS, tokens_used=100)
for _ in range(5)
]
# Failed tasks use more tokens
failed = [
create_mock_episode(outcome=Outcome.FAILURE, tokens_used=500)
for _ in range(5)
]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=successful + failed)
reflection._episodic = mock_episodic
factors = await reflection.identify_success_factors(project_id)
resource_factors = [f for f in factors if f.factor_type == FactorType.RESOURCE]
assert len(resource_factors) >= 1
async def test_filter_by_task_type(
self,
reflection: MemoryReflection,
) -> None:
"""Should filter by task type when specified."""
project_id = uuid4()
episodes = [
create_mock_episode(task_type="target_task", outcome=Outcome.SUCCESS)
for _ in range(5)
]
mock_episodic = MagicMock()
mock_episodic.get_by_task_type = AsyncMock(return_value=episodes)
mock_episodic.get_recent = AsyncMock(return_value=episodes)
reflection._episodic = mock_episodic
await reflection.identify_success_factors(project_id, task_type="target_task")
mock_episodic.get_by_task_type.assert_called_once()
async def test_insufficient_samples(
self,
reflection: MemoryReflection,
) -> None:
"""Should return empty when insufficient samples."""
project_id = uuid4()
# Only 2 episodes, config requires 3 minimum
episodes = [create_mock_episode() for _ in range(2)]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=episodes)
reflection._episodic = mock_episodic
factors = await reflection.identify_success_factors(project_id)
assert factors == []
class TestAnomalyDetection:
"""Tests for anomaly detection."""
async def test_detect_duration_anomaly(
self,
reflection: MemoryReflection,
) -> None:
"""Should detect unusual duration anomalies."""
project_id = uuid4()
# Create baseline with consistent durations
now = datetime.now(UTC)
baseline = [
create_mock_episode(
duration_seconds=60.0,
occurred_at=now - timedelta(days=i),
)
for i in range(2, 10)
]
# Add recent anomaly with very long duration
anomalous = create_mock_episode(
duration_seconds=300.0, # 5x longer
occurred_at=now - timedelta(hours=1),
)
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=[*baseline, anomalous])
reflection._episodic = mock_episodic
anomalies = await reflection.detect_anomalies(project_id, baseline_days=30)
duration_anomalies = [
a for a in anomalies if a.anomaly_type == AnomalyType.UNUSUAL_DURATION
]
assert len(duration_anomalies) >= 1
async def test_detect_unexpected_outcome_anomaly(
self,
reflection: MemoryReflection,
) -> None:
"""Should detect unexpected outcome anomalies."""
project_id = uuid4()
now = datetime.now(UTC)
# Create baseline with high success rate
baseline = [
create_mock_episode(
task_type="reliable_task",
outcome=Outcome.SUCCESS,
occurred_at=now - timedelta(days=i),
)
for i in range(2, 10)
]
# Add recent failure for usually successful task
anomalous = create_mock_episode(
task_type="reliable_task",
outcome=Outcome.FAILURE,
occurred_at=now - timedelta(hours=1),
)
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=[*baseline, anomalous])
reflection._episodic = mock_episodic
anomalies = await reflection.detect_anomalies(project_id, baseline_days=30)
outcome_anomalies = [
a for a in anomalies if a.anomaly_type == AnomalyType.UNEXPECTED_OUTCOME
]
assert len(outcome_anomalies) >= 1
async def test_detect_token_usage_anomaly(
self,
reflection: MemoryReflection,
) -> None:
"""Should detect unusual token usage."""
project_id = uuid4()
now = datetime.now(UTC)
# Create baseline with consistent token usage
baseline = [
create_mock_episode(
tokens_used=100,
occurred_at=now - timedelta(days=i),
)
for i in range(2, 10)
]
# Add recent anomaly with very high token usage
anomalous = create_mock_episode(
tokens_used=1000, # 10x higher
occurred_at=now - timedelta(hours=1),
)
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=[*baseline, anomalous])
reflection._episodic = mock_episodic
anomalies = await reflection.detect_anomalies(project_id, baseline_days=30)
token_anomalies = [
a for a in anomalies if a.anomaly_type == AnomalyType.UNUSUAL_TOKEN_USAGE
]
assert len(token_anomalies) >= 1
async def test_detect_failure_rate_spike(
self,
reflection: MemoryReflection,
) -> None:
"""Should detect failure rate spikes."""
project_id = uuid4()
now = datetime.now(UTC)
# Create baseline with low failure rate
baseline = [
create_mock_episode(
outcome=Outcome.SUCCESS if i % 10 != 0 else Outcome.FAILURE,
occurred_at=now - timedelta(days=i % 30),
)
for i in range(30)
]
# Add recent failures (spike)
recent_failures = [
create_mock_episode(
outcome=Outcome.FAILURE,
occurred_at=now - timedelta(hours=i),
)
for i in range(1, 6)
]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=baseline + recent_failures)
reflection._episodic = mock_episodic
anomalies = await reflection.detect_anomalies(project_id, baseline_days=30)
# May or may not detect based on thresholds
# Just verify the analysis completes without error
assert isinstance(anomalies, list)
async def test_insufficient_baseline(
self,
reflection: MemoryReflection,
) -> None:
"""Should return empty when insufficient baseline."""
project_id = uuid4()
# Only 3 episodes, config requires 5 minimum
episodes = [create_mock_episode() for _ in range(3)]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=episodes)
reflection._episodic = mock_episodic
anomalies = await reflection.detect_anomalies(project_id, baseline_days=30)
assert anomalies == []
class TestInsightGeneration:
"""Tests for insight generation."""
async def test_generate_warning_insight_from_failure_pattern(
self,
reflection: MemoryReflection,
) -> None:
"""Should generate warning insight from failure patterns."""
project_id = uuid4()
# Create episodes with recurring failure
episodes = [
create_mock_episode(task_type="failing_task", outcome=Outcome.FAILURE)
for _ in range(8)
] + [
create_mock_episode(task_type="failing_task", outcome=Outcome.SUCCESS)
for _ in range(2)
]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=episodes)
reflection._episodic = mock_episodic
insights = await reflection.generate_insights(project_id)
warning_insights = [
i for i in insights if i.insight_type == InsightType.WARNING
]
assert len(warning_insights) >= 1
async def test_generate_learning_insight_from_success_pattern(
self,
reflection: MemoryReflection,
) -> None:
"""Should generate learning insight from success patterns."""
project_id = uuid4()
# Create episodes with recurring success
episodes = [
create_mock_episode(task_type="good_task", outcome=Outcome.SUCCESS)
for _ in range(9)
] + [
create_mock_episode(task_type="good_task", outcome=Outcome.FAILURE)
for _ in range(1)
]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=episodes)
reflection._episodic = mock_episodic
insights = await reflection.generate_insights(project_id)
learning_insights = [
i for i in insights if i.insight_type == InsightType.LEARNING
]
assert len(learning_insights) >= 0 # May depend on thresholds
async def test_generate_trend_insight(
self,
reflection: MemoryReflection,
) -> None:
"""Should generate overall trend insight."""
project_id = uuid4()
# Create enough episodes with timestamps in range
now = datetime.now(UTC)
episodes = [
create_mock_episode(
outcome=Outcome.SUCCESS,
occurred_at=now - timedelta(hours=i),
)
for i in range(10)
]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=episodes)
reflection._episodic = mock_episodic
insights = await reflection.generate_insights(project_id)
trend_insights = [i for i in insights if i.insight_type == InsightType.TREND]
assert len(trend_insights) >= 1
async def test_insights_sorted_by_priority(
self,
reflection: MemoryReflection,
) -> None:
"""Should sort insights by priority."""
project_id = uuid4()
episodes = [create_mock_episode(outcome=Outcome.SUCCESS) for _ in range(10)]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=episodes)
reflection._episodic = mock_episodic
insights = await reflection.generate_insights(project_id)
if len(insights) >= 2:
for i in range(len(insights) - 1):
assert insights[i].priority >= insights[i + 1].priority
class TestComprehensiveReflection:
"""Tests for comprehensive reflect() method."""
async def test_reflect_returns_all_components(
self,
reflection: MemoryReflection,
) -> None:
"""Should return patterns, factors, anomalies, and insights."""
project_id = uuid4()
time_range = TimeRange.last_days(7)
now = datetime.now(UTC)
episodes = [
create_mock_episode(
task_type="test_task",
outcome=Outcome.SUCCESS if i % 2 == 0 else Outcome.FAILURE,
occurred_at=now - timedelta(hours=i),
)
for i in range(20)
]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=episodes)
reflection._episodic = mock_episodic
result = await reflection.reflect(project_id, time_range)
assert result.patterns is not None
assert result.factors is not None
assert result.anomalies is not None
assert result.insights is not None
assert result.episodes_analyzed >= 0
assert result.analysis_duration_seconds >= 0
async def test_reflect_with_default_time_range(
self,
reflection: MemoryReflection,
) -> None:
"""Should use default 7-day time range."""
project_id = uuid4()
episodes = [create_mock_episode() for _ in range(5)]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=episodes)
reflection._episodic = mock_episodic
result = await reflection.reflect(project_id)
assert 6.9 <= result.time_range.duration_days <= 7.1
async def test_reflect_summary(
self,
reflection: MemoryReflection,
) -> None:
"""Should generate meaningful summary."""
project_id = uuid4()
episodes = [create_mock_episode() for _ in range(10)]
mock_episodic = MagicMock()
mock_episodic.get_recent = AsyncMock(return_value=episodes)
reflection._episodic = mock_episodic
result = await reflection.reflect(project_id)
summary = result.summary
assert "Reflection Analysis" in summary
assert "Episodes analyzed" in summary
class TestSingleton:
"""Tests for singleton pattern."""
async def test_get_memory_reflection_returns_singleton(
self,
mock_session: MagicMock,
) -> None:
"""Should return same instance."""
r1 = await get_memory_reflection(mock_session)
r2 = await get_memory_reflection(mock_session)
assert r1 is r2
async def test_reset_creates_new_instance(
self,
mock_session: MagicMock,
) -> None:
"""Should create new instance after reset."""
r1 = await get_memory_reflection(mock_session)
await reset_memory_reflection()
r2 = await get_memory_reflection(mock_session)
assert r1 is not r2

View File

@@ -0,0 +1,559 @@
# tests/unit/services/memory/reflection/test_types.py
"""Tests for Memory Reflection types."""
from datetime import UTC, datetime, timedelta
from uuid import uuid4
from app.services.memory.reflection.types import (
Anomaly,
AnomalyType,
Factor,
FactorType,
Insight,
InsightType,
Pattern,
PatternType,
ReflectionResult,
TimeRange,
)
class TestTimeRange:
"""Tests for TimeRange."""
def test_creates_time_range(self) -> None:
"""Should create time range with start and end."""
start = datetime.now(UTC) - timedelta(days=7)
end = datetime.now(UTC)
tr = TimeRange(start=start, end=end)
assert tr.start == start
assert tr.end == end
def test_last_hours(self) -> None:
"""Should create time range for last N hours."""
tr = TimeRange.last_hours(24)
assert tr.duration_hours >= 23.9
assert tr.duration_hours <= 24.1
def test_last_days(self) -> None:
"""Should create time range for last N days."""
tr = TimeRange.last_days(7)
assert tr.duration_days >= 6.9
assert tr.duration_days <= 7.1
def test_duration_hours(self) -> None:
"""Should calculate duration in hours."""
start = datetime.now(UTC) - timedelta(hours=12)
end = datetime.now(UTC)
tr = TimeRange(start=start, end=end)
assert 11.9 <= tr.duration_hours <= 12.1
def test_duration_days(self) -> None:
"""Should calculate duration in days."""
start = datetime.now(UTC) - timedelta(days=3)
end = datetime.now(UTC)
tr = TimeRange(start=start, end=end)
assert 2.9 <= tr.duration_days <= 3.1
class TestPattern:
"""Tests for Pattern."""
def test_creates_pattern(self) -> None:
"""Should create pattern with all fields."""
now = datetime.now(UTC)
episode_ids = [uuid4(), uuid4(), uuid4()]
pattern = Pattern(
id=uuid4(),
pattern_type=PatternType.RECURRING_SUCCESS,
name="Test Pattern",
description="A test pattern",
confidence=0.85,
occurrence_count=10,
episode_ids=episode_ids,
first_seen=now - timedelta(days=7),
last_seen=now,
)
assert pattern.name == "Test Pattern"
assert pattern.confidence == 0.85
assert len(pattern.episode_ids) == 3
def test_frequency_calculation(self) -> None:
"""Should calculate frequency per day."""
now = datetime.now(UTC)
pattern = Pattern(
id=uuid4(),
pattern_type=PatternType.RECURRING_SUCCESS,
name="Test",
description="Test",
confidence=0.8,
occurrence_count=14,
episode_ids=[],
first_seen=now - timedelta(days=7),
last_seen=now,
)
assert pattern.frequency == 2.0 # 14 occurrences / 7 days
def test_frequency_minimum_one_day(self) -> None:
"""Should use minimum 1 day for frequency calculation."""
now = datetime.now(UTC)
pattern = Pattern(
id=uuid4(),
pattern_type=PatternType.RECURRING_SUCCESS,
name="Test",
description="Test",
confidence=0.8,
occurrence_count=5,
episode_ids=[],
first_seen=now - timedelta(hours=1), # Less than 1 day
last_seen=now,
)
assert pattern.frequency == 5.0 # 5 / 1 day minimum
def test_to_dict(self) -> None:
"""Should convert to dictionary."""
pattern = Pattern(
id=uuid4(),
pattern_type=PatternType.ACTION_SEQUENCE,
name="Action Pattern",
description="Action sequence",
confidence=0.75,
occurrence_count=5,
episode_ids=[uuid4()],
first_seen=datetime.now(UTC) - timedelta(days=1),
last_seen=datetime.now(UTC),
metadata={"key": "value"},
)
result = pattern.to_dict()
assert result["name"] == "Action Pattern"
assert result["pattern_type"] == "action_sequence"
assert result["confidence"] == 0.75
assert "frequency" in result
assert result["metadata"] == {"key": "value"}
class TestFactor:
"""Tests for Factor."""
def test_creates_factor(self) -> None:
"""Should create factor with all fields."""
factor = Factor(
id=uuid4(),
factor_type=FactorType.ACTION,
name="Test Factor",
description="A test factor",
impact_score=0.7,
correlation=0.5,
sample_size=20,
positive_examples=[uuid4()],
negative_examples=[uuid4()],
)
assert factor.name == "Test Factor"
assert factor.impact_score == 0.7
assert factor.correlation == 0.5
def test_net_impact_calculation(self) -> None:
"""Should calculate net impact."""
factor = Factor(
id=uuid4(),
factor_type=FactorType.CONTEXT,
name="Test",
description="Test",
impact_score=0.8,
correlation=0.6,
sample_size=20,
positive_examples=[],
negative_examples=[],
)
# net_impact = impact_score * correlation * confidence_weight
# confidence_weight = min(1.0, 20/20) = 1.0
expected = 0.8 * 0.6 * 1.0
assert factor.net_impact == expected
def test_net_impact_with_small_sample(self) -> None:
"""Should weight net impact by sample size."""
factor = Factor(
id=uuid4(),
factor_type=FactorType.CONTEXT,
name="Test",
description="Test",
impact_score=0.8,
correlation=0.6,
sample_size=10, # Half of 20
positive_examples=[],
negative_examples=[],
)
# confidence_weight = min(1.0, 10/20) = 0.5
expected = 0.8 * 0.6 * 0.5
assert factor.net_impact == expected
def test_to_dict(self) -> None:
"""Should convert to dictionary."""
factor = Factor(
id=uuid4(),
factor_type=FactorType.TIMING,
name="Timing Factor",
description="Time-related",
impact_score=0.6,
correlation=-0.3,
sample_size=15,
positive_examples=[],
negative_examples=[],
metadata={"key": "value"},
)
result = factor.to_dict()
assert result["name"] == "Timing Factor"
assert result["factor_type"] == "timing"
assert "net_impact" in result
assert result["metadata"] == {"key": "value"}
class TestAnomaly:
"""Tests for Anomaly."""
def test_creates_anomaly(self) -> None:
"""Should create anomaly with all fields."""
anomaly = Anomaly(
id=uuid4(),
anomaly_type=AnomalyType.UNUSUAL_DURATION,
description="Unusual duration detected",
severity=0.75,
episode_ids=[uuid4()],
detected_at=datetime.now(UTC),
baseline_value=10.0,
observed_value=30.0,
deviation_factor=3.0,
)
assert anomaly.severity == 0.75
assert anomaly.baseline_value == 10.0
assert anomaly.deviation_factor == 3.0
def test_is_critical_high_severity(self) -> None:
"""Should be critical when severity > 0.8."""
anomaly = Anomaly(
id=uuid4(),
anomaly_type=AnomalyType.UNUSUAL_FAILURE_RATE,
description="High failure rate",
severity=0.9,
episode_ids=[],
detected_at=datetime.now(UTC),
baseline_value=0.1,
observed_value=0.5,
deviation_factor=5.0,
)
assert anomaly.is_critical is True
def test_is_critical_low_severity(self) -> None:
"""Should not be critical when severity <= 0.8."""
anomaly = Anomaly(
id=uuid4(),
anomaly_type=AnomalyType.UNUSUAL_DURATION,
description="Slightly unusual",
severity=0.6,
episode_ids=[],
detected_at=datetime.now(UTC),
baseline_value=10.0,
observed_value=20.0,
deviation_factor=2.0,
)
assert anomaly.is_critical is False
def test_to_dict(self) -> None:
"""Should convert to dictionary."""
anomaly = Anomaly(
id=uuid4(),
anomaly_type=AnomalyType.UNEXPECTED_OUTCOME,
description="Unexpected failure",
severity=0.85,
episode_ids=[uuid4()],
detected_at=datetime.now(UTC),
baseline_value=0.9,
observed_value=0.0,
deviation_factor=0.9,
metadata={"task_type": "test"},
)
result = anomaly.to_dict()
assert result["anomaly_type"] == "unexpected_outcome"
assert result["severity"] == 0.85
assert result["is_critical"] is True
assert result["metadata"] == {"task_type": "test"}
class TestInsight:
"""Tests for Insight."""
def test_creates_insight(self) -> None:
"""Should create insight with all fields."""
insight = Insight(
id=uuid4(),
insight_type=InsightType.OPTIMIZATION,
title="Performance Opportunity",
description="Optimization potential found",
priority=0.8,
confidence=0.75,
source_patterns=[uuid4()],
source_factors=[],
source_anomalies=[],
recommended_actions=["Action 1", "Action 2"],
generated_at=datetime.now(UTC),
)
assert insight.title == "Performance Opportunity"
assert insight.priority == 0.8
assert len(insight.recommended_actions) == 2
def test_actionable_score(self) -> None:
"""Should calculate actionable score."""
insight = Insight(
id=uuid4(),
insight_type=InsightType.RECOMMENDATION,
title="Test",
description="Test",
priority=0.8,
confidence=0.9,
source_patterns=[],
source_factors=[],
source_anomalies=[],
recommended_actions=["Action 1", "Action 2", "Action 3"],
generated_at=datetime.now(UTC),
)
# actionable_score = priority * confidence * action_weight
# action_weight = min(1.0, 3/3) = 1.0
expected = 0.8 * 0.9 * 1.0
assert insight.actionable_score == expected
def test_actionable_score_few_actions(self) -> None:
"""Should weight by action count."""
insight = Insight(
id=uuid4(),
insight_type=InsightType.WARNING,
title="Test",
description="Test",
priority=0.8,
confidence=0.9,
source_patterns=[],
source_factors=[],
source_anomalies=[],
recommended_actions=["Action 1"], # Only 1 action
generated_at=datetime.now(UTC),
)
# action_weight = min(1.0, 1/3) = 0.333...
expected = 0.8 * 0.9 * (1 / 3)
assert abs(insight.actionable_score - expected) < 0.001
def test_to_dict(self) -> None:
"""Should convert to dictionary."""
insight = Insight(
id=uuid4(),
insight_type=InsightType.TREND,
title="Trend Analysis",
description="Performance trend",
priority=0.6,
confidence=0.7,
source_patterns=[uuid4()],
source_factors=[uuid4()],
source_anomalies=[],
recommended_actions=["Monitor", "Review"],
generated_at=datetime.now(UTC),
metadata={"health_score": 0.85},
)
result = insight.to_dict()
assert result["insight_type"] == "trend"
assert result["title"] == "Trend Analysis"
assert "actionable_score" in result
assert result["metadata"] == {"health_score": 0.85}
class TestReflectionResult:
"""Tests for ReflectionResult."""
def test_creates_result(self) -> None:
"""Should create reflection result."""
time_range = TimeRange.last_days(7)
result = ReflectionResult(
patterns=[],
factors=[],
anomalies=[],
insights=[],
time_range=time_range,
episodes_analyzed=100,
analysis_duration_seconds=2.5,
)
assert result.episodes_analyzed == 100
assert result.analysis_duration_seconds == 2.5
def test_to_dict(self) -> None:
"""Should convert to dictionary."""
time_range = TimeRange.last_days(7)
result = ReflectionResult(
patterns=[
Pattern(
id=uuid4(),
pattern_type=PatternType.RECURRING_SUCCESS,
name="Test",
description="Test",
confidence=0.8,
occurrence_count=5,
episode_ids=[],
first_seen=datetime.now(UTC),
last_seen=datetime.now(UTC),
)
],
factors=[],
anomalies=[],
insights=[],
time_range=time_range,
episodes_analyzed=50,
analysis_duration_seconds=1.5,
)
data = result.to_dict()
assert len(data["patterns"]) == 1
assert data["episodes_analyzed"] == 50
assert "time_range" in data
assert "duration_hours" in data["time_range"]
def test_summary(self) -> None:
"""Should generate summary text."""
time_range = TimeRange.last_days(7)
result = ReflectionResult(
patterns=[
Pattern(
id=uuid4(),
pattern_type=PatternType.RECURRING_SUCCESS,
name="Pattern 1",
description="Test",
confidence=0.8,
occurrence_count=5,
episode_ids=[],
first_seen=datetime.now(UTC),
last_seen=datetime.now(UTC),
)
],
factors=[
Factor(
id=uuid4(),
factor_type=FactorType.ACTION,
name="Factor 1",
description="Test",
impact_score=0.6,
correlation=0.4,
sample_size=10,
positive_examples=[],
negative_examples=[],
)
],
anomalies=[],
insights=[
Insight(
id=uuid4(),
insight_type=InsightType.OPTIMIZATION,
title="Top Insight",
description="Test",
priority=0.9,
confidence=0.8,
source_patterns=[],
source_factors=[],
source_anomalies=[],
recommended_actions=["Action"],
generated_at=datetime.now(UTC),
)
],
time_range=time_range,
episodes_analyzed=100,
analysis_duration_seconds=2.0,
)
summary = result.summary
assert "Reflection Analysis" in summary
assert "Episodes analyzed: 100" in summary
assert "Patterns detected: 1" in summary
assert "Success/failure factors: 1" in summary
assert "Insights generated: 1" in summary
assert "Top insights:" in summary
assert "Top Insight" in summary
class TestPatternType:
"""Tests for PatternType enum."""
def test_all_pattern_types(self) -> None:
"""Should have all expected pattern types."""
assert PatternType.RECURRING_SUCCESS.value == "recurring_success"
assert PatternType.RECURRING_FAILURE.value == "recurring_failure"
assert PatternType.ACTION_SEQUENCE.value == "action_sequence"
assert PatternType.CONTEXT_CORRELATION.value == "context_correlation"
assert PatternType.TEMPORAL.value == "temporal"
assert PatternType.EFFICIENCY.value == "efficiency"
class TestFactorType:
"""Tests for FactorType enum."""
def test_all_factor_types(self) -> None:
"""Should have all expected factor types."""
assert FactorType.ACTION.value == "action"
assert FactorType.CONTEXT.value == "context"
assert FactorType.TIMING.value == "timing"
assert FactorType.RESOURCE.value == "resource"
assert FactorType.PRECEDING_STATE.value == "preceding_state"
class TestAnomalyType:
"""Tests for AnomalyType enum."""
def test_all_anomaly_types(self) -> None:
"""Should have all expected anomaly types."""
assert AnomalyType.UNUSUAL_DURATION.value == "unusual_duration"
assert AnomalyType.UNEXPECTED_OUTCOME.value == "unexpected_outcome"
assert AnomalyType.UNUSUAL_TOKEN_USAGE.value == "unusual_token_usage"
assert AnomalyType.UNUSUAL_FAILURE_RATE.value == "unusual_failure_rate"
assert AnomalyType.UNUSUAL_ACTION_PATTERN.value == "unusual_action_pattern"
class TestInsightType:
"""Tests for InsightType enum."""
def test_all_insight_types(self) -> None:
"""Should have all expected insight types."""
assert InsightType.OPTIMIZATION.value == "optimization"
assert InsightType.WARNING.value == "warning"
assert InsightType.LEARNING.value == "learning"
assert InsightType.RECOMMENDATION.value == "recommendation"
assert InsightType.TREND.value == "trend"

View File

@@ -2,7 +2,7 @@
Tests for Memory System Types.
"""
from datetime import datetime, timedelta
from datetime import UTC, datetime, timedelta
from uuid import uuid4
from app.services.memory.types import (
@@ -150,7 +150,7 @@ class TestMemoryItem:
def test_get_age_seconds(self) -> None:
"""Test getting item age."""
past = datetime.now() - timedelta(seconds=100)
past = datetime.now(UTC) - timedelta(seconds=100)
item = MemoryItem(
id=uuid4(),
memory_type=MemoryType.SEMANTIC,
@@ -202,7 +202,7 @@ class TestWorkingMemoryItem:
scope_id="sess-123",
key="my_key",
value="value",
expires_at=datetime.now() + timedelta(hours=1),
expires_at=datetime.now(UTC) + timedelta(hours=1),
)
assert item.is_expired() is False
@@ -215,7 +215,7 @@ class TestWorkingMemoryItem:
scope_id="sess-123",
key="my_key",
value="value",
expires_at=datetime.now() - timedelta(hours=1),
expires_at=datetime.now(UTC) - timedelta(hours=1),
)
assert item.is_expired() is True