# SPIKE-010: Cost Tracking for Syndarix
|
|
|
|
**Status:** Completed
|
|
**Date:** 2025-12-29
|
|
**Author:** Architecture Team
|
|
**Related Issue:** #10
|
|
|
|
---
|
|
|
|
## Executive Summary
|
|
|
|
Syndarix requires comprehensive LLM cost tracking to manage expenses across multiple providers (Anthropic, OpenAI, local Ollama). This spike researches token usage monitoring, budget enforcement, cost optimization strategies, and real-time alerting.
|
|
|
|
### Recommendation
|
|
|
|
**Adopt a multi-layered cost tracking architecture:**
|
|
|
|
1. **LiteLLM Callbacks** for real-time usage capture at the gateway level
|
|
2. **PostgreSQL** for persistent usage records with time-series aggregation
|
|
3. **Redis** for real-time budget enforcement and rate limiting
|
|
4. **Celery Beat** for scheduled budget checks and alert processing
|
|
5. **SSE Events** for real-time dashboard updates
|
|
|
|
**Expected Cost Savings:** 60-80% reduction through combined optimization strategies (semantic caching, model cascading, prompt compression).
|
|
|
|
---
|
|
|
|
## Table of Contents
|
|
|
|
1. [Research Questions & Findings](#research-questions--findings)
|
|
2. [Cost Tracking Architecture](#cost-tracking-architecture)
|
|
3. [Database Schema Design](#database-schema-design)
|
|
4. [LiteLLM Callback Implementation](#litellm-callback-implementation)
|
|
5. [Budget Management System](#budget-management-system)
|
|
6. [Alert System Integration](#alert-system-integration)
|
|
7. [Cost Optimization Strategies](#cost-optimization-strategies)
|
|
8. [Cost Estimation Before Execution](#cost-estimation-before-execution)
|
|
9. [Reporting Dashboard Requirements](#reporting-dashboard-requirements)
|
|
10. [Implementation Roadmap](#implementation-roadmap)
|
|
|
|
---
|
|
|
|
## Research Questions & Findings
|
|
|
|
### 1. How does LiteLLM track token usage and costs?
|
|
|
|
LiteLLM provides built-in cost tracking through multiple mechanisms:
|
|
|
|
**Response Usage Object:**
|
|
```python
|
|
response = await litellm.acompletion(
|
|
model="claude-3-5-sonnet-20241022",
|
|
messages=[...]
|
|
)
|
|
# Access usage data
|
|
print(response.usage.prompt_tokens) # Input tokens
|
|
print(response.usage.completion_tokens) # Output tokens
|
|
print(response.usage.total_tokens) # Total tokens
|
|
```
|
|
|
|
**Automatic Cost Calculation:**
|
|
LiteLLM maintains a centralized pricing database (`model_prices_and_context_window.json`) with costs for 100+ models. Cost is accessible via `kwargs["response_cost"]` in callbacks.
|
|
|
|
**Custom Callbacks:**
|
|
```python
|
|
from litellm.integrations.custom_logger import CustomLogger
|
|
|
|
class SyndarixCostLogger(CustomLogger):
|
|
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
|
|
cost = kwargs.get("response_cost", 0)
|
|
model = kwargs.get("model")
|
|
usage = response_obj.usage
|
|
# Store in database
|
|
```
|
|
|
|
**Custom Pricing Override:**
|
|
```python
|
|
response = await litellm.acompletion(
|
|
model="custom-model",
|
|
messages=[...],
|
|
input_cost_per_token=0.000003, # $3 per 1M tokens
|
|
output_cost_per_token=0.000015 # $15 per 1M tokens
|
|
)
|
|
```
|
|
|
|
### 2. Best practices for LLM cost attribution in multi-tenant systems?
|
|
|
|
**Key Patterns Identified:**
|
|
|
|
1. **Hierarchical Attribution:** Track costs at multiple levels (organization > project > agent type > agent instance > request)
|
|
2. **Metadata Tagging:** Include cost center, budget ID, and user context in every request
|
|
3. **Tenant-Aware Budgets:** Implement per-customer tier budgets with different limits
|
|
4. **Chargeback/Showback:** Enable detailed cost allocation for billing integration
|
|
|
|
**Multi-Tenant Architecture for Syndarix:**
|
|
```
|
|
Organization (Billing Entity)
|
|
└── Project (Cost Center)
|
|
└── Sprint (Time-bounded Budget)
|
|
└── Agent Instance (Worker)
|
|
└── LLM Request (Atomic Cost Unit)
|
|
```
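In practice, attribution rides on each request as metadata so the callback layer can roll costs up this hierarchy. A minimal sketch (field names are illustrative; they only need to match what the cost callback later reads from `kwargs["metadata"]`):

```python
# Hypothetical sketch: tag every LLM call with attribution metadata.
# The keys mirror what the cost callback in this spike reads back out.
import litellm

async def run_agent_step(prompt: str, ctx: dict) -> str:
    response = await litellm.acompletion(
        model="claude-3-5-sonnet-20241022",
        messages=[{"role": "user", "content": prompt}],
        metadata={
            "organization_id": ctx["organization_id"],   # billing entity
            "project_id": ctx["project_id"],              # cost center
            "sprint_id": ctx.get("sprint_id"),            # time-bounded budget
            "agent_id": ctx["agent_instance_id"],         # worker
            "task_type": ctx.get("task_type", "chat"),
        },
    )
    return response.choices[0].message.content
```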
|
|
|
|
### 3. Real-time cost monitoring approaches?
|
|
|
|
**Recommended Approach:** Hybrid Redis + PostgreSQL
|
|
|
|
| Layer | Technology | Purpose |
|
|
|-------|------------|---------|
|
|
| Hot data | Redis | Real-time budget counters, rate limiting |
|
|
| Warm data | PostgreSQL | Hourly/daily aggregates, analytics |
|
|
| Cold data | S3/Archive | Monthly exports, long-term retention |
|
|
|
|
**Real-time Pipeline:**
|
|
```
|
|
LLM Request → LiteLLM Callback → Redis INCR → Budget Check
|
|
↓
|
|
Async Queue → PostgreSQL Insert → SSE Event
|
|
```
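A minimal sketch of the hot path, assuming the project-level daily key pattern defined later in this spike (limit lookup elided):

```python
# Sketch of the hot path: atomic spend counter in Redis plus an immediate check
from datetime import datetime, UTC

import redis.asyncio as redis

r = redis.from_url("redis://localhost:6379/0")

async def record_and_check(project_id: str, cost_usd: float, daily_limit: float) -> bool:
    """Return True while the project is still under its daily limit."""
    key = f"project:{project_id}:daily:{datetime.now(UTC).date()}:spend"
    new_total = await r.incrbyfloat(key, cost_usd)
    await r.expire(key, 60 * 60 * 48)  # keep two days of hot data
    return new_total < daily_limit
```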
|
|
|
|
### 4. Budget enforcement strategies (soft vs hard limits)?
|
|
|
|
**Soft Limits (Recommended for most cases):**
|
|
- Send alerts at threshold percentages (50%, 80%, 100%)
|
|
- Allow continued usage with warnings
|
|
- Automatic model downgrade (e.g., Sonnet → Haiku)
|
|
- Require approval for continued expensive operations
|
|
|
|
**Hard Limits (For critical cost control):**
|
|
- Block requests once budget exhausted
|
|
- Reject LLM calls at gateway level
|
|
- Require manual budget increase
|
|
|
|
**Syndarix Recommendation:**
|
|
- **Daily budgets:** Hard limits (prevent runaway costs)
|
|
- **Weekly/Monthly budgets:** Soft limits with alerts and escalation
|
|
- **Per-request limits:** Enforce a max-token cap on every call (see the sketch below)
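The per-request cap is the simplest of the three to enforce; it can be applied uniformly at the gateway regardless of budget state. A sketch, using the standard `max_tokens` completion parameter:

```python
# Sketch: a hard per-call output ceiling applied at the gateway
import litellm

MAX_COMPLETION_TOKENS = 4096  # assumed project-wide ceiling

response = await litellm.acompletion(
    model="claude-3-5-sonnet-20241022",
    messages=[...],
    max_tokens=MAX_COMPLETION_TOKENS,  # caps output spend for this request
)
```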
|
|
|
|
### 5. Cost optimization techniques?
|
|
|
|
**Tier 1: Immediate Impact (0 code changes)**
|
|
- Semantic caching: 15-30% cost reduction
|
|
- Response caching for deterministic queries
|
|
|
|
**Tier 2: Model Selection (Architecture changes)**
|
|
- Model cascading: Start with cheaper models, escalate as needed
|
|
- 87% cost reduction possible (90% of queries handled by smaller models)
|
|
- Dynamic routing based on query complexity
|
|
|
|
**Tier 3: Prompt Engineering (Ongoing optimization)**
|
|
- Prompt compression with LLMLingua: Up to 20x compression with <5% quality loss
|
|
- Extractive compression for RAG: 2-10x compression, often improves accuracy
|
|
|
|
**Tier 4: Infrastructure (Long-term)**
|
|
- Self-hosted models for high-volume, latency-tolerant tasks
|
|
- Fine-tuned smaller models for domain-specific tasks
|
|
|
|
### 6. Reporting and visualization requirements?
|
|
|
|
**Dashboard Requirements:**
|
|
1. Real-time spend ticker (updated via SSE)
|
|
2. Cost breakdown by: project, agent type, model, time period
|
|
3. Budget utilization gauges with threshold indicators
|
|
4. Historical trend charts (daily/weekly/monthly)
|
|
5. Anomaly detection alerts (spending spikes)
|
|
6. Forecast projections based on current burn rate
|
|
7. Cost per task/feature analysis
|
|
8. Comparative efficiency metrics (cost per successful completion)
|
|
|
|
### 7. Cost estimation before execution?
|
|
|
|
**Pre-execution Estimation Pattern:**
|
|
```python
|
|
def estimate_cost(messages: list, model: str) -> CostEstimate:
|
|
"""Estimate cost before making LLM call."""
|
|
input_tokens = count_tokens(messages, model)
|
|
estimated_output = estimate_output_tokens(messages, model)
|
|
|
|
costs = MODEL_COSTS[model]
|
|
input_cost = (input_tokens / 1_000_000) * costs["input"]
|
|
output_cost = (estimated_output / 1_000_000) * costs["output"]
|
|
|
|
return CostEstimate(
|
|
input_tokens=input_tokens,
|
|
estimated_output_tokens=estimated_output,
|
|
estimated_cost_usd=input_cost + output_cost,
|
|
confidence="medium" # Based on output estimation accuracy
|
|
)
|
|
```
|
|
|
|
**Token Counting:**
|
|
- Use `tiktoken` for OpenAI models
|
|
- Use `anthropic.count_tokens()` for Claude models
|
|
- Use LiteLLM's `token_counter()` for unified counting (see the sketch below)
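A quick illustration of the unified approach (a sketch; `token_counter` picks a model-appropriate tokenizer where it can and falls back to a default encoding otherwise):

```python
# Unified, provider-agnostic token counting via LiteLLM (sketch)
import litellm

messages = [{"role": "user", "content": "Summarize the sprint retrospective."}]

prompt_tokens = litellm.token_counter(
    model="claude-3-5-sonnet-20241022",
    messages=messages,
)
print(f"Estimated prompt tokens: {prompt_tokens}")
```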
|
|
|
|
---
|
|
|
|
## Cost Tracking Architecture
|
|
|
|
### System Overview
|
|
|
|
```
|
|
┌─────────────────────────────────────────────────────────────────────────┐
|
|
│ Syndarix Backend │
|
|
│ ┌────────────────────────────────────────────────────────────────────┐ │
|
|
│ │ LLM Gateway (LiteLLM) │ │
|
|
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
|
|
│ │ │ Pre-Request │ │ Router │ │ Post-Request│ │ │
|
|
│ │ │ Callback │──▶│ (Failover) │──▶│ Callback │ │ │
|
|
│ │ └──────┬──────┘ └─────────────┘ └──────┬──────┘ │ │
|
|
│ │ │ │ │ │
|
|
│ │ ▼ ▼ │ │
|
|
│ │ ┌─────────────┐ ┌──────────────────────┐ │ │
|
|
│ │ │ Budget │ │ Cost Tracker │ │ │
|
|
│ │ │ Check │ │ Service │ │ │
|
|
│ │ └──────┬──────┘ └──────────┬───────────┘ │ │
|
|
│ └─────────┼────────────────────────────────┼─────────────────────────┘ │
|
|
│ │ │ │
|
|
│ ▼ ▼ │
|
|
│ ┌─────────────────────┐ ┌──────────────────────┐ │
|
|
│ │ Redis │ │ PostgreSQL │ │
|
|
│ │ ┌───────────────┐ │ │ ┌────────────────┐ │ │
|
|
│ │ │ Budget │ │ │ │ token_usage │ │ │
|
|
│ │ │ Counters │ │ │ │ (time-series) │ │ │
|
|
│ │ ├───────────────┤ │ │ ├────────────────┤ │ │
|
|
│ │ │ Rate Limits │ │ │ │ budgets │ │ │
|
|
│ │ ├───────────────┤ │ │ ├────────────────┤ │ │
|
|
│ │ │ Cache Keys │ │ │ │ alerts_log │ │ │
|
|
│ │ └───────────────┘ │ │ ├────────────────┤ │ │
|
|
│ └─────────────────────┘ │ │ daily_summary │ │ │
|
|
│ │ └────────────────┘ │ │
|
|
│ └──────────────────────┘ │
|
|
│ │ │
|
|
│ ┌──────────────────────────────┴─────────────┐ │
|
|
│ ▼ ▼ │
|
|
│ ┌─────────────────────┐ ┌────────────────────────┐ │
|
|
│ │ Event Bus (SSE) │ │ Celery Beat │ │
|
|
│ │ - cost_update │ │ - budget_check │ │
|
|
│ │ - budget_alert │ │ - daily_rollup │ │
|
|
│ │ - threshold_warn │ │ - monthly_report │ │
|
|
│ └─────────────────────┘ └────────────────────────┘ │
|
|
└─────────────────────────────────────────────────────────────────────────┘
|
|
```
|
|
|
|
### Component Responsibilities
|
|
|
|
| Component | Responsibility |
|
|
|-----------|----------------|
|
|
| LLM Gateway | Route requests, capture usage, enforce limits |
|
|
| Cost Tracker | Calculate costs, persist records, update counters |
|
|
| Redis | Real-time budget counters, rate limiting, caching |
|
|
| PostgreSQL | Persistent usage records, aggregations, analytics |
|
|
| Event Bus | Real-time notifications to dashboards |
|
|
| Celery Beat | Scheduled tasks (rollups, alerts, reports) |
|
|
|
|
---
|
|
|
|
## Database Schema Design
|
|
|
|
### Core Tables
|
|
|
|
```python
|
|
# app/models/cost_tracking.py
|
|
from sqlalchemy import Boolean, Column, String, Integer, Float, DateTime, ForeignKey, Enum, Index
|
|
from sqlalchemy.dialects.postgresql import UUID, JSONB
|
|
from sqlalchemy.orm import relationship
|
|
from app.models.base import Base, TimestampMixin, UUIDMixin
|
|
import enum
|
|
|
|
class BudgetPeriod(str, enum.Enum):
|
|
DAILY = "daily"
|
|
WEEKLY = "weekly"
|
|
MONTHLY = "monthly"
|
|
|
|
class AlertSeverity(str, enum.Enum):
|
|
INFO = "info" # 50% threshold
|
|
WARNING = "warning" # 80% threshold
|
|
CRITICAL = "critical" # 100% threshold
|
|
|
|
class AlertStatus(str, enum.Enum):
|
|
PENDING = "pending"
|
|
ACKNOWLEDGED = "acknowledged"
|
|
RESOLVED = "resolved"
|
|
|
|
|
|
class TokenUsage(Base, UUIDMixin, TimestampMixin):
|
|
"""Individual LLM request usage record."""
|
|
__tablename__ = "token_usage"
|
|
|
|
# Attribution
|
|
project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
|
|
agent_instance_id = Column(UUID(as_uuid=True), ForeignKey("agent_instances.id"), nullable=True)
|
|
agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"), nullable=True)
|
|
user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)
|
|
|
|
# Request details
|
|
request_id = Column(String(64), unique=True, nullable=False) # Correlation ID
|
|
model = Column(String(100), nullable=False)
|
|
provider = Column(String(50), nullable=False) # anthropic, openai, ollama
|
|
|
|
# Token counts
|
|
prompt_tokens = Column(Integer, nullable=False)
|
|
completion_tokens = Column(Integer, nullable=False)
|
|
total_tokens = Column(Integer, nullable=False)
|
|
cached_tokens = Column(Integer, default=0) # Tokens served from cache
|
|
|
|
# Cost calculation
|
|
input_cost_usd = Column(Float, nullable=False)
|
|
output_cost_usd = Column(Float, nullable=False)
|
|
total_cost_usd = Column(Float, nullable=False)
|
|
|
|
# Timing
|
|
latency_ms = Column(Integer, nullable=True)
|
|
request_timestamp = Column(DateTime(timezone=True), nullable=False)
|
|
|
|
# Metadata
|
|
task_type = Column(String(50), nullable=True) # reasoning, coding, chat, etc.
|
|
success = Column(Boolean, default=True)
|
|
error_type = Column(String(100), nullable=True)
|
|
    extra_metadata = Column("metadata", JSONB, default=dict)  # Additional context ("metadata" is reserved on declarative models)
|
|
|
|
# Indexes for common queries
|
|
__table_args__ = (
|
|
Index("ix_token_usage_project_timestamp", "project_id", "request_timestamp"),
|
|
Index("ix_token_usage_agent_instance_timestamp", "agent_instance_id", "request_timestamp"),
|
|
Index("ix_token_usage_model_timestamp", "model", "request_timestamp"),
|
|
Index("ix_token_usage_request_timestamp", "request_timestamp"),
|
|
)
|
|
|
|
|
|
class Budget(Base, UUIDMixin, TimestampMixin):
|
|
"""Budget definition for cost control."""
|
|
__tablename__ = "budgets"
|
|
|
|
# Scope (one of these will be set)
|
|
project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=True)
|
|
agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"), nullable=True)
|
|
organization_id = Column(UUID(as_uuid=True), ForeignKey("organizations.id"), nullable=True)
|
|
|
|
# Budget configuration
|
|
name = Column(String(100), nullable=False)
|
|
period = Column(Enum(BudgetPeriod), nullable=False)
|
|
limit_usd = Column(Float, nullable=False)
|
|
|
|
# Thresholds (percentages)
|
|
warn_threshold = Column(Float, default=0.5) # 50%
|
|
alert_threshold = Column(Float, default=0.8) # 80%
|
|
critical_threshold = Column(Float, default=1.0) # 100%
|
|
|
|
# Enforcement
|
|
hard_limit = Column(Boolean, default=False) # Block requests at limit
|
|
auto_downgrade = Column(Boolean, default=True) # Downgrade to cheaper model
|
|
|
|
# Status
|
|
is_active = Column(Boolean, default=True)
|
|
current_spend = Column(Float, default=0.0) # Cached from Redis
|
|
period_start = Column(DateTime(timezone=True), nullable=False)
|
|
period_end = Column(DateTime(timezone=True), nullable=False)
|
|
|
|
__table_args__ = (
|
|
Index("ix_budgets_project_active", "project_id", "is_active"),
|
|
Index("ix_budgets_period_end", "period_end"),
|
|
)
|
|
|
|
|
|
class BudgetAlert(Base, UUIDMixin, TimestampMixin):
|
|
"""Alert record for budget threshold breaches."""
|
|
__tablename__ = "budget_alerts"
|
|
|
|
budget_id = Column(UUID(as_uuid=True), ForeignKey("budgets.id"), nullable=False)
|
|
project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=True)
|
|
|
|
severity = Column(Enum(AlertSeverity), nullable=False)
|
|
status = Column(Enum(AlertStatus), default=AlertStatus.PENDING)
|
|
|
|
threshold_percent = Column(Float, nullable=False)
|
|
current_spend = Column(Float, nullable=False)
|
|
budget_limit = Column(Float, nullable=False)
|
|
|
|
message = Column(String(500), nullable=False)
|
|
acknowledged_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)
|
|
acknowledged_at = Column(DateTime(timezone=True), nullable=True)
|
|
|
|
# Notification tracking
|
|
notifications_sent = Column(JSONB, default=[]) # [{"channel": "email", "sent_at": "..."}]
|
|
|
|
budget = relationship("Budget", backref="alerts")
|
|
|
|
|
|
class DailyCostSummary(Base, UUIDMixin):
|
|
"""Materialized daily cost aggregations for fast reporting."""
|
|
__tablename__ = "daily_cost_summaries"
|
|
|
|
date = Column(DateTime(timezone=True), nullable=False)
|
|
project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=True)
|
|
agent_type_id = Column(UUID(as_uuid=True), ForeignKey("agent_types.id"), nullable=True)
|
|
model = Column(String(100), nullable=True)
|
|
|
|
# Aggregated metrics
|
|
request_count = Column(Integer, default=0)
|
|
total_prompt_tokens = Column(Integer, default=0)
|
|
total_completion_tokens = Column(Integer, default=0)
|
|
total_tokens = Column(Integer, default=0)
|
|
total_cost_usd = Column(Float, default=0.0)
|
|
|
|
# Performance metrics
|
|
avg_latency_ms = Column(Float, nullable=True)
|
|
cache_hit_rate = Column(Float, default=0.0)
|
|
success_rate = Column(Float, default=1.0)
|
|
|
|
__table_args__ = (
|
|
Index("ix_daily_summary_date_project", "date", "project_id"),
|
|
Index("ix_daily_summary_date_model", "date", "model"),
|
|
)
|
|
|
|
|
|
class ModelPricing(Base, UUIDMixin, TimestampMixin):
|
|
"""Custom model pricing overrides."""
|
|
__tablename__ = "model_pricing"
|
|
|
|
model = Column(String(100), unique=True, nullable=False)
|
|
provider = Column(String(50), nullable=False)
|
|
|
|
input_cost_per_million = Column(Float, nullable=False)
|
|
output_cost_per_million = Column(Float, nullable=False)
|
|
|
|
# Context limits
|
|
max_input_tokens = Column(Integer, nullable=True)
|
|
max_output_tokens = Column(Integer, nullable=True)
|
|
|
|
# Validity period
|
|
effective_from = Column(DateTime(timezone=True), nullable=False)
|
|
effective_until = Column(DateTime(timezone=True), nullable=True)
|
|
|
|
is_active = Column(Boolean, default=True)
|
|
```
|
|
|
|
### Redis Key Structure
|
|
|
|
```python
|
|
# Redis key patterns for real-time tracking
|
|
|
|
REDIS_KEYS = {
|
|
# Budget counters (reset on period start)
|
|
"budget_spend": "budget:{budget_id}:spend", # INCRBYFLOAT
|
|
"budget_requests": "budget:{budget_id}:requests", # INCR
|
|
|
|
# Project-level aggregations
|
|
"project_daily_spend": "project:{project_id}:daily:{date}:spend",
|
|
"project_monthly_spend": "project:{project_id}:monthly:{year_month}:spend",
|
|
|
|
# Rate limiting
|
|
"rate_limit": "ratelimit:{project_id}:{model}:{window}",
|
|
|
|
# Semantic cache
|
|
"semantic_cache": "cache:semantic:{hash}",
|
|
|
|
# Real-time metrics (for dashboard)
|
|
"realtime_spend": "metrics:realtime:spend:{project_id}",
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
## LiteLLM Callback Implementation
|
|
|
|
### Custom Callback Handler
|
|
|
|
```python
|
|
# app/services/cost_tracking/callbacks.py
import asyncio
import uuid
from datetime import datetime, UTC
from typing import Any

from litellm.integrations.custom_logger import CustomLogger

from app.core.config import settings
from app.services.cost_tracking.tracker import CostTracker
from app.services.cost_tracking.budget import BudgetEnforcer
from app.services.events import EventBus


class BudgetExceededError(Exception):
    """Raised when a hard budget limit blocks an LLM request."""
|
|
|
|
class SyndarixCostCallback(CustomLogger):
|
|
"""
|
|
LiteLLM callback handler for cost tracking and budget enforcement.
|
|
|
|
Integrates with:
|
|
- Redis for real-time budget counters
|
|
- PostgreSQL for persistent usage records
|
|
- Event bus for real-time dashboard updates
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.tracker = CostTracker()
|
|
self.budget_enforcer = BudgetEnforcer()
|
|
self.event_bus = EventBus()
|
|
|
|
    async def async_log_pre_api_call(self, model: str, messages: list, kwargs: dict):
        """Pre-request validation and cost estimation."""
        project_id = kwargs.get("metadata", {}).get("project_id")
        agent_id = kwargs.get("metadata", {}).get("agent_id")

        if not project_id:
            return  # Skip tracking for internal calls

        # Check budget before making the request (async hook, so the
        # Redis-backed check can be awaited directly)
        budget_status = await self.budget_enforcer.check_budget(project_id)
|
|
|
|
if budget_status.exceeded and budget_status.hard_limit:
|
|
raise BudgetExceededError(
|
|
f"Budget exceeded for project {project_id}. "
|
|
f"Limit: ${budget_status.limit:.2f}, "
|
|
f"Spent: ${budget_status.spent:.2f}"
|
|
)
|
|
|
|
if budget_status.exceeded and budget_status.auto_downgrade:
|
|
            # Suggest a model downgrade. Note: pre-call logging hooks are
            # best-effort for request mutation; authoritative downgrades are
            # applied in the gateway before the request is issued.
            kwargs["model"] = self._get_fallback_model(model)
|
|
|
|
async def async_log_success_event(
|
|
self,
|
|
kwargs: dict,
|
|
response_obj: Any,
|
|
start_time: float,
|
|
end_time: float
|
|
):
|
|
"""Record successful request usage and costs."""
|
|
metadata = kwargs.get("metadata", {})
|
|
project_id = metadata.get("project_id")
|
|
agent_id = metadata.get("agent_id")
|
|
agent_type_id = metadata.get("agent_type_id")
|
|
|
|
if not project_id:
|
|
return
|
|
|
|
# Extract usage data
|
|
usage = response_obj.usage
|
|
model = kwargs.get("model")
|
|
cost = kwargs.get("response_cost", 0)
|
|
|
|
# Calculate costs if not provided
|
|
if cost == 0:
|
|
cost = self._calculate_cost(model, usage)
|
|
|
|
# Create usage record
|
|
usage_record = {
|
|
"request_id": kwargs.get("litellm_call_id", str(uuid.uuid4())),
|
|
"project_id": project_id,
|
|
"agent_instance_id": agent_id,
|
|
"agent_type_id": agent_type_id,
|
|
"model": model,
|
|
"provider": self._get_provider(model),
|
|
"prompt_tokens": usage.prompt_tokens,
|
|
"completion_tokens": usage.completion_tokens,
|
|
"total_tokens": usage.total_tokens,
|
|
"input_cost_usd": self._calculate_input_cost(model, usage.prompt_tokens),
|
|
"output_cost_usd": self._calculate_output_cost(model, usage.completion_tokens),
|
|
"total_cost_usd": cost,
|
|
"latency_ms": int((end_time - start_time) * 1000),
|
|
"request_timestamp": datetime.now(UTC),
|
|
"task_type": metadata.get("task_type"),
|
|
"success": True,
|
|
}
|
|
|
|
# Async operations in parallel
|
|
await asyncio.gather(
|
|
self.tracker.record_usage(usage_record),
|
|
self.budget_enforcer.increment_spend(project_id, cost),
|
|
self._publish_cost_event(project_id, usage_record),
|
|
)
|
|
|
|
async def async_log_failure_event(
|
|
self,
|
|
kwargs: dict,
|
|
response_obj: Any,
|
|
start_time: float,
|
|
end_time: float
|
|
):
|
|
"""Record failed request (still costs input tokens)."""
|
|
metadata = kwargs.get("metadata", {})
|
|
project_id = metadata.get("project_id")
|
|
|
|
if not project_id:
|
|
return
|
|
|
|
# Failed requests still consume input tokens
|
|
input_tokens = kwargs.get("input_tokens", 0)
|
|
model = kwargs.get("model")
|
|
|
|
if input_tokens > 0:
|
|
input_cost = self._calculate_input_cost(model, input_tokens)
|
|
|
|
usage_record = {
|
|
"request_id": kwargs.get("litellm_call_id", str(uuid.uuid4())),
|
|
"project_id": project_id,
|
|
"model": model,
|
|
"prompt_tokens": input_tokens,
|
|
"completion_tokens": 0,
|
|
"total_tokens": input_tokens,
|
|
"total_cost_usd": input_cost,
|
|
"success": False,
|
|
"error_type": type(response_obj).__name__,
|
|
"request_timestamp": datetime.now(UTC),
|
|
}
|
|
|
|
await self.tracker.record_usage(usage_record)
|
|
|
|
async def _publish_cost_event(self, project_id: str, usage: dict):
|
|
"""Publish real-time cost event to dashboard."""
|
|
await self.event_bus.publish(f"project:{project_id}", {
|
|
"type": "cost_update",
|
|
"data": {
|
|
"model": usage["model"],
|
|
"tokens": usage["total_tokens"],
|
|
"cost_usd": usage["total_cost_usd"],
|
|
"timestamp": usage["request_timestamp"].isoformat(),
|
|
}
|
|
})
|
|
|
|
def _calculate_cost(self, model: str, usage) -> float:
|
|
"""Calculate total cost from usage."""
|
|
input_cost = self._calculate_input_cost(model, usage.prompt_tokens)
|
|
output_cost = self._calculate_output_cost(model, usage.completion_tokens)
|
|
return input_cost + output_cost
|
|
|
|
def _calculate_input_cost(self, model: str, tokens: int) -> float:
|
|
"""Calculate input token cost."""
|
|
costs = MODEL_COSTS.get(model, {"input": 0, "output": 0})
|
|
return (tokens / 1_000_000) * costs["input"]
|
|
|
|
def _calculate_output_cost(self, model: str, tokens: int) -> float:
|
|
"""Calculate output token cost."""
|
|
costs = MODEL_COSTS.get(model, {"input": 0, "output": 0})
|
|
return (tokens / 1_000_000) * costs["output"]
|
|
|
|
def _get_provider(self, model: str) -> str:
|
|
"""Extract provider from model name."""
|
|
if model.startswith("claude"):
|
|
return "anthropic"
|
|
elif model.startswith("gpt") or model.startswith("o1"):
|
|
return "openai"
|
|
elif model.startswith("ollama/"):
|
|
return "ollama"
|
|
return "unknown"
|
|
|
|
def _get_fallback_model(self, model: str) -> str:
|
|
"""Get cheaper fallback model."""
|
|
fallbacks = {
|
|
"claude-3-5-sonnet-20241022": "claude-3-haiku-20240307",
|
|
"gpt-4-turbo": "gpt-4o-mini",
|
|
"gpt-4o": "gpt-4o-mini",
|
|
}
|
|
return fallbacks.get(model, model)
|
|
|
|
|
|
# Model cost database (per 1M tokens)
|
|
MODEL_COSTS = {
|
|
# Anthropic
|
|
"claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
|
|
"claude-3-opus-20240229": {"input": 15.00, "output": 75.00},
|
|
"claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
|
|
|
|
# OpenAI
|
|
"gpt-4-turbo": {"input": 10.00, "output": 30.00},
|
|
"gpt-4o": {"input": 2.50, "output": 10.00},
|
|
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
|
|
"o1-preview": {"input": 15.00, "output": 60.00},
|
|
"o1-mini": {"input": 3.00, "output": 12.00},
|
|
|
|
# Local (compute-only, no API cost)
|
|
"ollama/llama3": {"input": 0.00, "output": 0.00},
|
|
"ollama/mixtral": {"input": 0.00, "output": 0.00},
|
|
}
|
|
```
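The callback delegates persistence to a `CostTracker` service that is not shown above. A minimal sketch, assuming the async session factory used elsewhere in this spike (the module path `app.db.session` is an assumption):

```python
# app/services/cost_tracking/tracker.py — hypothetical sketch
from app.db.session import get_async_session  # assumed location of the session factory
from app.models.cost_tracking import TokenUsage


class CostTracker:
    """Persist individual usage records produced by the LiteLLM callback."""

    async def record_usage(self, usage_record: dict) -> None:
        # One insert per request; high-volume deployments may batch these instead
        async with get_async_session() as session:
            session.add(TokenUsage(**usage_record))
            await session.commit()
```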
|
|
|
|
### Registering the Callback
|
|
|
|
```python
|
|
# app/services/llm_gateway.py
|
|
import litellm
|
|
from app.services.cost_tracking.callbacks import SyndarixCostCallback
|
|
|
|
# Register the callback globally
|
|
syndarix_callback = SyndarixCostCallback()
|
|
litellm.callbacks = [syndarix_callback]
|
|
|
|
# Or register specific callbacks
|
|
litellm.success_callback = [syndarix_callback.async_log_success_event]
|
|
litellm.failure_callback = [syndarix_callback.async_log_failure_event]
|
|
```
|
|
|
|
---
|
|
|
|
## Budget Management System
|
|
|
|
### Budget Enforcer
|
|
|
|
```python
|
|
# app/services/cost_tracking/budget.py
from dataclasses import dataclass
from datetime import datetime, UTC, timedelta
from typing import Optional

import redis.asyncio as redis
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import settings
from app.models.cost_tracking import Budget, BudgetPeriod, BudgetAlert, AlertSeverity
from app.services.events import EventBus
|
|
|
|
@dataclass
|
|
class BudgetStatus:
|
|
budget_id: str
|
|
limit: float
|
|
spent: float
|
|
remaining: float
|
|
percent_used: float
|
|
exceeded: bool
|
|
hard_limit: bool
|
|
auto_downgrade: bool
|
|
threshold_breached: Optional[str] = None # warn, alert, critical
|
|
|
|
|
|
class BudgetEnforcer:
|
|
"""Real-time budget enforcement using Redis."""
|
|
|
|
def __init__(self, redis_url: str = None):
|
|
self.redis = redis.from_url(redis_url or settings.REDIS_URL)
|
|
|
|
async def check_budget(self, project_id: str) -> BudgetStatus:
|
|
"""Check current budget status for a project."""
|
|
# Get active budgets for project
|
|
budgets = await self._get_active_budgets(project_id)
|
|
|
|
if not budgets:
|
|
return BudgetStatus(
|
|
budget_id=None,
|
|
limit=float("inf"),
|
|
spent=0,
|
|
remaining=float("inf"),
|
|
percent_used=0,
|
|
exceeded=False,
|
|
hard_limit=False,
|
|
auto_downgrade=False,
|
|
)
|
|
|
|
        # Evaluate every active budget and return the most restrictive status
        worst: Optional[BudgetStatus] = None
        for budget in budgets:
            spend_key = f"budget:{budget.id}:spend"
            spent = float(await self.redis.get(spend_key) or 0)

            percent_used = (spent / budget.limit_usd) * 100
            exceeded = spent >= budget.limit_usd

            # Determine threshold breach
            threshold_breached = None
            if percent_used >= budget.critical_threshold * 100:
                threshold_breached = "critical"
            elif percent_used >= budget.alert_threshold * 100:
                threshold_breached = "alert"
            elif percent_used >= budget.warn_threshold * 100:
                threshold_breached = "warn"

            status = BudgetStatus(
                budget_id=str(budget.id),
                limit=budget.limit_usd,
                spent=spent,
                remaining=max(0, budget.limit_usd - spent),
                percent_used=percent_used,
                exceeded=exceeded,
                hard_limit=budget.hard_limit,
                auto_downgrade=budget.auto_downgrade,
                threshold_breached=threshold_breached,
            )

            # Keep the budget that is closest to (or furthest past) its limit
            if worst is None or status.percent_used > worst.percent_used:
                worst = status

        return worst
|
|
|
|
async def increment_spend(self, project_id: str, amount: float) -> BudgetStatus:
|
|
"""Increment budget spend and check thresholds."""
|
|
budgets = await self._get_active_budgets(project_id)
|
|
|
|
for budget in budgets:
|
|
spend_key = f"budget:{budget.id}:spend"
|
|
|
|
# Atomic increment
|
|
new_spend = await self.redis.incrbyfloat(spend_key, amount)
|
|
|
|
# Check and trigger alerts
|
|
await self._check_thresholds(budget, new_spend)
|
|
|
|
return await self.check_budget(project_id)
|
|
|
|
async def _check_thresholds(self, budget: Budget, current_spend: float):
|
|
"""Check thresholds and create alerts if needed."""
|
|
percent = (current_spend / budget.limit_usd) * 100
|
|
|
|
thresholds = [
|
|
(budget.warn_threshold * 100, AlertSeverity.INFO),
|
|
(budget.alert_threshold * 100, AlertSeverity.WARNING),
|
|
(budget.critical_threshold * 100, AlertSeverity.CRITICAL),
|
|
]
|
|
|
|
for threshold, severity in thresholds:
|
|
if percent >= threshold:
|
|
# Check if alert already exists for this threshold
|
|
alert_key = f"budget:{budget.id}:alert:{severity.value}"
|
|
if not await self.redis.exists(alert_key):
|
|
await self._create_alert(budget, severity, percent, current_spend)
|
|
# Mark alert as sent
|
|
await self.redis.setex(alert_key, 86400, "1") # 24h TTL
|
|
|
|
async def _create_alert(
|
|
self,
|
|
budget: Budget,
|
|
severity: AlertSeverity,
|
|
percent: float,
|
|
current_spend: float
|
|
):
|
|
"""Create budget alert and trigger notifications."""
|
|
alert = BudgetAlert(
|
|
budget_id=budget.id,
|
|
project_id=budget.project_id,
|
|
severity=severity,
|
|
threshold_percent=percent,
|
|
current_spend=current_spend,
|
|
budget_limit=budget.limit_usd,
|
|
message=f"Budget '{budget.name}' at {percent:.1f}% (${current_spend:.2f}/${budget.limit_usd:.2f})"
|
|
)
|
|
|
|
# Persist alert
|
|
async with get_async_session() as session:
|
|
session.add(alert)
|
|
await session.commit()
|
|
|
|
# Publish alert event
|
|
await EventBus().publish(f"project:{budget.project_id}", {
|
|
"type": "budget_alert",
|
|
"data": {
|
|
"severity": severity.value,
|
|
"message": alert.message,
|
|
"percent_used": percent,
|
|
"budget_name": budget.name,
|
|
}
|
|
})
|
|
|
|
async def reset_budget_period(self, budget_id: str):
|
|
"""Reset budget counter for new period."""
|
|
spend_key = f"budget:{budget_id}:spend"
|
|
await self.redis.delete(spend_key)
|
|
|
|
# Clear alert markers
|
|
for severity in AlertSeverity:
|
|
alert_key = f"budget:{budget_id}:alert:{severity.value}"
|
|
await self.redis.delete(alert_key)
|
|
|
|
    def check_budget_sync(self, project_id: str) -> BudgetStatus:
        """Synchronous wrapper for pre-request checks.

        asyncio.run() spins up a fresh event loop, so this must only be called
        from synchronous code paths (e.g. Celery tasks); async callers should
        await check_budget() directly.
        """
        import asyncio

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(self.check_budget(project_id))

        raise RuntimeError(
            "check_budget_sync() called from a running event loop; "
            "await check_budget() instead"
        )
|
|
```
|
|
|
|
### Budget Period Management (Celery Task)
|
|
|
|
```python
|
|
# app/tasks/budget_tasks.py
import asyncio
from datetime import datetime, UTC, timedelta

from celery import shared_task
from sqlalchemy import text

from app.models.cost_tracking import Budget, BudgetPeriod
from app.services.cost_tracking.budget import BudgetEnforcer
|
|
|
|
@shared_task
|
|
def check_budget_periods():
|
|
"""
|
|
Scheduled task to manage budget periods.
|
|
Run every hour via Celery Beat.
|
|
"""
|
|
now = datetime.now(UTC)
|
|
enforcer = BudgetEnforcer()
|
|
|
|
# Find budgets that need period reset
|
|
with get_session() as session:
|
|
expired_budgets = session.query(Budget).filter(
|
|
Budget.is_active == True,
|
|
Budget.period_end <= now
|
|
).all()
|
|
|
|
for budget in expired_budgets:
|
|
# Archive current period spend
|
|
archive_budget_period(budget)
|
|
|
|
# Calculate new period
|
|
if budget.period == BudgetPeriod.DAILY:
|
|
period_start = now.replace(hour=0, minute=0, second=0)
|
|
period_end = period_start + timedelta(days=1)
|
|
elif budget.period == BudgetPeriod.WEEKLY:
|
|
period_start = now - timedelta(days=now.weekday())
|
|
period_end = period_start + timedelta(weeks=1)
|
|
elif budget.period == BudgetPeriod.MONTHLY:
|
|
period_start = now.replace(day=1)
|
|
next_month = period_start + timedelta(days=32)
|
|
period_end = next_month.replace(day=1)
|
|
|
|
# Update budget
|
|
budget.period_start = period_start
|
|
budget.period_end = period_end
|
|
budget.current_spend = 0
|
|
|
|
session.commit()
|
|
|
|
# Reset Redis counter
|
|
asyncio.run(enforcer.reset_budget_period(str(budget.id)))
|
|
|
|
@shared_task
|
|
def daily_cost_rollup():
|
|
"""
|
|
Aggregate daily costs into summary table.
|
|
Run at 1 AM daily via Celery Beat.
|
|
"""
|
|
yesterday = datetime.now(UTC).date() - timedelta(days=1)
|
|
|
|
with get_session() as session:
|
|
# Aggregate by project, agent_type, model
|
|
        results = session.execute(text("""
|
|
INSERT INTO daily_cost_summaries
|
|
(date, project_id, agent_type_id, model,
|
|
request_count, total_prompt_tokens, total_completion_tokens,
|
|
total_tokens, total_cost_usd, avg_latency_ms,
|
|
cache_hit_rate, success_rate)
|
|
SELECT
|
|
DATE(request_timestamp) as date,
|
|
project_id,
|
|
agent_type_id,
|
|
model,
|
|
COUNT(*) as request_count,
|
|
SUM(prompt_tokens) as total_prompt_tokens,
|
|
SUM(completion_tokens) as total_completion_tokens,
|
|
SUM(total_tokens) as total_tokens,
|
|
SUM(total_cost_usd) as total_cost_usd,
|
|
AVG(latency_ms) as avg_latency_ms,
|
|
SUM(cached_tokens)::float / NULLIF(SUM(total_tokens), 0) as cache_hit_rate,
|
|
AVG(CASE WHEN success THEN 1 ELSE 0 END) as success_rate
|
|
FROM token_usage
|
|
WHERE DATE(request_timestamp) = :yesterday
|
|
GROUP BY DATE(request_timestamp), project_id, agent_type_id, model
|
|
ON CONFLICT DO NOTHING
|
|
""", {"yesterday": yesterday})
|
|
|
|
session.commit()
|
|
```
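The schedules referenced in the task docstrings would be registered with Celery Beat roughly as follows (a sketch; the task module path matches the file above):

```python
# Celery Beat schedule for the budget tasks (sketch)
from celery.schedules import crontab

beat_schedule = {
    "check-budget-periods": {
        "task": "app.tasks.budget_tasks.check_budget_periods",
        "schedule": crontab(minute=0),          # hourly, on the hour
    },
    "daily-cost-rollup": {
        "task": "app.tasks.budget_tasks.daily_cost_rollup",
        "schedule": crontab(hour=1, minute=0),  # 01:00 daily
    },
}
```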
|
|
|
|
---
|
|
|
|
## Alert System Integration
|
|
|
|
### Alert Configuration
|
|
|
|
```python
|
|
# app/services/cost_tracking/alerts.py
from dataclasses import dataclass
from datetime import datetime, UTC
from enum import Enum
from typing import List, Optional

from app.models.cost_tracking import AlertSeverity, BudgetAlert
|
|
|
|
class AlertChannel(str, Enum):
|
|
EMAIL = "email"
|
|
SLACK = "slack"
|
|
WEBHOOK = "webhook"
|
|
SSE = "sse" # Real-time dashboard
|
|
|
|
@dataclass
|
|
class AlertRule:
|
|
budget_id: str
|
|
channels: List[AlertChannel]
|
|
recipients: List[str] # Email addresses, Slack channels, etc.
|
|
escalation_delay: int = 0 # Minutes before escalating
|
|
|
|
class AlertDispatcher:
|
|
"""Dispatch budget alerts to configured channels."""
|
|
|
|
async def dispatch(self, alert: BudgetAlert, rules: List[AlertRule]):
|
|
"""Send alert through all configured channels."""
|
|
for rule in rules:
|
|
if rule.budget_id == str(alert.budget_id):
|
|
for channel in rule.channels:
|
|
await self._send_alert(channel, alert, rule.recipients)
|
|
|
|
async def _send_alert(
|
|
self,
|
|
channel: AlertChannel,
|
|
alert: BudgetAlert,
|
|
recipients: List[str]
|
|
):
|
|
"""Send alert to specific channel."""
|
|
notification = {
|
|
"channel": channel.value,
|
|
"sent_at": datetime.now(UTC).isoformat(),
|
|
"recipients": recipients,
|
|
}
|
|
|
|
if channel == AlertChannel.EMAIL:
|
|
await self._send_email_alert(alert, recipients)
|
|
elif channel == AlertChannel.SLACK:
|
|
await self._send_slack_alert(alert, recipients)
|
|
elif channel == AlertChannel.WEBHOOK:
|
|
await self._send_webhook_alert(alert, recipients)
|
|
elif channel == AlertChannel.SSE:
|
|
# Already handled via EventBus
|
|
pass
|
|
|
|
# Track notification
|
|
alert.notifications_sent.append(notification)
|
|
|
|
async def _send_slack_alert(self, alert: BudgetAlert, channels: List[str]):
|
|
"""Send alert to Slack."""
|
|
color = {
|
|
AlertSeverity.INFO: "#36a64f", # Green
|
|
AlertSeverity.WARNING: "#ff9800", # Orange
|
|
AlertSeverity.CRITICAL: "#f44336", # Red
|
|
}[alert.severity]
|
|
|
|
payload = {
|
|
"attachments": [{
|
|
"color": color,
|
|
"title": f"Budget Alert: {alert.message}",
|
|
"fields": [
|
|
{"title": "Budget", "value": f"${alert.budget_limit:.2f}", "short": True},
|
|
{"title": "Current Spend", "value": f"${alert.current_spend:.2f}", "short": True},
|
|
{"title": "Usage", "value": f"{alert.threshold_percent:.1f}%", "short": True},
|
|
{"title": "Severity", "value": alert.severity.value.upper(), "short": True},
|
|
],
|
|
"ts": datetime.now(UTC).timestamp(),
|
|
}]
|
|
}
|
|
|
|
for channel in channels:
|
|
await send_slack_message(channel, payload)
|
|
```
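The email, Slack, and webhook senders are left undefined above; the webhook variant might look like this (a sketch assuming `httpx` is available in the backend, to be pasted onto `AlertDispatcher`):

```python
# Sketch of the webhook channel sender referenced by AlertDispatcher
import httpx

async def _send_webhook_alert(self, alert: BudgetAlert, urls: list[str]) -> None:
    payload = {
        "severity": alert.severity.value,
        "message": alert.message,
        "current_spend": alert.current_spend,
        "budget_limit": alert.budget_limit,
        "threshold_percent": alert.threshold_percent,
    }
    async with httpx.AsyncClient(timeout=10) as client:
        for url in urls:
            # Delivery failures are expected to be caught and logged by the caller
            await client.post(url, json=payload)
```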
|
|
|
|
---
|
|
|
|
## Cost Optimization Strategies
|
|
|
|
### 1. Semantic Caching
|
|
|
|
```python
|
|
# app/services/cost_tracking/cache.py
import hashlib
from datetime import datetime, UTC
from typing import Optional, Tuple

import litellm
import numpy as np
import redis.asyncio as redis
|
|
|
|
class SemanticCache:
|
|
"""
|
|
Cache LLM responses based on semantic similarity.
|
|
Uses embedding vectors to find similar queries.
|
|
"""
|
|
|
|
def __init__(self, redis_url: str, similarity_threshold: float = 0.95):
|
|
self.redis = redis.from_url(redis_url)
|
|
self.threshold = similarity_threshold
|
|
self.embedding_model = "text-embedding-3-small" # OpenAI
|
|
|
|
async def get(
|
|
self,
|
|
messages: list,
|
|
model: str,
|
|
project_id: str
|
|
) -> Optional[Tuple[str, dict]]:
|
|
"""
|
|
Check cache for semantically similar query.
|
|
|
|
Returns:
|
|
Tuple of (cached_response, usage_info) or None
|
|
"""
|
|
query_embedding = await self._get_embedding(messages)
|
|
cache_key_pattern = f"cache:semantic:{project_id}:{model}:*"
|
|
|
|
# Search for similar embeddings
|
|
async for key in self.redis.scan_iter(match=cache_key_pattern):
|
|
cached = await self.redis.hgetall(key)
|
|
if cached:
|
|
cached_embedding = np.frombuffer(cached[b"embedding"], dtype=np.float32)
|
|
similarity = self._cosine_similarity(query_embedding, cached_embedding)
|
|
|
|
if similarity >= self.threshold:
|
|
return (
|
|
cached[b"response"].decode(),
|
|
{
|
|
"cache_hit": True,
|
|
"similarity": similarity,
|
|
"tokens_saved": int(cached[b"tokens"]),
|
|
"cost_saved": float(cached[b"cost"]),
|
|
}
|
|
)
|
|
|
|
return None
|
|
|
|
async def set(
|
|
self,
|
|
messages: list,
|
|
model: str,
|
|
project_id: str,
|
|
response: str,
|
|
usage: dict,
|
|
ttl: int = 3600
|
|
):
|
|
"""Cache response with embedding for future similarity matching."""
|
|
query_embedding = await self._get_embedding(messages)
|
|
|
|
# Create unique key
|
|
content_hash = hashlib.sha256(str(messages).encode()).hexdigest()[:16]
|
|
cache_key = f"cache:semantic:{project_id}:{model}:{content_hash}"
|
|
|
|
await self.redis.hset(cache_key, mapping={
|
|
"embedding": query_embedding.tobytes(),
|
|
"response": response,
|
|
"tokens": usage["total_tokens"],
|
|
"cost": usage["total_cost_usd"],
|
|
"model": model,
|
|
"created_at": datetime.now(UTC).isoformat(),
|
|
})
|
|
await self.redis.expire(cache_key, ttl)
|
|
|
|
async def _get_embedding(self, messages: list) -> np.ndarray:
|
|
"""Generate embedding for messages."""
|
|
# Extract text content
|
|
text = " ".join(m.get("content", "") for m in messages)
|
|
|
|
# Call embedding API
|
|
response = await litellm.aembedding(
|
|
model=self.embedding_model,
|
|
input=text
|
|
)
|
|
|
|
return np.array(response.data[0].embedding, dtype=np.float32)
|
|
|
|
@staticmethod
|
|
def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
|
|
"""Calculate cosine similarity between vectors."""
|
|
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
|
|
```
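Wiring the cache into the gateway's completion path is then a read-through pattern. A sketch, using `litellm.completion_cost()` to derive the spend stored alongside the cached response:

```python
# Sketch: read-through semantic caching in the completion path
import litellm

from app.core.config import settings

cache = SemanticCache(settings.REDIS_URL)

async def cached_completion(messages: list, model: str, project_id: str) -> str:
    hit = await cache.get(messages, model, project_id)
    if hit:
        response_text, info = hit  # info carries tokens_saved / cost_saved
        return response_text

    response = await litellm.acompletion(model=model, messages=messages)
    text = response.choices[0].message.content
    await cache.set(
        messages, model, project_id, text,
        usage={
            "total_tokens": response.usage.total_tokens,
            "total_cost_usd": litellm.completion_cost(completion_response=response),
        },
    )
    return text
```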
|
|
|
|
### 2. Model Cascading
|
|
|
|
```python
|
|
# app/services/cost_tracking/cascade.py
from dataclasses import dataclass
from typing import List, Optional

import litellm
|
|
|
|
@dataclass
|
|
class CascadeModel:
|
|
model: str
|
|
cost_per_1k_tokens: float
|
|
max_complexity: float # 0-1 score
|
|
timeout: int = 30
|
|
|
|
class ModelCascade:
|
|
"""
|
|
Route queries to cheapest capable model.
|
|
Escalate to more expensive models only when needed.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.models = [
|
|
CascadeModel("claude-3-haiku-20240307", 0.00075, 0.3, timeout=10),
|
|
CascadeModel("gpt-4o-mini", 0.000375, 0.4, timeout=15),
|
|
CascadeModel("claude-3-5-sonnet-20241022", 0.009, 0.8, timeout=30),
|
|
CascadeModel("gpt-4-turbo", 0.02, 1.0, timeout=60),
|
|
]
|
|
self.complexity_classifier = None # TODO: Train classifier
|
|
|
|
async def route(
|
|
self,
|
|
messages: list,
|
|
required_quality: float = 0.7
|
|
) -> str:
|
|
"""
|
|
Determine the best model for the query.
|
|
|
|
Args:
|
|
messages: The query messages
|
|
required_quality: Minimum quality threshold (0-1)
|
|
|
|
Returns:
|
|
Model identifier to use
|
|
"""
|
|
complexity = await self._estimate_complexity(messages)
|
|
|
|
# Find cheapest model that meets requirements
|
|
for model in self.models:
|
|
if model.max_complexity >= complexity and model.max_complexity >= required_quality:
|
|
return model.model
|
|
|
|
# Fallback to most capable
|
|
return self.models[-1].model
|
|
|
|
async def _estimate_complexity(self, messages: list) -> float:
|
|
"""
|
|
Estimate query complexity (0-1).
|
|
|
|
Factors:
|
|
- Message length
|
|
- Technical terms
|
|
- Multi-step reasoning required
|
|
- Code generation requested
|
|
"""
|
|
text = " ".join(m.get("content", "") for m in messages)
|
|
|
|
# Simple heuristics (replace with ML model)
|
|
complexity = 0.0
|
|
|
|
# Length factor
|
|
word_count = len(text.split())
|
|
if word_count > 500:
|
|
complexity += 0.2
|
|
elif word_count > 200:
|
|
complexity += 0.1
|
|
|
|
# Technical indicators
|
|
technical_terms = ["implement", "architect", "optimize", "debug", "refactor"]
|
|
if any(term in text.lower() for term in technical_terms):
|
|
complexity += 0.3
|
|
|
|
# Code indicators
|
|
if "```" in text or "code" in text.lower():
|
|
complexity += 0.2
|
|
|
|
# Multi-step indicators
|
|
if "step" in text.lower() or "first" in text.lower() or "then" in text.lower():
|
|
complexity += 0.2
|
|
|
|
return min(1.0, complexity)
|
|
|
|
async def execute_with_fallback(
|
|
self,
|
|
messages: list,
|
|
required_quality: float = 0.7,
|
|
confidence_threshold: float = 0.8
|
|
) -> dict:
|
|
"""
|
|
Execute query with automatic fallback to stronger models.
|
|
"""
|
|
for model in self.models:
|
|
if model.max_complexity < required_quality:
|
|
continue
|
|
|
|
try:
|
|
response = await litellm.acompletion(
|
|
model=model.model,
|
|
messages=messages,
|
|
timeout=model.timeout,
|
|
)
|
|
|
|
# Check response quality (simplified)
|
|
confidence = await self._evaluate_response(response, messages)
|
|
|
|
if confidence >= confidence_threshold:
|
|
return {
|
|
"response": response,
|
|
"model_used": model.model,
|
|
"fallback_count": self.models.index(model),
|
|
}
|
|
|
|
except Exception as e:
|
|
# Log and try next model
|
|
continue
|
|
|
|
# Final fallback
|
|
return await self._execute_premium(messages)
|
|
```
|
|
|
|
### 3. Prompt Compression (LLMLingua Integration)
|
|
|
|
```python
|
|
# app/services/cost_tracking/compression.py
|
|
from llmlingua import PromptCompressor
|
|
|
|
class PromptOptimizer:
|
|
"""
|
|
Compress prompts to reduce token count while preserving meaning.
|
|
Uses LLMLingua for intelligent compression.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.compressor = PromptCompressor(
|
|
model_name="microsoft/llmlingua-2-bert-base-multilingual-cased-meetingbank",
|
|
use_llmlingua2=True
|
|
)
|
|
|
|
def compress(
|
|
self,
|
|
messages: list,
|
|
target_ratio: float = 0.5, # Compress to 50% of original
|
|
preserve_instructions: bool = True
|
|
) -> list:
|
|
"""
|
|
Compress messages while preserving critical information.
|
|
|
|
Args:
|
|
messages: Original messages
|
|
target_ratio: Target compression ratio
|
|
preserve_instructions: Whether to preserve system instructions
|
|
|
|
Returns:
|
|
Compressed messages
|
|
"""
|
|
compressed_messages = []
|
|
|
|
for msg in messages:
|
|
role = msg["role"]
|
|
content = msg["content"]
|
|
|
|
# Don't compress system instructions if requested
|
|
if role == "system" and preserve_instructions:
|
|
compressed_messages.append(msg)
|
|
continue
|
|
|
|
# Apply compression
|
|
compressed = self.compressor.compress_prompt(
|
|
content,
|
|
rate=target_ratio,
|
|
force_tokens=self._extract_critical_tokens(content),
|
|
)
|
|
|
|
compressed_messages.append({
|
|
"role": role,
|
|
"content": compressed["compressed_prompt"]
|
|
})
|
|
|
|
return compressed_messages
|
|
|
|
def _extract_critical_tokens(self, text: str) -> list:
|
|
"""Extract tokens that should never be removed."""
|
|
# Preserve code blocks, variable names, etc.
|
|
import re
|
|
|
|
critical = []
|
|
|
|
# Preserve code blocks
|
|
code_blocks = re.findall(r'```[\s\S]*?```', text)
|
|
for block in code_blocks:
|
|
critical.extend(block.split())
|
|
|
|
# Preserve quoted strings
|
|
quotes = re.findall(r'"[^"]*"', text)
|
|
critical.extend(quotes)
|
|
|
|
return critical
|
|
|
|
def estimate_savings(self, original: list, compressed: list) -> dict:
|
|
"""Calculate token savings from compression."""
|
|
original_tokens = sum(
|
|
len(m["content"].split()) * 1.3 # Rough token estimate
|
|
for m in original
|
|
)
|
|
compressed_tokens = sum(
|
|
len(m["content"].split()) * 1.3
|
|
for m in compressed
|
|
)
|
|
|
|
return {
|
|
"original_tokens": int(original_tokens),
|
|
"compressed_tokens": int(compressed_tokens),
|
|
"tokens_saved": int(original_tokens - compressed_tokens),
|
|
"compression_ratio": compressed_tokens / original_tokens,
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
## Cost Estimation Before Execution
|
|
|
|
```python
|
|
# app/services/cost_tracking/estimator.py
from dataclasses import dataclass
from typing import Optional

import tiktoken

from app.services.cost_tracking.callbacks import MODEL_COSTS  # shared pricing table
|
|
|
|
@dataclass
|
|
class CostEstimate:
|
|
input_tokens: int
|
|
estimated_output_tokens: int
|
|
min_cost_usd: float
|
|
max_cost_usd: float
|
|
expected_cost_usd: float
|
|
model: str
|
|
confidence: str # low, medium, high
|
|
|
|
class CostEstimator:
|
|
"""
|
|
Estimate LLM request cost before execution.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.encoders = {}
|
|
|
|
def estimate(
|
|
self,
|
|
messages: list,
|
|
model: str,
|
|
max_tokens: Optional[int] = None
|
|
) -> CostEstimate:
|
|
"""
|
|
Estimate cost for a completion request.
|
|
|
|
Args:
|
|
messages: Input messages
|
|
model: Target model
|
|
max_tokens: Maximum output tokens (if known)
|
|
"""
|
|
# Count input tokens
|
|
input_tokens = self._count_tokens(messages, model)
|
|
|
|
# Estimate output tokens
|
|
if max_tokens:
|
|
estimated_output = max_tokens
|
|
confidence = "high"
|
|
else:
|
|
estimated_output = self._estimate_output_tokens(messages, model)
|
|
confidence = "medium"
|
|
|
|
# Get model costs
|
|
costs = MODEL_COSTS.get(model, {"input": 0, "output": 0})
|
|
|
|
# Calculate costs
|
|
input_cost = (input_tokens / 1_000_000) * costs["input"]
|
|
|
|
min_output = estimated_output * 0.3 # Conservative
|
|
max_output = estimated_output * 1.5 # Liberal
|
|
expected_output = estimated_output
|
|
|
|
min_output_cost = (min_output / 1_000_000) * costs["output"]
|
|
max_output_cost = (max_output / 1_000_000) * costs["output"]
|
|
expected_output_cost = (expected_output / 1_000_000) * costs["output"]
|
|
|
|
return CostEstimate(
|
|
input_tokens=input_tokens,
|
|
estimated_output_tokens=estimated_output,
|
|
min_cost_usd=input_cost + min_output_cost,
|
|
max_cost_usd=input_cost + max_output_cost,
|
|
expected_cost_usd=input_cost + expected_output_cost,
|
|
model=model,
|
|
confidence=confidence,
|
|
)
|
|
|
|
def _count_tokens(self, messages: list, model: str) -> int:
|
|
"""Count tokens for input messages."""
|
|
# Get or create encoder
|
|
if model not in self.encoders:
|
|
try:
|
|
if "gpt" in model or "o1" in model:
|
|
self.encoders[model] = tiktoken.encoding_for_model(model)
|
|
else:
|
|
# Fallback to cl100k_base for non-OpenAI
|
|
self.encoders[model] = tiktoken.get_encoding("cl100k_base")
|
|
except Exception:
|
|
self.encoders[model] = tiktoken.get_encoding("cl100k_base")
|
|
|
|
encoder = self.encoders[model]
|
|
|
|
total = 0
|
|
for msg in messages:
|
|
total += len(encoder.encode(msg.get("content", "")))
|
|
total += 4 # Message overhead tokens
|
|
|
|
return total + 2 # Priming tokens
|
|
|
|
def _estimate_output_tokens(self, messages: list, model: str) -> int:
|
|
"""
|
|
Estimate expected output tokens based on input.
|
|
Uses heuristics and historical data.
|
|
"""
|
|
input_tokens = self._count_tokens(messages, model)
|
|
last_message = messages[-1].get("content", "") if messages else ""
|
|
|
|
# Heuristics based on request type
|
|
if "explain" in last_message.lower() or "describe" in last_message.lower():
|
|
# Explanatory responses tend to be longer
|
|
return min(2000, input_tokens * 1.5)
|
|
|
|
if "code" in last_message.lower() or "implement" in last_message.lower():
|
|
# Code generation can be substantial
|
|
return min(4000, input_tokens * 2)
|
|
|
|
if "yes or no" in last_message.lower() or "brief" in last_message.lower():
|
|
# Short responses
|
|
return 100
|
|
|
|
if "list" in last_message.lower():
|
|
# Lists are medium length
|
|
return min(1000, input_tokens * 0.8)
|
|
|
|
# Default: output roughly equal to input
|
|
return min(2000, input_tokens)
|
|
|
|
def should_warn(
|
|
self,
|
|
estimate: CostEstimate,
|
|
budget_remaining: float,
|
|
threshold: float = 0.1 # Warn if request is >10% of remaining budget
|
|
) -> bool:
|
|
"""Check if user should be warned about expensive request."""
|
|
if budget_remaining <= 0:
|
|
return True
|
|
|
|
return estimate.max_cost_usd > (budget_remaining * threshold)
|
|
```
|
|
|
|
### Pre-Execution Check Integration
|
|
|
|
```python
|
|
# app/services/llm_gateway.py
|
|
|
|
class LLMGateway:
|
|
async def complete(
|
|
self,
|
|
agent_id: str,
|
|
project_id: str,
|
|
messages: list,
|
|
model_preference: str = "high-reasoning",
|
|
require_estimate: bool = False,
|
|
**kwargs
|
|
) -> dict:
|
|
"""Generate completion with optional cost estimation."""
|
|
|
|
model = self._resolve_model(model_preference)
|
|
|
|
if require_estimate:
|
|
estimator = CostEstimator()
|
|
estimate = estimator.estimate(messages, model, kwargs.get("max_tokens"))
|
|
|
|
# Check against budget
|
|
budget_status = await self.budget_enforcer.check_budget(project_id)
|
|
|
|
if estimator.should_warn(estimate, budget_status.remaining):
|
|
return {
|
|
"type": "cost_warning",
|
|
"estimate": estimate,
|
|
"budget_remaining": budget_status.remaining,
|
|
"message": f"This request may cost ${estimate.expected_cost_usd:.4f}. "
|
|
f"Remaining budget: ${budget_status.remaining:.2f}. "
|
|
f"Proceed?",
|
|
"alternatives": self._suggest_alternatives(estimate, model),
|
|
}
|
|
|
|
# Proceed with request
|
|
return await self._execute_completion(...)
|
|
|
|
def _suggest_alternatives(self, estimate: CostEstimate, model: str) -> list:
|
|
"""Suggest cost-saving alternatives."""
|
|
alternatives = []
|
|
|
|
# Cheaper model
|
|
if model == "claude-3-5-sonnet-20241022":
|
|
alternatives.append({
|
|
"action": "use_haiku",
|
|
"model": "claude-3-haiku-20240307",
|
|
"estimated_savings": estimate.expected_cost_usd * 0.9,
|
|
"quality_impact": "May reduce quality for complex tasks",
|
|
})
|
|
|
|
# Compression
|
|
alternatives.append({
|
|
"action": "compress_prompt",
|
|
"estimated_savings": estimate.expected_cost_usd * 0.3,
|
|
"quality_impact": "Minimal for most use cases",
|
|
})
|
|
|
|
return alternatives
|
|
```
|
|
|
|
---
|
|
|
|
## Reporting Dashboard Requirements
|
|
|
|
### Dashboard Components
|
|
|
|
| Component | Data Source | Update Frequency |
|
|
|-----------|-------------|------------------|
|
|
| Real-time Spend Ticker | Redis + SSE | Real-time |
|
|
| Budget Utilization Gauges | Redis | 1 second |
|
|
| Daily Cost Chart | PostgreSQL (daily_cost_summaries) | 1 minute |
|
|
| Model Breakdown Pie Chart | PostgreSQL | 5 minutes |
|
|
| Agent Efficiency Table | PostgreSQL | 5 minutes |
|
|
| Cost Forecast | PostgreSQL + ML | 1 hour |
|
|
| Alert Feed | SSE + PostgreSQL | Real-time |
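Until an ML-based model exists, the Cost Forecast component can extrapolate linearly from the observed burn rate; a minimal sketch:

```python
# Naive linear burn-rate forecast (sketch) backing the Cost Forecast widget
def forecast_spend(spent_so_far: float, days_elapsed: int, days_ahead: int) -> float:
    """Project future spend by extrapolating the average daily burn rate."""
    if days_elapsed <= 0:
        return 0.0
    daily_burn = spent_so_far / days_elapsed
    return daily_burn * days_ahead

# Example: $421 spent over 13 days projects to roughly $227 more over the next 7 days
projected = forecast_spend(spent_so_far=421.0, days_elapsed=13, days_ahead=7)
```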
|
|
|
|
### API Endpoints
|
|
|
|
```python
|
|
# app/api/v1/costs.py
|
|
from fastapi import APIRouter, Query
from datetime import datetime, timedelta
from typing import List
|
|
|
|
router = APIRouter(prefix="/costs", tags=["costs"])
|
|
|
|
@router.get("/summary/{project_id}")
|
|
async def get_cost_summary(
|
|
project_id: str,
|
|
period: str = Query("day", regex="^(day|week|month)$")
|
|
) -> CostSummaryResponse:
|
|
"""Get cost summary for a project."""
|
|
pass
|
|
|
|
@router.get("/breakdown/{project_id}")
|
|
async def get_cost_breakdown(
|
|
project_id: str,
|
|
group_by: str = Query("model", regex="^(model|agent_type|day)$"),
|
|
start_date: datetime = None,
|
|
end_date: datetime = None
|
|
) -> CostBreakdownResponse:
|
|
"""Get detailed cost breakdown."""
|
|
pass
|
|
|
|
@router.get("/trends/{project_id}")
|
|
async def get_cost_trends(
|
|
project_id: str,
|
|
days: int = Query(30, ge=1, le=365)
|
|
) -> CostTrendsResponse:
|
|
"""Get historical cost trends."""
|
|
pass
|
|
|
|
@router.get("/forecast/{project_id}")
|
|
async def get_cost_forecast(
|
|
project_id: str,
|
|
days_ahead: int = Query(7, ge=1, le=30)
|
|
) -> CostForecastResponse:
|
|
"""Get projected costs based on current trends."""
|
|
pass
|
|
|
|
@router.get("/efficiency/{project_id}")
|
|
async def get_efficiency_metrics(
|
|
project_id: str
|
|
) -> EfficiencyMetricsResponse:
|
|
"""Get cost efficiency metrics (cost per task, cache hit rate, etc.)."""
|
|
pass
|
|
|
|
@router.get("/budgets/{project_id}")
|
|
async def get_budgets(project_id: str) -> List[BudgetResponse]:
|
|
"""Get all budgets for a project."""
|
|
pass
|
|
|
|
@router.post("/budgets")
|
|
async def create_budget(budget: BudgetCreate) -> BudgetResponse:
|
|
"""Create a new budget."""
|
|
pass
|
|
|
|
@router.put("/budgets/{budget_id}")
|
|
async def update_budget(budget_id: str, budget: BudgetUpdate) -> BudgetResponse:
|
|
"""Update budget configuration."""
|
|
pass
|
|
|
|
@router.get("/alerts/{project_id}")
|
|
async def get_alerts(
|
|
project_id: str,
|
|
status: str = Query(None, regex="^(pending|acknowledged|resolved)$")
|
|
) -> List[AlertResponse]:
|
|
"""Get budget alerts for a project."""
|
|
pass
|
|
|
|
@router.post("/alerts/{alert_id}/acknowledge")
|
|
async def acknowledge_alert(alert_id: str) -> AlertResponse:
|
|
"""Acknowledge a budget alert."""
|
|
pass
|
|
```
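As an illustration of how the summary endpoint could be backed by the `daily_cost_summaries` table, a sketch (the session dependency and exact response schema are assumptions):

```python
# Sketch: aggregating a project's costs from daily_cost_summaries
from datetime import datetime, timedelta, UTC

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.cost_tracking import DailyCostSummary

PERIOD_DAYS = {"day": 1, "week": 7, "month": 30}

async def summarize_project_costs(session: AsyncSession, project_id: str, period: str) -> dict:
    since = datetime.now(UTC) - timedelta(days=PERIOD_DAYS[period])
    stmt = select(
        func.coalesce(func.sum(DailyCostSummary.total_cost_usd), 0.0),
        func.coalesce(func.sum(DailyCostSummary.total_tokens), 0),
        func.coalesce(func.sum(DailyCostSummary.request_count), 0),
    ).where(
        DailyCostSummary.project_id == project_id,
        DailyCostSummary.date >= since,
    )
    total_cost, total_tokens, request_count = (await session.execute(stmt)).one()
    return {
        "period": period,
        "total_cost_usd": float(total_cost),
        "total_tokens": int(total_tokens),
        "request_count": int(request_count),
    }
```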
|
|
|
|
### Dashboard Wireframe
|
|
|
|
```
|
|
+------------------------------------------------------------------+
|
|
| Syndarix Cost Dashboard |
|
|
+------------------------------------------------------------------+
|
|
| |
|
|
| +------------------+ +------------------+ +-------------------+ |
|
|
| | DAILY SPEND | | WEEKLY BUDGET | | MONTHLY BUDGET | |
|
|
| | $12.47 | | ████████░░ 76% | | ████░░░░░░ 42% | |
|
|
| | ▲ 15% vs avg | | $152/$200 | | $421/$1000 | |
|
|
| +------------------+ +------------------+ +-------------------+ |
|
|
| |
|
|
| +-------------------------------+ +----------------------------+ |
|
|
| | COST BY MODEL (7 days) | | DAILY COST TREND | |
|
|
| | ┌───────────────────┐ | | $ | |
|
|
| | │ Claude Sonnet 64% │ | | 20├────────────────────── | |
|
|
| | │ GPT-4o-mini 22% │ | | 15├─────────────█──────────| |
|
|
| | │ Claude Haiku 9% │ | | 10├────────█───█───────────| |
|
|
| | │ Other 5% │ | | 5├──█───█───█───█─────────| |
|
|
| | └───────────────────┘ | | 0└───────────────────────>| |
|
|
| +-------------------------------+ | Mon Tue Wed Thu Fri | |
|
|
| +----------------------------+ |
|
|
| |
|
|
| +---------------------------------------------------------------+ |
|
|
| | AGENT EFFICIENCY | |
|
|
| +----------+----------+----------+-----------+------------------+ |
|
|
| | Agent | Requests | Tokens | Cost | Cost/Task | |
|
|
| +----------+----------+----------+-----------+------------------+ |
|
|
| | Arch | 145 | 2.3M | $18.50 | $0.13 | |
|
|
| | Engineer | 892 | 12.1M | $96.80 | $0.11 | |
|
|
| | QA | 234 | 1.2M | $4.20 | $0.02 | |
|
|
| +----------+----------+----------+-----------+------------------+ |
|
|
| |
|
|
| +---------------------------------------------------------------+ |
|
|
| | RECENT ALERTS [Clear] | |
|
|
| +---------------------------------------------------------------+ |
|
|
| | ! WARNING Daily budget at 80% ($16/$20) 2 mins ago | |
|
|
| | i INFO Weekly budget at 50% 1 hour ago | |
|
|
| +---------------------------------------------------------------+ |
|
|
+------------------------------------------------------------------+
|
|
```
|
|
|
|
---
|
|
|
|
## Implementation Roadmap
|
|
|
|
### Phase 1: Core Infrastructure (Week 1-2)
|
|
|
|
1. **Database Schema**
|
|
- [ ] Create migration for `token_usage`, `budgets`, `budget_alerts`
|
|
- [ ] Create migration for `daily_cost_summaries`, `model_pricing`
|
|
- [ ] Add indexes for performance
|
|
|
|
2. **Redis Setup**
|
|
- [ ] Configure Redis keys for budget counters
|
|
- [ ] Implement budget increment/decrement operations
|
|
- [ ] Set up TTL for temporary data
|
|
|
|
3. **LiteLLM Callback**
|
|
- [ ] Implement `SyndarixCostCallback` class
|
|
- [ ] Integrate with existing LLM Gateway
|
|
- [ ] Add metadata propagation for attribution
|
|
|
|
### Phase 2: Budget Management (Week 2-3)
|
|
|
|
4. **Budget Enforcer**
|
|
- [ ] Implement `BudgetEnforcer` class
|
|
- [ ] Add pre-request budget checks
|
|
- [ ] Implement soft/hard limit logic
|
|
|
|
5. **Alert System**
|
|
- [ ] Implement threshold detection
|
|
- [ ] Create alert dispatcher
|
|
- [ ] Integrate with SSE event bus
|
|
|
|
6. **Celery Tasks**
|
|
- [ ] Budget period management task
|
|
- [ ] Daily cost rollup task
|
|
- [ ] Alert digest task
|
|
|
|
### Phase 3: Cost Optimization (Week 3-4)
|
|
|
|
7. **Semantic Cache**
|
|
- [ ] Implement `SemanticCache` with Redis
|
|
- [ ] Integrate embedding generation
|
|
- [ ] Add cache hit/miss tracking
|
|
|
|
8. **Model Cascading**
|
|
- [ ] Implement `ModelCascade` router
|
|
- [ ] Add complexity estimation
|
|
- [ ] Integrate with LLM Gateway
|
|
|
|
9. **Cost Estimation**
|
|
- [ ] Implement `CostEstimator`
|
|
- [ ] Add pre-execution warnings
|
|
- [ ] Create alternative suggestions
|
|
|
|
### Phase 4: Reporting (Week 4-5)
|
|
|
|
10. **API Endpoints**
|
|
- [ ] Implement cost summary endpoints
|
|
- [ ] Add breakdown and trends endpoints
|
|
- [ ] Create forecast endpoint
|
|
|
|
11. **Dashboard**
|
|
- [ ] Design and implement cost dashboard UI
|
|
- [ ] Add real-time updates via SSE
|
|
- [ ] Create alert management interface
|
|
|
|
### Phase 5: Testing & Documentation (Week 5-6)
|
|
|
|
12. **Testing**
|
|
- [ ] Unit tests for cost tracking
|
|
- [ ] Integration tests for budget enforcement
|
|
- [ ] Load tests for high-volume scenarios
|
|
|
|
13. **Documentation**
|
|
- [ ] API documentation
|
|
- [ ] User guide for budget management
|
|
- [ ] Operations runbook
|
|
|
|
---
|
|
|
|
## References
|
|
|
|
### Research Sources
|
|
|
|
- [LiteLLM Custom Callbacks Documentation](https://docs.litellm.ai/docs/observability/custom_callback)
|
|
- [LiteLLM Cost Calculation](https://deepwiki.com/BerriAI/litellm/2.5-cost-calculation-and-model-pricing)
|
|
- [LiteLLM Spend Tracking](https://docs.litellm.ai/docs/proxy/cost_tracking)
|
|
- [LLM Cost Optimization Guide 2025](https://ai.koombea.com/blog/llm-cost-optimization)
|
|
- [Dynamic Load Balancing for Multi-Tenant LLMs](https://latitude-blog.ghost.io/blog/dynamic-load-balancing-for-multi-tenant-llms/)
|
|
- [Multi-Tenant AI Architecture - Azure](https://learn.microsoft.com/en-us/azure/architecture/guide/multitenant/approaches/ai-ml)
|
|
- [LLMLingua Prompt Compression](https://www.llmlingua.com/)
|
|
- [Model Cascading for Cost Reduction](https://arxiv.org/abs/2410.10347)
|
|
- [BlockLLM Multi-tenant LLM Serving](https://arxiv.org/html/2404.18322v1)
|
|
|
|
### Related Spikes
|
|
|
|
- [SPIKE-005: LLM Provider Abstraction](./SPIKE-005-llm-provider-abstraction.md) - LiteLLM integration baseline
|
|
- [SPIKE-003: Real-time Updates](./SPIKE-003-realtime-updates.md) - SSE event architecture
|
|
- [SPIKE-004: Celery + Redis Integration](./SPIKE-004-celery-redis-integration.md) - Background task infrastructure
|
|
|
|
---
|
|
|
|
## Decision
|
|
|
|
**Implement a comprehensive cost tracking system** with:
|
|
|
|
1. **LiteLLM callbacks** for real-time usage capture
|
|
2. **Redis + PostgreSQL** for hybrid hot/warm data storage
|
|
3. **Hierarchical budgets** with soft/hard limit enforcement
|
|
4. **Multi-channel alerts** via SSE, email, and Slack
|
|
5. **Cost optimization** through caching, cascading, and compression
|
|
|
|
**Expected Outcomes:**
|
|
- Full cost visibility per project, agent, and model
|
|
- Real-time budget enforcement preventing cost overruns
|
|
- 60-80% cost reduction through optimization strategies
|
|
- Actionable insights for cost-aware decision making
|
|
|
|
---
|
|
|
|
*Spike completed. Findings will inform ADR-010: Cost Tracking Architecture.*
|