syndarix/docs/spikes/SPIKE-010-cost-tracking.md
Felipe Cardoso 5594655fba docs: add architecture spikes and deep analysis documentation
Add comprehensive spike research documents:
- SPIKE-002: Agent Orchestration Pattern (LangGraph + Temporal hybrid)
- SPIKE-006: Knowledge Base pgvector (RAG with hybrid search)
- SPIKE-007: Agent Communication Protocol (JSON-RPC + Redis Streams)
- SPIKE-008: Workflow State Machine (transitions lib + event sourcing)
- SPIKE-009: Issue Synchronization (bi-directional sync with conflict resolution)
- SPIKE-010: Cost Tracking (LiteLLM callbacks + budget enforcement)
- SPIKE-011: Audit Logging (structured event sourcing)
- SPIKE-012: Client Approval Flow (checkpoint-based approvals)

Add architecture documentation:
- ARCHITECTURE_DEEP_ANALYSIS.md: Memory management, security, testing strategy
- IMPLEMENTATION_ROADMAP.md: 6-phase, 24-week implementation plan

Closes #2, #6, #7, #8, #9, #10, #11, #12

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 13:31:02 +01:00

SPIKE-010: Cost Tracking for Syndarix

Status: Completed
Date: 2025-12-29
Author: Architecture Team
Related Issue: #10


Executive Summary

Syndarix requires comprehensive LLM cost tracking to manage expenses across multiple providers (Anthropic, OpenAI, local Ollama). This spike researches token usage monitoring, budget enforcement, cost optimization strategies, and real-time alerting.

Recommendation

Adopt a multi-layered cost tracking architecture:

  1. LiteLLM Callbacks for real-time usage capture at the gateway level
  2. PostgreSQL for persistent usage records with time-series aggregation
  3. Redis for real-time budget enforcement and rate limiting
  4. Celery Beat for scheduled budget checks and alert processing
  5. SSE Events for real-time dashboard updates

Expected Cost Savings: 60-80% reduction through combined optimization strategies (semantic caching, model cascading, prompt compression).


Table of Contents

  1. Research Questions & Findings
  2. Cost Tracking Architecture
  3. Database Schema Design
  4. LiteLLM Callback Implementation
  5. Budget Management System
  6. Alert System Integration
  7. Cost Optimization Strategies
  8. Cost Estimation Before Execution
  9. Reporting Dashboard Requirements
  10. Implementation Roadmap

Research Questions & Findings

1. How does LiteLLM track token usage and costs?

LiteLLM provides built-in cost tracking through multiple mechanisms:

Response Usage Object:

response = await litellm.acompletion(
    model="claude-3-5-sonnet-20241022",
    messages=[...]
)
# Access usage data
print(response.usage.prompt_tokens)      # Input tokens
print(response.usage.completion_tokens)  # Output tokens
print(response.usage.total_tokens)       # Total tokens

Automatic Cost Calculation: LiteLLM maintains a centralized pricing database (model_prices_and_context_window.json) with costs for 100+ models. Cost is accessible via kwargs["response_cost"] in callbacks.

Custom Callbacks:

from litellm.integrations.custom_logger import CustomLogger

class SyndarixCostLogger(CustomLogger):
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        cost = kwargs.get("response_cost", 0)
        model = kwargs.get("model")
        usage = response_obj.usage
        # Store in database

Custom Pricing Override:

response = await litellm.acompletion(
    model="custom-model",
    messages=[...],
    input_cost_per_token=0.000003,   # $3 per 1M tokens
    output_cost_per_token=0.000015   # $15 per 1M tokens
)

2. Best practices for LLM cost attribution in multi-tenant systems?

Key Patterns Identified:

  1. Hierarchical Attribution: Track costs at multiple levels (organization > project > agent type > agent instance > request)
  2. Metadata Tagging: Include cost center, budget ID, and user context in every request
  3. Tenant-Aware Budgets: Implement per-customer tier budgets with different limits
  4. Chargeback/Showback: Enable detailed cost allocation for billing integration

Multi-Tenant Architecture for Syndarix:

Organization (Billing Entity)
  └── Project (Cost Center)
        └── Sprint (Time-bounded Budget)
              └── Agent Instance (Worker)
                    └── LLM Request (Atomic Cost Unit)
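The hierarchy above only works if every request carries its attribution tags. A minimal sketch of how those tags might be bundled, assuming a hypothetical `CostAttribution` helper (the class and its field names are illustrative, not part of the existing codebase):

```python
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass(frozen=True)
class CostAttribution:
    """Hypothetical attribution tags attached to every LLM request."""
    organization_id: str
    project_id: str
    sprint_id: Optional[str] = None
    agent_type_id: Optional[str] = None
    agent_id: Optional[str] = None
    task_type: Optional[str] = None

    def to_metadata(self) -> dict:
        """Return only the tags that are set, ready to attach to a request."""
        return {key: value for key, value in asdict(self).items() if value is not None}


attribution = CostAttribution(
    organization_id="org-1",
    project_id="proj-42",
    agent_id="agent-7",
    task_type="coding",
)
metadata = attribution.to_metadata()
```

The resulting dictionary uses the same keys (`project_id`, `agent_id`, `task_type`) that the callback handler later in this document reads from the request metadata.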

3. Real-time cost monitoring approaches?

Recommended Approach: Hybrid Redis + PostgreSQL

| Layer | Technology | Purpose |
|-------|------------|---------|
| Hot data | Redis | Real-time budget counters, rate limiting |
| Warm data | PostgreSQL | Hourly/daily aggregates, analytics |
| Cold data | S3/Archive | Monthly exports, long-term retention |

Real-time Pipeline:

LLM Request → LiteLLM Callback → Redis INCR → Budget Check
                    ↓
              Async Queue → PostgreSQL Insert → SSE Event
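The split between the hot path (counter plus budget check) and the deferred path (persistence) can be sketched with in-memory stand-ins. This is an illustration only: a `defaultdict` stands in for Redis, a list for PostgreSQL, and the function names are hypothetical.

```python
import asyncio
from collections import defaultdict


async def handle_usage(project_id, cost_usd, counters, limit_usd, queue):
    """Hot path: bump the counter, check the budget, defer persistence."""
    counters[project_id] += cost_usd            # stand-in for Redis INCRBYFLOAT
    over_budget = counters[project_id] >= limit_usd
    await queue.put({"project_id": project_id, "cost_usd": cost_usd})
    return over_budget


async def persist_worker(queue, store):
    """Deferred path: drain the queue into a list standing in for PostgreSQL."""
    while True:
        record = await queue.get()
        store.append(record)                    # stand-in for INSERT + SSE publish
        queue.task_done()


async def demo():
    counters, store, queue = defaultdict(float), [], asyncio.Queue()
    worker = asyncio.create_task(persist_worker(queue, store))
    flags = [await handle_usage("proj-42", 0.4, counters, 1.0, queue) for _ in range(3)]
    await queue.join()                          # wait until every record is persisted
    worker.cancel()
    return flags, store


flags, store = asyncio.run(demo())
```

The budget decision is available as soon as the counter is updated, while the slower write happens off the request path.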

4. Budget enforcement strategies (soft vs hard limits)?

Soft Limits (Recommended for most cases):

  • Send alerts at threshold percentages (50%, 80%, 100%)
  • Allow continued usage with warnings
  • Automatic model downgrade (e.g., Sonnet → Haiku)
  • Require approval for continued expensive operations

Hard Limits (For critical cost control):

  • Block requests once budget exhausted
  • Reject LLM calls at gateway level
  • Require manual budget increase

Syndarix Recommendation:

  • Daily budgets: Hard limits (prevent runaway costs)
  • Weekly/Monthly budgets: Soft limits with alerts and escalation
  • Per-request limits: Enforce max tokens per call
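The soft and hard limit behaviours above reduce to a small decision function. A minimal sketch, assuming a hypothetical `decide_action` helper and an 80% warning threshold (both are illustrative choices, not the final enforcement logic):

```python
from enum import Enum


class Action(str, Enum):
    ALLOW = "allow"
    WARN = "warn"
    DOWNGRADE = "downgrade"
    BLOCK = "block"


def decide_action(spent: float, limit: float, hard_limit: bool,
                  auto_downgrade: bool, warn_at: float = 0.8) -> Action:
    """Map current spend against a budget to an enforcement action."""
    if limit <= 0:
        # A zero budget is treated as already exhausted.
        return Action.BLOCK if hard_limit else Action.WARN
    used = spent / limit
    if used >= 1.0:
        if hard_limit:
            return Action.BLOCK
        return Action.DOWNGRADE if auto_downgrade else Action.WARN
    if used >= warn_at:
        return Action.WARN
    return Action.ALLOW
```

A daily budget would typically be evaluated with `hard_limit=True`, and weekly or monthly budgets with `hard_limit=False`, matching the recommendation above.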

5. Cost optimization techniques?

Tier 1: Immediate Impact (0 code changes)

  • Semantic caching: 15-30% cost reduction
  • Response caching for deterministic queries

Tier 2: Model Selection (Architecture changes)

  • Model cascading: Start with cheaper models, escalate as needed
  • Up to 87% cost reduction is possible when roughly 90% of queries are handled by smaller models
  • Dynamic routing based on query complexity

Tier 3: Prompt Engineering (Ongoing optimization)

  • Prompt compression with LLMLingua: Up to 20x compression with <5% quality loss
  • Extractive compression for RAG: 2-10x compression, often improves accuracy

Tier 4: Infrastructure (Long-term)

  • Self-hosted models for high-volume, latency-tolerant tasks
  • Fine-tuned smaller models for domain-specific tasks
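The saving from model cascading (Tier 2) depends on the escalation rate and on the price ratio between the two models. A rough sketch of the arithmetic, assuming every query is first sent to the cheap model and only the escalated share is re-run on the stronger one (`cascade_cost_reduction` is a hypothetical helper; the prices are the Haiku and Sonnet input prices listed elsewhere in this document):

```python
def cascade_cost_reduction(escalation_rate: float, cheap_cost: float, strong_cost: float) -> float:
    """Fractional saving versus sending every query to the strong model.

    Every query pays for the cheap model; the escalated share also pays
    for the strong model.
    """
    blended = cheap_cost + escalation_rate * strong_cost
    return 1.0 - blended / strong_cost


# 10% of queries escalate; prices are USD per 1M input tokens.
saving = cascade_cost_reduction(0.10, cheap_cost=0.25, strong_cost=3.00)
```

With these prices the saving comes out at roughly 82%. Under the same model, a figure near 87% would require the cheap model to cost about 3% of the strong one.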

6. Reporting and visualization requirements?

Dashboard Requirements:

  1. Real-time spend ticker (updated via SSE)
  2. Cost breakdown by: project, agent type, model, time period
  3. Budget utilization gauges with threshold indicators
  4. Historical trend charts (daily/weekly/monthly)
  5. Anomaly detection alerts (spending spikes)
  6. Forecast projections based on current burn rate
  7. Cost per task/feature analysis
  8. Comparative efficiency metrics (cost per successful completion)
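The burn-rate forecast in item 6 can start as a simple linear projection. A minimal sketch, assuming spend accrues evenly over the period (`forecast_period_spend` is a hypothetical helper; a production forecast would likely account for weekday patterns):

```python
from datetime import datetime, timezone


def forecast_period_spend(spent: float, period_start: datetime,
                          period_end: datetime, now: datetime) -> float:
    """Project end-of-period spend from the average burn rate so far."""
    elapsed = (now - period_start).total_seconds()
    total = (period_end - period_start).total_seconds()
    if elapsed <= 0 or total <= 0:
        return spent  # Nothing to extrapolate from yet
    return spent * total / elapsed


start = datetime(2025, 1, 1, tzinfo=timezone.utc)
end = datetime(2025, 1, 31, tzinfo=timezone.utc)
now = datetime(2025, 1, 11, tzinfo=timezone.utc)
projected = forecast_period_spend(50.0, start, end, now)
```

Here $50 spent in the first 10 of 30 days projects to $150 for the full period.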

7. Cost estimation before execution?

Pre-execution Estimation Pattern:

def estimate_cost(messages: list, model: str) -> CostEstimate:
    """Estimate cost before making LLM call."""
    input_tokens = count_tokens(messages, model)
    estimated_output = estimate_output_tokens(messages, model)

    costs = MODEL_COSTS[model]
    input_cost = (input_tokens / 1_000_000) * costs["input"]
    output_cost = (estimated_output / 1_000_000) * costs["output"]

    return CostEstimate(
        input_tokens=input_tokens,
        estimated_output_tokens=estimated_output,
        estimated_cost_usd=input_cost + output_cost,
        confidence="medium"  # Based on output estimation accuracy
    )

Token Counting:

  • Use tiktoken for OpenAI models
  • Use the Anthropic SDK's token counting for Claude models (client.messages.count_tokens() in recent SDK versions)
  • Use LiteLLM's token_counter() for unified counting
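The estimation pattern above relies on helpers that are not shown. A self-contained sketch follows, in which a crude "about 4 characters per token" heuristic stands in for a real tokenizer and the caller supplies the expected output length. Both are assumptions for illustration; real counts should come from the tokenizers listed above.

```python
from dataclasses import dataclass

# USD per 1M tokens, copied from the MODEL_COSTS table in this document.
PRICES = {"claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00}}


@dataclass
class CostEstimate:
    input_tokens: int
    estimated_output_tokens: int
    estimated_cost_usd: float


def rough_token_count(messages: list) -> int:
    """Crude stand-in for a tokenizer: roughly 4 characters per token."""
    chars = sum(len(message["content"]) for message in messages)
    return max(1, chars // 4)


def estimate_cost(messages: list, model: str, expected_output_tokens: int) -> CostEstimate:
    """Estimate request cost from approximate input size and an expected output size."""
    prices = PRICES[model]
    input_tokens = rough_token_count(messages)
    cost = (input_tokens * prices["input"]
            + expected_output_tokens * prices["output"]) / 1_000_000
    return CostEstimate(input_tokens, expected_output_tokens, cost)


estimate = estimate_cost(
    [{"role": "user", "content": "x" * 4000}],
    "claude-3-5-sonnet-20241022",
    expected_output_tokens=500,
)
```

For this input the sketch yields about 1,000 input tokens and an estimated cost of roughly $0.0105, dominated by the output side.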

Cost Tracking Architecture

System Overview

┌─────────────────────────────────────────────────────────────────────────┐
│                           Syndarix Backend                               │
│  ┌────────────────────────────────────────────────────────────────────┐ │
│  │                        LLM Gateway (LiteLLM)                        │ │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                 │ │
│  │  │ Pre-Request │  │   Router    │  │ Post-Request│                 │ │
│  │  │  Callback   │──▶│  (Failover) │──▶│  Callback  │                 │ │
│  │  └──────┬──────┘  └─────────────┘  └──────┬──────┘                 │ │
│  │         │                                  │                        │ │
│  │         ▼                                  ▼                        │ │
│  │  ┌─────────────┐              ┌──────────────────────┐             │ │
│  │  │   Budget    │              │   Cost Tracker       │             │ │
│  │  │   Check     │              │   Service            │             │ │
│  │  └──────┬──────┘              └──────────┬───────────┘             │ │
│  └─────────┼────────────────────────────────┼─────────────────────────┘ │
│            │                                │                           │
│            ▼                                ▼                           │
│  ┌─────────────────────┐        ┌──────────────────────┐               │
│  │       Redis         │        │      PostgreSQL       │               │
│  │  ┌───────────────┐  │        │  ┌────────────────┐  │               │
│  │  │ Budget        │  │        │  │ token_usage    │  │               │
│  │  │ Counters      │  │        │  │ (time-series)  │  │               │
│  │  ├───────────────┤  │        │  ├────────────────┤  │               │
│  │  │ Rate Limits   │  │        │  │ budgets        │  │               │
│  │  ├───────────────┤  │        │  ├────────────────┤  │               │
│  │  │ Cache Keys    │  │        │  │ alerts_log     │  │               │
│  │  └───────────────┘  │        │  ├────────────────┤  │               │
│  └─────────────────────┘        │  │ daily_summary  │  │               │
│                                 │  └────────────────┘  │               │
│                                 └──────────────────────┘               │
│                                           │                             │
│            ┌──────────────────────────────┴─────────────┐              │
│            ▼                                            ▼              │
│  ┌─────────────────────┐                   ┌────────────────────────┐  │
│  │   Event Bus (SSE)   │                   │   Celery Beat          │  │
│  │   - cost_update     │                   │   - budget_check       │  │
│  │   - budget_alert    │                   │   - daily_rollup       │  │
│  │   - threshold_warn  │                   │   - monthly_report     │  │
│  └─────────────────────┘                   └────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────┘

Component Responsibilities

| Component | Responsibility |
|-----------|----------------|
| LLM Gateway | Route requests, capture usage, enforce limits |
| Cost Tracker | Calculate costs, persist records, update counters |
| Redis | Real-time budget counters, rate limiting, caching |
| PostgreSQL | Persistent usage records, aggregations, analytics |
| Event Bus | Real-time notifications to dashboards |
| Celery Beat | Scheduled tasks (rollups, alerts, reports) |

Database Schema Design

Core Tables

# app/models/cost_tracking.py
from sqlalchemy import Boolean, Column, String, Integer, Float, DateTime, ForeignKey, Enum, Index
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship
from app.models.base import Base, TimestampMixin, UUIDMixin
import enum

class BudgetPeriod(str, enum.Enum):
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"

class AlertSeverity(str, enum.Enum):
    INFO = "info"         # 50% threshold
    WARNING = "warning"   # 80% threshold
    CRITICAL = "critical" # 100% threshold

class AlertStatus(str, enum.Enum):
    PENDING = "pending"
    ACKNOWLEDGED = "acknowledged"
    RESOLVED = "resolved"


class TokenUsage(Base, UUIDMixin, TimestampMixin):
    """Individual LLM request usage record."""
    __tablename__ = "token_usage"

    # Attribution
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
    agent_instance_id = Column(UUID(as_uuid=True), ForeignKey("agent_instances.id"), nullable=True)
    agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"), nullable=True)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)

    # Request details
    request_id = Column(String(64), unique=True, nullable=False)  # Correlation ID
    model = Column(String(100), nullable=False)
    provider = Column(String(50), nullable=False)  # anthropic, openai, ollama

    # Token counts
    prompt_tokens = Column(Integer, nullable=False)
    completion_tokens = Column(Integer, nullable=False)
    total_tokens = Column(Integer, nullable=False)
    cached_tokens = Column(Integer, default=0)  # Tokens served from cache

    # Cost calculation
    input_cost_usd = Column(Float, nullable=False)
    output_cost_usd = Column(Float, nullable=False)
    total_cost_usd = Column(Float, nullable=False)

    # Timing
    latency_ms = Column(Integer, nullable=True)
    request_timestamp = Column(DateTime(timezone=True), nullable=False)

    # Metadata
    task_type = Column(String(50), nullable=True)  # reasoning, coding, chat, etc.
    success = Column(Boolean, default=True)
    error_type = Column(String(100), nullable=True)
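    # "metadata" is reserved on SQLAlchemy declarative classes, so the column
    # is mapped under a different attribute name; default=dict avoids a shared mutable default
    extra_metadata = Column("metadata", JSONB, default=dict)  # Additional context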

    # Indexes for common queries
    __table_args__ = (
        Index("ix_token_usage_project_timestamp", "project_id", "request_timestamp"),
        Index("ix_token_usage_agent_instance_timestamp", "agent_instance_id", "request_timestamp"),
        Index("ix_token_usage_model_timestamp", "model", "request_timestamp"),
        Index("ix_token_usage_request_timestamp", "request_timestamp"),
    )


class Budget(Base, UUIDMixin, TimestampMixin):
    """Budget definition for cost control."""
    __tablename__ = "budgets"

    # Scope (one of these will be set)
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=True)
    agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"), nullable=True)
    organization_id = Column(UUID(as_uuid=True), ForeignKey("organizations.id"), nullable=True)

    # Budget configuration
    name = Column(String(100), nullable=False)
    period = Column(Enum(BudgetPeriod), nullable=False)
    limit_usd = Column(Float, nullable=False)

    # Thresholds (fractions of limit_usd)
    warn_threshold = Column(Float, default=0.5)    # 50%
    alert_threshold = Column(Float, default=0.8)   # 80%
    critical_threshold = Column(Float, default=1.0) # 100%

    # Enforcement
    hard_limit = Column(Boolean, default=False)  # Block requests at limit
    auto_downgrade = Column(Boolean, default=True)  # Downgrade to cheaper model

    # Status
    is_active = Column(Boolean, default=True)
    current_spend = Column(Float, default=0.0)  # Cached from Redis
    period_start = Column(DateTime(timezone=True), nullable=False)
    period_end = Column(DateTime(timezone=True), nullable=False)

    __table_args__ = (
        Index("ix_budgets_project_active", "project_id", "is_active"),
        Index("ix_budgets_period_end", "period_end"),
    )


class BudgetAlert(Base, UUIDMixin, TimestampMixin):
    """Alert record for budget threshold breaches."""
    __tablename__ = "budget_alerts"

    budget_id = Column(UUID(as_uuid=True), ForeignKey("budgets.id"), nullable=False)
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=True)

    severity = Column(Enum(AlertSeverity), nullable=False)
    status = Column(Enum(AlertStatus), default=AlertStatus.PENDING)

    threshold_percent = Column(Float, nullable=False)
    current_spend = Column(Float, nullable=False)
    budget_limit = Column(Float, nullable=False)

    message = Column(String(500), nullable=False)
    acknowledged_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)
    acknowledged_at = Column(DateTime(timezone=True), nullable=True)

    # Notification tracking
    notifications_sent = Column(JSONB, default=list)  # [{"channel": "email", "sent_at": "..."}]

    budget = relationship("Budget", backref="alerts")


class DailyCostSummary(Base, UUIDMixin):
    """Materialized daily cost aggregations for fast reporting."""
    __tablename__ = "daily_cost_summaries"

    date = Column(DateTime(timezone=True), nullable=False)
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=True)
    agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"), nullable=True)
    model = Column(String(100), nullable=True)

    # Aggregated metrics
    request_count = Column(Integer, default=0)
    total_prompt_tokens = Column(Integer, default=0)
    total_completion_tokens = Column(Integer, default=0)
    total_tokens = Column(Integer, default=0)
    total_cost_usd = Column(Float, default=0.0)

    # Performance metrics
    avg_latency_ms = Column(Float, nullable=True)
    cache_hit_rate = Column(Float, default=0.0)
    success_rate = Column(Float, default=1.0)

    __table_args__ = (
        Index("ix_daily_summary_date_project", "date", "project_id"),
        Index("ix_daily_summary_date_model", "date", "model"),
    )


class ModelPricing(Base, UUIDMixin, TimestampMixin):
    """Custom model pricing overrides."""
    __tablename__ = "model_pricing"

    model = Column(String(100), unique=True, nullable=False)
    provider = Column(String(50), nullable=False)

    input_cost_per_million = Column(Float, nullable=False)
    output_cost_per_million = Column(Float, nullable=False)

    # Context limits
    max_input_tokens = Column(Integer, nullable=True)
    max_output_tokens = Column(Integer, nullable=True)

    # Validity period
    effective_from = Column(DateTime(timezone=True), nullable=False)
    effective_until = Column(DateTime(timezone=True), nullable=True)

    is_active = Column(Boolean, default=True)
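To illustrate how individual usage rows might collapse into the daily summary table, here is a pure-Python sketch of the aggregation. It assumes plain dictionaries rather than ORM objects, and `rollup_daily` is a hypothetical helper. In production this would more likely be a single SQL `GROUP BY`.

```python
from collections import defaultdict
from datetime import date


def rollup_daily(records: list) -> list:
    """Group usage records by (day, project, model) and aggregate them."""
    groups = defaultdict(list)
    for record in records:
        groups[(record["day"], record["project_id"], record["model"])].append(record)

    summaries = []
    for (day, project_id, model), rows in sorted(groups.items()):
        count = len(rows)
        summaries.append({
            "date": day,
            "project_id": project_id,
            "model": model,
            "request_count": count,
            "total_tokens": sum(row["total_tokens"] for row in rows),
            "total_cost_usd": sum(row["total_cost_usd"] for row in rows),
            "success_rate": sum(1 for row in rows if row["success"]) / count,
        })
    return summaries


records = [
    {"day": date(2025, 1, 1), "project_id": "p1", "model": "gpt-4o",
     "total_tokens": 100, "total_cost_usd": 0.01, "success": True},
    {"day": date(2025, 1, 1), "project_id": "p1", "model": "gpt-4o",
     "total_tokens": 300, "total_cost_usd": 0.03, "success": False},
    {"day": date(2025, 1, 2), "project_id": "p1", "model": "gpt-4o",
     "total_tokens": 50, "total_cost_usd": 0.005, "success": True},
]
summaries = rollup_daily(records)
```

The output keys mirror the aggregate columns of the daily summary model (`request_count`, `total_tokens`, `total_cost_usd`, `success_rate`).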

Redis Key Structure

# Redis key patterns for real-time tracking

REDIS_KEYS = {
    # Budget counters (reset on period start)
    "budget_spend": "budget:{budget_id}:spend",              # INCRBYFLOAT
    "budget_requests": "budget:{budget_id}:requests",        # INCR

    # Project-level aggregations
    "project_daily_spend": "project:{project_id}:daily:{date}:spend",
    "project_monthly_spend": "project:{project_id}:monthly:{year_month}:spend",

    # Rate limiting
    "rate_limit": "ratelimit:{project_id}:{model}:{window}",

    # Semantic cache
    "semantic_cache": "cache:semantic:{hash}",

    # Real-time metrics (for dashboard)
    "realtime_spend": "metrics:realtime:spend:{project_id}",
}
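The key patterns are plain `str.format` templates, so a small helper can fill them in. A minimal sketch, where `redis_key` is a hypothetical helper and only two of the templates are repeated to keep the example self-contained:

```python
from datetime import date

KEY_TEMPLATES = {
    "budget_spend": "budget:{budget_id}:spend",
    "project_daily_spend": "project:{project_id}:daily:{date}:spend",
}


def redis_key(name: str, **params) -> str:
    """Fill a key template; raises KeyError for an unknown name or a missing placeholder."""
    return KEY_TEMPLATES[name].format(**params)


spend_key = redis_key("budget_spend", budget_id="b-123")
daily_key = redis_key(
    "project_daily_spend",
    project_id="p-1",
    date=date(2025, 12, 29).isoformat(),
)
```

Centralising key construction this way keeps the enforcer, the rollup tasks, and any reset logic agreeing on the exact key names.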

LiteLLM Callback Implementation

Custom Callback Handler

# app/services/cost_tracking/callbacks.py
from typing import Any
from datetime import datetime, UTC
import asyncio
import uuid
from litellm.integrations.custom_logger import CustomLogger
from app.core.config import settings
from app.services.cost_tracking.tracker import CostTracker
from app.services.cost_tracking.budget import BudgetEnforcer
from app.services.events import EventBus


class BudgetExceededError(Exception):
    """Raised when a hard-limit budget is exhausted."""

class SyndarixCostCallback(CustomLogger):
    """
    LiteLLM callback handler for cost tracking and budget enforcement.

    Integrates with:
    - Redis for real-time budget counters
    - PostgreSQL for persistent usage records
    - Event bus for real-time dashboard updates
    """

    def __init__(self):
        self.tracker = CostTracker()
        self.budget_enforcer = BudgetEnforcer()
        self.event_bus = EventBus()

    def log_pre_api_call(self, model: str, messages: list, kwargs: dict):
        """Pre-request validation and cost estimation."""
        project_id = kwargs.get("metadata", {}).get("project_id")
        agent_id = kwargs.get("metadata", {}).get("agent_id")

        if not project_id:
            return  # Skip tracking for internal calls

        # Check budget before making request
        budget_status = self.budget_enforcer.check_budget_sync(project_id)

        if budget_status.exceeded and budget_status.hard_limit:
            raise BudgetExceededError(
                f"Budget exceeded for project {project_id}. "
                f"Limit: ${budget_status.limit:.2f}, "
                f"Spent: ${budget_status.spent:.2f}"
            )

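        if budget_status.exceeded and budget_status.auto_downgrade:
            # Suggest model downgrade. NOTE: whether mutating kwargs inside a
            # logging hook changes the model actually sent should be verified
            # against the LiteLLM version in use; otherwise apply the downgrade
            # before the request reaches the gateway.
            kwargs["model"] = self._get_fallback_model(model)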

    async def async_log_success_event(
        self,
        kwargs: dict,
        response_obj: Any,
        start_time: datetime,
        end_time: datetime
    ):
        """Record successful request usage and costs."""
        metadata = kwargs.get("metadata", {})
        project_id = metadata.get("project_id")
        agent_id = metadata.get("agent_id")
        agent_type_id = metadata.get("agent_type_id")

        if not project_id:
            return

        # Extract usage data
        usage = response_obj.usage
        model = kwargs.get("model")
        cost = kwargs.get("response_cost") or 0  # Key may be absent or None

        # Calculate costs if not provided
        if not cost:
            cost = self._calculate_cost(model, usage)

        # Create usage record
        usage_record = {
            "request_id": kwargs.get("litellm_call_id", str(uuid.uuid4())),
            "project_id": project_id,
            "agent_instance_id": agent_id,
            "agent_type_id": agent_type_id,
            "model": model,
            "provider": self._get_provider(model),
            "prompt_tokens": usage.prompt_tokens,
            "completion_tokens": usage.completion_tokens,
            "total_tokens": usage.total_tokens,
            "input_cost_usd": self._calculate_input_cost(model, usage.prompt_tokens),
            "output_cost_usd": self._calculate_output_cost(model, usage.completion_tokens),
            "total_cost_usd": cost,
            # LiteLLM passes start_time/end_time as datetime objects
            "latency_ms": int((end_time - start_time).total_seconds() * 1000),
            "request_timestamp": datetime.now(UTC),
            "task_type": metadata.get("task_type"),
            "success": True,
        }

        # Async operations in parallel
        await asyncio.gather(
            self.tracker.record_usage(usage_record),
            self.budget_enforcer.increment_spend(project_id, cost),
            self._publish_cost_event(project_id, usage_record),
        )

    async def async_log_failure_event(
        self,
        kwargs: dict,
        response_obj: Any,
        start_time: datetime,
        end_time: datetime
    ):
        """Record failed request (still costs input tokens)."""
        metadata = kwargs.get("metadata", {})
        project_id = metadata.get("project_id")

        if not project_id:
            return

        # Failed requests still consume input tokens
        input_tokens = kwargs.get("input_tokens", 0)
        model = kwargs.get("model")

        if input_tokens > 0:
            input_cost = self._calculate_input_cost(model, input_tokens)

            usage_record = {
                "request_id": kwargs.get("litellm_call_id", str(uuid.uuid4())),
                "project_id": project_id,
                "model": model,
                "provider": self._get_provider(model),
                "prompt_tokens": input_tokens,
                "completion_tokens": 0,
                "total_tokens": input_tokens,
                # token_usage declares provider and the cost columns as NOT NULL
                "input_cost_usd": input_cost,
                "output_cost_usd": 0.0,
                "total_cost_usd": input_cost,
                "success": False,
                "error_type": type(response_obj).__name__,
                "request_timestamp": datetime.now(UTC),
            }

            await self.tracker.record_usage(usage_record)

    async def _publish_cost_event(self, project_id: str, usage: dict):
        """Publish real-time cost event to dashboard."""
        await self.event_bus.publish(f"project:{project_id}", {
            "type": "cost_update",
            "data": {
                "model": usage["model"],
                "tokens": usage["total_tokens"],
                "cost_usd": usage["total_cost_usd"],
                "timestamp": usage["request_timestamp"].isoformat(),
            }
        })

    def _calculate_cost(self, model: str, usage) -> float:
        """Calculate total cost from usage."""
        input_cost = self._calculate_input_cost(model, usage.prompt_tokens)
        output_cost = self._calculate_output_cost(model, usage.completion_tokens)
        return input_cost + output_cost

    def _calculate_input_cost(self, model: str, tokens: int) -> float:
        """Calculate input token cost."""
        costs = MODEL_COSTS.get(model, {"input": 0, "output": 0})
        return (tokens / 1_000_000) * costs["input"]

    def _calculate_output_cost(self, model: str, tokens: int) -> float:
        """Calculate output token cost."""
        costs = MODEL_COSTS.get(model, {"input": 0, "output": 0})
        return (tokens / 1_000_000) * costs["output"]

    def _get_provider(self, model: str) -> str:
        """Extract provider from model name."""
        if model.startswith("claude"):
            return "anthropic"
        elif model.startswith("gpt") or model.startswith("o1"):
            return "openai"
        elif model.startswith("ollama/"):
            return "ollama"
        return "unknown"

    def _get_fallback_model(self, model: str) -> str:
        """Get cheaper fallback model."""
        fallbacks = {
            "claude-3-5-sonnet-20241022": "claude-3-haiku-20240307",
            "gpt-4-turbo": "gpt-4o-mini",
            "gpt-4o": "gpt-4o-mini",
        }
        return fallbacks.get(model, model)


# Model cost database (per 1M tokens)
MODEL_COSTS = {
    # Anthropic
    "claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
    "claude-3-opus-20240229": {"input": 15.00, "output": 75.00},
    "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},

    # OpenAI
    "gpt-4-turbo": {"input": 10.00, "output": 30.00},
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "o1-preview": {"input": 15.00, "output": 60.00},
    "o1-mini": {"input": 3.00, "output": 12.00},

    # Local (compute-only, no API cost)
    "ollama/llama3": {"input": 0.00, "output": 0.00},
    "ollama/mixtral": {"input": 0.00, "output": 0.00},
}
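The effect of the automatic downgrade can be checked with the prices and fallback mapping above. A small worked sketch, in which `request_cost` and `downgrade_saving` are hypothetical helpers and the two dictionaries repeat only the entries needed here:

```python
# USD per 1M tokens and the fallback pair, copied from the callback module above.
MODEL_COSTS = {
    "claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
    "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
}
FALLBACKS = {"claude-3-5-sonnet-20241022": "claude-3-haiku-20240307"}


def request_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Cost in USD of one request at per-million-token prices."""
    costs = MODEL_COSTS[model]
    return (prompt_tokens * costs["input"]
            + completion_tokens * costs["output"]) / 1_000_000


def downgrade_saving(model: str, prompt_tokens: int, completion_tokens: int):
    """Return (original_cost, fallback_cost, fractional_saving) for one request."""
    original = request_cost(model, prompt_tokens, completion_tokens)
    fallback = request_cost(FALLBACKS[model], prompt_tokens, completion_tokens)
    return original, fallback, 1 - fallback / original


original, fallback, saving = downgrade_saving("claude-3-5-sonnet-20241022", 1200, 350)
```

For a request with 1,200 prompt tokens and 350 completion tokens, the cost drops from about $0.00885 to about $0.00074. Because both Haiku prices are one twelfth of the Sonnet prices, the saving is about 92% regardless of the token mix.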

Registering the Callback

# app/services/llm_gateway.py
import litellm
from app.services.cost_tracking.callbacks import SyndarixCostCallback

# Register the callback globally
syndarix_callback = SyndarixCostCallback()
litellm.callbacks = [syndarix_callback]

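# Alternative: register individual hooks instead of the whole handler.
# Use one approach or the other; enabling both would likely record each request twice.
# litellm.success_callback = [syndarix_callback.async_log_success_event]
# litellm.failure_callback = [syndarix_callback.async_log_failure_event]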

Budget Management System

Budget Enforcer

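# app/services/cost_tracking/budget.py
from dataclasses import dataclass
from datetime import datetime, UTC, timedelta
from typing import Optional
import redis.asyncio as redis
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.database import get_async_session  # Assumed location of the session factory
from app.models.cost_tracking import Budget, BudgetPeriod, BudgetAlert, AlertSeverity
from app.services.events import EventBus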

@dataclass
class BudgetStatus:
    budget_id: Optional[str]  # None when the project has no active budget
    limit: float
    spent: float
    remaining: float
    percent_used: float
    exceeded: bool
    hard_limit: bool
    auto_downgrade: bool
    threshold_breached: Optional[str] = None  # warn, alert, critical


class BudgetEnforcer:
    """Real-time budget enforcement using Redis."""

    def __init__(self, redis_url: Optional[str] = None):
        self.redis = redis.from_url(redis_url or settings.REDIS_URL)

    async def check_budget(self, project_id: str) -> BudgetStatus:
        """Check current budget status for a project."""
        # Get active budgets for project
        budgets = await self._get_active_budgets(project_id)

        if not budgets:
            return BudgetStatus(
                budget_id=None,
                limit=float("inf"),
                spent=0,
                remaining=float("inf"),
                percent_used=0,
                exceeded=False,
                hard_limit=False,
                auto_downgrade=False,
            )

        # Evaluate every active budget, then report the most restrictive one
        statuses = []
        for budget in budgets:
            spend_key = f"budget:{budget.id}:spend"
            spent = float(await self.redis.get(spend_key) or 0)

            # Treat a zero limit as fully used to avoid dividing by zero
            percent_used = (spent / budget.limit_usd) * 100 if budget.limit_usd > 0 else 100.0
            exceeded = spent >= budget.limit_usd

            # Determine threshold breach
            threshold_breached = None
            if percent_used >= budget.critical_threshold * 100:
                threshold_breached = "critical"
            elif percent_used >= budget.alert_threshold * 100:
                threshold_breached = "alert"
            elif percent_used >= budget.warn_threshold * 100:
                threshold_breached = "warn"

            statuses.append(BudgetStatus(
                budget_id=str(budget.id),
                limit=budget.limit_usd,
                spent=spent,
                remaining=max(0, budget.limit_usd - spent),
                percent_used=percent_used,
                exceeded=exceeded,
                hard_limit=budget.hard_limit,
                auto_downgrade=budget.auto_downgrade,
                threshold_breached=threshold_breached,
            ))

        # Exhausted hard limits win; otherwise the budget with the highest usage
        return max(statuses, key=lambda s: (s.exceeded and s.hard_limit, s.percent_used))

    async def increment_spend(self, project_id: str, amount: float) -> BudgetStatus:
        """Increment budget spend and check thresholds."""
        budgets = await self._get_active_budgets(project_id)

        for budget in budgets:
            spend_key = f"budget:{budget.id}:spend"

            # Atomic increment
            new_spend = await self.redis.incrbyfloat(spend_key, amount)

            # Check and trigger alerts
            await self._check_thresholds(budget, new_spend)

        return await self.check_budget(project_id)

    async def _check_thresholds(self, budget: Budget, current_spend: float):
        """Check thresholds and create alerts if needed."""
        percent = (current_spend / budget.limit_usd) * 100

        thresholds = [
            (budget.warn_threshold * 100, AlertSeverity.INFO),
            (budget.alert_threshold * 100, AlertSeverity.WARNING),
            (budget.critical_threshold * 100, AlertSeverity.CRITICAL),
        ]

        for threshold, severity in thresholds:
            if percent >= threshold:
                alert_key = f"budget:{budget.id}:alert:{severity.value}"
                # SET NX with a 24h TTL claims the marker atomically, so
                # concurrent requests create each alert at most once
                if await self.redis.set(alert_key, "1", ex=86400, nx=True):
                    await self._create_alert(budget, severity, percent, current_spend)

    async def _create_alert(
        self,
        budget: Budget,
        severity: AlertSeverity,
        percent: float,
        current_spend: float
    ):
        """Create budget alert and trigger notifications."""
        alert = BudgetAlert(
            budget_id=budget.id,
            project_id=budget.project_id,
            severity=severity,
            threshold_percent=percent,
            current_spend=current_spend,
            budget_limit=budget.limit_usd,
            message=f"Budget '{budget.name}' at {percent:.1f}% (${current_spend:.2f}/${budget.limit_usd:.2f})"
        )

        # Persist alert
        async with get_async_session() as session:
            session.add(alert)
            await session.commit()

        # Publish alert event
        await EventBus().publish(f"project:{budget.project_id}", {
            "type": "budget_alert",
            "data": {
                "severity": severity.value,
                "message": alert.message,
                "percent_used": percent,
                "budget_name": budget.name,
            }
        })

    async def reset_budget_period(self, budget_id: str):
        """Reset budget counter for new period."""
        spend_key = f"budget:{budget_id}:spend"
        await self.redis.delete(spend_key)

        # Clear alert markers
        for severity in AlertSeverity:
            alert_key = f"budget:{budget_id}:alert:{severity.value}"
            await self.redis.delete(alert_key)

    def check_budget_sync(self, project_id: str) -> BudgetStatus:
        """Synchronous version for pre-request checks.

        Call this from synchronous code only: asyncio.run() raises
        RuntimeError if an event loop is already running in this thread.
        """
        import asyncio
        return asyncio.run(self.check_budget(project_id))
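
The threshold loop above fires each severity at most once per period because a marker key is written after the alert is created. The following is a minimal in-memory sketch of that deduplication logic, useful for unit-testing the behaviour without Redis. `ThresholdTracker` and the 50/80/95 percent thresholds are hypothetical and chosen only for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class ThresholdTracker:
    """In-memory stand-in for the Redis alert markers (hypothetical helper)."""

    thresholds: dict[str, float]  # severity name -> percent threshold (assumed values)
    sent: set[str] = field(default_factory=set)

    def due_alerts(self, percent_used: float) -> list[str]:
        """Return severities whose threshold is crossed and not yet alerted."""
        due = []
        for severity, threshold in self.thresholds.items():
            if percent_used >= threshold and severity not in self.sent:
                due.append(severity)
                self.sent.add(severity)  # plays the role of the SETEX marker
        return due

    def reset(self) -> None:
        """Clear markers so alerts can fire again in the next period."""
        self.sent.clear()


tracker = ThresholdTracker({"info": 50.0, "warning": 80.0, "critical": 95.0})
first = tracker.due_alerts(82.0)   # crosses info and warning
second = tracker.due_alerts(85.0)  # nothing new to report
```

One consequence worth noting: a budget that jumps straight past several thresholds produces one alert per crossed severity in the same pass, which may or may not be the desired behaviour.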

Budget Period Management (Celery Task)

# app/tasks/budget_tasks.py
import asyncio  # needed for asyncio.run() in check_budget_periods
from celery import shared_task
from datetime import datetime, UTC, timedelta
from app.services.cost_tracking.budget import BudgetEnforcer

@shared_task
def check_budget_periods():
    """
    Scheduled task to manage budget periods.
    Run every hour via Celery Beat.
    """
    now = datetime.now(UTC)
    enforcer = BudgetEnforcer()

    # Find budgets that need period reset
    with get_session() as session:
        expired_budgets = session.query(Budget).filter(
            Budget.is_active == True,
            Budget.period_end <= now
        ).all()

        for budget in expired_budgets:
            # Archive current period spend
            archive_budget_period(budget)

            # Calculate new period, anchored at midnight so boundaries are exact
            midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
            if budget.period == BudgetPeriod.DAILY:
                period_start = midnight
                period_end = period_start + timedelta(days=1)
            elif budget.period == BudgetPeriod.WEEKLY:
                period_start = midnight - timedelta(days=midnight.weekday())
                period_end = period_start + timedelta(weeks=1)
            elif budget.period == BudgetPeriod.MONTHLY:
                period_start = midnight.replace(day=1)
                # Adding 32 days to the 1st always lands in the next month
                next_month = period_start + timedelta(days=32)
                period_end = next_month.replace(day=1)
            else:
                # Unknown period type: leave this budget untouched
                continue

            # Update budget
            budget.period_start = period_start
            budget.period_end = period_end
            budget.current_spend = 0

            session.commit()

            # Reset Redis counter
            asyncio.run(enforcer.reset_budget_period(str(budget.id)))

@shared_task
def daily_cost_rollup():
    """
    Aggregate daily costs into summary table.
    Run at 1 AM daily via Celery Beat.
    """
    yesterday = datetime.now(UTC).date() - timedelta(days=1)

    # SQLAlchemy 1.4+/2.0 requires raw SQL strings to be wrapped in text()
    from sqlalchemy import text

    with get_session() as session:
        # Aggregate by project, agent_type, model
        session.execute(text("""
            INSERT INTO daily_cost_summaries
            (date, project_id, agent_type_id, model,
             request_count, total_prompt_tokens, total_completion_tokens,
             total_tokens, total_cost_usd, avg_latency_ms,
             cache_hit_rate, success_rate)
            SELECT
                DATE(request_timestamp) as date,
                project_id,
                agent_type_id,
                model,
                COUNT(*) as request_count,
                SUM(prompt_tokens) as total_prompt_tokens,
                SUM(completion_tokens) as total_completion_tokens,
                SUM(total_tokens) as total_tokens,
                SUM(total_cost_usd) as total_cost_usd,
                AVG(latency_ms) as avg_latency_ms,
                SUM(cached_tokens)::float / NULLIF(SUM(total_tokens), 0) as cache_hit_rate,
                AVG(CASE WHEN success THEN 1 ELSE 0 END) as success_rate
            FROM token_usage
            WHERE DATE(request_timestamp) = :yesterday
            GROUP BY DATE(request_timestamp), project_id, agent_type_id, model
            ON CONFLICT DO NOTHING
        """), {"yesterday": yesterday})

        session.commit()
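
The period arithmetic in the reset task is easy to get subtly wrong (time-of-day left on the boundary, year rollover in December), so it may be worth isolating it in a pure function that can be tested without Celery or a database. The sketch below is one way to do that; `period_bounds` and its string period names are hypothetical and not part of the codebase above.

```python
from datetime import datetime, timedelta, timezone


def period_bounds(now: datetime, period: str) -> tuple[datetime, datetime]:
    """Return (start, end) of the budget period containing `now`.

    `period` is one of "daily", "weekly", "monthly" (assumed names).
    Boundaries are aligned to midnight; weeks start on Monday.
    """
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    if period == "daily":
        start = midnight
        end = start + timedelta(days=1)
    elif period == "weekly":
        start = midnight - timedelta(days=midnight.weekday())
        end = start + timedelta(weeks=1)
    elif period == "monthly":
        start = midnight.replace(day=1)
        # 32 days after the 1st is always in the next month, including January
        end = (start + timedelta(days=32)).replace(day=1)
    else:
        raise ValueError(f"unknown period: {period!r}")
    return start, end


# A Wednesday at the end of the year exercises both week and year rollover.
now = datetime(2025, 12, 31, 13, 31, 2, tzinfo=timezone.utc)
daily = period_bounds(now, "daily")
weekly = period_bounds(now, "weekly")
monthly = period_bounds(now, "monthly")
```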

Alert System Integration

Alert Configuration

# app/services/cost_tracking/alerts.py
from dataclasses import dataclass
from datetime import datetime, UTC  # used for notification timestamps
from enum import Enum
from typing import List, Optional

class AlertChannel(str, Enum):
    EMAIL = "email"
    SLACK = "slack"
    WEBHOOK = "webhook"
    SSE = "sse"  # Real-time dashboard

@dataclass
class AlertRule:
    budget_id: str
    channels: List[AlertChannel]
    recipients: List[str]  # Email addresses, Slack channels, etc.
    escalation_delay: int = 0  # Minutes before escalating

class AlertDispatcher:
    """Dispatch budget alerts to configured channels."""

    async def dispatch(self, alert: BudgetAlert, rules: List[AlertRule]):
        """Send alert through all configured channels."""
        for rule in rules:
            if rule.budget_id == str(alert.budget_id):
                for channel in rule.channels:
                    await self._send_alert(channel, alert, rule.recipients)

    async def _send_alert(
        self,
        channel: AlertChannel,
        alert: BudgetAlert,
        recipients: List[str]
    ):
        """Send alert to specific channel."""
        notification = {
            "channel": channel.value,
            "sent_at": datetime.now(UTC).isoformat(),
            "recipients": recipients,
        }

        if channel == AlertChannel.EMAIL:
            await self._send_email_alert(alert, recipients)
        elif channel == AlertChannel.SLACK:
            await self._send_slack_alert(alert, recipients)
        elif channel == AlertChannel.WEBHOOK:
            await self._send_webhook_alert(alert, recipients)
        elif channel == AlertChannel.SSE:
            # Already handled via EventBus
            pass

        # Track notification
        alert.notifications_sent.append(notification)

    async def _send_slack_alert(self, alert: BudgetAlert, channels: List[str]):
        """Send alert to Slack."""
        color = {
            AlertSeverity.INFO: "#36a64f",      # Green
            AlertSeverity.WARNING: "#ff9800",    # Orange
            AlertSeverity.CRITICAL: "#f44336",   # Red
        }[alert.severity]

        payload = {
            "attachments": [{
                "color": color,
                "title": f"Budget Alert: {alert.message}",
                "fields": [
                    {"title": "Budget", "value": f"${alert.budget_limit:.2f}", "short": True},
                    {"title": "Current Spend", "value": f"${alert.current_spend:.2f}", "short": True},
                    {"title": "Usage", "value": f"{alert.threshold_percent:.1f}%", "short": True},
                    {"title": "Severity", "value": alert.severity.value.upper(), "short": True},
                ],
                "ts": datetime.now(UTC).timestamp(),
            }]
        }

        for channel in channels:
            await send_slack_message(channel, payload)

Cost Optimization Strategies

1. Semantic Caching

# app/services/cost_tracking/cache.py
import hashlib
from datetime import datetime, UTC
from typing import Optional, Tuple

import litellm
import numpy as np
import redis.asyncio as redis

class SemanticCache:
    """
    Cache LLM responses based on semantic similarity.
    Uses embedding vectors to find similar queries.
    """

    def __init__(self, redis_url: str, similarity_threshold: float = 0.95):
        self.redis = redis.from_url(redis_url)
        self.threshold = similarity_threshold
        self.embedding_model = "text-embedding-3-small"  # OpenAI

    async def get(
        self,
        messages: list,
        model: str,
        project_id: str
    ) -> Optional[Tuple[str, dict]]:
        """
        Check cache for semantically similar query.

        Returns:
            Tuple of (cached_response, usage_info) or None
        """
        query_embedding = await self._get_embedding(messages)
        cache_key_pattern = f"cache:semantic:{project_id}:{model}:*"

        # Search for similar embeddings
        async for key in self.redis.scan_iter(match=cache_key_pattern):
            cached = await self.redis.hgetall(key)
            if cached:
                cached_embedding = np.frombuffer(cached[b"embedding"], dtype=np.float32)
                similarity = self._cosine_similarity(query_embedding, cached_embedding)

                if similarity >= self.threshold:
                    return (
                        cached[b"response"].decode(),
                        {
                            "cache_hit": True,
                            "similarity": similarity,
                            "tokens_saved": int(cached[b"tokens"]),
                            "cost_saved": float(cached[b"cost"]),
                        }
                    )

        return None

    async def set(
        self,
        messages: list,
        model: str,
        project_id: str,
        response: str,
        usage: dict,
        ttl: int = 3600
    ):
        """Cache response with embedding for future similarity matching."""
        query_embedding = await self._get_embedding(messages)

        # Create unique key
        content_hash = hashlib.sha256(str(messages).encode()).hexdigest()[:16]
        cache_key = f"cache:semantic:{project_id}:{model}:{content_hash}"

        await self.redis.hset(cache_key, mapping={
            "embedding": query_embedding.tobytes(),
            "response": response,
            "tokens": usage["total_tokens"],
            "cost": usage["total_cost_usd"],
            "model": model,
            "created_at": datetime.now(UTC).isoformat(),
        })
        await self.redis.expire(cache_key, ttl)

    async def _get_embedding(self, messages: list) -> np.ndarray:
        """Generate embedding for messages."""
        # Extract text content
        text = " ".join(m.get("content", "") for m in messages)

        # Call embedding API
        response = await litellm.aembedding(
            model=self.embedding_model,
            input=text
        )

        return np.array(response.data[0].embedding, dtype=np.float32)

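    @staticmethod
    def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
        """Calculate cosine similarity between vectors (0.0 if either is all zeros)."""
        norm = np.linalg.norm(a) * np.linalg.norm(b)
        if norm == 0:
            return 0.0
        # Cast so callers get a plain float (JSON-serializable), not np.float32
        return float(np.dot(a, b) / norm)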
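
Note that `get` above returns the first cached entry whose similarity clears the threshold, which is not necessarily the closest one. A minimal sketch of a best-match lookup is shown below, written in pure Python so it runs without NumPy or Redis. `best_match` and the two-dimensional toy embeddings are hypothetical; real embeddings would come from the embedding model.

```python
import math
from typing import Optional


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def best_match(
    query: list[float],
    entries: dict[str, list[float]],
    threshold: float = 0.95,
) -> Optional[tuple[str, float]]:
    """Return (key, similarity) of the closest entry at or above threshold."""
    best: Optional[tuple[str, float]] = None
    for key, embedding in entries.items():
        score = cosine_similarity(query, embedding)
        if score >= threshold and (best is None or score > best[1]):
            best = (key, score)
    return best


# Toy cache: both "a" and "b" clear the threshold, but "b" is closer.
entries = {"a": [1.0, 0.0], "b": [0.99, 0.05], "c": [0.0, 1.0]}
hit = best_match([1.0, 0.04], entries)
miss = best_match([-1.0, 0.0], entries)
```

Either variant still scans every key for the project and model, so lookup cost grows linearly with cache size; a vector index would be the natural next step if that becomes a bottleneck.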

2. Model Cascading

# app/services/cost_tracking/cascade.py
from dataclasses import dataclass
from typing import List, Optional

import litellm  # used by execute_with_fallback

@dataclass
class CascadeModel:
    model: str
    cost_per_1k_tokens: float
    max_complexity: float  # 0-1 score
    timeout: int = 30

class ModelCascade:
    """
    Route queries to cheapest capable model.
    Escalate to more expensive models only when needed.
    """

    def __init__(self):
        self.models = [
            CascadeModel("claude-3-haiku-20240307", 0.00075, 0.3, timeout=10),
            CascadeModel("gpt-4o-mini", 0.000375, 0.4, timeout=15),
            CascadeModel("claude-3-5-sonnet-20241022", 0.009, 0.8, timeout=30),
            CascadeModel("gpt-4-turbo", 0.02, 1.0, timeout=60),
        ]
        self.complexity_classifier = None  # TODO: Train classifier

    async def route(
        self,
        messages: list,
        required_quality: float = 0.7
    ) -> str:
        """
        Determine the best model for the query.

        Args:
            messages: The query messages
            required_quality: Minimum quality threshold (0-1)

        Returns:
            Model identifier to use
        """
        complexity = await self._estimate_complexity(messages)

        # Models are listed by capability tier; pick the first (least capable,
        # generally cheapest) one that meets both requirements
        for model in self.models:
            if model.max_complexity >= complexity and model.max_complexity >= required_quality:
                return model.model

        # Fallback to most capable
        return self.models[-1].model

    async def _estimate_complexity(self, messages: list) -> float:
        """
        Estimate query complexity (0-1).

        Factors:
        - Message length
        - Technical terms
        - Multi-step reasoning required
        - Code generation requested
        """
        text = " ".join(m.get("content", "") for m in messages)

        # Simple heuristics (replace with ML model)
        complexity = 0.0

        # Length factor
        word_count = len(text.split())
        if word_count > 500:
            complexity += 0.2
        elif word_count > 200:
            complexity += 0.1

        # Technical indicators
        technical_terms = ["implement", "architect", "optimize", "debug", "refactor"]
        if any(term in text.lower() for term in technical_terms):
            complexity += 0.3

        # Code indicators
        if "```" in text or "code" in text.lower():
            complexity += 0.2

        # Multi-step indicators
        if "step" in text.lower() or "first" in text.lower() or "then" in text.lower():
            complexity += 0.2

        return min(1.0, complexity)

    async def execute_with_fallback(
        self,
        messages: list,
        required_quality: float = 0.7,
        confidence_threshold: float = 0.8
    ) -> dict:
        """
        Execute query with automatic fallback to stronger models.
        """
        for model in self.models:
            if model.max_complexity < required_quality:
                continue

            try:
                response = await litellm.acompletion(
                    model=model.model,
                    messages=messages,
                    timeout=model.timeout,
                )

                # Check response quality (simplified)
                confidence = await self._evaluate_response(response, messages)

                if confidence >= confidence_threshold:
                    return {
                        "response": response,
                        "model_used": model.model,
                        "fallback_count": self.models.index(model),
                    }

            except Exception:
                # Timeout or provider error: fall through to the next, stronger model
                continue

        # Final fallback
        return await self._execute_premium(messages)
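
The routing idea can be exercised without any LLM calls. The sketch below reimplements the heuristic scorer and tier selection as pure functions, assuming hypothetical tier names and cut-offs. One deliberate difference from `_estimate_complexity` above: it matches whole words rather than substrings, so that a word such as "authentication" (which contains "then") does not count as a multi-step indicator. The trade-off is that inflected forms such as "implementing" no longer match.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class Tier:
    model: str             # hypothetical model name
    max_complexity: float  # highest complexity score this tier should handle


TIERS = [
    Tier("small-model", 0.3),
    Tier("medium-model", 0.8),
    Tier("large-model", 1.0),
]

TECHNICAL_TERMS = {"implement", "architect", "optimize", "debug", "refactor"}
MULTI_STEP_TERMS = {"step", "first", "then"}
CODE_FENCE = "`" * 3  # markdown code fence marker


def estimate_complexity(text: str) -> float:
    """Score a query from 0 to 1 using whole-word heuristics."""
    words = re.findall(r"[a-z]+", text.lower())
    vocabulary = set(words)
    score = 0.0
    if len(words) > 500:
        score += 0.2
    elif len(words) > 200:
        score += 0.1
    if vocabulary & TECHNICAL_TERMS:
        score += 0.3
    if CODE_FENCE in text or "code" in vocabulary:
        score += 0.2
    if vocabulary & MULTI_STEP_TERMS:
        score += 0.2
    return min(1.0, score)


def route(text: str) -> str:
    """Return the first tier able to handle the estimated complexity."""
    complexity = estimate_complexity(text)
    for tier in TIERS:
        if tier.max_complexity >= complexity:
            return tier.model
    return TIERS[-1].model


simple = route("What is 2+2?")
harder = route("First implement the parser, then refactor the code")
```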

3. Prompt Compression (LLMLingua Integration)

# app/services/cost_tracking/compression.py
from llmlingua import PromptCompressor

class PromptOptimizer:
    """
    Compress prompts to reduce token count while preserving meaning.
    Uses LLMLingua for intelligent compression.
    """

    def __init__(self):
        self.compressor = PromptCompressor(
            model_name="microsoft/llmlingua-2-bert-base-multilingual-cased-meetingbank",
            use_llmlingua2=True
        )

    def compress(
        self,
        messages: list,
        target_ratio: float = 0.5,  # Compress to 50% of original
        preserve_instructions: bool = True
    ) -> list:
        """
        Compress messages while preserving critical information.

        Args:
            messages: Original messages
            target_ratio: Target compression ratio
            preserve_instructions: Whether to preserve system instructions

        Returns:
            Compressed messages
        """
        compressed_messages = []

        for msg in messages:
            role = msg["role"]
            content = msg["content"]

            # Don't compress system instructions if requested
            if role == "system" and preserve_instructions:
                compressed_messages.append(msg)
                continue

            # Apply compression
            compressed = self.compressor.compress_prompt(
                content,
                rate=target_ratio,
                force_tokens=self._extract_critical_tokens(content),
            )

            compressed_messages.append({
                "role": role,
                "content": compressed["compressed_prompt"]
            })

        return compressed_messages

    def _extract_critical_tokens(self, text: str) -> list:
        """Extract tokens that should never be removed."""
        # Preserve code blocks, variable names, etc.
        import re

        critical = []

        # Preserve code blocks
        code_blocks = re.findall(r'```[\s\S]*?```', text)
        for block in code_blocks:
            critical.extend(block.split())

        # Preserve quoted strings
        quotes = re.findall(r'"[^"]*"', text)
        critical.extend(quotes)

        return critical

    def estimate_savings(self, original: list, compressed: list) -> dict:
        """Calculate token savings from compression."""
        original_tokens = sum(
            len(m["content"].split()) * 1.3  # Rough token estimate
            for m in original
        )
        compressed_tokens = sum(
            len(m["content"].split()) * 1.3
            for m in compressed
        )

        return {
            "original_tokens": int(original_tokens),
            "compressed_tokens": int(compressed_tokens),
            "tokens_saved": int(original_tokens - compressed_tokens),
            # Guard against empty input to avoid ZeroDivisionError
            "compression_ratio": compressed_tokens / original_tokens if original_tokens else 1.0,
        }

Cost Estimation Before Execution

# app/services/cost_tracking/estimator.py
from dataclasses import dataclass
from typing import Optional
import tiktoken

@dataclass
class CostEstimate:
    input_tokens: int
    estimated_output_tokens: int
    min_cost_usd: float
    max_cost_usd: float
    expected_cost_usd: float
    model: str
    confidence: str  # low, medium, high

class CostEstimator:
    """
    Estimate LLM request cost before execution.
    """

    def __init__(self):
        self.encoders = {}

    def estimate(
        self,
        messages: list,
        model: str,
        max_tokens: Optional[int] = None
    ) -> CostEstimate:
        """
        Estimate cost for a completion request.

        Args:
            messages: Input messages
            model: Target model
            max_tokens: Maximum output tokens (if known)
        """
        # Count input tokens
        input_tokens = self._count_tokens(messages, model)

        # Estimate output tokens
        if max_tokens:
            estimated_output = max_tokens
            confidence = "high"
        else:
            estimated_output = self._estimate_output_tokens(messages, model)
            confidence = "medium"

        # Get model costs
        costs = MODEL_COSTS.get(model, {"input": 0, "output": 0})

        # Calculate costs
        input_cost = (input_tokens / 1_000_000) * costs["input"]

        min_output = estimated_output * 0.3  # Conservative
        # Liberal, but output can never exceed an explicit max_tokens cap
        max_output = max_tokens if max_tokens else estimated_output * 1.5
        expected_output = estimated_output

        min_output_cost = (min_output / 1_000_000) * costs["output"]
        max_output_cost = (max_output / 1_000_000) * costs["output"]
        expected_output_cost = (expected_output / 1_000_000) * costs["output"]

        return CostEstimate(
            input_tokens=input_tokens,
            estimated_output_tokens=estimated_output,
            min_cost_usd=input_cost + min_output_cost,
            max_cost_usd=input_cost + max_output_cost,
            expected_cost_usd=input_cost + expected_output_cost,
            model=model,
            confidence=confidence,
        )

    def _count_tokens(self, messages: list, model: str) -> int:
        """Count tokens for input messages."""
        # Get or create encoder
        if model not in self.encoders:
            try:
                if "gpt" in model or "o1" in model:
                    self.encoders[model] = tiktoken.encoding_for_model(model)
                else:
                    # Fallback to cl100k_base for non-OpenAI models
                    # (approximate: other providers use different tokenizers)
                    self.encoders[model] = tiktoken.get_encoding("cl100k_base")
            except Exception:
                self.encoders[model] = tiktoken.get_encoding("cl100k_base")

        encoder = self.encoders[model]

        total = 0
        for msg in messages:
            total += len(encoder.encode(msg.get("content", "")))
            total += 4  # Message overhead tokens

        return total + 2  # Priming tokens

    def _estimate_output_tokens(self, messages: list, model: str) -> int:
        """
        Estimate expected output tokens based on input.
        Uses heuristics and historical data.
        """
        input_tokens = self._count_tokens(messages, model)
        last_message = messages[-1].get("content", "") if messages else ""

        # Heuristics based on request type (cast to int to match the return type)
        request = last_message.lower()

        if "explain" in request or "describe" in request:
            # Explanatory responses tend to be longer
            return int(min(2000, input_tokens * 1.5))

        if "code" in request or "implement" in request:
            # Code generation can be substantial
            return int(min(4000, input_tokens * 2))

        if "yes or no" in request or "brief" in request:
            # Short responses
            return 100

        if "list" in request:
            # Lists are medium length
            return int(min(1000, input_tokens * 0.8))

        # Default: output roughly equal to input
        return min(2000, input_tokens)

    def should_warn(
        self,
        estimate: CostEstimate,
        budget_remaining: float,
        threshold: float = 0.1  # Warn if request is >10% of remaining budget
    ) -> bool:
        """Check if user should be warned about expensive request."""
        if budget_remaining <= 0:
            return True

        return estimate.max_cost_usd > (budget_remaining * threshold)
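
As a worked example of the arithmetic in `estimate`, the sketch below computes the minimum, expected, and maximum cost for a request using per-million-token prices. The prices ($3 input, $15 output per million tokens) and the token counts are illustrative assumptions, not values taken from `MODEL_COSTS`. The function name `estimate_cost_range` is likewise hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CostRange:
    min_usd: float
    expected_usd: float
    max_usd: float


def estimate_cost_range(
    input_tokens: int,
    expected_output_tokens: int,
    input_price_per_m: float,
    output_price_per_m: float,
    max_tokens: Optional[int] = None,
) -> CostRange:
    """Apply the 0.3x / 1.0x / 1.5x output multipliers to get a cost range."""
    input_cost = input_tokens / 1_000_000 * input_price_per_m

    def output_cost(tokens: float) -> float:
        return tokens / 1_000_000 * output_price_per_m

    max_output = expected_output_tokens * 1.5
    if max_tokens is not None:
        # The provider will never bill more output than the cap allows
        max_output = min(max_output, max_tokens)

    return CostRange(
        min_usd=input_cost + output_cost(expected_output_tokens * 0.3),
        expected_usd=input_cost + output_cost(expected_output_tokens),
        max_usd=input_cost + output_cost(max_output),
    )


# 1,200 input tokens, 800 expected output tokens, assumed prices.
estimate = estimate_cost_range(1200, 800, 3.0, 15.0)
capped = estimate_cost_range(1200, 800, 3.0, 15.0, max_tokens=1000)

# Same rule as should_warn: warn if worst case exceeds 10% of what is left.
budget_remaining = 0.10
warn = estimate.max_usd > budget_remaining * 0.1
```

With these numbers the input costs $0.0036 and the expected output $0.0120, so the expected total is $0.0156, with a range of $0.0072 to $0.0216 (or $0.0186 when output is capped at 1,000 tokens).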

Pre-Execution Check Integration

# app/services/llm_gateway.py

class LLMGateway:
    async def complete(
        self,
        agent_id: str,
        project_id: str,
        messages: list,
        model_preference: str = "high-reasoning",
        require_estimate: bool = False,
        **kwargs
    ) -> dict:
        """Generate completion with optional cost estimation."""

        model = self._resolve_model(model_preference)

        if require_estimate:
            estimator = CostEstimator()
            estimate = estimator.estimate(messages, model, kwargs.get("max_tokens"))

            # Check against budget
            budget_status = await self.budget_enforcer.check_budget(project_id)

            if estimator.should_warn(estimate, budget_status.remaining):
                return {
                    "type": "cost_warning",
                    "estimate": estimate,
                    "budget_remaining": budget_status.remaining,
                    "message": f"This request may cost ${estimate.expected_cost_usd:.4f}. "
                               f"Remaining budget: ${budget_status.remaining:.2f}. "
                               f"Proceed?",
                    "alternatives": self._suggest_alternatives(estimate, model),
                }

        # Proceed with request
        return await self._execute_completion(...)

    def _suggest_alternatives(self, estimate: CostEstimate, model: str) -> list:
        """Suggest cost-saving alternatives."""
        alternatives = []

        # Cheaper model
        if model == "claude-3-5-sonnet-20241022":
            alternatives.append({
                "action": "use_haiku",
                "model": "claude-3-haiku-20240307",
                "estimated_savings": estimate.expected_cost_usd * 0.9,
                "quality_impact": "May reduce quality for complex tasks",
            })

        # Compression
        alternatives.append({
            "action": "compress_prompt",
            "estimated_savings": estimate.expected_cost_usd * 0.3,
            "quality_impact": "Minimal for most use cases",
        })

        return alternatives

Reporting Dashboard Requirements

Dashboard Components

| Component | Data Source | Update Frequency |
|-----------|-------------|------------------|
| Real-time Spend Ticker | Redis + SSE | Real-time |
| Budget Utilization Gauges | Redis | 1 second |
| Daily Cost Chart | PostgreSQL (daily_cost_summaries) | 1 minute |
| Model Breakdown Pie Chart | PostgreSQL | 5 minutes |
| Agent Efficiency Table | PostgreSQL | 5 minutes |
| Cost Forecast | PostgreSQL + ML | 1 hour |
| Alert Feed | SSE + PostgreSQL | Real-time |

API Endpoints

# app/api/v1/costs.py
from fastapi import APIRouter, Query
from datetime import datetime, timedelta
from typing import List  # used in the list-returning endpoints below

router = APIRouter(prefix="/costs", tags=["costs"])

@router.get("/summary/{project_id}")
async def get_cost_summary(
    project_id: str,
    period: str = Query("day", pattern="^(day|week|month)$")  # `regex=` is deprecated in recent FastAPI
) -> CostSummaryResponse:
    """Get cost summary for a project."""
    pass

@router.get("/breakdown/{project_id}")
async def get_cost_breakdown(
    project_id: str,
    group_by: str = Query("model", pattern="^(model|agent_type|day)$"),
    start_date: datetime | None = None,
    end_date: datetime | None = None
) -> CostBreakdownResponse:
    """Get detailed cost breakdown."""
    pass

@router.get("/trends/{project_id}")
async def get_cost_trends(
    project_id: str,
    days: int = Query(30, ge=1, le=365)
) -> CostTrendsResponse:
    """Get historical cost trends."""
    pass

@router.get("/forecast/{project_id}")
async def get_cost_forecast(
    project_id: str,
    days_ahead: int = Query(7, ge=1, le=30)
) -> CostForecastResponse:
    """Get projected costs based on current trends."""
    pass

@router.get("/efficiency/{project_id}")
async def get_efficiency_metrics(
    project_id: str
) -> EfficiencyMetricsResponse:
    """Get cost efficiency metrics (cost per task, cache hit rate, etc.)."""
    pass

@router.get("/budgets/{project_id}")
async def get_budgets(project_id: str) -> List[BudgetResponse]:
    """Get all budgets for a project."""
    pass

@router.post("/budgets")
async def create_budget(budget: BudgetCreate) -> BudgetResponse:
    """Create a new budget."""
    pass

@router.put("/budgets/{budget_id}")
async def update_budget(budget_id: str, budget: BudgetUpdate) -> BudgetResponse:
    """Update budget configuration."""
    pass

@router.get("/alerts/{project_id}")
async def get_alerts(
    project_id: str,
    status: str | None = Query(None, pattern="^(pending|acknowledged|resolved)$")
) -> List[AlertResponse]:
    """Get budget alerts for a project."""
    pass

@router.post("/alerts/{alert_id}/acknowledge")
async def acknowledge_alert(alert_id: str) -> AlertResponse:
    """Acknowledge a budget alert."""
    pass

Dashboard Wireframe

+------------------------------------------------------------------+
|                    Syndarix Cost Dashboard                        |
+------------------------------------------------------------------+
|                                                                    |
| +------------------+  +------------------+  +-------------------+  |
| |  DAILY SPEND     |  |  WEEKLY BUDGET   |  |  MONTHLY BUDGET   |  |
| |    $12.47        |  |  ████████░░ 76%  |  |  ████░░░░░░ 42%   |  |
| |   ▲ 15% vs avg   |  |  $152/$200       |  |  $421/$1000       |  |
| +------------------+  +------------------+  +-------------------+  |
|                                                                    |
| +-------------------------------+  +----------------------------+  |
| |     COST BY MODEL (7 days)    |  |     DAILY COST TREND       |  |
| |     ┌───────────────────┐     |  |  $                         |  |
| |     │ Claude Sonnet 64% │     |  | 20├──────────────────────  |  |
| |     │ GPT-4o-mini  22%  │     |  | 15├─────────────█──────────|  |
| |     │ Claude Haiku  9%  │     |  | 10├────────█───█───────────|  |
| |     │ Other         5%  │     |  |  5├──█───█───█───█─────────|  |
| |     └───────────────────┘     |  |  0└───────────────────────>|  |
| +-------------------------------+  |    Mon Tue Wed Thu Fri     |  |
|                                    +----------------------------+  |
|                                                                    |
| +---------------------------------------------------------------+  |
| |  AGENT EFFICIENCY                                              |  |
| +----------+----------+----------+-----------+------------------+  |
| | Agent    | Requests | Tokens   | Cost      | Cost/Task        |  |
| +----------+----------+----------+-----------+------------------+  |
| | Arch     | 145      | 2.3M     | $18.50    | $0.13            |  |
| | Engineer | 892      | 12.1M    | $96.80    | $0.11            |  |
| | QA       | 234      | 1.2M     | $4.20     | $0.02            |  |
| +----------+----------+----------+-----------+------------------+  |
|                                                                    |
| +---------------------------------------------------------------+  |
| |  RECENT ALERTS                                       [Clear]  |  |
| +---------------------------------------------------------------+  |
| | ! WARNING  Daily budget at 80% ($16/$20)        2 mins ago    |  |
| | i INFO     Weekly budget at 50%                 1 hour ago    |  |
| +---------------------------------------------------------------+  |
+------------------------------------------------------------------+

Implementation Roadmap

Phase 1: Core Infrastructure (Week 1-2)

  1. Database Schema

    • Create migration for token_usage, budgets, budget_alerts
    • Create migration for daily_cost_summaries, model_pricing
    • Add indexes for performance
  2. Redis Setup

    • Configure Redis keys for budget counters
    • Implement budget increment/decrement operations
    • Set up TTL for temporary data
  3. LiteLLM Callback

    • Implement SyndarixCostCallback class
    • Integrate with existing LLM Gateway
    • Add metadata propagation for attribution

Phase 2: Budget Management (Week 2-3)

  1. Budget Enforcer

    • Implement BudgetEnforcer class
    • Add pre-request budget checks
    • Implement soft/hard limit logic
  2. Alert System

    • Implement threshold detection
    • Create alert dispatcher
    • Integrate with SSE event bus
  3. Celery Tasks

    • Budget period management task
    • Daily cost rollup task
    • Alert digest task

Phase 3: Cost Optimization (Week 3-4)

  1. Semantic Cache

    • Implement SemanticCache with Redis
    • Integrate embedding generation
    • Add cache hit/miss tracking
  2. Model Cascading

    • Implement ModelCascade router
    • Add complexity estimation
    • Integrate with LLM Gateway
  3. Cost Estimation

    • Implement CostEstimator
    • Add pre-execution warnings
    • Create alternative suggestions

Phase 4: Reporting (Week 4-5)

  1. API Endpoints

    • Implement cost summary endpoints
    • Add breakdown and trends endpoints
    • Create forecast endpoint
  2. Dashboard

    • Design and implement cost dashboard UI
    • Add real-time updates via SSE
    • Create alert management interface

Phase 5: Testing & Documentation (Week 5-6)

  1. Testing

    • Unit tests for cost tracking
    • Integration tests for budget enforcement
    • Load tests for high-volume scenarios
  2. Documentation

    • API documentation
    • User guide for budget management
    • Operations runbook

Decision

Implement a comprehensive cost tracking system with:

  1. LiteLLM callbacks for real-time usage capture
  2. Redis + PostgreSQL for hybrid hot/warm data storage
  3. Hierarchical budgets with soft/hard limit enforcement
  4. Multi-channel alerts via SSE, email, and Slack
  5. Cost optimization through caching, cascading, and compression

Expected Outcomes:

  • Full cost visibility per project, agent, and model
  • Real-time budget enforcement preventing cost overruns
  • 60-80% cost reduction through optimization strategies
  • Actionable insights for cost-aware decision making

Spike completed. Findings will inform ADR-010: Cost Tracking Architecture.