Add comprehensive spike research documents: - SPIKE-002: Agent Orchestration Pattern (LangGraph + Temporal hybrid) - SPIKE-006: Knowledge Base pgvector (RAG with hybrid search) - SPIKE-007: Agent Communication Protocol (JSON-RPC + Redis Streams) - SPIKE-008: Workflow State Machine (transitions lib + event sourcing) - SPIKE-009: Issue Synchronization (bi-directional sync with conflict resolution) - SPIKE-010: Cost Tracking (LiteLLM callbacks + budget enforcement) - SPIKE-011: Audit Logging (structured event sourcing) - SPIKE-012: Client Approval Flow (checkpoint-based approvals) Add architecture documentation: - ARCHITECTURE_DEEP_ANALYSIS.md: Memory management, security, testing strategy - IMPLEMENTATION_ROADMAP.md: 6-phase, 24-week implementation plan Closes #2, #6, #7, #8, #9, #10, #11, #12 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
SPIKE-010: Cost Tracking for Syndarix
Status: Completed Date: 2025-12-29 Author: Architecture Team Related Issue: #10
Executive Summary
Syndarix requires comprehensive LLM cost tracking to manage expenses across multiple providers (Anthropic, OpenAI, local Ollama). This spike researches token usage monitoring, budget enforcement, cost optimization strategies, and real-time alerting.
Recommendation
Adopt a multi-layered cost tracking architecture:
- LiteLLM Callbacks for real-time usage capture at the gateway level
- PostgreSQL for persistent usage records with time-series aggregation
- Redis for real-time budget enforcement and rate limiting
- Celery Beat for scheduled budget checks and alert processing
- SSE Events for real-time dashboard updates
Expected Cost Savings: an estimated 60-80% reduction from combining optimization strategies (semantic caching, model cascading, prompt compression). Actual savings depend on workload and cache hit rates.
Table of Contents
- Research Questions & Findings
- Cost Tracking Architecture
- Database Schema Design
- LiteLLM Callback Implementation
- Budget Management System
- Alert System Integration
- Cost Optimization Strategies
- Cost Estimation Before Execution
- Reporting Dashboard Requirements
- Implementation Roadmap
Research Questions & Findings
1. How does LiteLLM track token usage and costs?
LiteLLM provides built-in cost tracking through multiple mechanisms:
Response Usage Object:
```python
response = await litellm.acompletion(
    model="claude-3-5-sonnet-20241022",
    messages=[...],
)

# Access usage data
print(response.usage.prompt_tokens)      # Input tokens
print(response.usage.completion_tokens)  # Output tokens
print(response.usage.total_tokens)       # Total tokens
```
Automatic Cost Calculation:
LiteLLM maintains a centralized pricing database (model_prices_and_context_window.json) with costs for 100+ models. Cost is accessible via kwargs["response_cost"] in callbacks.
Custom Callbacks:
```python
from litellm.integrations.custom_logger import CustomLogger

class SyndarixCostLogger(CustomLogger):
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        cost = kwargs.get("response_cost", 0)
        model = kwargs.get("model")
        usage = response_obj.usage
        # Store in database
```
Custom Pricing Override:
```python
response = await litellm.acompletion(
    model="custom-model",
    messages=[...],
    input_cost_per_token=0.000003,   # $3 per 1M tokens
    output_cost_per_token=0.000015,  # $15 per 1M tokens
)
```
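To sanity-check per-token rates like the ones above, the cost of one request is prompt tokens times the input rate plus completion tokens times the output rate. A minimal sketch; `request_cost_usd` is an illustrative name, not part of LiteLLM:

```python
def request_cost_usd(prompt_tokens: int, completion_tokens: int,
                     input_cost_per_token: float, output_cost_per_token: float) -> float:
    """Cost of one request under per-token pricing."""
    return prompt_tokens * input_cost_per_token + completion_tokens * output_cost_per_token


# 1,200 prompt tokens and 300 completion tokens at $3 / $15 per 1M tokens
cost = request_cost_usd(1_200, 300, 0.000003, 0.000015)
print(f"${cost:.4f}")  # $0.0081
```

Output tokens are five times the price of input tokens at these rates, so the 300 completion tokens cost more than the 1,200 prompt tokens.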
2. Best practices for LLM cost attribution in multi-tenant systems?
Key Patterns Identified:
- Hierarchical Attribution: Track costs at multiple levels (organization > project > agent type > agent instance > request)
- Metadata Tagging: Include cost center, budget ID, and user context in every request
- Tenant-Aware Budgets: Implement per-customer tier budgets with different limits
- Chargeback/Showback: Enable detailed cost allocation for billing integration
Multi-Tenant Architecture for Syndarix:
```
Organization (Billing Entity)
└── Project (Cost Center)
    └── Sprint (Time-bounded Budget)
        └── Agent Instance (Worker)
            └── LLM Request (Atomic Cost Unit)
```
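Hierarchical attribution means each request's cost counts toward every ancestor in this tree. A minimal in-memory sketch, assuming each request carries its full attribution path; the `RequestCost` type and `rollup` function are hypothetical:

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class RequestCost:
    organization: str
    project: str
    sprint: str
    agent_instance: str
    cost_usd: float


LEVELS = ("organization", "project", "sprint", "agent_instance")


def rollup(requests: list[RequestCost]) -> dict[tuple[str, ...], float]:
    """Attribute each request's cost to every level of its hierarchy path.

    Keys are path prefixes, e.g. ("acme",) or ("acme", "web", "s1").
    """
    totals: dict[tuple[str, ...], float] = defaultdict(float)
    for req in requests:
        path = tuple(getattr(req, level) for level in LEVELS)
        for depth in range(1, len(path) + 1):
            totals[path[:depth]] += req.cost_usd
    return dict(totals)


totals = rollup([
    RequestCost("acme", "web", "s1", "agent-a", 0.50),
    RequestCost("acme", "web", "s1", "agent-b", 0.25),
    RequestCost("acme", "api", "s1", "agent-c", 1.00),
])
print(totals[("acme",)], totals[("acme", "web")])  # 1.75 0.75
```

In the real system the same effect comes from storing the attribution columns on each `token_usage` row and grouping by them.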
3. Real-time cost monitoring approaches?
Recommended Approach: Hybrid Redis + PostgreSQL
| Layer | Technology | Purpose |
|---|---|---|
| Hot data | Redis | Real-time budget counters, rate limiting |
| Warm data | PostgreSQL | Hourly/daily aggregates, analytics |
| Cold data | S3/Archive | Monthly exports, long-term retention |
Real-time Pipeline:
```
LLM Request → LiteLLM Callback → Redis INCR → Budget Check
                     ↓
               Async Queue → PostgreSQL Insert → SSE Event
```
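The point of the split is that the request path only touches the fast counter, while persistence and events happen off the request path. A toy sketch of that shape using `asyncio.Queue`; the in-memory dict stands in for Redis `INCRBYFLOAT` and the lists stand in for PostgreSQL and SSE, so none of this is the production code:

```python
import asyncio
import contextlib


class CostPipeline:
    """Hot counter on the request path; persistence and events off it."""

    def __init__(self) -> None:
        self.spend: dict[str, float] = {}  # stands in for Redis counters
        self.queue: asyncio.Queue = asyncio.Queue()
        self.rows: list[dict] = []         # stands in for PostgreSQL inserts
        self.events: list[dict] = []       # stands in for SSE events

    def on_request_complete(self, project_id: str, cost_usd: float, limit_usd: float) -> bool:
        """Update the counter, enqueue the record, report whether budget remains."""
        total = self.spend.get(project_id, 0.0) + cost_usd
        self.spend[project_id] = total
        self.queue.put_nowait({"project_id": project_id, "cost_usd": cost_usd})
        return total < limit_usd

    async def writer(self) -> None:
        """Background consumer: persist each record, then publish an event."""
        while True:
            record = await self.queue.get()
            self.rows.append(record)
            self.events.append({"type": "cost_update", "data": record})
            self.queue.task_done()


async def main() -> tuple[CostPipeline, bool, bool]:
    pipeline = CostPipeline()
    worker = asyncio.create_task(pipeline.writer())
    first_ok = pipeline.on_request_complete("p1", 0.40, limit_usd=1.00)
    second_ok = pipeline.on_request_complete("p1", 0.70, limit_usd=1.00)
    await pipeline.queue.join()  # wait until the writer drains the queue
    worker.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await worker
    return pipeline, first_ok, second_ok


pipeline, first_ok, second_ok = asyncio.run(main())
print(first_ok, second_ok, len(pipeline.rows))  # True False 2
```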
4. Budget enforcement strategies (soft vs hard limits)?
Soft Limits (Recommended for most cases):
- Send alerts at threshold percentages (50%, 80%, 100%)
- Allow continued usage with warnings
- Automatic model downgrade (e.g., Sonnet → Haiku)
- Require approval for continued expensive operations
Hard Limits (For critical cost control):
- Block requests once budget exhausted
- Reject LLM calls at gateway level
- Require manual budget increase
Syndarix Recommendation:
- Daily budgets: Hard limits (prevent runaway costs)
- Weekly/Monthly budgets: Soft limits with alerts and escalation
- Per-request limits: Enforce max tokens per call
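One way to encode this recommendation is a single decision function. A minimal sketch under stated assumptions: the soft-limit response at 100% is "downgrade" (one of the options listed above), the warning level is 80%, and per-request limits clamp `max_tokens` rather than rejecting the call. `decide` and `Action` are hypothetical names:

```python
from enum import Enum


class Action(str, Enum):
    ALLOW = "allow"
    WARN = "warn"
    DOWNGRADE = "downgrade"
    BLOCK = "block"


# Hard limits for daily budgets only; weekly/monthly budgets are soft
HARD_LIMIT_PERIODS = {"daily"}


def decide(period: str, spent_usd: float, limit_usd: float,
           requested_max_tokens: int, per_request_max_tokens: int,
           warn_at: float = 0.8) -> tuple[Action, int]:
    """Return the enforcement action and the max_tokens value to send."""
    max_tokens = min(requested_max_tokens, per_request_max_tokens)
    used = spent_usd / limit_usd if limit_usd > 0 else float("inf")
    if used >= 1.0:
        # Hard limit blocks; soft limit keeps working on a cheaper model
        action = Action.BLOCK if period in HARD_LIMIT_PERIODS else Action.DOWNGRADE
    elif used >= warn_at:
        action = Action.WARN
    else:
        action = Action.ALLOW
    return action, max_tokens


print(decide("daily", 10.0, 10.0, 500, 4000))     # (<Action.BLOCK: 'block'>, 500)
print(decide("monthly", 120.0, 100.0, 500, 4000))  # (<Action.DOWNGRADE: 'downgrade'>, 500)
```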
5. Cost optimization techniques?
Tier 1: Immediate Impact (minimal code changes)
- Semantic caching: 15-30% cost reduction
- Response caching for deterministic queries
Tier 2: Model Selection (Architecture changes)
- Model cascading: Start with cheaper models, escalate as needed
- Reported savings of up to 87% when roughly 90% of queries can be handled by smaller models
- Dynamic routing based on query complexity
Tier 3: Prompt Engineering (Ongoing optimization)
- Prompt compression with LLMLingua: up to 20x compression with <5% quality loss, as reported by its authors
- Extractive compression for RAG: 2-10x compression; it can improve accuracy by removing irrelevant context
Tier 4: Infrastructure (Long-term)
- Self-hosted models for high-volume, latency-tolerant tasks
- Fine-tuned smaller models for domain-specific tasks
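Model cascading from Tier 2 reduces to "try the cheapest model first, escalate only if the answer fails a quality check". A minimal sketch; the model functions and the `accept` predicate are placeholders, and in practice the quality check (heuristics, a verifier model, or schema validation) is the hard part:

```python
from typing import Callable, Optional

ModelFn = Callable[[str], str]


def cascade(prompt: str, models: list[tuple[str, ModelFn]],
            is_good_enough: Callable[[str], bool]) -> tuple[Optional[str], Optional[str]]:
    """Try models from cheapest to most capable; stop at the first acceptable answer."""
    name: Optional[str] = None
    answer: Optional[str] = None
    for name, model in models:
        answer = model(prompt)
        if is_good_enough(answer):
            break  # the cheaper model was sufficient
    # If nothing passed the check, this is the most capable model's answer
    return name, answer


# Placeholder "models" and quality check for illustration only
def cheap(prompt: str) -> str:
    return "4" if prompt == "2+2" else "unsure"


def strong(prompt: str) -> str:
    return f"detailed answer to: {prompt}"


def accept(answer: str) -> bool:
    return answer != "unsure"


tiers = [("small-model", cheap), ("large-model", strong)]
print(cascade("2+2", tiers, accept))  # ('small-model', '4')
```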
6. Reporting and visualization requirements?
Dashboard Requirements:
- Real-time spend ticker (updated via SSE)
- Cost breakdown by: project, agent type, model, time period
- Budget utilization gauges with threshold indicators
- Historical trend charts (daily/weekly/monthly)
- Anomaly detection alerts (spending spikes)
- Forecast projections based on current burn rate
- Cost per task/feature analysis
- Comparative efficiency metrics (cost per successful completion)
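The forecast projection can start as a simple linear extrapolation of the burn rate so far. A minimal sketch; `forecast_period_spend` is a hypothetical helper, and it ignores weekday/weekend seasonality:

```python
from datetime import datetime, timezone


def forecast_period_spend(spent_usd: float, period_start: datetime,
                          period_end: datetime, now: datetime) -> float:
    """Project end-of-period spend from the average burn rate so far."""
    elapsed = (now - period_start).total_seconds()
    total = (period_end - period_start).total_seconds()
    if elapsed <= 0:
        return spent_usd  # no history yet
    return spent_usd * total / elapsed


start = datetime(2025, 1, 1, tzinfo=timezone.utc)
end = datetime(2025, 1, 31, tzinfo=timezone.utc)  # 30-day period
now = datetime(2025, 1, 11, tzinfo=timezone.utc)  # 10 days in
projected = forecast_period_spend(40.0, start, end, now)
print(projected)  # 120.0
```

Against a $100 monthly budget, this projection would flag a $20 overrun two-thirds of the way before it happens.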
7. Cost estimation before execution?
Pre-execution Estimation Pattern:
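```python
from dataclasses import dataclass

import litellm

from app.services.cost_tracking.callbacks import MODEL_COSTS  # USD per 1M tokens

@dataclass
class CostEstimate:
    input_tokens: int
    estimated_output_tokens: int
    estimated_cost_usd: float
    confidence: str

def estimate_cost(messages: list, model: str, estimated_output: int = 500) -> CostEstimate:
    """Estimate cost before making an LLM call."""
    input_tokens = litellm.token_counter(model=model, messages=messages)
    # Output length is unknown until the call completes; the caller supplies an estimate
    # (e.g., a historical average per task type). 500 is only a placeholder default.
    costs = MODEL_COSTS[model]
    input_cost = (input_tokens / 1_000_000) * costs["input"]
    output_cost = (estimated_output / 1_000_000) * costs["output"]
    return CostEstimate(
        input_tokens=input_tokens,
        estimated_output_tokens=estimated_output,
        estimated_cost_usd=input_cost + output_cost,
        confidence="medium",  # Limited by output-length estimation accuracy
    )
```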
Token Counting:
- Use `tiktoken` for OpenAI models
- Use the Anthropic SDK's `client.messages.count_tokens()` for Claude models
- Use LiteLLM's `token_counter()` for unified counting
Cost Tracking Architecture
System Overview
```
Syndarix Backend
├── LLM Gateway (LiteLLM)
│   ├── Pre-Request Callback ──▶ Router (Failover) ──▶ Post-Request Callback
│   ├── Pre-Request Callback ──▶ Budget Check ──▶ Redis
│   └── Post-Request Callback ──▶ Cost Tracker Service ──▶ PostgreSQL
├── Redis
│   ├── Budget Counters
│   ├── Rate Limits
│   └── Cache Keys
├── PostgreSQL
│   ├── token_usage (time-series)
│   ├── budgets
│   ├── budget_alerts
│   └── daily_cost_summaries
├── Event Bus (SSE)
│   ├── cost_update
│   ├── budget_alert
│   └── threshold_warn
└── Celery Beat
    ├── budget_check
    ├── daily_rollup
    └── monthly_report
```
Component Responsibilities
| Component | Responsibility |
|---|---|
| LLM Gateway | Route requests, capture usage, enforce limits |
| Cost Tracker | Calculate costs, persist records, update counters |
| Redis | Real-time budget counters, rate limiting, caching |
| PostgreSQL | Persistent usage records, aggregations, analytics |
| Event Bus | Real-time notifications to dashboards |
| Celery Beat | Scheduled tasks (rollups, alerts, reports) |
Database Schema Design
Core Tables
```python
# app/models/cost_tracking.py
import enum

from sqlalchemy import (
    Boolean,
    Column,
    Date,
    DateTime,
    Enum,
    Float,
    ForeignKey,
    Index,
    Integer,
    String,
)
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.ext.mutable import MutableList
from sqlalchemy.orm import relationship

from app.models.base import Base, TimestampMixin, UUIDMixin


class BudgetPeriod(str, enum.Enum):
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"


class AlertSeverity(str, enum.Enum):
    INFO = "info"          # 50% threshold
    WARNING = "warning"    # 80% threshold
    CRITICAL = "critical"  # 100% threshold


class AlertStatus(str, enum.Enum):
    PENDING = "pending"
    ACKNOWLEDGED = "acknowledged"
    RESOLVED = "resolved"


class TokenUsage(Base, UUIDMixin, TimestampMixin):
    """Individual LLM request usage record."""

    __tablename__ = "token_usage"

    # Attribution
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
    agent_instance_id = Column(UUID(as_uuid=True), ForeignKey("agent_instances.id"), nullable=True)
    agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"), nullable=True)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)

    # Request details
    request_id = Column(String(64), unique=True, nullable=False)  # Correlation ID
    model = Column(String(100), nullable=False)
    provider = Column(String(50), nullable=False)  # anthropic, openai, ollama

    # Token counts
    prompt_tokens = Column(Integer, nullable=False)
    completion_tokens = Column(Integer, nullable=False)
    total_tokens = Column(Integer, nullable=False)
    cached_tokens = Column(Integer, default=0)  # Tokens served from cache

    # Cost calculation
    input_cost_usd = Column(Float, nullable=False)
    output_cost_usd = Column(Float, nullable=False)
    total_cost_usd = Column(Float, nullable=False)

    # Timing
    latency_ms = Column(Integer, nullable=True)
    request_timestamp = Column(DateTime(timezone=True), nullable=False)

    # Metadata
    task_type = Column(String(50), nullable=True)  # reasoning, coding, chat, etc.
    success = Column(Boolean, default=True)
    error_type = Column(String(100), nullable=True)
    # "metadata" is reserved on declarative models, so the column gets another attribute name
    extra_metadata = Column("metadata", JSONB, default=dict)  # Additional context

    # Indexes for common queries
    __table_args__ = (
        Index("ix_token_usage_project_timestamp", "project_id", "request_timestamp"),
        Index("ix_token_usage_agent_instance_timestamp", "agent_instance_id", "request_timestamp"),
        Index("ix_token_usage_model_timestamp", "model", "request_timestamp"),
        Index("ix_token_usage_request_timestamp", "request_timestamp"),
    )


class Budget(Base, UUIDMixin, TimestampMixin):
    """Budget definition for cost control."""

    __tablename__ = "budgets"

    # Scope (exactly one of these is set)
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=True)
    agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"), nullable=True)
    organization_id = Column(UUID(as_uuid=True), ForeignKey("organizations.id"), nullable=True)

    # Budget configuration
    name = Column(String(100), nullable=False)
    period = Column(Enum(BudgetPeriod), nullable=False)
    limit_usd = Column(Float, nullable=False)

    # Thresholds (fractions of limit_usd)
    warn_threshold = Column(Float, default=0.5)      # 50%
    alert_threshold = Column(Float, default=0.8)     # 80%
    critical_threshold = Column(Float, default=1.0)  # 100%

    # Enforcement
    hard_limit = Column(Boolean, default=False)     # Block requests at limit
    auto_downgrade = Column(Boolean, default=True)  # Downgrade to cheaper model

    # Status
    is_active = Column(Boolean, default=True)
    current_spend = Column(Float, default=0.0)  # Cached from Redis
    period_start = Column(DateTime(timezone=True), nullable=False)
    period_end = Column(DateTime(timezone=True), nullable=False)

    __table_args__ = (
        Index("ix_budgets_project_active", "project_id", "is_active"),
        Index("ix_budgets_period_end", "period_end"),
    )


class BudgetAlert(Base, UUIDMixin, TimestampMixin):
    """Alert record for budget threshold breaches."""

    __tablename__ = "budget_alerts"

    budget_id = Column(UUID(as_uuid=True), ForeignKey("budgets.id"), nullable=False)
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=True)
    severity = Column(Enum(AlertSeverity), nullable=False)
    status = Column(Enum(AlertStatus), default=AlertStatus.PENDING)
    threshold_percent = Column(Float, nullable=False)
    current_spend = Column(Float, nullable=False)
    budget_limit = Column(Float, nullable=False)
    message = Column(String(500), nullable=False)
    acknowledged_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)
    acknowledged_at = Column(DateTime(timezone=True), nullable=True)

    # Notification tracking; MutableList lets SQLAlchemy detect in-place appends
    notifications_sent = Column(MutableList.as_mutable(JSONB), default=list)  # [{"channel": "email", "sent_at": "..."}]

    budget = relationship("Budget", backref="alerts")


class DailyCostSummary(Base, UUIDMixin):
    """Materialized daily cost aggregations for fast reporting."""

    __tablename__ = "daily_cost_summaries"

    date = Column(Date, nullable=False)
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=True)
    agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"), nullable=True)
    model = Column(String(100), nullable=True)

    # Aggregated metrics
    request_count = Column(Integer, default=0)
    total_prompt_tokens = Column(Integer, default=0)
    total_completion_tokens = Column(Integer, default=0)
    total_tokens = Column(Integer, default=0)
    total_cost_usd = Column(Float, default=0.0)

    # Performance metrics
    avg_latency_ms = Column(Float, nullable=True)
    cache_hit_rate = Column(Float, default=0.0)
    success_rate = Column(Float, default=1.0)

    __table_args__ = (
        Index("ix_daily_summary_date_project", "date", "project_id"),
        Index("ix_daily_summary_date_model", "date", "model"),
    )


class ModelPricing(Base, UUIDMixin, TimestampMixin):
    """Custom model pricing overrides."""

    __tablename__ = "model_pricing"

    model = Column(String(100), unique=True, nullable=False)
    provider = Column(String(50), nullable=False)
    input_cost_per_million = Column(Float, nullable=False)
    output_cost_per_million = Column(Float, nullable=False)

    # Context limits
    max_input_tokens = Column(Integer, nullable=True)
    max_output_tokens = Column(Integer, nullable=True)

    # Validity period
    effective_from = Column(DateTime(timezone=True), nullable=False)
    effective_until = Column(DateTime(timezone=True), nullable=True)
    is_active = Column(Boolean, default=True)
```
Redis Key Structure
```python
# Redis key patterns for real-time tracking
REDIS_KEYS = {
    # Budget counters (reset on period start)
    "budget_spend": "budget:{budget_id}:spend",        # INCRBYFLOAT
    "budget_requests": "budget:{budget_id}:requests",  # INCR
    # Project-level aggregations
    "project_daily_spend": "project:{project_id}:daily:{date}:spend",
    "project_monthly_spend": "project:{project_id}:monthly:{year_month}:spend",
    # Rate limiting
    "rate_limit": "ratelimit:{project_id}:{model}:{window}",
    # Semantic cache (matches the keys written by SemanticCache)
    "semantic_cache": "cache:semantic:{project_id}:{model}:{hash}",
    # Real-time metrics (for dashboard)
    "realtime_spend": "metrics:realtime:spend:{project_id}",
}
```
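Because the patterns are plain `str.format` templates, a small helper keeps key construction in one place. A minimal sketch; the ISO date and `YYYY-MM` formats are assumptions, since the patterns above do not specify them:

```python
from datetime import date

# Subset of the patterns above
PATTERNS = {
    "budget_spend": "budget:{budget_id}:spend",
    "project_daily_spend": "project:{project_id}:daily:{date}:spend",
    "project_monthly_spend": "project:{project_id}:monthly:{year_month}:spend",
}


def redis_key(name: str, **parts: str) -> str:
    """Fill a key pattern; raises KeyError if a placeholder is missing."""
    return PATTERNS[name].format(**parts)


def project_spend_keys(project_id: str, day: date) -> tuple[str, str]:
    """Daily and monthly spend keys for the period containing `day`."""
    return (
        redis_key("project_daily_spend", project_id=project_id, date=day.isoformat()),
        redis_key("project_monthly_spend", project_id=project_id, year_month=day.strftime("%Y-%m")),
    )


print(project_spend_keys("p1", date(2025, 12, 29)))
# ('project:p1:daily:2025-12-29:spend', 'project:p1:monthly:2025-12:spend')
```

Embedding the date in the key means the counter rolls over on its own each day or month; giving such keys an expiry keeps Redis from accumulating old periods.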
LiteLLM Callback Implementation
Custom Callback Handler
```python
# app/services/cost_tracking/callbacks.py
import asyncio
import uuid
from datetime import UTC, datetime
from typing import Any

from litellm.integrations.custom_logger import CustomLogger

from app.core.config import settings
from app.services.cost_tracking.budget import BudgetEnforcer
from app.services.cost_tracking.tracker import CostTracker
from app.services.events import EventBus


class BudgetExceededError(Exception):
    """Raised when a hard budget limit blocks an LLM request."""


class SyndarixCostCallback(CustomLogger):
    """
    LiteLLM callback handler for cost tracking and budget enforcement.

    Integrates with:
    - Redis for real-time budget counters
    - PostgreSQL for persistent usage records
    - Event bus for real-time dashboard updates
    """

    def __init__(self):
        self.tracker = CostTracker()
        self.budget_enforcer = BudgetEnforcer()
        self.event_bus = EventBus()

    @staticmethod
    def _get_metadata(kwargs: dict) -> dict:
        """Return request metadata (LiteLLM nests it under kwargs["litellm_params"])."""
        litellm_params = kwargs.get("litellm_params") or {}
        return litellm_params.get("metadata") or kwargs.get("metadata") or {}

    def log_pre_api_call(self, model: str, messages: list, kwargs: dict):
        """Pre-request budget validation.

        Caveats: LiteLLM may log exceptions raised inside logging callbacks
        instead of propagating them, and reassigning kwargs["model"] here is not
        guaranteed to change the model that is called. For reliable hard limits
        and downgrades, also check the budget in the gateway before calling
        litellm.acompletion().
        """
        project_id = self._get_metadata(kwargs).get("project_id")
        if not project_id:
            return  # Skip tracking for internal calls

        try:
            budget_status = self.budget_enforcer.check_budget_sync(project_id)
        except RuntimeError:
            return  # Inside a running event loop: enforce in the gateway instead

        if budget_status.exceeded and budget_status.hard_limit:
            raise BudgetExceededError(
                f"Budget exceeded for project {project_id}. "
                f"Limit: ${budget_status.limit:.2f}, "
                f"Spent: ${budget_status.spent:.2f}"
            )

        if budget_status.exceeded and budget_status.auto_downgrade:
            # Suggest a cheaper model (see the caveat in the docstring)
            kwargs["model"] = self._get_fallback_model(model)

    async def async_log_success_event(
        self,
        kwargs: dict,
        response_obj: Any,
        start_time: datetime,
        end_time: datetime,
    ):
        """Record successful request usage and costs."""
        metadata = self._get_metadata(kwargs)
        project_id = metadata.get("project_id")
        if not project_id:
            return

        # Extract usage data
        usage = response_obj.usage
        model = kwargs.get("model")
        cost = kwargs.get("response_cost") or 0.0

        # Fall back to the local price table if LiteLLM did not price the call
        if cost == 0:
            cost = self._calculate_cost(model, usage)

        # Create usage record
        usage_record = {
            "request_id": kwargs.get("litellm_call_id") or str(uuid.uuid4()),
            "project_id": project_id,
            "agent_instance_id": metadata.get("agent_id"),
            "agent_type_id": metadata.get("agent_type_id"),
            "model": model,
            "provider": self._get_provider(model),
            "prompt_tokens": usage.prompt_tokens,
            "completion_tokens": usage.completion_tokens,
            "total_tokens": usage.total_tokens,
            "input_cost_usd": self._calculate_input_cost(model, usage.prompt_tokens),
            "output_cost_usd": self._calculate_output_cost(model, usage.completion_tokens),
            "total_cost_usd": cost,
            # start_time and end_time are datetimes, so convert the timedelta
            "latency_ms": int((end_time - start_time).total_seconds() * 1000),
            "request_timestamp": datetime.now(UTC),
            "task_type": metadata.get("task_type"),
            "success": True,
        }

        # Run the independent async operations concurrently
        await asyncio.gather(
            self.tracker.record_usage(usage_record),
            self.budget_enforcer.increment_spend(project_id, cost),
            self._publish_cost_event(project_id, usage_record),
        )

    async def async_log_failure_event(
        self,
        kwargs: dict,
        response_obj: Any,
        start_time: datetime,
        end_time: datetime,
    ):
        """Record a failed request.

        Providers usually do not bill failed requests, but some failures (for
        example, a timeout after the prompt was processed) may still incur
        input-token charges, so a record is kept when input tokens are known.
        """
        metadata = self._get_metadata(kwargs)
        project_id = metadata.get("project_id")
        if not project_id:
            return

        # "input_tokens" is assumed to be populated upstream; LiteLLM does not guarantee this key
        input_tokens = kwargs.get("input_tokens", 0)
        model = kwargs.get("model")
        exception = kwargs.get("exception")

        if input_tokens > 0:
            input_cost = self._calculate_input_cost(model, input_tokens)
            usage_record = {
                "request_id": kwargs.get("litellm_call_id") or str(uuid.uuid4()),
                "project_id": project_id,
                "agent_instance_id": metadata.get("agent_id"),
                "agent_type_id": metadata.get("agent_type_id"),
                "model": model,
                "provider": self._get_provider(model),
                "prompt_tokens": input_tokens,
                "completion_tokens": 0,
                "total_tokens": input_tokens,
                "input_cost_usd": input_cost,
                "output_cost_usd": 0.0,
                "total_cost_usd": input_cost,
                "success": False,
                "error_type": type(exception).__name__ if exception else "UnknownError",
                "request_timestamp": datetime.now(UTC),
            }
            await self.tracker.record_usage(usage_record)

    async def _publish_cost_event(self, project_id: str, usage: dict):
        """Publish real-time cost event to dashboard."""
        await self.event_bus.publish(f"project:{project_id}", {
            "type": "cost_update",
            "data": {
                "model": usage["model"],
                "tokens": usage["total_tokens"],
                "cost_usd": usage["total_cost_usd"],
                "timestamp": usage["request_timestamp"].isoformat(),
            },
        })

    def _calculate_cost(self, model: str, usage) -> float:
        """Calculate total cost from usage."""
        input_cost = self._calculate_input_cost(model, usage.prompt_tokens)
        output_cost = self._calculate_output_cost(model, usage.completion_tokens)
        return input_cost + output_cost

    def _calculate_input_cost(self, model: str, tokens: int) -> float:
        """Calculate input token cost."""
        costs = MODEL_COSTS.get(model, {"input": 0, "output": 0})
        return (tokens / 1_000_000) * costs["input"]

    def _calculate_output_cost(self, model: str, tokens: int) -> float:
        """Calculate output token cost."""
        costs = MODEL_COSTS.get(model, {"input": 0, "output": 0})
        return (tokens / 1_000_000) * costs["output"]

    def _get_provider(self, model: str) -> str:
        """Extract provider from model name."""
        if not model:
            return "unknown"
        if model.startswith("claude"):
            return "anthropic"
        if model.startswith("gpt") or model.startswith("o1"):
            return "openai"
        if model.startswith("ollama/"):
            return "ollama"
        return "unknown"

    def _get_fallback_model(self, model: str) -> str:
        """Get cheaper fallback model."""
        fallbacks = {
            "claude-3-5-sonnet-20241022": "claude-3-haiku-20240307",
            "gpt-4-turbo": "gpt-4o-mini",
            "gpt-4o": "gpt-4o-mini",
        }
        return fallbacks.get(model, model)


# Model cost database (USD per 1M tokens; list prices at time of writing, verify before use)
MODEL_COSTS = {
    # Anthropic
    "claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
    "claude-3-opus-20240229": {"input": 15.00, "output": 75.00},
    "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
    # OpenAI
    "gpt-4-turbo": {"input": 10.00, "output": 30.00},
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "o1-preview": {"input": 15.00, "output": 60.00},
    "o1-mini": {"input": 3.00, "output": 12.00},
    # Local (compute-only, no API cost)
    "ollama/llama3": {"input": 0.00, "output": 0.00},
    "ollama/mixtral": {"input": 0.00, "output": 0.00},
}
```
Registering the Callback
```python
# app/services/llm_gateway.py
import litellm

from app.services.cost_tracking.callbacks import SyndarixCostCallback

# Register the CustomLogger instance globally
syndarix_callback = SyndarixCostCallback()
litellm.callbacks = [syndarix_callback]

# Alternative: register individual functions instead. Use one approach or
# the other; registering both would record every request twice.
# litellm.success_callback = [syndarix_callback.async_log_success_event]
# litellm.failure_callback = [syndarix_callback.async_log_failure_event]
```
Budget Management System
Budget Enforcer
```python
# app/services/cost_tracking/budget.py
import asyncio
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Optional

import redis.asyncio as redis
from sqlalchemy import select

from app.core.config import settings
from app.core.database import get_async_session  # assumed location of the async session factory
from app.models.cost_tracking import AlertSeverity, Budget, BudgetAlert
from app.services.events import EventBus


@dataclass
class BudgetStatus:
    budget_id: Optional[str]
    limit: float
    spent: float
    remaining: float
    percent_used: float
    exceeded: bool
    hard_limit: bool
    auto_downgrade: bool
    threshold_breached: Optional[str] = None  # warn, alert, critical


class BudgetEnforcer:
    """Real-time budget enforcement using Redis."""

    def __init__(self, redis_url: Optional[str] = None):
        self.redis = redis.from_url(redis_url or settings.REDIS_URL)

    async def _get_active_budgets(self, project_id: str) -> list[Budget]:
        """Load the active budgets scoped to a project."""
        async with get_async_session() as session:
            result = await session.execute(
                select(Budget).where(Budget.project_id == project_id, Budget.is_active.is_(True))
            )
            return list(result.scalars().all())

    async def check_budget(self, project_id: str) -> BudgetStatus:
        """Return the status of the most restrictive active budget for a project."""
        budgets = await self._get_active_budgets(project_id)
        if not budgets:
            return BudgetStatus(
                budget_id=None,
                limit=float("inf"),
                spent=0.0,
                remaining=float("inf"),
                percent_used=0.0,
                exceeded=False,
                hard_limit=False,
                auto_downgrade=False,
            )

        statuses = []
        for budget in budgets:
            spent = float(await self.redis.get(f"budget:{budget.id}:spend") or 0)
            percent_used = (spent / budget.limit_usd) * 100 if budget.limit_usd > 0 else 100.0

            # Determine the highest threshold breached
            threshold_breached = None
            if percent_used >= budget.critical_threshold * 100:
                threshold_breached = "critical"
            elif percent_used >= budget.alert_threshold * 100:
                threshold_breached = "alert"
            elif percent_used >= budget.warn_threshold * 100:
                threshold_breached = "warn"

            statuses.append(BudgetStatus(
                budget_id=str(budget.id),
                limit=budget.limit_usd,
                spent=spent,
                remaining=max(0.0, budget.limit_usd - spent),
                percent_used=percent_used,
                exceeded=spent >= budget.limit_usd,
                hard_limit=budget.hard_limit,
                auto_downgrade=budget.auto_downgrade,
                threshold_breached=threshold_breached,
            ))

        # The most restrictive budget is the one with the highest utilization
        return max(statuses, key=lambda status: status.percent_used)

    async def increment_spend(self, project_id: str, amount: float) -> BudgetStatus:
        """Increment budget spend and check thresholds."""
        budgets = await self._get_active_budgets(project_id)
        for budget in budgets:
            # Atomic increment
            new_spend = await self.redis.incrbyfloat(f"budget:{budget.id}:spend", amount)
            await self._check_thresholds(budget, new_spend)
        return await self.check_budget(project_id)

    async def _check_thresholds(self, budget: Budget, current_spend: float):
        """Create each threshold alert at most once per budget period."""
        percent = (current_spend / budget.limit_usd) * 100 if budget.limit_usd > 0 else 100.0
        thresholds = [
            (budget.warn_threshold * 100, AlertSeverity.INFO),
            (budget.alert_threshold * 100, AlertSeverity.WARNING),
            (budget.critical_threshold * 100, AlertSeverity.CRITICAL),
        ]
        # Keep markers until the period ends (reset_budget_period also clears them)
        ttl = max(1, int((budget.period_end - datetime.now(UTC)).total_seconds()))
        for threshold, severity in thresholds:
            if percent < threshold:
                continue
            alert_key = f"budget:{budget.id}:alert:{severity.value}"
            # SET NX is atomic, so concurrent requests cannot create duplicate alerts
            if await self.redis.set(alert_key, "1", nx=True, ex=ttl):
                await self._create_alert(budget, severity, percent, current_spend)

    async def _create_alert(
        self,
        budget: Budget,
        severity: AlertSeverity,
        percent: float,
        current_spend: float,
    ):
        """Create budget alert and trigger notifications."""
        message = (
            f"Budget '{budget.name}' at {percent:.1f}% "
            f"(${current_spend:.2f}/${budget.limit_usd:.2f})"
        )
        alert = BudgetAlert(
            budget_id=budget.id,
            project_id=budget.project_id,
            severity=severity,
            threshold_percent=percent,
            current_spend=current_spend,
            budget_limit=budget.limit_usd,
            message=message,
        )

        # Persist alert
        async with get_async_session() as session:
            session.add(alert)
            await session.commit()

        # Publish alert event (uses the local message, not the expired ORM attribute)
        await EventBus().publish(f"project:{budget.project_id}", {
            "type": "budget_alert",
            "data": {
                "severity": severity.value,
                "message": message,
                "percent_used": percent,
                "budget_name": budget.name,
            },
        })

    async def reset_budget_period(self, budget_id: str):
        """Reset budget counter and alert markers for a new period."""
        await self.redis.delete(f"budget:{budget_id}:spend")
        for severity in AlertSeverity:
            await self.redis.delete(f"budget:{budget_id}:alert:{severity.value}")

    def check_budget_sync(self, project_id: str) -> BudgetStatus:
        """Synchronous wrapper for callers that are not inside an event loop."""
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(self.check_budget(project_id))
        # run_until_complete() cannot be used while a loop is already running
        raise RuntimeError(
            "check_budget_sync() called from a running event loop; await check_budget() instead"
        )
```
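The alert-marker logic is easiest to reason about without Redis. A minimal in-memory sketch of the same idea: each (budget, severity) pair fires at most once per period, a single large increment can cross several thresholds at once, and a period reset clears the markers. `ThresholdTracker` is hypothetical; a set stands in for the `SET NX` markers:

```python
from dataclasses import dataclass, field

# (fraction of limit, severity) pairs matching the default Budget thresholds
DEFAULT_THRESHOLDS = ((0.5, "info"), (0.8, "warning"), (1.0, "critical"))


@dataclass
class ThresholdTracker:
    """In-memory stand-in for the budget:{id}:alert:{severity} markers."""

    thresholds: tuple = DEFAULT_THRESHOLDS
    sent: set = field(default_factory=set)

    def crossed(self, budget_id: str, spent: float, limit: float) -> list[str]:
        """Return severities newly crossed at this spend level."""
        fired = []
        for fraction, severity in self.thresholds:
            marker = (budget_id, severity)
            if spent >= fraction * limit and marker not in self.sent:
                self.sent.add(marker)  # like SET NX: only the first caller wins
                fired.append(severity)
        return fired

    def reset(self, budget_id: str) -> None:
        """New budget period: clear this budget's markers."""
        self.sent = {marker for marker in self.sent if marker[0] != budget_id}


tracker = ThresholdTracker()
print(tracker.crossed("b1", 45.0, 100.0))  # []
print(tracker.crossed("b1", 85.0, 100.0))  # ['info', 'warning']
print(tracker.crossed("b1", 90.0, 100.0))  # []
```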
Budget Period Management (Celery Task)
```python
# app/tasks/budget_tasks.py
import asyncio
from datetime import UTC, datetime, timedelta

from celery import shared_task
from sqlalchemy import text

from app.core.database import get_session  # assumed location of the sync session factory
from app.models.cost_tracking import Budget, BudgetPeriod
from app.services.cost_tracking.budget import BudgetEnforcer


@shared_task
def check_budget_periods():
    """
    Scheduled task to manage budget periods.
    Run every hour via Celery Beat.
    """
    now = datetime.now(UTC)
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    reset_ids = []

    # Find budgets whose period has ended
    with get_session() as session:
        expired_budgets = session.query(Budget).filter(
            Budget.is_active.is_(True),
            Budget.period_end <= now,
        ).all()

        for budget in expired_budgets:
            # Archive current period spend (helper not shown)
            archive_budget_period(budget)

            # Calculate the new period, aligned to midnight UTC
            if budget.period == BudgetPeriod.DAILY:
                period_start = midnight
                period_end = period_start + timedelta(days=1)
            elif budget.period == BudgetPeriod.WEEKLY:
                period_start = midnight - timedelta(days=now.weekday())  # Monday
                period_end = period_start + timedelta(weeks=1)
            else:  # BudgetPeriod.MONTHLY
                period_start = midnight.replace(day=1)
                period_end = (period_start + timedelta(days=32)).replace(day=1)

            budget.period_start = period_start
            budget.period_end = period_end
            budget.current_spend = 0
            reset_ids.append(str(budget.id))

        session.commit()

    # Reset Redis counters in one event loop rather than calling asyncio.run() per budget
    async def reset_all():
        enforcer = BudgetEnforcer()
        for budget_id in reset_ids:
            await enforcer.reset_budget_period(budget_id)

    if reset_ids:
        asyncio.run(reset_all())


@shared_task
def daily_cost_rollup():
    """
    Aggregate yesterday's costs into the summary table.
    Run at 1 AM daily via Celery Beat.
    """
    today = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0)
    start, end = today - timedelta(days=1), today

    with get_session() as session:
        # Aggregate by project, agent_type, model. A timestamp range (rather than
        # DATE(request_timestamp) = ...) lets PostgreSQL use the timestamp indexes.
        # id is generated here in case UUIDMixin only sets ids in Python.
        # ON CONFLICT DO NOTHING only prevents duplicates if a unique constraint
        # covers the grouping columns.
        session.execute(
            text("""
                INSERT INTO daily_cost_summaries
                    (id, date, project_id, agent_type_id, model,
                     request_count, total_prompt_tokens, total_completion_tokens,
                     total_tokens, total_cost_usd, avg_latency_ms,
                     cache_hit_rate, success_rate)
                SELECT
                    gen_random_uuid(),
                    DATE(request_timestamp AT TIME ZONE 'UTC') AS date,
                    project_id,
                    agent_type_id,
                    model,
                    COUNT(*) AS request_count,
                    SUM(prompt_tokens) AS total_prompt_tokens,
                    SUM(completion_tokens) AS total_completion_tokens,
                    SUM(total_tokens) AS total_tokens,
                    SUM(total_cost_usd) AS total_cost_usd,
                    AVG(latency_ms) AS avg_latency_ms,
                    SUM(cached_tokens)::float / NULLIF(SUM(total_tokens), 0) AS cache_hit_rate,
                    AVG(CASE WHEN success THEN 1 ELSE 0 END) AS success_rate
                FROM token_usage
                WHERE request_timestamp >= :start AND request_timestamp < :end
                GROUP BY DATE(request_timestamp AT TIME ZONE 'UTC'), project_id, agent_type_id, model
                ON CONFLICT DO NOTHING
            """),
            {"start": start, "end": end},
        )
        session.commit()
```
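Period boundaries are a common source of off-by-one bugs, so they are worth isolating and testing on their own. A minimal sketch of the same calculation used by `check_budget_periods` (midnight-aligned, weeks starting Monday); `period_bounds` is a hypothetical helper:

```python
from datetime import datetime, timedelta, timezone


def period_bounds(period: str, now: datetime) -> tuple[datetime, datetime]:
    """Return [start, end) of the daily/weekly/monthly period containing `now`."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    if period == "daily":
        start = midnight
        end = start + timedelta(days=1)
    elif period == "weekly":
        start = midnight - timedelta(days=now.weekday())  # Monday
        end = start + timedelta(weeks=1)
    elif period == "monthly":
        start = midnight.replace(day=1)
        # Day 1 + 32 days always lands in the next month; snap back to its day 1
        end = (start + timedelta(days=32)).replace(day=1)
    else:
        raise ValueError(f"unknown period: {period}")
    return start, end


now = datetime(2025, 12, 31, 15, 30, tzinfo=timezone.utc)  # a Wednesday
print(period_bounds("monthly", now))
```

The `+ 32 days` trick works because the start is always the 1st and no month is longer than 31 days, so the result never skips past the following month.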
Alert System Integration
Alert Configuration
```python
# app/services/cost_tracking/alerts.py
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import Enum
from typing import List

from app.models.cost_tracking import AlertSeverity, BudgetAlert


class AlertChannel(str, Enum):
    EMAIL = "email"
    SLACK = "slack"
    WEBHOOK = "webhook"
    SSE = "sse"  # Real-time dashboard


@dataclass
class AlertRule:
    budget_id: str
    channels: List[AlertChannel]
    recipients: List[str]  # Email addresses, Slack channels, etc.
    escalation_delay: int = 0  # Minutes before escalating


class AlertDispatcher:
    """Dispatch budget alerts to configured channels."""

    async def dispatch(self, alert: BudgetAlert, rules: List[AlertRule]):
        """Send alert through all configured channels."""
        for rule in rules:
            if rule.budget_id == str(alert.budget_id):
                for channel in rule.channels:
                    await self._send_alert(channel, alert, rule.recipients)

    async def _send_alert(
        self,
        channel: AlertChannel,
        alert: BudgetAlert,
        recipients: List[str],
    ):
        """Send alert to a specific channel."""
        notification = {
            "channel": channel.value,
            "sent_at": datetime.now(UTC).isoformat(),
            "recipients": recipients,
        }

        # _send_email_alert and _send_webhook_alert are not shown here
        if channel == AlertChannel.EMAIL:
            await self._send_email_alert(alert, recipients)
        elif channel == AlertChannel.SLACK:
            await self._send_slack_alert(alert, recipients)
        elif channel == AlertChannel.WEBHOOK:
            await self._send_webhook_alert(alert, recipients)
        elif channel == AlertChannel.SSE:
            pass  # Already published via EventBus when the alert was created

        # Reassign rather than append in place so SQLAlchemy detects the JSONB change
        alert.notifications_sent = [*(alert.notifications_sent or []), notification]

    async def _send_slack_alert(self, alert: BudgetAlert, channels: List[str]):
        """Send alert to Slack."""
        color = {
            AlertSeverity.INFO: "#36a64f",      # Green
            AlertSeverity.WARNING: "#ff9800",   # Orange
            AlertSeverity.CRITICAL: "#f44336",  # Red
        }[alert.severity]

        payload = {
            "attachments": [{
                "color": color,
                "title": f"Budget Alert: {alert.message}",
                "fields": [
                    {"title": "Budget", "value": f"${alert.budget_limit:.2f}", "short": True},
                    {"title": "Current Spend", "value": f"${alert.current_spend:.2f}", "short": True},
                    {"title": "Usage", "value": f"{alert.threshold_percent:.1f}%", "short": True},
                    {"title": "Severity", "value": alert.severity.value.upper(), "short": True},
                ],
                "ts": datetime.now(UTC).timestamp(),
            }]
        }

        for channel in channels:
            await send_slack_message(channel, payload)  # Slack client helper, not shown
```
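`AlertRule.escalation_delay` is declared but the dispatcher above does not use it yet. One possible interpretation, sketched under that assumption: rules with no delay fire immediately, and delayed rules fire only while the alert is still unacknowledged after the delay. `Rule` and `due_channels` are hypothetical names, and a scheduled task would call this periodically:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class Rule:
    channels: list[str]
    escalation_delay: int = 0  # minutes, as in AlertRule


def due_channels(rules: list[Rule], created_at: datetime,
                 acknowledged: bool, now: datetime) -> list[str]:
    """Channels that should have been notified by `now` (deduplicated, in rule order)."""
    age = now - created_at
    due: list[str] = []
    for rule in rules:
        immediate = rule.escalation_delay == 0
        escalated = not acknowledged and age >= timedelta(minutes=rule.escalation_delay)
        if immediate or escalated:
            for channel in rule.channels:
                if channel not in due:
                    due.append(channel)
    return due


created = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
rules = [Rule(["sse", "slack"]), Rule(["email"], escalation_delay=30)]
print(due_channels(rules, created, False, created + timedelta(minutes=45)))
# ['sse', 'slack', 'email']
```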
Cost Optimization Strategies
1. Semantic Caching
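The core of a semantic cache is "embed the query, compare against cached embeddings, reuse the response if similarity clears a threshold". A stripped-down sketch without Redis or an embedding API, using tiny hand-made vectors in place of real embeddings. Unlike the class below, which returns the first match above the threshold, this sketch returns the best match; both use a linear scan, which a vector index would replace at scale:

```python
import math
from typing import Optional


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def lookup(query: list[float], entries: dict[str, tuple[list[float], str]],
           threshold: float = 0.95) -> Optional[str]:
    """Return the most similar cached response, if it clears the threshold."""
    best_score, best_response = threshold, None
    for embedding, response in entries.values():
        score = cosine_similarity(query, embedding)
        if score >= best_score:
            best_score, best_response = score, response
    return best_response


cache = {"k1": ([1.0, 0.0], "cached A"), "k2": ([0.0, 1.0], "cached B")}
print(lookup([0.99, 0.05], cache))  # cached A
print(lookup([0.7, 0.7], cache))    # None
```

The threshold is the main tuning knob: too low and unrelated prompts receive stale answers, too high and the hit rate (and the savings) drop.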
# app/services/cost_tracking/cache.py
import hashlib
from datetime import UTC, datetime
from typing import Optional, Tuple
import litellm
import numpy as np
import redis.asyncio as redis
class SemanticCache:
"""
Cache LLM responses based on semantic similarity.
Uses embedding vectors to find similar queries.
"""
def __init__(self, redis_url: str, similarity_threshold: float = 0.95):
self.redis = redis.from_url(redis_url)
self.threshold = similarity_threshold
self.embedding_model = "text-embedding-3-small" # OpenAI
async def get(
self,
messages: list,
model: str,
project_id: str
) -> Optional[Tuple[str, dict]]:
"""
Check cache for semantically similar query.
Returns:
Tuple of (cached_response, usage_info) or None
"""
query_embedding = await self._get_embedding(messages)
cache_key_pattern = f"cache:semantic:{project_id}:{model}:*"
# Search for similar embeddings
async for key in self.redis.scan_iter(match=cache_key_pattern):
cached = await self.redis.hgetall(key)
if cached:
cached_embedding = np.frombuffer(cached[b"embedding"], dtype=np.float32)
similarity = self._cosine_similarity(query_embedding, cached_embedding)
if similarity >= self.threshold:
return (
cached[b"response"].decode(),
{
"cache_hit": True,
"similarity": similarity,
"tokens_saved": int(cached[b"tokens"]),
"cost_saved": float(cached[b"cost"]),
}
)
return None
async def set(
self,
messages: list,
model: str,
project_id: str,
response: str,
usage: dict,
ttl: int = 3600
):
"""Cache response with embedding for future similarity matching."""
query_embedding = await self._get_embedding(messages)
# Create unique key
content_hash = hashlib.sha256(str(messages).encode()).hexdigest()[:16]
cache_key = f"cache:semantic:{project_id}:{model}:{content_hash}"
await self.redis.hset(cache_key, mapping={
"embedding": query_embedding.tobytes(),
"response": response,
"tokens": usage["total_tokens"],
"cost": usage["total_cost_usd"],
"model": model,
"created_at": datetime.now(UTC).isoformat(),
})
await self.redis.expire(cache_key, ttl)
async def _get_embedding(self, messages: list) -> np.ndarray:
"""Generate embedding for messages."""
# Extract text content
text = " ".join(m.get("content", "") for m in messages)
# Call embedding API
response = await litellm.aembedding(
model=self.embedding_model,
input=text
)
return np.array(response.data[0].embedding, dtype=np.float32)
@staticmethod
def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""Calculate cosine similarity between vectors."""
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
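The lookup logic in `get` can be exercised without Redis or an embedding API. That logic computes a similarity against each cached embedding and accepts a hit above `similarity_threshold`. The sketch below is an illustrative in-memory stand-in, not part of the Syndarix design. Embeddings are passed in directly. It returns the most similar entry rather than the first one that clears the threshold, which removes any dependence on SCAN order.

```python
from __future__ import annotations

import math
from dataclasses import dataclass, field


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero norm."""
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return sum(x * y for x, y in zip(a, b)) / (norm_a * norm_b)


@dataclass
class InMemorySemanticCache:
    """Toy stand-in for the Redis-backed cache: same threshold rule, no I/O."""

    threshold: float = 0.95
    entries: list[tuple[list[float], str]] = field(default_factory=list)

    def set(self, embedding: list[float], response: str) -> None:
        self.entries.append((embedding, response))

    def get(self, embedding: list[float]) -> tuple[str, float] | None:
        # Keep the best match above the threshold, not the first one found
        best: tuple[str, float] | None = None
        for cached_embedding, response in self.entries:
            sim = cosine_similarity(embedding, cached_embedding)
            if sim >= self.threshold and (best is None or sim > best[1]):
                best = (response, sim)
        return best
```

The 0.95 default matches `SemanticCache`. Lowering it raises the hit rate but increases the risk of returning an answer to a subtly different question.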
2. Model Cascading
# app/services/cost_tracking/cascade.py
import logging
import re
from dataclasses import dataclass

import litellm

logger = logging.getLogger(__name__)


@dataclass
class CascadeModel:
    model: str
    cost_per_1k_tokens: float
    max_complexity: float  # 0-1 score
    timeout: int = 30


class ModelCascade:
    """
    Route queries to cheapest capable model.

    Escalate to more expensive models only when needed.
    """

    def __init__(self):
        # Ordered by capability (max_complexity); this is also the escalation order
        self.models = [
            CascadeModel("claude-3-haiku-20240307", 0.00075, 0.3, timeout=10),
            CascadeModel("gpt-4o-mini", 0.000375, 0.4, timeout=15),
            CascadeModel("claude-3-5-sonnet-20241022", 0.009, 0.8, timeout=30),
            CascadeModel("gpt-4-turbo", 0.02, 1.0, timeout=60),
        ]
        self.complexity_classifier = None  # TODO: Train classifier

    async def route(
        self,
        messages: list,
        required_quality: float = 0.7
    ) -> str:
        """
        Determine the best model for the query.

        Args:
            messages: The query messages
            required_quality: Minimum quality threshold (0-1)

        Returns:
            Model identifier to use
        """
        complexity = await self._estimate_complexity(messages)
        needed = max(complexity, required_quality)

        # The list is ordered by capability, not price, so pick the cheapest
        # model explicitly among those that meet both requirements
        capable = [m for m in self.models if m.max_complexity >= needed]
        if capable:
            return min(capable, key=lambda m: m.cost_per_1k_tokens).model

        # Fallback to most capable
        return self.models[-1].model

    async def _estimate_complexity(self, messages: list) -> float:
        """
        Estimate query complexity (0-1).

        Factors:
        - Message length
        - Technical terms
        - Multi-step reasoning required
        - Code generation requested
        """
        text = " ".join(m.get("content") or "" for m in messages)
        lowered = text.lower()

        # Simple heuristics (replace with ML model)
        complexity = 0.0

        # Length factor
        word_count = len(text.split())
        if word_count > 500:
            complexity += 0.2
        elif word_count > 200:
            complexity += 0.1

        # Technical indicators (substring match so "implementation" counts)
        technical_terms = ["implement", "architect", "optimize", "debug", "refactor"]
        if any(term in lowered for term in technical_terms):
            complexity += 0.3

        # Code indicators (word-start match so "decode" does not count)
        if "```" in text or re.search(r"\bcode", lowered):
            complexity += 0.2

        # Multi-step indicators (word-start match so "authentication" != "then")
        if re.search(r"\b(?:step|first|then)", lowered):
            complexity += 0.2

        return min(1.0, complexity)

    async def execute_with_fallback(
        self,
        messages: list,
        required_quality: float = 0.7,
        confidence_threshold: float = 0.8
    ) -> dict:
        """
        Execute query with automatic fallback to stronger models.
        """
        attempts = 0  # Models tried before the one that succeeds
        for model in self.models:
            if model.max_complexity < required_quality:
                continue

            try:
                response = await litellm.acompletion(
                    model=model.model,
                    messages=messages,
                    timeout=model.timeout,
                )

                # Check response quality (simplified; _evaluate_response is not shown)
                confidence = await self._evaluate_response(response, messages)

                if confidence >= confidence_threshold:
                    return {
                        "response": response,
                        "model_used": model.model,
                        "fallback_count": attempts,
                    }
            except Exception:
                # Log and try next model
                logger.warning("Cascade model %s failed", model.model, exc_info=True)
            attempts += 1

        # Final fallback (_execute_premium is not shown)
        return await self._execute_premium(messages)
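`self.models` is ordered by capability rather than price. For example, `gpt-4o-mini` is listed at $0.000375 per 1K tokens, half of Claude 3 Haiku's $0.00075. Selecting the "cheapest capable model" therefore needs an explicit minimum over cost. The standalone sketch below implements that selection rule using the blended per-1K prices from the example above. `cheapest_capable` is a hypothetical helper name.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CascadeModel:
    model: str
    cost_per_1k_tokens: float
    max_complexity: float


# Blended per-1K prices copied from the ModelCascade example
MODELS = [
    CascadeModel("claude-3-haiku-20240307", 0.00075, 0.3),
    CascadeModel("gpt-4o-mini", 0.000375, 0.4),
    CascadeModel("claude-3-5-sonnet-20241022", 0.009, 0.8),
    CascadeModel("gpt-4-turbo", 0.02, 1.0),
]


def cheapest_capable(
    models: list[CascadeModel], complexity: float, required_quality: float
) -> str:
    """Return the cheapest model whose capability covers both requirements."""
    needed = max(complexity, required_quality)
    capable = [m for m in models if m.max_complexity >= needed]
    if not capable:
        # Nothing qualifies: fall back to the most capable model
        return max(models, key=lambda m: m.max_complexity).model
    return min(capable, key=lambda m: m.cost_per_1k_tokens).model
```

With the default `required_quality=0.7`, only Sonnet and GPT-4 Turbo qualify, whatever the estimated complexity. The two cheaper models are reachable only when callers lower that threshold.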
3. Prompt Compression (LLMLingua Integration)
# app/services/cost_tracking/compression.py
import re

from llmlingua import PromptCompressor


class PromptOptimizer:
    """
    Compress prompts to reduce token count while preserving meaning.

    Uses LLMLingua for intelligent compression.
    """

    def __init__(self):
        self.compressor = PromptCompressor(
            model_name="microsoft/llmlingua-2-bert-base-multilingual-cased-meetingbank",
            use_llmlingua2=True
        )

    def compress(
        self,
        messages: list,
        target_ratio: float = 0.5,  # Keep roughly 50% of the original tokens
        preserve_instructions: bool = True
    ) -> list:
        """
        Compress messages while preserving critical information.

        Args:
            messages: Original messages
            target_ratio: Target compression ratio (fraction of tokens kept)
            preserve_instructions: Whether to preserve system instructions

        Returns:
            Compressed messages
        """
        compressed_messages = []

        for msg in messages:
            role = msg["role"]
            content = msg["content"]

            # Don't compress system instructions if requested
            if role == "system" and preserve_instructions:
                compressed_messages.append(msg)
                continue

            # Apply compression
            compressed = self.compressor.compress_prompt(
                content,
                rate=target_ratio,
                force_tokens=self._extract_critical_tokens(content),
            )

            compressed_messages.append({
                "role": role,
                "content": compressed["compressed_prompt"]
            })

        return compressed_messages

    def _extract_critical_tokens(self, text: str) -> list:
        """Extract tokens that should never be removed."""
        # Preserve code blocks, variable names, etc.
        critical = []

        # Preserve code blocks
        code_blocks = re.findall(r'```[\s\S]*?```', text)
        for block in code_blocks:
            critical.extend(block.split())

        # Preserve quoted strings
        quotes = re.findall(r'"[^"]*"', text)
        critical.extend(quotes)

        return critical

    def estimate_savings(self, original: list, compressed: list) -> dict:
        """Calculate token savings from compression."""
        original_tokens = sum(
            len(m["content"].split()) * 1.3  # Rough token estimate
            for m in original
        )
        compressed_tokens = sum(
            len(m["content"].split()) * 1.3
            for m in compressed
        )

        return {
            "original_tokens": int(original_tokens),
            "compressed_tokens": int(compressed_tokens),
            "tokens_saved": int(original_tokens - compressed_tokens),
            # Guard against empty input to avoid division by zero
            "compression_ratio": compressed_tokens / original_tokens if original_tokens else 1.0,
        }
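Passing whole code blocks and quoted strings through `force_tokens` assumes LLMLingua honors multi-word entries as forced tokens. That may not hold for every compressor version. A swapped-in alternative keeps fenced code out of the compressor entirely and compresses only the prose between fences. It is sketched here as a hypothetical `compress_outside_code` helper; `compress` is any string-to-string callable.

```python
import re
from typing import Callable

FENCE = "`" * 3  # Triple backtick, built up to keep this block's own fence intact
CODE_BLOCK = re.compile(re.escape(FENCE) + r"[\s\S]*?" + re.escape(FENCE))


def compress_outside_code(text: str, compress: Callable[[str], str]) -> str:
    """Apply `compress` to prose segments only; fenced code passes through verbatim."""
    parts: list[str] = []
    last = 0
    for match in CODE_BLOCK.finditer(text):
        prose = text[last:match.start()]
        # Skip the compressor for whitespace-only gaps between blocks
        parts.append(compress(prose) if prose.strip() else prose)
        parts.append(match.group(0))  # Code block kept exactly as written
        last = match.end()
    tail = text[last:]
    parts.append(compress(tail) if tail.strip() else tail)
    return "".join(parts)
```

With the optimizer above, `compress` could be `lambda s: optimizer.compressor.compress_prompt(s, rate=0.5)["compressed_prompt"]`. In a test, `str.upper` is enough to see which segments were touched.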
Cost Estimation Before Execution
# app/services/cost_tracking/estimator.py
from dataclasses import dataclass
from typing import Optional

import tiktoken

# MODEL_COSTS maps model -> {"input": ..., "output": ...} in USD per 1M tokens;
# it is referenced here but not defined in this snippet.


@dataclass
class CostEstimate:
    input_tokens: int
    estimated_output_tokens: int
    min_cost_usd: float
    max_cost_usd: float
    expected_cost_usd: float
    model: str
    confidence: str  # low, medium, high


class CostEstimator:
    """
    Estimate LLM request cost before execution.
    """

    def __init__(self):
        self.encoders = {}  # Cache of tiktoken encoders per model

    def estimate(
        self,
        messages: list,
        model: str,
        max_tokens: Optional[int] = None
    ) -> CostEstimate:
        """
        Estimate cost for a completion request.

        Args:
            messages: Input messages
            model: Target model
            max_tokens: Maximum output tokens (if known)
        """
        # Count input tokens
        input_tokens = self._count_tokens(messages, model)

        # Estimate output tokens
        if max_tokens:
            estimated_output = max_tokens
            confidence = "high"
        else:
            estimated_output = self._estimate_output_tokens(messages, model)
            confidence = "medium"

        # Get model costs (unknown models fall back to zero cost)
        costs = MODEL_COSTS.get(model, {"input": 0, "output": 0})

        # Calculate costs
        input_cost = (input_tokens / 1_000_000) * costs["input"]

        min_output = estimated_output * 0.3  # Conservative
        # Liberal upper bound, but output can never exceed an explicit max_tokens
        max_output = max_tokens if max_tokens else estimated_output * 1.5
        expected_output = estimated_output

        min_output_cost = (min_output / 1_000_000) * costs["output"]
        max_output_cost = (max_output / 1_000_000) * costs["output"]
        expected_output_cost = (expected_output / 1_000_000) * costs["output"]

        return CostEstimate(
            input_tokens=input_tokens,
            estimated_output_tokens=estimated_output,
            min_cost_usd=input_cost + min_output_cost,
            max_cost_usd=input_cost + max_output_cost,
            expected_cost_usd=input_cost + expected_output_cost,
            model=model,
            confidence=confidence,
        )

    def _count_tokens(self, messages: list, model: str) -> int:
        """Count tokens for input messages."""
        # Get or create encoder
        if model not in self.encoders:
            try:
                if "gpt" in model or "o1" in model:
                    self.encoders[model] = tiktoken.encoding_for_model(model)
                else:
                    # Fallback to cl100k_base for non-OpenAI models (approximate count)
                    self.encoders[model] = tiktoken.get_encoding("cl100k_base")
            except Exception:
                self.encoders[model] = tiktoken.get_encoding("cl100k_base")

        encoder = self.encoders[model]

        total = 0
        for msg in messages:
            total += len(encoder.encode(msg.get("content") or ""))
            total += 4  # Message overhead tokens

        return total + 2  # Priming tokens

    def _estimate_output_tokens(self, messages: list, model: str) -> int:
        """
        Estimate expected output tokens based on input.

        Uses heuristics and historical data.
        """
        input_tokens = self._count_tokens(messages, model)
        last_message = (messages[-1].get("content") or "").lower() if messages else ""

        # Heuristics based on request type
        if "explain" in last_message or "describe" in last_message:
            # Explanatory responses tend to be longer
            return int(min(2000, input_tokens * 1.5))

        if "code" in last_message or "implement" in last_message:
            # Code generation can be substantial
            return min(4000, input_tokens * 2)

        if "yes or no" in last_message or "brief" in last_message:
            # Short responses
            return 100

        if "list" in last_message:
            # Lists are medium length
            return int(min(1000, input_tokens * 0.8))

        # Default: output roughly equal to input
        return min(2000, input_tokens)

    def should_warn(
        self,
        estimate: CostEstimate,
        budget_remaining: float,
        threshold: float = 0.1  # Warn if request is >10% of remaining budget
    ) -> bool:
        """Check if user should be warned about expensive request."""
        if budget_remaining <= 0:
            return True

        return estimate.max_cost_usd > (budget_remaining * threshold)
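The arithmetic in `estimate` is tokens / 1,000,000 × price per 1M tokens, summed over input and output. As a worked example, assume illustrative prices of $3 (input) and $15 (output) per 1M tokens, 2,000 input tokens, and an estimated 1,000 output tokens. Input costs $0.006 and the expected output costs $0.015. The 0.3× and 1.5× output bounds cost $0.0045 and $0.0225, so the estimate spans $0.0105 to $0.0285 with $0.021 expected. The helper names below are hypothetical plain-number mirrors of `CostEstimator`.

```python
def request_cost_usd(
    input_tokens: int,
    output_tokens: int,
    input_price_per_1m: float,
    output_price_per_1m: float,
) -> float:
    """Cost in USD when prices are quoted per 1M tokens, as in CostEstimator."""
    input_cost = (input_tokens / 1_000_000) * input_price_per_1m
    output_cost = (output_tokens / 1_000_000) * output_price_per_1m
    return input_cost + output_cost


def should_warn(max_cost_usd: float, budget_remaining: float, threshold: float = 0.1) -> bool:
    """Mirror of CostEstimator.should_warn on plain numbers."""
    if budget_remaining <= 0:
        return True
    return max_cost_usd > budget_remaining * threshold
```

With $0.20 of budget left, the 10% warning threshold is $0.02, so the $0.0285 upper bound triggers a warning. With $1.00 left, the threshold is $0.10 and no warning is raised.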
Pre-Execution Check Integration
# app/services/llm_gateway.py
from app.services.cost_tracking.estimator import CostEstimate, CostEstimator


class LLMGateway:
    async def complete(
        self,
        agent_id: str,
        project_id: str,
        messages: list,
        model_preference: str = "high-reasoning",
        require_estimate: bool = False,
        **kwargs
    ) -> dict:
        """Generate completion with optional cost estimation."""
        model = self._resolve_model(model_preference)

        if require_estimate:
            estimator = CostEstimator()
            estimate = estimator.estimate(messages, model, kwargs.get("max_tokens"))

            # Check against budget
            budget_status = await self.budget_enforcer.check_budget(project_id)

            if estimator.should_warn(estimate, budget_status.remaining):
                return {
                    "type": "cost_warning",
                    "estimate": estimate,
                    "budget_remaining": budget_status.remaining,
                    "message": f"This request may cost ${estimate.expected_cost_usd:.4f}. "
                               f"Remaining budget: ${budget_status.remaining:.2f}. "
                               "Proceed?",
                    "alternatives": self._suggest_alternatives(estimate, model),
                }

        # Proceed with request
        return await self._execute_completion(...)

    def _suggest_alternatives(self, estimate: CostEstimate, model: str) -> list:
        """Suggest cost-saving alternatives."""
        alternatives = []

        # Cheaper model
        if model == "claude-3-5-sonnet-20241022":
            alternatives.append({
                "action": "use_haiku",
                "model": "claude-3-haiku-20240307",
                "estimated_savings": estimate.expected_cost_usd * 0.9,
                "quality_impact": "May reduce quality for complex tasks",
            })

        # Compression
        alternatives.append({
            "action": "compress_prompt",
            "estimated_savings": estimate.expected_cost_usd * 0.3,
            "quality_impact": "Minimal for most use cases",
        })

        return alternatives
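`_suggest_alternatives` hardcodes a 90% saving for switching from Sonnet to Haiku. The blended per-1K prices in the model cascading example ($0.009 vs $0.00075) put the ratio at about 91.7%, so 0.9 is a reasonable rounded figure. The hypothetical helper below derives the saving from pricing data instead. It assumes token counts stay the same across models, which is only approximately true when tokenizers differ.

```python
def switch_savings_usd(
    expected_cost_usd: float, current_price: float, cheaper_price: float
) -> float:
    """Savings from running the same tokens on a cheaper model.

    Both prices must use the same unit (e.g. blended USD per 1K tokens).
    A "cheaper" model that is actually pricier yields zero savings.
    """
    if current_price <= 0:
        raise ValueError("current_price must be positive")
    ratio = min(cheaper_price / current_price, 1.0)
    return expected_cost_usd * (1.0 - ratio)
```

For a $0.12 Sonnet request, this gives 0.12 × (1 - 0.00075/0.009) = $0.11 of estimated savings.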
Reporting Dashboard Requirements
Dashboard Components
| Component | Data Source | Update Frequency |
|---|---|---|
| Real-time Spend Ticker | Redis + SSE | Real-time |
| Budget Utilization Gauges | Redis | 1 second |
| Daily Cost Chart | PostgreSQL (daily_cost_summaries) | 1 minute |
| Model Breakdown Pie Chart | PostgreSQL | 5 minutes |
| Agent Efficiency Table | PostgreSQL | 5 minutes |
| Cost Forecast | PostgreSQL + ML | 1 hour |
| Alert Feed | SSE + PostgreSQL | Real-time |
API Endpoints
# app/api/v1/costs.py
from datetime import datetime
from typing import List, Optional

from fastapi import APIRouter, Query

# Request/response schemas (CostSummaryResponse, BudgetCreate, AlertResponse, ...)
# are referenced here but defined elsewhere.

router = APIRouter(prefix="/costs", tags=["costs"])


@router.get("/summary/{project_id}")
async def get_cost_summary(
    project_id: str,
    period: str = Query("day", pattern="^(day|week|month)$")
) -> CostSummaryResponse:
    """Get cost summary for a project."""
    pass


@router.get("/breakdown/{project_id}")
async def get_cost_breakdown(
    project_id: str,
    group_by: str = Query("model", pattern="^(model|agent_type|day)$"),
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None
) -> CostBreakdownResponse:
    """Get detailed cost breakdown."""
    pass


@router.get("/trends/{project_id}")
async def get_cost_trends(
    project_id: str,
    days: int = Query(30, ge=1, le=365)
) -> CostTrendsResponse:
    """Get historical cost trends."""
    pass


@router.get("/forecast/{project_id}")
async def get_cost_forecast(
    project_id: str,
    days_ahead: int = Query(7, ge=1, le=30)
) -> CostForecastResponse:
    """Get projected costs based on current trends."""
    pass


@router.get("/efficiency/{project_id}")
async def get_efficiency_metrics(
    project_id: str
) -> EfficiencyMetricsResponse:
    """Get cost efficiency metrics (cost per task, cache hit rate, etc.)."""
    pass


@router.get("/budgets/{project_id}")
async def get_budgets(project_id: str) -> List[BudgetResponse]:
    """Get all budgets for a project."""
    pass


@router.post("/budgets")
async def create_budget(budget: BudgetCreate) -> BudgetResponse:
    """Create a new budget."""
    pass


@router.put("/budgets/{budget_id}")
async def update_budget(budget_id: str, budget: BudgetUpdate) -> BudgetResponse:
    """Update budget configuration."""
    pass


@router.get("/alerts/{project_id}")
async def get_alerts(
    project_id: str,
    status: Optional[str] = Query(None, pattern="^(pending|acknowledged|resolved)$")
) -> List[AlertResponse]:
    """Get budget alerts for a project."""
    pass


@router.post("/alerts/{alert_id}/acknowledge")
async def acknowledge_alert(alert_id: str) -> AlertResponse:
    """Acknowledge a budget alert."""
    pass
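The forecast endpoint promises "projected costs based on current trends" without fixing a method; the dashboard table lists "PostgreSQL + ML". A simple baseline, not the method chosen by this spike, is an ordinary least-squares line fitted to daily totals (for example, from `daily_cost_summaries`) and extrapolated `days_ahead` days. `linear_forecast` is a hypothetical name.

```python
def linear_forecast(daily_costs: list[float], days_ahead: int) -> list[float]:
    """Project future daily costs with an ordinary least-squares line.

    daily_costs[i] is the spend on day i (oldest first). Projections are
    clamped at zero because spend cannot be negative.
    """
    n = len(daily_costs)
    if n == 0:
        return [0.0] * days_ahead
    if n == 1:
        return [daily_costs[0]] * days_ahead
    mean_x = (n - 1) / 2
    mean_y = sum(daily_costs) / n
    sxx = sum((x - mean_x) ** 2 for x in range(n))
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in enumerate(daily_costs))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    # Day indices n, n+1, ... are the days after the last observation
    return [max(0.0, intercept + slope * (n + k)) for k in range(days_ahead)]
```

For daily totals of $1, $2, $3 and $4, the fitted slope is $1/day, so the next two days project to $5 and $6. A falling trend is clamped at zero rather than going negative.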
Dashboard Wireframe
+------------------------------------------------------------------+
| Syndarix Cost Dashboard |
+------------------------------------------------------------------+
| |
| +------------------+ +------------------+ +-------------------+ |
| | DAILY SPEND | | WEEKLY BUDGET | | MONTHLY BUDGET | |
| | $12.47 | | ████████░░ 76% | | ████░░░░░░ 42% | |
| | ▲ 15% vs avg | | $152/$200 | | $421/$1000 | |
| +------------------+ +------------------+ +-------------------+ |
| |
| +-------------------------------+ +----------------------------+ |
| | COST BY MODEL (7 days) | | DAILY COST TREND | |
| | ┌───────────────────┐ | | $ | |
| | │ Claude Sonnet 64% │ | | 20├────────────────────── | |
| | │ GPT-4o-mini 22% │ | | 15├─────────────█──────────| |
| | │ Claude Haiku 9% │ | | 10├────────█───█───────────| |
| | │ Other 5% │ | | 5├──█───█───█───█─────────| |
| | └───────────────────┘ | | 0└───────────────────────>| |
| +-------------------------------+ | Mon Tue Wed Thu Fri | |
| +----------------------------+ |
| |
| +---------------------------------------------------------------+ |
| | AGENT EFFICIENCY | |
| +----------+----------+----------+-----------+------------------+ |
| | Agent | Requests | Tokens | Cost | Cost/Task | |
| +----------+----------+----------+-----------+------------------+ |
| | Arch | 145 | 2.3M | $18.50 | $0.13 | |
| | Engineer | 892 | 12.1M | $96.80 | $0.11 | |
| | QA | 234 | 1.2M | $4.20 | $0.02 | |
| +----------+----------+----------+-----------+------------------+ |
| |
| +---------------------------------------------------------------+ |
| | RECENT ALERTS [Clear] | |
| +---------------------------------------------------------------+ |
| | ! WARNING Daily budget at 80% ($16/$20) 2 mins ago | |
| | i INFO Weekly budget at 50% 1 hour ago | |
| +---------------------------------------------------------------+ |
+------------------------------------------------------------------+
Implementation Roadmap
Phase 1: Core Infrastructure (Week 1-2)
- Database Schema
  - Create migration for `token_usage`, `budgets`, `budget_alerts`
  - Create migration for `daily_cost_summaries`, `model_pricing`
  - Add indexes for performance
- Redis Setup
  - Configure Redis keys for budget counters
  - Implement budget increment/decrement operations
  - Set up TTL for temporary data
- LiteLLM Callback
  - Implement `SyndarixCostCallback` class
  - Integrate with existing LLM Gateway
  - Add metadata propagation for attribution
Phase 2: Budget Management (Week 2-3)
- Budget Enforcer
  - Implement `BudgetEnforcer` class
  - Add pre-request budget checks
  - Implement soft/hard limit logic
- Alert System
  - Implement threshold detection
  - Create alert dispatcher
  - Integrate with SSE event bus
- Celery Tasks
  - Budget period management task
  - Daily cost rollup task
  - Alert digest task
Phase 3: Cost Optimization (Week 3-4)
- Semantic Cache
  - Implement `SemanticCache` with Redis
  - Integrate embedding generation
  - Add cache hit/miss tracking
- Model Cascading
  - Implement `ModelCascade` router
  - Add complexity estimation
  - Integrate with LLM Gateway
- Cost Estimation
  - Implement `CostEstimator`
  - Add pre-execution warnings
  - Create alternative suggestions
Phase 4: Reporting (Week 4-5)
- API Endpoints
  - Implement cost summary endpoints
  - Add breakdown and trends endpoints
  - Create forecast endpoint
- Dashboard
  - Design and implement cost dashboard UI
  - Add real-time updates via SSE
  - Create alert management interface
Phase 5: Testing & Documentation (Week 5-6)
- Testing
  - Unit tests for cost tracking
  - Integration tests for budget enforcement
  - Load tests for high-volume scenarios
- Documentation
  - API documentation
  - User guide for budget management
  - Operations runbook
References
Research Sources
- LiteLLM Custom Callbacks Documentation
- LiteLLM Cost Calculation
- LiteLLM Spend Tracking
- LLM Cost Optimization Guide 2025
- Dynamic Load Balancing for Multi-Tenant LLMs
- Multi-Tenant AI Architecture - Azure
- LLMLingua Prompt Compression
- Model Cascading for Cost Reduction
- BlockLLM Multi-tenant LLM Serving
Related Spikes
- SPIKE-005: LLM Provider Abstraction - LiteLLM integration baseline
- SPIKE-003: Real-time Updates - SSE event architecture
- SPIKE-004: Celery + Redis Integration - Background task infrastructure
Decision
Implement a comprehensive cost tracking system with:
- LiteLLM callbacks for real-time usage capture
- Redis + PostgreSQL for hybrid hot/warm data storage
- Hierarchical budgets with soft/hard limit enforcement
- Multi-channel alerts via SSE, email, and Slack
- Cost optimization through caching, cascading, and compression
Expected Outcomes:
- Full cost visibility per project, agent, and model
- Real-time budget enforcement preventing cost overruns
- 60-80% cost reduction through optimization strategies
- Actionable insights for cost-aware decision making
Spike completed. Findings will inform ADR-010: Cost Tracking Architecture.