# SPIKE-002: Agent Orchestration Pattern

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #2

---

## Executive Summary

After researching leading multi-agent orchestration frameworks (AutoGen, CrewAI, LangGraph) and enterprise patterns, we recommend a **hybrid architecture** for Syndarix that combines:

1. **LangGraph** for core agent orchestration logic (graph-based state machines)
2. **Temporal** for durable, long-running workflow execution
3. **Event-driven communication** via Redis Streams, borrowing concepts from the A2A protocol
4. **Hierarchical supervisor pattern**, with peer-to-peer collaboration within teams

This architecture is designed to deliver the flexibility, scalability, and durability needed for 50+ concurrent agent instances. It also preserves per-agent context, per-agent token tracking, and LLM failover.

### Key Recommendation

**Do not adopt a single framework wholesale.** Instead, build a custom orchestration layer using:

- **LangGraph** for agent logic and state transitions (graph primitives)
- **Temporal** for durable execution and workflow persistence
- **Redis Streams** for event-driven inter-agent communication
- **LiteLLM** for unified LLM access with failover (per SPIKE-005)

---

## Research Questions Addressed

### 1. What are the leading patterns for multi-agent orchestration in 2024-2025?

The multi-agent landscape has matured significantly. Key patterns include:

| Pattern | Description | Best For |
|---------|-------------|----------|
| **Supervisor** | Central orchestrator coordinates specialized agents | Complex multi-domain workflows |
| **Hierarchical** | Tree structure with managers and workers | Large-scale, layered problems |
| **Peer-to-Peer** | Agents collaborate as equals without central control | Open-ended ideation, negotiation |
| **Orchestrator-Worker** | Planner breaks down tasks; workers execute them | Deterministic pipelines |
| **Blackboard** | Shared knowledge space; agents react to changes | Incremental problem-solving |
| **Market-Based** | Agents bid for tasks based on capabilities | Dynamic resource allocation |

**Industry Trend (2025):** Event-driven architectures are becoming dominant. Protocols such as Google's A2A (Agent-to-Agent), IBM's ACP (Agent Communication Protocol), and AG-UI aim to standardize agent communication.

### 2. Framework Comparison

| Feature | AutoGen 0.4 | CrewAI | LangGraph | Custom |
|---------|-------------|--------|-----------|--------|
| **Architecture** | Event-driven, distributed | Crews + Flows dual-mode | Graph-based state machine | Flexible |
| **State Management** | Built-in, cross-language | Shared memory | Persistent, checkpointed | Custom (Temporal) |
| **Scalability** | High (Kubernetes-ready) | Medium-High | High | Very High |
| **Learning Curve** | Medium | Low-Medium | High | High |
| **Enterprise Adoption** | Microsoft ecosystem | 60% of Fortune 500 (vendor-reported) | Klarna, Replit, Elastic | N/A |
| **Long-running Workflows** | Good | Limited | Good | Excellent (Temporal) |
| **Customization** | Medium | Limited | High | Full |
| **Token Tracking** | Basic | Basic | Via LangSmith | Custom |
| **Multi-Provider Failover** | Limited | Limited | Limited | Full (LiteLLM) |
| **50+ Concurrent Agents** | Possible | Challenging | Possible | Designed for |

#### AutoGen 0.4 (Microsoft)

**Pros:**
- Event-driven, async-first architecture
- Cross-language support (.NET, Python)
- Built-in observability (OpenTelemetry)
- Strong Microsoft ecosystem integration
- The new Microsoft Agent Framework combines AutoGen and Semantic Kernel

**Cons:**
- Tied to Microsoft patterns
- Less flexible for custom orchestration
- The 0.4 release is still maturing

**Best For:** Microsoft-centric enterprises, standardized agent patterns

#### CrewAI

**Pros:**
- Easy to get started (role-based agents)
- Good for sequential/hierarchical workflows
- Strong enterprise traction ($18M Series A)
- LLM-agnostic design

**Cons:**
- Limited ceiling for complex patterns
- Some teams report hitting its limits after 6-12 months
- Multi-agent interactions can fall into loops
- The Flows architecture adds to the learning curve

**Best For:** Quick prototypes, straightforward workflows, teams new to agents

#### LangGraph

**Pros:**
- Fine-grained control over agent flow
- Excellent state management with persistence
- Time-travel debugging (LangSmith)
- Built-in human-in-the-loop support
- Cycles, conditionals, and parallel execution

**Cons:**
- Steep learning curve (graph theory, state machines)
- Requires distributed systems knowledge
- First-party observability depends on LangSmith, a separate commercial service

**Best For:** Complex, stateful workflows requiring precise control

### 3. Enterprise Agent State Management

Enterprise systems use these patterns:

**Event Sourcing:**
```python
# All state changes are stored as an append-only sequence of events
from dataclasses import dataclass
from datetime import datetime

@dataclass
class AgentStateEvent:
    event_type: str  # "TASK_ASSIGNED", "STATE_CHANGED", etc.
    agent_id: str
    timestamp: datetime
    data: dict
    sequence_number: int
```

**Checkpoint/Snapshot Pattern:**
```python
# Periodic snapshots for fast recovery
from __future__ import annotations  # AgentState/Message are defined elsewhere
from dataclasses import dataclass
from datetime import datetime

@dataclass
class AgentCheckpoint:
    agent_id: str
    state: AgentState  # enum from the Agent Instance Model below
    memory: dict
    context_window: list[Message]  # chat messages currently in context
    checkpoint_time: datetime
```
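
Event sourcing and snapshots are usually combined. Recovery loads the most recent snapshot and replays only the events recorded after it, which keeps replay cost bounded however long the event log grows. Below is a minimal, dependency-free sketch of that recovery path. The `Snapshot` type, its `last_sequence_number` field, and the `apply_event` reducer are hypothetical illustrations, not Syndarix code. The reducer only understands `TASK_ASSIGNED` and `STATE_CHANGED` events with the payload keys shown.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class AgentStateEvent:
    event_type: str
    agent_id: str
    timestamp: datetime
    data: dict
    sequence_number: int


@dataclass
class Snapshot:
    """Hypothetical snapshot: materialized state plus the last event it covers."""
    agent_id: str
    state: dict = field(default_factory=dict)
    last_sequence_number: int = 0


def apply_event(state: dict, event: AgentStateEvent) -> dict:
    """Pure reducer: returns a new dict with one event applied."""
    new_state = dict(state)
    if event.event_type == "STATE_CHANGED":
        new_state["status"] = event.data["new_state"]
    elif event.event_type == "TASK_ASSIGNED":
        new_state["current_task_id"] = event.data["task_id"]
    new_state["version"] = event.sequence_number
    return new_state


def recover(snapshot: Snapshot, events: list[AgentStateEvent]) -> dict:
    """Start from the snapshot, then replay newer events in sequence order."""
    tail = sorted(
        (e for e in events if e.sequence_number > snapshot.last_sequence_number),
        key=lambda e: e.sequence_number,
    )
    state = dict(snapshot.state)
    for event in tail:
        state = apply_event(state, event)
    return state


now = datetime.now(timezone.utc)
events = [
    AgentStateEvent("TASK_ASSIGNED", "dev-1", now, {"task_id": "T-1"}, 1),
    AgentStateEvent("STATE_CHANGED", "dev-1", now, {"new_state": "executing"}, 2),
    AgentStateEvent("STATE_CHANGED", "dev-1", now, {"new_state": "waiting_review"}, 3),
]
# Snapshot taken after event 2, so only event 3 is replayed
snap = Snapshot("dev-1", {"current_task_id": "T-1", "status": "executing", "version": 2}, 2)
print(recover(snap, events))  # {'current_task_id': 'T-1', 'status': 'waiting_review', 'version': 3}
```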
**Temporal Durable Execution:**
```python
# Workflow state is automatically persisted by Temporal
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class AgentWorkflow:
    @workflow.run
    async def run(self, task: Task) -> Result:  # Task/Result: project types
        # State survives crashes, restarts, and deployments
        result = await workflow.execute_activity(
            execute_agent_task,  # an @activity.defn function defined elsewhere
            task,
            start_to_close_timeout=timedelta(hours=24),
        )
        return result
```

### 4. Agent-to-Agent Communication

**Recommended: Event-Driven with Redis Streams**

```
┌─────────────────────────────────────────────────────────────┐
│                        Redis Streams                        │
│  ┌───────────────────────────────────────────────────────┐  │
│  │ project:{project_id}:events                           │  │
│  │   - agent_message                                     │  │
│  │   - task_assignment                                   │  │
│  │   - state_change                                      │  │
│  │   - artifact_produced                                 │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
        ▲              ▲              ▲              ▲
        │              │              │              │
   ┌────┴───┐     ┌────┴───┐     ┌────┴───┐     ┌────┴───┐
   │   PO   │     │  Arch  │     │ Dev-1  │     │ Dev-2  │
   │ Agent  │     │ Agent  │     │ Agent  │     │ Agent  │
   └────────┘     └────────┘     └────────┘     └────────┘
```

**Protocol Design (A2A-inspired):**

```python
from __future__ import annotations  # AgentState is defined in the Agent Instance Model

from dataclasses import dataclass
from datetime import datetime
from typing import Literal
from uuid import UUID


@dataclass
class AgentMessage:
    """Agent-to-Agent message following A2A concepts."""
    id: UUID
    source_agent_id: str
    target_agent_id: str | None  # None = broadcast
    project_id: str
    message_type: Literal[
        "TASK_HANDOFF",
        "TASK_ASSIGNMENT",  # sent by the supervisor (see Agent Supervisor below)
        "CONTEXT_SHARE",
        "REVIEW_REQUEST",
        "FEEDBACK",
        "ARTIFACT",
        "QUERY",
    ]
    payload: dict
    correlation_id: UUID | None  # For request/response
    timestamp: datetime


@dataclass
class AgentCard:
    """Agent capability advertisement (A2A concept)."""
    agent_id: str
    agent_type: str
    capabilities: list[str]
    current_state: AgentState
    project_id: str
```
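
One practical detail these dataclasses leave implicit: a Redis stream entry is a flat map of string, bytes, or number values. redis-py rejects `None` and nested dicts, so a message cannot be passed to `XADD` as-is. The sketch below shows one encoding, assuming one stream field per top-level attribute and JSON for `payload`. The helper names `to_stream_fields` and `from_stream_fields` are hypothetical, and `message_type` is simplified to `str`.

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID, uuid4


@dataclass
class AgentMessage:
    id: UUID
    source_agent_id: str
    target_agent_id: str | None
    project_id: str
    message_type: str
    payload: dict
    correlation_id: UUID | None
    timestamp: datetime


def to_stream_fields(msg: AgentMessage) -> dict[str, str]:
    """Flatten a message into str -> str pairs suitable for XADD.

    UUIDs and datetimes are stringified, the nested payload is JSON-encoded,
    and None becomes an empty string.
    """
    return {
        "id": str(msg.id),
        "source_agent_id": msg.source_agent_id,
        "target_agent_id": msg.target_agent_id or "",
        "project_id": msg.project_id,
        "message_type": msg.message_type,
        "payload": json.dumps(msg.payload),
        "correlation_id": str(msg.correlation_id) if msg.correlation_id else "",
        "timestamp": msg.timestamp.isoformat(),
    }


def from_stream_fields(fields: dict[str, str]) -> AgentMessage:
    """Inverse of to_stream_fields (expects decoded str keys and values)."""
    return AgentMessage(
        id=UUID(fields["id"]),
        source_agent_id=fields["source_agent_id"],
        target_agent_id=fields["target_agent_id"] or None,
        project_id=fields["project_id"],
        message_type=fields["message_type"],
        payload=json.loads(fields["payload"]),
        correlation_id=UUID(fields["correlation_id"]) if fields["correlation_id"] else None,
        timestamp=datetime.fromisoformat(fields["timestamp"]),
    )


msg = AgentMessage(
    id=uuid4(),
    source_agent_id="po-1",
    target_agent_id=None,  # broadcast
    project_id="proj-1",
    message_type="CONTEXT_SHARE",
    payload={"summary": "API spec drafted", "files": ["api.yaml"]},
    correlation_id=None,
    timestamp=datetime.now(timezone.utc),
)
fields = to_stream_fields(msg)  # what would be passed to redis.xadd(stream_key, fields)
```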
### 5. Long-Running Agent Workflows

**Recommended: Temporal for Durable Execution**

Temporal provides:
- **Automatic state persistence**: workflows survive crashes and restarts
- **Built-in retries** with exponential backoff
- **Long-running support**: hours, days, even months
- **Human-in-the-loop**: pause and wait for approval
- **Visibility**: full execution history

```python
# workflows/sprint_workflow.py
from datetime import timedelta

from temporalio import workflow

# SprintState, SprintConfig, SprintResult, AutonomyLevel, AgentHandle, Artifact and the
# plan_sprint / finalize_sprint activities are project code defined elsewhere.


@workflow.defn
class SprintWorkflow:
    """Durable workflow for autonomous sprint execution."""

    def __init__(self) -> None:
        self._state = SprintState.PLANNING
        self._agents: dict[str, AgentHandle] = {}
        self._artifacts: list[Artifact] = []
        self._human_approved = False  # must exist before wait_condition reads it

    @workflow.run
    async def run(self, sprint: SprintConfig) -> SprintResult:
        # Phase 1: Planning (may take hours)
        backlog = await workflow.execute_activity(
            plan_sprint,
            sprint,
            start_to_close_timeout=timedelta(hours=4),
        )

        # Phase 2: Parallel development (may take days); helper not shown
        dev_tasks = await self._execute_development(backlog)

        # Phase 3: Review checkpoint (human approval arrives via the signal below)
        if sprint.autonomy_level != AutonomyLevel.AUTONOMOUS:
            await workflow.wait_condition(lambda: self._human_approved)

        # Phase 4: QA and finalization
        result = await workflow.execute_activity(
            finalize_sprint,
            dev_tasks,
            start_to_close_timeout=timedelta(hours=2),
        )

        return result

    @workflow.signal
    async def approve_checkpoint(self, approved: bool) -> None:
        self._human_approved = approved

    @workflow.query
    def get_state(self) -> SprintState:
        return self._state
```
### 6. Orchestration Topology Comparison

#### Supervisor Pattern

```
                    ┌─────────────────┐
                    │   Supervisor    │
                    │ (Orchestrator)  │
                    └────────┬────────┘
                             │
        ┌────────────────────┼────────────────────┐
        │                    │                    │
        ▼                    ▼                    ▼
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ Specialist 1 │     │ Specialist 2 │     │ Specialist 3 │
│ (Dev Agent)  │     │  (QA Agent)  │     │   (DevOps)   │
└──────────────┘     └──────────────┘     └──────────────┘
```

**Pros:** Clear control, auditability, easier debugging
**Cons:** Bottleneck at supervisor, single point of failure
**Syndarix Use:** Project Manager as supervisor for sprints

#### Hierarchical Pattern

```
                               ┌────────────────┐
                               │  Project Lead  │
                               │      (PO)      │
                               └───────┬────────┘
                                       │
              ┌────────────────────────┼────────────────────────┐
              │                        │                        │
              ▼                        ▼                        ▼
        ┌────────────┐           ┌────────────┐           ┌────────────┐
        │  Dev Lead  │           │  QA Lead   │           │  Ops Lead  │
        │ (Architect)│           │            │           │            │
        └─────┬──────┘           └─────┬──────┘           └────────────┘
              │                        │
    ┌─────────┼─────────┐         ┌────┴────┐
    │         │         │         │         │
    ▼         ▼         ▼         ▼         ▼
┌──────┐  ┌──────┐  ┌──────┐  ┌──────┐  ┌──────┐
│ Dave │  │Ellis │  │ Kate │  │ QA-1 │  │ QA-2 │
└──────┘  └──────┘  └──────┘  └──────┘  └──────┘
```

**Pros:** Scalable, localized decision-making, mirrors real teams
**Cons:** Communication overhead, coordination complexity
**Syndarix Use:** Recommended primary pattern

#### Peer-to-Peer Pattern

```
  ┌──────┐     ┌──────┐
  │ Dave │◄───►│Ellis │
  └──┬───┘     └──┬───┘
     │            │
     │  ┌──────┐  │
     └─►│ Kate │◄─┘
        └──┬───┘
           │
           ▼
  [Shared Blackboard]
```

**Pros:** Flexible, emergent collaboration, no bottleneck
**Cons:** Harder to control, potential infinite loops
**Syndarix Use:** Within teams for brainstorming/spikes

---

## Proposed Architecture for Syndarix

### High-Level Design

```
┌───────────────────────────────────────────────────────────────────────┐
│                     Syndarix Orchestration Layer                      │
├───────────────────────────────────────────────────────────────────────┤
│                                                                       │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │                     Temporal Workflow Engine                      │ │
│ │   ┌───────────────┐     ┌───────────────┐     ┌───────────────┐   │ │
│ │   │    Project    │     │    Sprint     │     │  Agent Task   │   │ │
│ │   │   Workflow    │     │   Workflow    │     │   Workflow    │   │ │
│ │   └───────────────┘     └───────────────┘     └───────────────┘   │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│                                   │                                   │
│                                   ▼                                   │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │                     Agent Runtime (LangGraph)                     │ │
│ │                                                                   │ │
│ │  ┌─────────────────────────────────────────────────────────────┐  │ │
│ │  │                      Agent State Graph                      │  │ │
│ │  │     [IDLE] ──► [THINKING] ──► [EXECUTING] ──► [BLOCKED]     │  │ │
│ │  │        ▲           │               │              │         │  │ │
│ │  │        └───────────┴───────────────┴──────────────┘         │  │ │
│ │  └─────────────────────────────────────────────────────────────┘  │ │
│ │                                                                   │ │
│ │  ┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐  │ │
│ │  │  Agent   │     │  Agent   │     │  Agent   │     │  Agent   │  │ │
│ │  │ Instance │     │ Instance │     │ Instance │     │ Instance │  │ │
│ │  │   (PO)   │     │  (Arch)  │     │ (Dev-1)  │     │ (Dev-2)  │  │ │
│ │  └──────────┘     └──────────┘     └──────────┘     └──────────┘  │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│                                   │                                   │
│                                   ▼                                   │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │                        Communication Layer                        │ │
│ │                                                                   │ │
│ │  ┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐  │ │
│ │  │  Redis Streams  │   │  WebSocket/SSE  │   │    Event Bus    │  │ │
│ │  │  (Agent-Agent)  │   │   (Agent-UI)    │   │    (System)     │  │ │
│ │  └─────────────────┘   └─────────────────┘   └─────────────────┘  │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│                                   │                                   │
│                                   ▼                                   │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │                       Infrastructure Layer                        │ │
│ │                                                                   │ │
│ │    ┌──────────────┐     ┌──────────────┐     ┌──────────────┐     │ │
│ │    │   LiteLLM    │     │     MCP      │     │  PostgreSQL  │     │ │
│ │    │  (Gateway)   │     │  (Servers)   │     │  (pgvector)  │     │ │
│ │    └──────────────┘     └──────────────┘     └──────────────┘     │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘
```

### Agent Instance Model

```python
# app/models/agent_instance.py
import enum
import uuid
from datetime import datetime

from sqlalchemy import JSON, Column, DateTime, Enum, Float, ForeignKey, Integer, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship

from app.db.base import Base


class AgentState(str, enum.Enum):
    IDLE = "idle"
    THINKING = "thinking"
    EXECUTING = "executing"
    WAITING_INPUT = "waiting_input"
    WAITING_REVIEW = "waiting_review"
    BLOCKED = "blocked"
    COMPLETED = "completed"
    ERROR = "error"


class AgentInstance(Base):
    """Individual agent instance spawned from a type."""

    __tablename__ = "agent_instances"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(50))  # "Dave", "Ellis", "Kate"

    # Type relationship
    agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"))

    # Project assignment
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"))

    # Current state
    state = Column(Enum(AgentState), default=AgentState.IDLE)
    current_task_id = Column(UUID(as_uuid=True), ForeignKey("tasks.id"), nullable=True)

    # Context/Memory
    working_memory = Column(JSON, default=dict)  # Short-term context
    system_prompt_override = Column(String, nullable=True)

    # Token/Cost tracking (per instance)
    total_tokens_used = Column(Integer, default=0)
    total_cost_usd = Column(Float, default=0.0)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    last_active_at = Column(DateTime, nullable=True)

    # Relationships
    agent_type = relationship("AgentType", back_populates="instances")
    project = relationship("Project", back_populates="agents")
    conversations = relationship("Conversation", back_populates="agent")
```
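
The `state` column does not prevent invalid jumps, such as `IDLE` straight to `EXECUTING`. One option is to guard every state write with an explicit transition table. The table below is an assumption loosely derived from the Agent State Graph in the High-Level Design, where every state can fall back to `IDLE`. `ALLOWED_TRANSITIONS`, `InvalidTransition`, and `transition()` are hypothetical names, not existing Syndarix code.

```python
import enum


class AgentState(str, enum.Enum):
    IDLE = "idle"
    THINKING = "thinking"
    EXECUTING = "executing"
    WAITING_INPUT = "waiting_input"
    WAITING_REVIEW = "waiting_review"
    BLOCKED = "blocked"
    COMPLETED = "completed"
    ERROR = "error"


S = AgentState
# Hypothetical transition table; adjust once the state diagram is final.
ALLOWED_TRANSITIONS: dict[AgentState, frozenset[AgentState]] = {
    S.IDLE: frozenset({S.THINKING}),
    S.THINKING: frozenset({S.EXECUTING, S.WAITING_INPUT, S.WAITING_REVIEW,
                           S.COMPLETED, S.ERROR, S.IDLE}),
    S.EXECUTING: frozenset({S.THINKING, S.BLOCKED, S.WAITING_REVIEW, S.ERROR, S.IDLE}),
    S.WAITING_INPUT: frozenset({S.THINKING, S.IDLE}),
    S.WAITING_REVIEW: frozenset({S.THINKING, S.COMPLETED, S.IDLE}),
    S.BLOCKED: frozenset({S.THINKING, S.ERROR, S.IDLE}),
    S.COMPLETED: frozenset({S.IDLE}),
    S.ERROR: frozenset({S.IDLE}),
}


class InvalidTransition(ValueError):
    """Raised when a state change is not in ALLOWED_TRANSITIONS."""


def transition(current: AgentState, target: AgentState) -> AgentState:
    """Return target if current -> target is allowed, else raise InvalidTransition."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise InvalidTransition(f"{current.value} -> {target.value} is not allowed")
    return target
```

The service layer could call `transition(agent.state, new_state)` before assigning `agent.state`, so illegal moves fail loudly instead of silently corrupting the stored state.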
### Agent State Machine (LangGraph)

```python
# app/agents/state_machine.py
from operator import add
from typing import Annotated, TypedDict

from langgraph.graph import END, StateGraph

from app.models.agent_instance import AgentState

# AgentType, get_agent_instance, llm_gateway, mcp_client, build_messages,
# parse_agent_action, create_artifact, review_node, handoff_node, error_node,
# route_after_think and route_after_execute are project code defined elsewhere.


class AgentGraphState(TypedDict):
    """State shared across agent graph nodes."""
    agent_id: str
    project_id: str
    task: dict | None
    messages: Annotated[list, add]  # Append-only: node updates are concatenated
    artifacts: Annotated[list, add]
    current_state: AgentState
    current_action: dict | None  # Set by think_node, consumed by execute_node
    iteration_count: int
    error: str | None


def create_agent_graph(agent_type: AgentType, checkpointer):
    """Build and compile a LangGraph state machine for an agent type.

    The checkpointer is injected because PostgresSaver.from_conn_string() and
    AsyncPostgresSaver.from_conn_string() are context managers, e.g.:

        from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

        async with AsyncPostgresSaver.from_conn_string(settings.DATABASE_URL) as cp:
            await cp.setup()  # creates checkpoint tables on first run
            graph = create_agent_graph(agent_type, cp)
    """
    graph = StateGraph(AgentGraphState)

    # Define nodes
    graph.add_node("think", think_node)
    graph.add_node("execute", execute_node)
    graph.add_node("review", review_node)
    graph.add_node("handoff", handoff_node)
    graph.add_node("error_handler", error_node)

    # Define edges
    graph.set_entry_point("think")

    graph.add_conditional_edges(
        "think",
        route_after_think,
        {
            "execute": "execute",
            "handoff": "handoff",
            "review": "review",
            "end": END,
        },
    )

    graph.add_conditional_edges(
        "execute",
        route_after_execute,
        {
            "think": "think",  # Continue reasoning
            "review": "review",
            "error": "error_handler",
        },
    )

    graph.add_edge("review", "think")
    graph.add_edge("error_handler", END)
    graph.add_edge("handoff", END)

    return graph.compile(checkpointer=checkpointer)


async def think_node(state: AgentGraphState) -> dict:
    """Agent reasoning/planning node."""
    agent = await get_agent_instance(state["agent_id"])

    # Get LLM response
    response = await llm_gateway.complete(
        agent_id=state["agent_id"],
        project_id=state["project_id"],
        messages=build_messages(agent, state),
        model_preference=agent.agent_type.model_preference,
    )

    # Parse response for next action
    action = parse_agent_action(response["content"])

    # Return only the changed keys. Spreading **state here would re-send the full
    # "messages"/"artifacts" lists, and the `add` reducer would duplicate them.
    return {
        "messages": [{"role": "assistant", "content": response["content"]}],
        "current_action": action,
        "iteration_count": state["iteration_count"] + 1,
    }


async def execute_node(state: AgentGraphState) -> dict:
    """Execute agent action (tool calls, artifact creation)."""
    action = state.get("current_action")
    if action is None:
        return {"error": "execute_node reached without an action"}

    if action["type"] == "tool_call":
        result = await mcp_client.call_tool(
            server=action["server"],
            tool_name=action["tool"],
            arguments={
                "project_id": state["project_id"],
                "agent_id": state["agent_id"],
                **action["arguments"],
            },
        )
        return {"messages": [{"role": "tool", "content": str(result)}]}

    if action["type"] == "produce_artifact":
        artifact = await create_artifact(action["artifact"])
        return {"artifacts": [artifact]}

    return {}  # Unknown action type: no state change
```
### Inter-Agent Communication

```python
# app/agents/communication.py
import json
from collections.abc import AsyncGenerator
from dataclasses import asdict, dataclass
from datetime import datetime
from uuid import uuid4

from redis import asyncio as aioredis


@dataclass
class AgentMessage:
    id: str
    source_agent_id: str
    target_agent_id: str | None
    project_id: str
    message_type: str
    payload: dict
    timestamp: str
    correlation_id: str | None = None


class AgentMessageBus:
    """Redis Streams-based message bus for agent communication.

    Expects a client created with decode_responses=True, so fields come back as str.
    """

    def __init__(self, redis: aioredis.Redis):
        self.redis = redis

    @staticmethod
    def _encode(message: AgentMessage) -> dict[str, str]:
        # Stream values must be str/bytes/numbers (nested dicts and None are rejected),
        # so the whole message is stored as a single JSON field.
        return {"data": json.dumps(asdict(message))}

    async def publish(self, message: AgentMessage) -> str:
        """Publish message to the project stream (and the target's inbox, if any)."""
        stream_key = f"project:{message.project_id}:agent_events"
        fields = self._encode(message)

        message_id = await self.redis.xadd(
            stream_key,
            fields,
            maxlen=10000,  # Keep (approximately) the last 10k messages
        )

        # Also publish to target-specific stream if targeted
        if message.target_agent_id:
            target_stream = f"agent:{message.target_agent_id}:inbox"
            await self.redis.xadd(target_stream, fields, maxlen=10000)

        return message_id

    async def subscribe(
        self,
        agent_id: str,
        project_id: str,
    ) -> AsyncGenerator[AgentMessage, None]:
        """Yield new messages for an agent (its inbox plus project-wide events)."""
        inbox = f"agent:{agent_id}:inbox"
        # XREAD does not accept ">" (that ID is XREADGROUP-only). "$" means "entries
        # added after this call"; afterwards we resume from the last ID seen per stream.
        last_ids = {inbox: "$", f"project:{project_id}:agent_events": "$"}

        while True:
            response = await self.redis.xread(last_ids, count=10, block=5000)  # 5 s timeout

            for stream_name, stream_messages in response or []:
                for msg_id, fields in stream_messages:
                    last_ids[stream_name] = msg_id
                    message = AgentMessage(**json.loads(fields["data"]))
                    # Targeted messages are also written to the inbox; skip the duplicate
                    if stream_name != inbox and message.target_agent_id == agent_id:
                        continue
                    yield message

    async def handoff_task(
        self,
        source_agent_id: str,
        target_agent_id: str,
        project_id: str,
        task: dict,
        context: dict,
    ) -> None:
        """Hand off a task from one agent to another."""
        message = AgentMessage(
            id=str(uuid4()),
            source_agent_id=source_agent_id,
            target_agent_id=target_agent_id,
            project_id=project_id,
            message_type="TASK_HANDOFF",
            payload={
                "task": task,
                "context": context,
                "handoff_reason": "task_completion",
            },
            timestamp=datetime.utcnow().isoformat(),
        )
        await self.publish(message)
```
### Agent Supervisor (Hierarchical Control)

```python
# app/agents/supervisor.py
from datetime import datetime
from typing import TypedDict
from uuid import uuid4

from langgraph.graph import END, StateGraph

from app.agents.communication import AgentMessage
from app.models.agent_instance import AgentState

# get_agent_instance, select_best_agent, message_bus, decide_supervisor_action and the
# remaining node functions are project code defined elsewhere.


class SupervisorState(TypedDict):
    """State for supervisor agent managing a team."""
    project_id: str
    team_agents: list[str]
    team_status: dict[str, dict]  # agent_id -> status snapshot (set by assess_state)
    pending_tasks: list[dict]
    in_progress: dict[str, dict]  # agent_id -> task
    completed_tasks: list[dict]
    current_phase: str


def create_supervisor_graph(supervisor_type: str):
    """Create and compile the supervisor graph for team coordination."""
    graph = StateGraph(SupervisorState)

    graph.add_node("assess_state", assess_team_state)
    graph.add_node("assign_tasks", assign_tasks_to_agents)
    graph.add_node("monitor_progress", monitor_agent_progress)
    graph.add_node("handle_completion", handle_task_completion)
    graph.add_node("coordinate_handoff", coordinate_agent_handoff)
    graph.add_node("escalate", escalate_to_human)

    graph.set_entry_point("assess_state")

    graph.add_conditional_edges(
        "assess_state",
        decide_supervisor_action,
        {
            "assign": "assign_tasks",
            "monitor": "monitor_progress",
            "coordinate": "coordinate_handoff",
            "escalate": "escalate",
            "complete": END,
        },
    )

    # ... additional edges

    return graph.compile()


async def assess_team_state(state: SupervisorState) -> dict:
    """Assess current state of all team agents."""
    team_status = {}

    for agent_id in state["team_agents"]:
        agent = await get_agent_instance(agent_id)
        team_status[agent_id] = {
            "state": agent.state,
            "current_task": agent.current_task_id,
            "last_active": agent.last_active_at,
        }

    return {"team_status": team_status}


async def assign_tasks_to_agents(state: SupervisorState) -> dict:
    """Assign pending tasks to available agents."""
    available_agents = [
        agent_id
        for agent_id, status in state["team_status"].items()
        if status["state"] == AgentState.IDLE
    ]

    assignments = []
    for task in state["pending_tasks"]:
        if not available_agents:
            break

        # Find best agent for task
        best_agent = await select_best_agent(task, available_agents)

        # Assign task
        await message_bus.publish(AgentMessage(
            id=str(uuid4()),
            source_agent_id="supervisor",
            target_agent_id=best_agent,
            project_id=state["project_id"],
            message_type="TASK_ASSIGNMENT",
            payload={"task": task},
            timestamp=datetime.utcnow().isoformat(),
        ))

        assignments.append((best_agent, task))
        available_agents.remove(best_agent)

    return {
        "in_progress": {
            **state["in_progress"],
            **{agent: task for agent, task in assignments},
        },
        # Tasks are assigned in list order, so the unassigned ones are the tail.
        # (Filtering by dict equality would also drop identical duplicate tasks.)
        "pending_tasks": state["pending_tasks"][len(assignments):],
    }
```
### Temporal Workflow Integration

```python
# app/workflows/project_workflow.py
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

# ProjectPhase, ProjectConfig, ProjectResult, ProjectStatus, HumanFeedback, AutonomyLevel,
# SprintResult, SprintWorkflow and the activities are defined elsewhere.


@workflow.defn
class ProjectWorkflow:
    """Long-running workflow for project lifecycle."""

    def __init__(self) -> None:
        self._phase = ProjectPhase.DISCOVERY
        self._sprints: list[SprintResult] = []
        self._human_feedback_pending = False
        self._last_feedback: HumanFeedback | None = None

    @workflow.run
    async def run(self, project: ProjectConfig) -> ProjectResult:
        # Phase 1: Discovery (PO + BA brainstorm)
        requirements = await workflow.execute_activity(
            run_discovery_phase,
            project,
            start_to_close_timeout=timedelta(hours=8),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )

        # Phase 2: Architecture spike
        architecture = await workflow.execute_activity(
            run_architecture_spike,
            requirements,
            start_to_close_timeout=timedelta(hours=4),
        )

        # Checkpoint: Human review of architecture
        if project.autonomy_level != AutonomyLevel.AUTONOMOUS:
            await self._wait_for_feedback()

        # Phase 3: Sprint execution (iterative); _is_complete/_next_sprint_config not shown
        while not self._is_complete():
            sprint_result = await workflow.execute_child_workflow(
                SprintWorkflow.run,
                self._next_sprint_config(),
                id=f"{project.id}-sprint-{len(self._sprints) + 1}",
            )
            self._sprints.append(sprint_result)

            # Milestone checkpoint if configured
            if project.autonomy_level == AutonomyLevel.MILESTONE:
                await self._wait_for_feedback()

        return ProjectResult(
            project_id=project.id,
            sprints=self._sprints,
            status="completed",
        )

    async def _wait_for_feedback(self) -> None:
        """Durably block until provide_feedback is signalled."""
        self._human_feedback_pending = True
        await workflow.wait_condition(lambda: not self._human_feedback_pending)

    @workflow.signal
    async def provide_feedback(self, feedback: HumanFeedback) -> None:
        """Handle human feedback at checkpoints."""
        self._last_feedback = feedback
        self._human_feedback_pending = False

    @workflow.query
    def get_status(self) -> ProjectStatus:
        return ProjectStatus(
            phase=self._phase,
            sprints_completed=len(self._sprints),
            awaiting_feedback=self._human_feedback_pending,
        )
```

### Token Usage Tracking Per Agent

```python
# app/services/agent_metrics.py
from datetime import datetime, timedelta

from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.agent_instance import AgentInstance
from app.models.agent_type import AgentType  # module path assumed
from app.models.token_usage import TokenUsage


class AgentMetricsService:
    """Track token usage and costs per agent instance."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def record_completion(
        self,
        agent_instance_id: str,
        project_id: str,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
        latency_ms: int,
    ) -> TokenUsage:
        """Record a completion for an agent instance."""
        cost = self._calculate_cost(model, prompt_tokens, completion_tokens)  # not shown
        now = datetime.utcnow()

        usage = TokenUsage(
            agent_instance_id=agent_instance_id,
            project_id=project_id,
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            cost_usd=cost,
            latency_ms=latency_ms,
            timestamp=now,
        )
        self.db.add(usage)

        # Update agent totals in SQL (col = col + n) so concurrent completions for
        # the same agent cannot overwrite each other's increments.
        await self.db.execute(
            update(AgentInstance)
            .where(AgentInstance.id == agent_instance_id)
            .values(
                total_tokens_used=AgentInstance.total_tokens_used + usage.total_tokens,
                total_cost_usd=AgentInstance.total_cost_usd + cost,
                last_active_at=now,
            )
        )

        await self.db.commit()
        return usage

    async def get_agent_usage_summary(
        self,
        agent_instance_id: str,
        period_days: int = 30,
    ) -> dict:
        """Get usage summary for an agent instance."""
        cutoff = datetime.utcnow() - timedelta(days=period_days)

        result = await self.db.execute(
            select(
                func.sum(TokenUsage.total_tokens).label("total_tokens"),
                func.sum(TokenUsage.cost_usd).label("total_cost"),
                func.count(TokenUsage.id).label("completion_count"),
                func.avg(TokenUsage.latency_ms).label("avg_latency"),
            )
            .where(TokenUsage.agent_instance_id == agent_instance_id)
            .where(TokenUsage.timestamp >= cutoff)
        )

        # Row objects are not dicts in SQLAlchemy 2.x; .mappings() yields dict-like rows
        return dict(result.mappings().one())

    async def get_project_agent_breakdown(
        self,
        project_id: str,
    ) -> list[dict]:
        """Get token usage breakdown by agent for a project."""
        result = await self.db.execute(
            select(
                AgentInstance.id,
                AgentInstance.name,
                AgentType.role,
                func.sum(TokenUsage.total_tokens).label("tokens"),
                func.sum(TokenUsage.cost_usd).label("cost"),
            )
            .join(TokenUsage, TokenUsage.agent_instance_id == AgentInstance.id)
            .join(AgentType, AgentType.id == AgentInstance.agent_type_id)
            .where(AgentInstance.project_id == project_id)
            .group_by(AgentInstance.id, AgentInstance.name, AgentType.role)
            .order_by(func.sum(TokenUsage.cost_usd).desc())
        )

        return [dict(row) for row in result.mappings()]
```
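
`_calculate_cost` is not shown. The arithmetic is simple: tokens times a per-token price, summed separately for input and output. The sketch below keeps prices per million tokens and uses `Decimal` to avoid float drift when many tiny costs are aggregated. The model names and prices are placeholders, not real rates. In Syndarix they would come from configuration or from LiteLLM's pricing data; LiteLLM also ships its own helper, `litellm.completion_cost`. Raising on unknown models, rather than silently charging zero, is a deliberate assumption.

```python
from decimal import ROUND_HALF_UP, Decimal

# Placeholder USD prices per 1M tokens: (input, output). Not real provider rates.
PRICES_PER_MTOK: dict[str, tuple[Decimal, Decimal]] = {
    "model-large": (Decimal("3.00"), Decimal("15.00")),
    "model-small": (Decimal("0.25"), Decimal("1.25")),
}
MTOK = Decimal(1_000_000)


def calculate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> Decimal:
    """Cost in USD, rounded to 6 decimal places (micro-dollars)."""
    try:
        input_price, output_price = PRICES_PER_MTOK[model]
    except KeyError:
        raise ValueError(f"No pricing configured for model {model!r}") from None
    cost = (prompt_tokens * input_price + completion_tokens * output_price) / MTOK
    return cost.quantize(Decimal("0.000001"), rounding=ROUND_HALF_UP)


# 12,000 * $3 + 800 * $15 = $48,000 per million tokens -> $0.048
print(calculate_cost("model-large", 12_000, 800))  # 0.048000
```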
### Real-Time Agent Activity Visibility

```python
# app/services/realtime_updates.py
import enum
import json

from fastapi import WebSocket, WebSocketDisconnect
from redis import asyncio as aioredis

# AgentActivity is a project dataclass (agent_id, agent_name, ..., metadata), not shown.


class AgentActivityStream:
    """Real-time agent activity updates via WebSocket/SSE."""

    def __init__(self, redis: aioredis.Redis):
        self.redis = redis

    async def subscribe_project(
        self,
        websocket: WebSocket,
        project_id: str,
    ) -> None:
        """Stream agent activities for a project."""
        await websocket.accept()

        channel = f"project:{project_id}:activities"
        # One PubSub per connection: a shared PubSub would mix channels across clients,
        # and listen() must not be consumed by several coroutines at once.
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(channel)

        try:
            async for message in pubsub.listen():
                if message["type"] == "message":
                    await websocket.send_json(json.loads(message["data"]))
        except WebSocketDisconnect:
            pass  # client went away
        finally:
            await pubsub.unsubscribe(channel)
            await pubsub.aclose()  # redis-py >= 5.0.1; older versions use close()

    async def publish_activity(
        self,
        project_id: str,
        activity: AgentActivity,
    ) -> None:
        """Publish an agent activity event."""
        channel = f"project:{project_id}:activities"
        await self.redis.publish(channel, json.dumps({
            "type": "agent_activity",
            "agent_id": activity.agent_id,
            "agent_name": activity.agent_name,
            "agent_role": activity.agent_role,
            "activity_type": activity.activity_type,
            "description": activity.description,
            "state": activity.state,
            "timestamp": activity.timestamp.isoformat(),
            "metadata": activity.metadata,
        }))


# Activity types for real-time updates
class AgentActivityType(str, enum.Enum):
    STATE_CHANGE = "state_change"
    THINKING = "thinking"
    TOOL_CALL = "tool_call"
    MESSAGE_SENT = "message_sent"
    ARTIFACT_CREATED = "artifact_created"
    ERROR = "error"
    TASK_STARTED = "task_started"
    TASK_COMPLETED = "task_completed"
```
---
|
|
|
|
## Code Examples for Key Patterns
|
|
|
|
### Pattern 1: Spawning Multiple Agent Instances
|
|
|
|
```python
# app/services/agent_factory.py
from uuid import uuid4

from sqlalchemy.ext.asyncio import AsyncSession


class AgentFactory:
    """Factory for creating agent instances from types."""

    DEVELOPER_NAMES = ["Dave", "Ellis", "Kate", "Marcus", "Nina"]
    QA_NAMES = ["Quinn", "Raja", "Sierra", "Tyler", "Uma"]

    def __init__(self, db: AsyncSession):
        self.db = db

    async def spawn_agent_team(
        self,
        project_id: str,
        team_config: TeamConfig,
    ) -> list[AgentInstance]:
        """Spawn a team of agents for a project."""
        agents = []
        name_counters: dict[str, int] = {}

        for agent_spec in team_config.agents:
            agent_type = await self.db.get(AgentType, agent_spec.type_id)
            if agent_type is None:
                raise ValueError(f"Unknown agent type: {agent_spec.type_id}")

            # Get next available name for this type
            name = self._get_next_name(agent_type.role, name_counters)

            agent = AgentInstance(
                id=uuid4(),
                name=name,
                agent_type_id=agent_type.id,
                project_id=project_id,
                state=AgentState.IDLE,
                working_memory={
                    "project_context": {},
                    "conversation_history": [],
                    "current_focus": None,
                },
            )

            self.db.add(agent)
            agents.append(agent)

        await self.db.commit()

        # Initialize agent graphs
        for agent in agents:
            await self._initialize_agent_graph(agent)

        return agents

    def _get_next_name(self, role: str, counters: dict[str, int]) -> str:
        """Get next name for an agent role."""
        names = {
            "Software Engineer": self.DEVELOPER_NAMES,
            "QA Engineer": self.QA_NAMES,
            # ... other roles
        }.get(role, ["Agent"])

        idx = counters.get(role, 0)
        counters[role] = idx + 1

        if idx < len(names):
            return names[idx]
        # Name pool exhausted: reuse the first name with a numeric suffix
        return f"{names[0]}-{idx + 1}"
```
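
The naming rule is easiest to see in isolation. The sketch below lifts `_get_next_name` into a free function with the same logic; `ROLE_NAMES` and `get_next_name` are illustrative names, not part of the factory.

```python
ROLE_NAMES = {
    "Software Engineer": ["Dave", "Ellis", "Kate", "Marcus", "Nina"],
    "QA Engineer": ["Quinn", "Raja", "Sierra", "Tyler", "Uma"],
}


def get_next_name(role: str, counters: dict[str, int]) -> str:
    """Same rule as AgentFactory._get_next_name, as a free function."""
    names = ROLE_NAMES.get(role, ["Agent"])
    idx = counters.get(role, 0)
    counters[role] = idx + 1
    if idx < len(names):
        return names[idx]
    # Pool exhausted: reuse the first name with a numeric suffix
    return f"{names[0]}-{idx + 1}"


counters: dict[str, int] = {}
devs = [get_next_name("Software Engineer", counters) for _ in range(6)]
qa = get_next_name("QA Engineer", counters)
other = [get_next_name("DevOps Engineer", counters) for _ in range(2)]
print(devs)   # ['Dave', 'Ellis', 'Kate', 'Marcus', 'Nina', 'Dave-6']
print(qa)     # Quinn
print(other)  # ['Agent', 'Agent-2']
```

Because `name_counters` is created fresh inside each `spawn_agent_team` call, names are unique within one spawned team but can repeat across separate calls for the same project.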

### Pattern 2: LLM Failover Per Agent Type

```python
# app/agents/llm_config.py
from litellm import Router


class AgentLLMConfig:
    """Per-agent-type LLM configuration with failover."""

    # High-stakes agents get premium models with reliable fallbacks
    HIGH_REASONING_CHAIN = [
        {"model": "claude-sonnet-4-20250514", "provider": "anthropic"},
        {"model": "gpt-4-turbo", "provider": "openai"},
        {"model": "claude-3-5-sonnet-20241022", "provider": "anthropic"},
    ]

    # Fast agents get quick models
    FAST_RESPONSE_CHAIN = [
        {"model": "claude-3-haiku-20240307", "provider": "anthropic"},
        {"model": "gpt-4o-mini", "provider": "openai"},
    ]

    # Cost-optimized for high-volume tasks
    COST_OPTIMIZED_CHAIN = [
        {"model": "ollama/llama3", "provider": "ollama"},
        {"model": "gpt-4o-mini", "provider": "openai"},
    ]

    AGENT_TYPE_MAPPING = {
        "Product Owner": HIGH_REASONING_CHAIN,
        "Software Architect": HIGH_REASONING_CHAIN,
        "Software Engineer": HIGH_REASONING_CHAIN,
        "Business Analyst": HIGH_REASONING_CHAIN,
        "QA Engineer": FAST_RESPONSE_CHAIN,
        "Project Manager": FAST_RESPONSE_CHAIN,
        "DevOps Engineer": FAST_RESPONSE_CHAIN,
    }

    def build_router_for_agent(self, agent_type: str) -> Router:
        """Build a LiteLLM router that fails over down the agent type's chain.

        Callers request the primary group, e.g. model="agent-software-engineer".
        """
        chain = self.AGENT_TYPE_MAPPING.get(agent_type, self.FAST_RESPONSE_CHAIN)
        primary = f"agent-{agent_type.lower().replace(' ', '-')}"

        # One model group per chain entry. Registering every model under a
        # single name would make the Router load-balance across them rather
        # than try them in priority order.
        group_names = [primary] + [
            f"{primary}-fallback-{i}" for i in range(1, len(chain))
        ]
        model_list = [
            {
                "model_name": group,
                "litellm_params": {
                    "model": model_config["model"],
                    "api_key": self._get_api_key(model_config["provider"]),
                },
            }
            for group, model_config in zip(group_names, chain)
        ]

        return Router(
            model_list=model_list,
            num_retries=3,
            timeout=120,
            # Retry the primary group, then try each fallback group in order
            fallbacks=[{primary: group_names[1:]}],
        )
```
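
The Router performs retries and fallbacks internally. To pin down the ordering the chains are meant to express, here is a framework-free sketch: retry the current model a fixed number of times, then move to the next model in the chain, and fail only once every model has failed. `complete_with_failover` and `AllProvidersFailed` are hypothetical names, not LiteLLM APIs, and `call` stands in for a real completion request.

```python
from typing import Callable


class AllProvidersFailed(Exception):
    """Raised when every model in the chain has failed."""


def complete_with_failover(
    chain: list[str],
    call: Callable[[str], str],
    retries_per_model: int = 1,
) -> tuple[str, str]:
    """Try each model in priority order; return (model, response)."""
    errors: list[tuple[str, Exception]] = []
    for model in chain:
        # First attempt plus `retries_per_model` retries on the same model
        for _ in range(retries_per_model + 1):
            try:
                return model, call(model)
            except Exception as exc:  # a real client would catch narrower errors
                errors.append((model, exc))
    raise AllProvidersFailed(errors)


attempts: list[str] = []


def fake_call(model: str) -> str:
    attempts.append(model)
    if model == "claude-sonnet-4-20250514":
        raise TimeoutError("primary timed out")
    return f"ok from {model}"


chain = ["claude-sonnet-4-20250514", "gpt-4-turbo", "claude-3-5-sonnet-20241022"]
model, text = complete_with_failover(chain, fake_call)
print(model, attempts)
# gpt-4-turbo ['claude-sonnet-4-20250514', 'claude-sonnet-4-20250514', 'gpt-4-turbo']
```

Keeping the per-model retry count small matters: with three retries on a provider that is down, an agent waits through every retry's timeout before it ever reaches the fallback.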

### Pattern 3: Agent Context Isolation

```python
# app/agents/context_manager.py
import json
from typing import Any

from redis import asyncio as aioredis
from sqlalchemy.ext.asyncio import AsyncSession


class AgentContextManager:
    """Manage individual agent context/memory."""

    def __init__(self, redis: aioredis.Redis, db: AsyncSession):
        self.redis = redis
        self.db = db

    async def get_agent_context(
        self,
        agent_id: str,
        max_messages: int = 50,
    ) -> AgentContext:
        """Get full context for an agent."""
        agent = await self.db.get(AgentInstance, agent_id)

        # Get recent conversation history
        conversations = await self._get_recent_conversations(
            agent_id, max_messages
        )

        # Get project context
        project_context = await self._get_project_context(agent.project_id)

        # Get working memory from Redis (fast access). update_working_memory()
        # stores JSON strings, so decode each field on the way back out.
        raw_memory = await self.redis.hgetall(f"agent:{agent_id}:memory")
        working_memory = {
            (k.decode() if isinstance(k, bytes) else k): json.loads(v)
            for k, v in raw_memory.items()
        }

        return AgentContext(
            agent=agent,
            system_prompt=self._build_system_prompt(agent),
            conversation_history=conversations,
            project_context=project_context,
            working_memory=working_memory,
            tools=await self._get_available_tools(agent),
        )

    def _build_system_prompt(self, agent: AgentInstance) -> str:
        """Build personalized system prompt for agent.

        Assumes agent.agent_type and agent.project are already loaded:
        implicit lazy loading does not work under AsyncSession.
        """
        base_prompt = agent.agent_type.system_prompt
        current_focus = agent.working_memory.get("current_focus") or []

        return f"""
{base_prompt}

You are {agent.name}, a {agent.agent_type.role} working on project {agent.project.name}.

Your personality traits:
{agent.agent_type.personality_traits}

Current focus areas:
{json.dumps(current_focus)}

Remember: You have your own perspective and expertise. Collaborate with other agents
but bring your unique viewpoint to discussions.
"""

    async def update_working_memory(
        self,
        agent_id: str,
        key: str,
        value: Any,
        ttl: int = 3600,
    ):
        """Update agent's working memory."""
        memory_key = f"agent:{agent_id}:memory"
        await self.redis.hset(memory_key, key, json.dumps(value))
        # Every write refreshes the TTL of the whole memory hash
        await self.redis.expire(memory_key, ttl)
```
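
Isolation in this pattern comes entirely from the per-agent key `agent:{agent_id}:memory` plus symmetric JSON encoding on write and read. The sketch below demonstrates both against a dict-backed stand-in for the two Redis hash commands; `InMemoryHashStore`, `remember` and `recall` are hypothetical names, and TTL handling is deliberately omitted.

```python
import json
from typing import Any


class InMemoryHashStore:
    """Dict-backed stand-in for Redis HSET/HGETALL (no TTL handling)."""

    def __init__(self) -> None:
        self._hashes: dict[str, dict[str, str]] = {}

    def hset(self, key: str, field: str, value: str) -> None:
        self._hashes.setdefault(key, {})[field] = value

    def hgetall(self, key: str) -> dict[str, str]:
        return dict(self._hashes.get(key, {}))


def memory_key(agent_id: str) -> str:
    # Same per-agent key scheme as AgentContextManager
    return f"agent:{agent_id}:memory"


def remember(store: InMemoryHashStore, agent_id: str, field: str, value: Any) -> None:
    # Encode on write, like update_working_memory()
    store.hset(memory_key(agent_id), field, json.dumps(value))


def recall(store: InMemoryHashStore, agent_id: str) -> dict[str, Any]:
    # Decode on read, like get_agent_context()
    return {k: json.loads(v) for k, v in store.hgetall(memory_key(agent_id)).items()}


store = InMemoryHashStore()
remember(store, "dave", "current_focus", ["auth-api", "login-tests"])
remember(store, "quinn", "current_focus", ["regression-suite"])

print(recall(store, "dave"))     # {'current_focus': ['auth-api', 'login-tests']}
print(recall(store, "quinn"))    # {'current_focus': ['regression-suite']}
print(recall(store, "unknown"))  # {}
```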

---

## Risks and Mitigations

| Risk | Impact | Probability | Mitigation |
|------|--------|-------------|------------|
| **Temporal complexity** | High | Medium | Start with simple workflows, invest in team training |
| **LangGraph learning curve** | Medium | High | Create abstractions, use simpler patterns initially |
| **Agent coordination deadlocks** | High | Medium | Implement timeouts, circuit breakers, supervisor oversight |
| **Token cost explosion** | High | Medium | Budget caps, usage monitoring, caching strategies |
| **State consistency issues** | High | Low | Event sourcing, Temporal durability, comprehensive testing |
| **Debugging complexity** | Medium | High | Invest in observability (LangSmith, Temporal UI) |
| **Vendor lock-in (LangGraph)** | Medium | Low | Abstract core patterns; LangGraph is open source |

### Mitigation Strategies

**1. Complexity Management:**
- Start with 2-3 agent types, expand gradually
- Build comprehensive test harnesses
- Document all state transitions and message types
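
One way to keep state transitions documented and testable at the same time is a single table of legal edges that code checks against. In the sketch below the state names are partly assumed (only `IDLE` appears in the factory code; `THINKING`, `EXECUTING` and `BLOCKED` are inferred from the timeout guard), the allowed edges are illustrative, and `transition` and `InvalidTransition` are hypothetical names.

```python
import enum


class AgentState(str, enum.Enum):
    IDLE = "idle"
    THINKING = "thinking"
    EXECUTING = "executing"
    BLOCKED = "blocked"


# Single source of truth: every legal edge is listed here (illustrative)
ALLOWED_TRANSITIONS: dict[AgentState, frozenset[AgentState]] = {
    AgentState.IDLE: frozenset({AgentState.THINKING}),
    AgentState.THINKING: frozenset(
        {AgentState.EXECUTING, AgentState.BLOCKED, AgentState.IDLE}
    ),
    AgentState.EXECUTING: frozenset(
        {AgentState.THINKING, AgentState.BLOCKED, AgentState.IDLE}
    ),
    AgentState.BLOCKED: frozenset({AgentState.THINKING, AgentState.IDLE}),
}


class InvalidTransition(Exception):
    """Raised when an agent attempts an edge not listed in the table."""


def transition(current: AgentState, target: AgentState) -> AgentState:
    """Return target if the edge is allowed, else raise InvalidTransition."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise InvalidTransition(f"{current.value} -> {target.value}")
    return target


# Harness-style check: every state has an entry in the table
assert set(ALLOWED_TRANSITIONS) == set(AgentState)

state = transition(AgentState.IDLE, AgentState.THINKING)
print(state.value)  # thinking
```

The same table can be rendered into the documentation, so the written description of the state machine and the enforced one cannot drift apart.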

**2. Cost Control:**
```python
from sqlalchemy.ext.asyncio import AsyncSession


class BudgetGuard:
    """Refuse a completion if it would push the project past its budget."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def check_before_completion(
        self,
        project_id: str,
        estimated_tokens: int,
    ) -> bool:
        project = await self.db.get(Project, project_id)
        current_spend = await self.get_project_spend(project_id)
        estimated_cost = self._estimate_cost(estimated_tokens)

        if current_spend + estimated_cost > project.budget_limit:
            await self.notify_budget_exceeded(project_id)
            return False
        return True
```
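
`BudgetGuard` relies on `_estimate_cost`, which the spike leaves undefined. A minimal sketch follows, assuming prices are quoted per 1K tokens and that input and output tokens are priced separately (as major providers do), so it takes two token counts rather than the single `estimated_tokens` above. The prices in the example are placeholders, not real provider rates; `Decimal` avoids float rounding drift in accumulated spend.

```python
from decimal import Decimal


def estimate_cost(
    input_tokens: int,
    output_tokens: int,
    input_price_per_1k: Decimal,
    output_price_per_1k: Decimal,
) -> Decimal:
    """Cost = tokens / 1000 * price per 1K tokens, summed per direction."""
    return (
        Decimal(input_tokens) / 1000 * input_price_per_1k
        + Decimal(output_tokens) / 1000 * output_price_per_1k
    )


def within_budget(current_spend: Decimal, estimated: Decimal, limit: Decimal) -> bool:
    # Mirrors BudgetGuard: reject only if the call would push spend past the limit
    return current_spend + estimated <= limit


# Placeholder prices for illustration only
cost = estimate_cost(2000, 500, Decimal("0.003"), Decimal("0.015"))
print(cost)  # 0.0135

print(within_budget(Decimal("5.00"), cost, Decimal("10.00")))  # True
print(within_budget(Decimal("9.99"), cost, Decimal("10.00")))  # False
```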

**3. Deadlock Prevention:**
```python
from datetime import datetime, timedelta


class AgentTimeoutGuard:
    # Maximum seconds an agent may remain in each state
    THINKING_TIMEOUT = 300  # 5 minutes
    EXECUTION_TIMEOUT = 600  # 10 minutes
    BLOCKED_TIMEOUT = 1800  # 30 minutes

    async def monitor_agent(self, agent_id: str):
        agent = await self.db.get(AgentInstance, agent_id)
        timeout = self._get_timeout(agent.state)

        # Agent has been inactive longer than its current state allows
        if agent.last_active_at < datetime.utcnow() - timedelta(seconds=timeout):
            await self.handle_timeout(agent)
```
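
The risk table also lists circuit breakers as a deadlock mitigation. Below is a minimal sketch of the classic closed / open / half-open breaker, which could wrap calls an agent makes to a peer agent or an LLM provider so that repeated failures stop being retried for a cooldown period. `CircuitBreaker` and `CircuitOpenError` are hypothetical names; the consecutive-failure threshold and cooldown are assumed parameters, and the sketch is not thread- or task-safe.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(Exception):
    """Raised instead of calling through while the breaker is open."""


class CircuitBreaker:
    """Closed -> open after N consecutive failures -> half-open after a cooldown."""

    def __init__(
        self,
        failure_threshold: int = 3,
        reset_timeout: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout:
            return "half_open"  # cooldown elapsed: let a trial call through
        return "open"

    def call(self, fn: Callable[[], T]) -> T:
        if self.state == "open":
            raise CircuitOpenError("circuit open; not calling through")
        try:
            result = fn()
        except Exception:
            self._failures += 1
            # Reaching the threshold opens the breaker; a failed half-open
            # trial is already at the threshold, so it re-opens with a new timer.
            if self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        # Any success closes the breaker and resets the failure count
        self._failures = 0
        self._opened_at = None
        return result


now = [0.0]  # fake clock so the example is deterministic
breaker = CircuitBreaker(failure_threshold=2, reset_timeout=30.0, clock=lambda: now[0])


def unresponsive_peer() -> str:
    raise TimeoutError("peer agent did not respond")


for _ in range(2):
    try:
        breaker.call(unresponsive_peer)
    except TimeoutError:
        pass
print(breaker.state)  # open

now[0] = 30.0
print(breaker.state)  # half_open
print(breaker.call(lambda: "pong"), breaker.state)  # pong closed
```

Where the timeout guard detects an agent that is already stuck, a breaker keeps healthy agents from repeatedly waiting on one that is known to be failing.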

---

## Implementation Roadmap

### Phase 1: Foundation (Weeks 1-2)
- [ ] Set up Temporal server and workers
- [ ] Implement basic LangGraph agent state machine
- [ ] Create AgentInstance model and database schema
- [ ] Implement Redis Streams message bus

### Phase 2: Core Orchestration (Weeks 3-4)
- [ ] Implement supervisor pattern with Project Manager
- [ ] Build agent spawning and team creation
- [ ] Add token tracking per agent instance
- [ ] Create basic inter-agent communication

### Phase 3: Durability (Weeks 5-6)
- [ ] Implement Temporal workflows for projects and sprints
- [ ] Add checkpoint and recovery mechanisms
- [ ] Build human-in-the-loop approval flows
- [ ] Create agent context persistence

### Phase 4: Observability (Week 7)
- [ ] Implement real-time activity streaming
- [ ] Add comprehensive logging and tracing
- [ ] Build cost monitoring dashboards
- [ ] Create agent performance analytics

---

## References

### Frameworks and Tools
- [AutoGen 0.4 Documentation](https://github.com/microsoft/autogen)
- [CrewAI Documentation](https://docs.crewai.com/)
- [LangGraph Documentation](https://www.langchain.com/langgraph)
- [Temporal.io Documentation](https://temporal.io/)
- [LiteLLM Documentation](https://docs.litellm.ai/)

### Research and Articles
- [Microsoft AI Agent Design Patterns](https://learn.microsoft.com/en-us/azure/architecture/ai-ml/guide/ai-agent-design-patterns)
- [Four Design Patterns for Event-Driven Multi-Agent Systems (Confluent)](https://www.confluent.io/blog/event-driven-multi-agent-systems/)
- [Google A2A Protocol](https://arxiv.org/html/2505.02279v2)
- [Temporal for AI Agents](https://temporal.io/blog/durable-execution-meets-ai-why-temporal-is-the-perfect-foundation-for-ai)
- [LangFuse Token Tracking](https://langfuse.com/docs/observability/features/token-and-cost-tracking)
- [Portkey LLM Failover Patterns](https://portkey.ai/blog/failover-routing-strategies-for-llms-in-production/)

### Related Syndarix Documents
- [SPIKE-001: MCP Integration Pattern](./SPIKE-001-mcp-integration-pattern.md)
- [SPIKE-005: LLM Provider Abstraction](./SPIKE-005-llm-provider-abstraction.md)

---

## Decision

**Adopt a hybrid architecture** combining:

1. **LangGraph** for agent state machines and logic (graph-based, stateful)
2. **Temporal** for durable, long-running workflow orchestration
3. **Redis Streams** for event-driven agent-to-agent communication
4. **Hierarchical supervisor pattern** with Project Manager coordinating teams
5. **LiteLLM** (per SPIKE-005) for unified LLM access with failover

This architecture provides the flexibility, durability, and scalability Syndarix needs to run 50+ concurrent autonomous agents as a consulting platform, while preserving per-agent context, comprehensive token tracking, and real-time visibility.

---

*Spike completed. Findings will inform ADR-002: Agent Orchestration Architecture.*