docs: add spike findings for LLM abstraction, MCP integration, and real-time updates
Added research findings and recommendations as separate SPIKE documents in `docs/spikes/`:

- `SPIKE-005-llm-provider-abstraction.md`: Research on a unified abstraction for LLM providers with failover, cost tracking, and caching strategies.
- `SPIKE-001-mcp-integration-pattern.md`: Optimal pattern for integrating MCP with project/agent scoping and authentication strategies.
- `SPIKE-003-realtime-updates.md`: Evaluation of SSE vs WebSocket for real-time updates, aligned with use-case needs.

Focused on aligning implementation architectures with scalability, efficiency, and user needs. Documentation intended to inform upcoming ADRs.

docs/spikes/SPIKE-001-mcp-integration-pattern.md (new file, 288 lines)

# SPIKE-001: MCP Integration Pattern

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #1

---

## Objective

Research the optimal pattern for integrating Model Context Protocol (MCP) servers with the FastAPI backend, focusing on unified singleton servers with project/agent scoping.

## Research Questions

1. What is the recommended MCP SDK for Python/FastAPI?
2. Should we run unified MCP servers or one server per project?
3. What is the best pattern for project/agent scoping in MCP tools?
4. How do we handle authentication between Syndarix and MCP servers?

## Findings

### 1. FastMCP 2.0 - Recommended Framework

**FastMCP** is a high-level, Pythonic framework for building MCP servers that significantly reduces boilerplate compared to the low-level MCP SDK.

**Key Features:**
- Decorator-based tool registration (`@mcp.tool()`)
- Built-in context management for resources and prompts
- Support for server-sent events (SSE) and stdio transports
- Type-safe with Pydantic model support
- Async-first design compatible with FastAPI

**Installation:**
```bash
pip install fastmcp
```

**Basic Example:**
```python
from fastmcp import FastMCP

mcp = FastMCP("syndarix-knowledge-base")


@mcp.tool()
def search_knowledge(
    project_id: str,
    query: str,
    scope: str = "project"
) -> list[dict]:
    """Search the knowledge base with project scoping."""
    # Implementation here
    return results


@mcp.resource("project://{project_id}/config")
def get_project_config(project_id: str) -> dict:
    """Get project configuration."""
    return config
```

### 2. Unified Singleton Pattern (Recommended)

**Decision:** Use unified singleton MCP servers instead of per-project servers.

**Architecture:**
```
┌─────────────────────────────────────────────────────────┐
│                    Syndarix Backend                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │
│  │   Agent 1   │  │   Agent 2   │  │   Agent 3   │      │
│  │ (project A) │  │ (project A) │  │ (project B) │      │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘      │
│         │                │                │              │
│         └────────────────┼────────────────┘              │
│                          │                               │
│                          ▼                               │
│  ┌─────────────────────────────────────────────────┐    │
│  │            MCP Client (Singleton)               │    │
│  │    Maintains connections to all MCP servers     │    │
│  └─────────────────────────────────────────────────┘    │
└──────────────────────────┬──────────────────────────────┘
                           │
           ┌───────────────┼───────────────┐
           │               │               │
           ▼               ▼               ▼
    ┌────────────┐  ┌────────────┐  ┌────────────┐
    │  Git MCP   │  │   KB MCP   │  │  LLM MCP   │
    │ (Singleton)│  │ (Singleton)│  │ (Singleton)│
    └────────────┘  └────────────┘  └────────────┘
```

**Why Singleton:**
- Resource efficiency (one process per MCP type)
- Shared connection pools
- Centralized logging and monitoring
- Simpler deployment (7 services vs N×7)
- Cross-project learning possible (if needed)

**Scoping Pattern:**
```python
from typing import Literal


@mcp.tool()
def search_knowledge(
    project_id: str,  # Required - scopes to project
    agent_id: str,    # Required - identifies calling agent
    query: str,
    scope: Literal["project", "global"] = "project"
) -> SearchResults:
    """
    All tools accept project_id and agent_id for:
    - Access control validation
    - Audit logging
    - Context filtering
    """
    # Validate agent has access to project
    validate_access(agent_id, project_id)

    # Log the access
    log_tool_usage(agent_id, project_id, "search_knowledge")

    # Perform scoped search
    if scope == "project":
        return search_project_kb(project_id, query)
    else:
        return search_global_kb(query)
```
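
The access-control and audit helpers used above are not specified further in this spike; a minimal sketch of what they could look like (the names, in-memory storage, and `AccessDenied` exception are assumptions, not existing Syndarix code):

```python
# Hypothetical backing for validate_access / log_tool_usage; illustrative only.
from datetime import datetime, timezone


class AccessDenied(Exception):
    """Raised when an agent calls a tool for a project it is not assigned to."""


# agent_id -> set of project_ids; in practice this would be a DB or cache lookup
_AGENT_PROJECTS: dict[str, set[str]] = {}


def validate_access(agent_id: str, project_id: str) -> None:
    if project_id not in _AGENT_PROJECTS.get(agent_id, set()):
        raise AccessDenied(f"agent {agent_id} has no access to project {project_id}")


def log_tool_usage(agent_id: str, project_id: str, tool_name: str) -> None:
    # A real implementation would write to structured logs or an audit table.
    print(f"{datetime.now(timezone.utc).isoformat()} {agent_id} {project_id} {tool_name}")
```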

### 3. MCP Server Registry Architecture

```python
# mcp/registry.py
from dataclasses import dataclass
from typing import Dict


@dataclass
class MCPServerConfig:
    name: str
    port: int
    transport: str  # "sse" or "stdio"
    enabled: bool = True


MCP_SERVERS: Dict[str, MCPServerConfig] = {
    "llm_gateway": MCPServerConfig("llm-gateway", 9001, "sse"),
    "git": MCPServerConfig("git-mcp", 9002, "sse"),
    "knowledge_base": MCPServerConfig("kb-mcp", 9003, "sse"),
    "issues": MCPServerConfig("issues-mcp", 9004, "sse"),
    "file_system": MCPServerConfig("fs-mcp", 9005, "sse"),
    "code_analysis": MCPServerConfig("code-mcp", 9006, "sse"),
    "cicd": MCPServerConfig("cicd-mcp", 9007, "sse"),
}
```
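
A small helper can turn a registry entry into the URL the MCP client connects to; this is a sketch that assumes each server is reachable under its `name` (e.g. as a Docker Compose service) and exposes a default `/sse` endpoint, neither of which is fixed by this spike:

```python
# Hypothetical helper; hostname and path conventions are assumptions.
def sse_url(config: MCPServerConfig) -> str:
    if config.transport != "sse":
        raise ValueError(f"{config.name} does not use the SSE transport")
    return f"http://{config.name}:{config.port}/sse"


# e.g. sse_url(MCP_SERVERS["knowledge_base"]) -> "http://kb-mcp:9003/sse"
```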

### 4. Authentication Pattern

**MCP OAuth 2.0 Integration:**
```python
from fastmcp import FastMCP
from fastmcp.auth import OAuth2Bearer

mcp = FastMCP(
    "syndarix-mcp",
    auth=OAuth2Bearer(
        token_url="https://syndarix.local/oauth/token",
        scopes=["mcp:read", "mcp:write"]
    )
)
```

**Internal Service Auth (Recommended for v1):**
```python
# For internal deployment, use service tokens
@mcp.tool()
def create_issue(
    service_token: str,  # Validated internally
    project_id: str,
    title: str,
    body: str
) -> Issue:
    validate_service_token(service_token)
    # ... implementation
```
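
`validate_service_token` is left abstract above; a minimal sketch, assuming a single shared secret distributed to the backend (the `MCP_SERVICE_TOKEN` setting name is illustrative, not an existing setting):

```python
# Minimal service-token check; the settings attribute is an assumed name.
import hmac

from app.core.config import settings


def validate_service_token(token: str) -> None:
    # Constant-time comparison to avoid timing side channels.
    if not hmac.compare_digest(token, settings.MCP_SERVICE_TOKEN):
        raise PermissionError("invalid MCP service token")
```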

### 5. FastAPI Integration Pattern

```python
# app/mcp/client.py
from contextlib import asynccontextmanager
from typing import Any

from mcp import ClientSession
from mcp.client.sse import sse_client

from app.mcp.registry import MCP_SERVERS


class MCPClientManager:
    def __init__(self):
        self._sessions: dict[str, ClientSession] = {}

    async def connect_all(self):
        """Connect to all configured MCP servers."""
        for name, config in MCP_SERVERS.items():
            if config.enabled:
                # _connect_server opens the SSE transport and initializes a
                # ClientSession (omitted in this sketch)
                session = await self._connect_server(config)
                self._sessions[name] = session

    async def call_tool(
        self,
        server: str,
        tool_name: str,
        arguments: dict
    ) -> Any:
        """Call a tool on a specific MCP server."""
        session = self._sessions[server]
        result = await session.call_tool(tool_name, arguments)
        return result.content


# Usage in FastAPI
mcp_client = MCPClientManager()


@app.on_event("startup")
async def startup():
    await mcp_client.connect_all()


@app.post("/api/v1/knowledge/search")
async def search_knowledge(request: SearchRequest):
    result = await mcp_client.call_tool(
        "knowledge_base",
        "search_knowledge",
        {
            "project_id": request.project_id,
            "agent_id": request.agent_id,
            "query": request.query
        }
    )
    return result
```

## Recommendations

### Immediate Actions

1. **Use FastMCP 2.0** for all MCP server implementations
2. **Implement unified singleton pattern** with explicit scoping
3. **Use SSE transport** for MCP server connections
4. **Service tokens** for internal auth in v1; migrate to OAuth 2.0 later

### MCP Server Priority

1. **LLM Gateway** - Critical for agent operation
2. **Knowledge Base** - Required for RAG functionality
3. **Git MCP** - Required for code delivery
4. **Issues MCP** - Required for project management
5. **File System** - Required for workspace operations
6. **Code Analysis** - Enhances code quality
7. **CI/CD** - Automates deployments

### Code Organization

```
syndarix/
├── backend/
│   └── app/
│       └── mcp/
│           ├── __init__.py
│           ├── client.py      # MCP client manager
│           ├── registry.py    # Server configurations
│           └── schemas.py     # Tool argument schemas
└── mcp_servers/
    ├── llm_gateway/
    │   ├── __init__.py
    │   ├── server.py
    │   └── tools.py
    ├── knowledge_base/
    ├── git/
    ├── issues/
    ├── file_system/
    ├── code_analysis/
    └── cicd/
```

## References

- [FastMCP Documentation](https://gofastmcp.com)
- [MCP Protocol Specification](https://spec.modelcontextprotocol.io)
- [Anthropic MCP SDK](https://github.com/anthropics/anthropic-sdk-mcp)

## Decision

**Adopt FastMCP 2.0** with unified singleton servers and explicit project/agent scoping for all MCP integrations.

---

*Spike completed. Findings will inform ADR-001: MCP Integration Architecture.*

docs/spikes/SPIKE-003-realtime-updates.md (new file, 338 lines)

# SPIKE-003: Real-time Updates Architecture

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #3

---

## Objective

Evaluate WebSocket vs Server-Sent Events (SSE) for real-time updates in Syndarix, focusing on agent activity streams, progress updates, and client notifications.

## Research Questions

1. What are the trade-offs between WebSocket and SSE?
2. Which pattern best fits Syndarix's use cases?
3. How do we handle reconnection and reliability?
4. What is the FastAPI implementation approach?

## Findings

### 1. Use Case Analysis

| Use Case | Direction | Frequency | Latency Requirement |
|----------|-----------|-----------|---------------------|
| Agent activity feed | Server → Client | High | Low |
| Sprint progress | Server → Client | Medium | Low |
| Build status | Server → Client | Low | Medium |
| Client approval requests | Server → Client | Low | High |
| Client messages | Client → Server | Low | Medium |
| Issue updates | Server → Client | Medium | Low |

**Key Insight:** 90%+ of real-time communication is **server-to-client** (unidirectional).

### 2. Technology Comparison

| Feature | Server-Sent Events (SSE) | WebSocket |
|---------|-------------------------|-----------|
| Direction | Unidirectional (server → client) | Bidirectional |
| Protocol | HTTP/1.1 or HTTP/2 | WebSocket protocol (ws://, upgraded from HTTP) |
| Reconnection | Built-in automatic | Manual implementation |
| Connection limits | Limited per domain | Similar limits |
| Browser support | Excellent | Excellent |
| Through proxies | Native HTTP | May require config |
| Complexity | Simple | More complex |
| FastAPI support | Native | Native |

### 3. Recommendation: SSE as Primary, WebSocket for Chat

**SSE (Recommended for 90% of use cases):**
- Agent activity streams
- Progress updates
- Build/pipeline status
- Issue change notifications
- Approval request alerts

**WebSocket (For bidirectional needs):**
- Live chat with agents
- Interactive debugging sessions
- Real-time collaboration (future)

### 4. FastAPI SSE Implementation

```python
# app/api/v1/events.py
import asyncio

from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse

from app.services.events import EventBus

# User and get_current_user come from the project's existing auth dependencies

router = APIRouter()


@router.get("/projects/{project_id}/events")
async def project_events(
    project_id: str,
    request: Request,
    current_user: User = Depends(get_current_user)
):
    """Stream real-time events for a project."""

    async def event_generator():
        event_bus = EventBus()
        subscriber = await event_bus.subscribe(
            channel=f"project:{project_id}",
            user_id=current_user.id
        )

        try:
            while True:
                # Check if client disconnected
                if await request.is_disconnected():
                    break

                # Wait for next event (with timeout for keepalive)
                try:
                    event = await asyncio.wait_for(
                        subscriber.get_event(),
                        timeout=30.0
                    )
                    yield f"event: {event.type}\ndata: {event.json()}\n\n"
                except asyncio.TimeoutError:
                    # Send keepalive comment
                    yield ": keepalive\n\n"
        finally:
            await subscriber.unsubscribe()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        }
    )
```
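
The endpoint can be exercised from a plain script before any frontend exists; a sketch using `httpx` (the base URL and the absence of auth handling are assumptions):

```python
# Tail the project event stream from the command line; illustrative only.
import asyncio

import httpx


async def tail_events(project_id: str) -> None:
    url = f"http://localhost:8000/api/v1/projects/{project_id}/events"
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url) as response:
            async for line in response.aiter_lines():
                if line and not line.startswith(":"):  # skip keepalive comments
                    print(line)


# asyncio.run(tail_events("demo-project"))
```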

### 5. Event Bus Architecture with Redis

```python
# app/services/events.py
import json
from dataclasses import dataclass
from typing import AsyncIterator

import redis.asyncio as redis


@dataclass
class Event:
    type: str
    data: dict
    project_id: str
    agent_id: str | None = None
    timestamp: float | None = None


class EventBus:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()

    async def publish(self, channel: str, event: Event):
        """Publish an event to a channel."""
        await self.redis.publish(
            channel,
            json.dumps(event.__dict__)
        )

    async def subscribe(self, channel: str) -> "Subscriber":
        """Subscribe to a channel."""
        await self.pubsub.subscribe(channel)
        return Subscriber(self.pubsub, channel)


class Subscriber:
    def __init__(self, pubsub, channel: str):
        self.pubsub = pubsub
        self.channel = channel

    async def get_event(self) -> Event:
        """Get the next event (blocking)."""
        while True:
            message = await self.pubsub.get_message(
                ignore_subscribe_messages=True,
                timeout=1.0
            )
            if message and message["type"] == "message":
                data = json.loads(message["data"])
                return Event(**data)

    async def unsubscribe(self):
        await self.pubsub.unsubscribe(self.channel)
```
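
Publishing then looks like this from any backend code path; a short usage sketch (the `settings.REDIS_URL` name and the `notify_issue_created` helper are assumptions):

```python
# Example publisher: an issue-created notification fanned out to SSE subscribers.
import time

from app.core.config import settings
from app.services.events import Event, EventBus


async def notify_issue_created(project_id: str, issue_id: str, agent_id: str) -> None:
    bus = EventBus(settings.REDIS_URL)
    await bus.publish(
        f"project:{project_id}",
        Event(
            type="issue_created",
            data={"issue_id": issue_id},
            project_id=project_id,
            agent_id=agent_id,
            timestamp=time.time(),
        ),
    )
```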

### 6. Client-Side Implementation

```typescript
// frontend/lib/events.ts
class ProjectEventStream {
  private eventSource: EventSource | null = null;
  private reconnectDelay = 1000;
  private maxReconnectDelay = 30000;

  connect(projectId: string, onEvent: (event: ProjectEvent) => void) {
    const url = `/api/v1/projects/${projectId}/events`;

    this.eventSource = new EventSource(url, {
      withCredentials: true
    });

    this.eventSource.onopen = () => {
      console.log('SSE connected');
      this.reconnectDelay = 1000; // Reset on success
    };

    this.eventSource.addEventListener('agent_activity', (e) => {
      onEvent({ type: 'agent_activity', data: JSON.parse(e.data) });
    });

    this.eventSource.addEventListener('issue_updated', (e) => {
      onEvent({ type: 'issue_updated', data: JSON.parse(e.data) });
    });

    this.eventSource.addEventListener('approval_required', (e) => {
      onEvent({ type: 'approval_required', data: JSON.parse(e.data) });
    });

    this.eventSource.onerror = () => {
      this.eventSource?.close();
      // Exponential backoff reconnect
      setTimeout(() => this.connect(projectId, onEvent), this.reconnectDelay);
      this.reconnectDelay = Math.min(
        this.reconnectDelay * 2,
        this.maxReconnectDelay
      );
    };
  }

  disconnect() {
    this.eventSource?.close();
    this.eventSource = null;
  }
}
```

### 7. Event Types

```python
# app/schemas/events.py
from datetime import datetime
from enum import Enum

from pydantic import BaseModel


class EventType(str, Enum):
    # Agent Events
    AGENT_STARTED = "agent_started"
    AGENT_ACTIVITY = "agent_activity"
    AGENT_COMPLETED = "agent_completed"
    AGENT_ERROR = "agent_error"

    # Project Events
    ISSUE_CREATED = "issue_created"
    ISSUE_UPDATED = "issue_updated"
    ISSUE_CLOSED = "issue_closed"

    # Git Events
    BRANCH_CREATED = "branch_created"
    COMMIT_PUSHED = "commit_pushed"
    PR_CREATED = "pr_created"
    PR_MERGED = "pr_merged"

    # Workflow Events
    APPROVAL_REQUIRED = "approval_required"
    SPRINT_STARTED = "sprint_started"
    SPRINT_COMPLETED = "sprint_completed"

    # Pipeline Events
    PIPELINE_STARTED = "pipeline_started"
    PIPELINE_COMPLETED = "pipeline_completed"
    PIPELINE_FAILED = "pipeline_failed"


class ProjectEvent(BaseModel):
    id: str
    type: EventType
    project_id: str
    agent_id: str | None
    data: dict
    timestamp: datetime
```

### 8. WebSocket for Chat (Secondary)

```python
# app/api/v1/chat.py
from fastapi import WebSocket, WebSocketDisconnect

from app.services.agent_chat import AgentChatService


@router.websocket("/projects/{project_id}/agents/{agent_id}/chat")
async def agent_chat(
    websocket: WebSocket,
    project_id: str,
    agent_id: str
):
    """Bidirectional chat with an agent."""
    await websocket.accept()

    chat_service = AgentChatService(project_id, agent_id)

    try:
        while True:
            # Receive message from client
            message = await websocket.receive_json()

            # Stream response from agent
            async for chunk in chat_service.get_response(message):
                await websocket.send_json({
                    "type": "chunk",
                    "content": chunk
                })

            await websocket.send_json({"type": "done"})
    except WebSocketDisconnect:
        pass
```

## Performance Considerations

### Connection Limits
- Browser limit: ~6 connections per domain (HTTP/1.1)
- Recommendation: Use a single SSE connection per project and multiplex event types over it

### Scalability
- Redis Pub/Sub handles cross-instance event distribution
- Consider Redis Streams for message persistence (audit/replay)

### Keepalive
- Send a comment line every 30 seconds to prevent idle/proxy timeouts
- Client reconnects automatically on disconnect

## Recommendations

1. **Use SSE for all server-to-client events** (simpler, auto-reconnect)
2. **Use WebSocket only for interactive chat** with agents
3. **Redis Pub/Sub for event distribution** across instances
4. **Single SSE connection per project** with event multiplexing
5. **Exponential backoff** for client reconnection

## References

- [FastAPI SSE](https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse)
- [MDN EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource)
- [Redis Pub/Sub](https://redis.io/topics/pubsub)

## Decision

**Adopt SSE as the primary real-time transport** with WebSocket reserved for bidirectional chat. Use Redis Pub/Sub for event distribution.

---

*Spike completed. Findings will inform ADR-002: Real-time Communication Architecture.*

docs/spikes/SPIKE-004-celery-redis-integration.md (new file, 420 lines)

# SPIKE-004: Celery + Redis Integration

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #4

---

## Objective

Research best practices for integrating Celery with FastAPI for background task processing, focusing on agent orchestration, long-running workflows, and task monitoring.

## Research Questions

1. How do we properly integrate Celery with async FastAPI?
2. What is the optimal task queue architecture for Syndarix?
3. How do we handle long-running agent tasks?
4. What monitoring and visibility patterns should we use?

## Findings

### 1. Celery + FastAPI Integration Pattern

**Challenge:** Celery workers are synchronous, while FastAPI is async.

**Solution:** Dispatch tasks from async endpoints and track them via `celery.result.AsyncResult` with async polling or callbacks.

```python
# app/core/celery.py
from celery import Celery

from app.core.config import settings

celery_app = Celery(
    "syndarix",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    include=[
        "app.tasks.agent_tasks",
        "app.tasks.git_tasks",
        "app.tasks.sync_tasks",
    ]
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=3600,  # 1 hour max
    task_soft_time_limit=3300,  # 55 min soft limit
    worker_prefetch_multiplier=1,  # One task at a time for LLM tasks
    task_acks_late=True,  # Acknowledge after completion
    task_reject_on_worker_lost=True,  # Retry if worker dies
)
```

### 2. Task Queue Architecture

```
┌─────────────────────────────────────────────────────────────────┐
│                         FastAPI Backend                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
│  │  API Layer  │  │  Services   │  │   Events    │              │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘              │
│         │                │                │                     │
│         └────────────────┼────────────────┘                     │
│                          │                                      │
│                          ▼                                      │
│         ┌────────────────────────────────┐                      │
│         │        Task Dispatcher         │                      │
│         │      (Celery send_task)        │                      │
│         └────────────────┬───────────────┘                      │
└──────────────────────────┼──────────────────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────────────────┐
│                     Redis (Broker + Backend)                      │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐             │
│  │ agent_queue  │  │  git_queue   │  │  sync_queue  │             │
│  │  (priority)  │  │              │  │              │             │
│  └──────────────┘  └──────────────┘  └──────────────┘             │
└──────────────────────────────────────────────────────────────────┘
                           │
           ┌───────────────┼───────────────┐
           │               │               │
           ▼               ▼               ▼
    ┌────────────┐  ┌────────────┐  ┌────────────┐
    │   Worker   │  │   Worker   │  │   Worker   │
    │  (agents)  │  │   (git)    │  │   (sync)   │
    │ prefetch=1 │  │ prefetch=4 │  │ prefetch=4 │
    └────────────┘  └────────────┘  └────────────┘
```

### 3. Queue Configuration

```python
# app/core/celery.py
from kombu import Queue

celery_app.conf.task_queues = [
    Queue("agent_queue", routing_key="agent.#"),
    Queue("git_queue", routing_key="git.#"),
    Queue("sync_queue", routing_key="sync.#"),
    Queue("cicd_queue", routing_key="cicd.#"),
]

celery_app.conf.task_routes = {
    "app.tasks.agent_tasks.*": {"queue": "agent_queue"},
    "app.tasks.git_tasks.*": {"queue": "git_queue"},
    "app.tasks.sync_tasks.*": {"queue": "sync_queue"},
    "app.tasks.cicd_tasks.*": {"queue": "cicd_queue"},
}
```

### 4. Agent Task Implementation

```python
# app/tasks/agent_tasks.py
from celery import Task

from app.core.celery import celery_app
from app.services.agent_runner import AgentRunner
from app.services.events import EventBus


class AgentTask(Task):
    """Base class for agent tasks with retry and monitoring."""

    autoretry_for = (ConnectionError, TimeoutError)
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True
    max_retries = 3

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle task failure."""
        project_id = kwargs.get("project_id")
        agent_id = kwargs.get("agent_id")
        EventBus().publish(f"project:{project_id}", {
            "type": "agent_error",
            "agent_id": agent_id,
            "error": str(exc)
        })


@celery_app.task(bind=True, base=AgentTask)
def run_agent_action(
    self,
    agent_id: str,
    project_id: str,
    action: str,
    context: dict
) -> dict:
    """
    Execute an agent action as a background task.

    Args:
        agent_id: The agent instance ID
        project_id: The project context
        action: The action to perform
        context: Action-specific context

    Returns:
        Action result dictionary
    """
    runner = AgentRunner(agent_id, project_id)

    # Update task state for monitoring
    self.update_state(
        state="RUNNING",
        meta={"agent_id": agent_id, "action": action}
    )

    # Publish start event
    EventBus().publish(f"project:{project_id}", {
        "type": "agent_started",
        "agent_id": agent_id,
        "action": action,
        "task_id": self.request.id
    })

    try:
        result = runner.execute(action, context)

        # Publish completion event
        EventBus().publish(f"project:{project_id}", {
            "type": "agent_completed",
            "agent_id": agent_id,
            "action": action,
            "result_summary": result.get("summary")
        })

        return result
    except Exception:
        # Re-raise; on_failure publishes the error event
        raise
```

### 5. Long-Running Task Patterns

**Progress Reporting:**
```python
@celery_app.task(bind=True)
def implement_story(self, story_id: str, agent_id: str, project_id: str):
    """Implement a user story with progress reporting."""

    steps = [
        ("analyzing", "Analyzing requirements"),
        ("designing", "Designing solution"),
        ("implementing", "Writing code"),
        ("testing", "Running tests"),
        ("documenting", "Updating documentation"),
    ]

    for i, (state, description) in enumerate(steps):
        self.update_state(
            state="PROGRESS",
            meta={
                "current": i + 1,
                "total": len(steps),
                "status": description
            }
        )

        # Do the actual work
        execute_step(state, story_id, agent_id)

        # Publish progress event
        EventBus().publish(f"project:{project_id}", {
            "type": "agent_progress",
            "agent_id": agent_id,
            "step": i + 1,
            "total": len(steps),
            "description": description
        })

    return {"status": "completed", "story_id": story_id}
```

**Task Chaining:**
```python
from celery import chain, group

# Sequential workflow
workflow = chain(
    analyze_requirements.s(story_id),
    design_solution.s(),
    implement_code.s(),
    run_tests.s(),
    create_pr.s()
)

# Parallel execution
parallel_tests = group(
    run_unit_tests.s(project_id),
    run_integration_tests.s(project_id),
    run_linting.s(project_id)
)
```
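
Nothing runs until the composed signature is applied; a short usage sketch (the task names reuse the illustrative ones above):

```python
# Dispatch the composed workflows.
result = workflow.apply_async()              # chain: each task feeds the next
test_results = parallel_tests.apply_async()  # group: tasks run concurrently

# Both return result handles that can be polled like any other Celery result.
print(result.id, test_results.id)
```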

### 6. FastAPI Integration

```python
# app/api/v1/agents.py
from celery.result import AsyncResult
from fastapi import APIRouter

from app.schemas.agents import AgentActionRequest  # request schema (path illustrative)
from app.tasks.agent_tasks import run_agent_action

router = APIRouter()


@router.post("/agents/{agent_id}/actions")
async def trigger_agent_action(
    agent_id: str,
    action: AgentActionRequest
):
    """Trigger an agent action as a background task."""

    # Dispatch to Celery
    task = run_agent_action.delay(
        agent_id=agent_id,
        project_id=action.project_id,
        action=action.action,
        context=action.context
    )

    return {
        "task_id": task.id,
        "status": "queued"
    }


@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Get the status of a background task."""

    result = AsyncResult(task_id)

    if result.state == "PENDING":
        return {"status": "pending"}
    elif result.state == "RUNNING":
        return {"status": "running", **result.info}
    elif result.state == "PROGRESS":
        return {"status": "progress", **result.info}
    elif result.state == "SUCCESS":
        return {"status": "completed", "result": result.result}
    elif result.state == "FAILURE":
        return {"status": "failed", "error": str(result.result)}

    return {"status": result.state}
```

### 7. Worker Configuration

```bash
# Run different workers for different queues

# Agent worker (prefetch=1 to avoid LLM rate-limit pileups)
celery -A app.core.celery worker \
  -Q agent_queue \
  -c 4 \
  --prefetch-multiplier=1 \
  -n agent_worker@%h

# Git worker (can handle multiple concurrent tasks)
celery -A app.core.celery worker \
  -Q git_queue \
  -c 8 \
  --prefetch-multiplier=4 \
  -n git_worker@%h

# Sync worker
celery -A app.core.celery worker \
  -Q sync_queue \
  -c 4 \
  --prefetch-multiplier=4 \
  -n sync_worker@%h
```

### 8. Monitoring with Flower

```yaml
# docker-compose.yml
services:
  flower:
    image: mher/flower:latest
    command: celery flower --broker=redis://redis:6379/0
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - FLOWER_BASIC_AUTH=admin:password
```

### 9. Task Scheduling (Celery Beat)

```python
# app/core/celery.py
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    # Sync issues every minute
    "sync-external-issues": {
        "task": "app.tasks.sync_tasks.sync_all_issues",
        "schedule": 60.0,
    },
    # Health check every 5 minutes
    "agent-health-check": {
        "task": "app.tasks.agent_tasks.health_check_all_agents",
        "schedule": 300.0,
    },
    # Daily cleanup at midnight
    "cleanup-old-tasks": {
        "task": "app.tasks.maintenance.cleanup_old_tasks",
        "schedule": crontab(hour=0, minute=0),
    },
}
```

## Best Practices

1. **One task per LLM call** - Avoids rate-limit issues
2. **Progress reporting** - Update state for long-running tasks
3. **Idempotent tasks** - Handle retries gracefully
4. **Separate queues** - Isolate slow tasks from fast ones
5. **Task result expiry** - Set `result_expires` to avoid Redis bloat
6. **Soft time limits** - Allow graceful cleanup before the hard kill (see the sketch after this list)

## Recommendations

1. **Use Celery for all long-running operations**
   - Agent actions
   - Git operations
   - External sync
   - CI/CD triggers

2. **Use Redis as both broker and backend**
   - Simplifies infrastructure
   - Fast enough for our scale

3. **Configure separate queues**
   - `agent_queue` with prefetch=1
   - `git_queue` with prefetch=4
   - `sync_queue` with prefetch=4

4. **Implement proper monitoring**
   - Flower for web UI
   - Prometheus metrics export
   - Dead letter queue for failed tasks

## References

- [Celery Documentation](https://docs.celeryq.dev/)
- [FastAPI Background Tasks](https://fastapi.tiangolo.com/tutorial/background-tasks/)
- [Celery Best Practices](https://docs.celeryq.dev/en/stable/userguide/tasks.html#tips-and-best-practices)

## Decision

**Adopt Celery + Redis** for all background task processing with queue-based routing and progress reporting via Redis Pub/Sub events.

---

*Spike completed. Findings will inform ADR-003: Background Task Architecture.*

docs/spikes/SPIKE-005-llm-provider-abstraction.md (new file, 516 lines)

# SPIKE-005: LLM Provider Abstraction

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #5

---

## Objective

Research the best approach for a unified LLM provider abstraction with support for multiple providers, automatic failover, and cost tracking.

## Research Questions

1. What libraries exist for unified LLM access?
2. How do we implement automatic failover between providers?
3. How do we track token usage and costs per agent/project?
4. What caching strategies can reduce API costs?

## Findings

### 1. LiteLLM - Recommended Solution

**LiteLLM** provides a unified interface to 100+ LLM providers using the OpenAI SDK format.

**Key Features:**
- Unified API across providers (Anthropic, OpenAI, local, etc.)
- Built-in failover and load balancing
- Token counting and cost tracking
- Streaming support
- Async support
- Caching with Redis

**Installation:**
```bash
pip install litellm
```

### 2. Basic Usage

```python
import os

import litellm
from litellm import completion, acompletion

# Configure providers
litellm.api_key = os.getenv("ANTHROPIC_API_KEY")
litellm.set_verbose = True  # For debugging

# Synchronous call
response = completion(
    model="claude-3-5-sonnet-20241022",
    messages=[{"role": "user", "content": "Hello!"}]
)

# Async call (for FastAPI)
response = await acompletion(
    model="claude-3-5-sonnet-20241022",
    messages=[{"role": "user", "content": "Hello!"}]
)
```

### 3. Model Naming Convention

LiteLLM uses prefixed model names:

| Provider | Model Format |
|----------|--------------|
| Anthropic | `claude-3-5-sonnet-20241022` |
| OpenAI | `gpt-4-turbo` |
| Azure OpenAI | `azure/deployment-name` |
| Ollama | `ollama/llama3` |
| Together AI | `together_ai/togethercomputer/llama-2-70b` |
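
Switching providers is only a model-string change; self-hosted backends additionally take an explicit API base (a sketch assuming a local Ollama instance on its default port):

```python
# Same call shape as before, routed to a local Ollama server.
from litellm import completion

response = completion(
    model="ollama/llama3",
    messages=[{"role": "user", "content": "Hello!"}],
    api_base="http://localhost:11434",  # where the Ollama server listens
)
print(response.choices[0].message.content)
```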

### 4. Failover Configuration

```python
import os

from litellm import Router

# Define model list with fallbacks
model_list = [
    {
        "model_name": "primary-agent",
        "litellm_params": {
            "model": "claude-3-5-sonnet-20241022",
            "api_key": os.getenv("ANTHROPIC_API_KEY"),
        },
        "model_info": {"id": 1}
    },
    {
        "model_name": "primary-agent",  # Same name = fallback
        "litellm_params": {
            "model": "gpt-4-turbo",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
        "model_info": {"id": 2}
    },
    {
        "model_name": "primary-agent",
        "litellm_params": {
            "model": "ollama/llama3",
            "api_base": "http://localhost:11434",
        },
        "model_info": {"id": 3}
    }
]

# Initialize router with failover
router = Router(
    model_list=model_list,
    fallbacks=[
        {"primary-agent": ["primary-agent"]}  # Try all models with same name
    ],
    routing_strategy="simple-shuffle",  # or "latency-based-routing"
    num_retries=3,
    retry_after=5,  # seconds
    timeout=60,
)

# Use router
response = await router.acompletion(
    model="primary-agent",
    messages=[{"role": "user", "content": "Hello!"}]
)
```

### 5. Syndarix LLM Gateway Architecture

```python
# app/services/llm_gateway.py
from litellm import Router, acompletion

from app.core.config import settings
from app.models.agent import AgentType
from app.services.cost_tracker import CostTracker
from app.services.events import EventBus


class LLMGateway:
    """Unified LLM gateway with failover and cost tracking."""

    def __init__(self):
        self.router = self._build_router()
        self.cost_tracker = CostTracker()
        self.event_bus = EventBus()

    def _build_router(self) -> Router:
        """Build LiteLLM router from configuration."""
        model_list = []

        # Add Anthropic models
        if settings.ANTHROPIC_API_KEY:
            model_list.extend([
                {
                    "model_name": "high-reasoning",
                    "litellm_params": {
                        "model": "claude-3-5-sonnet-20241022",
                        "api_key": settings.ANTHROPIC_API_KEY,
                    }
                },
                {
                    "model_name": "fast-response",
                    "litellm_params": {
                        "model": "claude-3-haiku-20240307",
                        "api_key": settings.ANTHROPIC_API_KEY,
                    }
                }
            ])

        # Add OpenAI fallbacks
        if settings.OPENAI_API_KEY:
            model_list.extend([
                {
                    "model_name": "high-reasoning",
                    "litellm_params": {
                        "model": "gpt-4-turbo",
                        "api_key": settings.OPENAI_API_KEY,
                    }
                },
                {
                    "model_name": "fast-response",
                    "litellm_params": {
                        "model": "gpt-4o-mini",
                        "api_key": settings.OPENAI_API_KEY,
                    }
                }
            ])

        # Add local models (Ollama)
        if settings.OLLAMA_URL:
            model_list.append({
                "model_name": "local-fallback",
                "litellm_params": {
                    "model": "ollama/llama3",
                    "api_base": settings.OLLAMA_URL,
                }
            })

        return Router(
            model_list=model_list,
            fallbacks=[
                {"high-reasoning": ["high-reasoning", "local-fallback"]},
                {"fast-response": ["fast-response", "local-fallback"]},
            ],
            routing_strategy="latency-based-routing",
            num_retries=3,
            timeout=120,
        )

    async def complete(
        self,
        agent_id: str,
        project_id: str,
        messages: list[dict],
        model_preference: str = "high-reasoning",
        stream: bool = False,
        **kwargs
    ) -> dict:
        """
        Generate a completion with automatic failover and cost tracking.

        Args:
            agent_id: The calling agent's ID
            project_id: The project context
            messages: Chat messages
            model_preference: "high-reasoning" or "fast-response"
            stream: Whether to stream the response
            **kwargs: Additional LiteLLM parameters

        Returns:
            Completion response dictionary
        """
        try:
            if stream:
                return self._stream_completion(
                    agent_id, project_id, messages, model_preference, **kwargs
                )

            response = await self.router.acompletion(
                model=model_preference,
                messages=messages,
                **kwargs
            )

            # Track usage
            await self._track_usage(
                agent_id=agent_id,
                project_id=project_id,
                model=response.model,
                usage=response.usage,
            )

            return {
                "content": response.choices[0].message.content,
                "model": response.model,
                "usage": {
                    "prompt_tokens": response.usage.prompt_tokens,
                    "completion_tokens": response.usage.completion_tokens,
                    "total_tokens": response.usage.total_tokens,
                }
            }

        except Exception as e:
            # Publish error event
            await self.event_bus.publish(f"project:{project_id}", {
                "type": "llm_error",
                "agent_id": agent_id,
                "error": str(e)
            })
            raise

    async def _stream_completion(
        self,
        agent_id: str,
        project_id: str,
        messages: list[dict],
        model_preference: str,
        **kwargs
    ):
        """Stream a completion response."""
        response = await self.router.acompletion(
            model=model_preference,
            messages=messages,
            stream=True,
            **kwargs
        )

        async for chunk in response:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    async def _track_usage(
        self,
        agent_id: str,
        project_id: str,
        model: str,
        usage: dict
    ):
        """Track token usage and costs."""
        await self.cost_tracker.record_usage(
            agent_id=agent_id,
            project_id=project_id,
            model=model,
            prompt_tokens=usage.prompt_tokens,
            completion_tokens=usage.completion_tokens,
        )
```

### 6. Cost Tracking

```python
# app/services/cost_tracker.py
from datetime import datetime

from sqlalchemy.ext.asyncio import AsyncSession

from app.models.usage import TokenUsage

# Cost in USD per 1M tokens (approximate)
MODEL_COSTS = {
    "claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
    "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
    "gpt-4-turbo": {"input": 10.00, "output": 30.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "ollama/llama3": {"input": 0.00, "output": 0.00},  # Local
}


class CostTracker:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def record_usage(
        self,
        agent_id: str,
        project_id: str,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
    ):
        """Record token usage and calculate cost."""
        costs = MODEL_COSTS.get(model, {"input": 0, "output": 0})

        input_cost = (prompt_tokens / 1_000_000) * costs["input"]
        output_cost = (completion_tokens / 1_000_000) * costs["output"]
        total_cost = input_cost + output_cost

        usage = TokenUsage(
            agent_id=agent_id,
            project_id=project_id,
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            cost_usd=total_cost,
            timestamp=datetime.utcnow(),
        )

        self.db.add(usage)
        await self.db.commit()

    async def get_project_usage(
        self,
        project_id: str,
        start_date: datetime | None = None,
        end_date: datetime | None = None,
    ) -> dict:
        """Get usage summary for a project."""
        # Query aggregated usage
        ...

    async def check_budget(
        self,
        project_id: str,
        budget_limit: float,
    ) -> bool:
        """Check if project is within budget."""
        usage = await self.get_project_usage(project_id)
        return usage["total_cost_usd"] < budget_limit
```
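
To make the arithmetic concrete, a worked example against the approximate rates above:

```python
# One claude-3-5-sonnet call with 2,000 prompt tokens and 800 completion tokens:
input_cost = (2_000 / 1_000_000) * 3.00   # = 0.006 USD
output_cost = (800 / 1_000_000) * 15.00   # = 0.012 USD
total_cost = input_cost + output_cost     # = 0.018 USD for the call
```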

### 7. Caching with Redis

```python
import litellm
from litellm import Cache

from app.core.config import settings

# Configure Redis cache
litellm.cache = Cache(
    type="redis",
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    password=settings.REDIS_PASSWORD,
)

# Enable caching
litellm.enable_cache()

# Cached completions (same input = cached response)
response = await litellm.acompletion(
    model="claude-3-5-sonnet-20241022",
    messages=[{"role": "user", "content": "What is 2+2?"}],
    cache={"ttl": 3600}  # Cache for 1 hour
)
```

### 8. Agent Type Model Mapping

```python
# app/models/agent_type.py
from enum import Enum

from sqlalchemy import Column, Enum as SQLEnum, Float, Integer, String, Text
from sqlalchemy.dialects.postgresql import UUID  # assumes a PostgreSQL UUID column

from app.db.base import Base


class ModelPreference(str, Enum):
    HIGH_REASONING = "high-reasoning"
    FAST_RESPONSE = "fast-response"
    COST_OPTIMIZED = "cost-optimized"


class AgentType(Base):
    __tablename__ = "agent_types"

    id = Column(UUID, primary_key=True)
    name = Column(String(50), unique=True)
    role = Column(String(50))

    # LLM configuration
    model_preference = Column(
        SQLEnum(ModelPreference),
        default=ModelPreference.HIGH_REASONING
    )
    max_tokens = Column(Integer, default=4096)
    temperature = Column(Float, default=0.7)

    # System prompt
    system_prompt = Column(Text)


# Mapping agent types to models
AGENT_MODEL_MAPPING = {
    "Product Owner": ModelPreference.HIGH_REASONING,
    "Project Manager": ModelPreference.FAST_RESPONSE,
    "Business Analyst": ModelPreference.HIGH_REASONING,
    "Software Architect": ModelPreference.HIGH_REASONING,
    "Software Engineer": ModelPreference.HIGH_REASONING,
    "UI/UX Designer": ModelPreference.HIGH_REASONING,
    "QA Engineer": ModelPreference.FAST_RESPONSE,
    "DevOps Engineer": ModelPreference.FAST_RESPONSE,
    "AI/ML Engineer": ModelPreference.HIGH_REASONING,
    "Security Expert": ModelPreference.HIGH_REASONING,
}
```
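
How the mapping feeds the gateway is not spelled out in this spike; a hypothetical glue function reusing `LLMGateway` and `AGENT_MODEL_MAPPING` from above (the `ask_as` name and argument shapes are assumptions):

```python
# Resolve an agent type's model preference and route the call through the gateway.
async def ask_as(agent: AgentType, project_id: str, prompt: str, gateway: LLMGateway) -> str:
    preference = AGENT_MODEL_MAPPING.get(agent.role, ModelPreference.FAST_RESPONSE)
    result = await gateway.complete(
        agent_id=str(agent.id),
        project_id=project_id,
        messages=[
            {"role": "system", "content": agent.system_prompt or ""},
            {"role": "user", "content": prompt},
        ],
        model_preference=preference.value,
        max_tokens=agent.max_tokens,
        temperature=agent.temperature,
    )
    return result["content"]
```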

## Rate Limiting Strategy

```python
import asyncio

from litellm import Router

# Configure rate limits per model (model_list and settings as defined above)
router = Router(
    model_list=model_list,
    redis_host=settings.REDIS_HOST,
    redis_port=settings.REDIS_PORT,
    routing_strategy="usage-based-routing",  # Route based on rate limits
)


# Custom rate limiter
class RateLimiter:
    def __init__(self, requests_per_minute: int = 60):
        self.rpm = requests_per_minute
        self.semaphore = asyncio.Semaphore(requests_per_minute)

    async def acquire(self):
        await self.semaphore.acquire()
        # Release after 60 seconds
        asyncio.create_task(self._release_after(60))

    async def _release_after(self, seconds: int):
        await asyncio.sleep(seconds)
        self.semaphore.release()
```

## Recommendations

1. **Use LiteLLM as the unified abstraction layer**
   - Simplifies multi-provider support
   - Built-in failover and retry
   - Consistent API across providers

2. **Configure model groups by use case**
   - `high-reasoning`: Complex analysis, architecture decisions
   - `fast-response`: Quick tasks, simple queries
   - `cost-optimized`: Non-critical, high-volume tasks

3. **Implement automatic failover chain**
   - Primary: Claude 3.5 Sonnet
   - Fallback 1: GPT-4 Turbo
   - Fallback 2: Local Llama 3 (if available)

4. **Track all usage and costs**
   - Per agent, per project
   - Set budget alerts
   - Generate usage reports

5. **Cache frequently repeated queries**
   - Use Redis-backed cache
   - Cache embeddings for RAG
   - Cache deterministic transformations

## References

- [LiteLLM Documentation](https://docs.litellm.ai/)
- [LiteLLM Router](https://docs.litellm.ai/docs/routing)
- [Anthropic Rate Limits](https://docs.anthropic.com/en/api/rate-limits)

## Decision

**Adopt LiteLLM** as the unified LLM abstraction layer with automatic failover, usage-based routing, and Redis-backed caching.

---

*Spike completed. Findings will inform ADR-004: LLM Provider Integration Architecture.*