# Agent Memory System Comprehensive multi-tier cognitive memory for AI agents, enabling state persistence, experiential learning, and context continuity across sessions. ## Overview The Agent Memory System implements a cognitive architecture inspired by human memory: ``` +------------------------------------------------------------------+ | Agent Memory System | +------------------------------------------------------------------+ | | | +------------------+ +------------------+ | | | Working Memory |----consolidate---->| Episodic Memory | | | | (Redis/In-Mem) | | (PostgreSQL) | | | | | | | | | | - Current task | | - Past sessions | | | | - Variables | | - Experiences | | | | - Scratchpad | | - Outcomes | | | +------------------+ +--------+---------+ | | | | | extract | | | v | | +------------------+ +------------------+ | | |Procedural Memory |<-----learn from----| Semantic Memory | | | | (PostgreSQL) | | (PostgreSQL + | | | | | | pgvector) | | | | - Procedures | | | | | | - Skills | | - Facts | | | | - Patterns | | - Entities | | | +------------------+ | - Relationships | | | +------------------+ | +------------------------------------------------------------------+ ``` ## Memory Types ### Working Memory Short-term, session-scoped memory for current task state. **Features:** - Key-value storage with TTL - Task state tracking - Scratchpad for reasoning - Checkpoint/restore support - Redis primary with in-memory fallback **Usage:** ```python from app.services.memory.working import WorkingMemory memory = WorkingMemory(scope_context) await memory.set("key", {"data": "value"}, ttl_seconds=3600) value = await memory.get("key") # Task state await memory.set_task_state(TaskState(task_id="t1", status="running")) state = await memory.get_task_state() # Checkpoints checkpoint_id = await memory.create_checkpoint() await memory.restore_checkpoint(checkpoint_id) ``` ### Episodic Memory Experiential records of past agent actions and outcomes. **Features:** - Records task completions and failures - Semantic similarity search (pgvector) - Temporal and outcome-based retrieval - Importance scoring - Episode summarization **Usage:** ```python from app.services.memory.episodic import EpisodicMemory memory = EpisodicMemory(session, embedder) # Record an episode episode = await memory.record_episode( project_id=project_id, episode=EpisodeCreate( task_type="code_review", task_description="Review PR #42", outcome=Outcome.SUCCESS, actions=[{"type": "analyze", "target": "src/"}], ) ) # Search similar experiences similar = await memory.search_similar( project_id=project_id, query="debugging memory leak", limit=5 ) # Get recent episodes recent = await memory.get_recent(project_id, limit=10) ``` ### Semantic Memory Learned facts and knowledge with confidence scoring. **Features:** - Triple format (subject, predicate, object) - Confidence scoring with decay - Fact extraction from episodes - Conflict resolution - Entity-based retrieval **Usage:** ```python from app.services.memory.semantic import SemanticMemory memory = SemanticMemory(session, embedder) # Store a fact fact = await memory.store_fact( project_id=project_id, fact=FactCreate( subject="UserService", predicate="handles", object="authentication", confidence=0.9, ) ) # Search facts facts = await memory.search_facts(project_id, "authentication flow") # Reinforce on repeated learning await memory.reinforce_fact(fact.id) ``` ### Procedural Memory Learned skills and procedures from successful patterns. **Features:** - Procedure recording from task patterns - Trigger-based matching - Success rate tracking - Procedure suggestions - Step-by-step storage **Usage:** ```python from app.services.memory.procedural import ProceduralMemory memory = ProceduralMemory(session, embedder) # Record a procedure procedure = await memory.record_procedure( project_id=project_id, procedure=ProcedureCreate( name="PR Review Process", trigger_pattern="code review requested", steps=[ Step(action="fetch_diff"), Step(action="analyze_changes"), Step(action="check_tests"), ] ) ) # Find matching procedures matches = await memory.find_matching(project_id, "need to review code") # Record outcomes await memory.record_outcome(procedure.id, success=True) ``` ## Memory Scoping Memory is organized in a hierarchical scope structure: ``` Global Memory (shared by all) └── Project Memory (per project) └── Agent Type Memory (per agent type) └── Agent Instance Memory (per instance) └── Session Memory (ephemeral) ``` **Usage:** ```python from app.services.memory.scoping import ScopeManager, ScopeLevel manager = ScopeManager(session) # Get scoped memories with inheritance memories = await manager.get_scoped_memories( context=ScopeContext( project_id=project_id, agent_type_id=agent_type_id, agent_instance_id=agent_instance_id, session_id=session_id, ), include_inherited=True, # Include parent scopes ) ``` ## Memory Consolidation Automatic background processes transfer and extract knowledge: ``` Working Memory ──> Episodic Memory ──> Semantic Memory └──> Procedural Memory ``` **Consolidation Types:** - `working_to_episodic`: Transfer session state to episodes (on session end) - `episodic_to_semantic`: Extract facts from experiences - `episodic_to_procedural`: Learn procedures from patterns - `prune`: Remove low-value memories **Celery Tasks:** ```python from app.tasks.memory_consolidation import ( consolidate_session, run_nightly_consolidation, prune_old_memories, ) # Manual consolidation consolidate_session.delay(session_id) # Scheduled nightly (3 AM by default) run_nightly_consolidation.delay() ``` ## Memory Retrieval ### Hybrid Retrieval Combine multiple retrieval strategies: ```python from app.services.memory.indexing import RetrievalEngine engine = RetrievalEngine(session, embedder) # Hybrid search across memory types results = await engine.retrieve_hybrid( project_id=project_id, query="authentication error handling", memory_types=["episodic", "semantic", "procedural"], filters={"outcome": "success"}, limit=10, ) ``` ### Index Types - **Vector Index**: Semantic similarity (HNSW/pgvector) - **Temporal Index**: Time-based retrieval - **Entity Index**: Entity mention lookup - **Outcome Index**: Success/failure filtering ## MCP Tools The memory system exposes MCP tools for agent use: ### `remember` Store information in memory. ```json { "memory_type": "working", "content": {"key": "value"}, "importance": 0.8, "ttl_seconds": 3600 } ``` ### `recall` Retrieve from memory. ```json { "query": "authentication patterns", "memory_types": ["episodic", "semantic"], "limit": 10, "filters": {"outcome": "success"} } ``` ### `forget` Remove from memory. ```json { "memory_type": "working", "key": "temp_data" } ``` ### `reflect` Analyze memory patterns. ```json { "analysis_type": "success_factors", "task_type": "code_review", "time_range_days": 30 } ``` ### `get_memory_stats` Get memory usage statistics. ### `record_outcome` Record task success/failure for learning. ## Memory Reflection Analyze patterns and generate insights from memory: ```python from app.services.memory.reflection import MemoryReflection, TimeRange reflection = MemoryReflection(session) # Detect patterns patterns = await reflection.analyze_patterns( project_id=project_id, time_range=TimeRange.last_days(30), ) # Identify success factors factors = await reflection.identify_success_factors( project_id=project_id, task_type="code_review", ) # Detect anomalies anomalies = await reflection.detect_anomalies( project_id=project_id, baseline_days=30, ) # Generate insights insights = await reflection.generate_insights(project_id) # Comprehensive reflection result = await reflection.reflect(project_id) print(result.summary) ``` ## Configuration All settings use the `MEM_` environment variable prefix: | Variable | Default | Description | |----------|---------|-------------| | `MEM_WORKING_MEMORY_BACKEND` | `redis` | Backend: `redis` or `memory` | | `MEM_WORKING_MEMORY_DEFAULT_TTL_SECONDS` | `3600` | Default TTL (1 hour) | | `MEM_REDIS_URL` | `redis://localhost:6379/0` | Redis connection URL | | `MEM_EPISODIC_MAX_EPISODES_PER_PROJECT` | `10000` | Max episodes per project | | `MEM_EPISODIC_RETENTION_DAYS` | `365` | Episode retention period | | `MEM_SEMANTIC_MAX_FACTS_PER_PROJECT` | `50000` | Max facts per project | | `MEM_SEMANTIC_CONFIDENCE_DECAY_DAYS` | `90` | Confidence half-life | | `MEM_EMBEDDING_MODEL` | `text-embedding-3-small` | Embedding model | | `MEM_EMBEDDING_DIMENSIONS` | `1536` | Vector dimensions | | `MEM_RETRIEVAL_MIN_SIMILARITY` | `0.5` | Minimum similarity score | | `MEM_CONSOLIDATION_ENABLED` | `true` | Enable auto-consolidation | | `MEM_CONSOLIDATION_SCHEDULE_CRON` | `0 3 * * *` | Nightly schedule | | `MEM_CACHE_ENABLED` | `true` | Enable retrieval caching | | `MEM_CACHE_TTL_SECONDS` | `300` | Cache TTL (5 minutes) | See `app/services/memory/config.py` for complete configuration options. ## Integration with Context Engine Memory integrates with the Context Engine as a context source: ```python from app.services.memory.integration import MemoryContextSource # Register as context source source = MemoryContextSource(memory_manager) context_engine.register_source(source) # Memory is automatically included in context assembly context = await context_engine.assemble_context( project_id=project_id, session_id=session_id, current_task="Review authentication code", ) ``` ## Caching Multi-layer caching for performance: - **Hot Cache**: Frequently accessed memories (LRU) - **Retrieval Cache**: Query result caching - **Embedding Cache**: Pre-computed embeddings ```python from app.services.memory.cache import CacheManager cache = CacheManager(settings) await cache.warm_hot_cache(project_id) # Pre-warm common memories ``` ## Metrics Prometheus-compatible metrics: | Metric | Type | Labels | |--------|------|--------| | `memory_operations_total` | Counter | operation, memory_type, scope, success | | `memory_retrievals_total` | Counter | memory_type, strategy | | `memory_cache_hits_total` | Counter | cache_type | | `memory_retrieval_latency_seconds` | Histogram | - | | `memory_consolidation_duration_seconds` | Histogram | - | | `memory_items_count` | Gauge | memory_type, scope | ```python from app.services.memory.metrics import get_memory_metrics metrics = await get_memory_metrics() summary = await metrics.get_summary() prometheus_output = await metrics.get_prometheus_format() ``` ## Performance Targets | Operation | Target P95 | |-----------|------------| | Working memory get/set | < 5ms | | Episodic memory retrieval | < 100ms | | Semantic memory search | < 100ms | | Procedural memory matching | < 50ms | | Consolidation batch (1000 items) | < 30s | ## Troubleshooting ### Redis Connection Issues ```bash # Check Redis connectivity redis-cli ping # Verify memory settings MEM_REDIS_URL=redis://localhost:6379/0 ``` ### Slow Retrieval 1. Check if caching is enabled: `MEM_CACHE_ENABLED=true` 2. Verify HNSW indexes exist on vector columns 3. Monitor `memory_retrieval_latency_seconds` metric ### High Memory Usage 1. Review `MEM_EPISODIC_MAX_EPISODES_PER_PROJECT` limit 2. Ensure pruning is enabled: `MEM_PRUNING_ENABLED=true` 3. Check consolidation is running (cron schedule) ### Embedding Errors 1. Verify LLM Gateway is accessible 2. Check embedding model is valid 3. Review batch size if hitting rate limits ## Directory Structure ``` app/services/memory/ ├── __init__.py # Public exports ├── config.py # MemorySettings ├── exceptions.py # Memory-specific errors ├── manager.py # MemoryManager facade ├── types.py # Core types ├── working/ # Working memory │ ├── memory.py │ └── storage.py ├── episodic/ # Episodic memory │ ├── memory.py │ ├── recorder.py │ └── retrieval.py ├── semantic/ # Semantic memory │ ├── memory.py │ ├── extraction.py │ └── verification.py ├── procedural/ # Procedural memory │ ├── memory.py │ └── matching.py ├── scoping/ # Memory scoping │ ├── scope.py │ └── resolver.py ├── indexing/ # Indexing & retrieval │ ├── index.py │ └── retrieval.py ├── consolidation/ # Memory consolidation │ └── service.py ├── reflection/ # Memory reflection │ ├── service.py │ └── types.py ├── integration/ # External integrations │ ├── context_source.py │ └── lifecycle.py ├── cache/ # Caching layer │ ├── cache_manager.py │ ├── hot_cache.py │ └── embedding_cache.py ├── mcp/ # MCP tools │ ├── service.py │ └── tools.py └── metrics/ # Observability └── collector.py ```