- Add cancel_sprint and delete_sprint endpoints to sprints.py - Add unassign_issue endpoint to issues.py - Add remove_issue_from_sprint endpoint to sprints.py - Add CRUD methods: remove_sprint_from_issues, unassign, remove_from_sprint - Add validation to prevent closed issues in active/planned sprints - Add authorization tests for SSE events endpoint - Fix IDOR vulnerabilities in agents.py and projects.py - Add Syndarix models migration (0004) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
302 lines
9.3 KiB
Python
302 lines
9.3 KiB
Python
"""
|
|
SSE endpoint for real-time project event streaming.
|
|
|
|
This module provides Server-Sent Events (SSE) endpoints for streaming
|
|
project events to connected clients. Events are scoped to projects,
|
|
with authorization checks to ensure clients only receive events
|
|
for projects they have access to.
|
|
|
|
Features:
|
|
- Real-time event streaming via SSE
|
|
- Project-scoped authorization
|
|
- Automatic reconnection support (Last-Event-ID)
|
|
- Keepalive messages every 30 seconds
|
|
- Graceful connection cleanup
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import TYPE_CHECKING
|
|
from uuid import UUID
|
|
|
|
from fastapi import APIRouter, Depends, Header, Request
|
|
from slowapi import Limiter
|
|
from slowapi.util import get_remote_address
|
|
from sse_starlette.sse import EventSourceResponse
|
|
|
|
from app.api.dependencies.auth import get_current_user
|
|
from app.api.dependencies.event_bus import get_event_bus
|
|
from app.core.database import get_db
|
|
from app.core.exceptions import AuthorizationError
|
|
from app.models.user import User
|
|
from app.schemas.errors import ErrorCode
|
|
from app.schemas.events import EventType
|
|
from app.services.event_bus import EventBus
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router on which the SSE endpoints defined in this module are registered.
router = APIRouter()
# Rate limiter keyed by the caller's remote address; applied to the stream
# endpoint through the ``@limiter.limit`` decorator.
limiter = Limiter(key_func=get_remote_address)

# Keepalive interval in seconds. ``event_generator`` forwards this value to
# ``EventBus.subscribe_sse`` as ``keepalive_interval``.
KEEPALIVE_INTERVAL = 30
|
|
|
|
|
|
async def check_project_access(
    project_id: UUID,
    user: User,
    db: "AsyncSession",
) -> bool:
    """
    Decide whether ``user`` may read the event stream of ``project_id``.

    Access is granted in exactly two situations:
    - the user is a superuser (no project lookup is performed), or
    - the project exists and its ``owner_id`` equals the user's ``id``.

    A project that cannot be found yields ``False`` rather than an error.

    Args:
        project_id: Identifier of the project being accessed
        user: The authenticated user requesting access
        db: Database session used to load the project

    Returns:
        bool: True when access is allowed, False otherwise
    """
    # Short-circuit for superusers: no database round trip needed.
    if user.is_superuser:
        logger.debug(
            f"Project access granted for superuser {user.id} on project {project_id}"
        )
        return True

    # Imported at call time, only on the non-superuser path (kept exactly
    # where the lookup is needed rather than at module import).
    from app.crud.syndarix import project as project_crud

    record = await project_crud.get(db, id=project_id)
    if not record:
        logger.debug(f"Project {project_id} not found for access check")
        return False

    # Ownership is the only non-superuser rule.
    is_owner = bool(record.owner_id == user.id)
    logger.debug(
        f"Project access {'granted' if is_owner else 'denied'} "
        f"for user {user.id} on project {project_id} (owner: {record.owner_id})"
    )
    return is_owner
|
|
|
|
|
|
async def event_generator(
    project_id: UUID,
    event_bus: EventBus,
    last_event_id: str | None = None,
):
    """
    Generate SSE events for a project.

    Iterates ``event_bus.subscribe_sse`` and converts every item it produces
    into a dict suitable for ``EventSourceResponse``:

    - an empty string is treated as a keepalive tick and becomes an SSE
      comment (``{"comment": "keepalive"}``);
    - a JSON object is forwarded verbatim as ``data``, with ``event`` taken
      from its ``"type"`` key (default ``"message"``) and ``id`` from its
      ``"id"`` key (``None`` when absent);
    - anything else (text that is not valid JSON, or valid JSON that is not
      an object, e.g. a list or ``null``) is forwarded as a generic
      ``"message"`` event without an id instead of aborting the stream.

    Args:
        project_id: The project to stream events for
        event_bus: The EventBus instance
        last_event_id: Optional last received event ID for reconnection

    Yields:
        dict: SSE event data with 'event', 'data', and optional 'id' fields,
        or a 'comment' field for keepalives

    Raises:
        asyncio.CancelledError: Re-raised after logging when the stream is
            cancelled.
        Exception: Any other error from the subscription is logged with its
            traceback and re-raised.
    """
    try:
        async for event_data in event_bus.subscribe_sse(
            project_id=project_id,
            last_event_id=last_event_id,
            keepalive_interval=KEEPALIVE_INTERVAL,
        ):
            if event_data == "":
                # Keepalive - yield SSE comment
                yield {"comment": "keepalive"}
                continue

            # Keep the try body limited to the parse itself so that errors
            # raised while the consumer handles a yielded event are never
            # mistaken for a malformed payload.
            try:
                event_dict = json.loads(event_data)
            except json.JSONDecodeError:
                event_dict = None

            if isinstance(event_dict, dict):
                yield {
                    "event": event_dict.get("type", "message"),
                    "data": event_data,
                    "id": event_dict.get("id"),
                }
            else:
                # Unparseable payload, or valid JSON that is not an object
                # (list, string, number, null): there is no type/id to
                # extract, so send it as a generic message.
                yield {
                    "event": "message",
                    "data": event_data,
                }

    except asyncio.CancelledError:
        logger.info(f"Event stream cancelled for project {project_id}")
        raise
    except Exception as e:
        # exc_info keeps the traceback in the log; the error still propagates.
        logger.error(
            f"Error in event stream for project {project_id}: {e}",
            exc_info=True,
        )
        raise
|
|
|
|
|
|
@router.get(
    "/projects/{project_id}/events/stream",
    summary="Stream Project Events",
    description="""
Stream real-time events for a project via Server-Sent Events (SSE).

**Authentication**: Required (Bearer token)
**Authorization**: Must have access to the project

**SSE Event Format**:
```
event: agent.status_changed
id: 550e8400-e29b-41d4-a716-446655440000
data: {"id": "...", "type": "agent.status_changed", "project_id": "...", ...}

: keepalive

event: issue.created
id: 550e8400-e29b-41d4-a716-446655440001
data: {...}
```

**Reconnection**: Include the `Last-Event-ID` header with the last received
event ID to resume from where you left off.

**Keepalive**: The server sends a comment (`: keepalive`) every 30 seconds
to keep the connection alive.

**Rate Limit**: 10 connections/minute per IP
""",
    response_class=EventSourceResponse,
    responses={
        200: {
            "description": "SSE stream established",
            "content": {"text/event-stream": {}},
        },
        401: {"description": "Not authenticated"},
        403: {"description": "Not authorized to access this project"},
        404: {"description": "Project not found"},
    },
    operation_id="stream_project_events",
)
@limiter.limit("10/minute")
async def stream_project_events(
    request: Request,
    project_id: UUID,
    current_user: User = Depends(get_current_user),
    event_bus: EventBus = Depends(get_event_bus),
    db: "AsyncSession" = Depends(get_db),
    last_event_id: str | None = Header(None, alias="Last-Event-ID"),
):
    """
    Open a Server-Sent Events stream for one project.

    The request is logged, the caller's access is verified with
    ``check_project_access``, and on success an ``EventSourceResponse``
    wrapping ``event_generator`` is returned.

    Args:
        request: Incoming request. Not read in the body; presumably needed
            by the rate-limit decorator (confirm against slowapi usage).
        project_id: Project whose events should be streamed
        current_user: Authenticated user resolved by ``get_current_user``
        event_bus: Event bus resolved by ``get_event_bus``
        db: Database session used for the access check
        last_event_id: Value of the ``Last-Event-ID`` header, if sent;
            passed through to the generator for reconnection

    Raises:
        AuthorizationError: When ``check_project_access`` returns False
            (this includes a project that does not exist).
    """
    logger.info(
        f"SSE connection request for project {project_id} "
        f"by user {current_user.id} "
        f"(last_event_id={last_event_id})"
    )

    # Guard clause: refuse before any stream is created.
    if not await check_project_access(project_id, current_user, db):
        raise AuthorizationError(
            message=f"You don't have access to project {project_id}",
            error_code=ErrorCode.INSUFFICIENT_PERMISSIONS,
        )

    stream = event_generator(
        project_id=project_id,
        event_bus=event_bus,
        last_event_id=last_event_id,
    )
    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",  # Disable nginx buffering
    }
    return EventSourceResponse(
        stream,
        media_type="text/event-stream",
        headers=sse_headers,
    )
|
|
|
|
|
|
@router.post(
    "/projects/{project_id}/events/test",
    summary="Send Test Event (Development Only)",
    description="""
Send a test event to a project's event stream. This endpoint is
intended for development and testing purposes.

**Authentication**: Required (Bearer token)
**Authorization**: Must have access to the project

**Note**: This endpoint should be disabled or restricted in production.
""",
    response_model=dict,
    responses={
        200: {"description": "Test event sent"},
        401: {"description": "Not authenticated"},
        403: {"description": "Not authorized to access this project"},
    },
    operation_id="send_test_event",
)
async def send_test_event(
    project_id: UUID,
    current_user: User = Depends(get_current_user),
    event_bus: EventBus = Depends(get_event_bus),
    db: "AsyncSession" = Depends(get_db),
):
    """
    Publish a fixed test event on a project's channel.

    After the same access check used by the stream endpoint, an
    ``AGENT_MESSAGE`` event attributed to the calling user is created with
    ``EventBus.create_event`` and published on the channel returned by
    ``event_bus.get_project_channel``.

    Args:
        project_id: Project that should receive the test event
        current_user: Authenticated user resolved by ``get_current_user``
        event_bus: Event bus resolved by ``get_event_bus``
        db: Database session used for the access check

    Returns:
        dict: ``success``, ``event_id``, ``event_type`` and ``message`` keys
        describing the published event

    Raises:
        AuthorizationError: When ``check_project_access`` returns False.
    """
    # Guard clause: only users with project access may publish.
    if not await check_project_access(project_id, current_user, db):
        raise AuthorizationError(
            message=f"You don't have access to project {project_id}",
            error_code=ErrorCode.INSUFFICIENT_PERMISSIONS,
        )

    # Build the test event using the Event schema.
    test_payload = {
        "message": "Test event from SSE endpoint",
        "message_type": "info",
    }
    event = EventBus.create_event(
        event_type=EventType.AGENT_MESSAGE,
        project_id=project_id,
        actor_type="user",
        actor_id=current_user.id,
        payload=test_payload,
    )

    target_channel = event_bus.get_project_channel(project_id)
    await event_bus.publish(target_channel, event)

    logger.info(f"Test event sent to project {project_id}: {event.id}")

    result = {
        "success": True,
        "event_id": event.id,
        "event_type": event.type.value,
        "message": "Test event sent successfully",
    }
    return result
|