docs: add architecture decision records (ADRs) for key technical choices

- Added the following ADRs to `docs/adrs/` directory:
  - ADR-001: MCP Integration Architecture
  - ADR-002: Real-time Communication Architecture
  - ADR-003: Background Task Architecture
  - ADR-004: LLM Provider Abstraction
  - ADR-005: Technology Stack Selection
  - ADR-006: Agent Orchestration Architecture
- Each ADR details the context, decision drivers, considered options, the final decision, and an implementation plan.
- Documentation aligns technical choices with architecture principles and system requirements for Syndarix.
2025-12-29 13:16:02 +01:00
parent a6a336b66e
commit 6e3cdebbfb
7 changed files with 1565 additions and 0 deletions


@@ -0,0 +1,134 @@
# ADR-001: MCP Integration Architecture
**Status:** Accepted
**Date:** 2025-12-29
**Deciders:** Architecture Team
**Related Spikes:** SPIKE-001
---
## Context
Syndarix requires integration with multiple external services (LLM providers, Git, issue trackers, file systems, CI/CD). The Model Context Protocol (MCP) was identified as the standard for tool integration in AI applications. We need to decide on:
1. The MCP framework to use
2. Server deployment pattern (singleton vs per-project)
3. Scoping mechanism for multi-project/multi-agent access
## Decision Drivers
- **Simplicity:** Minimize operational complexity
- **Resource Efficiency:** Avoid spawning redundant processes
- **Consistency:** Unified interface across all integrations
- **Scalability:** Support 10+ concurrent projects
- **Maintainability:** Easy to add new MCP servers
## Considered Options
### Option 1: Per-Project MCP Servers
Spawn dedicated MCP server instances for each project.
**Pros:**
- Complete isolation between projects
- Simple access control (project owns server)
**Cons:**
- Resource heavy (7 servers × N projects)
- Complex orchestration
- Difficult to share cross-project resources
### Option 2: Unified Singleton MCP Servers (Selected)
Single instance of each MCP server type, with explicit project/agent scoping.
**Pros:**
- Resource efficient (7 total servers)
- Simpler deployment
- Enables cross-project learning (if desired)
- Consistent management
**Cons:**
- Requires explicit scoping in all tools
- Shared state requires careful design
### Option 3: Hybrid (MCP Proxy)
Single proxy that routes to per-project backends.
**Pros:**
- Balance of isolation and efficiency
**Cons:**
- Added complexity
- Routing overhead
## Decision
**Adopt Option 2: Unified Singleton MCP Servers with explicit scoping.**
All MCP servers are deployed as singletons. Every tool accepts `project_id` and `agent_id` parameters for:
- Access control validation
- Audit logging
- Context filtering
## Implementation
### MCP Server Registry
| Server | Port | Purpose |
|--------|------|---------|
| LLM Gateway | 9001 | Route LLM requests with failover |
| Git MCP | 9002 | Git operations across providers |
| Knowledge Base MCP | 9003 | RAG and document search |
| Issues MCP | 9004 | Issue tracking operations |
| File System MCP | 9005 | Workspace file operations |
| Code Analysis MCP | 9006 | Static analysis, linting |
| CI/CD MCP | 9007 | Pipeline operations |
### Framework Selection
Use **FastMCP 2.0** for all MCP server implementations:
- Decorator-based tool registration
- Built-in async support
- Compatible with SSE transport
- Type-safe with Pydantic
### Tool Signature Pattern
```python
@mcp.tool()
def tool_name(
    project_id: str,  # Required: project scope
    agent_id: str,    # Required: calling agent
    # ... tool-specific params
) -> Result:
    validate_access(agent_id, project_id)
    log_tool_usage(agent_id, project_id, "tool_name")
    # ... implementation
```
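For concreteness, a minimal sketch of a singleton server following this pattern, assuming the FastMCP 2.0 API (`FastMCP`, `@mcp.tool()`, `mcp.run`); the `validate_access` and `log_tool_usage` helpers and the `create_branch` tool are illustrative stand-ins, not part of this ADR:
```python
# Hedged sketch of a scoped singleton Git MCP server (helpers are placeholders).
from fastmcp import FastMCP

mcp = FastMCP("git-mcp")

def validate_access(agent_id: str, project_id: str) -> None:
    """Placeholder: check the agent's project membership, raise on violation."""

def log_tool_usage(agent_id: str, project_id: str, tool: str) -> None:
    """Placeholder: emit an audit record to centralized logging."""

@mcp.tool()
def create_branch(project_id: str, agent_id: str, branch_name: str) -> dict:
    """Create a branch in the project's repository (scoped and audited)."""
    validate_access(agent_id, project_id)
    log_tool_usage(agent_id, project_id, "create_branch")
    return {"project_id": project_id, "branch": branch_name, "status": "created"}

if __name__ == "__main__":
    # SSE transport on the registry port assigned to Git MCP (9002)
    mcp.run(transport="sse", host="0.0.0.0", port=9002)
```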
## Consequences
### Positive
- Single deployment per MCP type simplifies operations
- Consistent interface across all tools
- Easy to add monitoring/logging centrally
- Cross-project analytics possible
### Negative
- All tools must include scoping parameters
- Shared state requires careful design
- Single point of failure per MCP type (mitigated by multiple instances)
### Neutral
- Requires MCP client manager in FastAPI backend
- Authentication handled internally (service tokens for v1)
## Compliance
This decision aligns with:
- FR-802: MCP-first architecture requirement
- NFR-201: Horizontal scalability requirement
- NFR-602: Centralized logging requirement
---
*This ADR supersedes any previous decisions regarding MCP architecture.*


@@ -0,0 +1,160 @@
# ADR-002: Real-time Communication Architecture
**Status:** Accepted
**Date:** 2025-12-29
**Deciders:** Architecture Team
**Related Spikes:** SPIKE-003
---
## Context
Syndarix requires real-time communication for:
- Agent activity streams
- Project progress updates
- Build/pipeline status
- Client approval requests
- Issue change notifications
- Interactive chat with agents
We need to decide between WebSocket and Server-Sent Events (SSE) for real-time data delivery.
## Decision Drivers
- **Simplicity:** Minimize implementation complexity
- **Reliability:** Built-in reconnection handling
- **Scalability:** Support 200+ concurrent connections
- **Compatibility:** Work through proxies and load balancers
- **Use Case Fit:** Match communication patterns
## Considered Options
### Option 1: WebSocket Only
Use WebSocket for all real-time communication.
**Pros:**
- Bidirectional communication
- Single protocol to manage
- Well-supported in FastAPI
**Cons:**
- Manual reconnection logic required
- More complex through proxies
- Overkill for server-to-client streams
### Option 2: SSE Only
Use Server-Sent Events for all real-time communication.
**Pros:**
- Built-in automatic reconnection
- Native HTTP (proxy-friendly)
- Simpler implementation
**Cons:**
- Unidirectional only
- Browser connection limits per domain
### Option 3: SSE Primary + WebSocket for Chat (Selected)
Use SSE for server-to-client events, WebSocket for bidirectional chat.
**Pros:**
- Best tool for each use case
- SSE simplicity for 90% of needs
- WebSocket only where truly needed
**Cons:**
- Two protocols to manage
## Decision
**Adopt Option 3: SSE as primary transport, WebSocket for interactive chat.**
### SSE Use Cases (90%)
- Agent activity streams
- Project progress updates
- Build/pipeline status
- Approval request notifications
- Issue change notifications
### WebSocket Use Cases (10%)
- Interactive chat with agents
- Real-time debugging sessions
- Future collaboration features
## Implementation
### Event Bus with Redis Pub/Sub
```
FastAPI Backend ──publish──> Redis Pub/Sub ──subscribe──> SSE Endpoints
                                  └─────────subscribe──> Other Backend Instances
```
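A minimal sketch of this bus, assuming redis-py's asyncio client; the `EventBus`, `Subscriber`, and `Event` names mirror the interface used by the endpoint below but are assumptions, not the actual implementation:
```python
# Hedged sketch of the Redis-backed event bus (names are assumptions).
import json
from dataclasses import dataclass

import redis.asyncio as redis

@dataclass
class Event:
    type: str
    data: dict

    def json(self) -> str:
        return json.dumps(self.data)

class Subscriber:
    """Wraps a Redis pub/sub channel behind the interface the SSE endpoint uses."""

    def __init__(self, pubsub, channel: str):
        self._pubsub = pubsub
        self._channel = channel

    async def get_event(self) -> Event:
        # Block until the next real message on the channel (skip subscribe acks)
        async for msg in self._pubsub.listen():
            if msg["type"] == "message":
                payload = json.loads(msg["data"])
                return Event(type=payload.get("type", "message"), data=payload)

    async def unsubscribe(self) -> None:
        await self._pubsub.unsubscribe(self._channel)

class EventBus:
    def __init__(self, url: str = "redis://localhost:6379/0"):
        self._redis = redis.from_url(url, decode_responses=True)

    async def publish(self, channel: str, event: dict) -> None:
        # Fan-out: every backend instance subscribed to the channel receives it
        await self._redis.publish(channel, json.dumps(event))

    async def subscribe(self, channel: str) -> Subscriber:
        pubsub = self._redis.pubsub()
        await pubsub.subscribe(channel)
        return Subscriber(pubsub, channel)

event_bus = EventBus()
```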
### SSE Endpoint Pattern
```python
import asyncio

from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse

router = APIRouter()

@router.get("/projects/{project_id}/events")
async def project_events(project_id: str, request: Request):
    async def event_generator():
        subscriber = await event_bus.subscribe(f"project:{project_id}")
        try:
            while not await request.is_disconnected():
                try:
                    event = await asyncio.wait_for(
                        subscriber.get_event(), timeout=30.0
                    )
                except asyncio.TimeoutError:
                    # Heartbeat comment keeps idle connections alive through proxies
                    yield ": keep-alive\n\n"
                    continue
                yield f"event: {event.type}\ndata: {event.json()}\n\n"
        finally:
            await subscriber.unsubscribe()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
```
### Event Types
| Category | Event Types |
|----------|-------------|
| Agent | `agent_started`, `agent_activity`, `agent_completed`, `agent_error` |
| Project | `issue_created`, `issue_updated`, `issue_closed` |
| Git | `branch_created`, `commit_pushed`, `pr_created`, `pr_merged` |
| Workflow | `approval_required`, `sprint_started`, `sprint_completed` |
| Pipeline | `pipeline_started`, `pipeline_completed`, `pipeline_failed` |
### Client Implementation
- Single SSE connection per project
- Event multiplexing through event types
- Exponential backoff on reconnection
- Native `EventSource` API with automatic reconnect
## Consequences
### Positive
- Simpler implementation for server-to-client streams
- Automatic reconnection reduces client complexity
- Works through all HTTP proxies
- Reduced server resource usage vs WebSocket
### Negative
- Two protocols to maintain
- WebSocket requires manual reconnect logic
- SSE limited to ~6 connections per domain (HTTP/1.1)
### Mitigation
- Use HTTP/2 where possible (higher connection limits)
- Multiplex all project events on single connection
- WebSocket only for interactive chat sessions
## Compliance
This decision aligns with:
- FR-105: Real-time agent activity monitoring
- NFR-102: 200+ concurrent connections requirement
- NFR-501: Responsive UI updates
---
*This ADR supersedes any previous decisions regarding real-time communication.*


@@ -0,0 +1,179 @@
# ADR-003: Background Task Architecture
**Status:** Accepted
**Date:** 2025-12-29
**Deciders:** Architecture Team
**Related Spikes:** SPIKE-004
---
## Context
Syndarix requires background task processing for:
- Agent actions (LLM calls, code generation)
- Git operations (clone, commit, push, PR creation)
- External synchronization (issue sync with Gitea/GitHub/GitLab)
- CI/CD pipeline triggers
- Long-running workflows (sprints, story implementation)
These tasks are too slow for synchronous API responses and need proper queuing, retry, and monitoring.
## Decision Drivers
- **Reliability:** Tasks must complete even if workers restart
- **Visibility:** Progress tracking for long-running operations
- **Scalability:** Handle concurrent agent operations
- **Rate Limiting:** Respect LLM API rate limits
- **Async Compatibility:** Work with async FastAPI
## Considered Options
### Option 1: FastAPI BackgroundTasks
Use FastAPI's built-in background tasks.
**Pros:**
- Simple, no additional infrastructure
- Direct async integration
**Cons:**
- No persistence (lost on restart)
- No retry mechanism
- No distributed workers
### Option 2: Celery + Redis (Selected)
Use Celery for task queue with Redis as broker/backend.
**Pros:**
- Mature, battle-tested
- Persistent task queue
- Built-in retry with backoff
- Distributed workers
- Task chaining and workflows
- Monitoring with Flower
**Cons:**
- Additional infrastructure
- Sync-only task execution (bridge needed for async)
### Option 3: Dramatiq + Redis
Use Dramatiq as a simpler Celery alternative.
**Pros:**
- Simpler API than Celery
- Good async support
**Cons:**
- Less mature ecosystem
- Fewer monitoring tools
### Option 4: ARQ (Async Redis Queue)
Use ARQ for native async task processing.
**Pros:**
- Native async
- Simple API
**Cons:**
- Less feature-rich
- Smaller community
## Decision
**Adopt Option 2: Celery + Redis.**
Celery provides the reliability, monitoring, and ecosystem maturity needed for production workloads. Redis serves as both broker and result backend.
## Implementation
### Queue Architecture
```
┌─────────────────────────────────────────────────┐
│            Redis (Broker + Backend)             │
├─────────────┬─────────────┬─────────────────────┤
│ agent_queue │  git_queue  │     sync_queue      │
│ (prefetch=1)│ (prefetch=4)│    (prefetch=4)     │
└──────┬──────┴──────┬──────┴──────────┬──────────┘
       │             │                 │
       ▼             ▼                 ▼
 ┌─────────┐   ┌─────────┐       ┌─────────┐
 │  Agent  │   │   Git   │       │  Sync   │
 │ Workers │   │ Workers │       │ Workers │
 └─────────┘   └─────────┘       └─────────┘
```
### Queue Configuration
| Queue | Prefetch | Concurrency | Purpose |
|-------|----------|-------------|---------|
| `agent_queue` | 1 | 4 | LLM-based tasks (rate limited) |
| `git_queue` | 4 | 8 | Git operations |
| `sync_queue` | 4 | 4 | External sync |
| `cicd_queue` | 4 | 4 | Pipeline operations |
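A sketch of how this table maps onto Celery configuration (module paths and broker URLs are illustrative, not from the ADR); since prefetch is a worker-level setting in Celery, each queue gets a dedicated worker pool started with matching flags:
```python
# Illustrative Celery wiring for the queues above (paths/URLs are assumptions).
from celery import Celery
from kombu import Queue

celery_app = Celery(
    "syndarix",
    broker="redis://localhost:6379/1",
    backend="redis://localhost:6379/2",
)

celery_app.conf.task_queues = (
    Queue("agent_queue"),
    Queue("git_queue"),
    Queue("sync_queue"),
    Queue("cicd_queue"),
)
celery_app.conf.task_routes = {
    "tasks.agents.*": {"queue": "agent_queue"},
    "tasks.git.*": {"queue": "git_queue"},
    "tasks.sync.*": {"queue": "sync_queue"},
    "tasks.cicd.*": {"queue": "cicd_queue"},
}

# One worker pool per queue, e.g. for the rate-limited agent queue:
#   celery -A tasks worker -Q agent_queue --concurrency=4 --prefetch-multiplier=1
```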
### Task Patterns
**Progress Reporting:**
```python
@celery_app.task(bind=True)
def implement_story(self, story_id: str, agent_id: str, project_id: str):
    steps = plan_implementation_steps(story_id)  # hypothetical planning helper
    for i, step in enumerate(steps):
        self.update_state(
            state="PROGRESS",
            meta={"current": i + 1, "total": len(steps)}
        )
        # Publish SSE event for real-time UI update
        event_bus.publish(f"project:{project_id}", {
            "type": "agent_progress",
            "step": i + 1,
            "total": len(steps)
        })
        execute_step(step)
```
**Task Chaining:**
```python
from celery import chain

workflow = chain(
    analyze_requirements.s(story_id),
    design_solution.s(),
    implement_code.s(),
    run_tests.s(),
    create_pr.s()
)
result = workflow.apply_async()  # each task feeds its return value to the next
```
### Monitoring
- **Flower:** Web UI for task monitoring (port 5555)
- **Prometheus:** Metrics export for alerting
- **Dead Letter Queue:** Failed tasks for investigation
## Consequences
### Positive
- Reliable task execution with persistence
- Automatic retry with exponential backoff
- Progress tracking for long operations
- Distributed workers for scalability
- Rich monitoring and debugging tools
### Negative
- Additional infrastructure (Redis, workers)
- Celery executes tasks synchronously (an event-loop bridge is needed for async calls)
- Learning curve for task patterns
### Mitigation
- Use existing Redis instance (already needed for SSE)
- Wrap async calls with `asyncio.run()` or asgiref's `async_to_sync`
- Document common task patterns
## Compliance
This decision aligns with:
- FR-304: Long-running implementation workflow
- NFR-102: 500+ background jobs per minute
- NFR-402: Task reliability and fault tolerance
---
*This ADR supersedes any previous decisions regarding background task processing.*


@@ -0,0 +1,189 @@
# ADR-004: LLM Provider Abstraction
**Status:** Accepted
**Date:** 2025-12-29
**Deciders:** Architecture Team
**Related Spikes:** SPIKE-005
---
## Context
Syndarix agents require access to large language models (LLMs) from multiple providers:
- **Anthropic** (Claude) - Primary provider
- **OpenAI** (GPT-4) - Fallback provider
- **Local models** (Ollama/Llama) - Cost optimization, privacy
We need a unified abstraction layer that provides:
- Consistent API across providers
- Automatic failover on errors
- Usage tracking and cost management
- Rate limiting compliance
## Decision Drivers
- **Reliability:** Automatic failover on provider outages
- **Cost Control:** Track and limit API spending
- **Flexibility:** Easy to add/swap providers
- **Consistency:** Single interface for all agents
- **Async Support:** Compatible with async FastAPI
## Considered Options
### Option 1: Direct Provider SDKs
Use Anthropic and OpenAI SDKs directly with custom abstraction.
**Pros:**
- Full control over implementation
- No external dependencies
**Cons:**
- Significant development effort
- Must maintain failover logic
- Must track token costs manually
### Option 2: LiteLLM (Selected)
Use LiteLLM as unified abstraction layer.
**Pros:**
- Unified API for 100+ providers
- Built-in failover and routing
- Automatic token counting
- Cost tracking built-in
- Redis caching support
- Active community
**Cons:**
- External dependency
- May lag behind provider SDK updates
### Option 3: LangChain
Use LangChain's LLM abstraction.
**Pros:**
- Large ecosystem
- Many integrations
**Cons:**
- Heavy dependency
- Overkill for just LLM abstraction
- Complexity overhead
## Decision
**Adopt Option 2: LiteLLM for unified LLM provider abstraction.**
LiteLLM provides the reliability, monitoring, and multi-provider support needed with minimal overhead.
## Implementation
### Model Groups
| Group Name | Use Case | Primary Model | Fallback |
|------------|----------|---------------|----------|
| `high-reasoning` | Complex analysis, architecture | Claude 3.5 Sonnet | GPT-4 Turbo |
| `fast-response` | Quick tasks, simple queries | Claude 3 Haiku | GPT-4o Mini |
| `cost-optimized` | High-volume, non-critical | Local Llama 3 | Claude 3 Haiku |
### Failover Chain
```
Claude 3.5 Sonnet (Anthropic)
▼ (on failure)
GPT-4 Turbo (OpenAI)
▼ (on failure)
Llama 3 (Ollama/Local)
▼ (on failure)
Error with retry
```
### LLM Gateway Service
```python
from litellm import Router

class LLMGateway:
    def __init__(self):
        self.router = Router(
            model_list=model_list,
            fallbacks=[
                {"high-reasoning": ["high-reasoning", "local-fallback"]},
            ],
            routing_strategy="latency-based-routing",
            num_retries=3,
        )

    async def complete(
        self,
        agent_id: str,
        project_id: str,
        messages: list[dict],
        model_preference: str = "high-reasoning",
    ) -> dict:
        response = await self.router.acompletion(
            model=model_preference,
            messages=messages,
        )
        await self._track_usage(agent_id, project_id, response)
        return response
```
### Cost Tracking
| Model | Input (per 1M tokens) | Output (per 1M tokens) |
|-------|----------------------|------------------------|
| Claude 3.5 Sonnet | $3.00 | $15.00 |
| Claude 3 Haiku | $0.25 | $1.25 |
| GPT-4 Turbo | $10.00 | $30.00 |
| GPT-4o Mini | $0.15 | $0.60 |
| Ollama (local) | $0.00 | $0.00 |
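To make the arithmetic concrete, a small sketch of how `_track_usage` might price a response from this table (model keys and the helper name are illustrative):
```python
# Prices from the table above, in USD per 1M tokens (input, output).
PRICES = {
    "claude-3-5-sonnet": (3.00, 15.00),
    "claude-3-haiku": (0.25, 1.25),
    "gpt-4-turbo": (10.00, 30.00),
    "gpt-4o-mini": (0.15, 0.60),
    "ollama-local": (0.00, 0.00),
}

def estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    input_price, output_price = PRICES[model]
    return (prompt_tokens * input_price + completion_tokens * output_price) / 1_000_000
```
For example, a turn of 10,000 prompt and 2,000 completion tokens on Claude 3.5 Sonnet costs (10,000 × $3 + 2,000 × $15) / 1M = $0.06.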
### Agent Type Mapping
| Agent Type | Model Preference | Rationale |
|------------|------------------|-----------|
| Product Owner | high-reasoning | Complex requirements analysis |
| Software Architect | high-reasoning | Architecture decisions |
| Software Engineer | high-reasoning | Code generation |
| QA Engineer | fast-response | Test case generation |
| DevOps Engineer | fast-response | Config generation |
| Project Manager | fast-response | Status updates |
### Caching Strategy
- **Redis-backed cache** for repeated queries
- **TTL:** 1 hour for general queries
- **Skip cache:** For context-dependent generation
- **Cache key:** Hash of (model, messages, temperature), sketched below
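A minimal sketch of that key scheme (the `llm:` prefix is an assumption):
```python
# Deterministic cache key over (model, messages, temperature).
import hashlib
import json

def cache_key(model: str, messages: list[dict], temperature: float) -> str:
    payload = json.dumps(
        {"model": model, "messages": messages, "temperature": temperature},
        sort_keys=True,
    )
    return "llm:" + hashlib.sha256(payload.encode()).hexdigest()
```
The cached response would then be written with the 1-hour TTL, e.g. `SETEX <key> 3600 <response>`.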
## Consequences
### Positive
- Single interface for all LLM operations
- Automatic failover improves reliability
- Built-in cost tracking and budgeting
- Easy to add new providers
- Caching reduces API costs
### Negative
- Dependency on LiteLLM library
- May lag behind provider SDK features
- Additional abstraction layer
### Mitigation
- Pin LiteLLM version, test before upgrades
- Direct SDK access available if needed
- Monitor LiteLLM updates for breaking changes
## Compliance
This decision aligns with:
- FR-101: Agent type model configuration
- NFR-103: Agent response time targets
- NFR-402: Failover requirements
- TR-001: LLM API unavailability mitigation
---
*This ADR supersedes any previous decisions regarding LLM integration.*


@@ -0,0 +1,156 @@
# ADR-005: Technology Stack Selection
**Status:** Accepted
**Date:** 2025-12-29
**Deciders:** Architecture Team
---
## Context
Syndarix needs a robust, modern technology stack that can support:
- Multi-agent orchestration with real-time communication
- Full-stack web application with API backend
- Background task processing for long-running operations
- Vector search for RAG (Retrieval-Augmented Generation)
- Multiple external integrations via MCP
The decision was made to build upon **PragmaStack** as the foundation, extending it with Syndarix-specific components.
## Decision Drivers
- **Productivity:** Rapid development with modern frameworks
- **Type Safety:** Minimize runtime errors
- **Async Performance:** Handle concurrent agent operations
- **Ecosystem:** Rich library support
- **Familiarity:** Team expertise with selected technologies
- **Production-Ready:** Proven technologies for production workloads
## Decision
**Adopt PragmaStack as foundation with Syndarix-specific extensions.**
### Core Stack (from PragmaStack)
| Layer | Technology | Version | Rationale |
|-------|------------|---------|-----------|
| **Backend** | FastAPI | 0.115+ | Async, OpenAPI, type hints |
| **Backend Language** | Python | 3.11+ | Type hints, async/await, ecosystem |
| **Frontend** | Next.js | 16 | React 19, server components, App Router |
| **Frontend Language** | TypeScript | 5.0+ | Type safety, IDE support |
| **Database** | PostgreSQL | 15+ | Robust, extensible, pgvector |
| **ORM** | SQLAlchemy | 2.0+ | Async support, type hints |
| **Validation** | Pydantic | 2.0+ | Data validation, serialization |
| **State Management** | Zustand | 4.0+ | Simple, performant |
| **Data Fetching** | TanStack Query | 5.0+ | Caching, invalidation |
| **UI Components** | shadcn/ui | Latest | Accessible, customizable |
| **CSS** | Tailwind CSS | 4.0+ | Utility-first, fast styling |
| **Auth** | JWT | - | Dual-token (access + refresh) |
### Syndarix Extensions
| Component | Technology | Version | Purpose |
|-----------|------------|---------|---------|
| **Task Queue** | Celery | 5.3+ | Background job processing |
| **Message Broker** | Redis | 7.0+ | Celery broker, caching, pub/sub |
| **Vector Store** | pgvector | Latest | Embeddings for RAG |
| **MCP Framework** | FastMCP | 2.0+ | MCP server development |
| **LLM Abstraction** | LiteLLM | Latest | Multi-provider LLM access |
| **Real-time** | SSE + WebSocket | - | Event streaming, chat |
### Testing Stack
| Type | Technology | Version | Purpose |
|------|------------|---------|---------|
| **Backend Unit** | pytest | 8.0+ | Python testing |
| **Backend Async** | pytest-asyncio | - | Async test support |
| **Backend Coverage** | coverage.py | - | Code coverage |
| **Frontend Unit** | Jest | 29+ | React testing |
| **Frontend Components** | React Testing Library | - | Component testing |
| **E2E** | Playwright | 1.40+ | Browser automation |
### DevOps Stack
| Component | Technology | Version | Purpose |
|-----------|------------|---------|---------|
| **Containerization** | Docker | 24+ | Application packaging |
| **Orchestration** | Docker Compose | - | Local development |
| **CI/CD** | Gitea Actions | - | Automated pipelines |
| **Database Migrations** | Alembic | - | Schema versioning |
## Architecture Overview
```
┌─────────────────────────────────────────────────────────────────┐
│                      Frontend (Next.js 16)                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
│  │   Pages     │  │ Components  │  │   Stores    │              │
│  │ (App Router)│  │ (shadcn/ui) │  │  (Zustand)  │              │
│  └─────────────┘  └─────────────┘  └─────────────┘              │
└────────────────────────────┬────────────────────────────────────┘
                             │ REST + SSE + WebSocket
┌────────────────────────────┴────────────────────────────────────┐
│                     Backend (FastAPI 0.115+)                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
│  │    API      │  │  Services   │  │    CRUD     │              │
│  │   Routes    │  │   Layer     │  │   Layer     │              │
│  └─────────────┘  └─────────────┘  └─────────────┘              │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
│  │ LLM Gateway │  │ MCP Client  │  │  Event Bus  │              │
│  │  (LiteLLM)  │  │   Manager   │  │   (Redis)   │              │
│  └─────────────┘  └─────────────┘  └─────────────┘              │
└────────────────────────────┬────────────────────────────────────┘
          ┌──────────────────┼────────────────────┐
          ▼                  ▼                    ▼
┌───────────────┐  ┌───────────────┐  ┌───────────────────────────┐
│  PostgreSQL   │  │     Redis     │  │       MCP Servers         │
│  + pgvector   │  │ (Cache/Queue) │  │ (LLM, Git, KB, Issues...) │
└───────────────┘  └───────────────┘  └───────────────────────────┘
                   ┌───────────────┐
                   │    Celery     │
                   │    Workers    │
                   └───────────────┘
```
## Consequences
### Positive
- Proven, production-ready stack
- Strong typing throughout (Python + TypeScript)
- Excellent async performance
- Rich ecosystem for extensions
- Team familiarity reduces learning curve
### Negative
- Python GIL limits CPU-bound concurrency (mitigated by Celery)
- Multiple languages (Python + TypeScript) to maintain
- PostgreSQL requires management (vs serverless options)
### Neutral
- PragmaStack provides solid foundation but may include unused features
- Stack is opinionated, limiting some technology choices
## Version Pinning Strategy
| Component | Strategy | Rationale |
|-----------|----------|-----------|
| Python | 3.11+ (specific minor) | Stability |
| Node.js | 20 LTS | Long-term support |
| FastAPI | 0.115+ | Latest stable |
| Next.js | 16 | Current major |
| PostgreSQL | 15+ | Required for features |
## Compliance
This decision aligns with:
- NFR-601: Code quality standards (TypeScript, type hints)
- NFR-603: Docker containerization requirement
- TC-001 through TC-006: Technical constraints
---
*This ADR establishes the foundational technology choices for Syndarix.*


@@ -0,0 +1,260 @@
# ADR-006: Agent Orchestration Architecture
**Status:** Accepted
**Date:** 2025-12-29
**Deciders:** Architecture Team
**Related Spikes:** SPIKE-002
---
## Context
Syndarix requires an agent orchestration system that can:
- Define reusable agent types with specific capabilities
- Spawn multiple instances of the same type with unique identities
- Manage agent state, context, and conversation history
- Route messages between agents
- Handle agent failover and recovery
- Track resource usage per agent
## Decision Drivers
- **Flexibility:** Support diverse agent roles and capabilities
- **Scalability:** Handle 50+ concurrent agent instances
- **Isolation:** Each instance maintains separate state
- **Observability:** Full visibility into agent activities
- **Reliability:** Graceful handling of failures
## Decision
**Adopt a Type-Instance pattern** where:
- **Agent Types** define templates (model, expertise, personality)
- **Agent Instances** are spawned from types with unique identities
- **Agent Orchestrator** manages lifecycle and communication
## Architecture
### Agent Type Definition
```python
class AgentType(Base):
    id = Column(UUID, primary_key=True)
    name = Column(String(50), unique=True)   # "Software Engineer"
    role = Column(Enum(AgentRole))           # ENGINEER
    base_model = Column(String(100))         # "claude-3-5-sonnet-20241022"
    failover_model = Column(String(100))     # "gpt-4-turbo"
    expertise = Column(ARRAY(String))        # ["python", "fastapi", "testing"]
    personality = Column(JSONB)              # {"style": "detailed", "tone": "professional"}
    system_prompt = Column(Text)             # Base system prompt template
    capabilities = Column(ARRAY(String))     # ["code_generation", "code_review"]
    is_active = Column(Boolean, default=True)
```
### Agent Instance Definition
```python
class AgentInstance(Base):
    id = Column(UUID, primary_key=True)
    name = Column(String(50))                # "Dave"
    agent_type_id = Column(UUID, ForeignKey)
    project_id = Column(UUID, ForeignKey)
    status = Column(Enum(InstanceStatus))    # ACTIVE, IDLE, TERMINATED
    context = Column(JSONB)                  # Current working context
    conversation_id = Column(UUID)           # Active conversation
    rag_collection_id = Column(String)       # Domain knowledge collection
    token_usage = Column(JSONB)              # {"prompt": 0, "completion": 0}
    last_active_at = Column(DateTime)
    created_at = Column(DateTime)
    terminated_at = Column(DateTime)
```
### Orchestrator Service
```python
class AgentOrchestrator:
    """Central service for agent lifecycle management."""

    async def spawn_agent(
        self,
        agent_type_id: UUID,
        project_id: UUID,
        name: str,
        domain_knowledge: list[str] | None = None
    ) -> AgentInstance:
        """Spawn a new agent instance from a type definition."""
        agent_type = await self.get_agent_type(agent_type_id)
        instance = AgentInstance(
            name=name,
            agent_type_id=agent_type_id,
            project_id=project_id,
            status=InstanceStatus.ACTIVE,
            context={"initialized_at": datetime.utcnow().isoformat()},
        )
        # Initialize RAG collection if domain knowledge provided
        if domain_knowledge:
            instance.rag_collection_id = await self._init_rag_collection(
                instance.id, domain_knowledge
            )
        self.db.add(instance)  # session.add() is synchronous, even on AsyncSession
        await self.db.commit()
        # Publish spawn event
        await self.event_bus.publish(f"project:{project_id}", {
            "type": "agent_spawned",
            "agent_id": str(instance.id),
            "name": name,
            "role": agent_type.role.value
        })
        return instance

    async def terminate_agent(self, instance_id: UUID) -> None:
        """Terminate an agent instance and release resources."""
        instance = await self.get_instance(instance_id)
        instance.status = InstanceStatus.TERMINATED
        instance.terminated_at = datetime.utcnow()
        # Cleanup RAG collection
        if instance.rag_collection_id:
            await self._cleanup_rag_collection(instance.rag_collection_id)
        await self.db.commit()

    async def send_message(
        self,
        from_id: UUID,
        to_id: UUID,
        message: AgentMessage
    ) -> None:
        """Route a message from one agent to another."""
        # Validate both agents exist and are active
        sender = await self.get_instance(from_id)
        recipient = await self.get_instance(to_id)
        if recipient.status == InstanceStatus.TERMINATED:
            raise ValueError(f"Recipient agent {to_id} is terminated")
        # Persist message
        await self.message_store.save(message)
        # If recipient is idle, trigger action
        if recipient.status == InstanceStatus.IDLE:
            await self._trigger_agent_action(recipient.id, message)
        # Publish for real-time tracking
        await self.event_bus.publish(f"project:{sender.project_id}", {
            "type": "agent_message",
            "from": str(from_id),
            "to": str(to_id),
            "preview": message.content[:100]
        })

    async def broadcast(
        self,
        from_id: UUID,
        target_role: AgentRole,
        message: AgentMessage
    ) -> None:
        """Broadcast a message to all agents of a specific role."""
        sender = await self.get_instance(from_id)
        recipients = await self.get_instances_by_role(
            sender.project_id, target_role
        )
        for recipient in recipients:
            await self.send_message(from_id, recipient.id, message)
```
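For orientation, a hypothetical call sequence against this orchestrator (the type IDs, agent names, and message object are assumptions):
```python
# Hypothetical usage: staff a project and broadcast the sprint goal.
architect = await orchestrator.spawn_agent(
    agent_type_id=ARCHITECT_TYPE_ID, project_id=project_id, name="Ada"
)
for name in ("Dave", "Erin"):
    await orchestrator.spawn_agent(
        agent_type_id=ENGINEER_TYPE_ID,
        project_id=project_id,
        name=name,
        domain_knowledge=["docs/architecture.md"],
    )
await orchestrator.broadcast(
    from_id=architect.id,
    target_role=AgentRole.ENGINEER,
    message=sprint_goal_msg,
)
```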
### Agent Execution Pattern
```python
class AgentRunner:
    """Executes agent actions using LLM."""

    def __init__(self, instance: AgentInstance, llm_gateway: LLMGateway):
        self.instance = instance
        self.llm = llm_gateway

    async def execute(self, action: str, context: dict) -> dict:
        """Execute an action using the agent's configured model."""
        agent_type = await self.get_agent_type(self.instance.agent_type_id)
        # Build messages with system prompt and context
        messages = [
            {"role": "system", "content": self._build_system_prompt(agent_type)},
            *self._get_conversation_history(),
            {"role": "user", "content": self._build_action_prompt(action, context)}
        ]
        # Add RAG context if available
        if self.instance.rag_collection_id:
            rag_context = await self._query_rag(action, context)
            messages.insert(1, {
                "role": "system",
                "content": f"Relevant context:\n{rag_context}"
            })
        # Execute with failover
        response = await self.llm.complete(
            agent_id=str(self.instance.id),
            project_id=str(self.instance.project_id),
            messages=messages,
            model_preference=self._get_model_preference(agent_type)
        )
        # Update instance context
        self.instance.context = {
            **self.instance.context,
            "last_action": action,
            "last_response_at": datetime.utcnow().isoformat()
        }
        return response
```
### Agent Roles
| Role | Instances | Primary Capabilities |
|------|-----------|---------------------|
| Product Owner | 1 | requirements, prioritization, client_communication |
| Project Manager | 1 | planning, tracking, coordination |
| Business Analyst | 1 | analysis, documentation, process_modeling |
| Software Architect | 1 | design, architecture_decisions, tech_selection |
| Software Engineer | 1-5 | code_generation, code_review, testing |
| UI/UX Designer | 1 | design, wireframes, accessibility |
| QA Engineer | 1-2 | test_planning, test_automation, bug_reporting |
| DevOps Engineer | 1 | cicd, infrastructure, deployment |
| AI/ML Engineer | 1 | ml_development, model_training, mlops |
| Security Expert | 1 | security_review, vulnerability_assessment |
## Consequences
### Positive
- Clear separation between type definition and instance runtime
- Multiple instances share type configuration (DRY)
- Easy to add new agent roles
- Full observability through events
- Graceful failure handling with model failover
### Negative
- Complexity in managing instance lifecycle
- State synchronization across instances
- Memory overhead for context storage
### Mitigation
- Context archival for long-running instances
- Periodic cleanup of terminated instances
- State compression for large contexts
## Compliance
This decision aligns with:
- FR-101: Agent type configuration
- FR-102: Agent instance spawning
- FR-103: Agent domain knowledge (RAG)
- FR-104: Inter-agent communication
- FR-105: Agent activity monitoring
---
*This ADR establishes the agent orchestration architecture for Syndarix.*