ADR-006: Agent Orchestration Architecture

Status: Accepted
Date: 2025-12-29
Deciders: Architecture Team
Related Spikes: SPIKE-002


Context

Syndarix requires an agent orchestration system that can:

  • Define reusable agent types with specific capabilities
  • Spawn multiple instances of the same type with unique identities
  • Manage agent state, context, and conversation history
  • Route messages between agents
  • Handle agent failover and recovery
  • Track resource usage per agent

Decision Drivers

  • Flexibility: Support diverse agent roles and capabilities
  • Scalability: Handle 50+ concurrent agent instances
  • Isolation: Each instance maintains separate state
  • Observability: Full visibility into agent activities
  • Reliability: Graceful handling of failures

Decision

Adopt a Type-Instance pattern where:

  • Agent Types define templates (model, expertise, personality)
  • Agent Instances are spawned from types with unique identities
  • Agent Orchestrator manages lifecycle and communication
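
The split between shared type configuration and per-instance state can be illustrated with plain dataclasses. This is only a sketch of the pattern, not the persistence model: `TypeTemplate`, `Instance`, and `spawn` are hypothetical stand-ins, and the second agent name is made up for the example.

```python
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass(frozen=True)
class TypeTemplate:
    """Shared, read-only configuration (hypothetical stand-in for an agent type)."""
    name: str
    base_model: str
    expertise: tuple[str, ...] = ()


@dataclass
class Instance:
    """Per-agent runtime state (hypothetical stand-in for an agent instance)."""
    name: str
    template: TypeTemplate
    id: UUID = field(default_factory=uuid4)
    context: dict = field(default_factory=dict)


def spawn(template: TypeTemplate, name: str) -> Instance:
    """Create a new instance that references, but does not copy, its type."""
    return Instance(name=name, template=template)


engineer = TypeTemplate("Software Engineer", "claude-3-5-sonnet-20241022", ("python",))
dave = spawn(engineer, "Dave")
erin = spawn(engineer, "Erin")

dave.context["task"] = "review PR"  # Only Dave's state changes
```

Both instances point at the same template object, while identity and context stay separate, which is the isolation property listed under the decision drivers.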

Architecture

Agent Type Definition

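from uuid import uuid4

from sqlalchemy import Boolean, Column, Enum, String, Text
from sqlalchemy.dialects.postgresql import ARRAY, JSONB, UUID

# Base (the declarative base) and AgentRole are assumed to be defined elsewhere.
class AgentType(Base):
    __tablename__ = "agent_types"  # Assumed table name
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    name = Column(String(50), unique=True)  # "Software Engineer"
    role = Column(Enum(AgentRole))          # ENGINEER
    base_model = Column(String(100))        # "claude-3-5-sonnet-20241022"
    failover_model = Column(String(100))    # "gpt-4-turbo"
    expertise = Column(ARRAY(String))       # ["python", "fastapi", "testing"]
    personality = Column(JSONB)             # {"style": "detailed", "tone": "professional"}
    system_prompt = Column(Text)            # Base system prompt template
    capabilities = Column(ARRAY(String))    # ["code_generation", "code_review"]
    is_active = Column(Boolean, default=True)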

Agent Instance Definition

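from uuid import uuid4

from sqlalchemy import Column, DateTime, Enum, ForeignKey, String
from sqlalchemy.dialects.postgresql import JSONB, UUID

# Base (the declarative base) and InstanceStatus are assumed to be defined elsewhere.
class AgentInstance(Base):
    __tablename__ = "agent_instances"  # Assumed table name
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    name = Column(String(50))               # "Dave"
    # ForeignKey requires a target column; the table names below are assumptions
    agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"))
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"))
    status = Column(Enum(InstanceStatus))   # ACTIVE, IDLE, TERMINATED
    context = Column(JSONB)                 # Current working context
    conversation_id = Column(UUID(as_uuid=True))  # Active conversation
    rag_collection_id = Column(String)      # Domain knowledge collection
    token_usage = Column(JSONB)             # {"prompt": 0, "completion": 0}
    last_active_at = Column(DateTime)
    created_at = Column(DateTime)
    terminated_at = Column(DateTime)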
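
The two models reference `AgentRole` and `InstanceStatus`, which this ADR does not define. A minimal sketch follows, assuming string-valued enums. `ENGINEER` and the three status names come from the comments above; every other member name, all string values, and the `can_transition` helper with its transition rules are assumptions for illustration.

```python
from enum import Enum


class AgentRole(str, Enum):
    """Hypothetical role enum; one member per role listed in this ADR."""
    PRODUCT_OWNER = "product_owner"
    PROJECT_MANAGER = "project_manager"
    BUSINESS_ANALYST = "business_analyst"
    ARCHITECT = "architect"
    ENGINEER = "engineer"
    DESIGNER = "designer"
    QA_ENGINEER = "qa_engineer"
    DEVOPS_ENGINEER = "devops_engineer"
    ML_ENGINEER = "ml_engineer"
    SECURITY_EXPERT = "security_expert"


class InstanceStatus(str, Enum):
    ACTIVE = "active"
    IDLE = "idle"
    TERMINATED = "terminated"


# Assumed lifecycle: ACTIVE and IDLE alternate; TERMINATED is final.
_ALLOWED_TRANSITIONS = {
    InstanceStatus.ACTIVE: {InstanceStatus.IDLE, InstanceStatus.TERMINATED},
    InstanceStatus.IDLE: {InstanceStatus.ACTIVE, InstanceStatus.TERMINATED},
    InstanceStatus.TERMINATED: set(),
}


def can_transition(current: InstanceStatus, target: InstanceStatus) -> bool:
    """Return True if moving from `current` to `target` is allowed."""
    return target in _ALLOWED_TRANSITIONS[current]
```

Making the terminal state explicit keeps a terminated instance from being reactivated by accident.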

Orchestrator Service

from datetime import datetime
from uuid import UUID, uuid4


class AgentOrchestrator:
    """Central service for agent lifecycle management."""

    # Assumes self.db is a SQLAlchemy AsyncSession and self.event_bus exposes
    # an async publish(channel, payload) method; both are injected elsewhere.

    async def spawn_agent(
        self,
        agent_type_id: UUID,
        project_id: UUID,
        name: str,
        domain_knowledge: list[str] | None = None
    ) -> AgentInstance:
        """Spawn a new agent instance from a type definition."""
        agent_type = await self.get_agent_type(agent_type_id)

        instance = AgentInstance(
            id=uuid4(),  # Assign the ID up front; it is used before the row is flushed
            name=name,
            agent_type_id=agent_type_id,
            project_id=project_id,
            status=InstanceStatus.ACTIVE,
            context={"initialized_at": datetime.utcnow().isoformat()},
        )

        # Initialize RAG collection if domain knowledge provided
        if domain_knowledge:
            instance.rag_collection_id = await self._init_rag_collection(
                instance.id, domain_knowledge
            )

        # AsyncSession.add() is synchronous; only commit() is awaited
        self.db.add(instance)
        await self.db.commit()

        # Publish spawn event
        await self.event_bus.publish(f"project:{project_id}", {
            "type": "agent_spawned",
            "agent_id": str(instance.id),
            "name": name,
            "role": agent_type.role.value
        })

        return instance

    async def terminate_agent(self, instance_id: UUID) -> None:
        """Terminate an agent instance and release resources."""
        instance = await self.get_instance(instance_id)
        instance.status = InstanceStatus.TERMINATED
        instance.terminated_at = datetime.utcnow()

        # Cleanup RAG collection
        if instance.rag_collection_id:
            await self._cleanup_rag_collection(instance.rag_collection_id)

        await self.db.commit()

    async def send_message(
        self,
        from_id: UUID,
        to_id: UUID,
        message: AgentMessage
    ) -> None:
        """Route a message from one agent to another."""
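        # Validate both agents exist and have not been terminated
        sender = await self.get_instance(from_id)
        recipient = await self.get_instance(to_id)
        for agent in (sender, recipient):
            if agent is None or agent.status == InstanceStatus.TERMINATED:
                raise ValueError(
                    "Sender and recipient must be existing, non-terminated agents"
                )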

        # Persist message
        await self.message_store.save(message)

        # If recipient is idle, trigger action
        if recipient.status == InstanceStatus.IDLE:
            await self._trigger_agent_action(recipient.id, message)

        # Publish for real-time tracking
        await self.event_bus.publish(f"project:{sender.project_id}", {
            "type": "agent_message",
            "from": str(from_id),
            "to": str(to_id),
            "preview": message.content[:100]
        })

    async def broadcast(
        self,
        from_id: UUID,
        target_role: AgentRole,
        message: AgentMessage
    ) -> None:
        """Broadcast a message to all agents of a specific role."""
        sender = await self.get_instance(from_id)
        recipients = await self.get_instances_by_role(
            sender.project_id, target_role
        )

        for recipient in recipients:
            # Skip the sender so an agent never messages itself
            if recipient.id == from_id:
                continue
            await self.send_message(from_id, recipient.id, message)
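
The orchestrator publishes events on per-project channels named `project:{project_id}`. The sketch below shows how such channel scoping behaves, using an in-process bus. `InMemoryEventBus` and its `subscribe` method are hypothetical; the real event bus is not specified in this ADR and may work differently.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any]], Awaitable[None]]


class InMemoryEventBus:
    """Hypothetical stand-in for the event bus used by the orchestrator."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, channel: str, handler: Handler) -> None:
        self._handlers[channel].append(handler)

    async def publish(self, channel: str, event: dict[str, Any]) -> None:
        # Deliver to every subscriber of this channel, in subscription order.
        for handler in self._handlers.get(channel, []):
            await handler(event)


async def demo() -> list[dict[str, Any]]:
    bus = InMemoryEventBus()
    received: list[dict[str, Any]] = []

    async def on_event(event: dict[str, Any]) -> None:
        received.append(event)

    # Only events for project 123 reach this subscriber.
    bus.subscribe("project:123", on_event)
    await bus.publish("project:123", {"type": "agent_spawned", "name": "Dave"})
    await bus.publish("project:999", {"type": "agent_spawned", "name": "Erin"})
    return received


events = asyncio.run(demo())
```

Scoping channels by project means a dashboard for one project never sees agent activity from another.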

Agent Execution Pattern

class AgentRunner:
    """Executes agent actions using LLM."""

    def __init__(self, instance: AgentInstance, llm_gateway: LLMGateway):
        self.instance = instance
        self.llm = llm_gateway

    async def execute(self, action: str, context: dict) -> dict:
        """Execute an action using the agent's configured model."""
        agent_type = await self.get_agent_type(self.instance.agent_type_id)

        # Build messages with system prompt and context
        messages = [
            {"role": "system", "content": self._build_system_prompt(agent_type)},
            *self._get_conversation_history(),
            {"role": "user", "content": self._build_action_prompt(action, context)}
        ]

        # Add RAG context if available
        if self.instance.rag_collection_id:
            rag_context = await self._query_rag(action, context)
            messages.insert(1, {
                "role": "system",
                "content": f"Relevant context:\n{rag_context}"
            })

        # Execute with failover
        response = await self.llm.complete(
            agent_id=str(self.instance.id),
            project_id=str(self.instance.project_id),
            messages=messages,
            model_preference=self._get_model_preference(agent_type)
        )

        # Update instance context (the column is nullable, so guard against None)
        self.instance.context = {
            **(self.instance.context or {}),
            "last_action": action,
            "last_response_at": datetime.utcnow().isoformat()
        }

        return response

Agent Roles

| Role | Instances | Primary Capabilities |
|------|-----------|----------------------|
| Product Owner | 1 | requirements, prioritization, client_communication |
| Project Manager | 1 | planning, tracking, coordination |
| Business Analyst | 1 | analysis, documentation, process_modeling |
| Software Architect | 1 | design, architecture_decisions, tech_selection |
| Software Engineer | 1-5 | code_generation, code_review, testing |
| UI/UX Designer | 1 | design, wireframes, accessibility |
| QA Engineer | 1-2 | test_planning, test_automation, bug_reporting |
| DevOps Engineer | 1 | cicd, infrastructure, deployment |
| AI/ML Engineer | 1 | ml_development, model_training, mlops |
| Security Expert | 1 | security_review, vulnerability_assessment |
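
The instance counts listed for each role could be enforced at spawn time. A minimal sketch follows, assuming the upper bound of each range is a hard per-project cap (the ADR does not state this) and that roles are keyed by display name. `MAX_INSTANCES` and `can_spawn` are hypothetical names.

```python
# Upper bound of the "Instances" value for each role listed above.
MAX_INSTANCES: dict[str, int] = {
    "Product Owner": 1,
    "Project Manager": 1,
    "Business Analyst": 1,
    "Software Architect": 1,
    "Software Engineer": 5,
    "UI/UX Designer": 1,
    "QA Engineer": 2,
    "DevOps Engineer": 1,
    "AI/ML Engineer": 1,
    "Security Expert": 1,
}


def can_spawn(role: str, active_count: int) -> bool:
    """Return True if another instance of `role` fits under the assumed cap."""
    return active_count < MAX_INSTANCES[role]
```

An orchestrator could call a check like this before creating a new instance and reject the request when the cap is reached.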

Consequences

Positive

  • Clear separation between type definition and instance runtime
  • Multiple instances share type configuration (DRY)
  • Easy to add new agent roles
  • Full observability through events
  • Graceful failure handling with model failover

Negative

  • Complexity in managing instance lifecycle
  • State synchronization across instances
  • Memory overhead for context storage

Mitigation

  • Context archival for long-running instances
  • Periodic cleanup of terminated instances
  • State compression for large contexts
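
State compression for large contexts could be as simple as compressing the serialized JSON before archival. The sketch below uses only the standard library. `compress_context` and `decompress_context` are hypothetical helpers, and the choice of zlib is an assumption rather than something this ADR prescribes.

```python
import json
import zlib


def compress_context(context: dict) -> bytes:
    """Serialize a context dict to compact JSON and compress it with zlib."""
    raw = json.dumps(context, separators=(",", ":"), sort_keys=True).encode("utf-8")
    return zlib.compress(raw, level=6)


def decompress_context(blob: bytes) -> dict:
    """Inverse of compress_context."""
    return json.loads(zlib.decompress(blob).decode("utf-8"))


# Repetitive conversation-style data compresses well.
context = {"history": ["reviewed module"] * 200, "last_action": "code_review"}
blob = compress_context(context)
restored = decompress_context(blob)
```

The round trip is lossless for JSON-compatible values, so an archived context can be restored if an instance is resumed later.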

Compliance

This decision aligns with:

  • FR-101: Agent type configuration
  • FR-102: Agent instance spawning
  • FR-103: Agent domain knowledge (RAG)
  • FR-104: Inter-agent communication
  • FR-105: Agent activity monitoring

This ADR establishes the agent orchestration architecture for Syndarix.