SPIKE-003: Real-time Updates Architecture
Status: Completed
Date: 2025-12-29
Author: Architecture Team
Related Issue: #3
Objective
Evaluate WebSocket vs Server-Sent Events (SSE) for real-time updates in Syndarix, focusing on agent activity streams, progress updates, and client notifications.
Research Questions
- What are the trade-offs between WebSocket and SSE?
- Which pattern best fits Syndarix's use cases?
- How do we handle reconnection and reliability?
- What is the FastAPI implementation approach?
Findings
1. Use Case Analysis
| Use Case | Direction | Frequency | Latency Req |
|---|---|---|---|
| Agent activity feed | Server → Client | High | Low |
| Sprint progress | Server → Client | Medium | Low |
| Build status | Server → Client | Low | Medium |
| Client approval requests | Server → Client | Low | High |
| Client messages | Client → Server | Low | Medium |
| Issue updates | Server → Client | Medium | Low |
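Key Insight: Five of the six use cases are server-to-client, and the one client-to-server flow (client messages) is low frequency, so an estimated 90%+ of real-time communication is unidirectional.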
2. Technology Comparison
| Feature | Server-Sent Events (SSE) | WebSocket |
|---|---|---|
| Direction | Unidirectional (server → client) | Bidirectional |
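| Protocol | HTTP/1.1 or HTTP/2 | WebSocket (ws:// or wss://), established via HTTP Upgrade |
| Reconnection | Built-in automatic (browser `EventSource`) | Manual implementation |
| Connection limits | ~6 per domain over HTTP/1.1; multiplexed over HTTP/2 | Not counted against the HTTP/1.1 per-domain limit; browsers apply a separate, higher cap |
| Browser support | Excellent | Excellent |
| Through proxies | Native HTTP | May require config |
| Complexity | Simple | More complex |
| FastAPI support | Via `StreamingResponse` | Native (`WebSocket` routes) |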
3. Recommendation: SSE for Primary, WebSocket for Chat
SSE (Recommended for 90% of use cases):
- Agent activity streams
- Progress updates
- Build/pipeline status
- Issue change notifications
- Approval request alerts
WebSocket (For bidirectional needs):
- Live chat with agents
- Interactive debugging sessions
- Real-time collaboration (future)
4. FastAPI SSE Implementation
```python
# app/api/v1/events.py
import asyncio
import json
from dataclasses import asdict

from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse

# Assumed locations for auth, settings, and the user model; adjust to the codebase.
from app.api.dependencies.auth import get_current_user
from app.core.config import settings
from app.models.user import User
from app.services.events import EventBus

router = APIRouter()


@router.get("/projects/{project_id}/events")
async def project_events(
    project_id: str,
    request: Request,
    current_user: User = Depends(get_current_user),
):
    """Stream real-time events for a project."""
    # NOTE: check here that current_user is allowed to access project_id.

    async def event_generator():
        # settings.REDIS_URL is an assumed setting name
        event_bus = EventBus(settings.REDIS_URL)
        subscriber = await event_bus.subscribe(channel=f"project:{project_id}")
        try:
            while True:
                # Stop streaming once the client has disconnected
                if await request.is_disconnected():
                    break
                # Wait for the next event (the timeout drives the keepalive)
                try:
                    event = await asyncio.wait_for(
                        subscriber.get_event(),
                        timeout=30.0,
                    )
                    # Event is a dataclass, so serialize it via asdict()
                    payload = json.dumps(asdict(event))
                    yield f"event: {event.type}\ndata: {payload}\n\n"
                except asyncio.TimeoutError:
                    # SSE comment line: ignored by clients, keeps proxies from timing out
                    yield ": keepalive\n\n"
        finally:
            await subscriber.unsubscribe()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
```
5. Event Bus Architecture with Redis
```python
# app/services/events.py
import json
import time
from dataclasses import asdict, dataclass, field

import redis.asyncio as redis


@dataclass
class Event:
    type: str
    data: dict
    project_id: str
    agent_id: str | None = None
    # Default to the creation time instead of None, which did not match the float type
    timestamp: float = field(default_factory=time.time)


class EventBus:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)

    async def publish(self, channel: str, event: Event) -> None:
        """Publish an event to a channel."""
        await self.redis.publish(channel, json.dumps(asdict(event)))

    async def subscribe(self, channel: str) -> "Subscriber":
        """Subscribe to a channel."""
        # One PubSub object per subscriber, so subscriptions do not share state
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(channel)
        return Subscriber(pubsub, channel)


class Subscriber:
    def __init__(self, pubsub, channel: str):
        self.pubsub = pubsub
        self.channel = channel

    async def get_event(self) -> Event:
        """Wait for and return the next event on the channel."""
        while True:
            # Returns None when nothing arrives within the timeout, so keep polling
            message = await self.pubsub.get_message(
                ignore_subscribe_messages=True,
                timeout=1.0,
            )
            if message and message["type"] == "message":
                data = json.loads(message["data"])
                return Event(**data)

    async def unsubscribe(self) -> None:
        await self.pubsub.unsubscribe(self.channel)
```
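For unit tests that should not depend on a running Redis, a stand-in with the same `publish` / `subscribe` / `get_event` / `unsubscribe` surface may be useful. The sketch below is one possible shape, not part of the spike: `InMemoryEventBus` and `InMemorySubscriber` are hypothetical names, and events are passed through as plain Python objects rather than JSON. Like Redis Pub/Sub, it delivers only to subscribers that exist at publish time.

```python
import asyncio
from collections import defaultdict


class InMemorySubscriber:
    def __init__(self, bus: "InMemoryEventBus", channel: str):
        self._bus = bus
        self.channel = channel
        self.queue: asyncio.Queue = asyncio.Queue()

    async def get_event(self):
        """Wait for the next event published to this subscriber's channel."""
        return await self.queue.get()

    async def unsubscribe(self) -> None:
        self._bus._subscribers[self.channel].discard(self)


class InMemoryEventBus:
    """Hypothetical test double: fans events out to every current subscriber."""

    def __init__(self):
        self._subscribers: dict[str, set[InMemorySubscriber]] = defaultdict(set)

    async def publish(self, channel: str, event) -> int:
        """Deliver to current subscribers only; return how many received it."""
        receivers = list(self._subscribers[channel])
        for sub in receivers:
            sub.queue.put_nowait(event)
        return len(receivers)

    async def subscribe(self, channel: str) -> InMemorySubscriber:
        sub = InMemorySubscriber(self, channel)
        self._subscribers[channel].add(sub)
        return sub


async def demo() -> tuple[int, int, str]:
    bus = InMemoryEventBus()
    # Published before anyone subscribed: dropped, as with Pub/Sub
    missed = await bus.publish("project:1", "too-early")
    sub = await bus.subscribe("project:1")
    delivered = await bus.publish("project:1", "agent_started")
    return missed, delivered, await sub.get_event()
```

Running `asyncio.run(demo())` shows the at-most-once behaviour: the first publish reaches no one, and the subscriber only sees the event published after it subscribed.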
6. Client-Side Implementation
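```typescript
// frontend/lib/events.ts

// Minimal event shape used by the callback; extend as needed.
interface ProjectEvent {
  type: string;
  data: unknown;
}

// Named ProjectEventStream so the class does not shadow the browser's EventSource.
class ProjectEventStream {
  private eventSource: EventSource | null = null;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private reconnectDelay = 1000;
  private maxReconnectDelay = 30000;

  connect(projectId: string, onEvent: (event: ProjectEvent) => void) {
    const url = `/api/v1/projects/${projectId}/events`;
    this.eventSource = new EventSource(url, {
      withCredentials: true
    });

    this.eventSource.onopen = () => {
      console.log('SSE connected');
      this.reconnectDelay = 1000; // Reset on success
    };

    // Listener names must match the server-side EventType values
    this.eventSource.addEventListener('agent_activity', (e) => {
      onEvent({ type: 'agent_activity', data: JSON.parse(e.data) });
    });
    this.eventSource.addEventListener('issue_updated', (e) => {
      onEvent({ type: 'issue_updated', data: JSON.parse(e.data) });
    });
    this.eventSource.addEventListener('approval_required', (e) => {
      onEvent({ type: 'approval_required', data: JSON.parse(e.data) });
    });

    this.eventSource.onerror = () => {
      // Close to stop the browser's own retry, then reconnect with exponential backoff
      this.eventSource?.close();
      this.reconnectTimer = setTimeout(
        () => this.connect(projectId, onEvent),
        this.reconnectDelay
      );
      this.reconnectDelay = Math.min(
        this.reconnectDelay * 2,
        this.maxReconnectDelay
      );
    };
  }

  disconnect() {
    // Cancel any pending reconnect so a closed stream stays closed
    if (this.reconnectTimer !== null) clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;
    this.eventSource?.close();
    this.eventSource = null;
  }
}
```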
7. Event Types
```python
# app/schemas/events.py
from datetime import datetime
from enum import Enum

from pydantic import BaseModel


class EventType(str, Enum):
    # Agent Events
    AGENT_STARTED = "agent_started"
    AGENT_ACTIVITY = "agent_activity"
    AGENT_COMPLETED = "agent_completed"
    AGENT_ERROR = "agent_error"

    # Project Events
    ISSUE_CREATED = "issue_created"
    ISSUE_UPDATED = "issue_updated"
    ISSUE_CLOSED = "issue_closed"

    # Git Events
    BRANCH_CREATED = "branch_created"
    COMMIT_PUSHED = "commit_pushed"
    PR_CREATED = "pr_created"
    PR_MERGED = "pr_merged"

    # Workflow Events
    APPROVAL_REQUIRED = "approval_required"
    SPRINT_STARTED = "sprint_started"
    SPRINT_COMPLETED = "sprint_completed"

    # Pipeline Events
    PIPELINE_STARTED = "pipeline_started"
    PIPELINE_COMPLETED = "pipeline_completed"
    PIPELINE_FAILED = "pipeline_failed"


class ProjectEvent(BaseModel):
    id: str
    type: EventType
    project_id: str
    agent_id: str | None
    data: dict
    timestamp: datetime
```
8. WebSocket for Chat (Secondary)
```python
# app/api/v1/chat.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect

from app.services.agent_chat import AgentChatService

router = APIRouter()


@router.websocket("/projects/{project_id}/agents/{agent_id}/chat")
async def agent_chat(
    websocket: WebSocket,
    project_id: str,
    agent_id: str,
):
    """Bidirectional chat with an agent."""
    # NOTE: authenticate the connection before accepting it; omitted in this spike.
    await websocket.accept()
    chat_service = AgentChatService(project_id, agent_id)
    try:
        while True:
            # Receive message from client
            message = await websocket.receive_json()
            # Stream response from agent
            async for chunk in chat_service.get_response(message):
                await websocket.send_json({
                    "type": "chunk",
                    "content": chunk,
                })
            # Tell the client the response is complete
            await websocket.send_json({"type": "done"})
    except WebSocketDisconnect:
        # Client closed the connection; nothing to clean up
        pass
```
Performance Considerations
Connection Limits
- Browser limit: ~6 connections per domain (HTTP/1.1)
- Recommendation: Use single SSE connection per project, multiplex events
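Multiplexing here means that every event type for a project travels over the same stream and is told apart by its `event:` field. The sketch below illustrates that demultiplexing step in Python, for example for a test client; `parse_sse` and `dispatch` are hypothetical helpers, and the parser is deliberately simplified (it assumes `\n` line endings, JSON payloads, and that the whole stream text is already available).

```python
import json
from collections.abc import Callable, Iterator


def parse_sse(stream: str) -> Iterator[tuple[str, dict]]:
    """Yield (event_name, payload) pairs from text/event-stream content."""
    for block in stream.split("\n\n"):
        event_name = "message"  # default name when no event: field is present
        data_lines = []
        for line in block.split("\n"):
            if line.startswith(":"):
                continue  # comment line, e.g. a keepalive
            field, _, value = line.partition(":")
            value = value.removeprefix(" ")
            if field == "event":
                event_name = value
            elif field == "data":
                data_lines.append(value)
        if data_lines:
            yield event_name, json.loads("\n".join(data_lines))


def dispatch(stream: str, handlers: dict[str, Callable[[dict], None]]) -> None:
    """Route each event on the shared connection to the handler for its type."""
    for event_name, payload in parse_sse(stream):
        handler = handlers.get(event_name)
        if handler is not None:
            handler(payload)
```

Event types without a registered handler are skipped, so new server-side event types can be added without breaking existing consumers.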
Scalability
- Redis Pub/Sub handles cross-instance event distribution
- Consider Redis Streams for message persistence (audit/replay)
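Redis Pub/Sub does not store messages, so anything published while a client is reconnecting is lost. If the server sends an `id:` field with each event, the browser's `EventSource` includes the last one it saw in a `Last-Event-ID` header on reconnect, which makes replay possible. The sketch below shows the replay idea with an in-process buffer; `ReplayBuffer` is a hypothetical name, and in a multi-instance deployment a shared store such as Redis Streams would presumably take its place.

```python
from collections import deque
from itertools import count


class ReplayBuffer:
    """Keeps the most recent events so a reconnecting client can catch up."""

    def __init__(self, max_events: int = 1000):
        # Oldest entries are dropped automatically once max_events is reached
        self._events: deque[tuple[int, dict]] = deque(maxlen=max_events)
        self._ids = count(1)

    def append(self, event: dict) -> int:
        """Store an event and return its monotonically increasing id."""
        event_id = next(self._ids)
        self._events.append((event_id, event))
        return event_id

    def since(self, last_event_id: int) -> list[tuple[int, dict]]:
        """Return buffered events with an id greater than last_event_id."""
        return [(i, e) for i, e in self._events if i > last_event_id]
```

On reconnect, the endpoint would read `Last-Event-ID`, send `since(last_event_id)` first, and then continue with live events. A client that has been away longer than the buffer covers still misses events, so the buffer size is a trade-off.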
Keepalive
- Send comment every 30 seconds to prevent timeout
- Client reconnects automatically on disconnect
Recommendations
- Use SSE for all server-to-client events (simpler, auto-reconnect)
- Use WebSocket only for interactive chat with agents
- Redis Pub/Sub for event distribution across instances
- Single SSE connection per project with event multiplexing
- Exponential backoff for client reconnection
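The backoff schedule implied by the client's settings (1 second initial delay, doubling, capped at 30 seconds) can be written out as follows. This is a Python sketch for reference only; `backoff_delays` is a hypothetical helper, and the real logic lives in the TypeScript client.

```python
def backoff_delays(initial_ms: int = 1000, max_ms: int = 30000, attempts: int = 7) -> list[int]:
    """Delay before each reconnect attempt: doubles each time, capped at max_ms."""
    delays = []
    delay = initial_ms
    for _ in range(attempts):
        delays.append(delay)
        delay = min(delay * 2, max_ms)
    return delays
```

With the defaults, the first seven delays are 1000, 2000, 4000, 8000, 16000, 30000, and 30000 ms. Adding a small random jitter to each delay is a common refinement so that many clients do not reconnect at the same instant after an outage.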
Decision
Adopt SSE as the primary real-time transport with WebSocket reserved for bidirectional chat. Use Redis Pub/Sub for event distribution.
Spike completed. Findings will inform ADR-002: Real-time Communication Architecture.