# ADR-006: Agent Orchestration Architecture

**Status:** Accepted
**Date:** 2025-12-29
**Deciders:** Architecture Team
**Related Spikes:** SPIKE-002

---

## Context

Syndarix requires an agent orchestration system that can:

- Define reusable agent types with specific capabilities
- Spawn multiple instances of the same type with unique identities
- Manage agent state, context, and conversation history
- Route messages between agents
- Handle agent failover and recovery
- Track resource usage per agent

## Decision Drivers

- **Flexibility:** Support diverse agent roles and capabilities
- **Scalability:** Handle 50+ concurrent agent instances
- **Isolation:** Each instance maintains separate state
- **Observability:** Full visibility into agent activities
- **Reliability:** Graceful handling of failures

## Decision

**Adopt a Type-Instance pattern** where:

- **Agent Types** define templates (model, expertise, personality)
- **Agent Instances** are spawned from types with unique identities
- **Agent Orchestrator** manages lifecycle and communication

## Architecture

### Agent Type Definition

```python
from sqlalchemy import ARRAY, Boolean, Column, DateTime, Enum, ForeignKey, String, Text
from sqlalchemy.dialects.postgresql import JSONB, UUID

# Base, AgentRole, and InstanceStatus are defined elsewhere in the codebase.


class AgentType(Base):
    __tablename__ = "agent_types"  # table name assumed

    id = Column(UUID, primary_key=True)
    name = Column(String(50), unique=True)      # "Software Engineer"
    role = Column(Enum(AgentRole))              # ENGINEER
    base_model = Column(String(100))            # "claude-3-5-sonnet-20241022"
    failover_model = Column(String(100))        # "gpt-4-turbo"
    expertise = Column(ARRAY(String))           # ["python", "fastapi", "testing"]
    personality = Column(JSONB)                 # {"style": "detailed", "tone": "professional"}
    system_prompt = Column(Text)                # Base system prompt template
    capabilities = Column(ARRAY(String))        # ["code_generation", "code_review"]
    is_active = Column(Boolean, default=True)
```

### Agent Instance Definition

```python
class AgentInstance(Base):
    __tablename__ = "agent_instances"  # table name assumed

    id = Column(UUID, primary_key=True)
    name = Column(String(50))                   # "Dave"
    agent_type_id = Column(UUID, ForeignKey("agent_types.id"))
    project_id = Column(UUID, ForeignKey("projects.id"))  # target table name assumed
    status = Column(Enum(InstanceStatus))       # ACTIVE, IDLE, TERMINATED
    context = Column(JSONB)                     # Current working context
    conversation_id = Column(UUID)              # Active conversation
    rag_collection_id = Column(String)          # Domain knowledge collection
    token_usage = Column(JSONB)                 # {"prompt": 0, "completion": 0}
    last_active_at = Column(DateTime)
    created_at = Column(DateTime)
    terminated_at = Column(DateTime)
```

### Orchestrator Service

```python
from datetime import datetime
from uuid import UUID, uuid4


class AgentOrchestrator:
    """Central service for agent lifecycle management."""

    async def spawn_agent(
        self,
        agent_type_id: UUID,
        project_id: UUID,
        name: str,
        domain_knowledge: list[str] | None = None,
    ) -> AgentInstance:
        """Spawn a new agent instance from a type definition."""
        agent_type = await self.get_agent_type(agent_type_id)

        instance = AgentInstance(
            id=uuid4(),  # assigned up front: the RAG collection below is keyed by it
            name=name,
            agent_type_id=agent_type_id,
            project_id=project_id,
            status=InstanceStatus.ACTIVE,
            context={"initialized_at": datetime.utcnow().isoformat()},
        )

        # Initialize RAG collection if domain knowledge provided
        if domain_knowledge:
            instance.rag_collection_id = await self._init_rag_collection(
                instance.id, domain_knowledge
            )

        # Assuming self.db is a SQLAlchemy AsyncSession: add() is synchronous,
        # only commit() is awaited.
        self.db.add(instance)
        await self.db.commit()

        # Publish spawn event
        await self.event_bus.publish(f"project:{project_id}", {
            "type": "agent_spawned",
            "agent_id": str(instance.id),
            "name": name,
            "role": agent_type.role.value
        })

        return instance

    async def terminate_agent(self, instance_id: UUID) -> None:
        """Terminate an agent instance and release resources."""
        instance = await self.get_instance(instance_id)
        instance.status = InstanceStatus.TERMINATED
        instance.terminated_at = datetime.utcnow()

        # Cleanup RAG collection
        if instance.rag_collection_id:
            await self._cleanup_rag_collection(instance.rag_collection_id)

        await self.db.commit()

    async def send_message(
        self,
        from_id: UUID,
        to_id: UUID,
        message: AgentMessage
    ) -> None:
        """Route a message from one agent to another."""
        # Validate both agents exist and are active
        sender = await self.get_instance(from_id)
        recipient = await self.get_instance(to_id)

        # Persist message
        await self.message_store.save(message)

        # If recipient is idle, trigger action
        if recipient.status == InstanceStatus.IDLE:
            await self._trigger_agent_action(recipient.id, message)

        # Publish for real-time tracking
        await self.event_bus.publish(f"project:{sender.project_id}", {
            "type": "agent_message",
            "from": str(from_id),
            "to": str(to_id),
            "preview": message.content[:100]
        })

    async def broadcast(
        self,
        from_id: UUID,
        target_role: AgentRole,
        message: AgentMessage
    ) -> None:
        """Broadcast a message to all agents of a specific role."""
        sender = await self.get_instance(from_id)
        recipients = await self.get_instances_by_role(
            sender.project_id, target_role
        )

        for recipient in recipients:
            await self.send_message(from_id, recipient.id, message)
```

### Agent Execution Pattern

```python
from datetime import datetime


class AgentRunner:
    """Executes agent actions using LLM."""

    def __init__(self, instance: AgentInstance, llm_gateway: LLMGateway):
        self.instance = instance
        self.llm = llm_gateway

    async def execute(self, action: str, context: dict) -> dict:
        """Execute an action using the agent's configured model."""
        agent_type = await self.get_agent_type(self.instance.agent_type_id)

        # Build messages with system prompt and context
        messages = [
            {"role": "system", "content": self._build_system_prompt(agent_type)},
            *self._get_conversation_history(),
            {"role": "user", "content": self._build_action_prompt(action, context)}
        ]

        # Add RAG context if available (inserted right after the system prompt)
        if self.instance.rag_collection_id:
            rag_context = await self._query_rag(action, context)
            messages.insert(1, {
                "role": "system",
                "content": f"Relevant context:\n{rag_context}"
            })

        # Execute with failover
        response = await self.llm.complete(
            agent_id=str(self.instance.id),
            project_id=str(self.instance.project_id),
            messages=messages,
            model_preference=self._get_model_preference(agent_type)
        )

        # Update instance context
        self.instance.context = {
            **self.instance.context,
            "last_action": action,
            "last_response_at": datetime.utcnow().isoformat()
        }

        return response
```

### Agent Roles

| Role | Instances | Primary Capabilities |
|------|-----------|---------------------|
| Product Owner | 1 | requirements, prioritization, client_communication |
| Project Manager | 1 | planning, tracking, coordination |
| Business Analyst | 1 | analysis, documentation, process_modeling |
| Software Architect | 1 | design, architecture_decisions, tech_selection |
| Software Engineer | 1-5 | code_generation, code_review, testing |
| UI/UX Designer | 1 | design, wireframes, accessibility |
| QA Engineer | 1-2 | test_planning, test_automation, bug_reporting |
| DevOps Engineer | 1 | cicd, infrastructure, deployment |
| AI/ML Engineer | 1 | ml_development, model_training, mlops |
| Security Expert | 1 | security_review, vulnerability_assessment |

## Consequences

### Positive

- Clear separation between type definition and instance runtime
- Multiple instances share type configuration (DRY)
- Easy to add new agent roles
- Full observability through events
- Graceful failure handling with model failover

### Negative

- Complexity in managing instance lifecycle
- State synchronization across instances
- Memory overhead for context storage

### Mitigation

- Context archival for long-running instances
- Periodic cleanup of terminated instances
- State compression for large contexts

## Compliance

This decision aligns with:

- FR-101: Agent type configuration
- FR-102: Agent instance spawning
- FR-103: Agent domain knowledge (RAG)
- FR-104: Inter-agent communication
- FR-105: Agent activity monitoring

---

*This ADR establishes the agent orchestration architecture for Syndarix.*
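
As an illustration of the Type-Instance pattern adopted in the Decision section, the following is a minimal in-memory sketch, not the production models. `AgentTypeSketch`, `AgentInstanceSketch`, and `model_preference` are hypothetical names introduced here, as is the second agent name "Erin". The ordering returned by `model_preference` (base model first, failover second) is an assumption about what `_get_model_preference` might produce. The sketch shows two instances sharing one type configuration while keeping separate identity, context, and token usage.

```python
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass(frozen=True)
class AgentTypeSketch:
    """Shared, immutable template (hypothetical stand-in for AgentType)."""
    name: str
    base_model: str
    failover_model: str
    capabilities: tuple[str, ...] = ()


@dataclass
class AgentInstanceSketch:
    """Per-agent runtime state (hypothetical stand-in for AgentInstance)."""
    name: str
    agent_type: AgentTypeSketch
    id: UUID = field(default_factory=uuid4)
    context: dict = field(default_factory=dict)
    token_usage: dict = field(
        default_factory=lambda: {"prompt": 0, "completion": 0}
    )


def model_preference(agent_type: AgentTypeSketch) -> list[str]:
    """Assumed ordering: try the base model first, then the failover model."""
    return [agent_type.base_model, agent_type.failover_model]


# One type definition, reused by every instance of that role.
engineer = AgentTypeSketch(
    name="Software Engineer",
    base_model="claude-3-5-sonnet-20241022",
    failover_model="gpt-4-turbo",
    capabilities=("code_generation", "code_review"),
)

# Two instances spawned from the same type, each with its own state.
dave = AgentInstanceSketch(name="Dave", agent_type=engineer)
erin = AgentInstanceSketch(name="Erin", agent_type=engineer)

# Mutating one instance's state leaves the other untouched.
dave.context["last_action"] = "code_review"
dave.token_usage["prompt"] += 120
```

Freezing the type dataclass mirrors the intent that instances read their type's configuration but never modify it; per-instance fields use `default_factory` so that no two instances share a mutable dictionary.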