ADR-006: Agent Orchestration Architecture
- Status: Accepted
- Date: 2025-12-29
- Deciders: Architecture Team
- Related Spikes: SPIKE-002
Context
Syndarix requires an agent orchestration system that can:
- Define reusable agent types with specific capabilities
- Spawn multiple instances of the same type with unique identities
- Manage agent state, context, and conversation history
- Route messages between agents
- Handle agent failover and recovery
- Track resource usage per agent
Decision Drivers
- Flexibility: Support diverse agent roles and capabilities
- Scalability: Handle 50+ concurrent agent instances
- Isolation: Each instance maintains separate state
- Observability: Full visibility into agent activities
- Reliability: Graceful handling of failures
Decision
Adopt a Type-Instance pattern where:
- Agent Types define templates (model, expertise, personality)
- Agent Instances are spawned from types with unique identities
- Agent Orchestrator manages lifecycle and communication
Architecture
Agent Type Definition
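```python
from uuid import uuid4

from sqlalchemy import Boolean, Column, Enum, String, Text
from sqlalchemy.dialects.postgresql import ARRAY, JSONB, UUID

# Base (declarative base) and AgentRole (Python enum) are defined elsewhere.


class AgentType(Base):
    __tablename__ = "agent_types"  # assumed table name

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    name = Column(String(50), unique=True)  # "Software Engineer"
    role = Column(Enum(AgentRole))  # AgentRole.ENGINEER
    base_model = Column(String(100))  # "claude-3-5-sonnet-20241022"
    failover_model = Column(String(100))  # "gpt-4-turbo"
    expertise = Column(ARRAY(String))  # ["python", "fastapi", "testing"]
    personality = Column(JSONB)  # {"style": "detailed", "tone": "professional"}
    system_prompt = Column(Text)  # Base system prompt template
    capabilities = Column(ARRAY(String))  # ["code_generation", "code_review"]
    is_active = Column(Boolean, default=True)
```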
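The `system_prompt` column stores a template, while the instance's own name lives on `AgentInstance`. Below is a minimal sketch of how the two could be combined into a concrete prompt. It assumes `string.Template` placeholders named `$name`, `$expertise`, `$style`, and `$tone`. Both the helper `build_system_prompt` and those placeholder names are hypothetical; they are not part of the Syndarix schema.

```python
from string import Template


def build_system_prompt(
    template: str,
    name: str,
    expertise: list[str],
    personality: dict[str, str],
) -> str:
    """Fill a base prompt template with an instance's identity and type settings.

    Hypothetical helper: the placeholder names are illustrative only.
    """
    # safe_substitute leaves unknown placeholders untouched instead of raising
    return Template(template).safe_substitute(
        name=name,
        expertise=", ".join(expertise),
        style=personality.get("style", "balanced"),
        tone=personality.get("tone", "neutral"),
    )


prompt = build_system_prompt(
    "You are $name, a software engineer skilled in $expertise. "
    "Write in a $style, $tone manner.",
    name="Dave",
    expertise=["python", "fastapi", "testing"],
    personality={"style": "detailed", "tone": "professional"},
)
print(prompt)
# → You are Dave, a software engineer skilled in python, fastapi, testing. Write in a detailed, professional manner.
```

Using `safe_substitute` means a template that references a field the type does not define still renders, with the placeholder left visible rather than causing an exception at spawn time.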
Agent Instance Definition
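```python
from uuid import uuid4

from sqlalchemy import Column, DateTime, Enum, ForeignKey, String
from sqlalchemy.dialects.postgresql import JSONB, UUID

# Base and InstanceStatus (Python enum) are defined elsewhere; table names are assumed.


class AgentInstance(Base):
    __tablename__ = "agent_instances"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    name = Column(String(50))  # "Dave"
    agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"))
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"))
    status = Column(Enum(InstanceStatus))  # ACTIVE, IDLE, TERMINATED
    context = Column(JSONB)  # Current working context
    conversation_id = Column(UUID(as_uuid=True))  # Active conversation
    rag_collection_id = Column(String)  # Domain knowledge collection
    token_usage = Column(JSONB)  # {"prompt": 0, "completion": 0}
    last_active_at = Column(DateTime(timezone=True))
    created_at = Column(DateTime(timezone=True))
    terminated_at = Column(DateTime(timezone=True))
```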
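The `status` column admits ACTIVE, IDLE, and TERMINATED, but which moves between them are legal is not specified. Below is a minimal sketch of a transition guard. It assumes that ACTIVE and IDLE alternate freely and that TERMINATED is final. The string values and the `ALLOWED_TRANSITIONS` table are illustrative, and the enum here is only a local stand-in for the real `InstanceStatus`.

```python
from enum import Enum


class InstanceStatus(str, Enum):
    # Stand-in for the project's enum; string values are illustrative
    ACTIVE = "active"
    IDLE = "idle"
    TERMINATED = "terminated"


# Assumed rules: ACTIVE <-> IDLE freely, TERMINATED is a terminal state
ALLOWED_TRANSITIONS: dict[InstanceStatus, frozenset[InstanceStatus]] = {
    InstanceStatus.ACTIVE: frozenset({InstanceStatus.IDLE, InstanceStatus.TERMINATED}),
    InstanceStatus.IDLE: frozenset({InstanceStatus.ACTIVE, InstanceStatus.TERMINATED}),
    InstanceStatus.TERMINATED: frozenset(),
}


def transition(current: InstanceStatus, target: InstanceStatus) -> InstanceStatus:
    """Return the target status if the move is allowed, else raise ValueError."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target


print(transition(InstanceStatus.ACTIVE, InstanceStatus.IDLE).value)  # idle
```

With a guard like this, `terminate_agent` could call `transition(instance.status, InstanceStatus.TERMINATED)` before mutating the row. A double termination would then surface as an error rather than silently overwriting `terminated_at`.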
Orchestrator Service
```python
from datetime import datetime, timezone
from uuid import UUID, uuid4

# AgentInstance, InstanceStatus, AgentRole and AgentMessage come from the models module.


class AgentOrchestrator:
    """Central service for agent lifecycle management.

    Dependencies (self.db: AsyncSession, self.event_bus, self.message_store) and
    helpers such as get_agent_type(), get_instance() and get_instances_by_role()
    are omitted for brevity; the get_* helpers are expected to raise if not found.
    """

    async def spawn_agent(
        self,
        agent_type_id: UUID,
        project_id: UUID,
        name: str,
        domain_knowledge: list[str] | None = None,
    ) -> AgentInstance:
        """Spawn a new agent instance from a type definition."""
        agent_type = await self.get_agent_type(agent_type_id)
        now = datetime.now(timezone.utc)

        # Assign the ID up front: column defaults only fire on flush, and the
        # RAG collection below needs the ID before the row is inserted.
        instance = AgentInstance(
            id=uuid4(),
            name=name,
            agent_type_id=agent_type_id,
            project_id=project_id,
            status=InstanceStatus.ACTIVE,
            context={"initialized_at": now.isoformat()},
            token_usage={"prompt": 0, "completion": 0},
            created_at=now,
        )

        # Initialize RAG collection if domain knowledge provided
        if domain_knowledge:
            instance.rag_collection_id = await self._init_rag_collection(
                instance.id, domain_knowledge
            )

        # AsyncSession.add() is synchronous; only commit() is awaited
        self.db.add(instance)
        await self.db.commit()

        # Publish spawn event
        await self.event_bus.publish(f"project:{project_id}", {
            "type": "agent_spawned",
            "agent_id": str(instance.id),
            "name": name,
            "role": agent_type.role.value,
        })

        return instance

    async def terminate_agent(self, instance_id: UUID) -> None:
        """Terminate an agent instance and release resources."""
        instance = await self.get_instance(instance_id)
        instance.status = InstanceStatus.TERMINATED
        instance.terminated_at = datetime.now(timezone.utc)

        # Cleanup RAG collection
        if instance.rag_collection_id:
            await self._cleanup_rag_collection(instance.rag_collection_id)

        await self.db.commit()

    async def send_message(
        self,
        from_id: UUID,
        to_id: UUID,
        message: AgentMessage,
    ) -> None:
        """Route a message from one agent to another."""
        # Validate both agents exist (get_instance raises) and are not terminated
        sender = await self.get_instance(from_id)
        recipient = await self.get_instance(to_id)
        for agent in (sender, recipient):
            if agent.status == InstanceStatus.TERMINATED:
                raise ValueError(f"agent {agent.id} is terminated")

        # Persist message
        await self.message_store.save(message)

        # If recipient is idle, trigger action
        if recipient.status == InstanceStatus.IDLE:
            await self._trigger_agent_action(recipient.id, message)

        # Publish for real-time tracking
        await self.event_bus.publish(f"project:{sender.project_id}", {
            "type": "agent_message",
            "from": str(from_id),
            "to": str(to_id),
            "preview": message.content[:100],
        })

    async def broadcast(
        self,
        from_id: UUID,
        target_role: AgentRole,
        message: AgentMessage,
    ) -> None:
        """Broadcast a message to all agents of a specific role."""
        sender = await self.get_instance(from_id)
        recipients = await self.get_instances_by_role(
            sender.project_id, target_role
        )
        for recipient in recipients:
            # Skip the sender when it has the target role itself
            if recipient.id != from_id:
                await self.send_message(from_id, recipient.id, message)
```
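`AgentOrchestrator` publishes events to channels named `project:{project_id}`, but the `event_bus` implementation is outside the scope of this ADR. For local testing, a hypothetical in-memory bus with the same `publish(channel, event)` shape could look like the sketch below. `InMemoryEventBus` and its `subscribe` method are illustrative names. A multi-process deployment would need a shared broker instead, because this bus only reaches subscribers in the same event loop.

```python
import asyncio
from collections import defaultdict
from typing import Any


class InMemoryEventBus:
    """Hypothetical stand-in for the orchestrator's event_bus.

    Each subscriber gets its own unbounded asyncio.Queue, so publishing never
    waits on a slow consumer.
    """

    def __init__(self) -> None:
        self._subscribers: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[channel].append(queue)
        return queue

    async def publish(self, channel: str, event: dict[str, Any]) -> int:
        """Deliver event to every subscriber of channel; return how many got it."""
        queues = self._subscribers.get(channel, [])
        for queue in queues:
            await queue.put(event)
        return len(queues)


async def demo() -> dict[str, Any]:
    bus = InMemoryEventBus()
    inbox = bus.subscribe("project:42")
    await bus.publish("project:42", {"type": "agent_spawned", "name": "Dave"})
    # No subscribers on this channel, so the event is simply dropped
    await bus.publish("project:99", {"type": "agent_spawned", "name": "Eve"})
    return await inbox.get()


print(asyncio.run(demo()))
```

Scoping channels per project keeps a dashboard for one project from receiving another project's agent traffic.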
Agent Execution Pattern
```python
from datetime import datetime, timezone


class AgentRunner:
    """Executes agent actions using LLM."""

    def __init__(self, instance: AgentInstance, llm_gateway: LLMGateway):
        self.instance = instance
        self.llm = llm_gateway

    async def execute(self, action: str, context: dict) -> dict:
        """Execute an action using the agent's configured model."""
        # get_agent_type() and the _build_*/_get_*/_query_rag helpers are omitted
        agent_type = await self.get_agent_type(self.instance.agent_type_id)

        # Build messages with system prompt and context
        messages = [
            {"role": "system", "content": self._build_system_prompt(agent_type)},
            *self._get_conversation_history(),
            {"role": "user", "content": self._build_action_prompt(action, context)},
        ]

        # Add RAG context if available, directly after the base system prompt
        if self.instance.rag_collection_id:
            rag_context = await self._query_rag(action, context)
            messages.insert(1, {
                "role": "system",
                "content": f"Relevant context:\n{rag_context}",
            })

        # Execute with failover (handled by the gateway via model_preference)
        response = await self.llm.complete(
            agent_id=str(self.instance.id),
            project_id=str(self.instance.project_id),
            messages=messages,
            model_preference=self._get_model_preference(agent_type),
        )

        # Update instance context; reassigning the dict (instead of mutating it
        # in place) lets SQLAlchemy detect the change to the JSONB column
        now = datetime.now(timezone.utc)
        self.instance.context = {
            **(self.instance.context or {}),
            "last_action": action,
            "last_response_at": now.isoformat(),
        }
        self.instance.last_active_at = now

        return response
```
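The runner delegates failover to `LLMGateway.complete(...)` through `model_preference`, and the gateway's internals are not specified here. Below is a minimal sketch of preference-ordered failover. It assumes the preference is an ordered list of model names such as `[base_model, failover_model]`, and it uses a hypothetical `ModelUnavailableError` to signal a failed provider call. `complete_with_failover` and `fake_complete` are illustrative names, not the gateway's API.

```python
import asyncio
from collections.abc import Awaitable, Callable


class ModelUnavailableError(Exception):
    """Hypothetical error raised when a provider call fails."""


CompleteFn = Callable[[str, list[dict[str, str]]], Awaitable[str]]


async def complete_with_failover(
    complete: CompleteFn,
    models: list[str],
    messages: list[dict[str, str]],
) -> tuple[str, str]:
    """Try each model in preference order; return (model_used, response)."""
    errors: list[str] = []
    for model in models:
        try:
            return model, await complete(model, messages)
        except ModelUnavailableError as exc:
            # Record the failure and fall through to the next model
            errors.append(f"{model}: {exc}")
    raise ModelUnavailableError("all models failed: " + "; ".join(errors))


async def fake_complete(model: str, messages: list[dict[str, str]]) -> str:
    # Simulate an outage of the primary model only
    if model == "claude-3-5-sonnet-20241022":
        raise ModelUnavailableError("rate limited")
    return f"{model} handled {len(messages)} messages"


used, reply = asyncio.run(
    complete_with_failover(
        fake_complete,
        ["claude-3-5-sonnet-20241022", "gpt-4-turbo"],
        [{"role": "user", "content": "Review this PR"}],
    )
)
print(used, reply)  # gpt-4-turbo gpt-4-turbo handled 1 messages
```

Returning the model that actually answered lets the caller attribute `token_usage` to the right model when a fallback occurs.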
Agent Roles
| Role | Instances | Primary Capabilities |
|---|---|---|
| Product Owner | 1 | requirements, prioritization, client_communication |
| Project Manager | 1 | planning, tracking, coordination |
| Business Analyst | 1 | analysis, documentation, process_modeling |
| Software Architect | 1 | design, architecture_decisions, tech_selection |
| Software Engineer | 1-5 | code_generation, code_review, testing |
| UI/UX Designer | 1 | design, wireframes, accessibility |
| QA Engineer | 1-2 | test_planning, test_automation, bug_reporting |
| DevOps Engineer | 1 | cicd, infrastructure, deployment |
| AI/ML Engineer | 1 | ml_development, model_training, mlops |
| Security Expert | 1 | security_review, vulnerability_assessment |
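The "Instances" column above suggests per-project caps, such as up to five Software Engineers and two QA Engineers. Below is a minimal sketch of enforcing those caps before spawning. It assumes the counts are hard maximums over non-terminated instances and uses role display names as keys. `ROLE_MAX_INSTANCES` and `can_spawn` are hypothetical names.

```python
from collections import Counter

# Maximum instances per role, transcribed from the Agent Roles table.
# Treating these counts as hard caps is an assumption.
ROLE_MAX_INSTANCES: dict[str, int] = {
    "Product Owner": 1,
    "Project Manager": 1,
    "Business Analyst": 1,
    "Software Architect": 1,
    "Software Engineer": 5,
    "UI/UX Designer": 1,
    "QA Engineer": 2,
    "DevOps Engineer": 1,
    "AI/ML Engineer": 1,
    "Security Expert": 1,
}


def can_spawn(role: str, current_roles: list[str]) -> bool:
    """Return True if one more `role` instance stays within its cap.

    `current_roles` lists the role of every non-terminated instance in the project.
    """
    if role not in ROLE_MAX_INSTANCES:
        raise KeyError(f"unknown role: {role}")
    # Counter returns 0 for roles that are not present yet
    return Counter(current_roles)[role] < ROLE_MAX_INSTANCES[role]


team = ["Product Owner", "Software Engineer", "Software Engineer"]
print(can_spawn("Software Engineer", team))  # True: 2 of 5 in use
print(can_spawn("Product Owner", team))  # False: 1 of 1 in use
```

A check like this could run at the top of `spawn_agent`, before any RAG collection is created, so that a rejected spawn leaves nothing to clean up.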
Consequences
Positive
- Clear separation between type definition and instance runtime
- Multiple instances share type configuration (DRY)
- Easy to add new agent roles
- Full observability through events
- Graceful failure handling with model failover
Negative
- Complexity in managing instance lifecycle
- State synchronization across instances
- Memory overhead for context storage
Mitigation
- Context archival for long-running instances
- Periodic cleanup of terminated instances
- State compression for large contexts
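State compression for large contexts could be handled at the storage boundary, as in the sketch below. It assumes a hypothetical 16 KiB threshold and a `{"_compressed": "zlib+base64", "data": ...}` envelope, so the compressed form still fits the JSONB `context` column. `pack_context`, `unpack_context`, and `COMPRESS_THRESHOLD_BYTES` are illustrative names, not part of the current design.

```python
import base64
import json
import zlib
from typing import Any

# Hypothetical threshold: contexts whose serialized JSON exceeds this many
# bytes are compressed before being written to the JSONB column.
COMPRESS_THRESHOLD_BYTES = 16_384
ENVELOPE_TAG = "zlib+base64"


def pack_context(context: dict[str, Any]) -> dict[str, Any]:
    """Return the context unchanged if small, else a compressed envelope."""
    raw = json.dumps(context, separators=(",", ":")).encode("utf-8")
    if len(raw) <= COMPRESS_THRESHOLD_BYTES:
        return context
    # zlib output is binary; base64 keeps it storable as a JSON string
    blob = base64.b64encode(zlib.compress(raw, level=9)).decode("ascii")
    return {"_compressed": ENVELOPE_TAG, "data": blob}


def unpack_context(stored: dict[str, Any]) -> dict[str, Any]:
    """Inverse of pack_context; plain contexts pass through untouched."""
    if stored.get("_compressed") != ENVELOPE_TAG:
        return stored
    raw = zlib.decompress(base64.b64decode(stored["data"]))
    return json.loads(raw)
```

The trade-off is that a compressed context can no longer be queried with JSONB operators. This is why the sketch only compresses past a size threshold and leaves small, frequently inspected contexts readable.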
Compliance
This decision aligns with:
- FR-101: Agent type configuration
- FR-102: Agent instance spawning
- FR-103: Agent domain knowledge (RAG)
- FR-104: Inter-agent communication
- FR-105: Agent activity monitoring
This ADR establishes the agent orchestration architecture for Syndarix.