# SPIKE-010: Cost Tracking for Syndarix

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #10

---

## Executive Summary

Syndarix requires comprehensive LLM cost tracking to manage expenses across multiple providers (Anthropic, OpenAI, and local Ollama). This spike researches token usage monitoring, budget enforcement, cost optimization strategies, and real-time alerting.

### Recommendation

**Adopt a multi-layered cost tracking architecture:**

1. **LiteLLM Callbacks** for real-time usage capture at the gateway level
2. **PostgreSQL** for persistent usage records with time-series aggregation
3. **Redis** for real-time budget enforcement and rate limiting
4. **Celery Beat** for scheduled budget checks and alert processing
5. **SSE Events** for real-time dashboard updates

**Expected Cost Savings:** an estimated 60-80% reduction from combining optimization strategies (semantic caching, model cascading, prompt compression). Actual savings depend on the workload mix and cache hit rates.

---

## Table of Contents

1. [Research Questions & Findings](#research-questions--findings)
2. [Cost Tracking Architecture](#cost-tracking-architecture)
3. [Database Schema Design](#database-schema-design)
4. [LiteLLM Callback Implementation](#litellm-callback-implementation)
5. [Budget Management System](#budget-management-system)
6. [Alert System Integration](#alert-system-integration)
7. [Cost Optimization Strategies](#cost-optimization-strategies)
8. [Cost Estimation Before Execution](#cost-estimation-before-execution)
9. [Reporting Dashboard Requirements](#reporting-dashboard-requirements)
10. [Implementation Roadmap](#implementation-roadmap)

---

## Research Questions & Findings

### 1. How does LiteLLM track token usage and costs?

LiteLLM provides built-in cost tracking through several mechanisms.

**Response Usage Object:**

```python
import litellm

response = await litellm.acompletion(
    model="claude-3-5-sonnet-20241022",
    messages=[...],
)

# Access usage data
print(response.usage.prompt_tokens)      # Input tokens
print(response.usage.completion_tokens)  # Output tokens
print(response.usage.total_tokens)       # Total tokens
```

**Automatic Cost Calculation:**

LiteLLM maintains a centralized pricing database (`model_prices_and_context_window.json`) with costs for 100+ models. Inside callbacks, the computed cost is available as `kwargs["response_cost"]`.

**Custom Callbacks:**

```python
from litellm.integrations.custom_logger import CustomLogger


class SyndarixCostLogger(CustomLogger):
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        cost = kwargs.get("response_cost") or 0.0  # May be None for models without pricing
        model = kwargs.get("model")
        usage = response_obj.usage
        # Store in database
```

**Custom Pricing Override:**

```python
response = await litellm.acompletion(
    model="custom-model",
    messages=[...],
    input_cost_per_token=0.000003,   # $3 per 1M tokens
    output_cost_per_token=0.000015,  # $15 per 1M tokens
)
```

### 2. Best practices for LLM cost attribution in multi-tenant systems?

**Key Patterns Identified:**

1. **Hierarchical Attribution:** Track costs at multiple levels (organization > project > agent type > agent instance > request)
2. **Metadata Tagging:** Include cost center, budget ID, and user context in every request
3. **Tenant-Aware Budgets:** Implement per-customer tier budgets with different limits
4. **Chargeback/Showback:** Enable detailed cost allocation for billing integration

**Multi-Tenant Architecture for Syndarix:**

```
Organization (Billing Entity)
└── Project (Cost Center)
    └── Sprint (Time-bounded Budget)
        └── Agent Instance (Worker)
            └── LLM Request (Atomic Cost Unit)
```

### 3. Real-time cost monitoring approaches?
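The hot-data layer recommended for this question comes down to one atomic counter per budget. Every request increments it, and every pre-flight check compares it against the limit, so the budget decision never waits on PostgreSQL.

The sketch below illustrates the idea with a plain `dict` standing in for Redis. In production the increment would be `INCRBYFLOAT`, which is atomic across workers. `HotBudgetCounters` and `request_cost_usd` are hypothetical names, and the per-1M-token prices in the usage lines are the Claude 3.5 Sonnet rates used elsewhere in this document.

```python
from dataclasses import dataclass, field


def request_cost_usd(
    prompt_tokens: int, completion_tokens: int, input_per_m: float, output_per_m: float
) -> float:
    """Cost of one request given USD prices per 1M input/output tokens."""
    return (prompt_tokens / 1_000_000) * input_per_m + (completion_tokens / 1_000_000) * output_per_m


@dataclass
class HotBudgetCounters:
    """Hypothetical in-memory stand-in for the Redis hot-data layer."""

    limits_usd: dict[str, float]
    spend: dict[str, float] = field(default_factory=dict)

    def add_spend(self, budget_id: str, cost_usd: float) -> float:
        # Redis equivalent: INCRBYFLOAT budget:{budget_id}:spend <cost_usd>
        key = f"budget:{budget_id}:spend"
        self.spend[key] = self.spend.get(key, 0.0) + cost_usd
        return self.spend[key]

    def is_exceeded(self, budget_id: str) -> bool:
        # Hot-path check: a single counter read, no SQL query
        return self.spend.get(f"budget:{budget_id}:spend", 0.0) >= self.limits_usd[budget_id]


if __name__ == "__main__":
    counters = HotBudgetCounters(limits_usd={"daily": 0.10})
    cost = request_cost_usd(10_000, 2_000, input_per_m=3.00, output_per_m=15.00)  # ~$0.06
    counters.add_spend("daily", cost)
    print(counters.is_exceeded("daily"))  # False: ~$0.06 of $0.10 spent
    counters.add_spend("daily", cost)
    print(counters.is_exceeded("daily"))  # True: ~$0.12 >= $0.10
```

The durable `token_usage` row can then be written asynchronously (the warm layer). A slow insert delays only reporting, not enforcement.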
**Recommended Approach:** Hybrid Redis + PostgreSQL

| Layer | Technology | Purpose |
|-------|------------|---------|
| Hot data | Redis | Real-time budget counters, rate limiting |
| Warm data | PostgreSQL | Hourly/daily aggregates, analytics |
| Cold data | S3/Archive | Monthly exports, long-term retention |

**Real-time Pipeline:**

```
LLM Request → LiteLLM Callback → Redis INCR → Budget Check
                     ↓
                Async Queue → PostgreSQL Insert → SSE Event
```

### 4. Budget enforcement strategies (soft vs hard limits)?

**Soft Limits (recommended for most cases):**

- Send alerts at threshold percentages (50%, 80%, 100%)
- Allow continued usage with warnings
- Automatically downgrade the model (e.g., Sonnet → Haiku)
- Require approval for continued expensive operations

**Hard Limits (for critical cost control):**

- Block requests once the budget is exhausted
- Reject LLM calls at the gateway level
- Require a manual budget increase

**Syndarix Recommendation:**

- **Daily budgets:** Hard limits (prevent runaway costs)
- **Weekly/Monthly budgets:** Soft limits with alerts and escalation
- **Per-request limits:** Enforce max tokens per call

### 5. Cost optimization techniques?

**Tier 1: Immediate Impact (minimal code changes)**

- Semantic caching: 15-30% cost reduction (workload-dependent)
- Response caching for deterministic queries

**Tier 2: Model Selection (architecture changes)**

- Model cascading: start with cheaper models and escalate as needed
- Reported reductions of up to 87% when roughly 90% of queries can be handled by smaller models
- Dynamic routing based on query complexity

**Tier 3: Prompt Engineering (ongoing optimization)**

- Prompt compression with LLMLingua: reported compression of up to 20x with <5% quality loss
- Extractive compression for RAG: 2-10x compression, which can also improve accuracy

**Tier 4: Infrastructure (long-term)**

- Self-hosted models for high-volume, latency-tolerant tasks
- Fine-tuned smaller models for domain-specific tasks

### 6. Reporting and visualization requirements?

**Dashboard Requirements:**

1. Real-time spend ticker (updated via SSE)
2. Cost breakdown by project, agent type, model, and time period
3. Budget utilization gauges with threshold indicators
4. Historical trend charts (daily/weekly/monthly)
5. Anomaly detection alerts (spending spikes)
6. Forecast projections based on the current burn rate
7. Cost per task/feature analysis
8. Comparative efficiency metrics (cost per successful completion)

### 7. Cost estimation before execution?

**Pre-execution Estimation Pattern:**

```python
from dataclasses import dataclass

from litellm import token_counter

from app.services.cost_tracking.callbacks import MODEL_COSTS  # USD per 1M tokens


@dataclass
class CostEstimate:
    input_tokens: int
    estimated_output_tokens: int
    estimated_cost_usd: float
    confidence: str


def estimate_cost(messages: list, model: str, expected_output_tokens: int) -> CostEstimate:
    """Estimate cost before making an LLM call.

    Output length is unknown until the call returns, so the caller supplies it
    (e.g. a historical average for the task type, or max_tokens as an upper bound).
    """
    input_tokens = token_counter(model=model, messages=messages)

    costs = MODEL_COSTS[model]
    input_cost = (input_tokens / 1_000_000) * costs["input"]
    output_cost = (expected_output_tokens / 1_000_000) * costs["output"]

    return CostEstimate(
        input_tokens=input_tokens,
        estimated_output_tokens=expected_output_tokens,
        estimated_cost_usd=input_cost + output_cost,
        confidence="medium",  # Depends on how accurate the output estimate is
    )
```

**Token Counting:**

- Use `tiktoken` for OpenAI models
- Use Anthropic's token-counting endpoint (`client.messages.count_tokens()` in the `anthropic` SDK) for Claude models
- Use LiteLLM's `token_counter()` for unified counting

---

## Cost Tracking Architecture

### System Overview

```
┌───────────────────────────────────────────────────────────────────────────┐
│  Syndarix Backend                                                         │
│  ┌─────────────────────────────────────────────────────────────────────┐  │
│  │  LLM Gateway (LiteLLM)                                              │  │
│  │  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐                │  │
│  │  │ Pre-Request │   │   Router    │   │ Post-Request│                │  │
│  │  │  Callback   │──▶│ (Failover)  │──▶│  Callback   │                │  │
│  │  └──────┬──────┘   └─────────────┘   └──────┬──────┘                │  │
│  │         │                                   │                       │  │
│  │         ▼                                   ▼                       │  │
│  │  ┌─────────────┐                 ┌──────────────────────┐           │  │
│  │  │   Budget    │                 │     Cost Tracker     │           │  │
│  │  │    Check    │                 │       Service        │           │  │
│  │  └──────┬──────┘                 └──────────┬───────────┘           │  │
│  └─────────┼───────────────────────────────────┼───────────────────────┘  │
│            │                                   │                          │
│            ▼                                   ▼                          │
│ ┌─────────────────────┐             ┌──────────────────────┐              │
│ │        Redis        │             │      PostgreSQL      │              │
│ │  ┌───────────────┐  │             │  ┌────────────────┐  │              │
│ │  │    Budget     │  │             │  │  token_usage   │  │              │
│ │  │   Counters    │  │             │  │ (time-series)  │  │              │
│ │  ├───────────────┤  │             │  ├────────────────┤  │              │
│ │  │  Rate Limits  │  │             │  │    budgets     │  │              │
│ │  ├───────────────┤  │             │  ├────────────────┤  │              │
│ │  │  Cache Keys   │  │             │  │   alerts_log   │  │              │
│ │  └───────────────┘  │             │  ├────────────────┤  │              │
│ └─────────────────────┘             │  │ daily_summary  │  │              │
│                                     │  └────────────────┘  │              │
│                                     └──────────┬───────────┘              │
│                                                │                          │
│            ┌───────────────────────────────────┴────────┐                 │
│            ▼                                            ▼                 │
│ ┌─────────────────────┐                     ┌────────────────────────┐    │
│ │   Event Bus (SSE)   │                     │      Celery Beat       │    │
│ │   - cost_update     │                     │   - budget_check       │    │
│ │   - budget_alert    │                     │   - daily_rollup       │    │
│ │   - threshold_warn  │                     │   - monthly_report     │    │
│ └─────────────────────┘                     └────────────────────────┘    │
└───────────────────────────────────────────────────────────────────────────┘
```

### Component Responsibilities

| Component | Responsibility |
|-----------|----------------|
| LLM Gateway | Route requests, capture usage, enforce limits |
| Cost Tracker | Calculate costs, persist records, update counters |
| Redis | Real-time budget counters, rate limiting, caching |
| PostgreSQL | Persistent usage records, aggregations, analytics |
| Event Bus | Real-time notifications to dashboards |
| Celery Beat | Scheduled tasks (rollups, alerts, reports) |

---

## Database Schema Design

### Core Tables

```python
# app/models/cost_tracking.py
import enum

from sqlalchemy import Boolean, Column, DateTime, Enum, Float, ForeignKey, Index, Integer, String
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import relationship

from app.models.base import Base, TimestampMixin, UUIDMixin


class BudgetPeriod(str, enum.Enum):
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"


class AlertSeverity(str, enum.Enum):
    INFO = "info"          # 50% threshold
    WARNING = "warning"    # 80% threshold
    CRITICAL = "critical"  # 100% threshold


class AlertStatus(str, enum.Enum):
    PENDING = "pending"
    ACKNOWLEDGED = "acknowledged"
    RESOLVED = "resolved"


class TokenUsage(Base, UUIDMixin, TimestampMixin):
    """Individual LLM request usage record."""

    __tablename__ = "token_usage"

    # Attribution
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
    agent_instance_id = Column(UUID(as_uuid=True), ForeignKey("agent_instances.id"), nullable=True)
    agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"), nullable=True)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)

    # Request details
    request_id = Column(String(64), unique=True, nullable=False)  # Correlation ID
    model = Column(String(100), nullable=False)
    provider = Column(String(50), nullable=False)  # anthropic, openai, ollama

    # Token counts
    prompt_tokens = Column(Integer, nullable=False)
    completion_tokens = Column(Integer, nullable=False)
    total_tokens = Column(Integer, nullable=False)
    cached_tokens = Column(Integer, default=0)  # Tokens served from cache

    # Cost calculation
    input_cost_usd = Column(Float, nullable=False)
    output_cost_usd = Column(Float, nullable=False)
    total_cost_usd = Column(Float, nullable=False)

    # Timing
    latency_ms = Column(Integer, nullable=True)
    request_timestamp = Column(DateTime(timezone=True), nullable=False)

    # Metadata
    task_type = Column(String(50), nullable=True)  # reasoning, coding, chat, etc.
    success = Column(Boolean, default=True)
    error_type = Column(String(100), nullable=True)
    # "metadata" is a reserved attribute name in SQLAlchemy's Declarative API, so the
    # Python attribute is renamed while the database column keeps the name "metadata".
    extra_metadata = Column("metadata", JSONB, default=dict)  # Additional context

    # Indexes for common queries
    __table_args__ = (
        Index("ix_token_usage_project_timestamp", "project_id", "request_timestamp"),
        Index("ix_token_usage_agent_instance_timestamp", "agent_instance_id", "request_timestamp"),
        Index("ix_token_usage_model_timestamp", "model", "request_timestamp"),
        Index("ix_token_usage_request_timestamp", "request_timestamp"),
    )


class Budget(Base, UUIDMixin, TimestampMixin):
    """Budget definition for cost control."""

    __tablename__ = "budgets"

    # Scope (exactly one of these should be set)
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=True)
    agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"), nullable=True)
    organization_id = Column(UUID(as_uuid=True), ForeignKey("organizations.id"), nullable=True)

    # Budget configuration
    name = Column(String(100), nullable=False)
    period = Column(Enum(BudgetPeriod), nullable=False)
    limit_usd = Column(Float, nullable=False)

    # Thresholds (fractions of limit_usd)
    warn_threshold = Column(Float, default=0.5)      # 50%
    alert_threshold = Column(Float, default=0.8)     # 80%
    critical_threshold = Column(Float, default=1.0)  # 100%

    # Enforcement
    hard_limit = Column(Boolean, default=False)     # Block requests at limit
    auto_downgrade = Column(Boolean, default=True)  # Downgrade to cheaper model

    # Status
    is_active = Column(Boolean, default=True)
    current_spend = Column(Float, default=0.0)  # Cached from Redis
    period_start = Column(DateTime(timezone=True), nullable=False)
    period_end = Column(DateTime(timezone=True), nullable=False)

    __table_args__ = (
        Index("ix_budgets_project_active", "project_id", "is_active"),
        Index("ix_budgets_period_end", "period_end"),
    )


class BudgetAlert(Base, UUIDMixin, TimestampMixin):
    """Alert record for budget threshold breaches."""

    __tablename__ = "budget_alerts"

    budget_id = Column(UUID(as_uuid=True), ForeignKey("budgets.id"), nullable=False)
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=True)
    severity = Column(Enum(AlertSeverity), nullable=False)
    status = Column(Enum(AlertStatus), default=AlertStatus.PENDING)
    threshold_percent = Column(Float, nullable=False)
    current_spend = Column(Float, nullable=False)
    budget_limit = Column(Float, nullable=False)
    message = Column(String(500), nullable=False)
    acknowledged_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)
    acknowledged_at = Column(DateTime(timezone=True), nullable=True)

    # Notification tracking, e.g. [{"channel": "email", "sent_at": "..."}].
    # Plain JSONB does not track in-place mutation; reassign the list to persist changes.
    notifications_sent = Column(JSONB, default=list)

    budget = relationship("Budget", backref="alerts")


class DailyCostSummary(Base, UUIDMixin):
    """Materialized daily cost aggregations for fast reporting."""

    __tablename__ = "daily_cost_summaries"

    date = Column(DateTime(timezone=True), nullable=False)
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=True)
    agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"), nullable=True)
    model = Column(String(100), nullable=True)

    # Aggregated metrics
    request_count = Column(Integer, default=0)
    total_prompt_tokens = Column(Integer, default=0)
    total_completion_tokens = Column(Integer, default=0)
    total_tokens = Column(Integer, default=0)
    total_cost_usd = Column(Float, default=0.0)

    # Performance metrics
    avg_latency_ms = Column(Float, nullable=True)
    cache_hit_rate = Column(Float, default=0.0)
    success_rate = Column(Float, default=1.0)

    __table_args__ = (
        Index("ix_daily_summary_date_project", "date", "project_id"),
        Index("ix_daily_summary_date_model", "date", "model"),
    )


class ModelPricing(Base, UUIDMixin, TimestampMixin):
    """Custom model pricing overrides."""

    __tablename__ = "model_pricing"

    model = Column(String(100), unique=True, nullable=False)
    provider = Column(String(50), nullable=False)
    input_cost_per_million = Column(Float, nullable=False)
    output_cost_per_million = Column(Float, nullable=False)

    # Context limits
    max_input_tokens = Column(Integer, nullable=True)
    max_output_tokens = Column(Integer, nullable=True)

    # Validity period
    effective_from = Column(DateTime(timezone=True), nullable=False)
    effective_until = Column(DateTime(timezone=True), nullable=True)
    is_active = Column(Boolean, default=True)
```

### Redis Key Structure

```python
# Redis key patterns for real-time tracking
REDIS_KEYS = {
    # Budget counters (reset at the start of each period)
    "budget_spend": "budget:{budget_id}:spend",        # INCRBYFLOAT
    "budget_requests": "budget:{budget_id}:requests",  # INCR
    "budget_alert_sent": "budget:{budget_id}:alert:{severity}",  # Alert de-duplication marker

    # Project-level aggregations
    "project_daily_spend": "project:{project_id}:daily:{date}:spend",
    "project_monthly_spend": "project:{project_id}:monthly:{year_month}:spend",

    # Rate limiting
    "rate_limit": "ratelimit:{project_id}:{model}:{window}",

    # Semantic cache (same format as SemanticCache below)
    "semantic_cache": "cache:semantic:{project_id}:{model}:{hash}",

    # Real-time metrics (for dashboard)
    "realtime_spend": "metrics:realtime:spend:{project_id}",
}
```

---

## LiteLLM Callback Implementation

### Custom Callback Handler

```python
# app/services/cost_tracking/callbacks.py
import asyncio
import uuid
from datetime import UTC, datetime
from typing import Any

from litellm.integrations.custom_logger import CustomLogger

from app.services.cost_tracking.budget import BudgetEnforcer
from app.services.cost_tracking.tracker import CostTracker
from app.services.events import EventBus


class BudgetExceededError(Exception):
    """Raised when a hard budget limit blocks an LLM request."""


class SyndarixCostCallback(CustomLogger):
    """
    LiteLLM callback handler for cost tracking and budget enforcement.

    Integrates with:
    - Redis for real-time budget counters
    - PostgreSQL for persistent usage records
    - Event bus for real-time dashboard updates
    """

    def __init__(self):
        self.tracker = CostTracker()
        self.budget_enforcer = BudgetEnforcer()
        self.event_bus = EventBus()

    def log_pre_api_call(self, model: str, messages: list, kwargs: dict):
        """Pre-request budget check (synchronous code paths only).

        NOTE: LiteLLM may log and swallow exceptions raised inside logging callbacks,
        and rewriting kwargs["model"] here may not change the model that is actually
        called. For reliable hard limits, await BudgetEnforcer.check_budget() before
        calling litellm.acompletion().
        """
        # LiteLLM exposes per-request metadata under kwargs["litellm_params"]["metadata"]
        metadata = (kwargs.get("litellm_params") or {}).get("metadata") or {}
        project_id = metadata.get("project_id")
        if not project_id:
            return  # Skip tracking for internal calls

        try:
            asyncio.get_running_loop()
            return  # Async path: the caller checks the budget before acompletion()
        except RuntimeError:
            pass  # No running event loop, so a blocking check is safe

        budget_status = self.budget_enforcer.check_budget_sync(project_id)
        if budget_status.exceeded and budget_status.hard_limit:
            raise BudgetExceededError(
                f"Budget exceeded for project {project_id}. "
                f"Limit: ${budget_status.limit:.2f}, "
                f"Spent: ${budget_status.spent:.2f}"
            )

        if budget_status.exceeded and budget_status.auto_downgrade:
            # Suggest a cheaper model (see NOTE above)
            kwargs["model"] = self._get_fallback_model(model)

    async def async_log_success_event(
        self, kwargs: dict, response_obj: Any, start_time: datetime, end_time: datetime
    ):
        """Record successful request usage and costs."""
        metadata = (kwargs.get("litellm_params") or {}).get("metadata") or {}
        project_id = metadata.get("project_id")
        agent_id = metadata.get("agent_id")
        agent_type_id = metadata.get("agent_type_id")
        if not project_id:
            return

        # Extract usage data
        usage = response_obj.usage
        model = kwargs.get("model")
        cost = kwargs.get("response_cost") or 0.0  # None when LiteLLM has no price for the model

        # Fall back to our own price table if LiteLLM did not provide a cost
        if cost == 0:
            cost = self._calculate_cost(model, usage)

        # Create usage record
        usage_record = {
            "request_id": kwargs.get("litellm_call_id") or str(uuid.uuid4()),
            "project_id": project_id,
            "agent_instance_id": agent_id,
            "agent_type_id": agent_type_id,
            "model": model,
            "provider": self._get_provider(model),
            "prompt_tokens": usage.prompt_tokens,
            "completion_tokens": usage.completion_tokens,
            "total_tokens": usage.total_tokens,
            "input_cost_usd": self._calculate_input_cost(model, usage.prompt_tokens),
            "output_cost_usd": self._calculate_output_cost(model, usage.completion_tokens),
            "total_cost_usd": cost,
            # LiteLLM passes start_time/end_time as datetime objects
            "latency_ms": int((end_time - start_time).total_seconds() * 1000),
            "request_timestamp": datetime.now(UTC),
            "task_type": metadata.get("task_type"),
            "success": True,
        }

        # Persist, update budget counters, and publish the event concurrently
        await asyncio.gather(
            self.tracker.record_usage(usage_record),
            self.budget_enforcer.increment_spend(project_id, cost),
            self._publish_cost_event(project_id, usage_record),
        )

    async def async_log_failure_event(
        self, kwargs: dict, response_obj: Any, start_time: datetime, end_time: datetime
    ):
        """Record a failed request.

        Some failures (e.g. timeouts after the provider started processing) may still be
        billed for input tokens, so counting the prompt gives an upper-bound estimate.
        """
        from litellm import token_counter

        metadata = (kwargs.get("litellm_params") or {}).get("metadata") or {}
        project_id = metadata.get("project_id")
        if not project_id:
            return

        model = kwargs.get("model")
        input_tokens = token_counter(model=model, messages=kwargs.get("messages") or [])

        if input_tokens > 0:
            input_cost = self._calculate_input_cost(model, input_tokens)
            exception = kwargs.get("exception")
            usage_record = {
                "request_id": kwargs.get("litellm_call_id") or str(uuid.uuid4()),
                "project_id": project_id,
                "model": model,
                "provider": self._get_provider(model),
                "prompt_tokens": input_tokens,
                "completion_tokens": 0,
                "total_tokens": input_tokens,
                "input_cost_usd": input_cost,
                "output_cost_usd": 0.0,
                "total_cost_usd": input_cost,
                "success": False,
                "error_type": type(exception).__name__ if exception else None,
                "request_timestamp": datetime.now(UTC),
            }
            await self.tracker.record_usage(usage_record)

    async def _publish_cost_event(self, project_id: str, usage: dict):
        """Publish a real-time cost event to the dashboard."""
        await self.event_bus.publish(f"project:{project_id}", {
            "type": "cost_update",
            "data": {
                "model": usage["model"],
                "tokens": usage["total_tokens"],
                "cost_usd": usage["total_cost_usd"],
                "timestamp": usage["request_timestamp"].isoformat(),
            },
        })

    def _calculate_cost(self, model: str, usage) -> float:
        """Calculate total cost from usage."""
        input_cost = self._calculate_input_cost(model, usage.prompt_tokens)
        output_cost = self._calculate_output_cost(model, usage.completion_tokens)
        return input_cost + output_cost

    def _calculate_input_cost(self, model: str, tokens: int) -> float:
        """Calculate input token cost."""
        costs = MODEL_COSTS.get(model, {"input": 0, "output": 0})
        return (tokens / 1_000_000) * costs["input"]

    def _calculate_output_cost(self, model: str, tokens: int) -> float:
        """Calculate output token cost."""
        costs = MODEL_COSTS.get(model, {"input": 0, "output": 0})
        return (tokens / 1_000_000) * costs["output"]

    def _get_provider(self, model: str) -> str:
        """Derive the provider from the model name."""
        if model.startswith("claude"):
            return "anthropic"
        elif model.startswith("gpt") or model.startswith("o1"):
            return "openai"
        elif model.startswith("ollama/"):
            return "ollama"
        return "unknown"

    def _get_fallback_model(self, model: str) -> str:
        """Get a cheaper fallback model."""
        fallbacks = {
            "claude-3-5-sonnet-20241022": "claude-3-haiku-20240307",
            "gpt-4-turbo": "gpt-4o-mini",
            "gpt-4o": "gpt-4o-mini",
        }
        return fallbacks.get(model, model)


# Model cost database (USD per 1M tokens)
MODEL_COSTS = {
    # Anthropic
    "claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
    "claude-3-opus-20240229": {"input": 15.00, "output": 75.00},
    "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},

    # OpenAI
    "gpt-4-turbo": {"input": 10.00, "output": 30.00},
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "o1-preview": {"input": 15.00, "output": 60.00},
    "o1-mini": {"input": 3.00, "output": 12.00},

    # Local (compute-only, no API cost)
    "ollama/llama3": {"input": 0.00, "output": 0.00},
    "ollama/mixtral": {"input": 0.00, "output": 0.00},
}
```

### Registering the Callback

```python
# app/services/llm_gateway.py
import litellm

from app.services.cost_tracking.callbacks import SyndarixCostCallback

# Register the CustomLogger instance globally; LiteLLM dispatches its
# pre-call, success, and failure hooks automatically.
syndarix_callback = SyndarixCostCallback()
litellm.callbacks = [syndarix_callback]

# Alternative: register individual hooks. Use this INSTEAD of litellm.callbacks,
# not in addition to it, or every event will be recorded twice.
# litellm.success_callback = [syndarix_callback.async_log_success_event]
# litellm.failure_callback = [syndarix_callback.async_log_failure_event]
```

---

## Budget Management System

### Budget Enforcer

```python
# app/services/cost_tracking/budget.py
import asyncio
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Optional

import redis.asyncio as redis
from sqlalchemy import select

from app.core.config import settings
from app.core.database import get_async_session  # Assumed module path for the async session helper
from app.models.cost_tracking import AlertSeverity, Budget, BudgetAlert
from app.services.events import EventBus


@dataclass
class BudgetStatus:
    budget_id: Optional[str]
    limit: float
    spent: float
    remaining: float
    percent_used: float
    exceeded: bool
    hard_limit: bool
    auto_downgrade: bool
    threshold_breached: Optional[str] = None  # warn, alert, critical


class BudgetEnforcer:
    """Real-time budget enforcement using Redis."""

    def __init__(self, redis_url: Optional[str] = None):
        self.redis = redis.from_url(redis_url or settings.REDIS_URL)

    async def _get_active_budgets(self, project_id: str) -> list[Budget]:
        """Load the active budgets for a project from PostgreSQL."""
        async with get_async_session() as session:
            result = await session.execute(
                select(Budget).where(Budget.project_id == project_id, Budget.is_active.is_(True))
            )
            return list(result.scalars().all())

    async def check_budget(self, project_id: str) -> BudgetStatus:
        """Return the status of the most restrictive active budget for a project."""
        budgets = await self._get_active_budgets(project_id)
        if not budgets:
            return BudgetStatus(
                budget_id=None,
                limit=float("inf"),
                spent=0.0,
                remaining=float("inf"),
                percent_used=0.0,
                exceeded=False,
                hard_limit=False,
                auto_downgrade=False,
            )

        statuses = []
        for budget in budgets:
            spent = float(await self.redis.get(f"budget:{budget.id}:spend") or 0)
            percent_used = (spent / budget.limit_usd) * 100

            # Determine threshold breach
            threshold_breached = None
            if percent_used >= budget.critical_threshold * 100:
                threshold_breached = "critical"
            elif percent_used >= budget.alert_threshold * 100:
                threshold_breached = "alert"
            elif percent_used >= budget.warn_threshold * 100:
                threshold_breached = "warn"

            statuses.append(BudgetStatus(
                budget_id=str(budget.id),
                limit=budget.limit_usd,
                spent=spent,
                remaining=max(0.0, budget.limit_usd - spent),
                percent_used=percent_used,
                exceeded=spent >= budget.limit_usd,
                hard_limit=budget.hard_limit,
                auto_downgrade=budget.auto_downgrade,
                threshold_breached=threshold_breached,
            ))

        # Most restrictive: an exceeded hard limit first, then the highest utilization
        return max(statuses, key=lambda s: (s.exceeded and s.hard_limit, s.percent_used))

    async def increment_spend(self, project_id: str, amount: float) -> BudgetStatus:
        """Increment budget spend and check thresholds."""
        budgets = await self._get_active_budgets(project_id)

        for budget in budgets:
            spend_key = f"budget:{budget.id}:spend"
            # Atomic increment (safe across workers)
            new_spend = await self.redis.incrbyfloat(spend_key, amount)
            # Check and trigger alerts
            await self._check_thresholds(budget, new_spend)

        return await self.check_budget(project_id)

    async def _check_thresholds(self, budget: Budget, current_spend: float):
        """Check thresholds and create alerts if needed."""
        percent = (current_spend / budget.limit_usd) * 100

        thresholds = [
            (budget.warn_threshold * 100, AlertSeverity.INFO),
            (budget.alert_threshold * 100, AlertSeverity.WARNING),
            (budget.critical_threshold * 100, AlertSeverity.CRITICAL),
        ]

        for threshold, severity in thresholds:
            if percent >= threshold:
                alert_key = f"budget:{budget.id}:alert:{severity.value}"
                # Keep the marker until the period ends so each alert fires once per period.
                # SET NX is atomic, so concurrent requests cannot both create the alert.
                ttl = max(1, int((budget.period_end - datetime.now(UTC)).total_seconds()))
                if await self.redis.set(alert_key, "1", ex=ttl, nx=True):
                    await self._create_alert(budget, severity, percent, current_spend)

    async def _create_alert(
        self, budget: Budget, severity: AlertSeverity, percent: float, current_spend: float
    ):
        """Create a budget alert and trigger notifications."""
        alert = BudgetAlert(
            budget_id=budget.id,
            project_id=budget.project_id,
            severity=severity,
            threshold_percent=percent,
            current_spend=current_spend,
            budget_limit=budget.limit_usd,
            message=f"Budget '{budget.name}' at {percent:.1f}% (${current_spend:.2f}/${budget.limit_usd:.2f})",
        )

        # Persist alert
        async with get_async_session() as session:
            session.add(alert)
            await session.commit()

        # Publish alert event
        await EventBus().publish(f"project:{budget.project_id}", {
            "type": "budget_alert",
            "data": {
                "severity": severity.value,
                "message": alert.message,
                "percent_used": percent,
                "budget_name": budget.name,
            },
        })

    async def reset_budget_period(self, budget_id: str):
        """Reset the budget counter for a new period."""
        spend_key = f"budget:{budget_id}:spend"
        await self.redis.delete(spend_key)

        # Clear alert markers
        for severity in AlertSeverity:
            alert_key = f"budget:{budget_id}:alert:{severity.value}"
            await self.redis.delete(alert_key)

    def check_budget_sync(self, project_id: str) -> BudgetStatus:
        """Blocking version for synchronous callers with no running event loop.

        Inside async code, await check_budget() instead: asyncio.run() raises
        RuntimeError when called from a running event loop.
        """
        async def _run() -> BudgetStatus:
            try:
                return await self.check_budget(project_id)
            finally:
                # Drop connections bound to this short-lived event loop
                await self.redis.connection_pool.disconnect()

        return asyncio.run(_run())
```

### Budget Period Management (Celery Task)

```python
# app/tasks/budget_tasks.py
import asyncio
from datetime import UTC, datetime, timedelta

from celery import shared_task

from app.core.database import get_session  # Assumed module path for the sync session helper
from app.models.cost_tracking import Budget, BudgetPeriod
from app.services.cost_tracking.budget import BudgetEnforcer


async def _reset_counters(budget_ids: list[str]) -> None:
    """Reset Redis counters for all rolled-over budgets on a single event loop."""
    enforcer = BudgetEnforcer()
    try:
        for budget_id in budget_ids:
            await enforcer.reset_budget_period(budget_id)
    finally:
        await enforcer.redis.connection_pool.disconnect()


@shared_task
def check_budget_periods():
    """
    Scheduled task to manage budget periods.
    Run every hour via Celery Beat.
    """
    now = datetime.now(UTC)
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    reset_ids = []

    # Find budgets whose period has ended
    with get_session() as session:
        expired_budgets = session.query(Budget).filter(
            Budget.is_active.is_(True),
            Budget.period_end <= now,
        ).all()

        for budget in expired_budgets:
            # Archive current period spend (project helper, not shown)
            archive_budget_period(budget)

            # Calculate the new period
            if budget.period == BudgetPeriod.DAILY:
                period_start = midnight
                period_end = period_start + timedelta(days=1)
            elif budget.period == BudgetPeriod.WEEKLY:
                period_start = midnight - timedelta(days=now.weekday())  # Monday 00:00
                period_end = period_start + timedelta(weeks=1)
            else:  # BudgetPeriod.MONTHLY
                period_start = midnight.replace(day=1)
                period_end = (period_start + timedelta(days=32)).replace(day=1)

            # Update budget
            budget.period_start = period_start
            budget.period_end = period_end
            budget.current_spend = 0
            reset_ids.append(str(budget.id))

        session.commit()

    # Reset Redis counters once the new periods are committed
    asyncio.run(_reset_counters(reset_ids))


@shared_task
def daily_cost_rollup():
    """
    Aggregate daily costs into summary table.
    Run at 1 AM daily via Celery Beat.
    """
    from sqlalchemy import text

    yesterday = datetime.now(UTC).date() - timedelta(days=1)
    day_start = datetime(yesterday.year, yesterday.month, yesterday.day, tzinfo=UTC)
    day_end = day_start + timedelta(days=1)

    with get_session() as session:
        # Aggregate by project, agent_type, model. A half-open timestamp range
        # (instead of DATE(request_timestamp) = ...) lets PostgreSQL use
        # ix_token_usage_request_timestamp.
        session.execute(text("""
            INSERT INTO daily_cost_summaries
                (id, date, project_id, agent_type_id, model,
                 request_count, total_prompt_tokens, total_completion_tokens,
                 total_tokens, total_cost_usd, avg_latency_ms,
                 cache_hit_rate, success_rate)
            SELECT
                gen_random_uuid(),  -- assumes UUIDMixin's "id" has no server-side default
                :day_start AS date,
                project_id,
                agent_type_id,
                model,
                COUNT(*) AS request_count,
                SUM(prompt_tokens) AS total_prompt_tokens,
                SUM(completion_tokens) AS total_completion_tokens,
                SUM(total_tokens) AS total_tokens,
                SUM(total_cost_usd) AS total_cost_usd,
                AVG(latency_ms) AS avg_latency_ms,
                SUM(cached_tokens)::float / NULLIF(SUM(total_tokens), 0) AS cache_hit_rate,
                AVG(CASE WHEN success THEN 1 ELSE 0 END) AS success_rate
            FROM token_usage
            WHERE request_timestamp >= :day_start AND request_timestamp < :day_end
            GROUP BY project_id, agent_type_id, model
            -- Only de-duplicates re-runs if a unique constraint covers the summary dimensions
            ON CONFLICT DO NOTHING
        """), {"day_start": day_start, "day_end": day_end})

        session.commit()
```

---

## Alert System Integration

### Alert Configuration

```python
# app/services/cost_tracking/alerts.py
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import Enum
from typing import List

from app.models.cost_tracking import AlertSeverity, BudgetAlert


class AlertChannel(str, Enum):
    EMAIL = "email"
    SLACK = "slack"
    WEBHOOK = "webhook"
    SSE = "sse"  # Real-time dashboard


@dataclass
class AlertRule:
    budget_id: str
    channels: List[AlertChannel]
    recipients: List[str]  # Email addresses, Slack channels, etc.
    escalation_delay: int = 0  # Minutes before escalating


class AlertDispatcher:
    """Dispatch budget alerts to configured channels."""

    async def dispatch(self, alert: BudgetAlert, rules: List[AlertRule]):
        """Send the alert through all configured channels."""
        for rule in rules:
            if rule.budget_id == str(alert.budget_id):
                for channel in rule.channels:
                    await self._send_alert(channel, alert, rule.recipients)

    async def _send_alert(
        self, channel: AlertChannel, alert: BudgetAlert, recipients: List[str]
    ):
        """Send the alert to a specific channel."""
        notification = {
            "channel": channel.value,
            "sent_at": datetime.now(UTC).isoformat(),
            "recipients": recipients,
        }

        # _send_email_alert / _send_webhook_alert are channel adapters (not shown)
        if channel == AlertChannel.EMAIL:
            await self._send_email_alert(alert, recipients)
        elif channel == AlertChannel.SLACK:
            await self._send_slack_alert(alert, recipients)
        elif channel == AlertChannel.WEBHOOK:
            await self._send_webhook_alert(alert, recipients)
        elif channel == AlertChannel.SSE:
            pass  # Already published via EventBus when the alert was created

        # Track the notification. Reassign instead of calling append() so that
        # SQLAlchemy detects the change to the JSONB column.
        alert.notifications_sent = [*(alert.notifications_sent or []), notification]

    async def _send_slack_alert(self, alert: BudgetAlert, channels: List[str]):
        """Send the alert to Slack."""
        color = {
            AlertSeverity.INFO: "#36a64f",      # Green
            AlertSeverity.WARNING: "#ff9800",   # Orange
            AlertSeverity.CRITICAL: "#f44336",  # Red
        }[alert.severity]

        payload = {
            "attachments": [{
                "color": color,
                "title": f"Budget Alert: {alert.message}",
                "fields": [
                    {"title": "Budget", "value": f"${alert.budget_limit:.2f}", "short": True},
                    {"title": "Current Spend", "value": f"${alert.current_spend:.2f}", "short": True},
                    {"title": "Usage", "value": f"{alert.threshold_percent:.1f}%", "short": True},
                    {"title": "Severity", "value": alert.severity.value.upper(), "short": True},
                ],
                "ts": datetime.now(UTC).timestamp(),
            }]
        }

        for channel in channels:
            await send_slack_message(channel, payload)  # Project Slack client helper (not shown)
```

---

## Cost Optimization Strategies

### 1. Semantic Caching

```python
# app/services/cost_tracking/cache.py
import hashlib
from datetime import UTC, datetime
from typing import Optional, Tuple

import litellm
import numpy as np
import redis.asyncio as redis


class SemanticCache:
    """
    Cache LLM responses based on semantic similarity.
    Uses embedding vectors to find similar queries.
    """

    def __init__(self, redis_url: str, similarity_threshold: float = 0.95):
        self.redis = redis.from_url(redis_url)
        self.threshold = similarity_threshold
        self.embedding_model = "text-embedding-3-small"  # OpenAI

    async def get(
        self, messages: list, model: str, project_id: str
    ) -> Optional[Tuple[str, dict]]:
        """
        Check the cache for a semantically similar query.

        Returns:
            Tuple of (cached_response, usage_info) or None
        """
        query_embedding = await self._get_embedding(messages)
        cache_key_pattern = f"cache:semantic:{project_id}:{model}:*"

        # Linear scan over every cached entry for this project/model (O(n) per lookup).
        # At scale, a vector index such as Redis Stack vector search avoids the full scan.
        async for key in self.redis.scan_iter(match=cache_key_pattern):
            cached = await self.redis.hgetall(key)
            if cached:
                cached_embedding = np.frombuffer(cached[b"embedding"], dtype=np.float32)
                similarity = self._cosine_similarity(query_embedding, cached_embedding)

                if similarity >= self.threshold:
                    return (
                        cached[b"response"].decode(),
                        {
                            "cache_hit": True,
                            "similarity": similarity,
                            "tokens_saved": int(cached[b"tokens"]),
                            "cost_saved": float(cached[b"cost"]),
                        },
                    )

        return None

    async def set(
        self,
        messages: list,
        model: str,
        project_id: str,
        response: str,
        usage: dict,
        ttl: int = 3600,
    ):
        """Cache a response with its embedding for future similarity matching."""
        query_embedding = await self._get_embedding(messages)

        # Create a unique key
        content_hash = hashlib.sha256(str(messages).encode()).hexdigest()[:16]
        cache_key = f"cache:semantic:{project_id}:{model}:{content_hash}"

        await self.redis.hset(cache_key, mapping={
            "embedding": query_embedding.tobytes(),
            "response": response,
            "tokens": usage["total_tokens"],
            "cost": usage["total_cost_usd"],
            "model": model,
            "created_at": datetime.now(UTC).isoformat(),
        })
        await self.redis.expire(cache_key, ttl)

    async def _get_embedding(self, messages: list) -> np.ndarray:
        """Generate an embedding for the messages."""
        # Extract text content (content can be None, e.g. for tool-call messages)
        text = " ".join(m.get("content") or "" for m in messages)

        # Call the embedding API
        response = await litellm.aembedding(
            model=self.embedding_model,
            input=text,
        )

        return np.array(response.data[0].embedding, dtype=np.float32)

    @staticmethod
    def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
        """Calculate cosine similarity between two vectors."""
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
```

### 2. Model Cascading

```python
# app/services/cost_tracking/cascade.py
from dataclasses import dataclass

import litellm


@dataclass
class CascadeModel:
    model: str
    cost_per_1k_tokens: float  # Blended average of MODEL_COSTS input/output rates
    max_complexity: float      # 0-1 score
    timeout: int = 30


class ModelCascade:
    """
    Route queries to the cheapest capable model.
    Escalate to more expensive models only when needed.
    """

    def __init__(self):
        # Escalation order (by max_complexity). Note that gpt-4o-mini is cheaper
        # than Claude 3 Haiku at the MODEL_COSTS rates.
        self.models = [
            CascadeModel("claude-3-haiku-20240307", 0.00075, 0.3, timeout=10),
            CascadeModel("gpt-4o-mini", 0.000375, 0.4, timeout=15),
            CascadeModel("claude-3-5-sonnet-20241022", 0.009, 0.8, timeout=30),
            CascadeModel("gpt-4-turbo", 0.02, 1.0, timeout=60),
        ]
        self.complexity_classifier = None  # TODO: Train classifier

    async def route(
        self,
        messages: list,
        required_quality: float = 0.7,
    ) -> str:
        """
        Determine the best model for the query.

        Args:
            messages: The query messages
            required_quality: Minimum quality threshold (0-1)

        Returns:
            Model identifier to use
        """
        complexity = await self._estimate_complexity(messages)

        # Pick the first model in the list that meets both requirements
        for model in self.models:
            if model.max_complexity >= complexity and model.max_complexity >= required_quality:
                return model.model

        # Fall back to the most capable model
        return self.models[-1].model

    async def _estimate_complexity(self, messages: list) -> float:
        """
        Estimate query complexity (0-1).
Factors: - Message length - Technical terms - Multi-step reasoning required - Code generation requested """ text = " ".join(m.get("content", "") for m in messages) # Simple heuristics (replace with ML model) complexity = 0.0 # Length factor word_count = len(text.split()) if word_count > 500: complexity += 0.2 elif word_count > 200: complexity += 0.1 # Technical indicators technical_terms = ["implement", "architect", "optimize", "debug", "refactor"] if any(term in text.lower() for term in technical_terms): complexity += 0.3 # Code indicators if "```" in text or "code" in text.lower(): complexity += 0.2 # Multi-step indicators if "step" in text.lower() or "first" in text.lower() or "then" in text.lower(): complexity += 0.2 return min(1.0, complexity) async def execute_with_fallback( self, messages: list, required_quality: float = 0.7, confidence_threshold: float = 0.8 ) -> dict: """ Execute query with automatic fallback to stronger models. """ for model in self.models: if model.max_complexity < required_quality: continue try: response = await litellm.acompletion( model=model.model, messages=messages, timeout=model.timeout, ) # Check response quality (simplified) confidence = await self._evaluate_response(response, messages) if confidence >= confidence_threshold: return { "response": response, "model_used": model.model, "fallback_count": self.models.index(model), } except Exception as e: # Log and try next model continue # Final fallback return await self._execute_premium(messages) ``` ### 3. Prompt Compression (LLMLingua Integration) ```python # app/services/cost_tracking/compression.py from llmlingua import PromptCompressor class PromptOptimizer: """ Compress prompts to reduce token count while preserving meaning. Uses LLMLingua for intelligent compression. 
""" def __init__(self): self.compressor = PromptCompressor( model_name="microsoft/llmlingua-2-bert-base-multilingual-cased-meetingbank", use_llmlingua2=True ) def compress( self, messages: list, target_ratio: float = 0.5, # Compress to 50% of original preserve_instructions: bool = True ) -> list: """ Compress messages while preserving critical information. Args: messages: Original messages target_ratio: Target compression ratio preserve_instructions: Whether to preserve system instructions Returns: Compressed messages """ compressed_messages = [] for msg in messages: role = msg["role"] content = msg["content"] # Don't compress system instructions if requested if role == "system" and preserve_instructions: compressed_messages.append(msg) continue # Apply compression compressed = self.compressor.compress_prompt( content, rate=target_ratio, force_tokens=self._extract_critical_tokens(content), ) compressed_messages.append({ "role": role, "content": compressed["compressed_prompt"] }) return compressed_messages def _extract_critical_tokens(self, text: str) -> list: """Extract tokens that should never be removed.""" # Preserve code blocks, variable names, etc. 
import re critical = [] # Preserve code blocks code_blocks = re.findall(r'```[\s\S]*?```', text) for block in code_blocks: critical.extend(block.split()) # Preserve quoted strings quotes = re.findall(r'"[^"]*"', text) critical.extend(quotes) return critical def estimate_savings(self, original: list, compressed: list) -> dict: """Calculate token savings from compression.""" original_tokens = sum( len(m["content"].split()) * 1.3 # Rough token estimate for m in original ) compressed_tokens = sum( len(m["content"].split()) * 1.3 for m in compressed ) return { "original_tokens": int(original_tokens), "compressed_tokens": int(compressed_tokens), "tokens_saved": int(original_tokens - compressed_tokens), "compression_ratio": compressed_tokens / original_tokens, } ``` --- ## Cost Estimation Before Execution ```python # app/services/cost_tracking/estimator.py from dataclasses import dataclass from typing import Optional import tiktoken @dataclass class CostEstimate: input_tokens: int estimated_output_tokens: int min_cost_usd: float max_cost_usd: float expected_cost_usd: float model: str confidence: str # low, medium, high class CostEstimator: """ Estimate LLM request cost before execution. """ def __init__(self): self.encoders = {} def estimate( self, messages: list, model: str, max_tokens: Optional[int] = None ) -> CostEstimate: """ Estimate cost for a completion request. 
        Args:
            messages: Input messages
            model: Target model
            max_tokens: Maximum output tokens (if known)
        """
        # Count input tokens
        input_tokens = self._count_tokens(messages, model)

        # Estimate output tokens
        if max_tokens is not None:
            estimated_output = max_tokens
            confidence = "high"
        else:
            estimated_output = self._estimate_output_tokens(messages, model)
            confidence = "medium"

        # Get model costs (USD per 1M tokens); unknown models are priced at $0,
        # which under-reports cost, so keep MODEL_COSTS complete
        costs = MODEL_COSTS.get(model, {"input": 0, "output": 0})

        # Calculate costs
        input_cost = (input_tokens / 1_000_000) * costs["input"]
        min_output = estimated_output * 0.3  # Conservative
        # max_tokens is a hard cap, so never project output beyond it
        max_output = max_tokens if max_tokens is not None else estimated_output * 1.5
        expected_output = estimated_output

        min_output_cost = (min_output / 1_000_000) * costs["output"]
        max_output_cost = (max_output / 1_000_000) * costs["output"]
        expected_output_cost = (expected_output / 1_000_000) * costs["output"]

        return CostEstimate(
            input_tokens=input_tokens,
            estimated_output_tokens=estimated_output,
            min_cost_usd=input_cost + min_output_cost,
            max_cost_usd=input_cost + max_output_cost,
            expected_cost_usd=input_cost + expected_output_cost,
            model=model,
            confidence=confidence,
        )

    def _count_tokens(self, messages: list, model: str) -> int:
        """Count tokens for input messages."""
        # Get or create encoder
        if model not in self.encoders:
            try:
                if "gpt" in model or "o1" in model:
                    self.encoders[model] = tiktoken.encoding_for_model(model)
                else:
                    # cl100k_base only approximates non-OpenAI tokenizers (e.g. Claude)
                    self.encoders[model] = tiktoken.get_encoding("cl100k_base")
            except KeyError:
                # tiktoken cannot map this model name to an encoding
                self.encoders[model] = tiktoken.get_encoding("cl100k_base")

        encoder = self.encoders[model]
        total = 0
        for msg in messages:
            # disallowed_special=() lets user text contain strings like "<|endoftext|>"
            total += len(encoder.encode(msg.get("content") or "", disallowed_special=()))
            total += 4  # Per-message overhead tokens
        return total + 2  # Reply priming tokens

    def _estimate_output_tokens(self, messages: list, model: str) -> int:
        """
        Estimate expected output tokens based on input.
        Uses keyword heuristics for now (historical data is not yet used).
        """
        input_tokens = self._count_tokens(messages, model)
        last_message = (messages[-1].get("content") or "").lower() if messages else ""

        # Heuristics based on request type
        if "explain" in last_message or "describe" in last_message:
            # Explanatory responses tend to be longer
            return int(min(2000, input_tokens * 1.5))

        if "code" in last_message or "implement" in last_message:
            # Code generation can be substantial
            return min(4000, input_tokens * 2)

        if "yes or no" in last_message or "brief" in last_message:
            # Short responses
            return 100

        if "list" in last_message:
            # Lists are medium length
            return int(min(1000, input_tokens * 0.8))

        # Default: output roughly equal to input
        return min(2000, input_tokens)

    def should_warn(
        self,
        estimate: CostEstimate,
        budget_remaining: float,
        threshold: float = 0.1,  # Warn if request is >10% of remaining budget
    ) -> bool:
        """Check if user should be warned about expensive request."""
        if budget_remaining <= 0:
            return True
        return estimate.max_cost_usd > (budget_remaining * threshold)
```

### Pre-Execution Check Integration

```python
# app/services/llm_gateway.py
from app.services.cost_tracking.estimator import CostEstimate, CostEstimator


class LLMGateway:
    async def complete(
        self,
        agent_id: str,
        project_id: str,
        messages: list,
        model_preference: str = "high-reasoning",
        require_estimate: bool = False,
        **kwargs,
    ) -> dict:
        """Generate completion with optional cost estimation."""
        model = self._resolve_model(model_preference)

        if require_estimate:
            estimator = CostEstimator()
            estimate = estimator.estimate(messages, model, kwargs.get("max_tokens"))

            # Check against budget
            budget_status = await self.budget_enforcer.check_budget(project_id)

            if estimator.should_warn(estimate, budget_status.remaining):
                return {
                    "type": "cost_warning",
                    "estimate": estimate,
                    "budget_remaining": budget_status.remaining,
                    "message": (
                        f"This request may cost ${estimate.expected_cost_usd:.4f}. "
                        f"Remaining budget: ${budget_status.remaining:.2f}. "
                        "Proceed?"
                    ),
                    "alternatives": self._suggest_alternatives(estimate, model),
                }

        # Proceed with request
        return await self._execute_completion(
            agent_id, project_id, messages, model, **kwargs
        )

    def _suggest_alternatives(self, estimate: CostEstimate, model: str) -> list:
        """Suggest cost-saving alternatives."""
        alternatives = []

        # Cheaper model
        if model == "claude-3-5-sonnet-20241022":
            alternatives.append({
                "action": "use_haiku",
                "model": "claude-3-haiku-20240307",
                "estimated_savings": estimate.expected_cost_usd * 0.9,
                "quality_impact": "May reduce quality for complex tasks",
            })

        # Compression
        alternatives.append({
            "action": "compress_prompt",
            "estimated_savings": estimate.expected_cost_usd * 0.3,
            "quality_impact": "Minimal for most use cases",
        })

        return alternatives
```

---

## Reporting Dashboard Requirements

### Dashboard Components

| Component | Data Source | Update Frequency |
|-----------|-------------|------------------|
| Real-time Spend Ticker | Redis + SSE | Real-time |
| Budget Utilization Gauges | Redis | 1 second |
| Daily Cost Chart | PostgreSQL (`daily_cost_summaries`) | 1 minute |
| Model Breakdown Pie Chart | PostgreSQL | 5 minutes |
| Agent Efficiency Table | PostgreSQL | 5 minutes |
| Cost Forecast | PostgreSQL + ML | 1 hour |
| Alert Feed | SSE + PostgreSQL | Real-time |

### API Endpoints

```python
# app/api/v1/costs.py
from datetime import datetime

from fastapi import APIRouter, Query

# Request/response schemas (CostSummaryResponse, BudgetCreate, AlertResponse, ...)
# are assumed to be defined in the project's Pydantic schemas module.

router = APIRouter(prefix="/costs", tags=["costs"])


@router.get("/summary/{project_id}")
async def get_cost_summary(
    project_id: str,
    period: str = Query("day", pattern="^(day|week|month)$"),
) -> CostSummaryResponse:
    """Get cost summary for a project."""
    pass


@router.get("/breakdown/{project_id}")
async def get_cost_breakdown(
    project_id: str,
    group_by: str = Query("model", pattern="^(model|agent_type|day)$"),
    start_date: datetime | None = None,
    end_date: datetime | None = None,
) -> CostBreakdownResponse:
    """Get detailed cost breakdown."""
    pass


@router.get("/trends/{project_id}")
async def get_cost_trends(
    project_id: str,
    days: int = Query(30, ge=1, le=365),
) -> CostTrendsResponse:
    """Get historical cost trends."""
    pass


@router.get("/forecast/{project_id}")
async def get_cost_forecast(
    project_id: str,
    days_ahead: int = Query(7, ge=1, le=30),
) -> CostForecastResponse:
    """Get projected costs based on current trends."""
    pass


@router.get("/efficiency/{project_id}")
async def get_efficiency_metrics(
    project_id: str,
) -> EfficiencyMetricsResponse:
    """Get cost efficiency metrics (cost per task, cache hit rate, etc.)."""
    pass


@router.get("/budgets/{project_id}")
async def get_budgets(project_id: str) -> list[BudgetResponse]:
    """Get all budgets for a project."""
    pass


@router.post("/budgets")
async def create_budget(budget: BudgetCreate) -> BudgetResponse:
    """Create a new budget."""
    pass


@router.put("/budgets/{budget_id}")
async def update_budget(budget_id: str, budget: BudgetUpdate) -> BudgetResponse:
    """Update budget configuration."""
    pass


@router.get("/alerts/{project_id}")
async def get_alerts(
    project_id: str,
    status: str | None = Query(None, pattern="^(pending|acknowledged|resolved)$"),
) -> list[AlertResponse]:
    """Get budget alerts for a project."""
    pass


@router.post("/alerts/{alert_id}/acknowledge")
async def acknowledge_alert(alert_id: str) -> AlertResponse:
    """Acknowledge a budget alert."""
    pass
```

### Dashboard Wireframe

```
+--------------------------------------------------------------------+
|                      Syndarix Cost Dashboard                       |
+--------------------------------------------------------------------+
|                                                                    |
|  +------------------+  +------------------+  +------------------+  |
|  | DAILY SPEND      |  | WEEKLY BUDGET    |  | MONTHLY BUDGET   |  |
|  | $12.47           |  | ████████░░ 76%   |  | ████░░░░░░ 42%   |  |
|  | ▲ 15% vs avg     |  | $152/$200        |  | $421/$1000       |  |
|  +------------------+  +------------------+  +------------------+  |
|                                                                    |
|  +-----------------------------+  +-----------------------------+  |
|  | COST BY MODEL (7 days)      |  | DAILY COST TREND            |  |
|  | ┌───────────────────┐       |  | $                           |  |
|  | │ Claude Sonnet 64% │       |  | 20├──────────────────────── |  |
|  | │ GPT-4o-mini   22% │       |  | 15├──────────█───────────── |  |
|  | │ Claude Haiku   9% │       |  | 10├──────█───█───█───────── |  |
|  | │ Other          5% │       |  |  5├──█───█───█───█───█───── |  |
|  | └───────────────────┘       |  |  0└───────────────────────> |  |
|  +-----------------------------+  |     Mon Tue Wed Thu Fri     |  |
|                                   +-----------------------------+  |
|                                                                    |
|  +--------------------------------------------------------------+  |
|  | AGENT EFFICIENCY                                             |  |
|  +----------+----------+----------+-----------+-----------------+  |
|  | Agent    | Requests | Tokens   | Cost      | Cost/Task       |  |
|  +----------+----------+----------+-----------+-----------------+  |
|  | Arch     | 145      | 2.3M     | $18.50    | $0.13           |  |
|  | Engineer | 892      | 12.1M    | $96.80    | $0.11           |  |
|  | QA       | 234      | 1.2M     | $4.20     | $0.02           |  |
|  +----------+----------+----------+-----------+-----------------+  |
|                                                                    |
|  +--------------------------------------------------------------+  |
|  | RECENT ALERTS                                        [Clear] |  |
|  +--------------------------------------------------------------+  |
|  | ! WARNING  Daily budget at 80% ($16/$20)          2 mins ago |  |
|  | i INFO     Weekly budget at 50%                   1 hour ago |  |
|  +--------------------------------------------------------------+  |
+--------------------------------------------------------------------+
```

---

## Implementation Roadmap

### Phase 1: Core Infrastructure (Week 1-2)

1. **Database Schema**
   - [ ] Create migration for `token_usage`, `budgets`, `budget_alerts`
   - [ ] Create migration for `daily_cost_summaries`, `model_pricing`
   - [ ] Add indexes for performance

2. **Redis Setup**
   - [ ] Configure Redis keys for budget counters
   - [ ] Implement budget increment/decrement operations
   - [ ] Set up TTL for temporary data

3. **LiteLLM Callback**
   - [ ] Implement `SyndarixCostCallback` class
   - [ ] Integrate with existing LLM Gateway
   - [ ] Add metadata propagation for attribution

### Phase 2: Budget Management (Week 2-3)

4. **Budget Enforcer**
   - [ ] Implement `BudgetEnforcer` class
   - [ ] Add pre-request budget checks
   - [ ] Implement soft/hard limit logic

5. **Alert System**
   - [ ] Implement threshold detection
   - [ ] Create alert dispatcher
   - [ ] Integrate with SSE event bus

6. **Celery Tasks**
   - [ ] Budget period management task
   - [ ] Daily cost rollup task
   - [ ] Alert digest task

### Phase 3: Cost Optimization (Week 3-4)

7. **Semantic Cache**
   - [ ] Implement `SemanticCache` with Redis
   - [ ] Integrate embedding generation
   - [ ] Add cache hit/miss tracking

8. **Model Cascading**
   - [ ] Implement `ModelCascade` router
   - [ ] Add complexity estimation
   - [ ] Integrate with LLM Gateway

9. **Cost Estimation**
   - [ ] Implement `CostEstimator`
   - [ ] Add pre-execution warnings
   - [ ] Create alternative suggestions

### Phase 4: Reporting (Week 4-5)

10. **API Endpoints**
    - [ ] Implement cost summary endpoints
    - [ ] Add breakdown and trends endpoints
    - [ ] Create forecast endpoint

11. **Dashboard**
    - [ ] Design and implement cost dashboard UI
    - [ ] Add real-time updates via SSE
    - [ ] Create alert management interface

### Phase 5: Testing & Documentation (Week 5-6)

12. **Testing**
    - [ ] Unit tests for cost tracking
    - [ ] Integration tests for budget enforcement
    - [ ] Load tests for high-volume scenarios

13. **Documentation**
    - [ ] API documentation
    - [ ] User guide for budget management
    - [ ] Operations runbook

---

## References

### Research Sources

- [LiteLLM Custom Callbacks Documentation](https://docs.litellm.ai/docs/observability/custom_callback)
- [LiteLLM Cost Calculation](https://deepwiki.com/BerriAI/litellm/2.5-cost-calculation-and-model-pricing)
- [LiteLLM Spend Tracking](https://docs.litellm.ai/docs/proxy/cost_tracking)
- [LLM Cost Optimization Guide 2025](https://ai.koombea.com/blog/llm-cost-optimization)
- [Dynamic Load Balancing for Multi-Tenant LLMs](https://latitude-blog.ghost.io/blog/dynamic-load-balancing-for-multi-tenant-llms/)
- [Multi-Tenant AI Architecture - Azure](https://learn.microsoft.com/en-us/azure/architecture/guide/multitenant/approaches/ai-ml)
- [LLMLingua Prompt Compression](https://www.llmlingua.com/)
- [Model Cascading for Cost Reduction](https://arxiv.org/abs/2410.10347)
- [BlockLLM Multi-tenant LLM Serving](https://arxiv.org/html/2404.18322v1)

### Related Spikes

- [SPIKE-005: LLM Provider Abstraction](./SPIKE-005-llm-provider-abstraction.md) - LiteLLM integration baseline
- [SPIKE-003: Real-time Updates](./SPIKE-003-realtime-updates.md) - SSE event architecture
- [SPIKE-004: Celery + Redis Integration](./SPIKE-004-celery-redis-integration.md) - Background task infrastructure

---

## Decision

**Implement a comprehensive cost tracking system** with:

1. **LiteLLM callbacks** for real-time usage capture
2. **Redis + PostgreSQL** for hybrid hot/warm data storage
3. **Hierarchical budgets** with soft/hard limit enforcement
4. **Multi-channel alerts** via SSE, email, and Slack
5. **Cost optimization** through caching, cascading, and compression

**Expected Outcomes:**

- Full cost visibility per project, agent, and model
- Real-time budget enforcement preventing cost overruns
- 60-80% cost reduction through combined optimization strategies
- Actionable insights for cost-aware decision making

---

*Spike completed. Findings will inform ADR-010: Cost Tracking Architecture.*