From a6a336b66e407d71cd1202e3547ecafe85fdc12e Mon Sep 17 00:00:00 2001
From: Felipe Cardoso
Date: Mon, 29 Dec 2025 13:15:50 +0100
Subject: [PATCH] docs: add spike findings for LLM abstraction, MCP integration, real-time updates, and background tasks

- Added research findings and recommendations as separate SPIKE documents in
  `docs/spikes/`:
  - `SPIKE-005-llm-provider-abstraction.md`: Research on a unified abstraction
    for LLM providers with failover, cost tracking, and caching strategies.
  - `SPIKE-001-mcp-integration-pattern.md`: Optimal pattern for integrating
    MCP with project/agent scoping and authentication strategies.
  - `SPIKE-003-realtime-updates.md`: Evaluation of SSE vs WebSocket for
    real-time updates, aligned with use-case needs.
  - `SPIKE-004-celery-redis-integration.md`: Best practices for Celery + Redis
    background task processing, queue design, and task monitoring.
- Focused on aligning implementation architectures with scalability,
  efficiency, and user needs.
- Documentation intended to inform upcoming ADRs.
---
 .../SPIKE-001-mcp-integration-pattern.md      | 288 ++++++++++
 docs/spikes/SPIKE-003-realtime-updates.md     | 338 ++++++++++++
 .../SPIKE-004-celery-redis-integration.md     | 420 ++++++++++++++
 .../SPIKE-005-llm-provider-abstraction.md     | 516 ++++++++++++++++++
 4 files changed, 1562 insertions(+)
 create mode 100644 docs/spikes/SPIKE-001-mcp-integration-pattern.md
 create mode 100644 docs/spikes/SPIKE-003-realtime-updates.md
 create mode 100644 docs/spikes/SPIKE-004-celery-redis-integration.md
 create mode 100644 docs/spikes/SPIKE-005-llm-provider-abstraction.md

diff --git a/docs/spikes/SPIKE-001-mcp-integration-pattern.md b/docs/spikes/SPIKE-001-mcp-integration-pattern.md
new file mode 100644
index 0000000..37b417e
--- /dev/null
+++ b/docs/spikes/SPIKE-001-mcp-integration-pattern.md
@@ -0,0 +1,288 @@
# SPIKE-001: MCP Integration Pattern

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #1

---

## Objective

Research the optimal pattern for integrating Model Context Protocol (MCP) servers with a FastAPI backend, focusing on unified singleton servers with project/agent scoping.

## Research Questions

1. What is the recommended MCP SDK for Python/FastAPI?
2. How should we structure unified MCP servers vs per-project servers?
3. What is the best pattern for project/agent scoping in MCP tools?
4. How do we handle authentication between Syndarix and MCP servers?

## Findings

### 1. FastMCP 2.0 - Recommended Framework

**FastMCP** is a high-level, Pythonic framework for building MCP servers that significantly reduces boilerplate compared to the low-level MCP SDK.

**Key Features:**
- Decorator-based tool registration (`@mcp.tool()`)
- Built-in context management for resources and prompts
- Support for server-sent events (SSE) and stdio transports
- Type-safe with Pydantic model support
- Async-first design compatible with FastAPI

**Installation:**
```bash
pip install fastmcp
```

**Basic Example:**
```python
from fastmcp import FastMCP

mcp = FastMCP("syndarix-knowledge-base")

@mcp.tool()
def search_knowledge(
    project_id: str,
    query: str,
    scope: str = "project"
) -> list[dict]:
    """Search the knowledge base with project scoping."""
    results: list[dict] = []  # Implementation here
    return results

@mcp.resource("project://{project_id}/config")
def get_project_config(project_id: str) -> dict:
    """Get project configuration."""
    config: dict = {}  # Implementation here
    return config
```

### 2. Unified Singleton Pattern (Recommended)

**Decision:** Use unified singleton MCP servers instead of per-project servers.
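As a minimal sketch of what the singleton looks like operationally, each MCP server is one long-lived process serving every project over SSE. This assumes FastMCP's `run()` accepts the transport/host/port arguments shown; port 9003 matches the registry further below:

```python
# mcp_servers/knowledge_base/server.py -- one process serves every project
from fastmcp import FastMCP

mcp = FastMCP("syndarix-knowledge-base")

# ... @mcp.tool() definitions here, all scoped by project_id/agent_id ...

if __name__ == "__main__":
    # Hypothetical invocation: SSE transport on the registry-assigned port
    mcp.run(transport="sse", host="0.0.0.0", port=9003)
```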
+ +**Architecture:** +``` +┌─────────────────────────────────────────────────────────┐ +│ Syndarix Backend │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ Agent 1 │ │ Agent 2 │ │ Agent 3 │ │ +│ │ (project A) │ │ (project A) │ │ (project B) │ │ +│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ +│ │ │ │ │ +│ └────────────────┼────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────────┐ │ +│ │ MCP Client (Singleton) │ │ +│ │ Maintains connections to all MCP servers │ │ +│ └─────────────────────────────────────────────────┘ │ +└──────────────────────────┬──────────────────────────────┘ + │ + ┌───────────────┼───────────────┐ + │ │ │ + ▼ ▼ ▼ + ┌────────────┐ ┌────────────┐ ┌────────────┐ + │ Git MCP │ │ KB MCP │ │ LLM MCP │ + │ (Singleton)│ │ (Singleton)│ │ (Singleton)│ + └────────────┘ └────────────┘ └────────────┘ +``` + +**Why Singleton:** +- Resource efficiency (one process per MCP type) +- Shared connection pools +- Centralized logging and monitoring +- Simpler deployment (7 services vs N×7) +- Cross-project learning possible (if needed) + +**Scoping Pattern:** +```python +@mcp.tool() +def search_knowledge( + project_id: str, # Required - scopes to project + agent_id: str, # Required - identifies calling agent + query: str, + scope: Literal["project", "global"] = "project" +) -> SearchResults: + """ + All tools accept project_id and agent_id for: + - Access control validation + - Audit logging + - Context filtering + """ + # Validate agent has access to project + validate_access(agent_id, project_id) + + # Log the access + log_tool_usage(agent_id, project_id, "search_knowledge") + + # Perform scoped search + if scope == "project": + return search_project_kb(project_id, query) + else: + return search_global_kb(query) +``` + +### 3. MCP Server Registry Architecture + +```python +# mcp/registry.py +from dataclasses import dataclass +from typing import Dict + +@dataclass +class MCPServerConfig: + name: str + port: int + transport: str # "sse" or "stdio" + enabled: bool = True + +MCP_SERVERS: Dict[str, MCPServerConfig] = { + "llm_gateway": MCPServerConfig("llm-gateway", 9001, "sse"), + "git": MCPServerConfig("git-mcp", 9002, "sse"), + "knowledge_base": MCPServerConfig("kb-mcp", 9003, "sse"), + "issues": MCPServerConfig("issues-mcp", 9004, "sse"), + "file_system": MCPServerConfig("fs-mcp", 9005, "sse"), + "code_analysis": MCPServerConfig("code-mcp", 9006, "sse"), + "cicd": MCPServerConfig("cicd-mcp", 9007, "sse"), +} +``` + +### 4. Authentication Pattern + +**MCP OAuth 2.0 Integration:** +```python +from fastmcp import FastMCP +from fastmcp.auth import OAuth2Bearer + +mcp = FastMCP( + "syndarix-mcp", + auth=OAuth2Bearer( + token_url="https://syndarix.local/oauth/token", + scopes=["mcp:read", "mcp:write"] + ) +) +``` + +**Internal Service Auth (Recommended for v1):** +```python +# For internal deployment, use service tokens +@mcp.tool() +def create_issue( + service_token: str, # Validated internally + project_id: str, + title: str, + body: str +) -> Issue: + validate_service_token(service_token) + # ... implementation +``` + +### 5. 
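Complementing the service-token option in section 4, one possible shape for `validate_service_token` is sketched below. This is a hypothetical helper, not an established API; the environment-variable name and error type are assumptions:

```python
import hmac
import os

# Assumed env var holding the shared secret for internal MCP calls
SERVICE_TOKEN = os.environ["SYNDARIX_MCP_SERVICE_TOKEN"]

def validate_service_token(token: str) -> None:
    """Reject requests whose bearer token doesn't match the shared secret."""
    # hmac.compare_digest avoids timing side channels on the comparison
    if not hmac.compare_digest(token, SERVICE_TOKEN):
        raise PermissionError("invalid MCP service token")
```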
FastAPI Integration Pattern

```python
# app/mcp/client.py
from typing import Any

from mcp import ClientSession
from mcp.client.sse import sse_client

from app.mcp.registry import MCP_SERVERS, MCPServerConfig

class MCPClientManager:
    def __init__(self):
        self._sessions: dict[str, ClientSession] = {}

    async def connect_all(self):
        """Connect to all configured MCP servers."""
        for name, config in MCP_SERVERS.items():
            if config.enabled:
                session = await self._connect_server(config)
                self._sessions[name] = session

    async def _connect_server(self, config: MCPServerConfig) -> ClientSession:
        """Open an SSE connection and initialize a session (left as a sketch)."""
        ...

    async def call_tool(
        self,
        server: str,
        tool_name: str,
        arguments: dict
    ) -> Any:
        """Call a tool on a specific MCP server."""
        session = self._sessions[server]
        result = await session.call_tool(tool_name, arguments)
        return result.content

# Usage in FastAPI
mcp_client = MCPClientManager()

@app.on_event("startup")
async def startup():
    await mcp_client.connect_all()

@app.post("/api/v1/knowledge/search")
async def search_knowledge(request: SearchRequest):
    result = await mcp_client.call_tool(
        "knowledge_base",
        "search_knowledge",
        {
            "project_id": request.project_id,
            "agent_id": request.agent_id,
            "query": request.query
        }
    )
    return result
```

## Recommendations

### Immediate Actions

1. **Use FastMCP 2.0** for all MCP server implementations
2. **Implement unified singleton pattern** with explicit scoping
3. **Use SSE transport** for MCP server connections
4. **Service tokens** for internal auth (v1), OAuth 2.0 for future

### MCP Server Priority

1. **LLM Gateway** - Critical for agent operation
2. **Knowledge Base** - Required for RAG functionality
3. **Git MCP** - Required for code delivery
4. **Issues MCP** - Required for project management
5. **File System** - Required for workspace operations
6. **Code Analysis** - Enhances code quality
7. **CI/CD** - Automates deployments

### Code Organization

```
syndarix/
├── backend/
│   └── app/
│       └── mcp/
│           ├── __init__.py
│           ├── client.py      # MCP client manager
│           ├── registry.py    # Server configurations
│           └── schemas.py     # Tool argument schemas
└── mcp_servers/
    ├── llm_gateway/
    │   ├── __init__.py
    │   ├── server.py
    │   └── tools.py
    ├── knowledge_base/
    ├── git/
    ├── issues/
    ├── file_system/
    ├── code_analysis/
    └── cicd/
```

## References

- [FastMCP Documentation](https://gofastmcp.com)
- [MCP Protocol Specification](https://spec.modelcontextprotocol.io)
- [MCP Python SDK](https://github.com/modelcontextprotocol/python-sdk)

## Decision

**Adopt FastMCP 2.0** with unified singleton servers and explicit project/agent scoping for all MCP integrations.

---

*Spike completed. Findings will inform ADR-001: MCP Integration Architecture.*
diff --git a/docs/spikes/SPIKE-003-realtime-updates.md b/docs/spikes/SPIKE-003-realtime-updates.md
new file mode 100644
index 0000000..36f14f9
--- /dev/null
+++ b/docs/spikes/SPIKE-003-realtime-updates.md
@@ -0,0 +1,338 @@
# SPIKE-003: Real-time Updates Architecture

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #3

---

## Objective

Evaluate WebSocket vs Server-Sent Events (SSE) for real-time updates in Syndarix, focusing on agent activity streams, progress updates, and client notifications.

## Research Questions

1. What are the trade-offs between WebSocket and SSE?
2. Which pattern best fits Syndarix's use cases?
3. How do we handle reconnection and reliability?
4. What is the FastAPI implementation approach?

## Findings

### 1. 
Use Case Analysis + +| Use Case | Direction | Frequency | Latency Req | +|----------|-----------|-----------|-------------| +| Agent activity feed | Server → Client | High | Low | +| Sprint progress | Server → Client | Medium | Low | +| Build status | Server → Client | Low | Medium | +| Client approval requests | Server → Client | Low | High | +| Client messages | Client → Server | Low | Medium | +| Issue updates | Server → Client | Medium | Low | + +**Key Insight:** 90%+ of real-time communication is **server-to-client** (unidirectional). + +### 2. Technology Comparison + +| Feature | Server-Sent Events (SSE) | WebSocket | +|---------|-------------------------|-----------| +| Direction | Unidirectional (server → client) | Bidirectional | +| Protocol | HTTP/1.1 or HTTP/2 | Custom (ws://) | +| Reconnection | Built-in automatic | Manual implementation | +| Connection limits | Limited per domain | Similar limits | +| Browser support | Excellent | Excellent | +| Through proxies | Native HTTP | May require config | +| Complexity | Simple | More complex | +| FastAPI support | Native | Native | + +### 3. Recommendation: SSE for Primary, WebSocket for Chat + +**SSE (Recommended for 90% of use cases):** +- Agent activity streams +- Progress updates +- Build/pipeline status +- Issue change notifications +- Approval request alerts + +**WebSocket (For bidirectional needs):** +- Live chat with agents +- Interactive debugging sessions +- Real-time collaboration (future) + +### 4. FastAPI SSE Implementation + +```python +# app/api/v1/events.py +from fastapi import APIRouter, Request +from fastapi.responses import StreamingResponse +from app.services.events import EventBus +import asyncio + +router = APIRouter() + +@router.get("/projects/{project_id}/events") +async def project_events( + project_id: str, + request: Request, + current_user: User = Depends(get_current_user) +): + """Stream real-time events for a project.""" + + async def event_generator(): + event_bus = EventBus() + subscriber = await event_bus.subscribe( + channel=f"project:{project_id}", + user_id=current_user.id + ) + + try: + while True: + # Check if client disconnected + if await request.is_disconnected(): + break + + # Wait for next event (with timeout for keepalive) + try: + event = await asyncio.wait_for( + subscriber.get_event(), + timeout=30.0 + ) + yield f"event: {event.type}\ndata: {event.json()}\n\n" + except asyncio.TimeoutError: + # Send keepalive comment + yield ": keepalive\n\n" + finally: + await event_bus.unsubscribe(subscriber) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # Disable nginx buffering + } + ) +``` + +### 5. 
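To smoke-test the events endpoint from section 4, the stream can be consumed with `httpx`. A sketch only: the base URL and project ID are placeholders, and authentication is omitted:

```python
import asyncio

import httpx

async def tail_events(project_id: str) -> None:
    """Print raw SSE lines from the project event stream."""
    url = f"http://localhost:8000/api/v1/projects/{project_id}/events"
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url) as response:
            async for line in response.aiter_lines():
                # Keepalive comments start with ":" and can be skipped
                if line and not line.startswith(":"):
                    print(line)

asyncio.run(tail_events("demo-project"))
```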
Event Bus Architecture with Redis

```python
# app/services/events.py
from dataclasses import asdict, dataclass
import json

import redis.asyncio as redis

from app.core.config import settings

@dataclass
class Event:
    type: str
    data: dict
    project_id: str
    agent_id: str | None = None
    timestamp: float | None = None

    def json(self) -> str:
        """Serialize the event for the SSE data field."""
        return json.dumps(asdict(self))

class EventBus:
    def __init__(self, redis_url: str = settings.REDIS_URL):
        self.redis = redis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()

    async def publish(self, channel: str, event: Event | dict):
        """Publish an event to a channel."""
        payload = asdict(event) if isinstance(event, Event) else event
        await self.redis.publish(channel, json.dumps(payload))

    async def subscribe(self, channel: str, user_id: str | None = None) -> "Subscriber":
        """Subscribe to a channel (user_id is accepted for audit logging)."""
        await self.pubsub.subscribe(channel)
        return Subscriber(self.pubsub, channel)

    async def unsubscribe(self, subscriber: "Subscriber"):
        await subscriber.unsubscribe()

class Subscriber:
    def __init__(self, pubsub, channel: str):
        self.pubsub = pubsub
        self.channel = channel

    async def get_event(self) -> Event:
        """Get the next event (blocking)."""
        while True:
            message = await self.pubsub.get_message(
                ignore_subscribe_messages=True,
                timeout=1.0
            )
            if message and message["type"] == "message":
                data = json.loads(message["data"])
                return Event(**data)

    async def unsubscribe(self):
        await self.pubsub.unsubscribe(self.channel)
```

### 6. Client-Side Implementation

```typescript
// frontend/lib/events.ts
// Named ProjectEventStream so it doesn't shadow the built-in EventSource.
class ProjectEventStream {
  private eventSource: EventSource | null = null;
  private reconnectDelay = 1000;
  private maxReconnectDelay = 30000;

  connect(projectId: string, onEvent: (event: ProjectEvent) => void) {
    const url = `/api/v1/projects/${projectId}/events`;

    this.eventSource = new EventSource(url, {
      withCredentials: true
    });

    this.eventSource.onopen = () => {
      console.log('SSE connected');
      this.reconnectDelay = 1000; // Reset on success
    };

    this.eventSource.addEventListener('agent_activity', (e) => {
      onEvent({ type: 'agent_activity', data: JSON.parse(e.data) });
    });

    this.eventSource.addEventListener('issue_update', (e) => {
      onEvent({ type: 'issue_update', data: JSON.parse(e.data) });
    });

    this.eventSource.addEventListener('approval_required', (e) => {
      onEvent({ type: 'approval_required', data: JSON.parse(e.data) });
    });

    this.eventSource.onerror = () => {
      this.eventSource?.close();
      // Exponential backoff reconnect
      setTimeout(() => this.connect(projectId, onEvent), this.reconnectDelay);
      this.reconnectDelay = Math.min(
        this.reconnectDelay * 2,
        this.maxReconnectDelay
      );
    };
  }

  disconnect() {
    this.eventSource?.close();
    this.eventSource = null;
  }
}
```

### 7. 
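Publishing into this pipeline from any service is then a one-liner. A usage sketch, reusing the `EventBus` and `Event` names from section 5:

```python
bus = EventBus(settings.REDIS_URL)

await bus.publish(
    "project:123",
    Event(
        type="agent_activity",
        data={"message": "tests passing"},
        project_id="123",
    ),
)
```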
Event Types + +```python +# app/schemas/events.py +from enum import Enum +from pydantic import BaseModel +from datetime import datetime + +class EventType(str, Enum): + # Agent Events + AGENT_STARTED = "agent_started" + AGENT_ACTIVITY = "agent_activity" + AGENT_COMPLETED = "agent_completed" + AGENT_ERROR = "agent_error" + + # Project Events + ISSUE_CREATED = "issue_created" + ISSUE_UPDATED = "issue_updated" + ISSUE_CLOSED = "issue_closed" + + # Git Events + BRANCH_CREATED = "branch_created" + COMMIT_PUSHED = "commit_pushed" + PR_CREATED = "pr_created" + PR_MERGED = "pr_merged" + + # Workflow Events + APPROVAL_REQUIRED = "approval_required" + SPRINT_STARTED = "sprint_started" + SPRINT_COMPLETED = "sprint_completed" + + # Pipeline Events + PIPELINE_STARTED = "pipeline_started" + PIPELINE_COMPLETED = "pipeline_completed" + PIPELINE_FAILED = "pipeline_failed" + +class ProjectEvent(BaseModel): + id: str + type: EventType + project_id: str + agent_id: str | None + data: dict + timestamp: datetime +``` + +### 8. WebSocket for Chat (Secondary) + +```python +# app/api/v1/chat.py +from fastapi import WebSocket, WebSocketDisconnect +from app.services.agent_chat import AgentChatService + +@router.websocket("/projects/{project_id}/agents/{agent_id}/chat") +async def agent_chat( + websocket: WebSocket, + project_id: str, + agent_id: str +): + """Bidirectional chat with an agent.""" + await websocket.accept() + + chat_service = AgentChatService(project_id, agent_id) + + try: + while True: + # Receive message from client + message = await websocket.receive_json() + + # Stream response from agent + async for chunk in chat_service.get_response(message): + await websocket.send_json({ + "type": "chunk", + "content": chunk + }) + + await websocket.send_json({"type": "done"}) + except WebSocketDisconnect: + pass +``` + +## Performance Considerations + +### Connection Limits +- Browser limit: ~6 connections per domain (HTTP/1.1) +- Recommendation: Use single SSE connection per project, multiplex events + +### Scalability +- Redis Pub/Sub handles cross-instance event distribution +- Consider Redis Streams for message persistence (audit/replay) + +### Keepalive +- Send comment every 30 seconds to prevent timeout +- Client reconnects automatically on disconnect + +## Recommendations + +1. **Use SSE for all server-to-client events** (simpler, auto-reconnect) +2. **Use WebSocket only for interactive chat** with agents +3. **Redis Pub/Sub for event distribution** across instances +4. **Single SSE connection per project** with event multiplexing +5. **Exponential backoff** for client reconnection + +## References + +- [FastAPI SSE](https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse) +- [MDN EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) +- [Redis Pub/Sub](https://redis.io/topics/pubsub) + +## Decision + +**Adopt SSE as the primary real-time transport** with WebSocket reserved for bidirectional chat. Use Redis Pub/Sub for event distribution. + +--- + +*Spike completed. 
Findings will inform ADR-002: Real-time Communication Architecture.* diff --git a/docs/spikes/SPIKE-004-celery-redis-integration.md b/docs/spikes/SPIKE-004-celery-redis-integration.md new file mode 100644 index 0000000..3cae362 --- /dev/null +++ b/docs/spikes/SPIKE-004-celery-redis-integration.md @@ -0,0 +1,420 @@ +# SPIKE-004: Celery + Redis Integration + +**Status:** Completed +**Date:** 2025-12-29 +**Author:** Architecture Team +**Related Issue:** #4 + +--- + +## Objective + +Research best practices for integrating Celery with FastAPI for background task processing, focusing on agent orchestration, long-running workflows, and task monitoring. + +## Research Questions + +1. How to properly integrate Celery with async FastAPI? +2. What is the optimal task queue architecture for Syndarix? +3. How to handle long-running agent tasks? +4. What monitoring and visibility patterns should we use? + +## Findings + +### 1. Celery + FastAPI Integration Pattern + +**Challenge:** Celery is synchronous, FastAPI is async. + +**Solution:** Use `celery.result.AsyncResult` with async polling or callbacks. + +```python +# app/core/celery.py +from celery import Celery +from app.core.config import settings + +celery_app = Celery( + "syndarix", + broker=settings.REDIS_URL, + backend=settings.REDIS_URL, + include=[ + "app.tasks.agent_tasks", + "app.tasks.git_tasks", + "app.tasks.sync_tasks", + ] +) + +celery_app.conf.update( + task_serializer="json", + accept_content=["json"], + result_serializer="json", + timezone="UTC", + enable_utc=True, + task_track_started=True, + task_time_limit=3600, # 1 hour max + task_soft_time_limit=3300, # 55 min soft limit + worker_prefetch_multiplier=1, # One task at a time for LLM tasks + task_acks_late=True, # Acknowledge after completion + task_reject_on_worker_lost=True, # Retry if worker dies +) +``` + +### 2. Task Queue Architecture + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ FastAPI Backend │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ API Layer │ │ Services │ │ Events │ │ +│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ +│ │ │ │ │ +│ └────────────────┼────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌────────────────────────────────┐ │ +│ │ Task Dispatcher │ │ +│ │ (Celery send_task) │ │ +│ └────────────────┬───────────────┘ │ +└──────────────────────────┼──────────────────────────────────────┘ + │ + ▼ +┌──────────────────────────────────────────────────────────────────┐ +│ Redis (Broker + Backend) │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ +│ │ agent_queue │ │ git_queue │ │ sync_queue │ │ +│ │ (priority) │ │ │ │ │ │ +│ └──────────────┘ └──────────────┘ └──────────────┘ │ +└──────────────────────────────────────────────────────────────────┘ + │ + ┌───────────────┼───────────────┐ + │ │ │ + ▼ ▼ ▼ + ┌────────────┐ ┌────────────┐ ┌────────────┐ + │ Worker │ │ Worker │ │ Worker │ + │ (agents) │ │ (git) │ │ (sync) │ + │ prefetch=1 │ │ prefetch=4 │ │ prefetch=4 │ + └────────────┘ └────────────┘ └────────────┘ +``` + +### 3. 
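The Task Dispatcher box in the diagram boils down to a `send_task` call, which dispatches by task name so the API layer never imports task modules. A sketch; the IDs and payload are illustrative:

```python
# Dispatch by name; routing sends it to the agent queue
async_result = celery_app.send_task(
    "app.tasks.agent_tasks.run_agent_action",
    kwargs={
        "agent_id": "agent-42",
        "project_id": "proj-7",
        "action": "implement_story",
        "context": {"story_id": "S-101"},
    },
    queue="agent_queue",
)
print(async_result.id)  # poll later via AsyncResult
```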
Queue Configuration

```python
# app/core/celery.py
from kombu import Queue

celery_app.conf.task_queues = [
    Queue("agent_queue", routing_key="agent.#"),
    Queue("git_queue", routing_key="git.#"),
    Queue("sync_queue", routing_key="sync.#"),
    Queue("cicd_queue", routing_key="cicd.#"),
]

celery_app.conf.task_routes = {
    "app.tasks.agent_tasks.*": {"queue": "agent_queue"},
    "app.tasks.git_tasks.*": {"queue": "git_queue"},
    "app.tasks.sync_tasks.*": {"queue": "sync_queue"},
    "app.tasks.cicd_tasks.*": {"queue": "cicd_queue"},
}
```

### 4. Agent Task Implementation

```python
# app/tasks/agent_tasks.py
import asyncio

from celery import Task

from app.core.celery import celery_app
from app.services.agent_runner import AgentRunner
from app.services.events import EventBus

def publish_event(channel: str, payload: dict) -> None:
    """Bridge helper: EventBus.publish is async, Celery tasks are sync."""
    asyncio.run(EventBus().publish(channel, payload))

class AgentTask(Task):
    """Base class for agent tasks with retry and monitoring."""

    autoretry_for = (ConnectionError, TimeoutError)
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True
    max_retries = 3

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle task failure."""
        project_id = kwargs.get("project_id")
        agent_id = kwargs.get("agent_id")
        publish_event(f"project:{project_id}", {
            "type": "agent_error",
            "agent_id": agent_id,
            "error": str(exc)
        })

@celery_app.task(bind=True, base=AgentTask)
def run_agent_action(
    self,
    agent_id: str,
    project_id: str,
    action: str,
    context: dict
) -> dict:
    """
    Execute an agent action as a background task.

    Args:
        agent_id: The agent instance ID
        project_id: The project context
        action: The action to perform
        context: Action-specific context

    Returns:
        Action result dictionary
    """
    runner = AgentRunner(agent_id, project_id)

    # Update task state for monitoring
    self.update_state(
        state="RUNNING",
        meta={"agent_id": agent_id, "action": action}
    )

    # Publish start event
    publish_event(f"project:{project_id}", {
        "type": "agent_started",
        "agent_id": agent_id,
        "action": action,
        "task_id": self.request.id
    })

    try:
        result = runner.execute(action, context)

        # Publish completion event
        publish_event(f"project:{project_id}", {
            "type": "agent_completed",
            "agent_id": agent_id,
            "action": action,
            "result_summary": result.get("summary")
        })

        return result
    except Exception:
        # Will trigger on_failure
        raise
```

### 5. 
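Because `AgentTask` retries automatically, task bodies should be idempotent; one common guard is a Redis `SET NX` key derived from the task's arguments. A sketch, assuming a synchronous `redis_client` is available in the task module:

```python
def run_once(key: str, ttl: int = 3600) -> bool:
    """Return True if this key hasn't been processed yet."""
    # SET key value NX EX ttl -- only the first caller wins
    return bool(redis_client.set(f"task-done:{key}", "1", nx=True, ex=ttl))

# Inside a task body (illustrative):
#   if not run_once(f"{agent_id}:{action}:{context_hash}"):
#       return {"status": "duplicate", "skipped": True}
```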
Long-Running Task Patterns

**Progress Reporting:**
```python
@celery_app.task(bind=True)
def implement_story(self, story_id: str, agent_id: str, project_id: str):
    """Implement a user story with progress reporting."""

    steps = [
        ("analyzing", "Analyzing requirements"),
        ("designing", "Designing solution"),
        ("implementing", "Writing code"),
        ("testing", "Running tests"),
        ("documenting", "Updating documentation"),
    ]

    for i, (state, description) in enumerate(steps):
        self.update_state(
            state="PROGRESS",
            meta={
                "current": i + 1,
                "total": len(steps),
                "status": description
            }
        )

        # Do the actual work
        execute_step(state, story_id, agent_id)

        # Publish progress event (publish_event is the sync bridge from above)
        publish_event(f"project:{project_id}", {
            "type": "agent_progress",
            "agent_id": agent_id,
            "step": i + 1,
            "total": len(steps),
            "description": description
        })

    return {"status": "completed", "story_id": story_id}
```

**Task Chaining:**
```python
from celery import chain, group

# Sequential workflow
workflow = chain(
    analyze_requirements.s(story_id),
    design_solution.s(),
    implement_code.s(),
    run_tests.s(),
    create_pr.s()
)

# Parallel execution
parallel_tests = group(
    run_unit_tests.s(project_id),
    run_integration_tests.s(project_id),
    run_linting.s(project_id)
)
```

### 6. FastAPI Integration

```python
# app/api/v1/agents.py
from fastapi import APIRouter
from celery.result import AsyncResult

from app.tasks.agent_tasks import run_agent_action

router = APIRouter()

@router.post("/agents/{agent_id}/actions")
async def trigger_agent_action(
    agent_id: str,
    action: AgentActionRequest
):
    """Trigger an agent action as a background task."""

    # Dispatch to Celery
    task = run_agent_action.delay(
        agent_id=agent_id,
        project_id=action.project_id,
        action=action.action,
        context=action.context
    )

    return {
        "task_id": task.id,
        "status": "queued"
    }

@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Get the status of a background task."""

    result = AsyncResult(task_id)

    if result.state == "PENDING":
        return {"status": "pending"}
    elif result.state == "RUNNING":
        return {"status": "running", **result.info}
    elif result.state == "PROGRESS":
        return {"status": "progress", **result.info}
    elif result.state == "SUCCESS":
        return {"status": "completed", "result": result.result}
    elif result.state == "FAILURE":
        return {"status": "failed", "error": str(result.result)}

    return {"status": result.state}
```

### 7. Worker Configuration

```bash
# Run different workers for different queues

# Agent worker (single task at a time for LLM rate limiting)
celery -A app.core.celery worker \
  -Q agent_queue \
  -c 4 \
  --prefetch-multiplier=1 \
  -n agent_worker@%h

# Git worker (can handle multiple concurrent tasks)
celery -A app.core.celery worker \
  -Q git_queue \
  -c 8 \
  --prefetch-multiplier=4 \
  -n git_worker@%h

# Sync worker
celery -A app.core.celery worker \
  -Q sync_queue \
  -c 4 \
  --prefetch-multiplier=4 \
  -n sync_worker@%h
```

### 8. Monitoring with Flower

```yaml
# docker-compose.yml
services:
  flower:
    image: mher/flower:latest
    command: celery flower --broker=redis://redis:6379/0
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - FLOWER_BASIC_AUTH=admin:password
```

### 9. 
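The `workflow` chain from the Task Chaining example is started with `apply_async()`, which returns the `AsyncResult` of the last task in the chain. A usage sketch:

```python
result = workflow.apply_async()

# result.get() blocks until create_pr finishes (or raises on failure);
# in the API layer, prefer polling result.id via the /tasks/{task_id} route.
pr_info = result.get(timeout=3600)
```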
Task Scheduling (Celery Beat) + +```python +# app/core/celery.py +from celery.schedules import crontab + +celery_app.conf.beat_schedule = { + # Sync issues every minute + "sync-external-issues": { + "task": "app.tasks.sync_tasks.sync_all_issues", + "schedule": 60.0, + }, + # Health check every 5 minutes + "agent-health-check": { + "task": "app.tasks.agent_tasks.health_check_all_agents", + "schedule": 300.0, + }, + # Daily cleanup at midnight + "cleanup-old-tasks": { + "task": "app.tasks.maintenance.cleanup_old_tasks", + "schedule": crontab(hour=0, minute=0), + }, +} +``` + +## Best Practices + +1. **One task per LLM call** - Avoid rate limiting issues +2. **Progress reporting** - Update state for long-running tasks +3. **Idempotent tasks** - Handle retries gracefully +4. **Separate queues** - Isolate slow tasks from fast ones +5. **Task result expiry** - Set `result_expires` to avoid Redis bloat +6. **Soft time limits** - Allow graceful shutdown before hard kill + +## Recommendations + +1. **Use Celery for all long-running operations** + - Agent actions + - Git operations + - External sync + - CI/CD triggers + +2. **Use Redis as both broker and backend** + - Simplifies infrastructure + - Fast enough for our scale + +3. **Configure separate queues** + - `agent_queue` with prefetch=1 + - `git_queue` with prefetch=4 + - `sync_queue` with prefetch=4 + +4. **Implement proper monitoring** + - Flower for web UI + - Prometheus metrics export + - Dead letter queue for failed tasks + +## References + +- [Celery Documentation](https://docs.celeryq.dev/) +- [FastAPI Background Tasks](https://fastapi.tiangolo.com/tutorial/background-tasks/) +- [Celery Best Practices](https://docs.celeryq.dev/en/stable/userguide/tasks.html#tips-and-best-practices) + +## Decision + +**Adopt Celery + Redis** for all background task processing with queue-based routing and progress reporting via Redis Pub/Sub events. + +--- + +*Spike completed. Findings will inform ADR-003: Background Task Architecture.* diff --git a/docs/spikes/SPIKE-005-llm-provider-abstraction.md b/docs/spikes/SPIKE-005-llm-provider-abstraction.md new file mode 100644 index 0000000..dbe3404 --- /dev/null +++ b/docs/spikes/SPIKE-005-llm-provider-abstraction.md @@ -0,0 +1,516 @@ +# SPIKE-005: LLM Provider Abstraction + +**Status:** Completed +**Date:** 2025-12-29 +**Author:** Architecture Team +**Related Issue:** #5 + +--- + +## Objective + +Research the best approach for unified LLM provider abstraction with support for multiple providers, automatic failover, and cost tracking. + +## Research Questions + +1. What libraries exist for unified LLM access? +2. How to implement automatic failover between providers? +3. How to track token usage and costs per agent/project? +4. What caching strategies can reduce API costs? + +## Findings + +### 1. LiteLLM - Recommended Solution + +**LiteLLM** provides a unified interface to 100+ LLM providers using the OpenAI SDK format. + +**Key Features:** +- Unified API across providers (Anthropic, OpenAI, local, etc.) +- Built-in failover and load balancing +- Token counting and cost tracking +- Streaming support +- Async support +- Caching with Redis + +**Installation:** +```bash +pip install litellm +``` + +### 2. 
Basic Usage + +```python +from litellm import completion, acompletion +import litellm + +# Configure providers +litellm.api_key = os.getenv("ANTHROPIC_API_KEY") +litellm.set_verbose = True # For debugging + +# Synchronous call +response = completion( + model="claude-3-5-sonnet-20241022", + messages=[{"role": "user", "content": "Hello!"}] +) + +# Async call (for FastAPI) +response = await acompletion( + model="claude-3-5-sonnet-20241022", + messages=[{"role": "user", "content": "Hello!"}] +) +``` + +### 3. Model Naming Convention + +LiteLLM uses prefixed model names: + +| Provider | Model Format | +|----------|--------------| +| Anthropic | `claude-3-5-sonnet-20241022` | +| OpenAI | `gpt-4-turbo` | +| Azure OpenAI | `azure/deployment-name` | +| Ollama | `ollama/llama3` | +| Together AI | `together_ai/togethercomputer/llama-2-70b` | + +### 4. Failover Configuration + +```python +from litellm import Router + +# Define model list with fallbacks +model_list = [ + { + "model_name": "primary-agent", + "litellm_params": { + "model": "claude-3-5-sonnet-20241022", + "api_key": os.getenv("ANTHROPIC_API_KEY"), + }, + "model_info": {"id": 1} + }, + { + "model_name": "primary-agent", # Same name = fallback + "litellm_params": { + "model": "gpt-4-turbo", + "api_key": os.getenv("OPENAI_API_KEY"), + }, + "model_info": {"id": 2} + }, + { + "model_name": "primary-agent", + "litellm_params": { + "model": "ollama/llama3", + "api_base": "http://localhost:11434", + }, + "model_info": {"id": 3} + } +] + +# Initialize router with failover +router = Router( + model_list=model_list, + fallbacks=[ + {"primary-agent": ["primary-agent"]} # Try all models with same name + ], + routing_strategy="simple-shuffle", # or "latency-based-routing" + num_retries=3, + retry_after=5, # seconds + timeout=60, +) + +# Use router +response = await router.acompletion( + model="primary-agent", + messages=[{"role": "user", "content": "Hello!"}] +) +``` + +### 5. 
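Before wiring cost tracking into the gateway below, note that LiteLLM can also price a response directly via its `completion_cost` helper. A small sketch (model name as in the examples above; the returned value is USD):

```python
from litellm import acompletion, completion_cost

response = await acompletion(
    model="claude-3-5-sonnet-20241022",
    messages=[{"role": "user", "content": "Hello!"}],
)
cost_usd = completion_cost(completion_response=response)
print(f"call cost: ${cost_usd:.6f}")
```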
Syndarix LLM Gateway Architecture + +```python +# app/services/llm_gateway.py +from litellm import Router, acompletion +from app.core.config import settings +from app.models.agent import AgentType +from app.services.cost_tracker import CostTracker +from app.services.events import EventBus + +class LLMGateway: + """Unified LLM gateway with failover and cost tracking.""" + + def __init__(self): + self.router = self._build_router() + self.cost_tracker = CostTracker() + self.event_bus = EventBus() + + def _build_router(self) -> Router: + """Build LiteLLM router from configuration.""" + model_list = [] + + # Add Anthropic models + if settings.ANTHROPIC_API_KEY: + model_list.extend([ + { + "model_name": "high-reasoning", + "litellm_params": { + "model": "claude-3-5-sonnet-20241022", + "api_key": settings.ANTHROPIC_API_KEY, + } + }, + { + "model_name": "fast-response", + "litellm_params": { + "model": "claude-3-haiku-20240307", + "api_key": settings.ANTHROPIC_API_KEY, + } + } + ]) + + # Add OpenAI fallbacks + if settings.OPENAI_API_KEY: + model_list.extend([ + { + "model_name": "high-reasoning", + "litellm_params": { + "model": "gpt-4-turbo", + "api_key": settings.OPENAI_API_KEY, + } + }, + { + "model_name": "fast-response", + "litellm_params": { + "model": "gpt-4o-mini", + "api_key": settings.OPENAI_API_KEY, + } + } + ]) + + # Add local models (Ollama) + if settings.OLLAMA_URL: + model_list.append({ + "model_name": "local-fallback", + "litellm_params": { + "model": "ollama/llama3", + "api_base": settings.OLLAMA_URL, + } + }) + + return Router( + model_list=model_list, + fallbacks=[ + {"high-reasoning": ["high-reasoning", "local-fallback"]}, + {"fast-response": ["fast-response", "local-fallback"]}, + ], + routing_strategy="latency-based-routing", + num_retries=3, + timeout=120, + ) + + async def complete( + self, + agent_id: str, + project_id: str, + messages: list[dict], + model_preference: str = "high-reasoning", + stream: bool = False, + **kwargs + ) -> dict: + """ + Generate a completion with automatic failover and cost tracking. 
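        Provider failover, retries, and routing are handled by the LiteLLM
        Router before any exception reaches the handler below; usage is
        recorded per agent and project on success.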
+ + Args: + agent_id: The calling agent's ID + project_id: The project context + messages: Chat messages + model_preference: "high-reasoning" or "fast-response" + stream: Whether to stream the response + **kwargs: Additional LiteLLM parameters + + Returns: + Completion response dictionary + """ + try: + if stream: + return self._stream_completion( + agent_id, project_id, messages, model_preference, **kwargs + ) + + response = await self.router.acompletion( + model=model_preference, + messages=messages, + **kwargs + ) + + # Track usage + await self._track_usage( + agent_id=agent_id, + project_id=project_id, + model=response.model, + usage=response.usage, + ) + + return { + "content": response.choices[0].message.content, + "model": response.model, + "usage": { + "prompt_tokens": response.usage.prompt_tokens, + "completion_tokens": response.usage.completion_tokens, + "total_tokens": response.usage.total_tokens, + } + } + + except Exception as e: + # Publish error event + await self.event_bus.publish(f"project:{project_id}", { + "type": "llm_error", + "agent_id": agent_id, + "error": str(e) + }) + raise + + async def _stream_completion( + self, + agent_id: str, + project_id: str, + messages: list[dict], + model_preference: str, + **kwargs + ): + """Stream a completion response.""" + response = await self.router.acompletion( + model=model_preference, + messages=messages, + stream=True, + **kwargs + ) + + async for chunk in response: + if chunk.choices[0].delta.content: + yield chunk.choices[0].delta.content + + async def _track_usage( + self, + agent_id: str, + project_id: str, + model: str, + usage: dict + ): + """Track token usage and costs.""" + await self.cost_tracker.record_usage( + agent_id=agent_id, + project_id=project_id, + model=model, + prompt_tokens=usage.prompt_tokens, + completion_tokens=usage.completion_tokens, + ) +``` + +### 6. Cost Tracking + +```python +# app/services/cost_tracker.py +from sqlalchemy.ext.asyncio import AsyncSession +from app.models.usage import TokenUsage +from datetime import datetime + +# Cost per 1M tokens (approximate) +MODEL_COSTS = { + "claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00}, + "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25}, + "gpt-4-turbo": {"input": 10.00, "output": 30.00}, + "gpt-4o-mini": {"input": 0.15, "output": 0.60}, + "ollama/llama3": {"input": 0.00, "output": 0.00}, # Local +} + +class CostTracker: + def __init__(self, db: AsyncSession): + self.db = db + + async def record_usage( + self, + agent_id: str, + project_id: str, + model: str, + prompt_tokens: int, + completion_tokens: int, + ): + """Record token usage and calculate cost.""" + costs = MODEL_COSTS.get(model, {"input": 0, "output": 0}) + + input_cost = (prompt_tokens / 1_000_000) * costs["input"] + output_cost = (completion_tokens / 1_000_000) * costs["output"] + total_cost = input_cost + output_cost + + usage = TokenUsage( + agent_id=agent_id, + project_id=project_id, + model=model, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=prompt_tokens + completion_tokens, + cost_usd=total_cost, + timestamp=datetime.utcnow(), + ) + + self.db.add(usage) + await self.db.commit() + + async def get_project_usage( + self, + project_id: str, + start_date: datetime = None, + end_date: datetime = None, + ) -> dict: + """Get usage summary for a project.""" + # Query aggregated usage + ... 
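        # Sketch of the aggregation (assumes SQLAlchemy 2.x and the
        # TokenUsage model above; illustrative only):
        #   stmt = (
        #       select(
        #           func.coalesce(func.sum(TokenUsage.cost_usd), 0.0),
        #           func.coalesce(func.sum(TokenUsage.total_tokens), 0),
        #       )
        #       .where(TokenUsage.project_id == project_id)
        #   )
        #   cost, tokens = (await self.db.execute(stmt)).one()
        #   return {"total_cost_usd": cost, "total_tokens": tokens}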
+ + async def check_budget( + self, + project_id: str, + budget_limit: float, + ) -> bool: + """Check if project is within budget.""" + usage = await self.get_project_usage(project_id) + return usage["total_cost_usd"] < budget_limit +``` + +### 7. Caching with Redis + +```python +import litellm +from litellm import Cache + +# Configure Redis cache +litellm.cache = Cache( + type="redis", + host=settings.REDIS_HOST, + port=settings.REDIS_PORT, + password=settings.REDIS_PASSWORD, +) + +# Enable caching +litellm.enable_cache() + +# Cached completions (same input = cached response) +response = await litellm.acompletion( + model="claude-3-5-sonnet-20241022", + messages=[{"role": "user", "content": "What is 2+2?"}], + cache={"ttl": 3600} # Cache for 1 hour +) +``` + +### 8. Agent Type Model Mapping + +```python +# app/models/agent_type.py +from sqlalchemy import Column, String, Enum as SQLEnum +from app.db.base import Base + +class ModelPreference(str, Enum): + HIGH_REASONING = "high-reasoning" + FAST_RESPONSE = "fast-response" + COST_OPTIMIZED = "cost-optimized" + +class AgentType(Base): + __tablename__ = "agent_types" + + id = Column(UUID, primary_key=True) + name = Column(String(50), unique=True) + role = Column(String(50)) + + # LLM configuration + model_preference = Column( + SQLEnum(ModelPreference), + default=ModelPreference.HIGH_REASONING + ) + max_tokens = Column(Integer, default=4096) + temperature = Column(Float, default=0.7) + + # System prompt + system_prompt = Column(Text) + +# Mapping agent types to models +AGENT_MODEL_MAPPING = { + "Product Owner": ModelPreference.HIGH_REASONING, + "Project Manager": ModelPreference.FAST_RESPONSE, + "Business Analyst": ModelPreference.HIGH_REASONING, + "Software Architect": ModelPreference.HIGH_REASONING, + "Software Engineer": ModelPreference.HIGH_REASONING, + "UI/UX Designer": ModelPreference.HIGH_REASONING, + "QA Engineer": ModelPreference.FAST_RESPONSE, + "DevOps Engineer": ModelPreference.FAST_RESPONSE, + "AI/ML Engineer": ModelPreference.HIGH_REASONING, + "Security Expert": ModelPreference.HIGH_REASONING, +} +``` + +## Rate Limiting Strategy + +```python +from litellm import Router +import asyncio + +# Configure rate limits per model +router = Router( + model_list=model_list, + redis_host=settings.REDIS_HOST, + redis_port=settings.REDIS_PORT, + routing_strategy="usage-based-routing", # Route based on rate limits +) + +# Custom rate limiter +class RateLimiter: + def __init__(self, requests_per_minute: int = 60): + self.rpm = requests_per_minute + self.semaphore = asyncio.Semaphore(requests_per_minute) + + async def acquire(self): + await self.semaphore.acquire() + # Release after 60 seconds + asyncio.create_task(self._release_after(60)) + + async def _release_after(self, seconds: int): + await asyncio.sleep(seconds) + self.semaphore.release() +``` + +## Recommendations + +1. **Use LiteLLM as the unified abstraction layer** + - Simplifies multi-provider support + - Built-in failover and retry + - Consistent API across providers + +2. **Configure model groups by use case** + - `high-reasoning`: Complex analysis, architecture decisions + - `fast-response`: Quick tasks, simple queries + - `cost-optimized`: Non-critical, high-volume tasks + +3. **Implement automatic failover chain** + - Primary: Claude 3.5 Sonnet + - Fallback 1: GPT-4 Turbo + - Fallback 2: Local Llama 3 (if available) + +4. **Track all usage and costs** + - Per agent, per project + - Set budget alerts + - Generate usage reports + +5. 
**Cache frequently repeated queries** + - Use Redis-backed cache + - Cache embeddings for RAG + - Cache deterministic transformations + +## References + +- [LiteLLM Documentation](https://docs.litellm.ai/) +- [LiteLLM Router](https://docs.litellm.ai/docs/routing) +- [Anthropic Rate Limits](https://docs.anthropic.com/en/api/rate-limits) + +## Decision + +**Adopt LiteLLM** as the unified LLM abstraction layer with automatic failover, usage-based routing, and Redis-backed caching. + +--- + +*Spike completed. Findings will inform ADR-004: LLM Provider Integration Architecture.*