# SPIKE-002: Agent Orchestration Pattern

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #2

---

## Executive Summary

After researching leading multi-agent orchestration frameworks (AutoGen, CrewAI, LangGraph) and enterprise patterns, we recommend a **hybrid architecture** for Syndarix that combines:

1. **LangGraph** for core agent orchestration logic (graph-based state machines)
2. **Temporal** for durable, long-running workflow execution
3. **Event-driven communication** via Redis Streams with A2A protocol concepts
4. **Hierarchical supervisor pattern** with peer-to-peer collaboration within teams

This architecture provides the flexibility, scalability, and durability required for 50+ concurrent agent instances while maintaining individual agent context, token tracking, and LLM failover capabilities.

### Key Recommendation

**Do not adopt a single framework wholesale.** Instead, build a custom orchestration layer using:

- **LangGraph** for agent logic and state transitions (graph primitives)
- **Temporal** for durable execution and workflow persistence
- **Redis Streams** for event-driven inter-agent communication
- **LiteLLM** for unified LLM access with failover (per SPIKE-005)

---

## Research Questions Addressed

### 1. What are the leading patterns for multi-agent orchestration in 2024-2025?

The multi-agent landscape has matured significantly.
Key patterns include:

| Pattern | Description | Best For |
|---------|-------------|----------|
| **Supervisor** | Central orchestrator coordinates specialized agents | Complex multi-domain workflows |
| **Hierarchical** | Tree structure with managers and workers | Large-scale, layered problems |
| **Peer-to-Peer** | Agents collaborate as equals without central control | Open-ended ideation, negotiation |
| **Orchestrator-Worker** | Planner breaks tasks, workers execute | Deterministic pipelines |
| **Blackboard** | Shared knowledge space, agents react to changes | Incremental problem-solving |
| **Market-Based** | Agents bid for tasks based on capabilities | Dynamic resource allocation |

**Industry Trend (2025):** Event-driven architectures are becoming dominant, with protocols like Google's A2A (Agent-to-Agent), IBM's ACP, and AG-UI standardizing communication.

### 2. Framework Comparison

| Feature | AutoGen 0.4 | CrewAI | LangGraph | Custom |
|---------|-------------|--------|-----------|--------|
| **Architecture** | Event-driven, distributed | Crews + Flows dual-mode | Graph-based state machine | Flexible |
| **State Management** | Built-in, cross-language | Shared memory | Persistent, checkpointed | Custom (Temporal) |
| **Scalability** | High (Kubernetes-ready) | Medium-High | High | Very High |
| **Learning Curve** | Medium | Low-Medium | High | High |
| **Enterprise Adoption** | Microsoft ecosystem | 60% Fortune 500 | Klarna, Replit, Elastic | N/A |
| **Long-running Workflows** | Good | Limited | Good | Excellent (Temporal) |
| **Customization** | Medium | Limited | High | Full |
| **Token Tracking** | Basic | Basic | Via LangSmith | Custom |
| **Multi-Provider Failover** | Limited | Limited | Limited | Full (LiteLLM) |
| **50+ Concurrent Agents** | Possible | Challenging | Possible | Designed for |

#### AutoGen 0.4 (Microsoft)

**Pros:**

- Event-driven, async-first architecture
- Cross-language support (.NET, Python)
- Built-in observability
  (OpenTelemetry)
- Strong Microsoft ecosystem integration
- New Agent Framework combines AutoGen + Semantic Kernel

**Cons:**

- Tied to Microsoft patterns
- Less flexible for custom orchestration
- Newer 0.4 version still maturing

**Best For:** Microsoft-centric enterprises, standardized agent patterns

#### CrewAI

**Pros:**

- Easy to get started (role-based agents)
- Good for sequential/hierarchical workflows
- Strong enterprise traction ($18M Series A)
- LLM-agnostic design

**Cons:**

- Limited ceiling for complex patterns
- Teams report hitting walls at 6-12 months
- Multi-agent complexity can cause loops
- Flows architecture adds learning curve

**Best For:** Quick prototypes, straightforward workflows, teams new to agents

#### LangGraph

**Pros:**

- Fine-grained control over agent flow
- Excellent state management with persistence
- Time-travel debugging (LangSmith)
- Human-in-the-loop built-in
- Cycles, conditionals, parallel execution

**Cons:**

- Steep learning curve (graph theory, state machines)
- Requires distributed systems knowledge
- Observability requires LangSmith subscription

**Best For:** Complex, stateful workflows requiring precise control

### 3. Enterprise Agent State Management

Enterprise systems use these patterns:

**Event Sourcing:**

```python
# All state changes stored as events
class AgentStateEvent:
    event_type: str  # "TASK_ASSIGNED", "STATE_CHANGED", etc.
    agent_id: str
    timestamp: datetime
    data: dict
    sequence_number: int
```

**Checkpoint/Snapshot Pattern:**

```python
# Periodic snapshots for fast recovery
class AgentCheckpoint:
    agent_id: str
    state: AgentState
    memory: dict
    context_window: list[Message]
    checkpoint_time: datetime
```

**Temporal Durable Execution:**

```python
from datetime import timedelta

from temporalio import workflow


# Workflow state is automatically persistent
@workflow.defn
class AgentWorkflow:
    @workflow.run
    async def run(self, task: Task) -> Result:
        # State survives crashes, restarts, deployments
        result = await workflow.execute_activity(
            execute_agent_task,
            task,
            start_to_close_timeout=timedelta(hours=24),
        )
        return result
```

### 4. Agent-to-Agent Communication

**Recommended: Event-Driven with Redis Streams**

```
┌─────────────────────────────────────────────────────────────┐
│                        Redis Streams                        │
│   ┌─────────────────────────────────────────────────────┐   │
│   │  project:{project_id}:events                        │   │
│   │    - agent_message                                  │   │
│   │    - task_assignment                                │   │
│   │    - state_change                                   │   │
│   │    - artifact_produced                              │   │
│   └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
         ▲              ▲              ▲              ▲
         │              │              │              │
    ┌────┴───┐     ┌────┴───┐     ┌────┴───┐     ┌────┴───┐
    │   PO   │     │  Arch  │     │ Dev-1  │     │ Dev-2  │
    │ Agent  │     │ Agent  │     │ Agent  │     │ Agent  │
    └────────┘     └────────┘     └────────┘     └────────┘
```

**Protocol Design (A2A-inspired):**

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Literal
from uuid import UUID


@dataclass
class AgentMessage:
    """Agent-to-Agent message following A2A concepts."""
    id: UUID
    source_agent_id: str
    target_agent_id: str | None  # None = broadcast
    project_id: str
    message_type: Literal[
        "TASK_HANDOFF",
        "CONTEXT_SHARE",
        "REVIEW_REQUEST",
        "FEEDBACK",
        "ARTIFACT",
        "QUERY",
    ]
    payload: dict
    correlation_id: UUID | None  # For request/response
    timestamp: datetime


@dataclass
class AgentCard:
    """Agent capability advertisement (A2A concept)."""
    agent_id: str
    agent_type: str
    capabilities: list[str]
    current_state: AgentState
    project_id: str
```

### 5. Long-Running Agent Workflows

**Recommended: Temporal for Durable Execution**

Temporal provides:

- **Automatic state persistence** - survives crashes and restarts
- **Built-in retries** with exponential backoff
- **Long-running support** - hours, days, even months
- **Human-in-the-loop** - pause and wait for approval
- **Visibility** - full execution history

```python
# workflows/sprint_workflow.py
from temporalio import workflow, activity
from datetime import timedelta


@workflow.defn
class SprintWorkflow:
    """Durable workflow for autonomous sprint execution."""

    def __init__(self):
        self._state = SprintState.PLANNING
        self._agents: dict[str, AgentHandle] = {}
        self._artifacts: list[Artifact] = []
        # Must exist before wait_condition reads it; set by the signal below
        self._human_approved = False

    @workflow.run
    async def run(self, sprint: SprintConfig) -> SprintResult:
        # Phase 1: Planning (may take hours)
        backlog = await workflow.execute_activity(
            plan_sprint,
            sprint,
            start_to_close_timeout=timedelta(hours=4),
        )

        # Phase 2: Parallel development (may take days)
        dev_tasks = await self._execute_development(backlog)

        # Phase 3: Review checkpoint (human approval)
        if sprint.autonomy_level != AutonomyLevel.AUTONOMOUS:
            await workflow.wait_condition(
                lambda: self._human_approved
            )

        # Phase 4: QA and finalization
        result = await workflow.execute_activity(
            finalize_sprint,
            dev_tasks,
            start_to_close_timeout=timedelta(hours=2),
        )

        return result

    @workflow.signal
    async def approve_checkpoint(self, approved: bool):
        self._human_approved = approved

    @workflow.query
    def get_state(self) -> SprintState:
        return self._state
```

### 6. Orchestration Topology Comparison

#### Supervisor Pattern

```
                    ┌─────────────────┐
                    │   Supervisor    │
                    │  (Orchestrator) │
                    └────────┬────────┘
                             │
        ┌────────────────────┼────────────────────┐
        │                    │                    │
        ▼                    ▼                    ▼
 ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
 │ Specialist 1 │     │ Specialist 2 │     │ Specialist 3 │
 │ (Dev Agent)  │     │ (QA Agent)   │     │ (DevOps)     │
 └──────────────┘     └──────────────┘     └──────────────┘
```

**Pros:** Clear control, auditability, easier debugging
**Cons:** Bottleneck at supervisor, single point of failure
**Syndarix Use:** Project Manager as supervisor for sprints

#### Hierarchical Pattern

```
                             ┌────────────────┐
                             │  Project Lead  │
                             │      (PO)      │
                             └───────┬────────┘
                                     │
            ┌────────────────────────┼────────────────────────┐
            │                        │                        │
            ▼                        ▼                        ▼
      ┌────────────┐           ┌────────────┐           ┌────────────┐
      │  Dev Lead  │           │  QA Lead   │           │  Ops Lead  │
      │ (Architect)│           │            │           │            │
      └─────┬──────┘           └─────┬──────┘           └────────────┘
            │                        │
   ┌────────┼────────┐          ┌────┴────┐
   │        │        │          │         │
   ▼        ▼        ▼          ▼         ▼
┌──────┐ ┌──────┐ ┌──────┐   ┌──────┐  ┌──────┐
│ Dave │ │ Ellis│ │ Kate │   │ QA-1 │  │ QA-2 │
└──────┘ └──────┘ └──────┘   └──────┘  └──────┘
```

**Pros:** Scalable, localized decision-making, mirrors real teams
**Cons:** Communication overhead, coordination complexity
**Syndarix Use:** Recommended primary pattern

#### Peer-to-Peer Pattern

```
┌──────┐     ┌──────┐
│ Dave │◄───►│ Ellis│
└──┬───┘     └──┬───┘
   │            │
   │  ┌──────┐  │
   └─►│ Kate │◄─┘
      └──┬───┘
         │
         ▼
[Shared Blackboard]
```

**Pros:** Flexible, emergent collaboration, no bottleneck
**Cons:** Harder to control, potential infinite loops
**Syndarix Use:** Within teams for brainstorming/spikes

---

## Proposed Architecture for Syndarix

### High-Level Design

```
┌───────────────────────────────────────────────────────────────────────┐
│                     Syndarix Orchestration Layer                      │
├───────────────────────────────────────────────────────────────────────┤
│                                                                       │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │                    Temporal Workflow Engine                     │  │
│  │  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐    │  │
│  │  │    Project    │    │    Sprint     │    │  Agent Task   │    │  │
│  │  │   Workflow    │    │   Workflow    │    │   Workflow    │    │  │
│  │  └───────────────┘    └───────────────┘    └───────────────┘    │  │
│  └─────────────────────────────────────────────────────────────────┘  │
│                                   │                                   │
│                                   ▼                                   │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │                    Agent Runtime (LangGraph)                    │  │
│  │                                                                 │  │
│  │  ┌─────────────────────────────────────────────────────────┐    │  │
│  │  │                    Agent State Graph                    │    │  │
│  │  │   [IDLE] ──► [THINKING] ──► [EXECUTING] ──► [BLOCKED]   │    │  │
│  │  │     ▲            │               │              │       │    │  │
│  │  │     └────────────┴───────────────┴──────────────┘       │    │  │
│  │  └─────────────────────────────────────────────────────────┘    │  │
│  │                                                                 │  │
│  │  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐      │  │
│  │  │  Agent   │   │  Agent   │   │  Agent   │   │  Agent   │      │  │
│  │  │ Instance │   │ Instance │   │ Instance │   │ Instance │      │  │
│  │  │   (PO)   │   │  (Arch)  │   │ (Dev-1)  │   │ (Dev-2)  │      │  │
│  │  └──────────┘   └──────────┘   └──────────┘   └──────────┘      │  │
│  └─────────────────────────────────────────────────────────────────┘  │
│                                   │                                   │
│                                   ▼                                   │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │                       Communication Layer                       │  │
│  │                                                                 │  │
│  │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │  │
│  │  │  Redis Streams  │  │  WebSocket/SSE  │  │    Event Bus    │  │  │
│  │  │  (Agent-Agent)  │  │   (Agent-UI)    │  │    (System)     │  │  │
│  │  └─────────────────┘  └─────────────────┘  └─────────────────┘  │  │
│  └─────────────────────────────────────────────────────────────────┘  │
│                                   │                                   │
│                                   ▼                                   │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │                      Infrastructure Layer                       │  │
│  │                                                                 │  │
│  │  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐       │  │
│  │  │   LiteLLM    │    │     MCP      │    │  PostgreSQL  │       │  │
│  │  │  (Gateway)   │    │  (Servers)   │    │  (pgvector)  │       │  │
│  │  └──────────────┘    └──────────────┘    └──────────────┘       │  │
│  └─────────────────────────────────────────────────────────────────┘  │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘
```

### Agent Instance Model

```python
# app/models/agent_instance.py
import enum
from datetime import datetime

from sqlalchemy import (
    Column, DateTime, Enum, Float, ForeignKey, Integer, JSON, String,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship

from app.db.base import Base


class AgentState(str, enum.Enum):
    IDLE = "idle"
    THINKING = "thinking"
    EXECUTING = "executing"
    WAITING_INPUT = "waiting_input"
    WAITING_REVIEW = "waiting_review"
    BLOCKED = "blocked"
    COMPLETED = "completed"
    ERROR = "error"


class AgentInstance(Base):
    """Individual agent instance spawned from a type."""
    __tablename__ = "agent_instances"

    id = Column(UUID, primary_key=True)
    name = Column(String(50))  # "Dave", "Ellis", "Kate"

    # Type relationship
    agent_type_id = Column(UUID, ForeignKey("agent_types.id"))

    # Project assignment
    project_id = Column(UUID, ForeignKey("projects.id"))

    # Current state
    state = Column(Enum(AgentState), default=AgentState.IDLE)
    current_task_id = Column(UUID, ForeignKey("tasks.id"), nullable=True)

    # Context/Memory
    working_memory = Column(JSON, default=dict)  # Short-term context
    system_prompt_override = Column(String, nullable=True)

    # Token/Cost tracking (per instance)
    total_tokens_used = Column(Integer, default=0)
    total_cost_usd = Column(Float, default=0.0)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    last_active_at = Column(DateTime, nullable=True)

    # Relationships
    agent_type = relationship("AgentType", back_populates="instances")
    project = relationship("Project", back_populates="agents")
    conversations = relationship("Conversation", back_populates="agent")
```

### Agent State Machine (LangGraph)

```python
# app/agents/state_machine.py
from operator import add
from typing import Annotated, Literal, TypedDict

from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import END, StateGraph

from app.models.agent_instance import AgentState


class AgentGraphState(TypedDict):
    """State shared across agent graph nodes."""
    agent_id: str
    project_id: str
    task: dict | None
    messages: Annotated[list, add]  # Append-only
    artifacts: Annotated[list, add]
    current_state: AgentState
    current_action: dict | None  # Set by think_node, read by execute_node
    iteration_count: int
    error: str | None


def create_agent_graph(agent_type: AgentType) -> StateGraph:
    """Create a LangGraph state machine for an agent type."""
    graph = StateGraph(AgentGraphState)

    # Define nodes
    graph.add_node("think", think_node)
    graph.add_node("execute", execute_node)
    graph.add_node("review", review_node)
    graph.add_node("handoff", handoff_node)
    graph.add_node("error_handler", error_node)

    # Define edges
    graph.set_entry_point("think")

    graph.add_conditional_edges(
        "think",
        route_after_think,
        {
            "execute": "execute",
            "handoff": "handoff",
            "review": "review",
            "end": END,
        }
    )

    graph.add_conditional_edges(
        "execute",
        route_after_execute,
        {
            "think": "think",  # Continue reasoning
            "review": "review",
            "error": "error_handler",
        }
    )

    graph.add_edge("review", "think")
    graph.add_edge("error_handler", END)
    graph.add_edge("handoff", END)

    # NOTE: depending on the installed langgraph-checkpoint-postgres version,
    # from_conn_string may be a context manager rather than returning a saver
    # directly; adjust the checkpointer setup accordingly.
    return graph.compile(
        checkpointer=PostgresSaver.from_conn_string(settings.DATABASE_URL)
    )


async def think_node(state: AgentGraphState) -> dict:
    """Agent reasoning/planning node."""
    agent = await get_agent_instance(state["agent_id"])

    # Get LLM response
    response = await llm_gateway.complete(
        agent_id=state["agent_id"],
        project_id=state["project_id"],
        messages=build_messages(agent, state),
        model_preference=agent.agent_type.model_preference,
    )

    # Parse response for next action
    action = parse_agent_action(response["content"])

    # Return only the changed keys. Spreading **state here would feed the
    # existing messages/artifacts back through their append reducers and
    # duplicate them.
    return {
        "messages": [{"role": "assistant", "content": response["content"]}],
        "current_action": action,
        "iteration_count": state["iteration_count"] + 1,
    }


async def execute_node(state: AgentGraphState) -> dict:
    """Execute agent action (tool calls, artifact creation)."""
    action = state.get("current_action")

    if action["type"] == "tool_call":
        result = await mcp_client.call_tool(
            server=action["server"],
            tool_name=action["tool"],
            arguments={
                "project_id": state["project_id"],
                "agent_id": state["agent_id"],
                **action["arguments"],
            }
        )
        return {"messages": [{"role": "tool", "content": str(result)}]}

    elif action["type"] == "produce_artifact":
        artifact = await create_artifact(action["artifact"])
        return {"artifacts": [artifact]}

    # Unknown action type: write back an unchanged scalar so nothing is appended
    return {"iteration_count": state["iteration_count"]}
```

### Inter-Agent Communication

```python
# app/agents/communication.py
import json
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import AsyncGenerator
from uuid import uuid4

from redis import asyncio as aioredis


@dataclass
class AgentMessage:
    id: str
    source_agent_id: str
    target_agent_id: str | None
    project_id: str
    message_type: str
    payload: dict
    timestamp: str
    correlation_id: str | None = None


class AgentMessageBus:
    """Redis Streams-based message bus for agent communication."""

    def __init__(self, redis: aioredis.Redis):
        self.redis = redis

    async def publish(self, message: AgentMessage) -> str:
        """Publish message to project stream."""
        stream_key = f"project:{message.project_id}:agent_events"

        # Stream entries are flat field/value maps, so the message (nested
        # payload, optional fields) is stored as a single JSON field.
        fields = {"data": json.dumps(asdict(message))}

        message_id = await self.redis.xadd(
            stream_key,
            fields,
            maxlen=10000,  # Keep last 10k messages
        )

        # Also publish to target-specific stream if targeted
        if message.target_agent_id:
            target_stream = f"agent:{message.target_agent_id}:inbox"
            await self.redis.xadd(target_stream, fields)

        return message_id

    async def subscribe(
        self,
        agent_id: str,
        project_id: str,
    ) -> AsyncGenerator[AgentMessage, None]:
        """Subscribe to messages for an agent."""
        # "$" reads only entries added from now on. The ">" ID is valid only
        # with consumer groups (XREADGROUP), not with plain XREAD.
        streams = {
            f"agent:{agent_id}:inbox": "$",
            f"project:{project_id}:agent_events": "$",
        }

        while True:
            messages = await self.redis.xread(
                streams,
                count=10,
                block=5000,  # 5 second timeout
            )

            for stream_name, stream_messages in messages:
                # Names and fields arrive as bytes unless decode_responses=True
                key = stream_name.decode() if isinstance(stream_name, bytes) else stream_name
                for msg_id, msg_data in stream_messages:
                    streams[key] = msg_id  # Resume after the last entry seen
                    raw = msg_data.get("data") or msg_data.get(b"data")
                    yield AgentMessage(**json.loads(raw))

    async def handoff_task(
        self,
        source_agent_id: str,
        target_agent_id: str,
        project_id: str,
        task: dict,
        context: dict,
    ) -> None:
        """Hand off a task from one agent to another."""
        message = AgentMessage(
            id=str(uuid4()),
            source_agent_id=source_agent_id,
            target_agent_id=target_agent_id,
            project_id=project_id,
            message_type="TASK_HANDOFF",
            payload={
                "task": task,
                "context": context,
                "handoff_reason": "task_completion",
            },
            timestamp=datetime.utcnow().isoformat(),
        )
        await self.publish(message)
```

### Agent Supervisor (Hierarchical Control)

```python
# app/agents/supervisor.py
from datetime import datetime
from typing import TypedDict
from uuid import uuid4

from langgraph.graph import StateGraph, END


class SupervisorState(TypedDict):
    """State for supervisor agent managing a team."""
    project_id: str
    team_agents: list[str]
    pending_tasks: list[dict]
    in_progress: dict[str, dict]  # agent_id -> task
    completed_tasks: list[dict]
    current_phase: str
    team_status: dict[str, dict]  # agent_id -> status, set by assess_team_state


def create_supervisor_graph(supervisor_type: str) -> StateGraph:
    """Create supervisor graph for team coordination."""
    graph = StateGraph(SupervisorState)

    graph.add_node("assess_state", assess_team_state)
    graph.add_node("assign_tasks", assign_tasks_to_agents)
    graph.add_node("monitor_progress", monitor_agent_progress)
    graph.add_node("handle_completion", handle_task_completion)
    graph.add_node("coordinate_handoff", coordinate_agent_handoff)
    graph.add_node("escalate", escalate_to_human)

    graph.set_entry_point("assess_state")

    graph.add_conditional_edges(
        "assess_state",
        decide_supervisor_action,
        {
            "assign": "assign_tasks",
            "monitor": "monitor_progress",
            "coordinate": "coordinate_handoff",
            "escalate": "escalate",
            "complete": END,
        }
    )

    # ...
    # additional edges

    return graph.compile()


async def assess_team_state(state: SupervisorState) -> SupervisorState:
    """Assess current state of all team agents."""
    team_status = {}

    for agent_id in state["team_agents"]:
        agent = await get_agent_instance(agent_id)
        team_status[agent_id] = {
            "state": agent.state,
            "current_task": agent.current_task_id,
            "last_active": agent.last_active_at,
        }

    return {
        **state,
        "team_status": team_status,
    }


async def assign_tasks_to_agents(state: SupervisorState) -> SupervisorState:
    """Assign pending tasks to available agents."""
    available_agents = [
        agent_id
        for agent_id, status in state["team_status"].items()
        if status["state"] == AgentState.IDLE
    ]

    assignments = []

    for task in state["pending_tasks"]:
        if not available_agents:
            break

        # Find best agent for task
        best_agent = await select_best_agent(task, available_agents)

        # Assign task
        await message_bus.publish(AgentMessage(
            id=str(uuid4()),
            source_agent_id="supervisor",
            target_agent_id=best_agent,
            project_id=state["project_id"],
            message_type="TASK_ASSIGNMENT",
            payload={"task": task},
            timestamp=datetime.utcnow().isoformat(),
        ))

        assignments.append((best_agent, task))
        available_agents.remove(best_agent)

    assigned_tasks = [task for _, task in assignments]
    return {
        **state,
        "in_progress": {
            **state["in_progress"],
            **{agent: task for agent, task in assignments},
        },
        "pending_tasks": [
            t for t in state["pending_tasks"] if t not in assigned_tasks
        ],
    }
```

### Temporal Workflow Integration

```python
# app/workflows/project_workflow.py
from temporalio import workflow, activity
from temporalio.common import RetryPolicy
from datetime import timedelta


@workflow.defn
class ProjectWorkflow:
    """Long-running workflow for project lifecycle."""

    def __init__(self):
        self._phase = ProjectPhase.DISCOVERY
        self._sprints: list[SprintResult] = []
        self._human_feedback_pending = False
        self._last_feedback = None  # Set by the provide_feedback signal

    @workflow.run
    async def run(self, project: ProjectConfig) -> ProjectResult:
        # Phase 1: Discovery (PO + BA brainstorm)
        requirements = await workflow.execute_activity(
            run_discovery_phase,
            project,
            start_to_close_timeout=timedelta(hours=8),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )

        # Phase 2: Architecture spike
        architecture = await workflow.execute_activity(
            run_architecture_spike,
            requirements,
            start_to_close_timeout=timedelta(hours=4),
        )

        # Checkpoint: Human review of architecture
        if project.autonomy_level != AutonomyLevel.AUTONOMOUS:
            self._human_feedback_pending = True
            await workflow.wait_condition(
                lambda: not self._human_feedback_pending
            )

        # Phase 3: Sprint execution (iterative)
        while not self._is_complete():
            sprint_result = await workflow.execute_child_workflow(
                SprintWorkflow.run,
                self._next_sprint_config(),
                id=f"{project.id}-sprint-{len(self._sprints) + 1}",
            )
            self._sprints.append(sprint_result)

            # Milestone checkpoint if configured
            if project.autonomy_level == AutonomyLevel.MILESTONE:
                self._human_feedback_pending = True
                await workflow.wait_condition(
                    lambda: not self._human_feedback_pending
                )

        return ProjectResult(
            project_id=project.id,
            sprints=self._sprints,
            status="completed",
        )

    @workflow.signal
    async def provide_feedback(self, feedback: HumanFeedback):
        """Handle human feedback at checkpoints."""
        self._last_feedback = feedback
        self._human_feedback_pending = False

    @workflow.query
    def get_status(self) -> ProjectStatus:
        return ProjectStatus(
            phase=self._phase,
            sprints_completed=len(self._sprints),
            awaiting_feedback=self._human_feedback_pending,
        )
```

### Token Usage Tracking Per Agent

```python
# app/services/agent_metrics.py
from datetime import datetime, timedelta

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.agent_instance import AgentInstance
from app.models.token_usage import TokenUsage


class AgentMetricsService:
    """Track token usage and costs per agent instance."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def record_completion(
        self,
        agent_instance_id: str,
        project_id: str,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
        latency_ms: int,
    ) -> TokenUsage:
        """Record a completion for an agent instance."""
        cost = self._calculate_cost(
            model,
            prompt_tokens,
            completion_tokens,
        )

        usage = TokenUsage(
            agent_instance_id=agent_instance_id,
            project_id=project_id,
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            cost_usd=cost,
            latency_ms=latency_ms,
            timestamp=datetime.utcnow(),
        )

        self.db.add(usage)

        # Update agent instance totals
        agent = await self.db.get(AgentInstance, agent_instance_id)
        agent.total_tokens_used += usage.total_tokens
        agent.total_cost_usd += cost
        agent.last_active_at = datetime.utcnow()

        await self.db.commit()

        return usage

    async def get_agent_usage_summary(
        self,
        agent_instance_id: str,
        period_days: int = 30,
    ) -> dict:
        """Get usage summary for an agent instance."""
        cutoff = datetime.utcnow() - timedelta(days=period_days)

        result = await self.db.execute(
            select(
                func.sum(TokenUsage.total_tokens).label("total_tokens"),
                func.sum(TokenUsage.cost_usd).label("total_cost"),
                func.count(TokenUsage.id).label("completion_count"),
                func.avg(TokenUsage.latency_ms).label("avg_latency"),
            )
            .where(TokenUsage.agent_instance_id == agent_instance_id)
            .where(TokenUsage.timestamp >= cutoff)
        )

        # Rows are converted through their mapping view; dict(row) on a Row
        # is not supported in SQLAlchemy 2.0
        return dict(result.first()._mapping)

    async def get_project_agent_breakdown(
        self,
        project_id: str,
    ) -> list[dict]:
        """Get token usage breakdown by agent for a project."""
        result = await self.db.execute(
            select(
                AgentInstance.id,
                AgentInstance.name,
                AgentType.role,
                func.sum(TokenUsage.total_tokens).label("tokens"),
                func.sum(TokenUsage.cost_usd).label("cost"),
            )
            .join(TokenUsage, TokenUsage.agent_instance_id == AgentInstance.id)
            .join(AgentType, AgentType.id == AgentInstance.agent_type_id)
            .where(AgentInstance.project_id == project_id)
            .group_by(AgentInstance.id, AgentInstance.name, AgentType.role)
            .order_by(func.sum(TokenUsage.cost_usd).desc())
        )

        return [dict(row._mapping) for row in result]
```

### Real-Time Agent Activity Visibility

```python
# app/services/realtime_updates.py
import enum
import json

from fastapi import WebSocket, WebSocketDisconnect
from redis import asyncio as aioredis


class AgentActivityStream:
    """Real-time
    agent activity updates via WebSocket/SSE."""

    def __init__(self, redis: aioredis.Redis):
        self.redis = redis

    async def subscribe_project(
        self,
        websocket: WebSocket,
        project_id: str,
    ):
        """Stream agent activities for a project."""
        await websocket.accept()

        channel = f"project:{project_id}:activities"
        # One PubSub object per WebSocket, so one client disconnecting does
        # not unsubscribe or interleave with other clients
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(channel)

        try:
            async for message in pubsub.listen():
                if message["type"] == "message":
                    await websocket.send_json(json.loads(message["data"]))
        except WebSocketDisconnect:
            pass
        finally:
            await pubsub.unsubscribe(channel)

    async def publish_activity(
        self,
        project_id: str,
        activity: AgentActivity,
    ):
        """Publish an agent activity event."""
        channel = f"project:{project_id}:activities"
        await self.redis.publish(channel, json.dumps({
            "type": "agent_activity",
            "agent_id": activity.agent_id,
            "agent_name": activity.agent_name,
            "agent_role": activity.agent_role,
            "activity_type": activity.activity_type,
            "description": activity.description,
            "state": activity.state,
            "timestamp": activity.timestamp.isoformat(),
            "metadata": activity.metadata,
        }))


# Activity types for real-time updates
class AgentActivityType(str, enum.Enum):
    STATE_CHANGE = "state_change"
    THINKING = "thinking"
    TOOL_CALL = "tool_call"
    MESSAGE_SENT = "message_sent"
    ARTIFACT_CREATED = "artifact_created"
    ERROR = "error"
    TASK_STARTED = "task_started"
    TASK_COMPLETED = "task_completed"
```

---

## Code Examples for Key Patterns

### Pattern 1: Spawning Multiple Agent Instances

```python
# app/services/agent_factory.py
from uuid import uuid4

from sqlalchemy.ext.asyncio import AsyncSession


class AgentFactory:
    """Factory for creating agent instances from types."""

    DEVELOPER_NAMES = ["Dave", "Ellis", "Kate", "Marcus", "Nina"]
    QA_NAMES = ["Quinn", "Raja", "Sierra", "Tyler", "Uma"]

    def __init__(self, db: AsyncSession):
        self.db = db

    async def spawn_agent_team(
        self,
        project_id: str,
        team_config: TeamConfig,
    ) -> list[AgentInstance]:
        """Spawn a team of agents for a project."""
        agents = []
        name_counters = {}

        for agent_spec in team_config.agents:
            agent_type = await self.db.get(AgentType, agent_spec.type_id)

            # Get
            # next available name for this type
            name = self._get_next_name(agent_type.role, name_counters)

            agent = AgentInstance(
                id=uuid4(),
                name=name,
                agent_type_id=agent_type.id,
                project_id=project_id,
                state=AgentState.IDLE,
                working_memory={
                    "project_context": {},
                    "conversation_history": [],
                    "current_focus": None,
                },
            )

            self.db.add(agent)
            agents.append(agent)

        await self.db.commit()

        # Initialize agent graphs
        for agent in agents:
            await self._initialize_agent_graph(agent)

        return agents

    def _get_next_name(self, role: str, counters: dict) -> str:
        """Get next name for an agent role."""
        names = {
            "Software Engineer": self.DEVELOPER_NAMES,
            "QA Engineer": self.QA_NAMES,
            # ... other roles
        }.get(role, ["Agent"])

        idx = counters.get(role, 0)
        counters[role] = idx + 1

        if idx < len(names):
            return names[idx]
        return f"{names[0]}-{idx + 1}"
```

### Pattern 2: LLM Failover Per Agent Type

```python
# app/agents/llm_config.py
from litellm import Router


class AgentLLMConfig:
    """Per-agent-type LLM configuration with failover."""

    # High-stakes agents get premium models with reliable fallbacks
    HIGH_REASONING_CHAIN = [
        {"model": "claude-sonnet-4-20250514", "provider": "anthropic"},
        {"model": "gpt-4-turbo", "provider": "openai"},
        {"model": "claude-3-5-sonnet-20241022", "provider": "anthropic"},
    ]

    # Fast agents get quick models
    FAST_RESPONSE_CHAIN = [
        {"model": "claude-3-haiku-20240307", "provider": "anthropic"},
        {"model": "gpt-4o-mini", "provider": "openai"},
    ]

    # Cost-optimized for high-volume tasks
    COST_OPTIMIZED_CHAIN = [
        {"model": "ollama/llama3", "provider": "ollama"},
        {"model": "gpt-4o-mini", "provider": "openai"},
    ]

    AGENT_TYPE_MAPPING = {
        "Product Owner": HIGH_REASONING_CHAIN,
        "Software Architect": HIGH_REASONING_CHAIN,
        "Software Engineer": HIGH_REASONING_CHAIN,
        "Business Analyst": HIGH_REASONING_CHAIN,
        "QA Engineer": FAST_RESPONSE_CHAIN,
        "Project Manager": FAST_RESPONSE_CHAIN,
        "DevOps Engineer": FAST_RESPONSE_CHAIN,
    }

    def build_router_for_agent(self, agent_type: str) -> Router:
        """Build LiteLLM
        router with failover for agent type."""
        chain = self.AGENT_TYPE_MAPPING.get(agent_type, self.FAST_RESPONSE_CHAIN)
        group = f"agent-{agent_type.lower().replace(' ', '-')}"

        # One model group per chain entry so the failover order is explicit:
        # "<group>-0" is the primary, "<group>-1" the first fallback, and so on.
        # (Putting every deployment under one shared name would load-balance
        # across them instead of failing over in order.)
        model_list = []
        for i, model_config in enumerate(chain):
            model_list.append({
                "model_name": f"{group}-{i}",
                "litellm_params": {
                    "model": model_config["model"],
                    "api_key": self._get_api_key(model_config["provider"]),
                },
            })

        names = [m["model_name"] for m in model_list]

        # Callers request model=f"{group}-0"; on failure the router tries the
        # remaining groups in chain order.
        return Router(
            model_list=model_list,
            routing_strategy="simple-shuffle",
            num_retries=3,
            timeout=120,
            fallbacks=[{names[0]: names[1:]}],
        )
```

### Pattern 3: Agent Context Isolation

```python
# app/agents/context_manager.py
import json
from typing import Any

from redis import asyncio as aioredis
from sqlalchemy.ext.asyncio import AsyncSession


class AgentContextManager:
    """Manage individual agent context/memory."""

    def __init__(self, redis: aioredis.Redis, db: AsyncSession):
        self.redis = redis
        self.db = db

    async def get_agent_context(
        self,
        agent_id: str,
        max_messages: int = 50,
    ) -> AgentContext:
        """Get full context for an agent."""
        agent = await self.db.get(AgentInstance, agent_id)

        # Get recent conversation history
        conversations = await self._get_recent_conversations(
            agent_id, max_messages
        )

        # Get project context
        project_context = await self._get_project_context(agent.project_id)

        # Get working memory from Redis (fast access); values were stored
        # JSON-encoded by update_working_memory, so decode them here
        raw_memory = await self.redis.hgetall(f"agent:{agent_id}:memory")
        working_memory = {
            key: json.loads(value) for key, value in raw_memory.items()
        }

        return AgentContext(
            agent=agent,
            system_prompt=self._build_system_prompt(agent),
            conversation_history=conversations,
            project_context=project_context,
            working_memory=working_memory,
            tools=await self._get_available_tools(agent),
        )

    def _build_system_prompt(self, agent: AgentInstance) -> str:
        """Build personalized system prompt for agent."""
        base_prompt = agent.agent_type.system_prompt

        return f"""
{base_prompt}

You are {agent.name}, a {agent.agent_type.role} working on project {agent.project.name}.
Your personality traits: {agent.agent_type.personality_traits}
Current focus areas: {json.dumps(agent.working_memory.get('current_focus', []))}

Remember: You have your own perspective and expertise.
Collaborate with other agents but bring your unique viewpoint to discussions.
"""

    async def update_working_memory(
        self,
        agent_id: str,
        key: str,
        value: Any,
        ttl: int = 3600,
    ):
        """Update agent's working memory."""
        await self.redis.hset(
            f"agent:{agent_id}:memory",
            key,
            json.dumps(value),
        )
        await self.redis.expire(f"agent:{agent_id}:memory", ttl)
```

---

## Risks and Mitigations

| Risk | Impact | Probability | Mitigation |
|------|--------|-------------|------------|
| **Temporal complexity** | High | Medium | Start with simple workflows, invest in team training |
| **LangGraph learning curve** | Medium | High | Create abstractions, use simpler patterns initially |
| **Agent coordination deadlocks** | High | Medium | Implement timeouts, circuit breakers, supervisor oversight |
| **Token cost explosion** | High | Medium | Budget caps, usage monitoring, caching strategies |
| **State consistency issues** | High | Low | Event sourcing, Temporal durability, comprehensive testing |
| **Debugging complexity** | Medium | High | Invest in observability (LangSmith, Temporal UI) |
| **Vendor lock-in (LangGraph)** | Medium | Low | Abstract core patterns, LangGraph is open source |

### Mitigation Strategies

**1. Complexity Management:**

- Start with 2-3 agent types, expand gradually
- Build comprehensive test harnesses
- Document all state transitions and message types

**2.
Cost Control:**
```python
class BudgetGuard:
    async def check_before_completion(
        self,
        project_id: str,
        estimated_tokens: int,
    ) -> bool:
        """Return False (and notify) if the call would exceed the project budget."""
        project = await self.db.get(Project, project_id)
        current_spend = await self.get_project_spend(project_id)
        estimated_cost = self._estimate_cost(estimated_tokens)

        if current_spend + estimated_cost > project.budget_limit:
            await self.notify_budget_exceeded(project_id)
            return False

        return True
```

**3. Deadlock Prevention:**
```python
from datetime import datetime, timedelta


class AgentTimeoutGuard:
    THINKING_TIMEOUT = 300   # 5 minutes
    EXECUTION_TIMEOUT = 600  # 10 minutes
    BLOCKED_TIMEOUT = 1800   # 30 minutes

    async def monitor_agent(self, agent_id: str):
        agent = await self.db.get(AgentInstance, agent_id)
        timeout = self._get_timeout(agent.state)

        # Assumes last_active_at is stored as a naive UTC datetime
        if agent.last_active_at < datetime.utcnow() - timedelta(seconds=timeout):
            await self.handle_timeout(agent)
```

---

## Implementation Roadmap

### Phase 1: Foundation (Weeks 1-2)
- [ ] Set up Temporal server and workers
- [ ] Implement basic LangGraph agent state machine
- [ ] Create AgentInstance model and database schema
- [ ] Implement Redis Streams message bus

### Phase 2: Core Orchestration (Weeks 3-4)
- [ ] Implement supervisor pattern with Project Manager
- [ ] Build agent spawning and team creation
- [ ] Add token tracking per agent instance
- [ ] Create basic inter-agent communication

### Phase 3: Durability (Weeks 5-6)
- [ ] Implement Temporal workflows for projects and sprints
- [ ] Add checkpoint and recovery mechanisms
- [ ] Build human-in-the-loop approval flows
- [ ] Create agent context persistence

### Phase 4: Observability (Week 7)
- [ ] Implement real-time activity streaming
- [ ] Add comprehensive logging and tracing
- [ ] Build cost monitoring dashboards
- [ ] Create agent performance analytics

---

## References

### Frameworks and Tools
- [AutoGen 0.4 Documentation](https://github.com/microsoft/autogen)
- [CrewAI Documentation](https://docs.crewai.com/)
- [LangGraph Documentation](https://www.langchain.com/langgraph)
- [Temporal.io Documentation](https://temporal.io/)
- [LiteLLM Documentation](https://docs.litellm.ai/)

### Research and Articles
- [Microsoft AI Agent Design Patterns](https://learn.microsoft.com/en-us/azure/architecture/ai-ml/guide/ai-agent-design-patterns)
- [Four Design Patterns for Event-Driven Multi-Agent Systems (Confluent)](https://www.confluent.io/blog/event-driven-multi-agent-systems/)
- [Google A2A Protocol](https://arxiv.org/html/2505.02279v2)
- [Temporal for AI Agents](https://temporal.io/blog/durable-execution-meets-ai-why-temporal-is-the-perfect-foundation-for-ai)
- [LangFuse Token Tracking](https://langfuse.com/docs/observability/features/token-and-cost-tracking)
- [Portkey LLM Failover Patterns](https://portkey.ai/blog/failover-routing-strategies-for-llms-in-production/)

### Related Syndarix Documents
- [SPIKE-001: MCP Integration Pattern](./SPIKE-001-mcp-integration-pattern.md)
- [SPIKE-005: LLM Provider Abstraction](./SPIKE-005-llm-provider-abstraction.md)

---

## Decision

**Adopt a hybrid architecture** combining:

1. **LangGraph** for agent state machines and logic (graph-based, stateful)
2. **Temporal** for durable, long-running workflow orchestration
3. **Redis Streams** for event-driven agent-to-agent communication
4. **Hierarchical supervisor pattern** with Project Manager coordinating teams
5. **LiteLLM** (per SPIKE-005) for unified LLM access with failover

This provides the flexibility, durability, and scalability required for Syndarix's 50+ agent autonomous consulting platform while maintaining individual agent context, comprehensive token tracking, and real-time visibility.

---

*Spike completed. Findings will inform ADR-002: Agent Orchestration Architecture.*