syndarix/docs/spikes/SPIKE-002-agent-orchestration-pattern.md
Felipe Cardoso 5594655fba docs: add architecture spikes and deep analysis documentation
Add comprehensive spike research documents:
- SPIKE-002: Agent Orchestration Pattern (LangGraph + Temporal hybrid)
- SPIKE-006: Knowledge Base pgvector (RAG with hybrid search)
- SPIKE-007: Agent Communication Protocol (JSON-RPC + Redis Streams)
- SPIKE-008: Workflow State Machine (transitions lib + event sourcing)
- SPIKE-009: Issue Synchronization (bi-directional sync with conflict resolution)
- SPIKE-010: Cost Tracking (LiteLLM callbacks + budget enforcement)
- SPIKE-011: Audit Logging (structured event sourcing)
- SPIKE-012: Client Approval Flow (checkpoint-based approvals)

Add architecture documentation:
- ARCHITECTURE_DEEP_ANALYSIS.md: Memory management, security, testing strategy
- IMPLEMENTATION_ROADMAP.md: 6-phase, 24-week implementation plan

Closes #2, #6, #7, #8, #9, #10, #11, #12

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 13:31:02 +01:00


SPIKE-002: Agent Orchestration Pattern

Status: Completed
Date: 2025-12-29
Author: Architecture Team
Related Issue: #2


Executive Summary

After researching leading multi-agent orchestration frameworks (AutoGen, CrewAI, LangGraph) and enterprise patterns, we recommend a hybrid architecture for Syndarix that combines:

  1. LangGraph for core agent orchestration logic (graph-based state machines)
  2. Temporal for durable, long-running workflow execution
  3. Event-driven communication via Redis Streams with A2A protocol concepts
  4. Hierarchical supervisor pattern with peer-to-peer collaboration within teams

This architecture provides the flexibility, scalability, and durability required for 50+ concurrent agent instances while maintaining individual agent context, token tracking, and LLM failover capabilities.

Key Recommendation

Do not adopt a single framework wholesale. Instead, build a custom orchestration layer using:

  • LangGraph for agent logic and state transitions (graph primitives)
  • Temporal for durable execution and workflow persistence
  • Redis Streams for event-driven inter-agent communication
  • LiteLLM for unified LLM access with failover (per SPIKE-005)

Research Questions Addressed

1. What are the leading patterns for multi-agent orchestration in 2024-2025?

The multi-agent landscape has matured significantly. Key patterns include:

| Pattern | Description | Best For |
|---|---|---|
| Supervisor | Central orchestrator coordinates specialized agents | Complex multi-domain workflows |
| Hierarchical | Tree structure with managers and workers | Large-scale, layered problems |
| Peer-to-Peer | Agents collaborate as equals without central control | Open-ended ideation, negotiation |
| Orchestrator-Worker | Planner breaks tasks, workers execute | Deterministic pipelines |
| Blackboard | Shared knowledge space, agents react to changes | Incremental problem-solving |
| Market-Based | Agents bid for tasks based on capabilities | Dynamic resource allocation |

Industry Trend (2025): Event-driven architectures are becoming dominant, with protocols like Google's A2A (Agent-to-Agent), IBM's ACP, and AG-UI standardizing communication.

2. Framework Comparison

| Feature | AutoGen 0.4 | CrewAI | LangGraph | Custom |
|---|---|---|---|---|
| Architecture | Event-driven, distributed | Crews + Flows dual-mode | Graph-based state machine | Flexible |
| State Management | Built-in, cross-language | Shared memory | Persistent, checkpointed | Custom (Temporal) |
| Scalability | High (Kubernetes-ready) | Medium-High | High | Very High |
| Learning Curve | Medium | Low-Medium | High | High |
| Enterprise Adoption | Microsoft ecosystem | 60% Fortune 500 | Klarna, Replit, Elastic | N/A |
| Long-running Workflows | Good | Limited | Good | Excellent (Temporal) |
| Customization | Medium | Limited | High | Full |
| Token Tracking | Basic | Basic | Via LangSmith | Custom |
| Multi-Provider Failover | Limited | Limited | Limited | Full (LiteLLM) |
| 50+ Concurrent Agents | Possible | Challenging | Possible | Designed for |

AutoGen 0.4 (Microsoft)

Pros:

  • Event-driven, async-first architecture
  • Cross-language support (.NET, Python)
  • Built-in observability (OpenTelemetry)
  • Strong Microsoft ecosystem integration
  • New Agent Framework combines AutoGen + Semantic Kernel

Cons:

  • Tied to Microsoft patterns
  • Less flexible for custom orchestration
  • Newer 0.4 version still maturing

Best For: Microsoft-centric enterprises, standardized agent patterns

CrewAI

Pros:

  • Easy to get started (role-based agents)
  • Good for sequential/hierarchical workflows
  • Strong enterprise traction ($18M Series A)
  • LLM-agnostic design

Cons:

  • Limited ceiling for complex patterns
  • Teams report hitting walls at 6-12 months
  • Multi-agent complexity can cause loops
  • Flows architecture adds learning curve

Best For: Quick prototypes, straightforward workflows, teams new to agents

LangGraph

Pros:

  • Fine-grained control over agent flow
  • Excellent state management with persistence
  • Time-travel debugging (LangSmith)
  • Human-in-the-loop built-in
  • Cycles, conditionals, parallel execution

Cons:

  • Steep learning curve (graph theory, state machines)
  • Requires distributed systems knowledge
  • Observability requires LangSmith subscription

Best For: Complex, stateful workflows requiring precise control

3. Enterprise Agent State Management

Enterprise systems use these patterns:

Event Sourcing:

# All state changes stored as events
from dataclasses import dataclass
from datetime import datetime

@dataclass
class AgentStateEvent:
    event_type: str  # "TASK_ASSIGNED", "STATE_CHANGED", etc.
    agent_id: str
    timestamp: datetime
    data: dict
    sequence_number: int
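
With event sourcing, an agent's current state is not stored directly; it is derived by folding the event log in sequence order. The sketch below illustrates that idea with a trimmed copy of the event class. The `replay` function, the shape of `data` (`task_id`, `new_state`), and the initial state are assumptions made for illustration, not part of the Syndarix design.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class AgentStateEvent:
    event_type: str
    agent_id: str
    timestamp: datetime
    data: dict
    sequence_number: int


def replay(events: list[AgentStateEvent]) -> dict:
    """Fold events, in sequence order, into the agent's current state."""
    state = {"state": "idle", "task_id": None}  # assumed initial state
    for event in sorted(events, key=lambda e: e.sequence_number):
        if event.event_type == "TASK_ASSIGNED":
            state["task_id"] = event.data["task_id"]
        elif event.event_type == "STATE_CHANGED":
            state["state"] = event.data["new_state"]
    return state


now = datetime.now(timezone.utc)
events = [  # deliberately out of order; sequence_number decides the outcome
    AgentStateEvent("STATE_CHANGED", "dave", now, {"new_state": "thinking"}, 2),
    AgentStateEvent("TASK_ASSIGNED", "dave", now, {"task_id": "T-1"}, 1),
]
current = replay(events)
```

Because the result depends only on `sequence_number`, the same log always yields the same state, which is what makes replay usable for audit and recovery.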

Checkpoint/Snapshot Pattern:

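# Periodic snapshots for fast recovery
# (AgentState and Message are assumed to be defined elsewhere in the codebase)
from dataclasses import dataclass
from datetime import datetime

@dataclass
class AgentCheckpoint: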
    agent_id: str
    state: AgentState
    memory: dict
    context_window: list[Message]
    checkpoint_time: datetime
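
Snapshots and event sourcing are usually combined: recovery loads the latest snapshot and replays only the events recorded after it. The following is a minimal sketch of that recovery step. The `Snapshot` class and its `last_sequence_number` field are hypothetical additions used to mark where the snapshot ends; the checkpoint class above does not define such a field.

```python
from dataclasses import dataclass, field


@dataclass
class Snapshot:
    agent_id: str
    state: str
    memory: dict = field(default_factory=dict)
    last_sequence_number: int = 0  # hypothetical: last event already folded in


def recover(snapshot: Snapshot, events: list[dict]) -> Snapshot:
    """Restore from a snapshot, then apply only the events recorded after it."""
    restored = Snapshot(
        agent_id=snapshot.agent_id,
        state=snapshot.state,
        memory=dict(snapshot.memory),
        last_sequence_number=snapshot.last_sequence_number,
    )
    newer = [e for e in events if e["sequence_number"] > snapshot.last_sequence_number]
    for event in sorted(newer, key=lambda e: e["sequence_number"]):
        if event["event_type"] == "STATE_CHANGED":
            restored.state = event["data"]["new_state"]
        restored.last_sequence_number = event["sequence_number"]
    return restored


snap = Snapshot("dave", state="thinking", last_sequence_number=10)
log = [
    # Older than the snapshot, so it is skipped during recovery
    {"sequence_number": 9, "event_type": "STATE_CHANGED", "data": {"new_state": "idle"}},
    {"sequence_number": 11, "event_type": "STATE_CHANGED", "data": {"new_state": "executing"}},
]
recovered = recover(snap, log)
```

The snapshot interval then becomes a trade-off between storage and the number of events to replay after a crash.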

Temporal Durable Execution:

# Workflow state is automatically persistent
from datetime import timedelta
from temporalio import workflow

@workflow.defn
class AgentWorkflow:
    @workflow.run
    async def run(self, task: Task) -> Result:
        # State survives crashes, restarts, deployments
        result = await workflow.execute_activity(
            execute_agent_task,
            task,
            start_to_close_timeout=timedelta(hours=24)
        )
        return result

4. Agent-to-Agent Communication

Recommended: Event-Driven with Redis Streams

┌─────────────────────────────────────────────────────────────┐
│                    Redis Streams                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ project:{project_id}:events                          │    │
│  │   - agent_message                                    │    │
│  │   - task_assignment                                  │    │
│  │   - state_change                                     │    │
│  │   - artifact_produced                                │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
         ▲           ▲           ▲           ▲
         │           │           │           │
    ┌────┴───┐  ┌────┴───┐  ┌────┴───┐  ┌────┴───┐
    │ PO     │  │ Arch   │  │ Dev-1  │  │ Dev-2  │
    │ Agent  │  │ Agent  │  │ Agent  │  │ Agent  │
    └────────┘  └────────┘  └────────┘  └────────┘

Protocol Design (A2A-inspired):

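from dataclasses import dataclass
from datetime import datetime
from typing import Literal
from uuid import UUID

@dataclass
class AgentMessage:
    """Agent-to-Agent message following A2A concepts."""
    id: UUID
    source_agent_id: str
    target_agent_id: str | None  # None = broadcast
    project_id: str
    message_type: Literal[
        "TASK_ASSIGNMENT",  # Sent by the supervisor when assigning work
        "TASK_HANDOFF",
        "CONTEXT_SHARE",
        "REVIEW_REQUEST",
        "FEEDBACK",
        "ARTIFACT",
        "QUERY",
    ]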
    payload: dict
    correlation_id: UUID | None  # For request/response
    timestamp: datetime

@dataclass
class AgentCard:
    """Agent capability advertisement (A2A concept)."""
    agent_id: str
    agent_type: str
    capabilities: list[str]
    current_state: AgentState
    project_id: str
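
Fields such as `UUID`, `datetime`, and the nested `payload` cannot be written to a Redis stream field as-is, so the message has to be serialized before it goes on the wire. The sketch below shows one possible JSON round trip. `WireMessage`, `encode`, and `decode` are hypothetical names, and the class is a trimmed stand-in for the message type above rather than the real one.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID, uuid4


@dataclass
class WireMessage:  # hypothetical, trimmed version of AgentMessage
    id: UUID
    source_agent_id: str
    target_agent_id: Optional[str]  # None = broadcast
    message_type: str
    payload: dict
    timestamp: datetime


def encode(message: WireMessage) -> str:
    """Convert to a JSON string, turning UUID and datetime into text."""
    raw = asdict(message)
    raw["id"] = str(message.id)
    raw["timestamp"] = message.timestamp.isoformat()
    return json.dumps(raw)


def decode(text: str) -> WireMessage:
    """Inverse of encode(): restore the typed fields from their text form."""
    raw = json.loads(text)
    raw["id"] = UUID(raw["id"])
    raw["timestamp"] = datetime.fromisoformat(raw["timestamp"])
    return WireMessage(**raw)


original = WireMessage(
    id=uuid4(),
    source_agent_id="po",
    target_agent_id=None,
    message_type="CONTEXT_SHARE",
    payload={"summary": "scope agreed"},
    timestamp=datetime.now(timezone.utc),
)
round_tripped = decode(encode(original))
```

Keeping the whole message in a single JSON field also preserves `None` values, which flat stream fields cannot represent.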

5. Long-Running Agent Workflows

Recommended: Temporal for Durable Execution

Temporal provides:

  • Automatic state persistence - survives crashes and restarts
  • Built-in retries with exponential backoff
  • Long-running support - hours, days, even months
  • Human-in-the-loop - pause and wait for approval
  • Visibility - full execution history

# workflows/sprint_workflow.py
from temporalio import workflow, activity
from datetime import timedelta

@workflow.defn
class SprintWorkflow:
    """Durable workflow for autonomous sprint execution."""

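    def __init__(self):
        self._state = SprintState.PLANNING
        self._agents: dict[str, AgentHandle] = {}
        self._artifacts: list[Artifact] = []
        # Set by the approve_checkpoint signal; must exist before wait_condition reads it
        self._human_approved = False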

    @workflow.run
    async def run(self, sprint: SprintConfig) -> SprintResult:
        # Phase 1: Planning (may take hours)
        backlog = await workflow.execute_activity(
            plan_sprint,
            sprint,
            start_to_close_timeout=timedelta(hours=4),
        )

        # Phase 2: Parallel development (may take days)
        dev_tasks = await self._execute_development(backlog)

        # Phase 3: Review checkpoint (human approval)
        if sprint.autonomy_level != AutonomyLevel.AUTONOMOUS:
            await workflow.wait_condition(
                lambda: self._human_approved
            )

        # Phase 4: QA and finalization
        result = await workflow.execute_activity(
            finalize_sprint,
            dev_tasks,
            start_to_close_timeout=timedelta(hours=2),
        )

        return result

    @workflow.signal
    async def approve_checkpoint(self, approved: bool):
        self._human_approved = approved

    @workflow.query
    def get_state(self) -> SprintState:
        return self._state
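
To make "built-in retries with exponential backoff" concrete, the sketch below computes the wait before each retry in plain Python. It does not call Temporal. The parameter names are chosen to mirror the fields of Temporal's `RetryPolicy`, while the default values and the `retry_delays` helper are assumptions for illustration.

```python
from datetime import timedelta


def retry_delays(
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: timedelta = timedelta(seconds=100),
    maximum_attempts: int = 5,
) -> list[timedelta]:
    """Delay before each retry; the first attempt itself has no delay."""
    delays = []
    for retry in range(maximum_attempts - 1):
        # Each retry waits backoff_coefficient times longer than the previous one
        delay = initial_interval * (backoff_coefficient ** retry)
        # The cap keeps long-running workflows from waiting indefinitely
        delays.append(min(delay, maximum_interval))
    return delays


schedule = retry_delays(maximum_attempts=5)
capped = retry_delays(maximum_attempts=10)
```

With these assumed defaults, five attempts wait 1, 2, 4, and 8 seconds between tries, and later retries level off at the 100-second cap.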

6. Orchestration Topology Comparison

Supervisor Pattern

                    ┌─────────────────┐
                    │   Supervisor    │
                    │   (Orchestrator)│
                    └────────┬────────┘
                             │
        ┌────────────────────┼────────────────────┐
        │                    │                    │
        ▼                    ▼                    ▼
┌──────────────┐   ┌──────────────┐   ┌──────────────┐
│ Specialist 1 │   │ Specialist 2 │   │ Specialist 3 │
│ (Dev Agent)  │   │ (QA Agent)   │   │ (DevOps)     │
└──────────────┘   └──────────────┘   └──────────────┘

Pros: Clear control, auditability, easier debugging
Cons: Bottleneck at supervisor, single point of failure
Syndarix Use: Project Manager as supervisor for sprints

Hierarchical Pattern

                         ┌────────────────┐
                         │  Project Lead  │
                         │      (PO)      │
                         └───────┬────────┘
                                 │
              ┌──────────────────┼──────────────────┐
              │                  │                  │
              ▼                  ▼                  ▼
       ┌────────────┐    ┌────────────┐    ┌────────────┐
       │ Dev Lead   │    │ QA Lead    │    │ Ops Lead   │
       │ (Architect)│    │            │    │            │
       └─────┬──────┘    └─────┬──────┘    └────────────┘
             │                 │
    ┌────────┼────────┐   ┌────┴────┐
    │        │        │   │         │
    ▼        ▼        ▼   ▼         ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│ Dave │ │ Ellis│ │ Kate │ │ QA-1 │ │ QA-2 │
└──────┘ └──────┘ └──────┘ └──────┘ └──────┘

Pros: Scalable, localized decision-making, mirrors real teams
Cons: Communication overhead, coordination complexity
Syndarix Use: Recommended primary pattern
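
"Localized decision-making" in a hierarchy means a problem is escalated only as far as the lowest lead shared by the agents involved. A minimal sketch of that lookup follows. The tree mirrors the diagram above, but the identifiers and both helper functions are hypothetical.

```python
from typing import Optional

# Parent links taken from the hierarchical diagram; identifiers are illustrative.
REPORTS_TO = {
    "dave": "dev_lead",
    "ellis": "dev_lead",
    "kate": "dev_lead",
    "qa_1": "qa_lead",
    "qa_2": "qa_lead",
    "dev_lead": "project_lead",
    "qa_lead": "project_lead",
    "ops_lead": "project_lead",
}


def escalation_path(agent: str) -> list[str]:
    """Managers above an agent, nearest first."""
    path = []
    while agent in REPORTS_TO:
        agent = REPORTS_TO[agent]
        path.append(agent)
    return path


def lowest_common_lead(a: str, b: str) -> Optional[str]:
    """Closest node that sits on both agents' chains (the local decision-maker)."""
    chain_b = {b, *escalation_path(b)}
    for node in [a, *escalation_path(a)]:
        if node in chain_b:
            return node
    return None


same_team = lowest_common_lead("dave", "kate")
cross_team = lowest_common_lead("dave", "qa_1")
```

A conflict between two developers stays with the Dev Lead, while a Dev/QA conflict reaches the Project Lead, which is where the coordination overhead listed above comes from.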

Peer-to-Peer Pattern

    ┌──────┐     ┌──────┐
    │ Dave │◄───►│ Ellis│
    └──┬───┘     └──┬───┘
       │            │
       │   ┌──────┐ │
       └──►│ Kate │◄┘
           └──┬───┘
              │
              ▼
         [Shared Blackboard]

Pros: Flexible, emergent collaboration, no bottleneck
Cons: Harder to control, potential infinite loops
Syndarix Use: Within teams for brainstorming/spikes


Proposed Architecture for Syndarix

High-Level Design

┌───────────────────────────────────────────────────────────────────────┐
│                         Syndarix Orchestration Layer                   │
├───────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │                    Temporal Workflow Engine                      │  │
│  │  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐        │  │
│  │  │ Project       │  │ Sprint        │  │ Agent Task    │        │  │
│  │  │ Workflow      │  │ Workflow      │  │ Workflow      │        │  │
│  │  └───────────────┘  └───────────────┘  └───────────────┘        │  │
│  └─────────────────────────────────────────────────────────────────┘  │
│                                │                                       │
│                                ▼                                       │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │                    Agent Runtime (LangGraph)                     │  │
│  │                                                                  │  │
│  │  ┌─────────────────────────────────────────────────────────┐    │  │
│  │  │                  Agent State Graph                       │    │  │
│  │  │  [IDLE] ──► [THINKING] ──► [EXECUTING] ──► [BLOCKED]    │    │  │
│  │  │    ▲            │              │              │          │    │  │
│  │  │    └────────────┴──────────────┴──────────────┘          │    │  │
│  │  └─────────────────────────────────────────────────────────┘    │  │
│  │                                                                  │  │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐           │  │
│  │  │ Agent    │ │ Agent    │ │ Agent    │ │ Agent    │           │  │
│  │  │ Instance │ │ Instance │ │ Instance │ │ Instance │           │  │
│  │  │ (PO)     │ │ (Arch)   │ │ (Dev-1)  │ │ (Dev-2)  │           │  │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘           │  │
│  └─────────────────────────────────────────────────────────────────┘  │
│                                │                                       │
│                                ▼                                       │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │                    Communication Layer                           │  │
│  │                                                                  │  │
│  │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │  │
│  │  │  Redis Streams  │  │  WebSocket/SSE  │  │  Event Bus      │  │  │
│  │  │  (Agent-Agent)  │  │  (Agent-UI)     │  │  (System)       │  │  │
│  │  └─────────────────┘  └─────────────────┘  └─────────────────┘  │  │
│  └─────────────────────────────────────────────────────────────────┘  │
│                                │                                       │
│                                ▼                                       │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │                    Infrastructure Layer                          │  │
│  │                                                                  │  │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │  │
│  │  │   LiteLLM    │  │   MCP        │  │   PostgreSQL │          │  │
│  │  │   (Gateway)  │  │   (Servers)  │  │   (pgvector) │          │  │
│  │  └──────────────┘  └──────────────┘  └──────────────┘          │  │
│  └─────────────────────────────────────────────────────────────────┘  │
│                                                                        │
└───────────────────────────────────────────────────────────────────────┘

Agent Instance Model

# app/models/agent_instance.py
from datetime import datetime
import enum

from sqlalchemy import Column, String, Enum, ForeignKey, JSON, DateTime, Float, Integer
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app.db.base import Base

class AgentState(str, enum.Enum):
    IDLE = "idle"
    THINKING = "thinking"
    EXECUTING = "executing"
    WAITING_INPUT = "waiting_input"
    WAITING_REVIEW = "waiting_review"
    BLOCKED = "blocked"
    COMPLETED = "completed"
    ERROR = "error"

class AgentInstance(Base):
    """Individual agent instance spawned from a type."""
    __tablename__ = "agent_instances"

    id = Column(UUID, primary_key=True)
    name = Column(String(50))  # "Dave", "Ellis", "Kate"

    # Type relationship
    agent_type_id = Column(UUID, ForeignKey("agent_types.id"))

    # Project assignment
    project_id = Column(UUID, ForeignKey("projects.id"))

    # Current state
    state = Column(Enum(AgentState), default=AgentState.IDLE)
    current_task_id = Column(UUID, ForeignKey("tasks.id"), nullable=True)

    # Context/Memory
    working_memory = Column(JSON, default=dict)  # Short-term context
    system_prompt_override = Column(String, nullable=True)

    # Token/Cost tracking (per instance)
    total_tokens_used = Column(Integer, default=0)
    total_cost_usd = Column(Float, default=0.0)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    last_active_at = Column(DateTime, nullable=True)

    # Relationships
    agent_type = relationship("AgentType", back_populates="instances")
    project = relationship("Project", back_populates="agents")
    conversations = relationship("Conversation", back_populates="agent")
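
Since `state` is a plain enum column, nothing in the model prevents an invalid jump such as idle straight to executing. One way to guard against that is an explicit transition table, sketched below. The enum values are copied from the model, but the `ALLOWED` table and the `transition` helper are assumptions; the actual permitted transitions are not specified in this document.

```python
import enum


class AgentState(str, enum.Enum):
    IDLE = "idle"
    THINKING = "thinking"
    EXECUTING = "executing"
    WAITING_INPUT = "waiting_input"
    WAITING_REVIEW = "waiting_review"
    BLOCKED = "blocked"
    COMPLETED = "completed"
    ERROR = "error"


S = AgentState
# Assumed transition table, for illustration only
ALLOWED: dict[AgentState, set[AgentState]] = {
    S.IDLE: {S.THINKING},
    S.THINKING: {S.EXECUTING, S.WAITING_INPUT, S.BLOCKED, S.COMPLETED, S.ERROR},
    S.EXECUTING: {S.THINKING, S.WAITING_REVIEW, S.BLOCKED, S.COMPLETED, S.ERROR},
    S.WAITING_INPUT: {S.THINKING},
    S.WAITING_REVIEW: {S.THINKING, S.COMPLETED},
    S.BLOCKED: {S.IDLE, S.THINKING},
    S.COMPLETED: {S.IDLE},
    S.ERROR: {S.IDLE},
}


def transition(current: AgentState, target: AgentState) -> AgentState:
    """Return the new state, or raise if the move is not in the table."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target


new_state = transition(AgentState.IDLE, AgentState.THINKING)
```

Calling such a helper wherever `AgentInstance.state` is written would turn silent state corruption into an immediate error.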

Agent State Machine (LangGraph)

# app/agents/state_machine.py
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from typing import TypedDict, Annotated, Literal
from operator import add

class AgentGraphState(TypedDict):
    """State shared across agent graph nodes."""
    agent_id: str
    project_id: str
    task: dict | None
    messages: Annotated[list, add]  # Append-only
    artifacts: Annotated[list, add]
    current_state: AgentState
    current_action: dict | None  # Set by think_node, consumed by execute_node
    iteration_count: int
    error: str | None

def create_agent_graph(agent_type: AgentType) -> StateGraph:
    """Create a LangGraph state machine for an agent type."""

    graph = StateGraph(AgentGraphState)

    # Define nodes
    graph.add_node("think", think_node)
    graph.add_node("execute", execute_node)
    graph.add_node("review", review_node)
    graph.add_node("handoff", handoff_node)
    graph.add_node("error_handler", error_node)

    # Define edges
    graph.set_entry_point("think")

    graph.add_conditional_edges(
        "think",
        route_after_think,
        {
            "execute": "execute",
            "handoff": "handoff",
            "review": "review",
            "end": END,
        }
    )

    graph.add_conditional_edges(
        "execute",
        route_after_execute,
        {
            "think": "think",  # Continue reasoning
            "review": "review",
            "error": "error_handler",
        }
    )

    graph.add_edge("review", "think")
    graph.add_edge("error_handler", END)
    graph.add_edge("handoff", END)

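    # Caveat: in current langgraph-checkpoint-postgres releases,
    # PostgresSaver.from_conn_string() is a context manager rather than a plain
    # constructor, and async nodes need the async saver variant. Treat this call
    # as shorthand for "compile with a Postgres-backed checkpointer" and verify
    # it against the installed version.
    return graph.compile(
        checkpointer=PostgresSaver.from_conn_string(settings.DATABASE_URL)
    )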

async def think_node(state: AgentGraphState) -> AgentGraphState:
    """Agent reasoning/planning node."""
    agent = await get_agent_instance(state["agent_id"])

    # Get LLM response
    response = await llm_gateway.complete(
        agent_id=state["agent_id"],
        project_id=state["project_id"],
        messages=build_messages(agent, state),
        model_preference=agent.agent_type.model_preference,
    )

    # Parse response for next action
    action = parse_agent_action(response["content"])

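    # Return only the keys that changed. "messages" and "artifacts" use an
    # append reducer, so echoing the full state back would duplicate them.
    return {
        "messages": [{"role": "assistant", "content": response["content"]}],
        "current_action": action,
        "iteration_count": state["iteration_count"] + 1,
    }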

async def execute_node(state: AgentGraphState) -> AgentGraphState:
    """Execute agent action (tool calls, artifact creation)."""
    action = state.get("current_action")

    if action["type"] == "tool_call":
        result = await mcp_client.call_tool(
            server=action["server"],
            tool_name=action["tool"],
            arguments={
                "project_id": state["project_id"],
                "agent_id": state["agent_id"],
                **action["arguments"],
            }
        )
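        # Partial update: the reducer appends this to the existing messages
        return {"messages": [{"role": "tool", "content": str(result)}]}

    elif action["type"] == "produce_artifact":
        artifact = await create_artifact(action["artifact"])
        # Partial update: the reducer appends this artifact to the existing list
        return {"artifacts": [artifact]}

    # Unrecognized action type: record it instead of echoing the whole state,
    # which would re-append every message and artifact through the reducers
    return {"error": f"unknown action type: {action['type']}"}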
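
The `Annotated[list, add]` fields change what a node's return value means: it is merged into the existing state through the reducer, not assigned. The sketch below imitates that merge in plain Python to show the difference between returning a partial update and echoing the whole state. `DemoState` and `apply_update` are hypothetical and only approximate the behaviour; this is not LangGraph's implementation.

```python
from operator import add
from typing import Annotated, TypedDict, get_type_hints


class DemoState(TypedDict):
    messages: Annotated[list, add]  # append-only channel
    iteration_count: int            # plain channel, last write wins


def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's return value the way a reducer-aware state store would."""
    hints = get_type_hints(DemoState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated[...] hints carry their reducer in __metadata__
        reducers = getattr(hints[key], "__metadata__", ())
        merged[key] = reducers[0](state[key], value) if reducers else value
    return merged


state = {"messages": ["a"], "iteration_count": 1}

# Partial update: only the new message is passed to the reducer
partial = apply_update(state, {"messages": ["b"], "iteration_count": 2})

# Echoing the full state: the existing message is appended to itself
echoed = apply_update(state, {**state})
```

This is why node functions for a graph with append reducers are usually written to return only the keys they changed.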

Inter-Agent Communication

# app/agents/communication.py
import json
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import AsyncGenerator
from uuid import uuid4

from redis import asyncio as aioredis

@dataclass
class AgentMessage:
    id: str
    source_agent_id: str
    target_agent_id: str | None
    project_id: str
    message_type: str
    payload: dict
    timestamp: str
    correlation_id: str | None = None

class AgentMessageBus:
    """Redis Streams-based message bus for agent communication."""

    def __init__(self, redis: aioredis.Redis):
        self.redis = redis

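    async def publish(self, message: AgentMessage) -> str:
        """Publish message to project stream."""
        stream_key = f"project:{message.project_id}:agent_events"

        # Stream fields must be flat strings or numbers, so the message (which
        # has a nested payload and optional None fields) is sent as one JSON field.
        fields = {"data": json.dumps(asdict(message))}

        message_id = await self.redis.xadd(
            stream_key,
            fields,
            maxlen=10000,  # Keep roughly the last 10k messages
        )

        # Also publish to target-specific stream if targeted. The message is then
        # visible on both streams, so readers of both should de-duplicate on id.
        if message.target_agent_id:
            target_stream = f"agent:{message.target_agent_id}:inbox"
            await self.redis.xadd(target_stream, fields)

        return message_id

    async def subscribe(
        self,
        agent_id: str,
        project_id: str,
    ) -> AsyncGenerator[AgentMessage, None]:
        """Subscribe to messages for an agent."""
        # "$" means "only entries added after this call". The ">" ID is valid
        # only with consumer groups (XREADGROUP), not with XREAD.
        streams = {
            f"agent:{agent_id}:inbox": "$",
            f"project:{project_id}:agent_events": "$",
        }

        while True:
            messages = await self.redis.xread(
                streams,
                count=10,
                block=5000,  # 5 second timeout
            )

            for stream_name, stream_messages in messages:
                for msg_id, msg_data in stream_messages:
                    # Advance the cursor so the next read resumes after this entry
                    streams[stream_name] = msg_id
                    # Assumes the client was created with decode_responses=True
                    yield AgentMessage(**json.loads(msg_data["data"]))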

    async def handoff_task(
        self,
        source_agent_id: str,
        target_agent_id: str,
        project_id: str,
        task: dict,
        context: dict,
    ) -> None:
        """Hand off a task from one agent to another."""
        message = AgentMessage(
            id=str(uuid4()),
            source_agent_id=source_agent_id,
            target_agent_id=target_agent_id,
            project_id=project_id,
            message_type="TASK_HANDOFF",
            payload={
                "task": task,
                "context": context,
                "handoff_reason": "task_completion",
            },
            timestamp=datetime.utcnow().isoformat(),
        )
        await self.publish(message)
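
The `correlation_id` field exists so that a reply can be matched to the request that triggered it. A minimal sketch of that matching with `asyncio` futures is shown below. `PendingReplies` is a hypothetical helper, and the reply is simulated locally rather than delivered through the message bus.

```python
import asyncio
from typing import Optional
from uuid import uuid4


class PendingReplies:
    """Tracks outstanding requests so replies can be matched by correlation_id."""

    def __init__(self) -> None:
        self._waiting: dict[str, asyncio.Future] = {}

    def register(self) -> tuple[str, asyncio.Future]:
        """Create a correlation_id and the future its reply will resolve."""
        correlation_id = str(uuid4())
        future = asyncio.get_running_loop().create_future()
        self._waiting[correlation_id] = future
        return correlation_id, future

    def resolve(self, correlation_id: Optional[str], payload: dict) -> bool:
        """Deliver a reply; returns False for unknown or unsolicited messages."""
        future = self._waiting.pop(correlation_id, None)
        if future is None or future.done():
            return False
        future.set_result(payload)
        return True


async def demo() -> dict:
    pending = PendingReplies()
    correlation_id, future = pending.register()
    # In the real bus the reply would arrive via subscribe(); here it is simulated.
    asyncio.get_running_loop().call_soon(
        pending.resolve, correlation_id, {"answer": "use pgvector"}
    )
    # The timeout keeps a lost reply from blocking the requesting agent forever
    return await asyncio.wait_for(future, timeout=1.0)


reply = asyncio.run(demo())
```

The timeout matters in practice: an agent waiting on a `QUERY` should fall back or escalate rather than stall if the target never answers.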

Agent Supervisor (Hierarchical Control)

# app/agents/supervisor.py
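from datetime import datetime
from typing import TypedDict
from uuid import uuid4

from langgraph.graph import StateGraph, END

class SupervisorState(TypedDict):
    """State for supervisor agent managing a team."""
    project_id: str
    team_agents: list[str]
    team_status: dict[str, dict]  # agent_id -> status, written by assess_team_state
    pending_tasks: list[dict]
    in_progress: dict[str, dict]  # agent_id -> task
    completed_tasks: list[dict]
    current_phase: str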

def create_supervisor_graph(supervisor_type: str) -> StateGraph:
    """Create supervisor graph for team coordination."""

    graph = StateGraph(SupervisorState)

    graph.add_node("assess_state", assess_team_state)
    graph.add_node("assign_tasks", assign_tasks_to_agents)
    graph.add_node("monitor_progress", monitor_agent_progress)
    graph.add_node("handle_completion", handle_task_completion)
    graph.add_node("coordinate_handoff", coordinate_agent_handoff)
    graph.add_node("escalate", escalate_to_human)

    graph.set_entry_point("assess_state")

    graph.add_conditional_edges(
        "assess_state",
        decide_supervisor_action,
        {
            "assign": "assign_tasks",
            "monitor": "monitor_progress",
            "coordinate": "coordinate_handoff",
            "escalate": "escalate",
            "complete": END,
        }
    )

    # ... additional edges

    return graph.compile()

async def assess_team_state(state: SupervisorState) -> SupervisorState:
    """Assess current state of all team agents."""
    team_status = {}

    for agent_id in state["team_agents"]:
        agent = await get_agent_instance(agent_id)
        team_status[agent_id] = {
            "state": agent.state,
            "current_task": agent.current_task_id,
            "last_active": agent.last_active_at,
        }

    return {
        **state,
        "team_status": team_status,
    }

async def assign_tasks_to_agents(state: SupervisorState) -> SupervisorState:
    """Assign pending tasks to available agents."""
    available_agents = [
        agent_id for agent_id, status in state["team_status"].items()
        if status["state"] == AgentState.IDLE
    ]

    assignments = []
    for task in state["pending_tasks"]:
        if not available_agents:
            break

        # Find best agent for task
        best_agent = await select_best_agent(task, available_agents)

        # Assign task
        await message_bus.publish(AgentMessage(
            id=str(uuid4()),
            source_agent_id="supervisor",
            target_agent_id=best_agent,
            project_id=state["project_id"],
            message_type="TASK_ASSIGNMENT",
            payload={"task": task},
            timestamp=datetime.utcnow().isoformat(),
        ))

        assignments.append((best_agent, task))
        available_agents.remove(best_agent)

    return {
        **state,
        "in_progress": {
            **state["in_progress"],
            **{agent: task for agent, task in assignments},
        },
        "pending_tasks": [
            t for t in state["pending_tasks"]
            if t not in [task for _, task in assignments]
        ],
    }
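
The assignment loop relies on `select_best_agent`, which is not defined in this document. One plausible implementation scores agents by how many of the task's required capabilities they cover, as sketched below. The synchronous signature, the `required_capabilities` task key, and the capability map are all assumptions; the real function is awaited and may use different inputs.

```python
def select_best_agent(
    task: dict,
    available_agents: list[str],
    capabilities: dict[str, set[str]],
) -> str:
    """Pick the agent covering the most required capabilities.

    Ties are broken in favour of the agent with fewer unrelated skills,
    which keeps generalists free for tasks only they can take.
    """
    if not available_agents:
        raise ValueError("no available agents")
    required = set(task.get("required_capabilities", []))

    def score(agent_id: str) -> tuple[int, int]:
        caps = capabilities.get(agent_id, set())
        return (len(required & caps), -len(caps - required))

    return max(available_agents, key=score)


capabilities = {
    "dave": {"python", "fastapi"},
    "ellis": {"python", "fastapi", "react", "sql"},
    "kate": {"react"},
}
backend_task = {"required_capabilities": ["python", "fastapi"]}
fullstack_task = {"required_capabilities": ["react", "sql"]}

backend_pick = select_best_agent(backend_task, ["dave", "ellis", "kate"], capabilities)
fullstack_pick = select_best_agent(fullstack_task, ["dave", "ellis", "kate"], capabilities)
```

In this example the backend task goes to the specialist even though the generalist also qualifies, leaving the generalist available for the full-stack task.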

Temporal Workflow Integration

# app/workflows/project_workflow.py
from temporalio import workflow, activity
from temporalio.common import RetryPolicy
from datetime import timedelta

@workflow.defn
class ProjectWorkflow:
    """Long-running workflow for project lifecycle."""

    def __init__(self):
        self._phase = ProjectPhase.DISCOVERY
        self._sprints: list[SprintResult] = []
        self._human_feedback_pending = False
        # Populated by the provide_feedback signal
        self._last_feedback: HumanFeedback | None = None

    @workflow.run
    async def run(self, project: ProjectConfig) -> ProjectResult:
        # Phase 1: Discovery (PO + BA brainstorm)
        requirements = await workflow.execute_activity(
            run_discovery_phase,
            project,
            start_to_close_timeout=timedelta(hours=8),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )

        # Phase 2: Architecture spike
        architecture = await workflow.execute_activity(
            run_architecture_spike,
            requirements,
            start_to_close_timeout=timedelta(hours=4),
        )

        # Checkpoint: Human review of architecture
        if project.autonomy_level != AutonomyLevel.AUTONOMOUS:
            self._human_feedback_pending = True
            await workflow.wait_condition(
                lambda: not self._human_feedback_pending
            )

        # Phase 3: Sprint execution (iterative)
        while not self._is_complete():
            sprint_result = await workflow.execute_child_workflow(
                SprintWorkflow.run,
                self._next_sprint_config(),
                id=f"{project.id}-sprint-{len(self._sprints) + 1}",
            )
            self._sprints.append(sprint_result)

            # Milestone checkpoint if configured
            if project.autonomy_level == AutonomyLevel.MILESTONE:
                self._human_feedback_pending = True
                await workflow.wait_condition(
                    lambda: not self._human_feedback_pending
                )

        return ProjectResult(
            project_id=project.id,
            sprints=self._sprints,
            status="completed",
        )

    @workflow.signal
    async def provide_feedback(self, feedback: HumanFeedback):
        """Handle human feedback at checkpoints."""
        self._last_feedback = feedback
        self._human_feedback_pending = False

    @workflow.query
    def get_status(self) -> ProjectStatus:
        return ProjectStatus(
            phase=self._phase,
            sprints_completed=len(self._sprints),
            awaiting_feedback=self._human_feedback_pending,
        )

Token Usage Tracking Per Agent

# app/services/agent_metrics.py
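from datetime import datetime, timedelta

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.agent_instance import AgentInstance
from app.models.agent_type import AgentType  # Module path assumed
from app.models.token_usage import TokenUsage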

class AgentMetricsService:
    """Track token usage and costs per agent instance."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def record_completion(
        self,
        agent_instance_id: str,
        project_id: str,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
        latency_ms: int,
    ) -> TokenUsage:
        """Record a completion for an agent instance."""
        cost = self._calculate_cost(model, prompt_tokens, completion_tokens)

        usage = TokenUsage(
            agent_instance_id=agent_instance_id,
            project_id=project_id,
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            cost_usd=cost,
            latency_ms=latency_ms,
            timestamp=datetime.utcnow(),
        )

        self.db.add(usage)

        # Update agent instance totals
        agent = await self.db.get(AgentInstance, agent_instance_id)
        agent.total_tokens_used += usage.total_tokens
        agent.total_cost_usd += cost
        agent.last_active_at = datetime.utcnow()

        await self.db.commit()
        return usage

    async def get_agent_usage_summary(
        self,
        agent_instance_id: str,
        period_days: int = 30,
    ) -> dict:
        """Get usage summary for an agent instance."""
        cutoff = datetime.utcnow() - timedelta(days=period_days)

        result = await self.db.execute(
            select(
                func.sum(TokenUsage.total_tokens).label("total_tokens"),
                func.sum(TokenUsage.cost_usd).label("total_cost"),
                func.count(TokenUsage.id).label("completion_count"),
                func.avg(TokenUsage.latency_ms).label("avg_latency"),
            )
            .where(TokenUsage.agent_instance_id == agent_instance_id)
            .where(TokenUsage.timestamp >= cutoff)
        )

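        # Row objects are tuple-like in SQLAlchemy 1.4+/2.0; ._mapping gives the dict view.
        # An aggregate query without GROUP BY always returns exactly one row.
        return dict(result.one()._mapping)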

    async def get_project_agent_breakdown(
        self,
        project_id: str,
    ) -> list[dict]:
        """Get token usage breakdown by agent for a project."""
        result = await self.db.execute(
            select(
                AgentInstance.id,
                AgentInstance.name,
                AgentType.role,
                func.sum(TokenUsage.total_tokens).label("tokens"),
                func.sum(TokenUsage.cost_usd).label("cost"),
            )
            .join(TokenUsage, TokenUsage.agent_instance_id == AgentInstance.id)
            .join(AgentType, AgentType.id == AgentInstance.agent_type_id)
            .where(AgentInstance.project_id == project_id)
            .group_by(AgentInstance.id, AgentInstance.name, AgentType.role)
            .order_by(func.sum(TokenUsage.cost_usd).desc())
        )

        # Row objects are tuple-like in SQLAlchemy 1.4+/2.0; ._mapping gives the dict view
        return [dict(row._mapping) for row in result]
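
`record_completion` calls `self._calculate_cost`, which is not shown. A minimal sketch of such a calculation follows, using a per-1,000-token price table. The model names and prices are placeholders invented for the example, not real provider pricing, and the use of `Decimal` is a suggestion rather than what the model above stores (it uses a `Float` column).

```python
from decimal import Decimal

# Placeholder prices in USD per 1,000 tokens; NOT real provider pricing.
PRICES_PER_1K = {
    "example-large": {"prompt": Decimal("0.0100"), "completion": Decimal("0.0300")},
    "example-small": {"prompt": Decimal("0.0005"), "completion": Decimal("0.0015")},
}


def calculate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> Decimal:
    """Cost of one completion, priced separately for prompt and completion tokens."""
    try:
        price = PRICES_PER_1K[model]
    except KeyError:
        # Failing loudly avoids silently recording a zero cost for unknown models
        raise ValueError(f"no price configured for model {model!r}") from None
    total = price["prompt"] * prompt_tokens + price["completion"] * completion_tokens
    return total / Decimal(1000)


cost = calculate_cost("example-large", prompt_tokens=2000, completion_tokens=500)
```

Summing many small float costs accumulates rounding error, so exact decimal arithmetic is worth considering wherever these totals feed budget enforcement.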

Real-Time Agent Activity Visibility

# app/services/realtime_updates.py
import enum
import json

from fastapi import WebSocket, WebSocketDisconnect
from redis import asyncio as aioredis

class AgentActivityStream:
    """Real-time agent activity updates via WebSocket/SSE."""

    def __init__(self, redis: aioredis.Redis):
        self.redis = redis

    async def subscribe_project(
        self,
        websocket: WebSocket,
        project_id: str,
    ):
        """Stream agent activities for a project."""
        await websocket.accept()

        channel = f"project:{project_id}:activities"
        # One PubSub per connection, so concurrent subscribers do not share a listener
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(channel)

        try:
            async for message in pubsub.listen():
                if message["type"] == "message":
                    await websocket.send_json(json.loads(message["data"]))
        except WebSocketDisconnect:
            pass
        finally:
            # Always release the subscription, whatever ended the loop
            await pubsub.unsubscribe(channel)

    async def publish_activity(
        self,
        project_id: str,
        activity: AgentActivity,
    ):
        """Publish an agent activity event."""
        channel = f"project:{project_id}:activities"
        await self.redis.publish(channel, json.dumps({
            "type": "agent_activity",
            "agent_id": activity.agent_id,
            "agent_name": activity.agent_name,
            "agent_role": activity.agent_role,
            "activity_type": activity.activity_type,
            "description": activity.description,
            "state": activity.state,
            "timestamp": activity.timestamp.isoformat(),
            "metadata": activity.metadata,
        }))

# Activity types for real-time updates
class AgentActivityType(str, enum.Enum):
    STATE_CHANGE = "state_change"
    THINKING = "thinking"
    TOOL_CALL = "tool_call"
    MESSAGE_SENT = "message_sent"
    ARTIFACT_CREATED = "artifact_created"
    ERROR = "error"
    TASK_STARTED = "task_started"
    TASK_COMPLETED = "task_completed"
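
The payload format used by `publish_activity` can be exercised without a Redis connection. The following is a minimal sketch, assuming a hypothetical `AgentActivity` dataclass whose fields mirror the keys published above (the real model is not shown in this spike); `activity_channel` and `encode_activity` are illustrative helper names, not part of the existing service.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class AgentActivity:
    """Hypothetical shape; fields mirror the keys used in publish_activity."""
    agent_id: str
    agent_name: str
    agent_role: str
    activity_type: str
    description: str
    state: str
    timestamp: datetime
    metadata: dict = field(default_factory=dict)


def activity_channel(project_id: str) -> str:
    """Build the per-project pub/sub channel name."""
    return f"project:{project_id}:activities"


def encode_activity(activity: AgentActivity) -> str:
    """Serialize an activity into the JSON payload sent to subscribers."""
    return json.dumps({
        "type": "agent_activity",
        "agent_id": activity.agent_id,
        "agent_name": activity.agent_name,
        "agent_role": activity.agent_role,
        "activity_type": activity.activity_type,
        "description": activity.description,
        "state": activity.state,
        "timestamp": activity.timestamp.isoformat(),
        "metadata": activity.metadata,
    })


example = AgentActivity(
    agent_id="a-1",
    agent_name="Dave",
    agent_role="Software Engineer",
    activity_type="tool_call",
    description="Running unit tests",
    state="executing",
    timestamp=datetime(2025, 12, 29, 12, 0, tzinfo=timezone.utc),
)
payload = json.loads(encode_activity(example))
```

Keeping serialization in a pure function like this makes the wire format testable separately from the WebSocket and Redis plumbing.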

Code Examples for Key Patterns

Pattern 1: Spawning Multiple Agent Instances

# app/services/agent_factory.py
from uuid import uuid4

class AgentFactory:
    """Factory for creating agent instances from types."""

    DEVELOPER_NAMES = ["Dave", "Ellis", "Kate", "Marcus", "Nina"]
    QA_NAMES = ["Quinn", "Raja", "Sierra", "Tyler", "Uma"]

    async def spawn_agent_team(
        self,
        project_id: str,
        team_config: TeamConfig,
    ) -> list[AgentInstance]:
        """Spawn a team of agents for a project."""
        agents = []
        name_counters = {}

        for agent_spec in team_config.agents:
            agent_type = await self.db.get(AgentType, agent_spec.type_id)

            # Get next available name for this type
            name = self._get_next_name(agent_type.role, name_counters)

            agent = AgentInstance(
                id=uuid4(),
                name=name,
                agent_type_id=agent_type.id,
                project_id=project_id,
                state=AgentState.IDLE,
                working_memory={
                    "project_context": {},
                    "conversation_history": [],
                    "current_focus": None,
                },
            )

            self.db.add(agent)
            agents.append(agent)

        await self.db.commit()

        # Initialize agent graphs
        for agent in agents:
            await self._initialize_agent_graph(agent)

        return agents

    def _get_next_name(self, role: str, counters: dict) -> str:
        """Get next name for an agent role."""
        names = {
            "Software Engineer": self.DEVELOPER_NAMES,
            "QA Engineer": self.QA_NAMES,
            # ... other roles
        }.get(role, ["Agent"])

        idx = counters.get(role, 0)
        counters[role] = idx + 1

        if idx < len(names):
            return names[idx]
        return f"{names[0]}-{idx + 1}"
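
To make the naming behaviour concrete, here is a standalone sketch of the same logic as `_get_next_name`. The function name `next_agent_name` and the module-level `NAME_POOLS` dict are assumptions for illustration; the rule itself (use the pool in order, then fall back to a numbered suffix on the first name) is taken from the factory above.

```python
NAME_POOLS = {
    "Software Engineer": ["Dave", "Ellis", "Kate", "Marcus", "Nina"],
    "QA Engineer": ["Quinn", "Raja", "Sierra", "Tyler", "Uma"],
}


def next_agent_name(role: str, counters: dict[str, int]) -> str:
    """Return the next name for a role, mutating the per-role counter."""
    names = NAME_POOLS.get(role, ["Agent"])
    idx = counters.get(role, 0)
    counters[role] = idx + 1
    if idx < len(names):
        return names[idx]
    # Pool exhausted: reuse the first name with a 1-based sequence suffix
    return f"{names[0]}-{idx + 1}"


counters: dict[str, int] = {}
engineers = [next_agent_name("Software Engineer", counters) for _ in range(6)]
unknown = [next_agent_name("Designer", counters) for _ in range(2)]
```

With six engineers requested, the sixth name is `Dave-6`; a role with no pool yields `Agent`, then `Agent-2`.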

Pattern 2: LLM Failover Per Agent Type

# app/agents/llm_config.py
from litellm import Router

class AgentLLMConfig:
    """Per-agent-type LLM configuration with failover."""

    # High-stakes agents get premium models with reliable fallbacks
    HIGH_REASONING_CHAIN = [
        {"model": "claude-sonnet-4-20250514", "provider": "anthropic"},
        {"model": "gpt-4-turbo", "provider": "openai"},
        {"model": "claude-3-5-sonnet-20241022", "provider": "anthropic"},
    ]

    # Fast agents get quick models
    FAST_RESPONSE_CHAIN = [
        {"model": "claude-3-haiku-20240307", "provider": "anthropic"},
        {"model": "gpt-4o-mini", "provider": "openai"},
    ]

    # Cost-optimized for high-volume tasks
    COST_OPTIMIZED_CHAIN = [
        {"model": "ollama/llama3", "provider": "ollama"},
        {"model": "gpt-4o-mini", "provider": "openai"},
    ]

    AGENT_TYPE_MAPPING = {
        "Product Owner": HIGH_REASONING_CHAIN,
        "Software Architect": HIGH_REASONING_CHAIN,
        "Software Engineer": HIGH_REASONING_CHAIN,
        "Business Analyst": HIGH_REASONING_CHAIN,
        "QA Engineer": FAST_RESPONSE_CHAIN,
        "Project Manager": FAST_RESPONSE_CHAIN,
        "DevOps Engineer": FAST_RESPONSE_CHAIN,
    }

    def build_router_for_agent(self, agent_type: str) -> Router:
        """Build LiteLLM router with failover for agent type."""
        chain = self.AGENT_TYPE_MAPPING.get(agent_type, self.FAST_RESPONSE_CHAIN)

        group = f"agent-{agent_type.lower().replace(' ', '-')}"

        # One model group per chain entry, so failover follows the chain order
        # instead of load-balancing across all entries of a single group.
        model_list = []
        for i, model_config in enumerate(chain):
            model_list.append({
                "model_name": f"{group}-{i}",
                "litellm_params": {
                    "model": model_config["model"],
                    "api_key": self._get_api_key(model_config["provider"]),
                },
                "model_info": {"id": f"{group}-{i}"},
            })

        # Callers request f"{group}-0"; the remaining groups are tried in order on failure.
        return Router(
            model_list=model_list,
            routing_strategy="simple-shuffle",
            num_retries=3,
            timeout=120,
            fallbacks=[
                {f"{group}-0": [f"{group}-{i}" for i in range(1, len(chain))]}
            ],
        )
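
The intent of a failover chain (try each model in order and stop at the first success) can be shown independently of any LLM library. This is a minimal sketch, not LiteLLM's API: `complete_with_failover`, `AllModelsFailed`, and the `call` callable are hypothetical names introduced for illustration.

```python
from typing import Callable


class AllModelsFailed(RuntimeError):
    """Raised when every model in the chain has failed."""


def complete_with_failover(
    chain: list[dict],
    call: Callable[[str, str], str],
    prompt: str,
) -> tuple[str, str]:
    """Try each model in chain order; return (model, response) for the first success."""
    errors = []
    for entry in chain:
        model = entry["model"]
        try:
            return model, call(model, prompt)
        except Exception as exc:  # a real client would catch narrower error types
            errors.append(f"{model}: {exc}")
    raise AllModelsFailed("; ".join(errors))


def flaky_call(model: str, prompt: str) -> str:
    """Stand-in for an LLM client: the primary model is unavailable."""
    if model == "primary-model":
        raise TimeoutError("provider unavailable")
    return f"{model} answered: {prompt}"


demo_chain = [{"model": "primary-model"}, {"model": "backup-model"}]
used_model, response = complete_with_failover(demo_chain, flaky_call, "ping")
```

Here the primary model raises, so the call falls through to `backup-model`. If every entry fails, the collected errors are surfaced in a single exception.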

Pattern 3: Agent Context Isolation

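# app/agents/context_manager.py
import json
from typing import Any

from redis import asyncio as aioredis
from sqlalchemy.ext.asyncio import AsyncSession

class AgentContextManager: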
    """Manage individual agent context/memory."""

    def __init__(self, redis: aioredis.Redis, db: AsyncSession):
        self.redis = redis
        self.db = db

    async def get_agent_context(
        self,
        agent_id: str,
        max_messages: int = 50,
    ) -> AgentContext:
        """Get full context for an agent."""
        agent = await self.db.get(AgentInstance, agent_id)

        # Get recent conversation history
        conversations = await self._get_recent_conversations(
            agent_id, max_messages
        )

        # Get project context
        project_context = await self._get_project_context(agent.project_id)

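        # Get working memory from Redis (fast access); values are stored as JSON
        raw_memory = await self.redis.hgetall(
            f"agent:{agent_id}:memory"
        )
        working_memory = {
            (k.decode() if isinstance(k, bytes) else k): json.loads(v)
            for k, v in raw_memory.items()
        }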

        return AgentContext(
            agent=agent,
            system_prompt=self._build_system_prompt(agent),
            conversation_history=conversations,
            project_context=project_context,
            working_memory=working_memory,
            tools=await self._get_available_tools(agent),
        )

    def _build_system_prompt(self, agent: AgentInstance) -> str:
        """Build personalized system prompt for agent."""
        base_prompt = agent.agent_type.system_prompt

        return f"""
{base_prompt}

You are {agent.name}, a {agent.agent_type.role} working on project {agent.project.name}.

Your personality traits:
{agent.agent_type.personality_traits}

Current focus areas:
{json.dumps(agent.working_memory.get('current_focus') or [])}

Remember: You have your own perspective and expertise. Collaborate with other agents
but bring your unique viewpoint to discussions.
"""

    async def update_working_memory(
        self,
        agent_id: str,
        key: str,
        value: Any,
        ttl: int = 3600,
    ):
        """Update agent's working memory."""
        await self.redis.hset(
            f"agent:{agent_id}:memory",
            key,
            json.dumps(value),
        )
        await self.redis.expire(f"agent:{agent_id}:memory", ttl)
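
The isolation property rests on two things: a per-agent key namespace and a JSON round trip for every value. The sketch below illustrates both with an in-memory stand-in for the Redis hash. `InMemoryWorkingMemory` and `memory_key` are hypothetical names, and TTL handling is deliberately omitted.

```python
import json
from typing import Any


def memory_key(agent_id: str) -> str:
    """Namespace working memory per agent, matching the key format above."""
    return f"agent:{agent_id}:memory"


class InMemoryWorkingMemory:
    """Dict-backed stand-in for the Redis hash, for illustration only."""

    def __init__(self) -> None:
        self._store: dict[str, dict[str, str]] = {}

    def update(self, agent_id: str, key: str, value: Any) -> None:
        # Values are stored as JSON strings, as in update_working_memory
        self._store.setdefault(memory_key(agent_id), {})[key] = json.dumps(value)

    def load(self, agent_id: str) -> dict[str, Any]:
        # Decode every field on read so callers get Python objects back
        raw = self._store.get(memory_key(agent_id), {})
        return {k: json.loads(v) for k, v in raw.items()}


memory = InMemoryWorkingMemory()
memory.update("agent-1", "current_focus", ["auth module"])
memory.update("agent-2", "current_focus", ["test plan"])
agent_1_view = memory.load("agent-1")
agent_2_view = memory.load("agent-2")
```

Each agent only ever reads the hash under its own key, so one agent's updates are invisible to another unless they are shared explicitly through the message bus.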

Risks and Mitigations

| Risk | Impact | Probability | Mitigation |
|------|--------|-------------|------------|
| Temporal complexity | High | Medium | Start with simple workflows, invest in team training |
| LangGraph learning curve | Medium | High | Create abstractions, use simpler patterns initially |
| Agent coordination deadlocks | High | Medium | Implement timeouts, circuit breakers, supervisor oversight |
| Token cost explosion | High | Medium | Budget caps, usage monitoring, caching strategies |
| State consistency issues | High | Low | Event sourcing, Temporal durability, comprehensive testing |
| Debugging complexity | Medium | High | Invest in observability (LangSmith, Temporal UI) |
| Vendor lock-in (LangGraph) | Medium | Low | Abstract core patterns, LangGraph is open source |

Mitigation Strategies

1. Complexity Management:

  • Start with 2-3 agent types, expand gradually
  • Build comprehensive test harnesses
  • Document all state transitions and message types

2. Cost Control:

class BudgetGuard:
    async def check_before_completion(
        self,
        project_id: str,
        estimated_tokens: int,
    ) -> bool:
        project = await self.db.get(Project, project_id)
        current_spend = await self.get_project_spend(project_id)
        estimated_cost = self._estimate_cost(estimated_tokens)

        if current_spend + estimated_cost > project.budget_limit:
            await self.notify_budget_exceeded(project_id)
            return False
        return True
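
`BudgetGuard` relies on an `_estimate_cost` helper that is not shown. One possible shape is sketched below, assuming a flat blended price per 1,000 tokens; the rate `PRICE_PER_1K_TOKENS_USD` is a placeholder, not a real provider price, and `estimate_cost` and `within_budget` are illustrative names. `Decimal` is used so that repeated additions do not accumulate floating-point error.

```python
from decimal import Decimal

# Placeholder blended rate; real pricing varies by model and by input/output tokens
PRICE_PER_1K_TOKENS_USD = Decimal("0.01")


def estimate_cost(
    estimated_tokens: int,
    price_per_1k: Decimal = PRICE_PER_1K_TOKENS_USD,
) -> Decimal:
    """Estimate the USD cost of a completion from its token count."""
    return (Decimal(estimated_tokens) / Decimal(1000)) * price_per_1k


def within_budget(
    current_spend: Decimal,
    estimated_cost: Decimal,
    budget_limit: Decimal,
) -> bool:
    """Mirror the guard above: reject only when the limit would be exceeded."""
    return current_spend + estimated_cost <= budget_limit


cost = estimate_cost(2000)
allowed = within_budget(Decimal("9.97"), cost, Decimal("10.00"))
blocked = within_budget(Decimal("9.99"), cost, Decimal("10.00"))
```

Note that checking and spending are separate steps, so concurrent agents could each pass the check before either records its usage; a production guard would likely need a reservation or an atomic counter.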

3. Deadlock Prevention:

class AgentTimeoutGuard:
    THINKING_TIMEOUT = 300  # 5 minutes
    EXECUTION_TIMEOUT = 600  # 10 minutes
    BLOCKED_TIMEOUT = 1800  # 30 minutes

    async def monitor_agent(self, agent_id: str):
        agent = await self.db.get(AgentInstance, agent_id)
        timeout = self._get_timeout(agent.state)

        if agent.last_active_at < datetime.utcnow() - timedelta(seconds=timeout):
            await self.handle_timeout(agent)
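
The `_get_timeout` lookup is not shown above. A minimal sketch of how it might map states to the three constants follows; the state names (`"thinking"`, `"executing"`, `"blocked"`) and the default for unlisted states are assumptions, since the `AgentState` enum values are not given in this section.

```python
from datetime import datetime, timedelta, timezone

# Seconds of inactivity tolerated per state (values from AgentTimeoutGuard)
TIMEOUTS_SECONDS = {
    "thinking": 300,
    "executing": 600,
    "blocked": 1800,
}
DEFAULT_TIMEOUT_SECONDS = 600  # assumed fallback for states not listed


def get_timeout(state: str) -> int:
    """Return the inactivity timeout for a state, in seconds."""
    return TIMEOUTS_SECONDS.get(state, DEFAULT_TIMEOUT_SECONDS)


def is_timed_out(state: str, last_active_at: datetime, now: datetime) -> bool:
    """True when the agent has been inactive longer than its state allows."""
    return last_active_at < now - timedelta(seconds=get_timeout(state))


now = datetime(2025, 12, 29, 12, 0, tzinfo=timezone.utc)
stuck_thinking = is_timed_out("thinking", now - timedelta(minutes=6), now)
still_blocked = is_timed_out("blocked", now - timedelta(minutes=6), now)
```

Passing `now` explicitly keeps the check deterministic in tests. Six minutes of inactivity trips the thinking timeout but not the blocked timeout.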

Implementation Roadmap

Phase 1: Foundation (Weeks 1-2)

  • Set up Temporal server and workers
  • Implement basic LangGraph agent state machine
  • Create AgentInstance model and database schema
  • Implement Redis Streams message bus

Phase 2: Core Orchestration (Weeks 3-4)

  • Implement supervisor pattern with Project Manager
  • Build agent spawning and team creation
  • Add token tracking per agent instance
  • Create basic inter-agent communication

Phase 3: Durability (Weeks 5-6)

  • Implement Temporal workflows for projects and sprints
  • Add checkpoint and recovery mechanisms
  • Build human-in-the-loop approval flows
  • Create agent context persistence

Phase 4: Observability (Week 7)

  • Implement real-time activity streaming
  • Add comprehensive logging and tracing
  • Build cost monitoring dashboards
  • Create agent performance analytics

Decision

Adopt a hybrid architecture combining:

  1. LangGraph for agent state machines and logic (graph-based, stateful)
  2. Temporal for durable, long-running workflow orchestration
  3. Redis Streams for event-driven agent-to-agent communication
  4. Hierarchical supervisor pattern with Project Manager coordinating teams
  5. LiteLLM (per SPIKE-005) for unified LLM access with failover

This provides the flexibility, durability, and scalability required for Syndarix's 50+ agent autonomous consulting platform while maintaining individual agent context, comprehensive token tracking, and real-time visibility.


Spike completed. Findings will inform ADR-002: Agent Orchestration Architecture.