feat(backend): Add SSE endpoint for project event streaming
- Add /projects/{project_id}/events/stream SSE endpoint
- Add event_bus dependency injection
- Add project access authorization (placeholder)
- Add test event endpoint for development
- Add keepalive comments every 30 seconds
- Add reconnection support via Last-Event-ID header
- Add rate limiting (10/minute per IP)
- Mount events router in API
- Add sse-starlette dependency
- Add 19 comprehensive tests for SSE functionality
Implements #34
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
36
backend/app/api/dependencies/event_bus.py
Normal file
36
backend/app/api/dependencies/event_bus.py
Normal file
@@ -0,0 +1,36 @@
|
||||
"""
|
||||
Event bus dependency for FastAPI routes.
|
||||
|
||||
This module provides the FastAPI dependency for injecting the EventBus
|
||||
into route handlers. The event bus is a singleton that maintains
|
||||
Redis pub/sub connections for real-time event streaming.
|
||||
"""
|
||||
|
||||
from app.services.event_bus import (
|
||||
EventBus,
|
||||
get_connected_event_bus as _get_connected_event_bus,
|
||||
)
|
||||
|
||||
|
||||
async def get_event_bus() -> EventBus:
|
||||
"""
|
||||
FastAPI dependency that provides a connected EventBus instance.
|
||||
|
||||
The EventBus is a singleton that maintains Redis pub/sub connections.
|
||||
It's lazily initialized and connected on first access, and should be
|
||||
closed during application shutdown via close_event_bus().
|
||||
|
||||
Usage:
|
||||
@router.get("/events/stream")
|
||||
async def stream_events(
|
||||
event_bus: EventBus = Depends(get_event_bus)
|
||||
):
|
||||
...
|
||||
|
||||
Returns:
|
||||
EventBus: The global connected event bus instance
|
||||
|
||||
Raises:
|
||||
EventBusConnectionError: If connection to Redis fails
|
||||
"""
|
||||
return await _get_connected_event_bus()
|
||||
@@ -3,6 +3,7 @@ from fastapi import APIRouter
|
||||
from app.api.routes import (
|
||||
admin,
|
||||
auth,
|
||||
events,
|
||||
oauth,
|
||||
oauth_provider,
|
||||
organizations,
|
||||
@@ -22,3 +23,5 @@ api_router.include_router(admin.router, prefix="/admin", tags=["Admin"])
|
||||
api_router.include_router(
|
||||
organizations.router, prefix="/organizations", tags=["Organizations"]
|
||||
)
|
||||
# SSE events router - no prefix, routes define full paths
|
||||
api_router.include_router(events.router, tags=["Events"])
|
||||
|
||||
283
backend/app/api/routes/events.py
Normal file
283
backend/app/api/routes/events.py
Normal file
@@ -0,0 +1,283 @@
|
||||
"""
|
||||
SSE endpoint for real-time project event streaming.
|
||||
|
||||
This module provides Server-Sent Events (SSE) endpoints for streaming
|
||||
project events to connected clients. Events are scoped to projects,
|
||||
with authorization checks to ensure clients only receive events
|
||||
for projects they have access to.
|
||||
|
||||
Features:
|
||||
- Real-time event streaming via SSE
|
||||
- Project-scoped authorization
|
||||
- Automatic reconnection support (Last-Event-ID)
|
||||
- Keepalive messages every 30 seconds
|
||||
- Graceful connection cleanup
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import APIRouter, Depends, Header, Request
|
||||
from slowapi import Limiter
|
||||
from slowapi.util import get_remote_address
|
||||
from sse_starlette.sse import EventSourceResponse
|
||||
|
||||
from app.api.dependencies.auth import get_current_user
|
||||
from app.api.dependencies.event_bus import get_event_bus
|
||||
from app.core.exceptions import AuthorizationError
|
||||
from app.models.user import User
|
||||
from app.schemas.errors import ErrorCode
|
||||
from app.schemas.events import EventType
|
||||
from app.services.event_bus import EventBus
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
limiter = Limiter(key_func=get_remote_address)
|
||||
|
||||
# Keepalive interval in seconds
|
||||
KEEPALIVE_INTERVAL = 30
|
||||
|
||||
|
||||
async def check_project_access(
|
||||
project_id: UUID,
|
||||
user: User,
|
||||
) -> bool:
|
||||
"""
|
||||
Check if a user has access to a project's events.
|
||||
|
||||
This is a placeholder implementation that will be replaced
|
||||
with actual project authorization logic once the Project model
|
||||
is implemented. Currently allows access for all authenticated users.
|
||||
|
||||
Args:
|
||||
project_id: The project to check access for
|
||||
user: The authenticated user
|
||||
|
||||
Returns:
|
||||
bool: True if user has access, False otherwise
|
||||
|
||||
TODO: Implement actual project authorization
|
||||
- Check if user owns the project
|
||||
- Check if user is a member of the project
|
||||
- Check project visibility settings
|
||||
"""
|
||||
# Placeholder: Allow all authenticated users for now
|
||||
# This will be replaced with actual project ownership/membership check
|
||||
logger.debug(
|
||||
f"Project access check for user {user.id} on project {project_id} "
|
||||
"(placeholder: allowing all authenticated users)"
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
async def event_generator(
|
||||
project_id: UUID,
|
||||
event_bus: EventBus,
|
||||
last_event_id: str | None = None,
|
||||
):
|
||||
"""
|
||||
Generate SSE events for a project.
|
||||
|
||||
This async generator yields SSE-formatted events from the event bus,
|
||||
including keepalive comments to maintain the connection.
|
||||
|
||||
Args:
|
||||
project_id: The project to stream events for
|
||||
event_bus: The EventBus instance
|
||||
last_event_id: Optional last received event ID for reconnection
|
||||
|
||||
Yields:
|
||||
dict: SSE event data with 'event', 'data', and optional 'id' fields
|
||||
"""
|
||||
try:
|
||||
async for event_data in event_bus.subscribe_sse(
|
||||
project_id=project_id,
|
||||
last_event_id=last_event_id,
|
||||
keepalive_interval=KEEPALIVE_INTERVAL,
|
||||
):
|
||||
if event_data == "":
|
||||
# Keepalive - yield SSE comment
|
||||
yield {"comment": "keepalive"}
|
||||
else:
|
||||
# Parse event to extract type and id
|
||||
try:
|
||||
event_dict = json.loads(event_data)
|
||||
event_type = event_dict.get("type", "message")
|
||||
event_id = event_dict.get("id")
|
||||
|
||||
yield {
|
||||
"event": event_type,
|
||||
"data": event_data,
|
||||
"id": event_id,
|
||||
}
|
||||
except json.JSONDecodeError:
|
||||
# If we can't parse, send as generic message
|
||||
yield {
|
||||
"event": "message",
|
||||
"data": event_data,
|
||||
}
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"Event stream cancelled for project {project_id}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error in event stream for project {project_id}: {e}")
|
||||
raise
|
||||
|
||||
|
||||
@router.get(
|
||||
"/projects/{project_id}/events/stream",
|
||||
summary="Stream Project Events",
|
||||
description="""
|
||||
Stream real-time events for a project via Server-Sent Events (SSE).
|
||||
|
||||
**Authentication**: Required (Bearer token)
|
||||
**Authorization**: Must have access to the project
|
||||
|
||||
**SSE Event Format**:
|
||||
```
|
||||
event: agent.status_changed
|
||||
id: 550e8400-e29b-41d4-a716-446655440000
|
||||
data: {"id": "...", "type": "agent.status_changed", "project_id": "...", ...}
|
||||
|
||||
: keepalive
|
||||
|
||||
event: issue.created
|
||||
id: 550e8400-e29b-41d4-a716-446655440001
|
||||
data: {...}
|
||||
```
|
||||
|
||||
**Reconnection**: Include the `Last-Event-ID` header with the last received
|
||||
event ID to resume from where you left off.
|
||||
|
||||
**Keepalive**: The server sends a comment (`: keepalive`) every 30 seconds
|
||||
to keep the connection alive.
|
||||
|
||||
**Rate Limit**: 10 connections/minute per IP
|
||||
""",
|
||||
response_class=EventSourceResponse,
|
||||
responses={
|
||||
200: {
|
||||
"description": "SSE stream established",
|
||||
"content": {"text/event-stream": {}},
|
||||
},
|
||||
401: {"description": "Not authenticated"},
|
||||
403: {"description": "Not authorized to access this project"},
|
||||
404: {"description": "Project not found"},
|
||||
},
|
||||
operation_id="stream_project_events",
|
||||
)
|
||||
@limiter.limit("10/minute")
|
||||
async def stream_project_events(
|
||||
request: Request,
|
||||
project_id: UUID,
|
||||
current_user: User = Depends(get_current_user),
|
||||
event_bus: EventBus = Depends(get_event_bus),
|
||||
last_event_id: str | None = Header(None, alias="Last-Event-ID"),
|
||||
):
|
||||
"""
|
||||
Stream real-time events for a project via SSE.
|
||||
|
||||
This endpoint establishes a persistent SSE connection that streams
|
||||
project events to the client in real-time. The connection includes:
|
||||
|
||||
- Event streaming: All project events (agent updates, issues, etc.)
|
||||
- Keepalive: Comment every 30 seconds to maintain connection
|
||||
- Reconnection: Use Last-Event-ID header to resume after disconnect
|
||||
|
||||
The connection is automatically cleaned up when the client disconnects.
|
||||
"""
|
||||
logger.info(
|
||||
f"SSE connection request for project {project_id} "
|
||||
f"by user {current_user.id} "
|
||||
f"(last_event_id={last_event_id})"
|
||||
)
|
||||
|
||||
# Check project access
|
||||
has_access = await check_project_access(project_id, current_user)
|
||||
if not has_access:
|
||||
raise AuthorizationError(
|
||||
message=f"You don't have access to project {project_id}",
|
||||
error_code=ErrorCode.INSUFFICIENT_PERMISSIONS,
|
||||
)
|
||||
|
||||
# Return SSE response
|
||||
return EventSourceResponse(
|
||||
event_generator(
|
||||
project_id=project_id,
|
||||
event_bus=event_bus,
|
||||
last_event_id=last_event_id,
|
||||
),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no", # Disable nginx buffering
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@router.post(
|
||||
"/projects/{project_id}/events/test",
|
||||
summary="Send Test Event (Development Only)",
|
||||
description="""
|
||||
Send a test event to a project's event stream. This endpoint is
|
||||
intended for development and testing purposes.
|
||||
|
||||
**Authentication**: Required (Bearer token)
|
||||
**Authorization**: Must have access to the project
|
||||
|
||||
**Note**: This endpoint should be disabled or restricted in production.
|
||||
""",
|
||||
response_model=dict,
|
||||
responses={
|
||||
200: {"description": "Test event sent"},
|
||||
401: {"description": "Not authenticated"},
|
||||
403: {"description": "Not authorized to access this project"},
|
||||
},
|
||||
operation_id="send_test_event",
|
||||
)
|
||||
async def send_test_event(
|
||||
project_id: UUID,
|
||||
current_user: User = Depends(get_current_user),
|
||||
event_bus: EventBus = Depends(get_event_bus),
|
||||
):
|
||||
"""
|
||||
Send a test event to the project's event stream.
|
||||
|
||||
This is useful for testing SSE connections during development.
|
||||
"""
|
||||
# Check project access
|
||||
has_access = await check_project_access(project_id, current_user)
|
||||
if not has_access:
|
||||
raise AuthorizationError(
|
||||
message=f"You don't have access to project {project_id}",
|
||||
error_code=ErrorCode.INSUFFICIENT_PERMISSIONS,
|
||||
)
|
||||
|
||||
# Create and publish test event using the Event schema
|
||||
event = EventBus.create_event(
|
||||
event_type=EventType.AGENT_MESSAGE,
|
||||
project_id=project_id,
|
||||
actor_type="user",
|
||||
actor_id=current_user.id,
|
||||
payload={
|
||||
"message": "Test event from SSE endpoint",
|
||||
"message_type": "info",
|
||||
},
|
||||
)
|
||||
|
||||
channel = event_bus.get_project_channel(project_id)
|
||||
await event_bus.publish(channel, event)
|
||||
|
||||
logger.info(f"Test event sent to project {project_id}: {event.id}")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"event_id": event.id,
|
||||
"event_type": event.type.value,
|
||||
"message": "Test event sent successfully",
|
||||
}
|
||||
Reference in New Issue
Block a user