Felipe Cardoso a6a336b66e docs: add spike findings for LLM abstraction, MCP integration, and real-time updates
- Added research findings and recommendations as separate SPIKE documents in `docs/spikes/`:
  - `SPIKE-005-llm-provider-abstraction.md`: Research on unified abstraction for LLM providers with failover, cost tracking, and caching strategies.
  - `SPIKE-001-mcp-integration-pattern.md`: Optimal pattern for integrating MCP with project/agent scoping and authentication strategies.
  - `SPIKE-003-realtime-updates.md`: Evaluation of SSE vs WebSocket for real-time updates, aligned with use-case needs.
- Focused on aligning implementation architectures with scalability, efficiency, and user needs.
- Documentation intended to inform upcoming ADRs.
2025-12-29 13:15:50 +01:00


SPIKE-003: Real-time Updates Architecture

Status: Completed
Date: 2025-12-29
Author: Architecture Team
Related Issue: #3


Objective

Evaluate WebSocket vs Server-Sent Events (SSE) for real-time updates in Syndarix, focusing on agent activity streams, progress updates, and client notifications.

Research Questions

  1. What are the trade-offs between WebSocket and SSE?
  2. Which pattern best fits Syndarix's use cases?
  3. How do we handle reconnection and reliability?
  4. What is the FastAPI implementation approach?

Findings

1. Use Case Analysis

| Use Case | Direction | Frequency | Latency Requirement |
|---|---|---|---|
| Agent activity feed | Server → Client | High | Low |
| Sprint progress | Server → Client | Medium | Low |
| Build status | Server → Client | Low | Medium |
| Client approval requests | Server → Client | Low | High |
| Client messages | Client → Server | Low | Medium |
| Issue updates | Server → Client | Medium | Low |

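Key Insight: five of the six use cases are server-to-client; only client messages flow the other way. By expected volume, 90%+ of real-time communication is therefore unidirectional (server → client).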

2. Technology Comparison

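| Feature | Server-Sent Events (SSE) | WebSocket |
|---|---|---|
| Direction | Unidirectional (server → client) | Bidirectional |
| Protocol | Plain HTTP (HTTP/1.1 or HTTP/2) | Separate protocol (RFC 6455, `ws://` / `wss://`) after an HTTP Upgrade handshake |
| Reconnection | Built-in automatic (resumes via `Last-Event-ID`) | Manual implementation |
| Connection limits | ~6 per domain on HTTP/1.1; multiplexed over one connection on HTTP/2 | Separate, higher browser limit (not counted against the HTTP/1.1 per-domain limit) |
| Browser support | Excellent | Excellent |
| Through proxies | Native HTTP (response buffering must be disabled) | May require config (Upgrade headers, idle timeouts) |
| Complexity | Simple | More complex |
| FastAPI support | Via `StreamingResponse` | Native (`@router.websocket`) |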

3. Recommendation: SSE for Primary, WebSocket for Chat

SSE (Recommended for 90% of use cases):

  • Agent activity streams
  • Progress updates
  • Build/pipeline status
  • Issue change notifications
  • Approval request alerts

WebSocket (For bidirectional needs):

  • Live chat with agents
  • Interactive debugging sessions
  • Real-time collaboration (future)

4. FastAPI SSE Implementation

# app/api/v1/events.py
import asyncio
import json
from dataclasses import asdict

from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse

# Assumed locations; get_event_bus is a hypothetical dependency returning the
# process-wide EventBus (one Redis client shared by all requests)
from app.api.deps import get_current_user, get_event_bus
from app.models.user import User
from app.services.events import EventBus

router = APIRouter()

KEEPALIVE_SECONDS = 30.0

@router.get("/projects/{project_id}/events")
async def project_events(
    project_id: str,
    request: Request,
    current_user: User = Depends(get_current_user),
    event_bus: EventBus = Depends(get_event_bus),
):
    """Stream real-time events for a project."""
    # Check that current_user may access project_id before subscribing

    async def event_generator():
        subscriber = await event_bus.subscribe(channel=f"project:{project_id}")

        try:
            while True:
                # Stop streaming once the client has disconnected
                if await request.is_disconnected():
                    break

                # Wait for the next event; time out periodically to send a keepalive
                try:
                    event = await asyncio.wait_for(
                        subscriber.get_event(),
                        timeout=KEEPALIVE_SECONDS
                    )
                except asyncio.TimeoutError:
                    # Lines starting with ":" are SSE comments; clients ignore them
                    yield ": keepalive\n\n"
                    continue

                # json.dumps emits a single line, so one "data:" field is enough
                yield f"event: {event.type}\ndata: {json.dumps(asdict(event))}\n\n"
        finally:
            await subscriber.unsubscribe()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        }
    )

5. Event Bus Architecture with Redis

# app/services/events.py
import json
import time
from dataclasses import asdict, dataclass, field

import redis.asyncio as redis

@dataclass
class Event:
    type: str
    data: dict
    project_id: str
    agent_id: str | None = None
    timestamp: float = field(default_factory=time.time)

class EventBus:
    def __init__(self, redis_url: str):
        # One client (and connection pool) shared by the whole process
        self.redis = redis.from_url(redis_url)

    async def publish(self, channel: str, event: Event):
        """Publish an event to a channel."""
        await self.redis.publish(channel, json.dumps(asdict(event)))

    async def subscribe(self, channel: str) -> "Subscriber":
        """Subscribe to a channel with a dedicated PubSub connection."""
        # A PubSub per subscriber, so concurrent streams don't consume each other's messages
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(channel)
        return Subscriber(pubsub, channel)

class Subscriber:
    def __init__(self, pubsub, channel: str):
        self.pubsub = pubsub
        self.channel = channel

    async def get_event(self) -> Event:
        """Wait until the next event arrives on this channel."""
        while True:
            message = await self.pubsub.get_message(
                ignore_subscribe_messages=True,
                timeout=1.0
            )
            if message and message["type"] == "message":
                data = json.loads(message["data"])
                return Event(**data)

    async def unsubscribe(self):
        await self.pubsub.unsubscribe(self.channel)
        await self.pubsub.aclose()  # redis-py >= 5.0.1; older versions use close()

6. Client-Side Implementation

// frontend/lib/events.ts
// Event names must match the EventType values emitted by the backend
type ProjectEventType = 'agent_activity' | 'issue_updated' | 'approval_required';

interface ProjectEvent {
  type: ProjectEventType;
  data: unknown;
}

// Named ProjectEventStream so it does not shadow the browser's built-in EventSource
export class ProjectEventStream {
  private eventSource: EventSource | null = null;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private reconnectDelay = 1000;
  private readonly maxReconnectDelay = 30000;

  connect(projectId: string, onEvent: (event: ProjectEvent) => void): void {
    const url = `/api/v1/projects/${projectId}/events`;
    const source = new EventSource(url, { withCredentials: true });
    this.eventSource = source;

    source.onopen = () => {
      console.log('SSE connected');
      this.reconnectDelay = 1000; // Reset backoff after a successful connection
    };

    const types: ProjectEventType[] = ['agent_activity', 'issue_updated', 'approval_required'];
    for (const type of types) {
      source.addEventListener(type, (e: MessageEvent) => {
        onEvent({ type, data: JSON.parse(e.data) });
      });
    }

    source.onerror = () => {
      // Close to stop the browser's own retry loop and use exponential backoff instead
      source.close();
      this.reconnectTimer = setTimeout(
        () => this.connect(projectId, onEvent),
        this.reconnectDelay
      );
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay);
    };
  }

  disconnect(): void {
    // Cancel any scheduled reconnect so disconnect() is final
    if (this.reconnectTimer !== null) clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;
    this.eventSource?.close();
    this.eventSource = null;
  }
}
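The client's reconnect schedule (start at 1 s, double after each failure, cap at 30 s, reset on `onopen`) is easy to check in isolation. The same arithmetic as a small Python sketch; `backoff_delays` is a hypothetical helper:

```python
def backoff_delays(attempts: int, initial_ms: int = 1000, max_ms: int = 30000) -> list[int]:
    """Delays for successive reconnect attempts: doubled each time, capped at max_ms."""
    delays = []
    delay = initial_ms
    for _ in range(attempts):
        delays.append(delay)
        delay = min(delay * 2, max_ms)
    return delays


print(backoff_delays(7))  # [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

Clients often add random jitter on top of this so that many browsers do not reconnect in lockstep after a server restart; the sketch omits it.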

7. Event Types

# app/schemas/events.py
from enum import Enum
from pydantic import BaseModel
from datetime import datetime

class EventType(str, Enum):
    # Agent Events
    AGENT_STARTED = "agent_started"
    AGENT_ACTIVITY = "agent_activity"
    AGENT_COMPLETED = "agent_completed"
    AGENT_ERROR = "agent_error"

    # Project Events
    ISSUE_CREATED = "issue_created"
    ISSUE_UPDATED = "issue_updated"
    ISSUE_CLOSED = "issue_closed"

    # Git Events
    BRANCH_CREATED = "branch_created"
    COMMIT_PUSHED = "commit_pushed"
    PR_CREATED = "pr_created"
    PR_MERGED = "pr_merged"

    # Workflow Events
    APPROVAL_REQUIRED = "approval_required"
    SPRINT_STARTED = "sprint_started"
    SPRINT_COMPLETED = "sprint_completed"

    # Pipeline Events
    PIPELINE_STARTED = "pipeline_started"
    PIPELINE_COMPLETED = "pipeline_completed"
    PIPELINE_FAILED = "pipeline_failed"

class ProjectEvent(BaseModel):
    id: str
    type: EventType
    project_id: str
    agent_id: str | None = None
    data: dict
    timestamp: datetime
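Because `EventType` mixes in `str`, `json.dumps` encodes its members as their plain values. String formatting is less predictable: depending on the Python version, an f-string of a plain `(str, Enum)` member can render as `EventType.ISSUE_UPDATED`, and an `event:` field with that value would match none of the client listeners. A small sketch (trimmed enum, hypothetical `sse_frame` helper) that sidesteps this by using `.value` explicitly:

```python
import json
from enum import Enum


class EventType(str, Enum):
    ISSUE_UPDATED = "issue_updated"
    APPROVAL_REQUIRED = "approval_required"


def sse_frame(event_type: EventType, payload: dict) -> str:
    # .value yields "issue_updated" on every Python version, unlike f"{event_type}"
    return f"event: {event_type.value}\ndata: {json.dumps(payload)}\n\n"


print(json.dumps({"type": EventType.APPROVAL_REQUIRED}))  # {"type": "approval_required"}
```

On Python 3.11+, `enum.StrEnum` is an alternative whose members format as their values.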

8. WebSocket for Chat (Secondary)

# app/api/v1/chat.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.services.agent_chat import AgentChatService

router = APIRouter()

@router.websocket("/projects/{project_id}/agents/{agent_id}/chat")
async def agent_chat(
    websocket: WebSocket,
    project_id: str,
    agent_id: str
):
    """Bidirectional chat with an agent."""
    # Authenticate (e.g. session cookie or token) and check project access before accepting
    await websocket.accept()

    chat_service = AgentChatService(project_id, agent_id)

    try:
        while True:
            # Receive message from client
            message = await websocket.receive_json()

            # Stream response from agent
            async for chunk in chat_service.get_response(message):
                await websocket.send_json({
                    "type": "chunk",
                    "content": chunk
                })

            await websocket.send_json({"type": "done"})
    except WebSocketDisconnect:
        pass
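The handler assumes `AgentChatService.get_response(message)` is an async iterator of text chunks. A hypothetical stand-in with that shape, useful for exercising the WebSocket protocol (`chunk` messages followed by `done`) before a real agent is wired in; the `{"content": ...}` message shape is an assumption:

```python
import asyncio
from collections.abc import AsyncIterator


class FakeAgentChatService:
    """Hypothetical stand-in exposing the interface the WebSocket handler expects."""

    def __init__(self, project_id: str, agent_id: str, chunk_size: int = 8) -> None:
        self.project_id = project_id
        self.agent_id = agent_id
        self.chunk_size = chunk_size

    async def get_response(self, message: dict) -> AsyncIterator[str]:
        # A real service would stream LLM tokens; this echoes the text back in slices
        reply = f"[{self.agent_id}] {message.get('content', '')}"
        for start in range(0, len(reply), self.chunk_size):
            await asyncio.sleep(0)  # yield to the event loop, as a real stream would
            yield reply[start:start + self.chunk_size]
```

Each yielded slice becomes one `{"type": "chunk", ...}` message; the client concatenates them until it receives `{"type": "done"}`.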

Performance Considerations

Connection Limits

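  • Browser limit: ~6 concurrent connections per domain over HTTP/1.1, and each open SSE stream holds one for its whole lifetime; HTTP/2 multiplexes streams over a single connection
  • Recommendation: use a single SSE connection per project and multiplex event types over it (distinguished by the `event:` field)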

Scalability

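  • Redis Pub/Sub distributes events across API instances, but it is fire-and-forget: events published while a client is disconnected are lost
  • Consider Redis Streams for message persistence (audit/replay), e.g. replaying events missed since the client's `Last-Event-ID`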

Keepalive

  • Send an SSE comment (`: keepalive`) after 30 seconds without events, so proxies and load balancers do not close the idle connection
  • The client reconnects on disconnect, with exponential backoff
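The endpoint above uses `asyncio.wait_for`, which cancels the pending `get_event()` call each time the keepalive timer fires. An alternative sketch (hypothetical `with_keepalive` helper, stdlib only) keeps one read task alive across keepalives by using `asyncio.wait`, which returns on timeout without cancelling:

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable

KEEPALIVE = ": keepalive\n\n"


async def with_keepalive(
    next_frame: Callable[[], Awaitable[str]], interval: float
) -> AsyncIterator[str]:
    """Yield frames from next_frame(), emitting a keepalive comment after each idle interval."""
    pending: asyncio.Future | None = None
    try:
        while True:
            if pending is None:
                pending = asyncio.ensure_future(next_frame())
            # Unlike wait_for, wait() leaves the task running when the timeout expires
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if done:
                frame = pending.result()
                pending = None
                yield frame
            else:
                yield KEEPALIVE
    finally:
        # Generator closed (client gone): stop the outstanding read
        if pending is not None:
            pending.cancel()
```

With the endpoint's names, the loop body would become `async for frame in with_keepalive(next_frame, 30.0)`, where `next_frame` is a small coroutine that awaits `subscriber.get_event()` and formats the event.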

Recommendations

  1. Use SSE for all server-to-client events (simpler, auto-reconnect)
  2. Use WebSocket only for interactive chat with agents
  3. Redis Pub/Sub for event distribution across instances
  4. Single SSE connection per project with event multiplexing
  5. Exponential backoff for client reconnection


Decision

Adopt SSE as the primary real-time transport with WebSocket reserved for bidirectional chat. Use Redis Pub/Sub for event distribution.


Spike completed. Findings will inform ADR-002: Real-time Communication Architecture.