# ADR-006: Agent Orchestration Architecture

**Status:** Accepted
**Date:** 2025-12-29
**Deciders:** Architecture Team
**Related Spikes:** SPIKE-002

---

## Context

Syndarix requires an agent orchestration system that can:
- Define reusable agent types with specific capabilities
- Spawn multiple instances of the same type with unique identities
- Manage agent state, context, and conversation history
- Route messages between agents
- Handle agent failover and recovery
- Track resource usage per agent

## Decision Drivers

- **Flexibility:** Support diverse agent roles and capabilities
- **Scalability:** Handle 50+ concurrent agent instances
- **Isolation:** Each instance maintains separate state
- **Observability:** Full visibility into agent activities
- **Reliability:** Graceful handling of failures

## Decision

**Adopt a Type-Instance pattern** where:
- **Agent Types** define templates (model, expertise, personality)
- **Agent Instances** are spawned from types with unique identities
- **Agent Orchestrator** manages lifecycle and communication

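The Type-Instance split can be illustrated without any persistence layer. The sketch below is a minimal in-memory illustration, not the project's code: `AgentTypeSpec` and `AgentInstanceState` are hypothetical stand-ins for the ORM models defined in the Architecture section, and the instance names are made up.

```python
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass(frozen=True)
class AgentTypeSpec:
    """Shared, immutable template (hypothetical stand-in for AgentType)."""
    name: str
    base_model: str
    expertise: tuple[str, ...] = ()


@dataclass
class AgentInstanceState:
    """Per-instance identity and mutable state (hypothetical stand-in for AgentInstance)."""
    name: str
    agent_type: AgentTypeSpec
    id: UUID = field(default_factory=uuid4)
    context: dict = field(default_factory=dict)


engineer = AgentTypeSpec("Software Engineer", "claude-3-5-sonnet-20241022", ("python",))
dave = AgentInstanceState("Dave", engineer)
erin = AgentInstanceState("Erin", engineer)

# Instances share one type definition but keep separate identity and state.
dave.context["task"] = "implement login"
```

Changing the shared type affects every instance spawned from it, while each instance's `context` stays isolated, which is the property the Isolation driver asks for.
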
## Architecture

### Agent Type Definition

```python
from sqlalchemy import Boolean, Column, Enum, String, Text
from sqlalchemy.dialects.postgresql import ARRAY, JSONB, UUID

# Base (declarative base) and AgentRole (enum) are defined elsewhere.


class AgentType(Base):
    __tablename__ = "agent_types"  # assumed table name

    id = Column(UUID(as_uuid=True), primary_key=True)
    name = Column(String(50), unique=True)   # "Software Engineer"
    role = Column(Enum(AgentRole))           # ENGINEER
    base_model = Column(String(100))         # "claude-3-5-sonnet-20241022"
    failover_model = Column(String(100))     # "gpt-4-turbo"
    expertise = Column(ARRAY(String))        # ["python", "fastapi", "testing"]
    personality = Column(JSONB)              # {"style": "detailed", "tone": "professional"}
    system_prompt = Column(Text)             # Base system prompt template
    capabilities = Column(ARRAY(String))     # ["code_generation", "code_review"]
    is_active = Column(Boolean, default=True)
```

### Agent Instance Definition

```python
from sqlalchemy import Column, DateTime, Enum, ForeignKey, String
from sqlalchemy.dialects.postgresql import JSONB, UUID

# Base and InstanceStatus are defined elsewhere; table names below are assumed.


class AgentInstance(Base):
    __tablename__ = "agent_instances"

    id = Column(UUID(as_uuid=True), primary_key=True)
    name = Column(String(50))                # "Dave"
    agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"))
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"))
    status = Column(Enum(InstanceStatus))    # ACTIVE, IDLE, TERMINATED
    context = Column(JSONB)                  # Current working context
    conversation_id = Column(UUID(as_uuid=True))  # Active conversation
    rag_collection_id = Column(String)       # Domain knowledge collection
    token_usage = Column(JSONB)              # {"prompt": 0, "completion": 0}
    last_active_at = Column(DateTime)
    created_at = Column(DateTime)
    terminated_at = Column(DateTime)
```

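Both models reference `AgentRole` and `InstanceStatus` without defining them. A minimal sketch of what these enums might look like follows. Only the member names `ENGINEER`, `ACTIVE`, `IDLE`, and `TERMINATED` appear in this ADR; the remaining role names (derived from the Agent Roles table) and all string values are assumptions.

```python
import enum


class AgentRole(str, enum.Enum):
    """Agent roles; only ENGINEER is named in the ADR, the rest are assumed."""
    PRODUCT_OWNER = "product_owner"
    PROJECT_MANAGER = "project_manager"
    BUSINESS_ANALYST = "business_analyst"
    ARCHITECT = "architect"
    ENGINEER = "engineer"
    DESIGNER = "designer"
    QA_ENGINEER = "qa_engineer"
    DEVOPS_ENGINEER = "devops_engineer"
    ML_ENGINEER = "ml_engineer"
    SECURITY_EXPERT = "security_expert"


class InstanceStatus(str, enum.Enum):
    """Lifecycle states listed in the AgentInstance.status comment."""
    ACTIVE = "active"
    IDLE = "idle"
    TERMINATED = "terminated"
```

Importing the standard library module as `enum` rather than `from enum import Enum` avoids a name clash with SQLAlchemy's `Enum` column type when both live in the same module.
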
### Orchestrator Service

```python
from datetime import datetime
from uuid import UUID, uuid4

# AgentInstance, AgentMessage, AgentRole, and InstanceStatus are defined elsewhere.
# self.db is assumed to be a SQLAlchemy AsyncSession.


class AgentOrchestrator:
    """Central service for agent lifecycle management."""

    async def spawn_agent(
        self,
        agent_type_id: UUID,
        project_id: UUID,
        name: str,
        domain_knowledge: list[str] | None = None,
    ) -> AgentInstance:
        """Spawn a new agent instance from a type definition."""
        agent_type = await self.get_agent_type(agent_type_id)

        instance = AgentInstance(
            id=uuid4(),  # assigned up front: the ID is needed before the first flush
            name=name,
            agent_type_id=agent_type_id,
            project_id=project_id,
            status=InstanceStatus.ACTIVE,
            context={"initialized_at": datetime.utcnow().isoformat()},
        )

        # Initialize RAG collection if domain knowledge provided
        if domain_knowledge:
            instance.rag_collection_id = await self._init_rag_collection(
                instance.id, domain_knowledge
            )

        self.db.add(instance)  # add() is synchronous, even on AsyncSession
        await self.db.commit()

        # Publish spawn event
        await self.event_bus.publish(f"project:{project_id}", {
            "type": "agent_spawned",
            "agent_id": str(instance.id),
            "name": name,
            "role": agent_type.role.value,
        })

        return instance

    async def terminate_agent(self, instance_id: UUID) -> None:
        """Terminate an agent instance and release resources."""
        instance = await self.get_instance(instance_id)
        instance.status = InstanceStatus.TERMINATED
        instance.terminated_at = datetime.utcnow()

        # Cleanup RAG collection
        if instance.rag_collection_id:
            await self._cleanup_rag_collection(instance.rag_collection_id)

        await self.db.commit()

    async def send_message(
        self,
        from_id: UUID,
        to_id: UUID,
        message: AgentMessage,
    ) -> None:
        """Route a message from one agent to another."""
        # Validate both agents exist and are active (not terminated)
        sender = await self.get_instance(from_id)
        recipient = await self.get_instance(to_id)
        for agent in (sender, recipient):
            if agent.status == InstanceStatus.TERMINATED:
                raise ValueError(f"Agent {agent.id} is terminated")

        # Persist message
        await self.message_store.save(message)

        # If recipient is idle, trigger action
        if recipient.status == InstanceStatus.IDLE:
            await self._trigger_agent_action(recipient.id, message)

        # Publish for real-time tracking
        await self.event_bus.publish(f"project:{sender.project_id}", {
            "type": "agent_message",
            "from": str(from_id),
            "to": str(to_id),
            "preview": message.content[:100],
        })

    async def broadcast(
        self,
        from_id: UUID,
        target_role: AgentRole,
        message: AgentMessage,
    ) -> None:
        """Broadcast a message to all agents of a specific role."""
        sender = await self.get_instance(from_id)
        recipients = await self.get_instances_by_role(
            sender.project_id, target_role
        )

        for recipient in recipients:
            await self.send_message(from_id, recipient.id, message)
```

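The orchestrator publishes every lifecycle and message event to a per-project channel through `event_bus.publish(channel, event)`. This ADR does not specify the event bus implementation, so the sketch below uses a hypothetical in-memory `InMemoryEventBus` (with an assumed `subscribe` method) purely to show how channel scoping isolates projects from one another.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any]], Awaitable[None]]


class InMemoryEventBus:
    """Hypothetical stand-in for the event bus used by the orchestrator."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = {}

    def subscribe(self, channel: str, handler: Handler) -> None:
        self._handlers.setdefault(channel, []).append(handler)

    async def publish(self, channel: str, event: dict[str, Any]) -> None:
        # Deliver to every subscriber of this channel, in subscription order.
        for handler in self._handlers.get(channel, []):
            await handler(event)


async def demo() -> list[dict[str, Any]]:
    bus = InMemoryEventBus()
    received: list[dict[str, Any]] = []

    async def on_event(event: dict[str, Any]) -> None:
        received.append(event)

    # Only events for project 42 should reach this subscriber.
    bus.subscribe("project:42", on_event)
    await bus.publish("project:42", {"type": "agent_spawned", "name": "Dave"})
    await bus.publish("project:99", {"type": "agent_spawned", "name": "Erin"})
    return received


events = asyncio.run(demo())
```

A production bus would likely deliver across processes; the channel naming (`project:{project_id}`) is the only part taken from the orchestrator code.
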
### Agent Execution Pattern

```python
from datetime import datetime

# AgentInstance and LLMGateway are defined elsewhere; private helper
# methods (_build_system_prompt, _query_rag, ...) are omitted here.


class AgentRunner:
    """Executes agent actions using LLM."""

    def __init__(self, instance: AgentInstance, llm_gateway: LLMGateway):
        self.instance = instance
        self.llm = llm_gateway

    async def execute(self, action: str, context: dict) -> dict:
        """Execute an action using the agent's configured model."""
        agent_type = await self.get_agent_type(self.instance.agent_type_id)

        # Build messages with system prompt and context
        messages = [
            {"role": "system", "content": self._build_system_prompt(agent_type)},
            *self._get_conversation_history(),
            {"role": "user", "content": self._build_action_prompt(action, context)},
        ]

        # Add RAG context if available (placed right after the system prompt)
        if self.instance.rag_collection_id:
            rag_context = await self._query_rag(action, context)
            messages.insert(1, {
                "role": "system",
                "content": f"Relevant context:\n{rag_context}",
            })

        # Execute with failover
        response = await self.llm.complete(
            agent_id=str(self.instance.id),
            project_id=str(self.instance.project_id),
            messages=messages,
            model_preference=self._get_model_preference(agent_type),
        )

        # Update instance context
        self.instance.context = {
            **self.instance.context,
            "last_action": action,
            "last_response_at": datetime.utcnow().isoformat(),
        }

        return response
```

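The runner delegates failover to the LLM gateway through `model_preference`, but the ADR does not show how that preference is built or consumed. One plausible reading, sketched below under stated assumptions, is an ordered list of `base_model` followed by `failover_model`, tried in sequence. The functions `get_model_preference`, `complete_with_failover`, and `fake_call` are hypothetical and are not the gateway's actual API.

```python
import asyncio
from typing import Awaitable, Callable, Optional

CompleteFn = Callable[[str, list[dict]], Awaitable[dict]]


def get_model_preference(base_model: str, failover_model: Optional[str]) -> list[str]:
    """Ordered list of models to try: primary first, then failover if set."""
    return [m for m in (base_model, failover_model) if m]


async def complete_with_failover(
    call_model: CompleteFn, models: list[str], messages: list[dict]
) -> dict:
    """Try each model in order and return the first successful response."""
    last_error: Optional[Exception] = None
    for model in models:
        try:
            return await call_model(model, messages)
        except Exception as exc:  # a real gateway would catch provider-specific errors
            last_error = exc
    raise RuntimeError("all models failed") from last_error


async def fake_call(model: str, messages: list[dict]) -> dict:
    # Simulated provider: the primary model is down, the failover answers.
    if model == "claude-3-5-sonnet-20241022":
        raise ConnectionError("primary unavailable")
    return {"model": model, "content": "ok"}


models = get_model_preference("claude-3-5-sonnet-20241022", "gpt-4-turbo")
result = asyncio.run(
    complete_with_failover(fake_call, models, [{"role": "user", "content": "hi"}])
)
```

Keeping the ordering in the agent type and the retry loop in the gateway means individual runners never need provider-specific error handling.
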
### Agent Roles

| Role | Instances | Primary Capabilities |
|------|-----------|---------------------|
| Product Owner | 1 | requirements, prioritization, client_communication |
| Project Manager | 1 | planning, tracking, coordination |
| Business Analyst | 1 | analysis, documentation, process_modeling |
| Software Architect | 1 | design, architecture_decisions, tech_selection |
| Software Engineer | 1-5 | code_generation, code_review, testing |
| UI/UX Designer | 1 | design, wireframes, accessibility |
| QA Engineer | 1-2 | test_planning, test_automation, bug_reporting |
| DevOps Engineer | 1 | cicd, infrastructure, deployment |
| AI/ML Engineer | 1 | ml_development, model_training, mlops |
| Security Expert | 1 | security_review, vulnerability_assessment |

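If the Instances column is read as a per-project limit (an assumption; the ADR does not say whether these counts are enforced or merely typical), the orchestrator could check it before spawning. The names `ROLE_INSTANCE_LIMITS` and `can_spawn` below are hypothetical.

```python
# (min, max) instances per project, transcribed from the Agent Roles table.
ROLE_INSTANCE_LIMITS: dict[str, tuple[int, int]] = {
    "Product Owner": (1, 1),
    "Project Manager": (1, 1),
    "Business Analyst": (1, 1),
    "Software Architect": (1, 1),
    "Software Engineer": (1, 5),
    "UI/UX Designer": (1, 1),
    "QA Engineer": (1, 2),
    "DevOps Engineer": (1, 1),
    "AI/ML Engineer": (1, 1),
    "Security Expert": (1, 1),
}


def can_spawn(role: str, current_count: int) -> bool:
    """Return True if one more instance of `role` stays within the table's maximum."""
    _, max_instances = ROLE_INSTANCE_LIMITS[role]
    return current_count < max_instances
```

For example, `can_spawn("Software Engineer", 4)` allows a fifth engineer, while `can_spawn("Product Owner", 1)` rejects a second product owner.
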
## Consequences

### Positive
- Clear separation between type definition and instance runtime
- Multiple instances share type configuration (DRY)
- Easy to add new agent roles
- Full observability through events
- Graceful failure handling with model failover

### Negative
- Complexity in managing instance lifecycle
- State synchronization across instances
- Memory overhead for context storage

### Mitigation
- Context archival for long-running instances
- Periodic cleanup of terminated instances
- State compression for large contexts

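The ADR does not say how state compression would be implemented. As one possible approach, a large context dictionary can be serialized to compact JSON, compressed with `zlib`, and base64-encoded so it still fits in a text or JSON column. The function names below are hypothetical.

```python
import base64
import json
import zlib


def compress_context(context: dict) -> str:
    """Serialize a context dict to compact JSON, compress it, and base64-encode it."""
    raw = json.dumps(context, separators=(",", ":")).encode("utf-8")
    return base64.b64encode(zlib.compress(raw)).decode("ascii")


def decompress_context(blob: str) -> dict:
    """Reverse compress_context and return the original dict."""
    return json.loads(zlib.decompress(base64.b64decode(blob)).decode("utf-8"))


# Repetitive content, such as long histories, compresses well.
context = {"last_action": "code_review", "history": ["reviewed file"] * 200}
blob = compress_context(context)
```

The trade-off is that a compressed context can no longer be queried with JSONB operators, so this fits archived or rarely read state better than the live working context.
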
## Compliance

This decision aligns with:
- FR-101: Agent type configuration
- FR-102: Agent instance spawning
- FR-103: Agent domain knowledge (RAG)
- FR-104: Inter-agent communication
- FR-105: Agent activity monitoring

---

*This ADR establishes the agent orchestration architecture for Syndarix.*