# app/tasks/cost.py """ Cost tracking and budget management tasks for Syndarix. These tasks implement multi-layered cost tracking per ADR-012: - Per-agent token usage tracking - Project budget monitoring - Daily cost aggregation - Budget threshold alerts - Cost reporting Costs are tracked in real-time in Redis for speed, then aggregated to PostgreSQL for durability. """ import logging from typing import Any from app.celery_app import celery_app logger = logging.getLogger(__name__) @celery_app.task(bind=True, name="app.tasks.cost.aggregate_daily_costs") def aggregate_daily_costs(self) -> dict[str, Any]: """ Aggregate daily costs from Redis to PostgreSQL. This periodic task (runs daily): 1. Read accumulated costs from Redis 2. Aggregate by project, agent, and model 3. Store in PostgreSQL cost_records table 4. Clear Redis counters for new day Returns: dict with status """ logger.info("Starting daily cost aggregation") # TODO: Implement cost aggregation # This will involve: # 1. Fetching cost data from Redis # 2. Grouping by project_id, agent_id, model # 3. Inserting into PostgreSQL cost tables # 4. Resetting Redis counters return { "status": "pending", } @celery_app.task(bind=True, name="app.tasks.cost.check_budget_thresholds") def check_budget_thresholds( self, project_id: str, ) -> dict[str, Any]: """ Check if a project has exceeded budget thresholds. This task checks budget limits: 1. Get current spend from Redis counters 2. Compare against project budget limits 3. Send alerts if thresholds exceeded 4. Pause agents if hard limit reached Args: project_id: UUID of the project Returns: dict with status and project_id """ logger.info(f"Checking budget thresholds for project {project_id}") # TODO: Implement budget checking # This will involve: # 1. Loading project budget configuration # 2. Getting current spend from Redis # 3. Comparing against soft/hard limits # 4. Sending alerts or pausing agents return { "status": "pending", "project_id": project_id, } @celery_app.task(bind=True, name="app.tasks.cost.record_llm_usage") def record_llm_usage( self, agent_id: str, project_id: str, model: str, prompt_tokens: int, completion_tokens: int, cost_usd: float, ) -> dict[str, Any]: """ Record LLM usage from an agent call. This task tracks each LLM API call: 1. Increment Redis counters for real-time tracking 2. Store raw usage event for audit 3. Trigger budget check if threshold approaching Args: agent_id: UUID of the agent instance project_id: UUID of the project model: Model identifier (e.g., claude-opus-4-5-20251101) prompt_tokens: Number of input tokens completion_tokens: Number of output tokens cost_usd: Calculated cost in USD Returns: dict with status, agent_id, project_id, and cost_usd """ logger.debug( f"Recording LLM usage for model {model}: " f"{prompt_tokens} prompt + {completion_tokens} completion tokens = ${cost_usd}" ) # TODO: Implement usage recording # This will involve: # 1. Incrementing Redis counters # 2. Storing usage event # 3. Checking if near budget threshold return { "status": "pending", "agent_id": agent_id, "project_id": project_id, "cost_usd": cost_usd, } @celery_app.task(bind=True, name="app.tasks.cost.generate_cost_report") def generate_cost_report( self, project_id: str, start_date: str, end_date: str, ) -> dict[str, Any]: """ Generate a cost report for a project. This task creates a detailed cost breakdown: 1. Query cost records for date range 2. Group by agent, model, and day 3. Calculate totals and trends 4. Format report for display Args: project_id: UUID of the project start_date: Report start date (YYYY-MM-DD) end_date: Report end date (YYYY-MM-DD) Returns: dict with status, project_id, and date range """ logger.info( f"Generating cost report for project {project_id} from {start_date} to {end_date}" ) # TODO: Implement report generation # This will involve: # 1. Querying PostgreSQL for cost records # 2. Aggregating by various dimensions # 3. Calculating totals and averages # 4. Formatting report data return { "status": "pending", "project_id": project_id, "start_date": start_date, "end_date": end_date, } @celery_app.task(bind=True, name="app.tasks.cost.reset_daily_budget_counters") def reset_daily_budget_counters(self) -> dict[str, Any]: """ Reset daily budget counters in Redis. This periodic task (runs daily at midnight UTC): 1. Archive current day's counters 2. Reset all daily budget counters 3. Prepare for new day's tracking Returns: dict with status """ logger.info("Resetting daily budget counters") # TODO: Implement counter reset # This will involve: # 1. Getting all daily counter keys from Redis # 2. Archiving current values # 3. Resetting counters to zero return { "status": "pending", }