# SPIKE-005: LLM Provider Abstraction

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #5

---

## Objective

Research the best approach for unified LLM provider abstraction with support for multiple providers, automatic failover, and cost tracking.

## Research Questions

1. What libraries exist for unified LLM access?
2. How to implement automatic failover between providers?
3. How to track token usage and costs per agent/project?
4. What caching strategies can reduce API costs?

## Findings

### 1. LiteLLM - Recommended Solution

**LiteLLM** provides a unified interface to 100+ LLM providers using the OpenAI SDK format.

**Key Features:**

- Unified API across providers (Anthropic, OpenAI, local, etc.)
- Built-in failover and load balancing
- Token counting and cost tracking
- Streaming support
- Async support
- Caching with Redis

**Installation:**

```bash
pip install litellm
```

### 2. Basic Usage

```python
import os

import litellm
from litellm import completion, acompletion

# Configure providers
litellm.api_key = os.getenv("ANTHROPIC_API_KEY")
litellm.set_verbose = True  # For debugging

# Synchronous call
response = completion(
    model="claude-3-5-sonnet-20241022",
    messages=[{"role": "user", "content": "Hello!"}]
)

# Async call (for FastAPI)
response = await acompletion(
    model="claude-3-5-sonnet-20241022",
    messages=[{"role": "user", "content": "Hello!"}]
)
```

### 3. Model Naming Convention

LiteLLM routes on the model name, adding a provider prefix where the bare name is ambiguous:

| Provider | Model Format |
|----------|--------------|
| Anthropic | `claude-3-5-sonnet-20241022` |
| OpenAI | `gpt-4-turbo` |
| Azure OpenAI | `azure/deployment-name` |
| Ollama | `ollama/llama3` |
| Together AI | `together_ai/togethercomputer/llama-2-70b` |

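Because provider routing is driven by the model string, switching providers is a one-parameter change. A minimal sketch, assuming the relevant API key or a local Ollama server is configured:

```python
# Minimal sketch: the same completion call pointed at different providers
# purely by changing the model string.
from litellm import completion

messages = [{"role": "user", "content": "Summarize this ticket in one line."}]

# Hosted provider (Anthropic)
response = completion(model="claude-3-5-sonnet-20241022", messages=messages)

# Local model via Ollama -- only the model string and api_base change
response = completion(
    model="ollama/llama3",
    api_base="http://localhost:11434",
    messages=messages,
)
```
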
### 4. Failover Configuration

```python
import os

from litellm import Router

# Define model list with fallbacks
model_list = [
    {
        "model_name": "primary-agent",
        "litellm_params": {
            "model": "claude-3-5-sonnet-20241022",
            "api_key": os.getenv("ANTHROPIC_API_KEY"),
        },
        "model_info": {"id": 1}
    },
    {
        "model_name": "primary-agent",  # Same name = fallback
        "litellm_params": {
            "model": "gpt-4-turbo",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
        "model_info": {"id": 2}
    },
    {
        "model_name": "primary-agent",
        "litellm_params": {
            "model": "ollama/llama3",
            "api_base": "http://localhost:11434",
        },
        "model_info": {"id": 3}
    }
]

# Initialize router with failover
router = Router(
    model_list=model_list,
    fallbacks=[
        {"primary-agent": ["primary-agent"]}  # Try all models with same name
    ],
    routing_strategy="simple-shuffle",  # or "latency-based-routing"
    num_retries=3,
    retry_after=5,  # seconds
    timeout=60,
)

# Use router
response = await router.acompletion(
    model="primary-agent",
    messages=[{"role": "user", "content": "Hello!"}]
)
```

### 5. Syndarix LLM Gateway Architecture

```python
# app/services/llm_gateway.py
from litellm import Router, acompletion

from app.core.config import settings
from app.models.agent import AgentType
from app.services.cost_tracker import CostTracker
from app.services.events import EventBus


class LLMGateway:
    """Unified LLM gateway with failover and cost tracking."""

    def __init__(self):
        self.router = self._build_router()
        # NOTE: CostTracker requires an AsyncSession in practice (see section 6);
        # session wiring is omitted in this sketch.
        self.cost_tracker = CostTracker()
        self.event_bus = EventBus()

    def _build_router(self) -> Router:
        """Build LiteLLM router from configuration."""
        model_list = []

        # Add Anthropic models
        if settings.ANTHROPIC_API_KEY:
            model_list.extend([
                {
                    "model_name": "high-reasoning",
                    "litellm_params": {
                        "model": "claude-3-5-sonnet-20241022",
                        "api_key": settings.ANTHROPIC_API_KEY,
                    }
                },
                {
                    "model_name": "fast-response",
                    "litellm_params": {
                        "model": "claude-3-haiku-20240307",
                        "api_key": settings.ANTHROPIC_API_KEY,
                    }
                }
            ])

        # Add OpenAI fallbacks
        if settings.OPENAI_API_KEY:
            model_list.extend([
                {
                    "model_name": "high-reasoning",
                    "litellm_params": {
                        "model": "gpt-4-turbo",
                        "api_key": settings.OPENAI_API_KEY,
                    }
                },
                {
                    "model_name": "fast-response",
                    "litellm_params": {
                        "model": "gpt-4o-mini",
                        "api_key": settings.OPENAI_API_KEY,
                    }
                }
            ])

        # Add local models (Ollama)
        if settings.OLLAMA_URL:
            model_list.append({
                "model_name": "local-fallback",
                "litellm_params": {
                    "model": "ollama/llama3",
                    "api_base": settings.OLLAMA_URL,
                }
            })

        return Router(
            model_list=model_list,
            fallbacks=[
                {"high-reasoning": ["high-reasoning", "local-fallback"]},
                {"fast-response": ["fast-response", "local-fallback"]},
            ],
            routing_strategy="latency-based-routing",
            num_retries=3,
            timeout=120,
        )

    async def complete(
        self,
        agent_id: str,
        project_id: str,
        messages: list[dict],
        model_preference: str = "high-reasoning",
        stream: bool = False,
        **kwargs
    ) -> dict:
        """
        Generate a completion with automatic failover and cost tracking.

        Args:
            agent_id: The calling agent's ID
            project_id: The project context
            messages: Chat messages
            model_preference: "high-reasoning" or "fast-response"
            stream: Whether to stream the response
            **kwargs: Additional LiteLLM parameters

        Returns:
            Completion response dictionary (an async generator when stream=True)
        """
        try:
            if stream:
                return self._stream_completion(
                    agent_id, project_id, messages, model_preference, **kwargs
                )

            response = await self.router.acompletion(
                model=model_preference,
                messages=messages,
                **kwargs
            )

            # Track usage
            await self._track_usage(
                agent_id=agent_id,
                project_id=project_id,
                model=response.model,
                usage=response.usage,
            )

            return {
                "content": response.choices[0].message.content,
                "model": response.model,
                "usage": {
                    "prompt_tokens": response.usage.prompt_tokens,
                    "completion_tokens": response.usage.completion_tokens,
                    "total_tokens": response.usage.total_tokens,
                }
            }

        except Exception as e:
            # Publish error event
            await self.event_bus.publish(f"project:{project_id}", {
                "type": "llm_error",
                "agent_id": agent_id,
                "error": str(e)
            })
            raise

    async def _stream_completion(
        self,
        agent_id: str,
        project_id: str,
        messages: list[dict],
        model_preference: str,
        **kwargs
    ):
        """Stream a completion response."""
        response = await self.router.acompletion(
            model=model_preference,
            messages=messages,
            stream=True,
            **kwargs
        )

        async for chunk in response:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    async def _track_usage(
        self,
        agent_id: str,
        project_id: str,
        model: str,
        usage,
    ):
        """Track token usage and costs."""
        await self.cost_tracker.record_usage(
            agent_id=agent_id,
            project_id=project_id,
            model=model,
            prompt_tokens=usage.prompt_tokens,
            completion_tokens=usage.completion_tokens,
        )
```

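To show how the gateway would be consumed, here is a minimal sketch of a hypothetical FastAPI route; the path, request schema, and dependency wiring are illustrative assumptions, not part of the spike:

```python
# Hypothetical endpoint sketch -- route path, schema, and DI wiring are assumptions.
from fastapi import APIRouter, Depends
from pydantic import BaseModel

from app.services.llm_gateway import LLMGateway

router = APIRouter()


class CompletionRequest(BaseModel):
    agent_id: str
    project_id: str
    messages: list[dict]
    model_preference: str = "high-reasoning"


def get_gateway() -> LLMGateway:
    # In practice this would be a shared/singleton dependency.
    return LLMGateway()


@router.post("/completions")
async def create_completion(
    payload: CompletionRequest,
    gateway: LLMGateway = Depends(get_gateway),
):
    # Non-streaming call; failover and cost tracking happen inside the gateway.
    return await gateway.complete(
        agent_id=payload.agent_id,
        project_id=payload.project_id,
        messages=payload.messages,
        model_preference=payload.model_preference,
    )
```
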
### 6. Cost Tracking

```python
# app/services/cost_tracker.py
from datetime import datetime

from sqlalchemy.ext.asyncio import AsyncSession

from app.models.usage import TokenUsage

# Cost per 1M tokens (approximate)
MODEL_COSTS = {
    "claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
    "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
    "gpt-4-turbo": {"input": 10.00, "output": 30.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "ollama/llama3": {"input": 0.00, "output": 0.00},  # Local
}


class CostTracker:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def record_usage(
        self,
        agent_id: str,
        project_id: str,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
    ):
        """Record token usage and calculate cost."""
        costs = MODEL_COSTS.get(model, {"input": 0, "output": 0})

        input_cost = (prompt_tokens / 1_000_000) * costs["input"]
        output_cost = (completion_tokens / 1_000_000) * costs["output"]
        total_cost = input_cost + output_cost

        usage = TokenUsage(
            agent_id=agent_id,
            project_id=project_id,
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            cost_usd=total_cost,
            timestamp=datetime.utcnow(),
        )

        self.db.add(usage)
        await self.db.commit()

    async def get_project_usage(
        self,
        project_id: str,
        start_date: datetime | None = None,
        end_date: datetime | None = None,
    ) -> dict:
        """Get usage summary for a project."""
        # Query aggregated usage
        ...

    async def check_budget(
        self,
        project_id: str,
        budget_limit: float,
    ) -> bool:
        """Check if project is within budget."""
        usage = await self.get_project_usage(project_id)
        return usage["total_cost_usd"] < budget_limit
```

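As an alternative to maintaining the hard-coded `MODEL_COSTS` table, LiteLLM ships its own pricing helpers (`completion_cost`, `token_counter`). A minimal sketch; exact figures depend on the library's bundled cost map for the model in question:

```python
# Sketch: using LiteLLM's bundled cost helpers instead of a hand-maintained price table.
import litellm
from litellm import acompletion


async def complete_and_price(messages: list[dict]) -> tuple[str, float]:
    response = await acompletion(
        model="claude-3-5-sonnet-20241022",
        messages=messages,
    )
    # Estimated USD cost derived from the response's token usage
    cost_usd = litellm.completion_cost(completion_response=response)
    return response.choices[0].message.content, cost_usd


# Token estimate before sending a request (e.g., for budget pre-checks)
estimated_tokens = litellm.token_counter(
    model="claude-3-5-sonnet-20241022",
    messages=[{"role": "user", "content": "Hello!"}],
)
```
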
### 7. Caching with Redis

```python
import litellm
from litellm import Cache

from app.core.config import settings

# Configure Redis cache
litellm.cache = Cache(
    type="redis",
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    password=settings.REDIS_PASSWORD,
)

# Enable caching
litellm.enable_cache()

# Cached completions (same input = cached response)
response = await litellm.acompletion(
    model="claude-3-5-sonnet-20241022",
    messages=[{"role": "user", "content": "What is 2+2?"}],
    cache={"ttl": 3600}  # Cache for 1 hour
)
```

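Recommendation 5 below also calls for caching embeddings used in RAG; a minimal sketch reusing the same Redis cache (the embedding model name and TTL are illustrative choices):

```python
# Sketch: caching embeddings for RAG lookups, reusing the Redis cache configured above.
import litellm


async def embed_for_rag(chunks: list[str]) -> list[list[float]]:
    response = await litellm.aembedding(
        model="text-embedding-3-small",
        input=chunks,
        cache={"ttl": 86400},  # Identical chunks are served from Redis for 24h
    )
    return [item["embedding"] for item in response.data]
```
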
### 8. Agent Type Model Mapping

```python
# app/models/agent_type.py
from enum import Enum

from sqlalchemy import Column, Float, Integer, String, Text, Enum as SQLEnum
from sqlalchemy.dialects.postgresql import UUID

from app.db.base import Base


class ModelPreference(str, Enum):
    HIGH_REASONING = "high-reasoning"
    FAST_RESPONSE = "fast-response"
    COST_OPTIMIZED = "cost-optimized"


class AgentType(Base):
    __tablename__ = "agent_types"

    id = Column(UUID, primary_key=True)
    name = Column(String(50), unique=True)
    role = Column(String(50))

    # LLM configuration
    model_preference = Column(
        SQLEnum(ModelPreference),
        default=ModelPreference.HIGH_REASONING
    )
    max_tokens = Column(Integer, default=4096)
    temperature = Column(Float, default=0.7)

    # System prompt
    system_prompt = Column(Text)


# Mapping agent types to models
AGENT_MODEL_MAPPING = {
    "Product Owner": ModelPreference.HIGH_REASONING,
    "Project Manager": ModelPreference.FAST_RESPONSE,
    "Business Analyst": ModelPreference.HIGH_REASONING,
    "Software Architect": ModelPreference.HIGH_REASONING,
    "Software Engineer": ModelPreference.HIGH_REASONING,
    "UI/UX Designer": ModelPreference.HIGH_REASONING,
    "QA Engineer": ModelPreference.FAST_RESPONSE,
    "DevOps Engineer": ModelPreference.FAST_RESPONSE,
    "AI/ML Engineer": ModelPreference.HIGH_REASONING,
    "Security Expert": ModelPreference.HIGH_REASONING,
}
```

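To tie the mapping to the gateway from section 5, a sketch of how a call site might resolve an agent's model preference; the `agent` object, its attributes, and the `run_agent_turn` helper are illustrative assumptions:

```python
# Sketch: resolving an agent's model preference before calling the gateway.
from app.models.agent_type import AGENT_MODEL_MAPPING, ModelPreference
from app.services.llm_gateway import LLMGateway

gateway = LLMGateway()


async def run_agent_turn(agent, project_id: str, messages: list[dict]) -> dict:
    preference = AGENT_MODEL_MAPPING.get(agent.role, ModelPreference.HIGH_REASONING)
    return await gateway.complete(
        agent_id=str(agent.id),
        project_id=project_id,
        messages=messages,
        model_preference=preference.value,  # "high-reasoning" / "fast-response"
        max_tokens=agent.agent_type.max_tokens,
        temperature=agent.agent_type.temperature,
    )
```
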
## Rate Limiting Strategy

```python
import asyncio

from litellm import Router

from app.core.config import settings

# Configure rate limits per model (model_list as defined in the failover section)
router = Router(
    model_list=model_list,
    redis_host=settings.REDIS_HOST,
    redis_port=settings.REDIS_PORT,
    routing_strategy="usage-based-routing",  # Route based on rate limits
)


# Custom rate limiter
class RateLimiter:
    def __init__(self, requests_per_minute: int = 60):
        self.rpm = requests_per_minute
        self.semaphore = asyncio.Semaphore(requests_per_minute)
        self._pending: set[asyncio.Task] = set()

    async def acquire(self):
        await self.semaphore.acquire()
        # Release the slot after 60 seconds (rolling one-minute window).
        # Keep a reference so the release task is not garbage-collected early.
        task = asyncio.create_task(self._release_after(60))
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    async def _release_after(self, seconds: int):
        await asyncio.sleep(seconds)
        self.semaphore.release()
```

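A sketch of how the limiter might wrap gateway calls; the per-agent limiter registry and the per-minute budget are illustrative assumptions:

```python
# Sketch: throttling an agent's LLM calls with the RateLimiter above.
agent_limiters: dict[str, RateLimiter] = {}


async def throttled_complete(
    gateway, agent_id: str, project_id: str, messages: list[dict]
) -> dict:
    limiter = agent_limiters.setdefault(agent_id, RateLimiter(requests_per_minute=30))
    await limiter.acquire()  # Blocks once the agent has used its per-minute budget
    return await gateway.complete(
        agent_id=agent_id,
        project_id=project_id,
        messages=messages,
    )
```
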
## Recommendations

1. **Use LiteLLM as the unified abstraction layer**
   - Simplifies multi-provider support
   - Built-in failover and retry
   - Consistent API across providers

2. **Configure model groups by use case**
   - `high-reasoning`: Complex analysis, architecture decisions
   - `fast-response`: Quick tasks, simple queries
   - `cost-optimized`: Non-critical, high-volume tasks

3. **Implement automatic failover chain**
   - Primary: Claude 3.5 Sonnet
   - Fallback 1: GPT-4 Turbo
   - Fallback 2: Local Llama 3 (if available)

4. **Track all usage and costs**
   - Per agent, per project
   - Set budget alerts (see the sketch after this list)
   - Generate usage reports

5. **Cache frequently repeated queries**
   - Use Redis-backed cache
   - Cache embeddings for RAG
   - Cache deterministic transformations

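For the budget alerts in recommendation 4, a minimal sketch built on the `CostTracker` and `EventBus` above; the event type and payload shape are illustrative assumptions:

```python
# Sketch: emit a budget alert event when a project crosses its spend limit.
from app.services.cost_tracker import CostTracker
from app.services.events import EventBus


async def enforce_budget(
    tracker: CostTracker, events: EventBus, project_id: str, budget_limit: float
) -> bool:
    within_budget = await tracker.check_budget(project_id, budget_limit)
    if not within_budget:
        await events.publish(f"project:{project_id}", {
            "type": "budget_exceeded",
            "project_id": project_id,
            "budget_limit_usd": budget_limit,
        })
    return within_budget
```
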
## References

- [LiteLLM Documentation](https://docs.litellm.ai/)
- [LiteLLM Router](https://docs.litellm.ai/docs/routing)
- [Anthropic Rate Limits](https://docs.anthropic.com/en/api/rate-limits)

## Decision

**Adopt LiteLLM** as the unified LLM abstraction layer with automatic failover, usage-based routing, and Redis-backed caching.

---

*Spike completed. Findings will inform ADR-004: LLM Provider Integration Architecture.*