feat(frontend): Implement client-side SSE handling (#35)

Implements real-time event streaming on the frontend with:

- Event types and type guards matching backend EventType enum
- Zustand-based event store with per-project buffering
- useProjectEvents hook with auto-reconnection and exponential backoff
- ConnectionStatus component showing connection state
- EventList component with expandable payloads and filtering

All 105 tests passing. Follows design system guidelines.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-30 01:34:41 +01:00
parent d6db6af964
commit fcda8f0f96
14 changed files with 3138 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
/**
* Custom React Hooks
*
* @module lib/hooks
*/
export { useProjectEvents, type UseProjectEventsOptions, type UseProjectEventsResult } from './useProjectEvents';

View File

@@ -0,0 +1,393 @@
/**
* SSE Hook for Project Events
*
* Provides real-time event streaming from the backend with:
* - Automatic reconnection with exponential backoff
* - Connection state management
* - Type-safe event handling
* - Integration with event store
*
* @module lib/hooks/useProjectEvents
*/
'use client';
import { useEffect, useRef, useCallback, useState } from 'react';
import { useAuth } from '@/lib/auth/AuthContext';
import { useEventStore, useProjectEventsFromStore } from '@/lib/stores/eventStore';
import type { ProjectEvent, ConnectionState, SSEError } from '@/lib/types/events';
import config from '@/config/app.config';
// ============================================================================
// Constants
// ============================================================================
/** Initial retry delay in milliseconds */
const INITIAL_RETRY_DELAY = 1000;
/** Maximum retry delay in milliseconds (30 seconds) */
const MAX_RETRY_DELAY = 30000;
/** Maximum number of retry attempts before giving up (0 = unlimited) */
const MAX_RETRY_ATTEMPTS = 0;
/** Backoff multiplier for exponential backoff */
const BACKOFF_MULTIPLIER = 2;
// ============================================================================
// Types
// ============================================================================
export interface UseProjectEventsOptions {
/** Enable automatic connection on mount (default: true) */
autoConnect?: boolean;
/** Custom retry delay in milliseconds (default: 1000) */
initialRetryDelay?: number;
/** Maximum retry delay in milliseconds (default: 30000) */
maxRetryDelay?: number;
/** Maximum retry attempts (0 = unlimited, default: 0) */
maxRetryAttempts?: number;
/** Callback when event is received */
onEvent?: (event: ProjectEvent) => void;
/** Callback when connection state changes */
onConnectionChange?: (state: ConnectionState) => void;
/** Callback when error occurs */
onError?: (error: SSEError) => void;
}
export interface UseProjectEventsResult {
/** Events for the project (from store) */
events: ProjectEvent[];
/** Whether connection is established */
isConnected: boolean;
/** Current connection state */
connectionState: ConnectionState;
/** Current error, if any */
error: SSEError | null;
/** Current retry attempt count */
retryCount: number;
/** Manually trigger reconnection */
reconnect: () => void;
/** Disconnect from SSE */
disconnect: () => void;
/** Clear events for this project */
clearEvents: () => void;
}
// ============================================================================
// Hook Implementation
// ============================================================================
/**
* Hook for consuming real-time project events via SSE
*
* @param projectId - Project ID to subscribe to
* @param options - Configuration options
* @returns Event data and connection controls
*
* @example
* ```tsx
* const { events, isConnected, error, reconnect } = useProjectEvents('project-123');
*
* if (!isConnected) {
* return <ConnectionStatus state={connectionState} onReconnect={reconnect} />;
* }
*
* return <EventList events={events} />;
* ```
*/
export function useProjectEvents(
projectId: string,
options: UseProjectEventsOptions = {}
): UseProjectEventsResult {
const {
autoConnect = true,
initialRetryDelay = INITIAL_RETRY_DELAY,
maxRetryDelay = MAX_RETRY_DELAY,
maxRetryAttempts = MAX_RETRY_ATTEMPTS,
onEvent,
onConnectionChange,
onError,
} = options;
// Auth state
const { accessToken, isAuthenticated } = useAuth();
// Event store
const events = useProjectEventsFromStore(projectId);
const addEvent = useEventStore((state) => state.addEvent);
const clearProjectEvents = useEventStore((state) => state.clearProjectEvents);
// Local state
const [connectionState, setConnectionState] = useState<ConnectionState>('disconnected');
const [error, setError] = useState<SSEError | null>(null);
const [retryCount, setRetryCount] = useState(0);
// Refs for cleanup and reconnection logic
const eventSourceRef = useRef<EventSource | null>(null);
const retryTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const currentRetryDelayRef = useRef(initialRetryDelay);
const isManualDisconnectRef = useRef(false);
const mountedRef = useRef(true);
/**
* Update connection state and notify callback
*/
const updateConnectionState = useCallback(
(state: ConnectionState) => {
if (!mountedRef.current) return;
setConnectionState(state);
onConnectionChange?.(state);
},
[onConnectionChange]
);
/**
* Handle SSE error
*/
const handleError = useCallback(
(message: string, code?: string) => {
if (!mountedRef.current) return;
const sseError: SSEError = {
message,
code,
timestamp: new Date().toISOString(),
retryAttempt: retryCount + 1,
};
setError(sseError);
onError?.(sseError);
},
[retryCount, onError]
);
/**
* Parse and validate event data
*/
const parseEvent = useCallback((data: string): ProjectEvent | null => {
try {
const parsed = JSON.parse(data);
// Validate required fields
if (!parsed.id || !parsed.type || !parsed.timestamp || !parsed.project_id) {
console.warn('[SSE] Invalid event structure:', parsed);
return null;
}
return parsed as ProjectEvent;
} catch (err) {
console.warn('[SSE] Failed to parse event data:', err);
return null;
}
}, []);
/**
* Close existing connection and clear retry timeout
*/
const cleanup = useCallback(() => {
if (retryTimeoutRef.current) {
clearTimeout(retryTimeoutRef.current);
retryTimeoutRef.current = null;
}
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
}, []);
/**
* Calculate next retry delay with exponential backoff
*/
const getNextRetryDelay = useCallback(() => {
const nextDelay = currentRetryDelayRef.current * BACKOFF_MULTIPLIER;
currentRetryDelayRef.current = Math.min(nextDelay, maxRetryDelay);
return currentRetryDelayRef.current;
}, [maxRetryDelay]);
/**
* Schedule reconnection attempt
*/
const scheduleReconnect = useCallback(() => {
if (isManualDisconnectRef.current) return;
if (maxRetryAttempts > 0 && retryCount >= maxRetryAttempts) {
console.warn('[SSE] Max retry attempts reached');
updateConnectionState('error');
return;
}
const delay = getNextRetryDelay();
if (config.debug.api) {
console.log(`[SSE] Scheduling reconnect in ${delay}ms (attempt ${retryCount + 1})`);
}
retryTimeoutRef.current = setTimeout(() => {
if (!mountedRef.current || isManualDisconnectRef.current) return;
setRetryCount((prev) => prev + 1);
connect();
}, delay);
}, [retryCount, maxRetryAttempts, getNextRetryDelay, updateConnectionState]);
/**
* Connect to SSE endpoint
*/
const connect = useCallback(() => {
// Prevent connection if not authenticated or no project ID
if (!isAuthenticated || !accessToken || !projectId) {
if (config.debug.api) {
console.log('[SSE] Cannot connect: missing auth or projectId');
}
return;
}
// Clean up existing connection
cleanup();
isManualDisconnectRef.current = false;
updateConnectionState('connecting');
setError(null);
// Build SSE URL with auth token
const baseUrl = config.api.url;
const sseUrl = `${baseUrl}/api/v1/projects/${projectId}/events`;
// Note: EventSource doesn't support custom headers natively
// We pass the token as a query parameter (backend should validate this)
const urlWithAuth = `${sseUrl}?token=${encodeURIComponent(accessToken)}`;
try {
const eventSource = new EventSource(urlWithAuth);
eventSourceRef.current = eventSource;
eventSource.onopen = () => {
if (!mountedRef.current) return;
if (config.debug.api) {
console.log('[SSE] Connection opened');
}
updateConnectionState('connected');
setRetryCount(0);
currentRetryDelayRef.current = initialRetryDelay;
};
eventSource.onmessage = (event) => {
if (!mountedRef.current) return;
const parsedEvent = parseEvent(event.data);
if (parsedEvent) {
// Add to store
addEvent(parsedEvent);
// Notify callback
onEvent?.(parsedEvent);
}
};
// Handle specific event types from backend
eventSource.addEventListener('ping', () => {
// Keep-alive ping from server, no action needed
if (config.debug.api) {
console.log('[SSE] Received ping');
}
});
eventSource.onerror = (err) => {
if (!mountedRef.current) return;
console.error('[SSE] Connection error:', err);
if (eventSource.readyState === EventSource.CLOSED) {
handleError('Connection closed unexpectedly', 'CONNECTION_CLOSED');
updateConnectionState('disconnected');
scheduleReconnect();
} else {
handleError('Connection error', 'CONNECTION_ERROR');
updateConnectionState('error');
scheduleReconnect();
}
};
} catch (err) {
console.error('[SSE] Failed to create EventSource:', err);
handleError('Failed to create connection', 'CREATION_FAILED');
updateConnectionState('error');
scheduleReconnect();
}
}, [
isAuthenticated,
accessToken,
projectId,
cleanup,
updateConnectionState,
handleError,
parseEvent,
addEvent,
onEvent,
scheduleReconnect,
initialRetryDelay,
]);
/**
* Manually disconnect from SSE
*/
const disconnect = useCallback(() => {
isManualDisconnectRef.current = true;
cleanup();
updateConnectionState('disconnected');
setRetryCount(0);
currentRetryDelayRef.current = initialRetryDelay;
}, [cleanup, updateConnectionState, initialRetryDelay]);
/**
* Manually trigger reconnection
*/
const reconnect = useCallback(() => {
disconnect();
isManualDisconnectRef.current = false;
connect();
}, [disconnect, connect]);
/**
* Clear events for this project
*/
const clearEvents = useCallback(() => {
clearProjectEvents(projectId);
}, [clearProjectEvents, projectId]);
// Auto-connect on mount if enabled
useEffect(() => {
mountedRef.current = true;
if (autoConnect && isAuthenticated && projectId) {
connect();
}
return () => {
mountedRef.current = false;
cleanup();
};
}, [autoConnect, isAuthenticated, projectId, connect, cleanup]);
// Reconnect when auth changes
useEffect(() => {
if (isAuthenticated && accessToken && connectionState === 'disconnected' && autoConnect) {
if (!isManualDisconnectRef.current) {
connect();
}
} else if (!isAuthenticated && connectionState !== 'disconnected') {
disconnect();
}
}, [isAuthenticated, accessToken, connectionState, autoConnect, connect, disconnect]);
return {
events,
isConnected: connectionState === 'connected',
connectionState,
error,
retryCount,
reconnect,
disconnect,
clearEvents,
};
}

View File

@@ -0,0 +1,225 @@
/**
* Event Store - Zustand store for project events
*
* Manages real-time events received via SSE with:
* - Event buffer (configurable, default 100 events)
* - Per-project event management
* - Event filtering utilities
*
* @module lib/stores/eventStore
*/
import { create } from 'zustand';
import type { ProjectEvent, EventType } from '@/lib/types/events';
// ============================================================================
// Constants
// ============================================================================
/** Maximum number of events to keep in buffer per project */
const DEFAULT_MAX_EVENTS = 100;
// ============================================================================
// Types
// ============================================================================
interface EventState {
/** Events indexed by project ID */
eventsByProject: Record<string, ProjectEvent[]>;
/** Maximum events to keep per project */
maxEvents: number;
}
interface EventActions {
/**
* Add an event to the store
* @param event - The event to add
*/
addEvent: (event: ProjectEvent) => void;
/**
* Add multiple events at once
* @param events - Array of events to add
*/
addEvents: (events: ProjectEvent[]) => void;
/**
* Clear all events for a specific project
* @param projectId - Project ID to clear events for
*/
clearProjectEvents: (projectId: string) => void;
/**
* Clear all events from the store
*/
clearAllEvents: () => void;
/**
* Get events for a specific project
* @param projectId - Project ID
* @returns Array of events for the project
*/
getProjectEvents: (projectId: string) => ProjectEvent[];
/**
* Get events filtered by type
* @param projectId - Project ID
* @param types - Event types to filter by
* @returns Filtered array of events
*/
getFilteredEvents: (projectId: string, types: EventType[]) => ProjectEvent[];
/**
* Set the maximum number of events to keep per project
* @param max - Maximum event count
*/
setMaxEvents: (max: number) => void;
}
export type EventStore = EventState & EventActions;
// ============================================================================
// Store Implementation
// ============================================================================
export const useEventStore = create<EventStore>((set, get) => ({
// Initial state
eventsByProject: {},
maxEvents: DEFAULT_MAX_EVENTS,
addEvent: (event: ProjectEvent) => {
set((state) => {
const projectId = event.project_id;
const existingEvents = state.eventsByProject[projectId] || [];
// Check for duplicate event IDs
if (existingEvents.some((e) => e.id === event.id)) {
return state; // Skip duplicate
}
// Add new event and trim to max
const updatedEvents = [...existingEvents, event].slice(-state.maxEvents);
return {
eventsByProject: {
...state.eventsByProject,
[projectId]: updatedEvents,
},
};
});
},
addEvents: (events: ProjectEvent[]) => {
if (events.length === 0) return;
set((state) => {
const updatedEventsByProject = { ...state.eventsByProject };
// Group events by project
const eventsByProjectId = events.reduce(
(acc, event) => {
if (!acc[event.project_id]) {
acc[event.project_id] = [];
}
acc[event.project_id].push(event);
return acc;
},
{} as Record<string, ProjectEvent[]>
);
// Merge events for each project
for (const [projectId, newEvents] of Object.entries(eventsByProjectId)) {
const existingEvents = updatedEventsByProject[projectId] || [];
// Filter out duplicates
const existingIds = new Set(existingEvents.map((e) => e.id));
const uniqueNewEvents = newEvents.filter((e) => !existingIds.has(e.id));
// Merge and trim
updatedEventsByProject[projectId] = [...existingEvents, ...uniqueNewEvents].slice(
-state.maxEvents
);
}
return { eventsByProject: updatedEventsByProject };
});
},
clearProjectEvents: (projectId: string) => {
set((state) => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { [projectId]: _removed, ...rest } = state.eventsByProject;
return { eventsByProject: rest };
});
},
clearAllEvents: () => {
set({ eventsByProject: {} });
},
getProjectEvents: (projectId: string) => {
return get().eventsByProject[projectId] || [];
},
getFilteredEvents: (projectId: string, types: EventType[]) => {
const events = get().eventsByProject[projectId] || [];
if (types.length === 0) return events;
const typeSet = new Set(types);
return events.filter((event) => typeSet.has(event.type));
},
setMaxEvents: (max: number) => {
if (max < 1) {
console.warn(`Invalid maxEvents value: ${max}, using default: ${DEFAULT_MAX_EVENTS}`);
max = DEFAULT_MAX_EVENTS;
}
set((state) => {
// Trim existing events if necessary
const trimmedEventsByProject: Record<string, ProjectEvent[]> = {};
for (const [projectId, events] of Object.entries(state.eventsByProject)) {
trimmedEventsByProject[projectId] = events.slice(-max);
}
return {
maxEvents: max,
eventsByProject: trimmedEventsByProject,
};
});
},
}));
// ============================================================================
// Selector Hooks
// ============================================================================
/**
* Hook to get events for a specific project
* @param projectId - Project ID
* @returns Array of events for the project
*/
export function useProjectEventsFromStore(projectId: string): ProjectEvent[] {
return useEventStore((state) => state.eventsByProject[projectId] || []);
}
/**
* Hook to get the latest event for a project
* @param projectId - Project ID
* @returns Latest event or undefined
*/
export function useLatestEvent(projectId: string): ProjectEvent | undefined {
const events = useEventStore((state) => state.eventsByProject[projectId] || []);
return events[events.length - 1];
}
/**
* Hook to get event count for a project
* @param projectId - Project ID
* @returns Number of events
*/
export function useEventCount(projectId: string): number {
return useEventStore((state) => (state.eventsByProject[projectId] || []).length);
}

View File

@@ -3,5 +3,14 @@
export { useAuthStore, initializeAuth, type User } from './authStore';
// Event Store for SSE events
export {
useEventStore,
useProjectEventsFromStore,
useLatestEvent,
useEventCount,
type EventStore,
} from './eventStore';
// Authentication Context (DI wrapper for auth store)
export { useAuth, AuthProvider } from '../auth/AuthContext';

View File

@@ -0,0 +1,307 @@
/**
* Event Types and Interfaces for SSE
*
* These types mirror the backend event schemas from backend/app/schemas/events.py
* for type-safe event handling in the frontend.
*
* @module lib/types/events
*/
// ============================================================================
// Event Type Enum
// ============================================================================
/**
* Event types matching backend EventType enum.
* Naming convention: {domain}.{action}
*/
export enum EventType {
// Agent Events
AGENT_SPAWNED = 'agent.spawned',
AGENT_STATUS_CHANGED = 'agent.status_changed',
AGENT_MESSAGE = 'agent.message',
AGENT_TERMINATED = 'agent.terminated',
// Issue Events
ISSUE_CREATED = 'issue.created',
ISSUE_UPDATED = 'issue.updated',
ISSUE_ASSIGNED = 'issue.assigned',
ISSUE_CLOSED = 'issue.closed',
// Sprint Events
SPRINT_STARTED = 'sprint.started',
SPRINT_COMPLETED = 'sprint.completed',
// Approval Events
APPROVAL_REQUESTED = 'approval.requested',
APPROVAL_GRANTED = 'approval.granted',
APPROVAL_DENIED = 'approval.denied',
// Project Events
PROJECT_CREATED = 'project.created',
PROJECT_UPDATED = 'project.updated',
PROJECT_ARCHIVED = 'project.archived',
// Workflow Events
WORKFLOW_STARTED = 'workflow.started',
WORKFLOW_STEP_COMPLETED = 'workflow.step_completed',
WORKFLOW_COMPLETED = 'workflow.completed',
WORKFLOW_FAILED = 'workflow.failed',
}
// ============================================================================
// Actor Types
// ============================================================================
/**
* Type of actor who triggered an event
*/
export type ActorType = 'agent' | 'user' | 'system';
// ============================================================================
// Base Event Interface
// ============================================================================
/**
* Base event schema matching backend Event model.
* All events from the EventBus conform to this schema.
*/
export interface ProjectEvent {
/** Unique event identifier (UUID string) */
id: string;
/** Event type enum value */
type: EventType;
/** When the event occurred (ISO 8601 UTC) */
timestamp: string;
/** Project this event belongs to (UUID string) */
project_id: string;
/** ID of the agent or user who triggered the event (UUID string) */
actor_id: string | null;
/** Type of actor: 'agent', 'user', or 'system' */
actor_type: ActorType;
/** Event-specific payload data */
payload: EventPayload;
}
// ============================================================================
// Payload Types
// ============================================================================
/**
* Union type for all possible event payloads
*/
export type EventPayload =
| AgentSpawnedPayload
| AgentStatusChangedPayload
| AgentMessagePayload
| AgentTerminatedPayload
| IssueCreatedPayload
| IssueUpdatedPayload
| IssueAssignedPayload
| IssueClosedPayload
| SprintStartedPayload
| SprintCompletedPayload
| ApprovalRequestedPayload
| ApprovalGrantedPayload
| ApprovalDeniedPayload
| WorkflowStartedPayload
| WorkflowStepCompletedPayload
| WorkflowCompletedPayload
| WorkflowFailedPayload
| Record<string, unknown>;
// Agent Payloads
export interface AgentSpawnedPayload {
agent_instance_id: string;
agent_type_id: string;
agent_name: string;
role: string;
}
export interface AgentStatusChangedPayload {
agent_instance_id: string;
previous_status: string;
new_status: string;
reason?: string | null;
}
export interface AgentMessagePayload {
agent_instance_id: string;
message: string;
message_type: 'info' | 'warning' | 'error' | 'debug';
metadata?: Record<string, unknown>;
}
export interface AgentTerminatedPayload {
agent_instance_id: string;
termination_reason: string;
final_status: string;
}
// Issue Payloads
export interface IssueCreatedPayload {
issue_id: string;
title: string;
priority?: string | null;
labels?: string[];
}
export interface IssueUpdatedPayload {
issue_id: string;
changes: Record<string, unknown>;
}
export interface IssueAssignedPayload {
issue_id: string;
assignee_id?: string | null;
assignee_name?: string | null;
}
export interface IssueClosedPayload {
issue_id: string;
resolution: string;
}
// Sprint Payloads
export interface SprintStartedPayload {
sprint_id: string;
sprint_name: string;
goal?: string | null;
issue_count?: number;
}
export interface SprintCompletedPayload {
sprint_id: string;
sprint_name: string;
completed_issues?: number;
incomplete_issues?: number;
}
// Approval Payloads
export interface ApprovalRequestedPayload {
approval_id: string;
approval_type: string;
description: string;
requested_by?: string | null;
timeout_minutes?: number | null;
}
export interface ApprovalGrantedPayload {
approval_id: string;
approved_by: string;
comments?: string | null;
}
export interface ApprovalDeniedPayload {
approval_id: string;
denied_by: string;
reason: string;
}
// Workflow Payloads
export interface WorkflowStartedPayload {
workflow_id: string;
workflow_type: string;
total_steps?: number;
}
export interface WorkflowStepCompletedPayload {
workflow_id: string;
step_name: string;
step_number: number;
total_steps: number;
result?: Record<string, unknown>;
}
export interface WorkflowCompletedPayload {
workflow_id: string;
duration_seconds: number;
result?: Record<string, unknown>;
}
export interface WorkflowFailedPayload {
workflow_id: string;
error_message: string;
failed_step?: string | null;
recoverable?: boolean;
}
// ============================================================================
// Type Guards
// ============================================================================
/**
* Type guard to check if an event is a specific type
*/
export function isEventType<T extends EventType>(
event: ProjectEvent,
type: T
): event is ProjectEvent & { type: T } {
return event.type === type;
}
/**
* Type guard for agent events
*/
export function isAgentEvent(event: ProjectEvent): boolean {
return event.type.startsWith('agent.');
}
/**
* Type guard for issue events
*/
export function isIssueEvent(event: ProjectEvent): boolean {
return event.type.startsWith('issue.');
}
/**
* Type guard for sprint events
*/
export function isSprintEvent(event: ProjectEvent): boolean {
return event.type.startsWith('sprint.');
}
/**
* Type guard for approval events
*/
export function isApprovalEvent(event: ProjectEvent): boolean {
return event.type.startsWith('approval.');
}
/**
* Type guard for workflow events
*/
export function isWorkflowEvent(event: ProjectEvent): boolean {
return event.type.startsWith('workflow.');
}
/**
* Type guard for project events
*/
export function isProjectEvent(event: ProjectEvent): boolean {
return event.type.startsWith('project.');
}
// ============================================================================
// SSE Connection Types
// ============================================================================
/**
* Connection state for SSE
*/
export type ConnectionState = 'connecting' | 'connected' | 'disconnected' | 'error';
/**
* SSE error information
*/
export interface SSEError {
message: string;
code?: string;
timestamp: string;
retryAttempt?: number;
}

View File

@@ -0,0 +1,39 @@
/**
* Type Definitions
*
* @module lib/types
*/
// Event types for SSE
export {
EventType,
type ActorType,
type ProjectEvent,
type EventPayload,
type AgentSpawnedPayload,
type AgentStatusChangedPayload,
type AgentMessagePayload,
type AgentTerminatedPayload,
type IssueCreatedPayload,
type IssueUpdatedPayload,
type IssueAssignedPayload,
type IssueClosedPayload,
type SprintStartedPayload,
type SprintCompletedPayload,
type ApprovalRequestedPayload,
type ApprovalGrantedPayload,
type ApprovalDeniedPayload,
type WorkflowStartedPayload,
type WorkflowStepCompletedPayload,
type WorkflowCompletedPayload,
type WorkflowFailedPayload,
type ConnectionState,
type SSEError,
isEventType,
isAgentEvent,
isIssueEvent,
isSprintEvent,
isApprovalEvent,
isWorkflowEvent,
isProjectEvent,
} from './events';