SPIKE-011: Audit Logging for Syndarix

Status: Completed
Date: 2025-12-29
Author: Architecture Team
Related Issue: #11


Executive Summary

Syndarix, as an autonomous AI-powered consulting agency, requires comprehensive audit logging to ensure compliance, enable debugging, and build client trust. This spike researches best practices for audit logging in autonomous AI systems and provides concrete recommendations for implementation.

Recommendation: Implement a structured, OpenTelemetry-compatible audit logging system using:

  • Structlog for structured JSON logging with contextual enrichment
  • PostgreSQL with TimescaleDB extension for hot storage (0-90 days)
  • S3-compatible object storage for cold archival (90+ days)
  • Cryptographic hash chaining for immutability verification
  • OpenTelemetry integration for correlation with traces and metrics

Objective

Research the optimal audit logging architecture for Syndarix, focusing on:

  1. Comprehensive event capture for autonomous AI agent actions
  2. Compliance with SOC2/GDPR requirements
  3. Searchable, queryable audit trails
  4. Immutable, tamper-evident logging
  5. Scalable storage architecture

Research Questions & Findings

1. What to Log in Autonomous AI Systems

Based on LLM observability best practices and Syndarix-specific requirements, the following event categories must be logged:

Agent Actions

| Event Type | Description | Critical Fields |
|---|---|---|
| agent.spawned | Agent instance created | agent_type, agent_id, project_id, config |
| agent.action.started | Agent begins action | action_type, input_params, before_state |
| agent.action.completed | Agent completes action | action_type, output, after_state, duration_ms |
| agent.action.failed | Agent action failed | action_type, error, stack_trace |
| agent.decision | Agent makes autonomous decision | decision_type, options, chosen, reasoning |
| agent.terminated | Agent instance destroyed | reason, final_state |

LLM Interactions

| Event Type | Description | Critical Fields |
|---|---|---|
| llm.request | Prompt sent to LLM | model, prompt_template, variables, token_count |
| llm.response | LLM response received | model, response, token_count, latency_ms |
| llm.error | LLM call failed | model, error, retry_count |
| llm.tool_call | LLM invokes tool | tool_name, arguments |

MCP Tool Invocations

| Event Type | Description | Critical Fields |
|---|---|---|
| mcp.tool.invoked | MCP tool called | server, tool_name, arguments, project_id |
| mcp.tool.result | MCP tool returned | server, tool_name, result, duration_ms |
| mcp.tool.error | MCP tool failed | server, tool_name, error |

Human Approvals

| Event Type | Description | Critical Fields |
|---|---|---|
| approval.requested | System requests human approval | action_type, context, options |
| approval.granted | Human approves action | approver_id, action_id, comments |
| approval.rejected | Human rejects action | approver_id, action_id, reason |
| approval.timeout | Approval request timed out | action_id, timeout_ms |

Git Operations

| Event Type | Description | Critical Fields |
|---|---|---|
| git.commit | Commit created | repo, branch, commit_sha, message, files |
| git.branch.created | Branch created | repo, branch_name, base_branch |
| git.pr.created | Pull request opened | repo, pr_number, title, head, base |
| git.pr.merged | Pull request merged | repo, pr_number, merge_commit |

Project Lifecycle

| Event Type | Description | Critical Fields |
|---|---|---|
| project.created | New project started | project_id, client_id, autonomy_level |
| project.sprint.started | Sprint begins | sprint_id, goals, assigned_agents |
| project.milestone.completed | Milestone achieved | milestone_id, deliverables |
| project.checkpoint | Client checkpoint | checkpoint_type, feedback |

2. Structured Logging Format

Recommendation: Use OpenTelemetry-compatible structured JSON logging.
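
Structlog is the recommended emission library (see the technology stack below), but the spike does not show its setup; a minimal configuration sketch, assuming the stock structlog processors, that renders contextually enriched JSON lines compatible with the schema that follows:

# app/audit/logging_config.py  -- hypothetical module name
import structlog

def configure_structlog() -> None:
    """Configure structlog to emit one JSON object per event."""
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,     # merge bound context (project_id, agent_id, ...)
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        cache_logger_on_first_use=True,
    )

# Usage: bind shared context once, then every log line carries it
# structlog.contextvars.bind_contextvars(project_id="proj-001", agent_id="agent-123")
# structlog.get_logger().info("agent.action.completed", action="Created feature branch")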

Base Event Schema

from pydantic import BaseModel
from datetime import datetime
from typing import Any, Optional
from enum import Enum

class AuditEventSeverity(str, Enum):
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"

class AuditEvent(BaseModel):
    """Base schema for all audit events."""

    # Identity & Correlation
    event_id: str                          # UUID v7 (time-ordered)
    trace_id: Optional[str] = None         # OpenTelemetry trace ID
    span_id: Optional[str] = None          # OpenTelemetry span ID
    parent_event_id: Optional[str] = None  # For event chains

    # Timestamp
    timestamp: datetime                     # ISO 8601 with timezone
    timestamp_unix_ms: int                  # Unix millis for indexing

    # Event Classification
    event_type: str                         # e.g., "agent.action.completed"
    event_category: str                     # e.g., "agent", "llm", "mcp"
    severity: AuditEventSeverity

    # Context
    project_id: Optional[str] = None
    agent_id: Optional[str] = None
    agent_type: Optional[str] = None
    user_id: Optional[str] = None          # Human actor if applicable
    session_id: Optional[str] = None

    # Event Data
    action: str                             # Human-readable action description
    data: dict[str, Any]                    # Event-specific payload

    # State Tracking
    before_state: Optional[dict] = None    # State before action
    after_state: Optional[dict] = None     # State after action

    # Technical Metadata
    service: str = "syndarix"
    service_version: str
    environment: str                        # production, staging, development
    hostname: str

    # Immutability
    previous_hash: Optional[str] = None    # Hash of previous event (chain)
    event_hash: Optional[str] = None       # SHA-256 of this event

    class Config:
        json_schema_extra = {
            "example": {
                "event_id": "019373a8-9b2e-7f4c-8d1a-2b3c4d5e6f7a",
                "trace_id": "abc123def456",
                "timestamp": "2025-12-29T14:30:00.000Z",
                "timestamp_unix_ms": 1735480200000,
                "event_type": "agent.action.completed",
                "event_category": "agent",
                "severity": "INFO",
                "project_id": "proj-001",
                "agent_id": "agent-123",
                "agent_type": "software_engineer",
                "action": "Created feature branch",
                "data": {
                    "branch_name": "feature/user-auth",
                    "base_branch": "main"
                },
                "service": "syndarix",
                "service_version": "1.0.0",
                "environment": "production"
            }
        }
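
A short usage sketch (field values are illustrative and mirror the example above) showing how an event instance built from this schema serializes to the JSON that is stored and hashed:

import socket
from datetime import datetime, timezone

now = datetime.now(timezone.utc)
event = AuditEvent(
    event_id="019373a8-9b2e-7f4c-8d1a-2b3c4d5e6f7a",   # UUIDv7 from whichever generator is adopted
    timestamp=now,
    timestamp_unix_ms=int(now.timestamp() * 1000),
    event_type="agent.action.completed",
    event_category="agent",
    severity=AuditEventSeverity.INFO,
    project_id="proj-001",
    agent_id="agent-123",
    agent_type="software_engineer",
    action="Created feature branch",
    data={"branch_name": "feature/user-auth", "base_branch": "main"},
    service_version="1.0.0",
    environment="development",
    hostname=socket.gethostname(),
)

print(event.model_dump_json())  # Pydantic v2; on Pydantic v1 use event.json()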

LLM-Specific Event Schema

class LLMRequestEvent(AuditEvent):
    """Schema for LLM request events."""
    event_type: str = "llm.request"
    event_category: str = "llm"

    # Illustrative payload shape: keys map to the expected value types
    data: dict = {
        "model": str,                    # e.g., "claude-3-5-sonnet"
        "provider": str,                 # e.g., "anthropic", "openai"
        "prompt_template_id": str,       # Reference to template
        "prompt_template_version": str,
        "prompt_variables": dict,        # Variables substituted
        "system_prompt_hash": str,       # Hash of system prompt
        "user_prompt": str,              # Full user prompt (may be truncated)
        "token_count_estimate": int,
        "max_tokens": int,
        "temperature": float,
        "tools_available": list[str],
    }

class LLMResponseEvent(AuditEvent):
    """Schema for LLM response events."""
    event_type: str = "llm.response"
    event_category: str = "llm"

    # Illustrative payload shape: keys map to the expected value types
    data: dict = {
        "model": str,
        "provider": str,
        "response_text": str,            # May be truncated for storage
        "response_hash": str,            # Full response hash
        "input_tokens": int,
        "output_tokens": int,
        "total_tokens": int,
        "latency_ms": int,
        "finish_reason": str,            # "stop", "max_tokens", "tool_use"
        "tool_calls": list[dict],        # If tools were invoked
        "cost_usd": float,               # Estimated cost
    }
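
Both schemas note that prompt and response text may be truncated for storage; a small helper sketch (the name and size limit are assumptions) for keeping a bounded excerpt alongside a hash of the full text:

import hashlib

MAX_STORED_CHARS = 8_000  # assumed storage budget per text field

def truncate_and_hash(text: str, limit: int = MAX_STORED_CHARS) -> dict:
    """Return a bounded excerpt plus a SHA-256 of the full text.

    The hash lets investigators prove a stored excerpt corresponds to the
    full prompt/response without retaining the entire payload in hot storage.
    """
    return {
        "text": text[:limit],
        "truncated": len(text) > limit,
        "sha256": hashlib.sha256(text.encode("utf-8")).hexdigest(),
        "length": len(text),
    }

# Example: data["user_prompt"] = truncate_and_hash(prompt)["text"]
#          data["system_prompt_hash"] = truncate_and_hash(system_prompt)["sha256"]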

3. Storage Architecture

Recommendation: Tiered storage with hot, warm, and cold layers.

                                    Query Latency
                                    ◄────────────►
                                    Fast       Slow
┌─────────────────────────────────────────────────────────────────┐
│                        HOT STORAGE                               │
│                   (0-30 days, ~10TB/month)                       │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │   PostgreSQL + TimescaleDB                                 │  │
│  │   - Hypertables partitioned by day                         │  │
│  │   - Native compression after 7 days                        │  │
│  │   - Full-text search on action/data fields                 │  │
│  │   - B-tree indexes on project_id, agent_id, event_type     │  │
│  │   - GIN index on JSONB data field                          │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              │                                   │
│                              ▼ (30 days)                         │
├─────────────────────────────────────────────────────────────────┤
│                       WARM STORAGE                               │
│                  (30-90 days, compressed)                        │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │   TimescaleDB Continuous Aggregates                        │  │
│  │   - Hourly/daily rollups of metrics                        │  │
│  │   - Detailed logs in highly compressed chunks              │  │
│  │   - Query via same SQL interface                           │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              │                                   │
│                              ▼ (90 days)                         │
├─────────────────────────────────────────────────────────────────┤
│                       COLD STORAGE                               │
│               (90+ days, archival, 7 year retention)             │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │   S3-Compatible Object Storage (MinIO / AWS S3)            │  │
│  │   - Parquet files partitioned by date/project              │  │
│  │   - Glacier-class storage after 1 year                     │  │
│  │   - Queryable via Trino/Athena for investigations          │  │
│  │   - Cryptographic manifest for integrity verification      │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

TimescaleDB Schema

-- Enable TimescaleDB extension
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- Main audit events table
-- Note: TimescaleDB requires the time partitioning column in every unique
-- constraint, so the primary key is composite; foreign keys referencing a
-- hypertable are not supported, so parent_event_id is a soft reference.
CREATE TABLE audit_events (
    event_id UUID NOT NULL,
    trace_id VARCHAR(32),
    span_id VARCHAR(16),
    parent_event_id UUID,

    timestamp TIMESTAMPTZ NOT NULL,
    timestamp_unix_ms BIGINT NOT NULL,

    event_type VARCHAR(100) NOT NULL,
    event_category VARCHAR(50) NOT NULL,
    severity VARCHAR(20) NOT NULL,

    project_id VARCHAR(50),
    agent_id VARCHAR(50),
    agent_type VARCHAR(50),
    user_id VARCHAR(50),
    session_id VARCHAR(50),

    action TEXT NOT NULL,
    data JSONB NOT NULL,

    before_state JSONB,
    after_state JSONB,

    service VARCHAR(50) NOT NULL,
    service_version VARCHAR(20) NOT NULL,
    environment VARCHAR(20) NOT NULL,
    hostname VARCHAR(100) NOT NULL,

    previous_hash VARCHAR(64),
    event_hash VARCHAR(64) NOT NULL,

    -- Denormalized for efficient queries
    created_at TIMESTAMPTZ DEFAULT NOW(),

    PRIMARY KEY (event_id, timestamp)
);

-- Convert to hypertable for time-series optimization
SELECT create_hypertable(
    'audit_events',
    'timestamp',
    chunk_time_interval => INTERVAL '1 day'
);

-- Enable compression for chunks older than 7 days
ALTER TABLE audit_events SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'project_id, event_category',
    timescaledb.compress_orderby = 'timestamp DESC'
);

SELECT add_compression_policy('audit_events', INTERVAL '7 days');

-- Retention policy: drop chunks older than 90 days
-- (the archival job below must copy them to cold storage before they are dropped)
SELECT add_retention_policy('audit_events', INTERVAL '90 days');

-- Indexes for common query patterns
CREATE INDEX idx_audit_project_time ON audit_events (project_id, timestamp DESC);
CREATE INDEX idx_audit_agent_time ON audit_events (agent_id, timestamp DESC);
CREATE INDEX idx_audit_type_time ON audit_events (event_type, timestamp DESC);
CREATE INDEX idx_audit_category_time ON audit_events (event_category, timestamp DESC);
CREATE INDEX idx_audit_trace ON audit_events (trace_id) WHERE trace_id IS NOT NULL;

-- GIN index for JSONB queries
CREATE INDEX idx_audit_data ON audit_events USING GIN (data);

-- Full-text search on action field
CREATE INDEX idx_audit_action_fts ON audit_events USING GIN (to_tsvector('english', action));
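
The warm layer in the diagram relies on TimescaleDB continuous aggregates, which the DDL above does not yet create; a minimal sketch, assuming an hourly per-project rollup and asyncpg as the driver (view name, bucket size, and policy offsets are illustrative choices):

# scripts/create_warm_storage_rollup.py  -- hypothetical migration helper
import asyncio
import asyncpg

HOURLY_ROLLUP = """
CREATE MATERIALIZED VIEW audit_events_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', timestamp) AS bucket,
    project_id,
    event_category,
    COUNT(*) AS event_count
FROM audit_events
GROUP BY bucket, project_id, event_category
WITH NO DATA;
"""

REFRESH_POLICY = """
SELECT add_continuous_aggregate_policy(
    'audit_events_hourly',
    start_offset      => INTERVAL '3 hours',
    end_offset        => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour'
);
"""

async def create_warm_storage_rollup(dsn: str) -> None:
    # Continuous aggregates cannot be created inside an explicit transaction,
    # so each statement is executed on its own connection-level autocommit.
    conn = await asyncpg.connect(dsn)
    try:
        await conn.execute(HOURLY_ROLLUP)
        await conn.execute(REFRESH_POLICY)
    finally:
        await conn.close()

if __name__ == "__main__":
    asyncio.run(create_warm_storage_rollup("postgresql://syndarix@localhost/syndarix"))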

Cold Storage Archive Job

# jobs/archive_audit_logs.py
import asyncio
import hashlib
import io
from datetime import datetime, timedelta, timezone

import pyarrow as pa
import pyarrow.parquet as pq
from minio import Minio

# Assumed application helpers: get_db_session, settings, update_archive_manifest

async def archive_old_audit_logs():
    """Archive audit logs older than 90 days to S3/MinIO."""

    cutoff_date = datetime.now(timezone.utc) - timedelta(days=90)

    # Query logs to archive
    query = """
        SELECT * FROM audit_events
        WHERE timestamp < %s
        ORDER BY timestamp ASC
    """

    async with get_db_session() as session:
        result = await session.execute(query, [cutoff_date])
        records = result.fetchall()

    if not records:
        return

    # Convert to Parquet
    table = pa.Table.from_pylist([dict(r) for r in records])

    # Partition by date and project
    partition_key = cutoff_date.strftime("%Y/%m/%d")

    # Write to buffer
    buffer = pa.BufferOutputStream()
    pq.write_table(table, buffer, compression='zstd')

    # Calculate manifest hash
    data = buffer.getvalue().to_pybytes()
    content_hash = hashlib.sha256(data).hexdigest()

    # Upload to MinIO/S3
    client = Minio(
        settings.MINIO_ENDPOINT,
        access_key=settings.MINIO_ACCESS_KEY,
        secret_key=settings.MINIO_SECRET_KEY,
    )

    object_name = f"audit-logs/{partition_key}/events.parquet"
    client.put_object(
        bucket_name="syndarix-audit-archive",
        object_name=object_name,
        data=io.BytesIO(data),
        length=len(data),
        metadata={"content-hash": content_hash}
    )

    # Update manifest for integrity verification
    await update_archive_manifest(partition_key, object_name, content_hash)

    # Delete archived records from TimescaleDB (re-open a session; the earlier
    # session was closed when its context manager exited)
    async with get_db_session() as session:
        await session.execute(
            "DELETE FROM audit_events WHERE timestamp < %s",
            [cutoff_date]
        )
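
To make the cryptographic manifest actionable, a companion verification sketch for the same module; get_archive_manifest is an assumed read counterpart to update_archive_manifest above:

async def verify_archive_partition(partition_key: str) -> bool:
    """Re-download an archived partition and check its hash against the manifest."""
    manifest = await get_archive_manifest(partition_key)  # assumed lookup helper

    client = Minio(
        settings.MINIO_ENDPOINT,
        access_key=settings.MINIO_ACCESS_KEY,
        secret_key=settings.MINIO_SECRET_KEY,
    )

    response = client.get_object("syndarix-audit-archive", manifest["object_name"])
    try:
        data = response.read()
    finally:
        response.close()
        response.release_conn()

    return hashlib.sha256(data).hexdigest() == manifest["content_hash"]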

4. Immutability & Integrity

Recommendation: Use cryptographic hash chaining for tamper evidence.

# app/audit/integrity.py
import hashlib
import json
from typing import Optional

# AuditEvent is assumed to be imported from the audit schema module defined above

class AuditIntegrity:
    """Cryptographic hash chaining for audit log integrity."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self._last_hash_key = "audit:last_hash:{project_id}"

    async def compute_event_hash(
        self,
        event: AuditEvent,
        previous_hash: Optional[str] = None
    ) -> str:
        """
        Compute SHA-256 hash of event including previous hash.
        Creates a blockchain-like chain of events.
        """
        # Canonical JSON representation (sorted keys, no whitespace)
        canonical = json.dumps(
            {
                "event_id": str(event.event_id),
                "timestamp_unix_ms": event.timestamp_unix_ms,
                "event_type": event.event_type,
                "project_id": event.project_id,
                "agent_id": event.agent_id,
                "action": event.action,
                "data": event.data,
                "previous_hash": previous_hash or "",
            },
            sort_keys=True,
            separators=(",", ":")
        )

        return hashlib.sha256(canonical.encode()).hexdigest()

    async def chain_event(self, event: AuditEvent) -> AuditEvent:
        """Add event to the hash chain."""
        project_key = self._last_hash_key.format(project_id=event.project_id)

        # Get previous hash (note: this GET/SET pair is not atomic; with
        # concurrent writers for the same project, wrap it in a Redis
        # transaction or Lua script, or serialize writes per project)
        previous_hash = await self.redis.get(project_key)

        # Compute new hash
        event.previous_hash = previous_hash
        event.event_hash = await self.compute_event_hash(event, previous_hash)

        # Update last hash
        await self.redis.set(project_key, event.event_hash)

        return event

    async def verify_chain(
        self,
        project_id: str,
        start_event_id: str,
        end_event_id: str
    ) -> tuple[bool, Optional[str]]:
        """
        Verify integrity of event chain between two events.
        Returns (is_valid, first_invalid_event_id).
        """
        # get_events_in_range is an assumed data-access helper that loads the
        # chronologically ordered events between the two IDs from the audit store
        events = await self.get_events_in_range(
            project_id, start_event_id, end_event_id
        )

        previous_hash = events[0].previous_hash

        for event in events:
            expected_hash = await self.compute_event_hash(event, previous_hash)

            if event.event_hash != expected_hash:
                return (False, str(event.event_id))

            previous_hash = event.event_hash

        return (True, None)
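
A brief usage sketch; the redis client, writer, event object, and ID variables are assumed to come from surrounding application code, and alert_security_team is a hypothetical escalation hook:

integrity = AuditIntegrity(redis_client)

event = await integrity.chain_event(event)       # sets previous_hash and event_hash
await db_writer.write(event)                      # persist only after chaining

ok, first_bad = await integrity.verify_chain(
    project_id="proj-001",
    start_event_id=first_event_id,
    end_event_id=last_event_id,
)
if not ok:
    alert_security_team(first_bad)                # hypothetical escalation hook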

5. Query Patterns & Indexing

Common Query Patterns

# app/audit/queries.py
from datetime import datetime
from typing import Optional

class AuditQueries:
    """Optimized audit log queries (asyncpg-style $n placeholders assumed)."""

    def __init__(self, db):
        self.db = db

    async def get_project_timeline(
        self,
        project_id: str,
        start_time: datetime,
        end_time: datetime,
        event_types: Optional[list[str]] = None,
        limit: int = 1000
    ) -> list[AuditEvent]:
        """Get chronological audit trail for a project."""
        query = """
            SELECT * FROM audit_events
            WHERE project_id = $1
              AND timestamp BETWEEN $2 AND $3
        """
        params: list = [project_id, start_time, end_time]
        if event_types:
            query += f" AND event_type = ANY(${len(params) + 1})"
            params.append(event_types)
        query += f" ORDER BY timestamp DESC LIMIT ${len(params) + 1}"
        params.append(limit)
        return await self.db.fetch(query, *params)

    async def get_agent_actions(
        self,
        agent_id: str,
        hours: int = 24
    ) -> list[AuditEvent]:
        """Get all actions by a specific agent."""
        query = """
            SELECT * FROM audit_events
            WHERE agent_id = $1
              AND timestamp > NOW() - ($2 * INTERVAL '1 hour')
              AND event_category = 'agent'
            ORDER BY timestamp DESC
        """
        return await self.db.fetch(query, agent_id, hours)

    async def get_llm_usage_summary(
        self,
        project_id: str,
        days: int = 30
    ) -> list[dict]:
        """Get LLM usage statistics for billing/monitoring, grouped by model."""
        query = """
            SELECT
                data->>'model' as model,
                data->>'provider' as provider,
                COUNT(*) as request_count,
                SUM((data->>'total_tokens')::int) as total_tokens,
                SUM((data->>'cost_usd')::float) as total_cost,
                AVG((data->>'latency_ms')::int) as avg_latency_ms
            FROM audit_events
            WHERE project_id = $1
              AND event_type = 'llm.response'
              AND timestamp > NOW() - ($2 * INTERVAL '1 day')
            GROUP BY data->>'model', data->>'provider'
        """
        return await self.db.fetch(query, project_id, days)

    async def search_actions(
        self,
        query_text: str,
        project_id: Optional[str] = None,
        limit: int = 100
    ) -> list[AuditEvent]:
        """Full-text search on action descriptions (fully parameterized)."""
        query = """
            SELECT *, ts_rank(to_tsvector('english', action), query) AS rank
            FROM audit_events, plainto_tsquery('english', $1) query
            WHERE to_tsvector('english', action) @@ query
        """
        params: list = [query_text]
        if project_id:
            query += f" AND project_id = ${len(params) + 1}"
            params.append(project_id)
        query += f" ORDER BY rank DESC, timestamp DESC LIMIT ${len(params) + 1}"
        params.append(limit)
        return await self.db.fetch(query, *params)

    async def trace_event_chain(
        self,
        event_id: str
    ) -> list[AuditEvent]:
        """Trace full event chain using parent_event_id."""
        query = """
            WITH RECURSIVE event_chain AS (
                SELECT * FROM audit_events WHERE event_id = $1
                UNION ALL
                SELECT e.* FROM audit_events e
                JOIN event_chain ec ON e.event_id = ec.parent_event_id
            )
            SELECT * FROM event_chain ORDER BY timestamp ASC
        """
        return await self.db.fetch(query, event_id)

6. Logging Decorators & Implementation

# app/audit/decorators.py
import functools
import hashlib
import socket
import time
from uuid import uuid7  # not available in older Python stdlibs; the third-party "uuid6" package provides it
from datetime import datetime, timezone
from typing import Callable, Any
import structlog

# Assumed application-level imports: settings, audit_logger, AuditEvent,
# AuditEventSeverity, plus helpers _serialize_args, _serialize_kwargs,
# _summarize_result and _redact_sensitive.

logger = structlog.get_logger()

def audit_agent_action(action_type: str):
    """
    Decorator to audit agent actions with before/after state capture.

    Usage:
        @audit_agent_action("create_branch")
        async def create_branch(self, branch_name: str) -> Branch:
            ...
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            # Capture before state
            before_state = await self.get_state() if hasattr(self, 'get_state') else None

            event_id = str(uuid7())
            start_time = time.perf_counter()

            try:
                # Execute the action
                result = await func(self, *args, **kwargs)

                # Capture after state
                after_state = await self.get_state() if hasattr(self, 'get_state') else None

                # Log success
                await audit_logger.log_event(
                    AuditEvent(
                        event_id=event_id,
                        timestamp=datetime.now(timezone.utc),
                        timestamp_unix_ms=int(time.time() * 1000),
                        event_type=f"agent.action.completed",
                        event_category="agent",
                        severity=AuditEventSeverity.INFO,
                        project_id=self.project_id,
                        agent_id=self.agent_id,
                        agent_type=self.agent_type,
                        action=f"{action_type}: {func.__name__}",
                        data={
                            "action_type": action_type,
                            "function": func.__name__,
                            "args": _serialize_args(args),
                            "kwargs": _serialize_kwargs(kwargs),
                            "result_summary": _summarize_result(result),
                            "duration_ms": int((time.perf_counter() - start_time) * 1000),
                        },
                        before_state=before_state,
                        after_state=after_state,
                        service="syndarix",
                        service_version=settings.VERSION,
                        environment=settings.ENVIRONMENT,
                        hostname=socket.gethostname(),
                    )
                )

                return result

            except Exception as e:
                # Log failure
                await audit_logger.log_event(
                    AuditEvent(
                        event_id=event_id,
                        timestamp=datetime.now(timezone.utc),
                        timestamp_unix_ms=int(time.time() * 1000),
                        event_type=f"agent.action.failed",
                        event_category="agent",
                        severity=AuditEventSeverity.ERROR,
                        project_id=self.project_id,
                        agent_id=self.agent_id,
                        agent_type=self.agent_type,
                        action=f"{action_type}: {func.__name__} (FAILED)",
                        data={
                            "action_type": action_type,
                            "function": func.__name__,
                            "error": str(e),
                            "error_type": type(e).__name__,
                            "duration_ms": int((time.perf_counter() - start_time) * 1000),
                        },
                        before_state=before_state,
                        service="syndarix",
                        service_version=settings.VERSION,
                        environment=settings.ENVIRONMENT,
                        hostname=socket.gethostname(),
                    )
                )
                raise

        return wrapper
    return decorator


def audit_llm_call(func: Callable) -> Callable:
    """
    Decorator to audit LLM calls with prompt/response logging.

    Usage:
        @audit_llm_call
        async def generate_response(self, prompt: str, **kwargs) -> str:
            ...
    """
    @functools.wraps(func)
    async def wrapper(self, *args, **kwargs):
        event_id = str(uuid7())
        start_time = time.perf_counter()

        # Log request
        request_event = AuditEvent(
            event_id=event_id,
            timestamp=datetime.now(timezone.utc),
            timestamp_unix_ms=int(time.time() * 1000),
            event_type="llm.request",
            event_category="llm",
            severity=AuditEventSeverity.INFO,
            project_id=getattr(self, 'project_id', None),
            agent_id=getattr(self, 'agent_id', None),
            action="LLM request initiated",
            data={
                "model": kwargs.get('model', self.model),
                "provider": self.provider,
                "prompt_hash": hashlib.sha256(str(args).encode()).hexdigest()[:16],
                "max_tokens": kwargs.get('max_tokens', 4096),
                "temperature": kwargs.get('temperature', 0.7),
            },
            service="syndarix",
            service_version=settings.VERSION,
            environment=settings.ENVIRONMENT,
            hostname=socket.gethostname(),
        )
        await audit_logger.log_event(request_event)

        try:
            result = await func(self, *args, **kwargs)

            # Log response
            response_event = AuditEvent(
                event_id=str(uuid7()),
                parent_event_id=event_id,
                timestamp=datetime.now(timezone.utc),
                timestamp_unix_ms=int(time.time() * 1000),
                event_type="llm.response",
                event_category="llm",
                severity=AuditEventSeverity.INFO,
                project_id=getattr(self, 'project_id', None),
                agent_id=getattr(self, 'agent_id', None),
                action="LLM response received",
                data={
                    "model": kwargs.get('model', self.model),
                    "provider": self.provider,
                    "response_hash": hashlib.sha256(str(result).encode()).hexdigest()[:16],
                    "input_tokens": result.usage.input_tokens if hasattr(result, 'usage') else None,
                    "output_tokens": result.usage.output_tokens if hasattr(result, 'usage') else None,
                    "latency_ms": int((time.perf_counter() - start_time) * 1000),
                    "finish_reason": result.finish_reason if hasattr(result, 'finish_reason') else None,
                },
                service="syndarix",
                service_version=settings.VERSION,
                environment=settings.ENVIRONMENT,
                hostname=socket.gethostname(),
            )
            await audit_logger.log_event(response_event)

            return result

        except Exception as e:
            # Log error
            error_event = AuditEvent(
                event_id=str(uuid7()),
                parent_event_id=event_id,
                timestamp=datetime.now(timezone.utc),
                timestamp_unix_ms=int(time.time() * 1000),
                event_type="llm.error",
                event_category="llm",
                severity=AuditEventSeverity.ERROR,
                project_id=getattr(self, 'project_id', None),
                agent_id=getattr(self, 'agent_id', None),
                action="LLM request failed",
                data={
                    "model": kwargs.get('model', getattr(self, 'model', None)),
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "latency_ms": int((time.perf_counter() - start_time) * 1000),
                },
                service="syndarix",
                service_version=settings.VERSION,
                environment=settings.ENVIRONMENT,
                hostname=socket.gethostname(),
            )
            await audit_logger.log_event(error_event)
            raise

    return wrapper


def audit_mcp_tool(func: Callable) -> Callable:
    """
    Decorator to audit MCP tool invocations.

    Usage:
        @audit_mcp_tool
        async def call_tool(self, server: str, tool: str, args: dict):
            ...
    """
    @functools.wraps(func)
    async def wrapper(self, server: str, tool: str, arguments: dict, **kwargs):
        event_id = str(uuid7())
        start_time = time.perf_counter()

        # Log invocation
        await audit_logger.log_event(
            AuditEvent(
                event_id=event_id,
                timestamp=datetime.now(timezone.utc),
                timestamp_unix_ms=int(time.time() * 1000),
                event_type="mcp.tool.invoked",
                event_category="mcp",
                severity=AuditEventSeverity.INFO,
                project_id=arguments.get('project_id'),
                agent_id=arguments.get('agent_id'),
                action=f"MCP tool invoked: {server}/{tool}",
                data={
                    "server": server,
                    "tool_name": tool,
                    "arguments": _redact_sensitive(arguments),
                },
                service="syndarix",
                service_version=settings.VERSION,
                environment=settings.ENVIRONMENT,
                hostname=socket.gethostname(),
            )
        )

        try:
            result = await func(self, server, tool, arguments, **kwargs)

            # Log result
            await audit_logger.log_event(
                AuditEvent(
                    event_id=str(uuid7()),
                    parent_event_id=event_id,
                    timestamp=datetime.now(timezone.utc),
                    timestamp_unix_ms=int(time.time() * 1000),
                    event_type="mcp.tool.result",
                    event_category="mcp",
                    severity=AuditEventSeverity.INFO,
                    project_id=arguments.get('project_id'),
                    agent_id=arguments.get('agent_id'),
                    action=f"MCP tool completed: {server}/{tool}",
                    data={
                        "server": server,
                        "tool_name": tool,
                        "result_summary": _summarize_result(result),
                        "duration_ms": int((time.perf_counter() - start_time) * 1000),
                    },
                    service="syndarix",
                    service_version=settings.VERSION,
                    environment=settings.ENVIRONMENT,
                    hostname=socket.gethostname(),
                )
            )

            return result

        except Exception as e:
            await audit_logger.log_event(
                AuditEvent(
                    event_id=str(uuid7()),
                    parent_event_id=event_id,
                    timestamp=datetime.now(timezone.utc),
                    timestamp_unix_ms=int(time.time() * 1000),
                    event_type="mcp.tool.error",
                    event_category="mcp",
                    severity=AuditEventSeverity.ERROR,
                    project_id=arguments.get('project_id'),
                    agent_id=arguments.get('agent_id'),
                    action=f"MCP tool failed: {server}/{tool}",
                    data={
                        "server": server,
                        "tool_name": tool,
                        "error": str(e),
                        "error_type": type(e).__name__,
                        "duration_ms": int((time.perf_counter() - start_time) * 1000),
                    },
                    service="syndarix",
                    service_version=settings.VERSION,
                    environment=settings.ENVIRONMENT,
                    hostname=socket.gethostname(),
                )
            )
            raise

    return wrapper
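
A usage sketch showing how the decorator attaches to an agent class; SoftwareEngineerAgent, GitClient and Branch are placeholder names, and the decorator relies only on project_id, agent_id, agent_type and an optional get_state method:

class SoftwareEngineerAgent:
    def __init__(self, project_id: str, agent_id: str, git: "GitClient"):
        self.project_id = project_id
        self.agent_id = agent_id
        self.agent_type = "software_engineer"
        self.git = git

    async def get_state(self) -> dict:
        # Snapshot used by the decorator for before_state / after_state
        return {"current_branch": await self.git.current_branch()}

    @audit_agent_action("create_branch")
    async def create_branch(self, branch_name: str, base: str = "main") -> "Branch":
        return await self.git.create_branch(branch_name, base=base)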

7. OpenTelemetry Integration

# app/audit/otel_integration.py
from opentelemetry import trace
from opentelemetry.trace import Span
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Configure OpenTelemetry
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("syndarix.audit")

otlp_exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(otlp_exporter))


class AuditLoggerWithOTel:
    """Audit logger with OpenTelemetry correlation."""

    def __init__(self, db_writer, integrity_checker):
        self.db_writer = db_writer
        self.integrity = integrity_checker

    async def log_event(self, event: AuditEvent) -> None:
        """Log event with OpenTelemetry trace correlation."""
        # Get current span context
        current_span = trace.get_current_span()
        span_context = current_span.get_span_context()

        if span_context.is_valid:
            event.trace_id = format(span_context.trace_id, '032x')
            event.span_id = format(span_context.span_id, '016x')

        # Add to hash chain for immutability
        event = await self.integrity.chain_event(event)

        # Write to database
        await self.db_writer.write(event)

        # Add event as span event for trace correlation
        current_span.add_event(
            name=event.event_type,
            attributes={
                "audit.event_id": str(event.event_id),
                "audit.project_id": event.project_id or "",
                "audit.agent_id": event.agent_id or "",
                "audit.action": event.action,
            }
        )
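
Usage sketch: when an operation runs inside a span, every audit event it emits picks up the active trace and span IDs automatically. The writer and integrity objects are the components defined earlier; their variable names here are placeholders:

audit_logger = AuditLoggerWithOTel(db_writer=timescale_writer, integrity_checker=integrity)

async def run_sprint_planning(event: AuditEvent) -> None:
    # Any audit event logged inside this span is correlated to it
    with tracer.start_as_current_span("sprint.planning"):
        await audit_logger.log_event(event)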

8. Retention Policy Recommendations

Based on compliance requirements (SOC2, GDPR) and operational needs:

| Data Category | Hot Storage | Warm Storage | Cold Archive | Total Retention |
|---|---|---|---|---|
| Agent actions | 30 days | 60 days | 7 years | 7 years |
| LLM requests/responses | 30 days | 60 days | 7 years | 7 years |
| MCP tool calls | 30 days | 60 days | 7 years | 7 years |
| Human approvals | 30 days | 60 days | 7 years | 7 years |
| Git operations | 30 days | 60 days | 7 years | 7 years |
| System events | 7 days | 23 days | 1 year | 1 year |
| Debug/trace logs | 3 days | 4 days | N/A | 7 days |

GDPR Considerations:

  • PII must be redacted or encrypted before archival (see the redaction sketch below)
  • Implement right-to-deletion capability (pseudonymization in archives)
  • Document lawful basis for retention (legitimate business interest, legal compliance)
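
The decorators above call a _redact_sensitive helper that this spike does not define; a minimal sketch of what it could look like, assuming a key deny-list that would need to match the project's actual PII inventory:

import re

# Assumed deny-list of keys whose values must never reach the audit store verbatim
SENSITIVE_KEYS = {"password", "token", "api_key", "secret", "authorization", "email", "ssn"}
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")

def _redact_sensitive(payload: dict) -> dict:
    """Recursively mask sensitive keys and obvious PII patterns before logging."""
    redacted = {}
    for key, value in payload.items():
        if key.lower() in SENSITIVE_KEYS:
            redacted[key] = "[REDACTED]"
        elif isinstance(value, dict):
            redacted[key] = _redact_sensitive(value)
        elif isinstance(value, str):
            redacted[key] = EMAIL_RE.sub("[EMAIL]", value)
        else:
            redacted[key] = value
    return redacted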

SOC2 Considerations:

  • Audit logs must be tamper-evident (hash chaining)
  • Access to audit logs must be logged (audit of audits)
  • Retention must be documented in data retention policy

Recommendations

Implementation Phases

Phase 1: Foundation (Week 1-2)

  1. Set up TimescaleDB with audit_events hypertable
  2. Implement base AuditEvent schema and writer
  3. Create @audit_agent_action decorator
  4. Add basic project/agent filtering

Phase 2: LLM & MCP Logging (Week 3-4)

  1. Implement @audit_llm_call decorator
  2. Implement @audit_mcp_tool decorator
  3. Add prompt/response logging with redaction
  4. Integrate with OpenTelemetry

Phase 3: Immutability & Compliance (Week 5-6)

  1. Implement hash chaining for tamper evidence
  2. Add integrity verification endpoints
  3. Implement cold storage archival job
  4. Document retention policies

Phase 4: Query & Investigation (Week 7-8)

  1. Build audit query API endpoints
  2. Implement full-text search
  3. Add trace correlation queries
  4. Create audit dashboard

Technology Stack

| Component | Recommendation | Alternative |
|---|---|---|
| Structured Logging | structlog | python-json-logger |
| Hot Storage | PostgreSQL + TimescaleDB | ClickHouse |
| Cold Storage | MinIO (S3-compatible) | AWS S3 |
| Archive Format | Parquet + ZSTD | ORC |
| Tracing | OpenTelemetry | Jaeger |
| Query Engine | Native SQL + Trino | Apache Druid |

Cost Estimates (Self-Hosted)

| Resource | Specification | Monthly Cost |
|---|---|---|
| TimescaleDB (hot) | 500GB SSD, 16GB RAM | Infrastructure only |
| MinIO (cold) | 10TB HDD | Infrastructure only |
| Estimated log volume | ~500GB/month | - |
| Compression ratio | ~10:1 | - |
| Effective storage | ~50GB/month | - |


Decision

Adopt a tiered audit logging architecture with:

  1. Structlog + OpenTelemetry for structured, correlated logging
  2. PostgreSQL + TimescaleDB for hot storage with time-series optimization
  3. S3/MinIO + Parquet for cold archival
  4. Cryptographic hash chaining for immutability
  5. 7-year retention for compliance with SOC2/financial regulations

Spike completed. Findings will inform ADR-007: Audit Logging Architecture.