# SPIKE-011: Audit Logging for Syndarix

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #11

---
## Executive Summary

Syndarix, as an autonomous AI-powered consulting agency, requires comprehensive audit logging to ensure compliance, enable debugging, and build client trust. This spike researches best practices for audit logging in autonomous AI systems and provides concrete recommendations for implementation.

**Recommendation:** Implement a structured, OpenTelemetry-compatible audit logging system using:

- **structlog** for structured JSON logging with contextual enrichment
- **PostgreSQL** with the TimescaleDB extension for hot storage (0-90 days)
- **S3-compatible object storage** for cold archival (90+ days)
- **Cryptographic hash chaining** for immutability verification
- **OpenTelemetry** integration for correlation with traces and metrics
---

## Objective

Research the optimal audit logging architecture for Syndarix, focusing on:

1. Comprehensive event capture for autonomous AI agent actions
2. Compliance with SOC2/GDPR requirements
3. Searchable, queryable audit trails
4. Immutable, tamper-evident logging
5. Scalable storage architecture

---
## Research Questions & Findings

### 1. What to Log in Autonomous AI Systems

Based on LLM observability best practices and Syndarix-specific requirements, the following event categories must be logged:

#### Agent Actions

| Event Type | Description | Critical Fields |
|------------|-------------|-----------------|
| `agent.spawned` | Agent instance created | agent_type, agent_id, project_id, config |
| `agent.action.started` | Agent begins action | action_type, input_params, before_state |
| `agent.action.completed` | Agent completes action | action_type, output, after_state, duration_ms |
| `agent.action.failed` | Agent action failed | action_type, error, stack_trace |
| `agent.decision` | Agent makes autonomous decision | decision_type, options, chosen, reasoning |
| `agent.terminated` | Agent instance destroyed | reason, final_state |

#### LLM Interactions

| Event Type | Description | Critical Fields |
|------------|-------------|-----------------|
| `llm.request` | Prompt sent to LLM | model, prompt_template, variables, token_count |
| `llm.response` | LLM response received | model, response, token_count, latency_ms |
| `llm.error` | LLM call failed | model, error, retry_count |
| `llm.tool_call` | LLM invokes tool | tool_name, arguments |

#### MCP Tool Invocations

| Event Type | Description | Critical Fields |
|------------|-------------|-----------------|
| `mcp.tool.invoked` | MCP tool called | server, tool_name, arguments, project_id |
| `mcp.tool.result` | MCP tool returned | server, tool_name, result, duration_ms |
| `mcp.tool.error` | MCP tool failed | server, tool_name, error |

#### Human Approvals

| Event Type | Description | Critical Fields |
|------------|-------------|-----------------|
| `approval.requested` | System requests human approval | action_type, context, options |
| `approval.granted` | Human approves action | approver_id, action_id, comments |
| `approval.rejected` | Human rejects action | approver_id, action_id, reason |
| `approval.timeout` | Approval request timed out | action_id, timeout_ms |

#### Git Operations

| Event Type | Description | Critical Fields |
|------------|-------------|-----------------|
| `git.commit` | Commit created | repo, branch, commit_sha, message, files |
| `git.branch.created` | Branch created | repo, branch_name, base_branch |
| `git.pr.created` | Pull request opened | repo, pr_number, title, head, base |
| `git.pr.merged` | Pull request merged | repo, pr_number, merge_commit |

#### Project Lifecycle

| Event Type | Description | Critical Fields |
|------------|-------------|-----------------|
| `project.created` | New project started | project_id, client_id, autonomy_level |
| `project.sprint.started` | Sprint begins | sprint_id, goals, assigned_agents |
| `project.milestone.completed` | Milestone achieved | milestone_id, deliverables |
| `project.checkpoint` | Client checkpoint | checkpoint_type, feedback |
### 2. Structured Logging Format

**Recommendation:** Use OpenTelemetry-compatible structured JSON logging.

#### Base Event Schema
```python
from datetime import datetime
from enum import Enum
from typing import Any, Optional

from pydantic import BaseModel


class AuditEventSeverity(str, Enum):
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


class AuditEvent(BaseModel):
    """Base schema for all audit events."""

    # Identity & Correlation
    event_id: str                          # UUID v7 (time-ordered)
    trace_id: Optional[str] = None         # OpenTelemetry trace ID
    span_id: Optional[str] = None          # OpenTelemetry span ID
    parent_event_id: Optional[str] = None  # For event chains

    # Timestamps
    timestamp: datetime                    # ISO 8601 with timezone
    timestamp_unix_ms: int                 # Unix milliseconds for indexing

    # Event Classification
    event_type: str                        # e.g., "agent.action.completed"
    event_category: str                    # e.g., "agent", "llm", "mcp"
    severity: AuditEventSeverity

    # Context
    project_id: Optional[str] = None
    agent_id: Optional[str] = None
    agent_type: Optional[str] = None
    user_id: Optional[str] = None          # Human actor, if applicable
    session_id: Optional[str] = None

    # Event Data
    action: str                            # Human-readable action description
    data: dict[str, Any]                   # Event-specific payload

    # State Tracking
    before_state: Optional[dict] = None    # State before action
    after_state: Optional[dict] = None     # State after action

    # Technical Metadata
    service: str = "syndarix"
    service_version: str
    environment: str                       # production, staging, development
    hostname: str

    # Immutability
    previous_hash: Optional[str] = None    # Hash of previous event (chain)
    event_hash: Optional[str] = None       # SHA-256 of this event

    class Config:
        json_schema_extra = {
            "example": {
                "event_id": "019373a8-9b2e-7f4c-8d1a-2b3c4d5e6f7a",
                "trace_id": "abc123def456",
                "timestamp": "2025-12-29T14:30:00.000Z",
                "timestamp_unix_ms": 1735480200000,
                "event_type": "agent.action.completed",
                "event_category": "agent",
                "severity": "INFO",
                "project_id": "proj-001",
                "agent_id": "agent-123",
                "agent_type": "software_engineer",
                "action": "Created feature branch",
                "data": {
                    "branch_name": "feature/user-auth",
                    "base_branch": "main"
                },
                "service": "syndarix",
                "service_version": "1.0.0",
                "environment": "production"
            }
        }
```
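The executive summary recommends structlog for emitting these events as JSON, but no configuration is shown above. A minimal sketch follows; the exact processor list and the use of `bind_contextvars` to carry `project_id`/`agent_id` into every log line are assumptions about how Syndarix would wire this up, not a prescribed setup.

```python
# Minimal structlog configuration sketch: JSON lines with bound audit context.
import structlog
from structlog.contextvars import bind_contextvars, merge_contextvars

structlog.configure(
    processors=[
        merge_contextvars,                                   # pull in bound context (project_id, agent_id, ...)
        structlog.processors.add_log_level,                  # include severity
        structlog.processors.TimeStamper(fmt="iso", utc=True),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),                 # one JSON object per line
    ],
)

logger = structlog.get_logger("syndarix.audit")

# Bind request-scoped context once; every subsequent event carries it.
bind_contextvars(project_id="proj-001", agent_id="agent-123")
logger.info(
    "agent.action.completed",
    action="Created feature branch",
    data={"branch_name": "feature/user-auth", "base_branch": "main"},
)
```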
#### LLM-Specific Event Schema

```python
class LLMRequestData(BaseModel):
    """Payload carried in `data` for llm.request events."""
    model: str                      # e.g., "claude-3-5-sonnet"
    provider: str                   # e.g., "anthropic", "openai"
    prompt_template_id: str         # Reference to template
    prompt_template_version: str
    prompt_variables: dict          # Variables substituted
    system_prompt_hash: str         # Hash of system prompt
    user_prompt: str                # Full user prompt (may be truncated)
    token_count_estimate: int
    max_tokens: int
    temperature: float
    tools_available: list[str]


class LLMRequestEvent(AuditEvent):
    """Schema for LLM request events."""
    event_type: str = "llm.request"
    event_category: str = "llm"
    data: LLMRequestData


class LLMResponseData(BaseModel):
    """Payload carried in `data` for llm.response events."""
    model: str
    provider: str
    response_text: str              # May be truncated for storage
    response_hash: str              # Hash of the full response
    input_tokens: int
    output_tokens: int
    total_tokens: int
    latency_ms: int
    finish_reason: str              # "stop", "max_tokens", "tool_use"
    tool_calls: list[dict]          # If tools were invoked
    cost_usd: float                 # Estimated cost


class LLMResponseEvent(AuditEvent):
    """Schema for LLM response events."""
    event_type: str = "llm.response"
    event_category: str = "llm"
    data: LLMResponseData
```
### 3. Storage Architecture

**Recommendation:** Tiered storage with hot, warm, and cold layers.

```
Query latency: fast ─────────────────────────────────────────▶ slow

HOT STORAGE (0-30 days, ~10TB/month)
  PostgreSQL + TimescaleDB
  - Hypertables partitioned by day
  - Native compression after 7 days
  - Full-text search on action/data fields
  - B-tree indexes on project_id, agent_id, event_type
  - GIN index on JSONB data field
        │
        ▼  after 30 days
WARM STORAGE (30-90 days, compressed)
  TimescaleDB continuous aggregates
  - Hourly/daily rollups of metrics
  - Detailed logs in highly compressed chunks
  - Queried via the same SQL interface
        │
        ▼  after 90 days
COLD STORAGE (90+ days, archival, 7-year retention)
  S3-compatible object storage (MinIO / AWS S3)
  - Parquet files partitioned by date/project
  - Glacier-class storage after 1 year
  - Queryable via Trino/Athena for investigations
  - Cryptographic manifest for integrity verification
```
#### TimescaleDB Schema

```sql
-- Enable TimescaleDB extension
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- Main audit events table
CREATE TABLE audit_events (
    event_id UUID NOT NULL,
    trace_id VARCHAR(32),
    span_id VARCHAR(16),
    -- Parent link for event chains. Kept as a plain column:
    -- TimescaleDB does not support foreign keys referencing a hypertable.
    parent_event_id UUID,

    timestamp TIMESTAMPTZ NOT NULL,
    timestamp_unix_ms BIGINT NOT NULL,

    event_type VARCHAR(100) NOT NULL,
    event_category VARCHAR(50) NOT NULL,
    severity VARCHAR(20) NOT NULL,

    project_id VARCHAR(50),
    agent_id VARCHAR(50),
    agent_type VARCHAR(50),
    user_id VARCHAR(50),
    session_id VARCHAR(50),

    action TEXT NOT NULL,
    data JSONB NOT NULL,

    before_state JSONB,
    after_state JSONB,

    service VARCHAR(50) NOT NULL,
    service_version VARCHAR(20) NOT NULL,
    environment VARCHAR(20) NOT NULL,
    hostname VARCHAR(100) NOT NULL,

    previous_hash VARCHAR(64),
    event_hash VARCHAR(64) NOT NULL,

    created_at TIMESTAMPTZ DEFAULT NOW(),

    -- Unique constraints on a hypertable must include the partitioning column.
    PRIMARY KEY (event_id, timestamp)
);

-- Convert to hypertable for time-series optimization
SELECT create_hypertable(
    'audit_events',
    'timestamp',
    chunk_time_interval => INTERVAL '1 day'
);

-- Enable compression for chunks older than 7 days
ALTER TABLE audit_events SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'project_id, event_category',
    timescaledb.compress_orderby = 'timestamp DESC'
);

SELECT add_compression_policy('audit_events', INTERVAL '7 days');

-- Retention policy: drop chunks older than 90 days
-- (the archive job below copies them to cold storage first)
SELECT add_retention_policy('audit_events', INTERVAL '90 days');

-- Indexes for common query patterns
CREATE INDEX idx_audit_project_time ON audit_events (project_id, timestamp DESC);
CREATE INDEX idx_audit_agent_time ON audit_events (agent_id, timestamp DESC);
CREATE INDEX idx_audit_type_time ON audit_events (event_type, timestamp DESC);
CREATE INDEX idx_audit_category_time ON audit_events (event_category, timestamp DESC);
CREATE INDEX idx_audit_trace ON audit_events (trace_id) WHERE trace_id IS NOT NULL;

-- GIN index for JSONB queries
CREATE INDEX idx_audit_data ON audit_events USING GIN (data);

-- Full-text search on action field
CREATE INDEX idx_audit_action_fts ON audit_events USING GIN (to_tsvector('english', action));
```
#### Cold Storage Archive Job

```python
# jobs/archive_audit_logs.py
import hashlib
import io
from datetime import datetime, timedelta, timezone

import pyarrow as pa
import pyarrow.parquet as pq
from minio import Minio


async def archive_old_audit_logs():
    """Archive audit logs older than 90 days to S3/MinIO."""

    cutoff_date = datetime.now(timezone.utc) - timedelta(days=90)

    # Query logs to archive
    query = """
        SELECT * FROM audit_events
        WHERE timestamp < %s
        ORDER BY timestamp ASC
    """

    async with get_db_session() as session:
        result = await session.execute(query, [cutoff_date])
        records = result.fetchall()

        if not records:
            return

        # Convert to Parquet
        table = pa.Table.from_pylist([dict(r) for r in records])

        # Partition by date (and by project in the full job)
        partition_key = cutoff_date.strftime("%Y/%m/%d")

        # Write to an in-memory buffer
        buffer = pa.BufferOutputStream()
        pq.write_table(table, buffer, compression="zstd")

        # Calculate manifest hash
        data = buffer.getvalue().to_pybytes()
        content_hash = hashlib.sha256(data).hexdigest()

        # Upload to MinIO/S3
        client = Minio(
            settings.MINIO_ENDPOINT,
            access_key=settings.MINIO_ACCESS_KEY,
            secret_key=settings.MINIO_SECRET_KEY,
        )

        object_name = f"audit-logs/{partition_key}/events.parquet"
        client.put_object(
            bucket_name="syndarix-audit-archive",
            object_name=object_name,
            data=io.BytesIO(data),
            length=len(data),
            metadata={"content-hash": content_hash},
        )

        # Update manifest for integrity verification
        await update_archive_manifest(partition_key, object_name, content_hash)

        # Delete archived records from TimescaleDB
        await session.execute(
            "DELETE FROM audit_events WHERE timestamp < %s",
            [cutoff_date],
        )
```
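The manifest is only useful if investigations can re-verify an archived partition against it. A minimal sketch is shown below; `get_archive_manifest` is a hypothetical counterpart to `update_archive_manifest` above, and the bucket/object naming mirrors the archive job.

```python
# jobs/verify_audit_archive.py -- sketch; assumes the manifest helpers above.
import hashlib

from minio import Minio


async def verify_archived_partition(partition_key: str) -> bool:
    """Re-download an archived Parquet partition and compare its SHA-256
    against the hash recorded in the archive manifest."""
    manifest = await get_archive_manifest(partition_key)  # hypothetical helper

    client = Minio(
        settings.MINIO_ENDPOINT,
        access_key=settings.MINIO_ACCESS_KEY,
        secret_key=settings.MINIO_SECRET_KEY,
    )

    response = client.get_object("syndarix-audit-archive", manifest["object_name"])
    try:
        content = response.read()
    finally:
        response.close()
        response.release_conn()

    return hashlib.sha256(content).hexdigest() == manifest["content_hash"]
```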
### 4. Immutability & Integrity

**Recommendation:** Use cryptographic hash chaining for tamper evidence.

```python
# app/audit/integrity.py
import hashlib
import json
from typing import Optional


class AuditIntegrity:
    """Cryptographic hash chaining for audit log integrity."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self._last_hash_key = "audit:last_hash:{project_id}"

    async def compute_event_hash(
        self,
        event: AuditEvent,
        previous_hash: Optional[str] = None,
    ) -> str:
        """
        Compute the SHA-256 hash of an event, including the previous hash.
        This creates a blockchain-like chain of events.
        """
        # Canonical JSON representation (sorted keys, no whitespace)
        canonical = json.dumps(
            {
                "event_id": str(event.event_id),
                "timestamp_unix_ms": event.timestamp_unix_ms,
                "event_type": event.event_type,
                "project_id": event.project_id,
                "agent_id": event.agent_id,
                "action": event.action,
                "data": event.data,
                "previous_hash": previous_hash or "",
            },
            sort_keys=True,
            separators=(",", ":"),
        )

        return hashlib.sha256(canonical.encode()).hexdigest()

    async def chain_event(self, event: AuditEvent) -> AuditEvent:
        """Add an event to the per-project hash chain."""
        project_key = self._last_hash_key.format(project_id=event.project_id)

        # Read the previous chain head for this project
        previous_hash = await self.redis.get(project_key)

        # Compute the new hash
        event.previous_hash = previous_hash
        event.event_hash = await self.compute_event_hash(event, previous_hash)

        # Advance the chain head
        await self.redis.set(project_key, event.event_hash)

        return event

    async def verify_chain(
        self,
        project_id: str,
        start_event_id: str,
        end_event_id: str,
    ) -> tuple[bool, Optional[str]]:
        """
        Verify the integrity of the event chain between two events.
        Returns (is_valid, first_invalid_event_id).
        """
        events = await self.get_events_in_range(
            project_id, start_event_id, end_event_id
        )

        previous_hash = events[0].previous_hash

        for event in events:
            expected_hash = await self.compute_event_hash(event, previous_hash)

            if event.event_hash != expected_hash:
                return (False, str(event.event_id))

            previous_hash = event.event_hash

        return (True, None)
```
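One caveat with `chain_event` as written: the read-compute-set sequence is not atomic, so two concurrent writers for the same project could read the same chain head and fork the chain. A minimal way to serialize chain updates is a per-project Redis lock, sketched below under the assumption that `redis_client` is a `redis.asyncio` client; computing the chain inside the database write transaction would be an alternative.

```python
# Sketch: serialize per-project chain updates with a Redis lock (redis.asyncio).
class LockingAuditIntegrity(AuditIntegrity):
    async def chain_event(self, event: AuditEvent) -> AuditEvent:
        lock_name = f"audit:chain_lock:{event.project_id}"
        # Only one writer per project advances the chain head at a time.
        async with self.redis.lock(lock_name, timeout=5):
            return await super().chain_event(event)
```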
### 5. Query Patterns & Indexing

#### Common Query Patterns

```python
# app/audit/queries.py
from datetime import datetime
from typing import Optional


class AuditQueries:
    """Optimized audit log queries."""

    def __init__(self, db):
        self.db = db  # asyncpg-style connection/pool

    async def get_project_timeline(
        self,
        project_id: str,
        start_time: datetime,
        end_time: datetime,
        event_types: Optional[list[str]] = None,
        limit: int = 1000,
    ) -> list[AuditEvent]:
        """Get the chronological audit trail for a project."""
        type_filter = "AND event_type = ANY($5)" if event_types else ""
        query = f"""
            SELECT * FROM audit_events
            WHERE project_id = $1
              AND timestamp BETWEEN $2 AND $3
              {type_filter}
            ORDER BY timestamp DESC
            LIMIT $4
        """
        args = [project_id, start_time, end_time, limit]
        if event_types:
            args.append(event_types)
        return await self.db.fetch(query, *args)

    async def get_agent_actions(
        self,
        agent_id: str,
        hours: int = 24,
    ) -> list[AuditEvent]:
        """Get all actions by a specific agent."""
        query = """
            SELECT * FROM audit_events
            WHERE agent_id = $1
              AND timestamp > NOW() - make_interval(hours => $2)
              AND event_category = 'agent'
            ORDER BY timestamp DESC
        """
        return await self.db.fetch(query, agent_id, hours)

    async def get_llm_usage_summary(
        self,
        project_id: str,
        days: int = 30,
    ) -> list[dict]:
        """Get LLM usage statistics for billing/monitoring."""
        query = """
            SELECT
                data->>'model' AS model,
                data->>'provider' AS provider,
                COUNT(*) AS request_count,
                SUM((data->>'total_tokens')::int) AS total_tokens,
                SUM((data->>'cost_usd')::float) AS total_cost,
                AVG((data->>'latency_ms')::int) AS avg_latency_ms
            FROM audit_events
            WHERE project_id = $1
              AND event_type = 'llm.response'
              AND timestamp > NOW() - make_interval(days => $2)
            GROUP BY data->>'model', data->>'provider'
        """
        return await self.db.fetch(query, project_id, days)

    async def search_actions(
        self,
        query_text: str,
        project_id: Optional[str] = None,
        limit: int = 100,
    ) -> list[AuditEvent]:
        """Full-text search on action descriptions."""
        # Parameterize the optional project filter; never interpolate values.
        project_filter = "AND project_id = $3" if project_id else ""
        query = f"""
            SELECT *, ts_rank(to_tsvector('english', action), query) AS rank
            FROM audit_events, plainto_tsquery('english', $1) query
            WHERE to_tsvector('english', action) @@ query
              {project_filter}
            ORDER BY rank DESC, timestamp DESC
            LIMIT $2
        """
        args = [query_text, limit]
        if project_id:
            args.append(project_id)
        return await self.db.fetch(query, *args)

    async def trace_event_chain(
        self,
        event_id: str,
    ) -> list[AuditEvent]:
        """Trace the full event chain using parent_event_id."""
        query = """
            WITH RECURSIVE event_chain AS (
                SELECT * FROM audit_events WHERE event_id = $1
                UNION ALL
                SELECT e.* FROM audit_events e
                JOIN event_chain ec ON e.event_id = ec.parent_event_id
            )
            SELECT * FROM event_chain ORDER BY timestamp ASC
        """
        return await self.db.fetch(query, event_id)
```
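Phase 4 of the roadmap below calls for exposing these queries through API endpoints. A minimal FastAPI sketch for the project-timeline query is shown here; the router path, the `get_audit_queries` dependency, and the response shape are illustrative assumptions rather than the final API design.

```python
# Sketch: exposing the timeline query via FastAPI (names are illustrative).
from datetime import datetime
from typing import Optional

from fastapi import APIRouter, Depends

router = APIRouter(prefix="/audit", tags=["audit"])


@router.get("/projects/{project_id}/timeline")
async def project_timeline(
    project_id: str,
    start_time: datetime,
    end_time: datetime,
    event_type: Optional[str] = None,
    limit: int = 1000,
    queries: AuditQueries = Depends(get_audit_queries),  # assumed DI provider
):
    events = await queries.get_project_timeline(
        project_id=project_id,
        start_time=start_time,
        end_time=end_time,
        event_types=[event_type] if event_type else None,
        limit=limit,
    )
    return {"project_id": project_id, "count": len(events), "events": events}
```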
### 6. Logging Decorators & Implementation

```python
# app/audit/decorators.py
import functools
import hashlib
import socket
import time
from datetime import datetime, timezone
from typing import Any, Callable
from uuid import uuid7  # stdlib in Python 3.14+; earlier versions can use the uuid6 package

import structlog

# AuditEvent, AuditEventSeverity, audit_logger, and settings are provided by the
# application's audit and config modules (imports omitted here).

logger = structlog.get_logger()


def audit_agent_action(action_type: str):
    """
    Decorator to audit agent actions with before/after state capture.

    Usage:
        @audit_agent_action("create_branch")
        async def create_branch(self, branch_name: str) -> Branch:
            ...
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            # Capture before state
            before_state = await self.get_state() if hasattr(self, 'get_state') else None

            event_id = str(uuid7())
            start_time = time.perf_counter()

            try:
                # Execute the action
                result = await func(self, *args, **kwargs)

                # Capture after state
                after_state = await self.get_state() if hasattr(self, 'get_state') else None

                # Log success
                await audit_logger.log_event(
                    AuditEvent(
                        event_id=event_id,
                        timestamp=datetime.now(timezone.utc),
                        timestamp_unix_ms=int(time.time() * 1000),
                        event_type="agent.action.completed",
                        event_category="agent",
                        severity=AuditEventSeverity.INFO,
                        project_id=self.project_id,
                        agent_id=self.agent_id,
                        agent_type=self.agent_type,
                        action=f"{action_type}: {func.__name__}",
                        data={
                            "action_type": action_type,
                            "function": func.__name__,
                            "args": _serialize_args(args),
                            "kwargs": _serialize_kwargs(kwargs),
                            "result_summary": _summarize_result(result),
                            "duration_ms": int((time.perf_counter() - start_time) * 1000),
                        },
                        before_state=before_state,
                        after_state=after_state,
                        service="syndarix",
                        service_version=settings.VERSION,
                        environment=settings.ENVIRONMENT,
                        hostname=socket.gethostname(),
                    )
                )

                return result

            except Exception as e:
                # Log failure
                await audit_logger.log_event(
                    AuditEvent(
                        event_id=event_id,
                        timestamp=datetime.now(timezone.utc),
                        timestamp_unix_ms=int(time.time() * 1000),
                        event_type="agent.action.failed",
                        event_category="agent",
                        severity=AuditEventSeverity.ERROR,
                        project_id=self.project_id,
                        agent_id=self.agent_id,
                        agent_type=self.agent_type,
                        action=f"{action_type}: {func.__name__} (FAILED)",
                        data={
                            "action_type": action_type,
                            "function": func.__name__,
                            "error": str(e),
                            "error_type": type(e).__name__,
                            "duration_ms": int((time.perf_counter() - start_time) * 1000),
                        },
                        before_state=before_state,
                        service="syndarix",
                        service_version=settings.VERSION,
                        environment=settings.ENVIRONMENT,
                        hostname=socket.gethostname(),
                    )
                )
                raise

        return wrapper
    return decorator


def audit_llm_call(func: Callable) -> Callable:
    """
    Decorator to audit LLM calls with prompt/response logging.

    Usage:
        @audit_llm_call
        async def generate_response(self, prompt: str, **kwargs) -> str:
            ...
    """
    @functools.wraps(func)
    async def wrapper(self, *args, **kwargs):
        event_id = str(uuid7())
        start_time = time.perf_counter()

        # Log request
        request_event = AuditEvent(
            event_id=event_id,
            timestamp=datetime.now(timezone.utc),
            timestamp_unix_ms=int(time.time() * 1000),
            event_type="llm.request",
            event_category="llm",
            severity=AuditEventSeverity.INFO,
            project_id=getattr(self, 'project_id', None),
            agent_id=getattr(self, 'agent_id', None),
            action="LLM request initiated",
            data={
                "model": kwargs.get('model', self.model),
                "provider": self.provider,
                "prompt_hash": hashlib.sha256(str(args).encode()).hexdigest()[:16],
                "max_tokens": kwargs.get('max_tokens', 4096),
                "temperature": kwargs.get('temperature', 0.7),
            },
            service="syndarix",
            service_version=settings.VERSION,
            environment=settings.ENVIRONMENT,
            hostname=socket.gethostname(),
        )
        await audit_logger.log_event(request_event)

        try:
            result = await func(self, *args, **kwargs)

            # Log response
            response_event = AuditEvent(
                event_id=str(uuid7()),
                parent_event_id=event_id,
                timestamp=datetime.now(timezone.utc),
                timestamp_unix_ms=int(time.time() * 1000),
                event_type="llm.response",
                event_category="llm",
                severity=AuditEventSeverity.INFO,
                project_id=getattr(self, 'project_id', None),
                agent_id=getattr(self, 'agent_id', None),
                action="LLM response received",
                data={
                    "model": kwargs.get('model', self.model),
                    "provider": self.provider,
                    "response_hash": hashlib.sha256(str(result).encode()).hexdigest()[:16],
                    "input_tokens": result.usage.input_tokens if hasattr(result, 'usage') else None,
                    "output_tokens": result.usage.output_tokens if hasattr(result, 'usage') else None,
                    "latency_ms": int((time.perf_counter() - start_time) * 1000),
                    "finish_reason": result.finish_reason if hasattr(result, 'finish_reason') else None,
                },
                service="syndarix",
                service_version=settings.VERSION,
                environment=settings.ENVIRONMENT,
                hostname=socket.gethostname(),
            )
            await audit_logger.log_event(response_event)

            return result

        except Exception as e:
            # Log error
            error_event = AuditEvent(
                event_id=str(uuid7()),
                parent_event_id=event_id,
                timestamp=datetime.now(timezone.utc),
                timestamp_unix_ms=int(time.time() * 1000),
                event_type="llm.error",
                event_category="llm",
                severity=AuditEventSeverity.ERROR,
                project_id=getattr(self, 'project_id', None),
                agent_id=getattr(self, 'agent_id', None),
                action="LLM request failed",
                data={
                    "model": kwargs.get('model', getattr(self, 'model', None)),
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "latency_ms": int((time.perf_counter() - start_time) * 1000),
                },
                service="syndarix",
                service_version=settings.VERSION,
                environment=settings.ENVIRONMENT,
                hostname=socket.gethostname(),
            )
            await audit_logger.log_event(error_event)
            raise

    return wrapper


def audit_mcp_tool(func: Callable) -> Callable:
    """
    Decorator to audit MCP tool invocations.

    Usage:
        @audit_mcp_tool
        async def call_tool(self, server: str, tool: str, args: dict):
            ...
    """
    @functools.wraps(func)
    async def wrapper(self, server: str, tool: str, arguments: dict, **kwargs):
        event_id = str(uuid7())
        start_time = time.perf_counter()

        # Log invocation
        await audit_logger.log_event(
            AuditEvent(
                event_id=event_id,
                timestamp=datetime.now(timezone.utc),
                timestamp_unix_ms=int(time.time() * 1000),
                event_type="mcp.tool.invoked",
                event_category="mcp",
                severity=AuditEventSeverity.INFO,
                project_id=arguments.get('project_id'),
                agent_id=arguments.get('agent_id'),
                action=f"MCP tool invoked: {server}/{tool}",
                data={
                    "server": server,
                    "tool_name": tool,
                    "arguments": _redact_sensitive(arguments),
                },
                service="syndarix",
                service_version=settings.VERSION,
                environment=settings.ENVIRONMENT,
                hostname=socket.gethostname(),
            )
        )

        try:
            result = await func(self, server, tool, arguments, **kwargs)

            # Log result
            await audit_logger.log_event(
                AuditEvent(
                    event_id=str(uuid7()),
                    parent_event_id=event_id,
                    timestamp=datetime.now(timezone.utc),
                    timestamp_unix_ms=int(time.time() * 1000),
                    event_type="mcp.tool.result",
                    event_category="mcp",
                    severity=AuditEventSeverity.INFO,
                    project_id=arguments.get('project_id'),
                    agent_id=arguments.get('agent_id'),
                    action=f"MCP tool completed: {server}/{tool}",
                    data={
                        "server": server,
                        "tool_name": tool,
                        "result_summary": _summarize_result(result),
                        "duration_ms": int((time.perf_counter() - start_time) * 1000),
                    },
                    service="syndarix",
                    service_version=settings.VERSION,
                    environment=settings.ENVIRONMENT,
                    hostname=socket.gethostname(),
                )
            )

            return result

        except Exception as e:
            await audit_logger.log_event(
                AuditEvent(
                    event_id=str(uuid7()),
                    parent_event_id=event_id,
                    timestamp=datetime.now(timezone.utc),
                    timestamp_unix_ms=int(time.time() * 1000),
                    event_type="mcp.tool.error",
                    event_category="mcp",
                    severity=AuditEventSeverity.ERROR,
                    project_id=arguments.get('project_id'),
                    agent_id=arguments.get('agent_id'),
                    action=f"MCP tool failed: {server}/{tool}",
                    data={
                        "server": server,
                        "tool_name": tool,
                        "error": str(e),
                        "error_type": type(e).__name__,
                        "duration_ms": int((time.perf_counter() - start_time) * 1000),
                    },
                    service="syndarix",
                    service_version=settings.VERSION,
                    environment=settings.ENVIRONMENT,
                    hostname=socket.gethostname(),
                )
            )
            raise

    return wrapper
```
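The decorators above call `_serialize_args`, `_serialize_kwargs`, `_summarize_result`, and `_redact_sensitive` without defining them. Minimal sketches follow; the redaction key list and truncation limits are placeholder choices, not a specification.

```python
# Sketches of the helpers used by the decorators above (illustrative defaults).
REDACTED_KEYS = {"password", "token", "api_key", "secret", "authorization"}


def _redact_sensitive(arguments: dict) -> dict:
    """Replace values of known-sensitive keys before logging."""
    return {
        k: "[REDACTED]" if k.lower() in REDACTED_KEYS else v
        for k, v in arguments.items()
    }


def _serialize_args(args: tuple, max_len: int = 500) -> list[str]:
    """Best-effort string form of positional args, truncated for storage."""
    return [repr(a)[:max_len] for a in args]


def _serialize_kwargs(kwargs: dict, max_len: int = 500) -> dict[str, str]:
    """Best-effort string form of keyword args, with redaction and truncation."""
    return {k: repr(v)[:max_len] for k, v in _redact_sensitive(kwargs).items()}


def _summarize_result(result, max_len: int = 500) -> str:
    """Short, log-safe summary of an action result."""
    return repr(result)[:max_len]
```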
### 7. OpenTelemetry Integration

```python
# app/audit/otel_integration.py
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Configure OpenTelemetry
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("syndarix.audit")

otlp_exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(otlp_exporter))


class AuditLoggerWithOTel:
    """Audit logger with OpenTelemetry correlation."""

    def __init__(self, db_writer, integrity_checker):
        self.db_writer = db_writer
        self.integrity = integrity_checker

    async def log_event(self, event: AuditEvent) -> None:
        """Log an event with OpenTelemetry trace correlation."""
        # Get the current span context
        current_span = trace.get_current_span()
        span_context = current_span.get_span_context()

        if span_context.is_valid:
            event.trace_id = format(span_context.trace_id, '032x')
            event.span_id = format(span_context.span_id, '016x')

        # Add to the hash chain for immutability
        event = await self.integrity.chain_event(event)

        # Write to the database
        await self.db_writer.write(event)

        # Attach the audit event to the active span for trace correlation
        current_span.add_event(
            name=event.event_type,
            attributes={
                "audit.event_id": str(event.event_id),
                "audit.project_id": event.project_id or "",
                "audit.agent_id": event.agent_id or "",
                "audit.action": event.action,
            },
        )
```
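For the correlation to be populated, agent operations need to run inside a span created by the same tracer. A small usage sketch, assuming `db_writer` and `integrity` are the application's writer and `AuditIntegrity` instances:

```python
# Sketch: run an agent step inside a span so audit events pick up trace/span IDs.
audit_logger = AuditLoggerWithOTel(db_writer=db_writer, integrity_checker=integrity)


async def run_agent_step(event: AuditEvent) -> None:
    with tracer.start_as_current_span("agent.run_step") as span:
        span.set_attribute("syndarix.project_id", event.project_id or "")
        # Any audit event logged inside this block is stamped with the span's
        # trace_id/span_id and also attached to the span as a span event.
        await audit_logger.log_event(event)
```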
### 8. Retention Policy Recommendations

Based on compliance requirements (SOC2, GDPR) and operational needs:

| Data Category | Hot Storage | Warm Storage | Cold Archive | Total Retention |
|---------------|-------------|--------------|--------------|-----------------|
| Agent actions | 30 days | 60 days | 7 years | 7 years |
| LLM requests/responses | 30 days | 60 days | 7 years | 7 years |
| MCP tool calls | 30 days | 60 days | 7 years | 7 years |
| Human approvals | 30 days | 60 days | 7 years | 7 years |
| Git operations | 30 days | 60 days | 7 years | 7 years |
| System events | 7 days | 23 days | 1 year | 1 year |
| Debug/trace logs | 3 days | 4 days | N/A | 7 days |

**GDPR Considerations:**

- PII must be redacted or encrypted before archival
- Implement a right-to-deletion capability (pseudonymization in archives; see the sketch below)
- Document the lawful basis for retention (legitimate business interest, legal compliance)
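A minimal sketch of how right-to-deletion could be honored in otherwise immutable archives, assuming user identifiers are pseudonymized with a per-user key that can be destroyed on request (`key_store` is a hypothetical key-management interface):

```python
# Sketch: pseudonymize user identifiers before archival; destroying the per-user
# key later renders the archived pseudonyms unlinkable (GDPR right to erasure).
import hashlib
import hmac


def pseudonymize_user_id(user_id: str, per_user_key: bytes) -> str:
    """Keyed hash of the user ID; stable for as long as the key exists."""
    return hmac.new(per_user_key, user_id.encode(), hashlib.sha256).hexdigest()


async def forget_user(user_id: str) -> None:
    """Honor a deletion request by destroying the pseudonymization key.
    Archived events keep their pseudonyms but can no longer be linked back."""
    await key_store.delete(f"pseudo-key:{user_id}")  # hypothetical key store
```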
**SOC2 Considerations:**

- Audit logs must be tamper-evident (hash chaining)
- Access to audit logs must itself be logged (audit of audits)
- Retention must be documented in the data retention policy
---

## Recommendations

### Implementation Phases

#### Phase 1: Foundation (Weeks 1-2)

1. Set up TimescaleDB with the `audit_events` hypertable
2. Implement the base `AuditEvent` schema and writer
3. Create the `@audit_agent_action` decorator
4. Add basic project/agent filtering

#### Phase 2: LLM & MCP Logging (Weeks 3-4)

1. Implement the `@audit_llm_call` decorator
2. Implement the `@audit_mcp_tool` decorator
3. Add prompt/response logging with redaction
4. Integrate with OpenTelemetry

#### Phase 3: Immutability & Compliance (Weeks 5-6)

1. Implement hash chaining for tamper evidence
2. Add integrity verification endpoints
3. Implement the cold storage archival job
4. Document retention policies

#### Phase 4: Query & Investigation (Weeks 7-8)

1. Build audit query API endpoints
2. Implement full-text search
3. Add trace correlation queries
4. Create an audit dashboard
### Technology Stack

| Component | Recommendation | Alternative |
|-----------|----------------|-------------|
| Structured logging | structlog | python-json-logger |
| Hot storage | PostgreSQL + TimescaleDB | ClickHouse |
| Cold storage | MinIO (S3-compatible) | AWS S3 |
| Archive format | Parquet + ZSTD | ORC |
| Tracing | OpenTelemetry | Jaeger |
| Query engine | Native SQL + Trino | Apache Druid |

### Cost Estimates (Self-Hosted)

| Resource | Specification | Monthly Cost |
|----------|---------------|--------------|
| TimescaleDB (hot) | 500GB SSD, 16GB RAM | Infrastructure only |
| MinIO (cold) | 10TB HDD | Infrastructure only |
| Estimated log volume | ~500GB/month | - |
| Compression ratio | ~10:1 | - |
| Effective storage | ~50GB/month | - |
---

## References

- [OpenTelemetry Logging Documentation](https://opentelemetry.io/docs/specs/otel/logs/)
- [How to Structure Logs Properly in OpenTelemetry](https://oneuptime.com/blog/post/2025-08-28-how-to-structure-logs-properly-in-opentelemetry/view)
- [Langfuse LLM Observability & Tracing](https://langfuse.com/docs/tracing)
- [LLM Observability: Tutorial & Best Practices](https://www.patronus.ai/llm-testing/llm-observability)
- [Datadog LLM Observability](https://www.datadoghq.com/product/llm-observability/)
- [Security Log Retention Best Practices](https://auditboard.com/blog/security-log-retention-best-practices-guide)
- [SOC 2 Data Security and Retention Requirements](https://www.bytebase.com/blog/soc2-data-security-and-retention-requirements/)
- [Immutable Audit Log Architecture](https://www.emergentmind.com/topics/immutable-audit-log)
- [What Are Immutable Logs? A Complete Guide](https://www.hubifi.com/blog/immutable-audit-log-guide)
- [TimescaleDB Documentation](https://docs.timescale.com/)

---
## Decision

**Adopt a tiered audit logging architecture** with:

1. **structlog + OpenTelemetry** for structured, correlated logging
2. **PostgreSQL + TimescaleDB** for hot storage with time-series optimization
3. **S3/MinIO + Parquet** for cold archival
4. **Cryptographic hash chaining** for immutability
5. **7-year retention** for compliance with SOC2/financial regulations

---

*Spike completed. Findings will inform ADR-007: Audit Logging Architecture.*