# SPIKE-003: Real-time Updates Architecture

**Status:** Completed

**Date:** 2025-12-29

**Author:** Architecture Team

**Related Issue:** #3

---

## Objective

Evaluate WebSocket vs. Server-Sent Events (SSE) for real-time updates in Syndarix, focusing on agent activity streams, progress updates, and client notifications.

## Research Questions

1. What are the trade-offs between WebSocket and SSE?
2. Which pattern best fits Syndarix's use cases?
3. How do we handle reconnection and reliability?
4. What is the FastAPI implementation approach?

## Findings

### 1. Use Case Analysis

| Use Case | Direction | Frequency | Latency Req |
|----------|-----------|-----------|-------------|
| Agent activity feed | Server → Client | High | Low |
| Sprint progress | Server → Client | Medium | Low |
| Build status | Server → Client | Low | Medium |
| Client approval requests | Server → Client | Low | High |
| Client messages | Client → Server | Low | Medium |
| Issue updates | Server → Client | Medium | Low |

**Key Insight:** Five of the six use cases, including every high- and medium-frequency stream, are **server-to-client** (unidirectional). Only client messages flow the other way, so an estimated 90%+ of real-time traffic is server-to-client.

### 2. Technology Comparison

| Feature | Server-Sent Events (SSE) | WebSocket |
|---------|--------------------------|-----------|
| Direction | Unidirectional (server → client) | Bidirectional |
| Protocol | Plain HTTP (HTTP/1.1 or HTTP/2), `text/event-stream` | WebSocket protocol (RFC 6455) via HTTP `Upgrade` (`ws://` / `wss://`) |
| Reconnection | Built-in, automatic (`EventSource`) | Manual implementation |
| Connection limits | ~6 per domain on HTTP/1.1; multiplexed over one connection on HTTP/2 | Not subject to the HTTP/1.1 per-domain cap (browsers apply separate limits) |
| Browser support | Excellent | Excellent |
| Through proxies | Plain HTTP; response buffering must be disabled | May require proxy config for the `Upgrade` handshake |
| Complexity | Simple | More complex |
| FastAPI support | Via `StreamingResponse` with `text/event-stream` | Native (`@router.websocket`) |

### 3. Recommendation: SSE for Primary, WebSocket for Chat

**SSE (recommended for ~90% of use cases):**

- Agent activity streams
- Progress updates
- Build/pipeline status
- Issue change notifications
- Approval request alerts

**WebSocket (for bidirectional needs):**

- Live chat with agents
- Interactive debugging sessions
- Real-time collaboration (future)

### 4. FastAPI SSE Implementation

```python
# app/api/v1/events.py
import asyncio

from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse

from app.api.deps import get_current_user  # assumed location of the auth dependency
from app.core.config import settings  # assumed settings object exposing REDIS_URL
from app.models.user import User  # assumed location of the User model
from app.services.events import EventBus

router = APIRouter()


@router.get("/projects/{project_id}/events")
async def project_events(
    project_id: str,
    request: Request,
    current_user: User = Depends(get_current_user),
):
    """Stream real-time events for a project."""
    # Check here that current_user may access project_id before streaming.

    async def event_generator():
        # One bus (and one Redis Pub/Sub connection) per open stream
        event_bus = EventBus(settings.REDIS_URL)
        subscriber = await event_bus.subscribe(channel=f"project:{project_id}")
        try:
            while True:
                # Stop streaming once the client has disconnected
                if await request.is_disconnected():
                    break

                # Wait for the next event, timing out periodically to send a keepalive
                try:
                    event = await asyncio.wait_for(
                        subscriber.get_event(),
                        timeout=30.0,
                    )
                    yield f"event: {event.type}\ndata: {event.to_json()}\n\n"
                except asyncio.TimeoutError:
                    # SSE comment line: ignored by EventSource, keeps the connection alive
                    yield ": keepalive\n\n"
        finally:
            await subscriber.unsubscribe()
            await event_bus.close()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx response buffering
        },
    )
```

### 5. Event Bus Architecture with Redis

```python
# app/services/events.py
import json
import time
from dataclasses import asdict, dataclass, field

import redis.asyncio as redis


@dataclass
class Event:
    type: str
    data: dict
    project_id: str
    agent_id: str | None = None
    timestamp: float = field(default_factory=time.time)  # Unix epoch seconds

    def to_json(self) -> str:
        """Serialize to a single-line JSON string (safe for an SSE data: line)."""
        return json.dumps(asdict(self))


class EventBus:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()

    async def publish(self, channel: str, event: Event) -> None:
        """Publish an event to a channel (fire-and-forget: lost if nobody is subscribed)."""
        await self.redis.publish(channel, event.to_json())

    async def subscribe(self, channel: str) -> "Subscriber":
        """Subscribe to a channel."""
        await self.pubsub.subscribe(channel)
        return Subscriber(self.pubsub, channel)

    async def close(self) -> None:
        """Release the Pub/Sub and Redis connections (aclose() needs redis-py >= 5.0.1)."""
        await self.pubsub.aclose()
        await self.redis.aclose()


class Subscriber:
    def __init__(self, pubsub, channel: str):
        self.pubsub = pubsub
        self.channel = channel

    async def get_event(self) -> Event:
        """Wait for the next event on the channel, polling in 1-second slices."""
        while True:
            message = await self.pubsub.get_message(
                ignore_subscribe_messages=True,
                timeout=1.0,
            )
            if message and message["type"] == "message":
                return Event(**json.loads(message["data"]))

    async def unsubscribe(self) -> None:
        await self.pubsub.unsubscribe(self.channel)
```

### 6. Client-Side Implementation

```typescript
// frontend/lib/events.ts
interface ProjectEvent {
  type: 'agent_activity' | 'issue_updated' | 'approval_required';
  data: unknown;
}

// Named to avoid shadowing the browser's built-in EventSource class
export class ProjectEventStream {
  private eventSource: EventSource | null = null;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private reconnectDelay = 1000;
  private readonly maxReconnectDelay = 30000;

  connect(projectId: string, onEvent: (event: ProjectEvent) => void) {
    const url = `/api/v1/projects/${projectId}/events`;
    this.eventSource = new EventSource(url, { withCredentials: true });

    this.eventSource.onopen = () => {
      console.log('SSE connected');
      this.reconnectDelay = 1000; // Reset backoff after a successful connection
    };

    // Listener names must match the server's `event:` field (EventType values)
    this.eventSource.addEventListener('agent_activity', (e: MessageEvent) => {
      onEvent({ type: 'agent_activity', data: JSON.parse(e.data) });
    });

    this.eventSource.addEventListener('issue_updated', (e: MessageEvent) => {
      onEvent({ type: 'issue_updated', data: JSON.parse(e.data) });
    });

    this.eventSource.addEventListener('approval_required', (e: MessageEvent) => {
      onEvent({ type: 'approval_required', data: JSON.parse(e.data) });
    });

    this.eventSource.onerror = () => {
      // Close to take over from the browser's fixed-interval retry,
      // then reconnect with capped exponential backoff
      this.eventSource?.close();
      this.reconnectTimer = setTimeout(
        () => this.connect(projectId, onEvent),
        this.reconnectDelay,
      );
      this.reconnectDelay = Math.min(
        this.reconnectDelay * 2,
        this.maxReconnectDelay,
      );
    };
  }

  disconnect() {
    // Cancel any pending reconnect so a closed stream stays closed
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    this.eventSource?.close();
    this.eventSource = null;
  }
}
```

### 7. Event Types

```python
# app/schemas/events.py
from datetime import datetime
from enum import Enum

from pydantic import BaseModel


class EventType(str, Enum):
    # Values are sent as the SSE `event:` name; client listeners must match them.

    # Agent Events
    AGENT_STARTED = "agent_started"
    AGENT_ACTIVITY = "agent_activity"
    AGENT_COMPLETED = "agent_completed"
    AGENT_ERROR = "agent_error"

    # Project Events
    ISSUE_CREATED = "issue_created"
    ISSUE_UPDATED = "issue_updated"
    ISSUE_CLOSED = "issue_closed"

    # Git Events
    BRANCH_CREATED = "branch_created"
    COMMIT_PUSHED = "commit_pushed"
    PR_CREATED = "pr_created"
    PR_MERGED = "pr_merged"

    # Workflow Events
    APPROVAL_REQUIRED = "approval_required"
    SPRINT_STARTED = "sprint_started"
    SPRINT_COMPLETED = "sprint_completed"

    # Pipeline Events
    PIPELINE_STARTED = "pipeline_started"
    PIPELINE_COMPLETED = "pipeline_completed"
    PIPELINE_FAILED = "pipeline_failed"


class ProjectEvent(BaseModel):
    id: str
    type: EventType
    project_id: str
    agent_id: str | None = None
    data: dict
    timestamp: datetime
```

### 8. WebSocket for Chat (Secondary)

```python
# app/api/v1/chat.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect

from app.services.agent_chat import AgentChatService

router = APIRouter()


@router.websocket("/projects/{project_id}/agents/{agent_id}/chat")
async def agent_chat(
    websocket: WebSocket,
    project_id: str,
    agent_id: str,
):
    """Bidirectional chat with an agent."""
    # Authenticate before accepting. The browser WebSocket API cannot send
    # custom headers, so use a cookie or a token in the query string.
    await websocket.accept()

    chat_service = AgentChatService(project_id, agent_id)

    try:
        while True:
            # Receive a message from the client
            message = await websocket.receive_json()

            # Stream the agent's response back chunk by chunk
            async for chunk in chat_service.get_response(message):
                await websocket.send_json({
                    "type": "chunk",
                    "content": chunk,
                })

            await websocket.send_json({"type": "done"})
    except WebSocketDisconnect:
        pass  # Client closed the connection
```

## Performance Considerations

### Connection Limits

- Browser limit: ~6 concurrent connections per domain over HTTP/1.1 (HTTP/2 multiplexes requests over a single connection)
- Recommendation: use a single SSE connection per project and multiplex event types over it via the `event:` field

### Scalability

- Redis Pub/Sub handles cross-instance event distribution, but delivery is at-most-once: events published while a client is disconnected are lost
- Consider Redis Streams for message persistence (audit/replay)

### Keepalive

- Send an SSE comment line every 30 seconds so proxies and load balancers do not close idle connections
- The client reconnects automatically on disconnect, with exponential backoff

## Recommendations

1. **Use SSE for all server-to-client events** (simpler, auto-reconnect)
2. **Use WebSocket only for interactive chat** with agents
3. **Use Redis Pub/Sub for event distribution** across instances
4. **Use a single SSE connection per project** with event multiplexing
5. **Use exponential backoff** for client reconnection

## References

- [FastAPI `StreamingResponse`](https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse)
- [MDN EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource)
- [Redis Pub/Sub](https://redis.io/topics/pubsub)

## Decision

**Adopt SSE as the primary real-time transport**, with WebSocket reserved for bidirectional chat. Use Redis Pub/Sub for event distribution.

---

*Spike completed. Findings will inform ADR-002: Real-time Communication Architecture.*
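The Redis-backed bus in section 5 (Event Bus Architecture with Redis) needs a running server, which makes the SSE endpoint awkward to unit-test. One option is a single-process stand-in with the same `publish` / `subscribe` / `get_event` / `unsubscribe` shape. This is sketched here as an assumption, not existing Syndarix code. Each subscriber owns an `asyncio.Queue`, and `publish` copies the event into every queue on the channel. Like Redis Pub/Sub, it broadcasts to all current subscribers and drops events for channels nobody is subscribed to. `InMemoryEventBus` and `InMemorySubscriber` are hypothetical names.

```python
import asyncio
from collections import defaultdict
from typing import Any


class InMemorySubscriber:
    def __init__(self, bus: "InMemoryEventBus", channel: str) -> None:
        self._bus = bus
        self.channel = channel
        self.queue: asyncio.Queue = asyncio.Queue()

    async def get_event(self) -> Any:
        """Wait for the next event published to this subscriber's channel."""
        return await self.queue.get()

    async def unsubscribe(self) -> None:
        """Stop receiving events; already-queued events remain readable."""
        self._bus._subscribers[self.channel].discard(self)


class InMemoryEventBus:
    """Single-process stand-in for the Redis-backed bus (hypothetical, for tests)."""

    def __init__(self) -> None:
        self._subscribers: defaultdict = defaultdict(set)

    async def publish(self, channel: str, event: Any) -> int:
        """Fan out to every current subscriber; return the receiver count like Redis PUBLISH."""
        subscribers = list(self._subscribers[channel])
        for sub in subscribers:
            sub.queue.put_nowait(event)
        return len(subscribers)

    async def subscribe(self, channel: str) -> InMemorySubscriber:
        sub = InMemorySubscriber(self, channel)
        self._subscribers[channel].add(sub)
        return sub
```

Because it lives in one process, this stand-in says nothing about cross-instance delivery. It only exercises the endpoint's streaming and cleanup logic.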
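The client in section 6 (Client-Side Implementation) doubles its reconnect delay after each error, starting at 1 s and capping at 30 s. A hypothetical helper that mirrors those two constants makes the schedule explicit. The delay reaches the cap on the sixth attempt, so a client that keeps failing retries about twice a minute. The sketch omits random jitter, which clients often add so that many browsers do not reconnect in lockstep after a server restart.

```python
def backoff_delays(attempts: int, initial_ms: int = 1000, cap_ms: int = 30000) -> list[int]:
    """Delay before each reconnect attempt: doubled every time, capped at cap_ms."""
    delays: list[int] = []
    delay = initial_ms
    for _ in range(attempts):
        delays.append(delay)
        delay = min(delay * 2, cap_ms)
    return delays


print(backoff_delays(7))  # → [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```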