SPIKE-011: Audit Logging for Syndarix
Status: Completed
Date: 2025-12-29
Author: Architecture Team
Related Issue: #11
Executive Summary
Syndarix, as an autonomous AI-powered consulting agency, requires comprehensive audit logging to ensure compliance, enable debugging, and build client trust. This spike researches best practices for audit logging in autonomous AI systems and provides concrete recommendations for implementation.
Recommendation: Implement a structured, OpenTelemetry-compatible audit logging system using:
- Structlog for structured JSON logging with contextual enrichment
- PostgreSQL with TimescaleDB extension for hot storage (0-90 days)
- S3-compatible object storage for cold archival (90+ days)
- Cryptographic hash chaining for immutability verification
- OpenTelemetry integration for correlation with traces and metrics
Objective
Research the optimal audit logging architecture for Syndarix, focusing on:
- Comprehensive event capture for autonomous AI agent actions
- Compliance with SOC2/GDPR requirements
- Searchable, queryable audit trails
- Immutable, tamper-evident logging
- Scalable storage architecture
Research Questions & Findings
1. What to Log in Autonomous AI Systems
Based on LLM observability best practices and Syndarix-specific requirements, the following event categories must be logged:
Agent Actions
| Event Type | Description | Critical Fields |
|---|---|---|
| `agent.spawned` | Agent instance created | agent_type, agent_id, project_id, config |
| `agent.action.started` | Agent begins action | action_type, input_params, before_state |
| `agent.action.completed` | Agent completes action | action_type, output, after_state, duration_ms |
| `agent.action.failed` | Agent action failed | action_type, error, stack_trace |
| `agent.decision` | Agent makes autonomous decision | decision_type, options, chosen, reasoning |
| `agent.terminated` | Agent instance destroyed | reason, final_state |
LLM Interactions
| Event Type | Description | Critical Fields |
|---|---|---|
| `llm.request` | Prompt sent to LLM | model, prompt_template, variables, token_count |
| `llm.response` | LLM response received | model, response, token_count, latency_ms |
| `llm.error` | LLM call failed | model, error, retry_count |
| `llm.tool_call` | LLM invokes tool | tool_name, arguments |
MCP Tool Invocations
| Event Type | Description | Critical Fields |
|---|---|---|
| `mcp.tool.invoked` | MCP tool called | server, tool_name, arguments, project_id |
| `mcp.tool.result` | MCP tool returned | server, tool_name, result, duration_ms |
| `mcp.tool.error` | MCP tool failed | server, tool_name, error |
Human Approvals
| Event Type | Description | Critical Fields |
|---|---|---|
| `approval.requested` | System requests human approval | action_type, context, options |
| `approval.granted` | Human approves action | approver_id, action_id, comments |
| `approval.rejected` | Human rejects action | approver_id, action_id, reason |
| `approval.timeout` | Approval request timed out | action_id, timeout_ms |
Git Operations
| Event Type | Description | Critical Fields |
|---|---|---|
| `git.commit` | Commit created | repo, branch, commit_sha, message, files |
| `git.branch.created` | Branch created | repo, branch_name, base_branch |
| `git.pr.created` | Pull request opened | repo, pr_number, title, head, base |
| `git.pr.merged` | Pull request merged | repo, pr_number, merge_commit |
Project Lifecycle
| Event Type | Description | Critical Fields |
|---|---|---|
| `project.created` | New project started | project_id, client_id, autonomy_level |
| `project.sprint.started` | Sprint begins | sprint_id, goals, assigned_agents |
| `project.milestone.completed` | Milestone achieved | milestone_id, deliverables |
| `project.checkpoint` | Client checkpoint | checkpoint_type, feedback |
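To keep these dotted event names consistent across services, they could be centralized in a single registry. A possible sketch (the enum and its module are assumptions, not an existing artifact; the git.* and project.* entries are omitted here for brevity):
from enum import Enum

class AuditEventType(str, Enum):
    """Central registry of audit event types (subset shown)."""
    # Agent actions
    AGENT_SPAWNED = "agent.spawned"
    AGENT_ACTION_STARTED = "agent.action.started"
    AGENT_ACTION_COMPLETED = "agent.action.completed"
    AGENT_ACTION_FAILED = "agent.action.failed"
    AGENT_DECISION = "agent.decision"
    AGENT_TERMINATED = "agent.terminated"
    # LLM interactions
    LLM_REQUEST = "llm.request"
    LLM_RESPONSE = "llm.response"
    LLM_ERROR = "llm.error"
    LLM_TOOL_CALL = "llm.tool_call"
    # MCP tools
    MCP_TOOL_INVOKED = "mcp.tool.invoked"
    MCP_TOOL_RESULT = "mcp.tool.result"
    MCP_TOOL_ERROR = "mcp.tool.error"
    # Human approvals
    APPROVAL_REQUESTED = "approval.requested"
    APPROVAL_GRANTED = "approval.granted"
    APPROVAL_REJECTED = "approval.rejected"
    APPROVAL_TIMEOUT = "approval.timeout"

    @property
    def category(self) -> str:
        """The event category is the first dotted segment ('agent', 'llm', 'mcp', ...)."""
        return self.value.split(".", 1)[0]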
2. Structured Logging Format
Recommendation: Use OpenTelemetry-compatible structured JSON logging.
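As a minimal sketch of the logging side (processor choices here are illustrative assumptions, not final settings), structlog can be configured to emit one JSON object per event and to merge bound context such as project_id and agent_id into every line:
import logging
import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,   # pull in bound context (project_id, agent_id, ...)
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso", utc=True),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),       # one JSON object per event
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    cache_logger_on_first_use=True,
)

# Bind contextual fields once; every subsequent event carries them automatically.
structlog.contextvars.bind_contextvars(project_id="proj-001", agent_id="agent-123")
structlog.get_logger("syndarix.audit").info(
    "agent.action.completed", action="Created feature branch", duration_ms=42
)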
Base Event Schema
from pydantic import BaseModel
from datetime import datetime
from typing import Any, Optional
from enum import Enum
class AuditEventSeverity(str, Enum):
DEBUG = "DEBUG"
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
CRITICAL = "CRITICAL"
class AuditEvent(BaseModel):
"""Base schema for all audit events."""
# Identity & Correlation
event_id: str # UUID v7 (time-ordered)
trace_id: Optional[str] = None # OpenTelemetry trace ID
span_id: Optional[str] = None # OpenTelemetry span ID
parent_event_id: Optional[str] = None # For event chains
# Timestamp
timestamp: datetime # ISO 8601 with timezone
timestamp_unix_ms: int # Unix millis for indexing
# Event Classification
event_type: str # e.g., "agent.action.completed"
event_category: str # e.g., "agent", "llm", "mcp"
severity: AuditEventSeverity
# Context
project_id: Optional[str] = None
agent_id: Optional[str] = None
agent_type: Optional[str] = None
user_id: Optional[str] = None # Human actor if applicable
session_id: Optional[str] = None
# Event Data
action: str # Human-readable action description
data: dict[str, Any] # Event-specific payload
# State Tracking
before_state: Optional[dict] = None # State before action
after_state: Optional[dict] = None # State after action
# Technical Metadata
service: str = "syndarix"
service_version: str
environment: str # production, staging, development
hostname: str
# Immutability
previous_hash: Optional[str] = None # Hash of previous event (chain)
event_hash: Optional[str] = None # SHA-256 of this event
class Config:
json_schema_extra = {
"example": {
"event_id": "019373a8-9b2e-7f4c-8d1a-2b3c4d5e6f7a",
"trace_id": "abc123def456",
"timestamp": "2025-12-29T14:30:00.000Z",
"timestamp_unix_ms": 1735480200000,
"event_type": "agent.action.completed",
"event_category": "agent",
"severity": "INFO",
"project_id": "proj-001",
"agent_id": "agent-123",
"agent_type": "software_engineer",
"action": "Created feature branch",
"data": {
"branch_name": "feature/user-auth",
"base_branch": "main"
},
"service": "syndarix",
"service_version": "1.0.0",
"environment": "production"
}
}
LLM-Specific Event Schema
class LLMRequestEvent(AuditEvent):
"""Schema for LLM request events."""
event_type: str = "llm.request"
event_category: str = "llm"
data: dict = {
"model": str, # e.g., "claude-3-5-sonnet"
"provider": str, # e.g., "anthropic", "openai"
"prompt_template_id": str, # Reference to template
"prompt_template_version": str,
"prompt_variables": dict, # Variables substituted
"system_prompt_hash": str, # Hash of system prompt
"user_prompt": str, # Full user prompt (may be truncated)
"token_count_estimate": int,
"max_tokens": int,
"temperature": float,
"tools_available": list[str],
}
class LLMResponseEvent(AuditEvent):
"""Schema for LLM response events."""
event_type: str = "llm.response"
event_category: str = "llm"
data: dict = {
"model": str,
"provider": str,
"response_text": str, # May be truncated for storage
"response_hash": str, # Full response hash
"input_tokens": int,
"output_tokens": int,
"total_tokens": int,
"latency_ms": int,
"finish_reason": str, # "stop", "max_tokens", "tool_use"
"tool_calls": list[dict], # If tools were invoked
"cost_usd": float, # Estimated cost
}
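For illustration, a concrete event built against the base schema and serialized for the log pipeline might look like this (Pydantic v2 assumed; AuditEvent and AuditEventSeverity are the classes defined above):
import socket
from datetime import datetime, timezone
from uuid import uuid4  # a UUIDv7 generator would be used in production for time-ordering

now = datetime.now(timezone.utc)
event = AuditEvent(
    event_id=str(uuid4()),
    timestamp=now,
    timestamp_unix_ms=int(now.timestamp() * 1000),
    event_type="llm.response",
    event_category="llm",
    severity=AuditEventSeverity.INFO,
    project_id="proj-001",
    agent_id="agent-123",
    action="LLM response received",
    data={"model": "claude-3-5-sonnet", "total_tokens": 1234, "latency_ms": 850},
    service_version="1.0.0",
    environment="development",
    hostname=socket.gethostname(),
)

# One JSON object per event is what the storage layer ingests.
print(event.model_dump_json())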
3. Storage Architecture
Recommendation: Tiered storage with hot, warm, and cold layers.
Query Latency
◄────────────►
Fast Slow
┌─────────────────────────────────────────────────────────────────┐
│ HOT STORAGE │
│ (0-30 days, ~10TB/month) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ PostgreSQL + TimescaleDB │ │
│ │ - Hypertables partitioned by day │ │
│ │ - Native compression after 7 days │ │
│ │ - Full-text search on action/data fields │ │
│ │ - B-tree indexes on project_id, agent_id, event_type │ │
│ │ - GIN index on JSONB data field │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ (30 days) │
├─────────────────────────────────────────────────────────────────┤
│ WARM STORAGE │
│ (30-90 days, compressed) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ TimescaleDB Continuous Aggregates │ │
│ │ - Hourly/daily rollups of metrics │ │
│ │ - Detailed logs in highly compressed chunks │ │
│ │ - Query via same SQL interface │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ (90 days) │
├─────────────────────────────────────────────────────────────────┤
│ COLD STORAGE │
│ (90+ days, archival, 7 year retention) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ S3-Compatible Object Storage (MinIO / AWS S3) │ │
│ │ - Parquet files partitioned by date/project │ │
│ │ - Glacier-class storage after 1 year │ │
│ │ - Queryable via Trino/Athena for investigations │ │
│ │ - Cryptographic manifest for integrity verification │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
TimescaleDB Schema
-- Enable TimescaleDB extension
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- Main audit events table
CREATE TABLE audit_events (
-- Unique constraints on a hypertable must include the partitioning column,
-- so the primary key is declared as (event_id, timestamp) at the end of the table.
event_id UUID NOT NULL,
trace_id VARCHAR(32),
span_id VARCHAR(16),
-- Foreign keys referencing a hypertable are not supported by TimescaleDB;
-- the parent link is kept as a plain column and validated at the application level.
parent_event_id UUID,
timestamp TIMESTAMPTZ NOT NULL,
timestamp_unix_ms BIGINT NOT NULL,
event_type VARCHAR(100) NOT NULL,
event_category VARCHAR(50) NOT NULL,
severity VARCHAR(20) NOT NULL,
project_id VARCHAR(50),
agent_id VARCHAR(50),
agent_type VARCHAR(50),
user_id VARCHAR(50),
session_id VARCHAR(50),
action TEXT NOT NULL,
data JSONB NOT NULL,
before_state JSONB,
after_state JSONB,
service VARCHAR(50) NOT NULL,
service_version VARCHAR(20) NOT NULL,
environment VARCHAR(20) NOT NULL,
hostname VARCHAR(100) NOT NULL,
previous_hash VARCHAR(64),
event_hash VARCHAR(64) NOT NULL,
-- Denormalized for efficient queries
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (event_id, timestamp)
);
-- Convert to hypertable for time-series optimization
SELECT create_hypertable(
'audit_events',
'timestamp',
chunk_time_interval => INTERVAL '1 day'
);
-- Enable compression for chunks older than 7 days
ALTER TABLE audit_events SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'project_id, event_category',
timescaledb.compress_orderby = 'timestamp DESC'
);
SELECT add_compression_policy('audit_events', INTERVAL '7 days');
-- Retention policy: drop chunks older than 90 days
-- (the archival job below copies them to cold storage before they are dropped)
SELECT add_retention_policy('audit_events', INTERVAL '90 days');
-- Indexes for common query patterns
CREATE INDEX idx_audit_project_time ON audit_events (project_id, timestamp DESC);
CREATE INDEX idx_audit_agent_time ON audit_events (agent_id, timestamp DESC);
CREATE INDEX idx_audit_type_time ON audit_events (event_type, timestamp DESC);
CREATE INDEX idx_audit_category_time ON audit_events (event_category, timestamp DESC);
CREATE INDEX idx_audit_trace ON audit_events (trace_id) WHERE trace_id IS NOT NULL;
-- GIN index for JSONB queries
CREATE INDEX idx_audit_data ON audit_events USING GIN (data);
-- Full-text search on action field
CREATE INDEX idx_audit_action_fts ON audit_events USING GIN (to_tsvector('english', action));
Cold Storage Archive Job
# jobs/archive_audit_logs.py
import asyncio
import hashlib
import io
from datetime import datetime, timedelta
import pyarrow as pa
import pyarrow.parquet as pq
from minio import Minio
# settings, get_db_session and update_archive_manifest are application-level
# helpers assumed to be importable here.
async def archive_old_audit_logs():
"""Archive audit logs older than 90 days to S3/MinIO."""
cutoff_date = datetime.utcnow() - timedelta(days=90)
# Query logs to archive
query = """
SELECT * FROM audit_events
WHERE timestamp < %s
ORDER BY timestamp ASC
"""
async with get_db_session() as session:
result = await session.execute(query, [cutoff_date])
records = result.fetchall()
if not records:
return
# Convert to Parquet
table = pa.Table.from_pylist([dict(r) for r in records])
# Partition archived files by date (splitting further by project_id would
# require grouping records before writing)
partition_key = cutoff_date.strftime("%Y/%m/%d")
# Write to buffer
buffer = pa.BufferOutputStream()
pq.write_table(table, buffer, compression='zstd')
# Calculate manifest hash
data = buffer.getvalue().to_pybytes()
content_hash = hashlib.sha256(data).hexdigest()
# Upload to MinIO/S3
client = Minio(
settings.MINIO_ENDPOINT,
access_key=settings.MINIO_ACCESS_KEY,
secret_key=settings.MINIO_SECRET_KEY,
)
object_name = f"audit-logs/{partition_key}/events.parquet"
client.put_object(
bucket_name="syndarix-audit-archive",
object_name=object_name,
data=io.BytesIO(data),
length=len(data),
metadata={"content-hash": content_hash}
)
# Update manifest for integrity verification
await update_archive_manifest(partition_key, object_name, content_hash)
# Delete archived records from TimescaleDB
await session.execute(
"DELETE FROM audit_events WHERE timestamp < %s",
[cutoff_date]
)
4. Immutability & Integrity
Recommendation: Use cryptographic hash chaining for tamper evidence.
# app/audit/integrity.py
import hashlib
import json
from typing import Optional
class AuditIntegrity:
"""Cryptographic hash chaining for audit log integrity."""
def __init__(self, redis_client):
self.redis = redis_client
self._last_hash_key = "audit:last_hash:{project_id}"
async def compute_event_hash(
self,
event: AuditEvent,
previous_hash: Optional[str] = None
) -> str:
"""
Compute SHA-256 hash of event including previous hash.
Creates a blockchain-like chain of events.
"""
# Canonical JSON representation (sorted keys, no whitespace)
canonical = json.dumps(
{
"event_id": str(event.event_id),
"timestamp_unix_ms": event.timestamp_unix_ms,
"event_type": event.event_type,
"project_id": event.project_id,
"agent_id": event.agent_id,
"action": event.action,
"data": event.data,
"previous_hash": previous_hash or "",
},
sort_keys=True,
separators=(",", ":")
)
return hashlib.sha256(canonical.encode()).hexdigest()
async def chain_event(self, event: AuditEvent) -> AuditEvent:
"""Add event to the hash chain."""
project_key = self._last_hash_key.format(project_id=event.project_id)
# Read the chain head; the get/set pair below is not atomic on its own,
# so chain_event must be serialized per project (e.g. a Redis lock or a
# single writer per project) to avoid forking the chain
previous_hash = await self.redis.get(project_key)
# Compute new hash including the previous one
event.previous_hash = previous_hash
event.event_hash = await self.compute_event_hash(event, previous_hash)
# Store this event's hash as the new chain head
await self.redis.set(project_key, event.event_hash)
return event
async def verify_chain(
self,
project_id: str,
start_event_id: str,
end_event_id: str
) -> tuple[bool, Optional[str]]:
"""
Verify integrity of event chain between two events.
Returns (is_valid, first_invalid_event_id).
"""
events = await self.get_events_in_range(
project_id, start_event_id, end_event_id
)
previous_hash = events[0].previous_hash
for event in events:
expected_hash = await self.compute_event_hash(event, previous_hash)
if event.event_hash != expected_hash:
return (False, str(event.event_id))
previous_hash = event.event_hash
return (True, None)
5. Query Patterns & Indexing
Common Query Patterns
# app/audit/queries.py
from datetime import datetime
from typing import Optional
# AuditEvent is the schema defined in section 2.
class AuditQueries:
"""Optimized audit log queries."""
def __init__(self, db):
self.db = db  # asyncpg pool (or any connection exposing fetch())
async def get_project_timeline(
self,
project_id: str,
start_time: datetime,
end_time: datetime,
event_types: Optional[list[str]] = None,
limit: int = 1000
) -> list[AuditEvent]:
"""Get chronological audit trail for a project."""
# Build the optional type filter first, then interpolate it into the SQL
type_filter = "AND event_type = ANY($5)" if event_types else ""
query = f"""
SELECT * FROM audit_events
WHERE project_id = $1
AND timestamp BETWEEN $2 AND $3
{type_filter}
ORDER BY timestamp DESC
LIMIT $4
"""
params = [project_id, start_time, end_time, limit]
if event_types:
params.append(event_types)
return await self.db.fetch(query, *params)
async def get_agent_actions(
self,
agent_id: str,
hours: int = 24
) -> list[AuditEvent]:
"""Get all actions by a specific agent."""
query = """
SELECT * FROM audit_events
WHERE agent_id = $1
AND timestamp > NOW() - ($2 * INTERVAL '1 hour')
AND event_category = 'agent'
ORDER BY timestamp DESC
"""
return await self.db.fetch(query, agent_id, hours)
async def get_llm_usage_summary(
self,
project_id: str,
days: int = 30
) -> dict:
"""Get LLM usage statistics for billing/monitoring."""
query = """
SELECT
data->>'model' as model,
data->>'provider' as provider,
COUNT(*) as request_count,
SUM((data->>'total_tokens')::int) as total_tokens,
SUM((data->>'cost_usd')::float) as total_cost,
AVG((data->>'latency_ms')::int) as avg_latency_ms
FROM audit_events
WHERE project_id = $1
AND event_type = 'llm.response'
AND timestamp > NOW() - ($2 * INTERVAL '1 day')
GROUP BY data->>'model', data->>'provider'
"""
return await self.db.fetch(query, project_id, days)
async def search_actions(
self,
query_text: str,
project_id: Optional[str] = None,
limit: int = 100
) -> list[AuditEvent]:
"""Full-text search on action descriptions."""
# Keep project_id parameterized rather than interpolated to avoid SQL injection
project_filter = "AND project_id = $3" if project_id else ""
query = f"""
SELECT *, ts_rank(to_tsvector('english', action), query) as rank
FROM audit_events, plainto_tsquery('english', $1) query
WHERE to_tsvector('english', action) @@ query
{project_filter}
ORDER BY rank DESC, timestamp DESC
LIMIT $2
"""
params = [query_text, limit]
if project_id:
params.append(project_id)
return await self.db.fetch(query, *params)
async def trace_event_chain(
self,
event_id: str
) -> list[AuditEvent]:
"""Trace full event chain using parent_event_id."""
query = """
WITH RECURSIVE event_chain AS (
SELECT * FROM audit_events WHERE event_id = $1
UNION ALL
SELECT e.* FROM audit_events e
JOIN event_chain ec ON e.event_id = ec.parent_event_id
)
SELECT * FROM event_chain ORDER BY timestamp ASC
"""
return await self.db.fetch(query, event_id)
6. Logging Decorators & Implementation
# app/audit/decorators.py
import functools
import hashlib
import socket
import time
from datetime import datetime, timezone
from typing import Any, Callable
from uuid import uuid7  # only in very recent stdlib versions; the third-party "uuid6" package provides uuid7() otherwise
import structlog
# AuditEvent, AuditEventSeverity, audit_logger, settings and the _serialize_args /
# _serialize_kwargs / _summarize_result / _redact_sensitive helpers are assumed
# to come from the surrounding audit package.
logger = structlog.get_logger()
def audit_agent_action(action_type: str):
"""
Decorator to audit agent actions with before/after state capture.
Usage:
@audit_agent_action("create_branch")
async def create_branch(self, branch_name: str) -> Branch:
...
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
# Capture before state
before_state = await self.get_state() if hasattr(self, 'get_state') else None
event_id = str(uuid7())
start_time = time.perf_counter()
try:
# Execute the action
result = await func(self, *args, **kwargs)
# Capture after state
after_state = await self.get_state() if hasattr(self, 'get_state') else None
# Log success
await audit_logger.log_event(
AuditEvent(
event_id=event_id,
timestamp=datetime.now(timezone.utc),
timestamp_unix_ms=int(time.time() * 1000),
event_type=f"agent.action.completed",
event_category="agent",
severity=AuditEventSeverity.INFO,
project_id=self.project_id,
agent_id=self.agent_id,
agent_type=self.agent_type,
action=f"{action_type}: {func.__name__}",
data={
"action_type": action_type,
"function": func.__name__,
"args": _serialize_args(args),
"kwargs": _serialize_kwargs(kwargs),
"result_summary": _summarize_result(result),
"duration_ms": int((time.perf_counter() - start_time) * 1000),
},
before_state=before_state,
after_state=after_state,
service="syndarix",
service_version=settings.VERSION,
environment=settings.ENVIRONMENT,
hostname=socket.gethostname(),
)
)
return result
except Exception as e:
# Log failure
await audit_logger.log_event(
AuditEvent(
event_id=event_id,
timestamp=datetime.now(timezone.utc),
timestamp_unix_ms=int(time.time() * 1000),
event_type=f"agent.action.failed",
event_category="agent",
severity=AuditEventSeverity.ERROR,
project_id=self.project_id,
agent_id=self.agent_id,
agent_type=self.agent_type,
action=f"{action_type}: {func.__name__} (FAILED)",
data={
"action_type": action_type,
"function": func.__name__,
"error": str(e),
"error_type": type(e).__name__,
"duration_ms": int((time.perf_counter() - start_time) * 1000),
},
before_state=before_state,
service="syndarix",
service_version=settings.VERSION,
environment=settings.ENVIRONMENT,
hostname=socket.gethostname(),
)
)
raise
return wrapper
return decorator
def audit_llm_call(func: Callable) -> Callable:
"""
Decorator to audit LLM calls with prompt/response logging.
Usage:
@audit_llm_call
async def generate_response(self, prompt: str, **kwargs) -> str:
...
"""
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
event_id = str(uuid7())
start_time = time.perf_counter()
# Log request
request_event = AuditEvent(
event_id=event_id,
timestamp=datetime.now(timezone.utc),
timestamp_unix_ms=int(time.time() * 1000),
event_type="llm.request",
event_category="llm",
severity=AuditEventSeverity.INFO,
project_id=getattr(self, 'project_id', None),
agent_id=getattr(self, 'agent_id', None),
action="LLM request initiated",
data={
"model": kwargs.get('model', self.model),
"provider": self.provider,
"prompt_hash": hashlib.sha256(str(args).encode()).hexdigest()[:16],
"max_tokens": kwargs.get('max_tokens', 4096),
"temperature": kwargs.get('temperature', 0.7),
},
service="syndarix",
service_version=settings.VERSION,
environment=settings.ENVIRONMENT,
hostname=socket.gethostname(),
)
await audit_logger.log_event(request_event)
try:
result = await func(self, *args, **kwargs)
# Log response
response_event = AuditEvent(
event_id=str(uuid7()),
parent_event_id=event_id,
timestamp=datetime.now(timezone.utc),
timestamp_unix_ms=int(time.time() * 1000),
event_type="llm.response",
event_category="llm",
severity=AuditEventSeverity.INFO,
project_id=getattr(self, 'project_id', None),
agent_id=getattr(self, 'agent_id', None),
action="LLM response received",
data={
"model": kwargs.get('model', self.model),
"provider": self.provider,
"response_hash": hashlib.sha256(str(result).encode()).hexdigest()[:16],
"input_tokens": result.usage.input_tokens if hasattr(result, 'usage') else None,
"output_tokens": result.usage.output_tokens if hasattr(result, 'usage') else None,
"latency_ms": int((time.perf_counter() - start_time) * 1000),
"finish_reason": result.finish_reason if hasattr(result, 'finish_reason') else None,
},
service="syndarix",
service_version=settings.VERSION,
environment=settings.ENVIRONMENT,
hostname=socket.gethostname(),
)
await audit_logger.log_event(response_event)
return result
except Exception as e:
# Log error
error_event = AuditEvent(
event_id=str(uuid7()),
parent_event_id=event_id,
timestamp=datetime.now(timezone.utc),
timestamp_unix_ms=int(time.time() * 1000),
event_type="llm.error",
event_category="llm",
severity=AuditEventSeverity.ERROR,
project_id=getattr(self, 'project_id', None),
agent_id=getattr(self, 'agent_id', None),
action="LLM request failed",
data={
"model": kwargs.get('model', getattr(self, 'model', None)),
"error": str(e),
"error_type": type(e).__name__,
"latency_ms": int((time.perf_counter() - start_time) * 1000),
},
service="syndarix",
service_version=settings.VERSION,
environment=settings.ENVIRONMENT,
hostname=socket.gethostname(),
)
await audit_logger.log_event(error_event)
raise
return wrapper
def audit_mcp_tool(func: Callable) -> Callable:
"""
Decorator to audit MCP tool invocations.
Usage:
@audit_mcp_tool
async def call_tool(self, server: str, tool: str, args: dict):
...
"""
@functools.wraps(func)
async def wrapper(self, server: str, tool: str, arguments: dict, **kwargs):
event_id = str(uuid7())
start_time = time.perf_counter()
# Log invocation
await audit_logger.log_event(
AuditEvent(
event_id=event_id,
timestamp=datetime.now(timezone.utc),
timestamp_unix_ms=int(time.time() * 1000),
event_type="mcp.tool.invoked",
event_category="mcp",
severity=AuditEventSeverity.INFO,
project_id=arguments.get('project_id'),
agent_id=arguments.get('agent_id'),
action=f"MCP tool invoked: {server}/{tool}",
data={
"server": server,
"tool_name": tool,
"arguments": _redact_sensitive(arguments),
},
service="syndarix",
service_version=settings.VERSION,
environment=settings.ENVIRONMENT,
hostname=socket.gethostname(),
)
)
try:
result = await func(self, server, tool, arguments, **kwargs)
# Log result
await audit_logger.log_event(
AuditEvent(
event_id=str(uuid7()),
parent_event_id=event_id,
timestamp=datetime.now(timezone.utc),
timestamp_unix_ms=int(time.time() * 1000),
event_type="mcp.tool.result",
event_category="mcp",
severity=AuditEventSeverity.INFO,
project_id=arguments.get('project_id'),
agent_id=arguments.get('agent_id'),
action=f"MCP tool completed: {server}/{tool}",
data={
"server": server,
"tool_name": tool,
"result_summary": _summarize_result(result),
"duration_ms": int((time.perf_counter() - start_time) * 1000),
},
service="syndarix",
service_version=settings.VERSION,
environment=settings.ENVIRONMENT,
hostname=socket.gethostname(),
)
)
return result
except Exception as e:
await audit_logger.log_event(
AuditEvent(
event_id=str(uuid7()),
parent_event_id=event_id,
timestamp=datetime.now(timezone.utc),
timestamp_unix_ms=int(time.time() * 1000),
event_type="mcp.tool.error",
event_category="mcp",
severity=AuditEventSeverity.ERROR,
project_id=arguments.get('project_id'),
agent_id=arguments.get('agent_id'),
action=f"MCP tool failed: {server}/{tool}",
data={
"server": server,
"tool_name": tool,
"error": str(e),
"error_type": type(e).__name__,
"duration_ms": int((time.perf_counter() - start_time) * 1000),
},
service="syndarix",
service_version=settings.VERSION,
environment=settings.ENVIRONMENT,
hostname=socket.gethostname(),
)
)
raise
return wrapper
7. OpenTelemetry Integration
# app/audit/otel_integration.py
from opentelemetry import trace
from opentelemetry.trace import Span
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# Configure OpenTelemetry
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("syndarix.audit")
otlp_exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(otlp_exporter))
class AuditLoggerWithOTel:
"""Audit logger with OpenTelemetry correlation."""
def __init__(self, db_writer, integrity_checker):
self.db_writer = db_writer
self.integrity = integrity_checker
async def log_event(self, event: AuditEvent) -> None:
"""Log event with OpenTelemetry trace correlation."""
# Get current span context
current_span = trace.get_current_span()
span_context = current_span.get_span_context()
if span_context.is_valid:
event.trace_id = format(span_context.trace_id, '032x')
event.span_id = format(span_context.span_id, '016x')
# Add to hash chain for immutability
event = await self.integrity.chain_event(event)
# Write to database
await self.db_writer.write(event)
# Add event as span event for trace correlation
current_span.add_event(
name=event.event_type,
attributes={
"audit.event_id": str(event.event_id),
"audit.project_id": event.project_id or "",
"audit.agent_id": event.agent_id or "",
"audit.action": event.action,
}
)
8. Retention Policy Recommendations
Based on compliance requirements (SOC2, GDPR) and operational needs:
| Data Category | Hot Storage | Warm Storage | Cold Archive | Total Retention |
|---|---|---|---|---|
| Agent actions | 30 days | 60 days | 7 years | 7 years |
| LLM requests/responses | 30 days | 60 days | 7 years | 7 years |
| MCP tool calls | 30 days | 60 days | 7 years | 7 years |
| Human approvals | 30 days | 60 days | 7 years | 7 years |
| Git operations | 30 days | 60 days | 7 years | 7 years |
| System events | 7 days | 23 days | 1 year | 1 year |
| Debug/trace logs | 3 days | 4 days | N/A | 7 days |
GDPR Considerations:
- PII must be redacted or encrypted before archival (a redaction sketch follows this list)
- Implement right-to-deletion capability (pseudonymization in archives)
- Document lawful basis for retention (legitimate business interest, legal compliance)
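One way the redaction requirement could be met before archival is sketched below; the field list and the HMAC-based pseudonymization are assumptions, and a real implementation would derive the field inventory from a data-classification exercise.
import hashlib
import hmac

# Hypothetical set of payload keys treated as PII; the real list would come
# from data classification of each event type's payload.
PII_FIELDS = {"email", "full_name", "phone", "client_contact"}

def pseudonymize(value: str, secret_key: bytes) -> str:
    """Deterministic HMAC-SHA256 token: stable across events, not reversible without a lookup table."""
    return hmac.new(secret_key, value.encode(), hashlib.sha256).hexdigest()[:32]

def redact_event_data(data: dict, secret_key: bytes) -> dict:
    """Replace PII values (recursively) with pseudonyms before the event is written to the archive."""
    cleaned: dict = {}
    for key, value in data.items():
        if isinstance(value, dict):
            cleaned[key] = redact_event_data(value, secret_key)
        elif key in PII_FIELDS and isinstance(value, str):
            cleaned[key] = pseudonymize(value, secret_key)
        else:
            cleaned[key] = value
    return cleaned
Under a scheme like this, right-to-deletion can be honoured by removing the pseudonym-to-identity mapping rather than rewriting archived Parquet files.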
SOC2 Considerations:
- Audit logs must be tamper-evident (hash chaining)
- Access to audit logs must be logged (audit of audits)
- Retention must be documented in data retention policy
Recommendations
Implementation Phases
Phase 1: Foundation (Week 1-2)
- Set up TimescaleDB with audit_events hypertable
- Implement base AuditEvent schema and writer
- Create `@audit_agent_action` decorator
- Add basic project/agent filtering
Phase 2: LLM & MCP Logging (Week 3-4)
- Implement
@audit_llm_calldecorator - Implement
@audit_mcp_tooldecorator - Add prompt/response logging with redaction
- Integrate with OpenTelemetry
Phase 3: Immutability & Compliance (Week 5-6)
- Implement hash chaining for tamper evidence
- Add integrity verification endpoints
- Implement cold storage archival job
- Document retention policies
Phase 4: Query & Investigation (Week 7-8)
- Build audit query API endpoints (a sketch follows below)
- Implement full-text search
- Add trace correlation queries
- Create audit dashboard
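To make the query-API item concrete, here is a hedged sketch of one such endpoint (FastAPI, the get_audit_queries and get_current_user dependencies, and the route shape are all assumptions; AuditQueries is the helper class from section 5). It also records the access itself, per the SOC2 "audit of audits" note above.
from datetime import datetime
from typing import Optional

import structlog
from fastapi import APIRouter, Depends, Query

router = APIRouter(prefix="/audit", tags=["audit"])
access_log = structlog.get_logger("syndarix.audit.access")

@router.get("/projects/{project_id}/events")
async def get_project_audit_trail(
    project_id: str,
    start: datetime,
    end: datetime,
    event_type: Optional[list[str]] = Query(default=None),
    limit: int = Query(default=1000, le=10_000),
    queries=Depends(get_audit_queries),        # hypothetical DI provider returning AuditQueries
    current_user=Depends(get_current_user),    # hypothetical auth dependency
):
    # SOC2 "audit of audits": reading the audit trail is itself recorded.
    access_log.info(
        "audit.query.executed",
        user_id=current_user.id,
        project_id=project_id,
        window=[start.isoformat(), end.isoformat()],
    )
    return await queries.get_project_timeline(
        project_id=project_id,
        start_time=start,
        end_time=end,
        event_types=event_type,
        limit=limit,
    )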
Technology Stack
| Component | Recommendation | Alternative |
|---|---|---|
| Structured Logging | structlog | python-json-logger |
| Hot Storage | PostgreSQL + TimescaleDB | ClickHouse |
| Cold Storage | MinIO (S3-compatible) | AWS S3 |
| Archive Format | Parquet + ZSTD | ORC |
| Tracing | OpenTelemetry | Jaeger |
| Query Engine | Native SQL + Trino | Apache Druid |
Cost Estimates (Self-Hosted)
| Resource | Specification | Monthly Cost |
|---|---|---|
| TimescaleDB (hot) | 500GB SSD, 16GB RAM | Infrastructure only |
| MinIO (cold) | 10TB HDD | Infrastructure only |
| Estimated log volume | ~500GB/month | - |
| Compression ratio | ~10:1 | - |
| Effective storage | ~50GB/month | - |
References
- OpenTelemetry Logging Documentation
- How to Structure Logs Properly in OpenTelemetry
- Langfuse LLM Observability & Tracing
- LLM Observability: Tutorial & Best Practices
- Datadog LLM Observability
- Security Log Retention Best Practices
- SOC 2 Data Security and Retention Requirements
- Immutable Audit Log Architecture
- What Are Immutable Logs? A Complete Guide
- TimescaleDB Documentation
Decision
Adopt a tiered audit logging architecture with:
- Structlog + OpenTelemetry for structured, correlated logging
- PostgreSQL + TimescaleDB for hot storage with time-series optimization
- S3/MinIO + Parquet for cold archival
- Cryptographic hash chaining for immutability
- 7-year retention for compliance with SOC2/financial regulations
Spike completed. Findings will inform ADR-007: Audit Logging Architecture.