SPIKE-002: Agent Orchestration Pattern
Status: Completed
Date: 2025-12-29
Author: Architecture Team
Related Issue: #2
Executive Summary
After researching leading multi-agent orchestration frameworks (AutoGen, CrewAI, LangGraph) and enterprise patterns, we recommend a hybrid architecture for Syndarix that combines:
- LangGraph for core agent orchestration logic (graph-based state machines)
- Temporal for durable, long-running workflow execution
- Event-driven communication via Redis Streams with A2A protocol concepts
- Hierarchical supervisor pattern with peer-to-peer collaboration within teams
This architecture provides the flexibility, scalability, and durability required for 50+ concurrent agent instances while maintaining individual agent context, token tracking, and LLM failover capabilities.
Key Recommendation
Do not adopt a single framework wholesale. Instead, build a custom orchestration layer (the seam between the two core pieces is sketched after this list) using:
- LangGraph for agent logic and state transitions (graph primitives)
- Temporal for durable execution and workflow persistence
- Redis Streams for event-driven inter-agent communication
- LiteLLM for unified LLM access with failover (per SPIKE-005)
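To make the hybrid concrete, here is a minimal sketch of the seam between the two layers: a Temporal activity that drives a LangGraph agent graph to completion. `create_agent_graph` and `AgentGraphState` are defined later in this document; the initial-state wiring and thread-id convention are our assumptions, not a finished implementation:
# app/workflows/activities.py — hedged sketch of the Temporal/LangGraph seam
from temporalio import activity

@activity.defn
async def execute_agent_task(task: dict) -> dict:
    """Run one agent's LangGraph state machine as a durable Temporal activity."""
    # Temporal persists/retries the activity; LangGraph checkpoints the graph
    graph = create_agent_graph(task["agent_type"])  # compiled graph (see below)
    final_state = await graph.ainvoke(
        {
            "agent_id": task["agent_id"],
            "project_id": task["project_id"],
            "task": task,
            "messages": [],
            "artifacts": [],
            "current_state": AgentState.THINKING,
            "current_action": None,
            "iteration_count": 0,
            "error": None,
        },
        # thread_id ties this run to a persistent checkpoint row
        config={"configurable": {"thread_id": task["agent_id"]}},
    )
    return {"artifacts": final_state["artifacts"], "error": final_state["error"]}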
Research Questions Addressed
1. What are the leading patterns for multi-agent orchestration in 2024-2025?
The multi-agent landscape has matured significantly. Key patterns include:
| Pattern | Description | Best For |
|---|---|---|
| Supervisor | Central orchestrator coordinates specialized agents | Complex multi-domain workflows |
| Hierarchical | Tree structure with managers and workers | Large-scale, layered problems |
| Peer-to-Peer | Agents collaborate as equals without central control | Open-ended ideation, negotiation |
| Orchestrator-Worker | Planner breaks tasks, workers execute | Deterministic pipelines |
| Blackboard | Shared knowledge space, agents react to changes | Incremental problem-solving |
| Market-Based | Agents bid for tasks based on capabilities | Dynamic resource allocation |
Industry Trend (2025): Event-driven architectures are becoming dominant, with protocols like Google's A2A (Agent-to-Agent), IBM's ACP, and AG-UI standardizing communication.
2. Framework Comparison
| Feature | AutoGen 0.4 | CrewAI | LangGraph | Custom |
|---|---|---|---|---|
| Architecture | Event-driven, distributed | Crews + Flows dual-mode | Graph-based state machine | Flexible |
| State Management | Built-in, cross-language | Shared memory | Persistent, checkpointed | Custom (Temporal) |
| Scalability | High (Kubernetes-ready) | Medium-High | High | Very High |
| Learning Curve | Medium | Low-Medium | High | High |
| Enterprise Adoption | Microsoft ecosystem | 60% Fortune 500 | Klarna, Replit, Elastic | N/A |
| Long-running Workflows | Good | Limited | Good | Excellent (Temporal) |
| Customization | Medium | Limited | High | Full |
| Token Tracking | Basic | Basic | Via LangSmith | Custom |
| Multi-Provider Failover | Limited | Limited | Limited | Full (LiteLLM) |
| 50+ Concurrent Agents | Possible | Challenging | Possible | Designed for |
AutoGen 0.4 (Microsoft)
Pros:
- Event-driven, async-first architecture
- Cross-language support (.NET, Python)
- Built-in observability (OpenTelemetry)
- Strong Microsoft ecosystem integration
- New Agent Framework combines AutoGen + Semantic Kernel
Cons:
- Tied to Microsoft patterns
- Less flexible for custom orchestration
- Newer 0.4 version still maturing
Best For: Microsoft-centric enterprises, standardized agent patterns
CrewAI
Pros:
- Easy to get started (role-based agents)
- Good for sequential/hierarchical workflows
- Strong enterprise traction ($18M Series A)
- LLM-agnostic design
Cons:
- Limited ceiling for complex patterns
- Teams report hitting walls at 6-12 months
- Multi-agent complexity can cause loops
- Flows architecture adds learning curve
Best For: Quick prototypes, straightforward workflows, teams new to agents
LangGraph
Pros:
- Fine-grained control over agent flow
- Excellent state management with persistence
- Time-travel debugging (LangSmith)
- Human-in-the-loop built-in
- Cycles, conditionals, parallel execution
Cons:
- Steep learning curve (graph theory, state machines)
- Requires distributed systems knowledge
- Observability requires LangSmith subscription
Best For: Complex, stateful workflows requiring precise control
3. Enterprise Agent State Management
Enterprise systems use these patterns:
Event Sourcing:
# All state changes stored as immutable events
from dataclasses import dataclass
from datetime import datetime

@dataclass
class AgentStateEvent:
    event_type: str  # "TASK_ASSIGNED", "STATE_CHANGED", etc.
    agent_id: str
    timestamp: datetime
    data: dict
    sequence_number: int
Checkpoint/Snapshot Pattern:
# Periodic snapshots for fast recovery
@dataclass
class AgentCheckpoint:
    agent_id: str
    state: AgentState
    memory: dict
    context_window: list[Message]
    checkpoint_time: datetime
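The two patterns compose at recovery time: load the latest snapshot, then replay only the events recorded after it. A minimal sketch, assuming hypothetical storage helpers (`load_latest_checkpoint`, `load_events_since`, and `apply_event` are illustrative, not an existing API):
# Hedged recovery sketch: snapshot + event replay (storage helpers assumed)
async def rebuild_agent_state(agent_id: str) -> AgentCheckpoint:
    checkpoint = await load_latest_checkpoint(agent_id)  # hypothetical store
    events = await load_events_since(agent_id, checkpoint.checkpoint_time)
    for event in sorted(events, key=lambda e: e.sequence_number):
        apply_event(checkpoint, event)  # mutate snapshot per event_type
    return checkpoint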
Temporal Durable Execution:
# Workflow state is automatically persisted by Temporal
from datetime import timedelta
from temporalio import workflow

@workflow.defn
class AgentWorkflow:
    @workflow.run
    async def run(self, task: Task) -> Result:  # Task/Result: domain types
        # State survives crashes, restarts, and deployments
        result = await workflow.execute_activity(
            execute_agent_task,
            task,
            start_to_close_timeout=timedelta(hours=24),
        )
        return result
4. Agent-to-Agent Communication
Recommended: Event-Driven with Redis Streams
┌─────────────────────────────────────────────────────────────┐
│ Redis Streams │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ project:{project_id}:events │ │
│ │ - agent_message │ │
│ │ - task_assignment │ │
│ │ - state_change │ │
│ │ - artifact_produced │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
▲ ▲ ▲ ▲
│ │ │ │
┌────┴───┐ ┌────┴───┐ ┌────┴───┐ ┌────┴───┐
│ PO │ │ Arch │ │ Dev-1 │ │ Dev-2 │
│ Agent │ │ Agent │ │ Agent │ │ Agent │
└────────┘ └────────┘ └────────┘ └────────┘
Protocol Design (A2A-inspired):
from dataclasses import dataclass
from datetime import datetime
from typing import Literal
from uuid import UUID

@dataclass
class AgentMessage:
    """Agent-to-Agent message following A2A concepts."""
id: UUID
source_agent_id: str
target_agent_id: str | None # None = broadcast
project_id: str
message_type: Literal[
"TASK_HANDOFF",
"CONTEXT_SHARE",
"REVIEW_REQUEST",
"FEEDBACK",
"ARTIFACT",
"QUERY",
]
payload: dict
correlation_id: UUID | None # For request/response
timestamp: datetime
@dataclass
class AgentCard:
"""Agent capability advertisement (A2A concept)."""
agent_id: str
agent_type: str
capabilities: list[str]
current_state: AgentState
project_id: str
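One way to put these cards to work for discovery: each agent advertises its card in a per-project Redis hash that supervisors query when matching tasks to capabilities. A hedged sketch — the key layout is our assumption, not part of the A2A protocol:
# Hedged sketch: AgentCard registry in Redis (key layout is an assumption)
import json
from dataclasses import asdict

async def advertise_card(redis, card: AgentCard) -> None:
    # One hash per project: field = agent_id, value = serialized card
    await redis.hset(
        f"project:{card.project_id}:agent_cards",
        card.agent_id,
        json.dumps(asdict(card), default=str),  # default=str handles enums
    )

async def find_agents_with(redis, project_id: str, capability: str) -> list[str]:
    cards = await redis.hgetall(f"project:{project_id}:agent_cards")
    return [
        agent_id for agent_id, raw in cards.items()
        if capability in json.loads(raw)["capabilities"]
    ]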
5. Long-Running Agent Workflows
Recommended: Temporal for Durable Execution
Temporal provides:
- Automatic state persistence - survives crashes and restarts
- Built-in retries with exponential backoff
- Long-running support - hours, days, even months
- Human-in-the-loop - pause and wait for approval
- Visibility - full execution history
# workflows/sprint_workflow.py
from temporalio import workflow, activity
from datetime import timedelta
@workflow.defn
class SprintWorkflow:
"""Durable workflow for autonomous sprint execution."""
    def __init__(self):
        self._state = SprintState.PLANNING
        self._agents: dict[str, AgentHandle] = {}
        self._artifacts: list[Artifact] = []
        self._human_approved = False  # flipped by the approve_checkpoint signal
@workflow.run
async def run(self, sprint: SprintConfig) -> SprintResult:
# Phase 1: Planning (may take hours)
backlog = await workflow.execute_activity(
plan_sprint,
sprint,
start_to_close_timeout=timedelta(hours=4),
)
# Phase 2: Parallel development (may take days)
dev_tasks = await self._execute_development(backlog)
# Phase 3: Review checkpoint (human approval)
if sprint.autonomy_level != AutonomyLevel.AUTONOMOUS:
await workflow.wait_condition(
lambda: self._human_approved
)
# Phase 4: QA and finalization
result = await workflow.execute_activity(
finalize_sprint,
dev_tasks,
start_to_close_timeout=timedelta(hours=2),
)
return result
@workflow.signal
async def approve_checkpoint(self, approved: bool):
self._human_approved = approved
@workflow.query
def get_state(self) -> SprintState:
return self._state
6. Orchestration Topology Comparison
Supervisor Pattern
┌─────────────────┐
│ Supervisor │
│ (Orchestrator)│
└────────┬────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Specialist 1 │ │ Specialist 2 │ │ Specialist 3 │
│ (Dev Agent) │ │ (QA Agent) │ │ (DevOps) │
└──────────────┘ └──────────────┘ └──────────────┘
Pros: Clear control, auditability, easier debugging
Cons: Bottleneck at supervisor, single point of failure
Syndarix Use: Project Manager as supervisor for sprints
Hierarchical Pattern
┌────────────────┐
│ Project Lead │
│ (PO) │
└───────┬────────┘
│
┌──────────────────┼──────────────────┐
│ │ │
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Dev Lead │ │ QA Lead │ │ Ops Lead │
│ (Architect)│ │ │ │ │
└─────┬──────┘ └─────┬──────┘ └────────────┘
│ │
┌────────┼────────┐ ┌────┴────┐
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│ Dave │ │ Ellis│ │ Kate │ │ QA-1 │ │ QA-2 │
└──────┘ └──────┘ └──────┘ └──────┘ └──────┘
Pros: Scalable, localized decision-making, mirrors real teams
Cons: Communication overhead, coordination complexity
Syndarix Use: Recommended primary pattern
Peer-to-Peer Pattern
┌──────┐ ┌──────┐
│ Dave │◄───►│ Ellis│
└──┬───┘ └──┬───┘
│ │
│ ┌──────┐ │
└──►│ Kate │◄┘
└──┬───┘
│
▼
[Shared Blackboard]
Pros: Flexible, emergent collaboration, no bottleneck
Cons: Harder to control, potential infinite loops
Syndarix Use: Within teams for brainstorming/spikes (a blackboard sketch follows)
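The shared blackboard in the diagram can start as a per-project Redis hash that any agent reads or annotates. A minimal sketch, assuming this key naming (the class is illustrative, not an existing component):
# Hedged blackboard sketch: shared, topic-keyed space per project
import json

class Blackboard:
    def __init__(self, redis, project_id: str):
        self.redis = redis
        self.key = f"project:{project_id}:blackboard"

    async def post(self, agent_id: str, topic: str, content: dict) -> None:
        # Last write per topic wins; the contributor is recorded in the value
        await self.redis.hset(
            self.key, topic, json.dumps({"by": agent_id, "content": content})
        )

    async def read(self, topic: str) -> dict | None:
        raw = await self.redis.hget(self.key, topic)
        return json.loads(raw) if raw else None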
Proposed Architecture for Syndarix
High-Level Design
┌───────────────────────────────────────────────────────────────────────┐
│ Syndarix Orchestration Layer │
├───────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Temporal Workflow Engine │ │
│ │ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ │
│ │ │ Project │ │ Sprint │ │ Agent Task │ │ │
│ │ │ Workflow │ │ Workflow │ │ Workflow │ │ │
│ │ └───────────────┘ └───────────────┘ └───────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Agent Runtime (LangGraph) │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────┐ │ │
│ │ │ Agent State Graph │ │ │
│ │ │ [IDLE] ──► [THINKING] ──► [EXECUTING] ──► [BLOCKED] │ │ │
│ │ │ ▲ │ │ │ │ │ │
│ │ │ └────────────┴──────────────┴──────────────┘ │ │ │
│ │ └─────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Agent │ │ Agent │ │ Agent │ │ Agent │ │ │
│ │ │ Instance │ │ Instance │ │ Instance │ │ Instance │ │ │
│ │ │ (PO) │ │ (Arch) │ │ (Dev-1) │ │ (Dev-2) │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Communication Layer │ │
│ │ │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ Redis Streams │ │ WebSocket/SSE │ │ Event Bus │ │ │
│ │ │ (Agent-Agent) │ │ (Agent-UI) │ │ (System) │ │ │
│ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Infrastructure Layer │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ LiteLLM │ │ MCP │ │ PostgreSQL │ │ │
│ │ │ (Gateway) │ │ (Servers) │ │ (pgvector) │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└───────────────────────────────────────────────────────────────────────┘
Agent Instance Model
# app/models/agent_instance.py
import enum
from datetime import datetime

from sqlalchemy import (
    Column, DateTime, Enum, Float, ForeignKey, Integer, JSON, String,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship

from app.db.base import Base
class AgentState(str, enum.Enum):
IDLE = "idle"
THINKING = "thinking"
EXECUTING = "executing"
WAITING_INPUT = "waiting_input"
WAITING_REVIEW = "waiting_review"
BLOCKED = "blocked"
COMPLETED = "completed"
ERROR = "error"
class AgentInstance(Base):
"""Individual agent instance spawned from a type."""
__tablename__ = "agent_instances"
id = Column(UUID, primary_key=True)
name = Column(String(50)) # "Dave", "Ellis", "Kate"
# Type relationship
agent_type_id = Column(UUID, ForeignKey("agent_types.id"))
# Project assignment
project_id = Column(UUID, ForeignKey("projects.id"))
# Current state
state = Column(Enum(AgentState), default=AgentState.IDLE)
current_task_id = Column(UUID, ForeignKey("tasks.id"), nullable=True)
# Context/Memory
working_memory = Column(JSON, default=dict) # Short-term context
system_prompt_override = Column(String, nullable=True)
# Token/Cost tracking (per instance)
total_tokens_used = Column(Integer, default=0)
total_cost_usd = Column(Float, default=0.0)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
last_active_at = Column(DateTime, nullable=True)
# Relationships
agent_type = relationship("AgentType", back_populates="instances")
project = relationship("Project", back_populates="agents")
conversations = relationship("Conversation", back_populates="agent")
Agent State Machine (LangGraph)
# app/agents/state_machine.py
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from typing import TypedDict, Annotated, Literal
from operator import add
class AgentGraphState(TypedDict):
    """State shared across agent graph nodes."""
    agent_id: str
    project_id: str
    task: dict | None
    messages: Annotated[list, add]  # Append-only
    artifacts: Annotated[list, add]
    current_state: AgentState
    current_action: dict | None  # Set by think_node, consumed by execute_node
    iteration_count: int
    error: str | None
def create_agent_graph(agent_type: AgentType) -> StateGraph:
"""Create a LangGraph state machine for an agent type."""
graph = StateGraph(AgentGraphState)
# Define nodes
graph.add_node("think", think_node)
graph.add_node("execute", execute_node)
graph.add_node("review", review_node)
graph.add_node("handoff", handoff_node)
graph.add_node("error_handler", error_node)
# Define edges
graph.set_entry_point("think")
graph.add_conditional_edges(
"think",
route_after_think,
{
"execute": "execute",
"handoff": "handoff",
"review": "review",
"end": END,
}
)
graph.add_conditional_edges(
"execute",
route_after_execute,
{
"think": "think", # Continue reasoning
"review": "review",
"error": "error_handler",
}
)
graph.add_edge("review", "think")
graph.add_edge("error_handler", END)
graph.add_edge("handoff", END)
return graph.compile(
checkpointer=PostgresSaver.from_conn_string(settings.DATABASE_URL)
)
async def think_node(state: AgentGraphState) -> AgentGraphState:
"""Agent reasoning/planning node."""
agent = await get_agent_instance(state["agent_id"])
# Get LLM response
response = await llm_gateway.complete(
agent_id=state["agent_id"],
project_id=state["project_id"],
messages=build_messages(agent, state),
model_preference=agent.agent_type.model_preference,
)
# Parse response for next action
action = parse_agent_action(response["content"])
return {
**state,
"messages": [{"role": "assistant", "content": response["content"]}],
"current_action": action,
"iteration_count": state["iteration_count"] + 1,
}
async def execute_node(state: AgentGraphState) -> AgentGraphState:
"""Execute agent action (tool calls, artifact creation)."""
action = state.get("current_action")
if action["type"] == "tool_call":
result = await mcp_client.call_tool(
server=action["server"],
tool_name=action["tool"],
arguments={
"project_id": state["project_id"],
"agent_id": state["agent_id"],
**action["arguments"],
}
)
return {
**state,
"messages": [{"role": "tool", "content": str(result)}],
}
elif action["type"] == "produce_artifact":
artifact = await create_artifact(action["artifact"])
return {
**state,
"artifacts": [artifact],
}
return state
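For the real-time visibility discussed later, the compiled graph can also be consumed incrementally rather than awaited in one shot. A hedged sketch using LangGraph's streaming interface (`publish_node_update` is an assumed helper, not an existing function):
# Hedged sketch: stream node-by-node updates out of the agent graph
async def run_agent_with_updates(graph, initial_state, agent) -> None:
    # One persistent checkpoint thread per agent instance
    config = {"configurable": {"thread_id": str(agent.id)}}
    # stream_mode="updates" yields {node_name: state_delta} per executed node
    async for step in graph.astream(
        initial_state, config=config, stream_mode="updates"
    ):
        for node_name, delta in step.items():
            await publish_node_update(agent, node_name, delta)  # assumed helper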
Inter-Agent Communication
# app/agents/communication.py
import json
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import AsyncGenerator
from uuid import uuid4

from redis import asyncio as aioredis
@dataclass
class AgentMessage:
id: str
source_agent_id: str
target_agent_id: str | None
project_id: str
message_type: str
payload: dict
timestamp: str
correlation_id: str | None = None
class AgentMessageBus:
"""Redis Streams-based message bus for agent communication."""
def __init__(self, redis: aioredis.Redis):
self.redis = redis
    async def publish(self, message: AgentMessage) -> str:
        """Publish message to project stream."""
        stream_key = f"project:{message.project_id}:agent_events"
        fields = asdict(message)
        # Stream fields must be flat scalars: serialize the nested payload
        # and drop None values, which XADD rejects
        fields["payload"] = json.dumps(fields["payload"])
        fields = {k: v for k, v in fields.items() if v is not None}
        message_id = await self.redis.xadd(
            stream_key,
            fields,
            maxlen=10000,  # Keep last 10k messages
        )
        # Also publish to target-specific stream if targeted
        if message.target_agent_id:
            target_stream = f"agent:{message.target_agent_id}:inbox"
            await self.redis.xadd(target_stream, fields)
        return message_id
    async def subscribe(
        self,
        agent_id: str,
        project_id: str,
    ) -> AsyncGenerator[AgentMessage, None]:
        """Subscribe to messages for an agent (assumes decode_responses=True)."""
        # "$" = only messages arriving from now on; ">" is reserved for
        # consumer groups (XREADGROUP) and is not valid with plain XREAD
        streams = {
            f"agent:{agent_id}:inbox": "$",
            f"project:{project_id}:agent_events": "$",
        }
        while True:
            messages = await self.redis.xread(
                streams,
                count=10,
                block=5000,  # 5 second timeout
            )
            for stream_name, stream_messages in messages:
                for msg_id, msg_data in stream_messages:
                    streams[stream_name] = msg_id  # resume after last-seen id
                    msg_data["payload"] = json.loads(msg_data["payload"])
                    msg_data.setdefault("target_agent_id", None)
                    yield AgentMessage(**msg_data)
async def handoff_task(
self,
source_agent_id: str,
target_agent_id: str,
project_id: str,
task: dict,
context: dict,
) -> None:
"""Hand off a task from one agent to another."""
message = AgentMessage(
id=str(uuid4()),
source_agent_id=source_agent_id,
target_agent_id=target_agent_id,
project_id=project_id,
message_type="TASK_HANDOFF",
payload={
"task": task,
"context": context,
"handoff_reason": "task_completion",
},
timestamp=datetime.utcnow().isoformat(),
)
await self.publish(message)
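The `correlation_id` field enables request/response on top of this fire-and-forget bus: publish a `QUERY`, then wait for a message that echoes the same id. A hedged sketch; the timeout policy and race handling are assumptions:
# Hedged sketch: request/response over the bus via correlation_id
import asyncio

async def query_agent(
    bus: AgentMessageBus,
    source_agent_id: str,
    target_agent_id: str,
    project_id: str,
    question: dict,
    timeout_s: float = 60.0,
) -> AgentMessage:
    correlation_id = str(uuid4())

    async def _wait() -> AgentMessage:
        async for msg in bus.subscribe(source_agent_id, project_id):
            # Match on both id and sender so we skip our own outbound QUERY
            if (msg.correlation_id == correlation_id
                    and msg.source_agent_id == target_agent_id):
                return msg

    # Start listening before publishing to avoid missing a fast reply
    waiter = asyncio.create_task(_wait())
    await bus.publish(AgentMessage(
        id=str(uuid4()),
        source_agent_id=source_agent_id,
        target_agent_id=target_agent_id,
        project_id=project_id,
        message_type="QUERY",
        payload=question,
        timestamp=datetime.utcnow().isoformat(),
        correlation_id=correlation_id,
    ))
    return await asyncio.wait_for(waiter, timeout=timeout_s)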
Agent Supervisor (Hierarchical Control)
# app/agents/supervisor.py
from datetime import datetime
from typing import TypedDict
from uuid import uuid4

from langgraph.graph import StateGraph, END
class SupervisorState(TypedDict):
    """State for supervisor agent managing a team."""
    project_id: str
    team_agents: list[str]
    team_status: dict[str, dict]  # agent_id -> status, set by assess_state
    pending_tasks: list[dict]
    in_progress: dict[str, dict]  # agent_id -> task
    completed_tasks: list[dict]
    current_phase: str
def create_supervisor_graph(supervisor_type: str) -> StateGraph:
"""Create supervisor graph for team coordination."""
graph = StateGraph(SupervisorState)
graph.add_node("assess_state", assess_team_state)
graph.add_node("assign_tasks", assign_tasks_to_agents)
graph.add_node("monitor_progress", monitor_agent_progress)
graph.add_node("handle_completion", handle_task_completion)
graph.add_node("coordinate_handoff", coordinate_agent_handoff)
graph.add_node("escalate", escalate_to_human)
graph.set_entry_point("assess_state")
graph.add_conditional_edges(
"assess_state",
decide_supervisor_action,
{
"assign": "assign_tasks",
"monitor": "monitor_progress",
"coordinate": "coordinate_handoff",
"escalate": "escalate",
"complete": END,
}
)
# ... additional edges
return graph.compile()
async def assess_team_state(state: SupervisorState) -> SupervisorState:
"""Assess current state of all team agents."""
team_status = {}
for agent_id in state["team_agents"]:
agent = await get_agent_instance(agent_id)
team_status[agent_id] = {
"state": agent.state,
"current_task": agent.current_task_id,
"last_active": agent.last_active_at,
}
return {
**state,
"team_status": team_status,
}
async def assign_tasks_to_agents(state: SupervisorState) -> SupervisorState:
"""Assign pending tasks to available agents."""
available_agents = [
agent_id for agent_id, status in state["team_status"].items()
if status["state"] == AgentState.IDLE
]
assignments = []
for task in state["pending_tasks"]:
if not available_agents:
break
# Find best agent for task
best_agent = await select_best_agent(task, available_agents)
# Assign task
await message_bus.publish(AgentMessage(
id=str(uuid4()),
source_agent_id="supervisor",
target_agent_id=best_agent,
project_id=state["project_id"],
message_type="TASK_ASSIGNMENT",
payload={"task": task},
timestamp=datetime.utcnow().isoformat(),
))
assignments.append((best_agent, task))
available_agents.remove(best_agent)
return {
**state,
"in_progress": {
**state["in_progress"],
**{agent: task for agent, task in assignments},
},
"pending_tasks": [
t for t in state["pending_tasks"]
if t not in [task for _, task in assignments]
],
}
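`select_best_agent` above is deliberately left abstract. A hedged capability-matching sketch — the scoring heuristic and the `capabilities` attribute on `AgentType` are assumptions; a production version might instead ask an LLM to rank candidates:
# Hedged sketch: naive capability matching for select_best_agent
async def select_best_agent(task: dict, available_agents: list[str]) -> str:
    required = set(task.get("required_capabilities", []))
    best_agent, best_score = available_agents[0], -1
    for agent_id in available_agents:
        agent = await get_agent_instance(agent_id)
        # "capabilities" list on AgentType is assumed here
        capabilities = set(agent.agent_type.capabilities or [])
        score = len(required & capabilities)
        if score > best_score:
            best_agent, best_score = agent_id, score
    return best_agent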
Temporal Workflow Integration
# app/workflows/project_workflow.py
from temporalio import workflow, activity
from temporalio.common import RetryPolicy
from datetime import timedelta
@workflow.defn
class ProjectWorkflow:
"""Long-running workflow for project lifecycle."""
def __init__(self):
self._phase = ProjectPhase.DISCOVERY
self._sprints: list[SprintResult] = []
self._human_feedback_pending = False
@workflow.run
async def run(self, project: ProjectConfig) -> ProjectResult:
# Phase 1: Discovery (PO + BA brainstorm)
requirements = await workflow.execute_activity(
run_discovery_phase,
project,
start_to_close_timeout=timedelta(hours=8),
retry_policy=RetryPolicy(maximum_attempts=3),
)
# Phase 2: Architecture spike
architecture = await workflow.execute_activity(
run_architecture_spike,
requirements,
start_to_close_timeout=timedelta(hours=4),
)
# Checkpoint: Human review of architecture
if project.autonomy_level != AutonomyLevel.AUTONOMOUS:
self._human_feedback_pending = True
await workflow.wait_condition(
lambda: not self._human_feedback_pending
)
# Phase 3: Sprint execution (iterative)
while not self._is_complete():
sprint_result = await workflow.execute_child_workflow(
SprintWorkflow.run,
self._next_sprint_config(),
id=f"{project.id}-sprint-{len(self._sprints) + 1}",
)
self._sprints.append(sprint_result)
# Milestone checkpoint if configured
if project.autonomy_level == AutonomyLevel.MILESTONE:
self._human_feedback_pending = True
await workflow.wait_condition(
lambda: not self._human_feedback_pending
)
return ProjectResult(
project_id=project.id,
sprints=self._sprints,
status="completed",
)
@workflow.signal
async def provide_feedback(self, feedback: HumanFeedback):
"""Handle human feedback at checkpoints."""
self._last_feedback = feedback
self._human_feedback_pending = False
@workflow.query
def get_status(self) -> ProjectStatus:
return ProjectStatus(
phase=self._phase,
sprints_completed=len(self._sprints),
awaiting_feedback=self._human_feedback_pending,
)
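Human feedback reaches the running workflow through Temporal's client API. A hedged sketch of the API-layer side (the server address and `HumanFeedback` construction are assumptions):
# Hedged sketch: delivering human feedback to a running ProjectWorkflow
from temporalio.client import Client

async def submit_feedback(workflow_id: str, feedback: HumanFeedback) -> None:
    client = await Client.connect("localhost:7233")  # assumed Temporal address
    handle = client.get_workflow_handle_for(ProjectWorkflow.run, workflow_id)
    # Signals are durable: delivered even if the worker is mid-restart
    await handle.signal(ProjectWorkflow.provide_feedback, feedback)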
Token Usage Tracking Per Agent
# app/services/agent_metrics.py
from datetime import datetime, timedelta

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.agent_instance import AgentInstance
from app.models.agent_type import AgentType  # import path assumed
from app.models.token_usage import TokenUsage
class AgentMetricsService:
"""Track token usage and costs per agent instance."""
def __init__(self, db: AsyncSession):
self.db = db
async def record_completion(
self,
agent_instance_id: str,
project_id: str,
model: str,
prompt_tokens: int,
completion_tokens: int,
latency_ms: int,
) -> TokenUsage:
"""Record a completion for an agent instance."""
cost = self._calculate_cost(model, prompt_tokens, completion_tokens)
usage = TokenUsage(
agent_instance_id=agent_instance_id,
project_id=project_id,
model=model,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=prompt_tokens + completion_tokens,
cost_usd=cost,
latency_ms=latency_ms,
timestamp=datetime.utcnow(),
)
self.db.add(usage)
# Update agent instance totals
agent = await self.db.get(AgentInstance, agent_instance_id)
agent.total_tokens_used += usage.total_tokens
agent.total_cost_usd += cost
agent.last_active_at = datetime.utcnow()
await self.db.commit()
return usage
async def get_agent_usage_summary(
self,
agent_instance_id: str,
period_days: int = 30,
) -> dict:
"""Get usage summary for an agent instance."""
cutoff = datetime.utcnow() - timedelta(days=period_days)
result = await self.db.execute(
select(
func.sum(TokenUsage.total_tokens).label("total_tokens"),
func.sum(TokenUsage.cost_usd).label("total_cost"),
func.count(TokenUsage.id).label("completion_count"),
func.avg(TokenUsage.latency_ms).label("avg_latency"),
)
.where(TokenUsage.agent_instance_id == agent_instance_id)
.where(TokenUsage.timestamp >= cutoff)
)
        row = result.first()
        return dict(row._mapping) if row else {}
async def get_project_agent_breakdown(
self,
project_id: str,
) -> list[dict]:
"""Get token usage breakdown by agent for a project."""
result = await self.db.execute(
select(
AgentInstance.id,
AgentInstance.name,
AgentType.role,
func.sum(TokenUsage.total_tokens).label("tokens"),
func.sum(TokenUsage.cost_usd).label("cost"),
)
.join(TokenUsage, TokenUsage.agent_instance_id == AgentInstance.id)
.join(AgentType, AgentType.id == AgentInstance.agent_type_id)
.where(AgentInstance.project_id == project_id)
.group_by(AgentInstance.id, AgentInstance.name, AgentType.role)
.order_by(func.sum(TokenUsage.cost_usd).desc())
)
        return [dict(row._mapping) for row in result]
Real-Time Agent Activity Visibility
# app/services/realtime_updates.py
import enum
import json

from fastapi import WebSocket, WebSocketDisconnect
from redis import asyncio as aioredis
class AgentActivityStream:
"""Real-time agent activity updates via WebSocket/SSE."""
def __init__(self, redis: aioredis.Redis):
self.redis = redis
self.pubsub = redis.pubsub()
async def subscribe_project(
self,
websocket: WebSocket,
project_id: str,
):
"""Stream agent activities for a project."""
await websocket.accept()
channel = f"project:{project_id}:activities"
await self.pubsub.subscribe(channel)
try:
async for message in self.pubsub.listen():
if message["type"] == "message":
await websocket.send_json(json.loads(message["data"]))
except WebSocketDisconnect:
await self.pubsub.unsubscribe(channel)
async def publish_activity(
self,
project_id: str,
activity: AgentActivity,
):
"""Publish an agent activity event."""
channel = f"project:{project_id}:activities"
await self.redis.publish(channel, json.dumps({
"type": "agent_activity",
"agent_id": activity.agent_id,
"agent_name": activity.agent_name,
"agent_role": activity.agent_role,
"activity_type": activity.activity_type,
"description": activity.description,
"state": activity.state,
"timestamp": activity.timestamp.isoformat(),
"metadata": activity.metadata,
}))
# Activity types for real-time updates
class AgentActivityType(str, enum.Enum):
STATE_CHANGE = "state_change"
THINKING = "thinking"
TOOL_CALL = "tool_call"
MESSAGE_SENT = "message_sent"
ARTIFACT_CREATED = "artifact_created"
ERROR = "error"
TASK_STARTED = "task_started"
TASK_COMPLETED = "task_completed"
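Exposing the stream over the API is a thin endpoint on top of `AgentActivityStream`; a minimal sketch, assuming this route path and an application-level Redis handle:
# Hedged sketch: WebSocket endpoint for the activity stream (path assumed)
from fastapi import FastAPI

app = FastAPI()

@app.websocket("/ws/projects/{project_id}/activities")
async def project_activities(websocket: WebSocket, project_id: str):
    stream = AgentActivityStream(redis)  # assumed app-level Redis handle
    await stream.subscribe_project(websocket, project_id)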
Code Examples for Key Patterns
Pattern 1: Spawning Multiple Agent Instances
# app/services/agent_factory.py
from uuid import uuid4

from sqlalchemy.ext.asyncio import AsyncSession

class AgentFactory:
    """Factory for creating agent instances from types."""

    DEVELOPER_NAMES = ["Dave", "Ellis", "Kate", "Marcus", "Nina"]
    QA_NAMES = ["Quinn", "Raja", "Sierra", "Tyler", "Uma"]

    def __init__(self, db: AsyncSession):
        self.db = db
async def spawn_agent_team(
self,
project_id: str,
team_config: TeamConfig,
) -> list[AgentInstance]:
"""Spawn a team of agents for a project."""
agents = []
name_counters = {}
for agent_spec in team_config.agents:
agent_type = await self.db.get(AgentType, agent_spec.type_id)
# Get next available name for this type
name = self._get_next_name(agent_type.role, name_counters)
agent = AgentInstance(
id=uuid4(),
name=name,
agent_type_id=agent_type.id,
project_id=project_id,
state=AgentState.IDLE,
working_memory={
"project_context": {},
"conversation_history": [],
"current_focus": None,
},
)
self.db.add(agent)
agents.append(agent)
await self.db.commit()
# Initialize agent graphs
for agent in agents:
await self._initialize_agent_graph(agent)
return agents
def _get_next_name(self, role: str, counters: dict) -> str:
"""Get next name for an agent role."""
names = {
"Software Engineer": self.DEVELOPER_NAMES,
"QA Engineer": self.QA_NAMES,
# ... other roles
}.get(role, ["Agent"])
idx = counters.get(role, 0)
counters[role] = idx + 1
if idx < len(names):
return names[idx]
return f"{names[0]}-{idx + 1}"
Pattern 2: LLM Failover Per Agent Type
# app/agents/llm_config.py
from litellm import Router
class AgentLLMConfig:
"""Per-agent-type LLM configuration with failover."""
# High-stakes agents get premium models with reliable fallbacks
HIGH_REASONING_CHAIN = [
{"model": "claude-sonnet-4-20250514", "provider": "anthropic"},
{"model": "gpt-4-turbo", "provider": "openai"},
{"model": "claude-3-5-sonnet-20241022", "provider": "anthropic"},
]
# Fast agents get quick models
FAST_RESPONSE_CHAIN = [
{"model": "claude-3-haiku-20240307", "provider": "anthropic"},
{"model": "gpt-4o-mini", "provider": "openai"},
]
# Cost-optimized for high-volume tasks
COST_OPTIMIZED_CHAIN = [
{"model": "ollama/llama3", "provider": "ollama"},
{"model": "gpt-4o-mini", "provider": "openai"},
]
AGENT_TYPE_MAPPING = {
"Product Owner": HIGH_REASONING_CHAIN,
"Software Architect": HIGH_REASONING_CHAIN,
"Software Engineer": HIGH_REASONING_CHAIN,
"Business Analyst": HIGH_REASONING_CHAIN,
"QA Engineer": FAST_RESPONSE_CHAIN,
"Project Manager": FAST_RESPONSE_CHAIN,
"DevOps Engineer": FAST_RESPONSE_CHAIN,
}
def build_router_for_agent(self, agent_type: str) -> Router:
"""Build LiteLLM router with failover for agent type."""
chain = self.AGENT_TYPE_MAPPING.get(agent_type, self.FAST_RESPONSE_CHAIN)
model_list = []
for i, model_config in enumerate(chain):
model_list.append({
"model_name": f"agent-{agent_type.lower().replace(' ', '-')}",
"litellm_params": {
"model": model_config["model"],
"api_key": self._get_api_key(model_config["provider"]),
},
"model_info": {"id": i, "priority": i},
})
        return Router(
            model_list=model_list,
            routing_strategy="simple-shuffle",
            num_retries=3,
            timeout=120,
            # All chain entries share one model_name, so the Router already
            # retries across providers within that group; a self-referencing
            # fallbacks entry would be a no-op and is omitted.
        )
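Callers address the router by the per-type alias; because every chain entry registers under that alias, retries rotate across providers, which is what provides the failover. A minimal usage sketch:
# Hedged usage sketch: completion through the per-agent-type router
router = AgentLLMConfig().build_router_for_agent("Software Engineer")

response = await router.acompletion(
    model="agent-software-engineer",  # alias shared by all chain entries
    messages=[{"role": "user", "content": "Summarize the failing test."}],
)
content = response.choices[0].message.content  # OpenAI-format response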
Pattern 3: Agent Context Isolation
# app/agents/context_manager.py
import json
from typing import Any

from redis import asyncio as aioredis
from sqlalchemy.ext.asyncio import AsyncSession

class AgentContextManager:
    """Manage individual agent context/memory."""

    def __init__(self, redis: aioredis.Redis, db: AsyncSession):
        self.redis = redis
        self.db = db
async def get_agent_context(
self,
agent_id: str,
max_messages: int = 50,
) -> AgentContext:
"""Get full context for an agent."""
agent = await self.db.get(AgentInstance, agent_id)
# Get recent conversation history
conversations = await self._get_recent_conversations(
agent_id, max_messages
)
# Get project context
project_context = await self._get_project_context(agent.project_id)
# Get working memory from Redis (fast access)
working_memory = await self.redis.hgetall(
f"agent:{agent_id}:memory"
)
return AgentContext(
agent=agent,
system_prompt=self._build_system_prompt(agent),
conversation_history=conversations,
project_context=project_context,
working_memory=working_memory,
tools=await self._get_available_tools(agent),
)
def _build_system_prompt(self, agent: AgentInstance) -> str:
"""Build personalized system prompt for agent."""
base_prompt = agent.agent_type.system_prompt
return f"""
{base_prompt}
You are {agent.name}, a {agent.agent_type.role} working on project {agent.project.name}.
Your personality traits:
{agent.agent_type.personality_traits}
Current focus areas:
{json.dumps(agent.working_memory.get('current_focus', []))}
Remember: You have your own perspective and expertise. Collaborate with other agents
but bring your unique viewpoint to discussions.
"""
async def update_working_memory(
self,
agent_id: str,
key: str,
value: Any,
ttl: int = 3600,
):
"""Update agent's working memory."""
await self.redis.hset(
f"agent:{agent_id}:memory",
key,
json.dumps(value),
)
await self.redis.expire(f"agent:{agent_id}:memory", ttl)
Risks and Mitigations
| Risk | Impact | Probability | Mitigation |
|---|---|---|---|
| Temporal complexity | High | Medium | Start with simple workflows, invest in team training |
| LangGraph learning curve | Medium | High | Create abstractions, use simpler patterns initially |
| Agent coordination deadlocks | High | Medium | Implement timeouts, circuit breakers, supervisor oversight |
| Token cost explosion | High | Medium | Budget caps, usage monitoring, caching strategies |
| State consistency issues | High | Low | Event sourcing, Temporal durability, comprehensive testing |
| Debugging complexity | Medium | High | Invest in observability (LangSmith, Temporal UI) |
| Vendor lock-in (LangGraph) | Medium | Low | Abstract core patterns, LangGraph is open source |
Mitigation Strategies
1. Complexity Management:
- Start with 2-3 agent types, expand gradually
- Build comprehensive test harnesses
- Document all state transitions and message types
2. Cost Control:
class BudgetGuard:
async def check_before_completion(
self,
project_id: str,
estimated_tokens: int,
) -> bool:
project = await self.db.get(Project, project_id)
current_spend = await self.get_project_spend(project_id)
estimated_cost = self._estimate_cost(estimated_tokens)
if current_spend + estimated_cost > project.budget_limit:
await self.notify_budget_exceeded(project_id)
return False
return True
3. Deadlock Prevention:
class AgentTimeoutGuard:
THINKING_TIMEOUT = 300 # 5 minutes
EXECUTION_TIMEOUT = 600 # 10 minutes
BLOCKED_TIMEOUT = 1800 # 30 minutes
async def monitor_agent(self, agent_id: str):
agent = await self.db.get(AgentInstance, agent_id)
timeout = self._get_timeout(agent.state)
if agent.last_active_at < datetime.utcnow() - timedelta(seconds=timeout):
await self.handle_timeout(agent)
Implementation Roadmap
Phase 1: Foundation (Weeks 1-2)
- Set up Temporal server and workers
- Implement basic LangGraph agent state machine
- Create AgentInstance model and database schema
- Implement Redis Streams message bus
Phase 2: Core Orchestration (Weeks 3-4)
- Implement supervisor pattern with Project Manager
- Build agent spawning and team creation
- Add token tracking per agent instance
- Create basic inter-agent communication
Phase 3: Durability (Weeks 5-6)
- Implement Temporal workflows for projects and sprints
- Add checkpoint and recovery mechanisms
- Build human-in-the-loop approval flows
- Create agent context persistence
Phase 4: Observability (Week 7)
- Implement real-time activity streaming
- Add comprehensive logging and tracing
- Build cost monitoring dashboards
- Create agent performance analytics
References
Frameworks and Tools
- AutoGen 0.4 Documentation
- CrewAI Documentation
- LangGraph Documentation
- Temporal.io Documentation
- LiteLLM Documentation
Research and Articles
- Microsoft AI Agent Design Patterns
- Four Design Patterns for Event-Driven Multi-Agent Systems (Confluent)
- Google A2A Protocol
- Temporal for AI Agents
- LangFuse Token Tracking
- Portkey LLM Failover Patterns
Related Syndarix Documents
- SPIKE-005: LLM provider abstraction and failover via LiteLLM (referenced throughout)
- ADR-002: Agent Orchestration Architecture (to be authored from this spike's findings)
Decision
Adopt a hybrid architecture combining:
- LangGraph for agent state machines and logic (graph-based, stateful)
- Temporal for durable, long-running workflow orchestration
- Redis Streams for event-driven agent-to-agent communication
- Hierarchical supervisor pattern with Project Manager coordinating teams
- LiteLLM (per SPIKE-005) for unified LLM access with failover
This provides the flexibility, durability, and scalability required for Syndarix's 50+ agent autonomous consulting platform while maintaining individual agent context, comprehensive token tracking, and real-time visibility.
Spike completed. Findings will inform ADR-002: Agent Orchestration Architecture.