""" SSE endpoint for real-time project event streaming. This module provides Server-Sent Events (SSE) endpoints for streaming project events to connected clients. Events are scoped to projects, with authorization checks to ensure clients only receive events for projects they have access to. Features: - Real-time event streaming via SSE - Project-scoped authorization - Automatic reconnection support (Last-Event-ID) - Keepalive messages every 30 seconds - Graceful connection cleanup """ import asyncio import json import logging from uuid import UUID from fastapi import APIRouter, Depends, Header, Request from slowapi import Limiter from slowapi.util import get_remote_address from sse_starlette.sse import EventSourceResponse from app.api.dependencies.auth import get_current_user from app.api.dependencies.event_bus import get_event_bus from app.core.exceptions import AuthorizationError from app.models.user import User from app.schemas.errors import ErrorCode from app.schemas.events import EventType from app.services.event_bus import EventBus logger = logging.getLogger(__name__) router = APIRouter() limiter = Limiter(key_func=get_remote_address) # Keepalive interval in seconds KEEPALIVE_INTERVAL = 30 async def check_project_access( project_id: UUID, user: User, ) -> bool: """ Check if a user has access to a project's events. This is a placeholder implementation that will be replaced with actual project authorization logic once the Project model is implemented. Currently allows access for all authenticated users. Args: project_id: The project to check access for user: The authenticated user Returns: bool: True if user has access, False otherwise TODO: Implement actual project authorization - Check if user owns the project - Check if user is a member of the project - Check project visibility settings """ # Placeholder: Allow all authenticated users for now # This will be replaced with actual project ownership/membership check logger.debug( f"Project access check for user {user.id} on project {project_id} " "(placeholder: allowing all authenticated users)" ) return True async def event_generator( project_id: UUID, event_bus: EventBus, last_event_id: str | None = None, ): """ Generate SSE events for a project. This async generator yields SSE-formatted events from the event bus, including keepalive comments to maintain the connection. Args: project_id: The project to stream events for event_bus: The EventBus instance last_event_id: Optional last received event ID for reconnection Yields: dict: SSE event data with 'event', 'data', and optional 'id' fields """ try: async for event_data in event_bus.subscribe_sse( project_id=project_id, last_event_id=last_event_id, keepalive_interval=KEEPALIVE_INTERVAL, ): if event_data == "": # Keepalive - yield SSE comment yield {"comment": "keepalive"} else: # Parse event to extract type and id try: event_dict = json.loads(event_data) event_type = event_dict.get("type", "message") event_id = event_dict.get("id") yield { "event": event_type, "data": event_data, "id": event_id, } except json.JSONDecodeError: # If we can't parse, send as generic message yield { "event": "message", "data": event_data, } except asyncio.CancelledError: logger.info(f"Event stream cancelled for project {project_id}") raise except Exception as e: logger.error(f"Error in event stream for project {project_id}: {e}") raise @router.get( "/projects/{project_id}/events/stream", summary="Stream Project Events", description=""" Stream real-time events for a project via Server-Sent Events (SSE). **Authentication**: Required (Bearer token) **Authorization**: Must have access to the project **SSE Event Format**: ``` event: agent.status_changed id: 550e8400-e29b-41d4-a716-446655440000 data: {"id": "...", "type": "agent.status_changed", "project_id": "...", ...} : keepalive event: issue.created id: 550e8400-e29b-41d4-a716-446655440001 data: {...} ``` **Reconnection**: Include the `Last-Event-ID` header with the last received event ID to resume from where you left off. **Keepalive**: The server sends a comment (`: keepalive`) every 30 seconds to keep the connection alive. **Rate Limit**: 10 connections/minute per IP """, response_class=EventSourceResponse, responses={ 200: { "description": "SSE stream established", "content": {"text/event-stream": {}}, }, 401: {"description": "Not authenticated"}, 403: {"description": "Not authorized to access this project"}, 404: {"description": "Project not found"}, }, operation_id="stream_project_events", ) @limiter.limit("10/minute") async def stream_project_events( request: Request, project_id: UUID, current_user: User = Depends(get_current_user), event_bus: EventBus = Depends(get_event_bus), last_event_id: str | None = Header(None, alias="Last-Event-ID"), ): """ Stream real-time events for a project via SSE. This endpoint establishes a persistent SSE connection that streams project events to the client in real-time. The connection includes: - Event streaming: All project events (agent updates, issues, etc.) - Keepalive: Comment every 30 seconds to maintain connection - Reconnection: Use Last-Event-ID header to resume after disconnect The connection is automatically cleaned up when the client disconnects. """ logger.info( f"SSE connection request for project {project_id} " f"by user {current_user.id} " f"(last_event_id={last_event_id})" ) # Check project access has_access = await check_project_access(project_id, current_user) if not has_access: raise AuthorizationError( message=f"You don't have access to project {project_id}", error_code=ErrorCode.INSUFFICIENT_PERMISSIONS, ) # Return SSE response return EventSourceResponse( event_generator( project_id=project_id, event_bus=event_bus, last_event_id=last_event_id, ), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", # Disable nginx buffering }, ) @router.post( "/projects/{project_id}/events/test", summary="Send Test Event (Development Only)", description=""" Send a test event to a project's event stream. This endpoint is intended for development and testing purposes. **Authentication**: Required (Bearer token) **Authorization**: Must have access to the project **Note**: This endpoint should be disabled or restricted in production. """, response_model=dict, responses={ 200: {"description": "Test event sent"}, 401: {"description": "Not authenticated"}, 403: {"description": "Not authorized to access this project"}, }, operation_id="send_test_event", ) async def send_test_event( project_id: UUID, current_user: User = Depends(get_current_user), event_bus: EventBus = Depends(get_event_bus), ): """ Send a test event to the project's event stream. This is useful for testing SSE connections during development. """ # Check project access has_access = await check_project_access(project_id, current_user) if not has_access: raise AuthorizationError( message=f"You don't have access to project {project_id}", error_code=ErrorCode.INSUFFICIENT_PERMISSIONS, ) # Create and publish test event using the Event schema event = EventBus.create_event( event_type=EventType.AGENT_MESSAGE, project_id=project_id, actor_type="user", actor_id=current_user.id, payload={ "message": "Test event from SSE endpoint", "message_type": "info", }, ) channel = event_bus.get_project_channel(project_id) await event_bus.publish(channel, event) logger.info(f"Test event sent to project {project_id}: {event.id}") return { "success": True, "event_id": event.id, "event_type": event.type.value, "message": "Test event sent successfully", }