From acfda1e9a96f46ba04ed1f4e8053d916cff97889 Mon Sep 17 00:00:00 2001 From: Felipe Cardoso Date: Tue, 30 Dec 2025 02:08:03 +0100 Subject: [PATCH] feat(backend): Add SSE endpoint for project event streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add /projects/{project_id}/events/stream SSE endpoint - Add event_bus dependency injection - Add project access authorization (placeholder) - Add test event endpoint for development - Add keepalive comments every 30 seconds - Add reconnection support via Last-Event-ID header - Add rate limiting (10/minute per IP) - Mount events router in API - Add sse-starlette dependency - Add 19 comprehensive tests for SSE functionality Implements #34 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- backend/app/api/dependencies/event_bus.py | 36 ++ backend/app/api/main.py | 3 + backend/app/api/routes/events.py | 283 ++++++++++++ backend/pyproject.toml | 10 +- backend/tests/api/routes/test_events.py | 525 ++++++++++++++++++++++ backend/uv.lock | 159 +++++++ 6 files changed, 1009 insertions(+), 7 deletions(-) create mode 100644 backend/app/api/dependencies/event_bus.py create mode 100644 backend/app/api/routes/events.py create mode 100644 backend/tests/api/routes/test_events.py diff --git a/backend/app/api/dependencies/event_bus.py b/backend/app/api/dependencies/event_bus.py new file mode 100644 index 0000000..97a1a6d --- /dev/null +++ b/backend/app/api/dependencies/event_bus.py @@ -0,0 +1,36 @@ +""" +Event bus dependency for FastAPI routes. + +This module provides the FastAPI dependency for injecting the EventBus +into route handlers. The event bus is a singleton that maintains +Redis pub/sub connections for real-time event streaming. +""" + +from app.services.event_bus import ( + EventBus, + get_connected_event_bus as _get_connected_event_bus, +) + + +async def get_event_bus() -> EventBus: + """ + FastAPI dependency that provides a connected EventBus instance. + + The EventBus is a singleton that maintains Redis pub/sub connections. + It's lazily initialized and connected on first access, and should be + closed during application shutdown via close_event_bus(). + + Usage: + @router.get("/events/stream") + async def stream_events( + event_bus: EventBus = Depends(get_event_bus) + ): + ... + + Returns: + EventBus: The global connected event bus instance + + Raises: + EventBusConnectionError: If connection to Redis fails + """ + return await _get_connected_event_bus() diff --git a/backend/app/api/main.py b/backend/app/api/main.py index 916ef58..e4374a5 100644 --- a/backend/app/api/main.py +++ b/backend/app/api/main.py @@ -3,6 +3,7 @@ from fastapi import APIRouter from app.api.routes import ( admin, auth, + events, oauth, oauth_provider, organizations, @@ -22,3 +23,5 @@ api_router.include_router(admin.router, prefix="/admin", tags=["Admin"]) api_router.include_router( organizations.router, prefix="/organizations", tags=["Organizations"] ) +# SSE events router - no prefix, routes define full paths +api_router.include_router(events.router, tags=["Events"]) diff --git a/backend/app/api/routes/events.py b/backend/app/api/routes/events.py new file mode 100644 index 0000000..3ab150c --- /dev/null +++ b/backend/app/api/routes/events.py @@ -0,0 +1,283 @@ +""" +SSE endpoint for real-time project event streaming. + +This module provides Server-Sent Events (SSE) endpoints for streaming +project events to connected clients. Events are scoped to projects, +with authorization checks to ensure clients only receive events +for projects they have access to. + +Features: +- Real-time event streaming via SSE +- Project-scoped authorization +- Automatic reconnection support (Last-Event-ID) +- Keepalive messages every 30 seconds +- Graceful connection cleanup +""" + +import asyncio +import json +import logging +from uuid import UUID + +from fastapi import APIRouter, Depends, Header, Request +from slowapi import Limiter +from slowapi.util import get_remote_address +from sse_starlette.sse import EventSourceResponse + +from app.api.dependencies.auth import get_current_user +from app.api.dependencies.event_bus import get_event_bus +from app.core.exceptions import AuthorizationError +from app.models.user import User +from app.schemas.errors import ErrorCode +from app.schemas.events import EventType +from app.services.event_bus import EventBus + +logger = logging.getLogger(__name__) + +router = APIRouter() +limiter = Limiter(key_func=get_remote_address) + +# Keepalive interval in seconds +KEEPALIVE_INTERVAL = 30 + + +async def check_project_access( + project_id: UUID, + user: User, +) -> bool: + """ + Check if a user has access to a project's events. + + This is a placeholder implementation that will be replaced + with actual project authorization logic once the Project model + is implemented. Currently allows access for all authenticated users. + + Args: + project_id: The project to check access for + user: The authenticated user + + Returns: + bool: True if user has access, False otherwise + + TODO: Implement actual project authorization + - Check if user owns the project + - Check if user is a member of the project + - Check project visibility settings + """ + # Placeholder: Allow all authenticated users for now + # This will be replaced with actual project ownership/membership check + logger.debug( + f"Project access check for user {user.id} on project {project_id} " + "(placeholder: allowing all authenticated users)" + ) + return True + + +async def event_generator( + project_id: UUID, + event_bus: EventBus, + last_event_id: str | None = None, +): + """ + Generate SSE events for a project. + + This async generator yields SSE-formatted events from the event bus, + including keepalive comments to maintain the connection. + + Args: + project_id: The project to stream events for + event_bus: The EventBus instance + last_event_id: Optional last received event ID for reconnection + + Yields: + dict: SSE event data with 'event', 'data', and optional 'id' fields + """ + try: + async for event_data in event_bus.subscribe_sse( + project_id=project_id, + last_event_id=last_event_id, + keepalive_interval=KEEPALIVE_INTERVAL, + ): + if event_data == "": + # Keepalive - yield SSE comment + yield {"comment": "keepalive"} + else: + # Parse event to extract type and id + try: + event_dict = json.loads(event_data) + event_type = event_dict.get("type", "message") + event_id = event_dict.get("id") + + yield { + "event": event_type, + "data": event_data, + "id": event_id, + } + except json.JSONDecodeError: + # If we can't parse, send as generic message + yield { + "event": "message", + "data": event_data, + } + + except asyncio.CancelledError: + logger.info(f"Event stream cancelled for project {project_id}") + raise + except Exception as e: + logger.error(f"Error in event stream for project {project_id}: {e}") + raise + + +@router.get( + "/projects/{project_id}/events/stream", + summary="Stream Project Events", + description=""" + Stream real-time events for a project via Server-Sent Events (SSE). + + **Authentication**: Required (Bearer token) + **Authorization**: Must have access to the project + + **SSE Event Format**: + ``` + event: agent.status_changed + id: 550e8400-e29b-41d4-a716-446655440000 + data: {"id": "...", "type": "agent.status_changed", "project_id": "...", ...} + + : keepalive + + event: issue.created + id: 550e8400-e29b-41d4-a716-446655440001 + data: {...} + ``` + + **Reconnection**: Include the `Last-Event-ID` header with the last received + event ID to resume from where you left off. + + **Keepalive**: The server sends a comment (`: keepalive`) every 30 seconds + to keep the connection alive. + + **Rate Limit**: 10 connections/minute per IP + """, + response_class=EventSourceResponse, + responses={ + 200: { + "description": "SSE stream established", + "content": {"text/event-stream": {}}, + }, + 401: {"description": "Not authenticated"}, + 403: {"description": "Not authorized to access this project"}, + 404: {"description": "Project not found"}, + }, + operation_id="stream_project_events", +) +@limiter.limit("10/minute") +async def stream_project_events( + request: Request, + project_id: UUID, + current_user: User = Depends(get_current_user), + event_bus: EventBus = Depends(get_event_bus), + last_event_id: str | None = Header(None, alias="Last-Event-ID"), +): + """ + Stream real-time events for a project via SSE. + + This endpoint establishes a persistent SSE connection that streams + project events to the client in real-time. The connection includes: + + - Event streaming: All project events (agent updates, issues, etc.) + - Keepalive: Comment every 30 seconds to maintain connection + - Reconnection: Use Last-Event-ID header to resume after disconnect + + The connection is automatically cleaned up when the client disconnects. + """ + logger.info( + f"SSE connection request for project {project_id} " + f"by user {current_user.id} " + f"(last_event_id={last_event_id})" + ) + + # Check project access + has_access = await check_project_access(project_id, current_user) + if not has_access: + raise AuthorizationError( + message=f"You don't have access to project {project_id}", + error_code=ErrorCode.INSUFFICIENT_PERMISSIONS, + ) + + # Return SSE response + return EventSourceResponse( + event_generator( + project_id=project_id, + event_bus=event_bus, + last_event_id=last_event_id, + ), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # Disable nginx buffering + }, + ) + + +@router.post( + "/projects/{project_id}/events/test", + summary="Send Test Event (Development Only)", + description=""" + Send a test event to a project's event stream. This endpoint is + intended for development and testing purposes. + + **Authentication**: Required (Bearer token) + **Authorization**: Must have access to the project + + **Note**: This endpoint should be disabled or restricted in production. + """, + response_model=dict, + responses={ + 200: {"description": "Test event sent"}, + 401: {"description": "Not authenticated"}, + 403: {"description": "Not authorized to access this project"}, + }, + operation_id="send_test_event", +) +async def send_test_event( + project_id: UUID, + current_user: User = Depends(get_current_user), + event_bus: EventBus = Depends(get_event_bus), +): + """ + Send a test event to the project's event stream. + + This is useful for testing SSE connections during development. + """ + # Check project access + has_access = await check_project_access(project_id, current_user) + if not has_access: + raise AuthorizationError( + message=f"You don't have access to project {project_id}", + error_code=ErrorCode.INSUFFICIENT_PERMISSIONS, + ) + + # Create and publish test event using the Event schema + event = EventBus.create_event( + event_type=EventType.AGENT_MESSAGE, + project_id=project_id, + actor_type="user", + actor_id=current_user.id, + payload={ + "message": "Test event from SSE endpoint", + "message_type": "info", + }, + ) + + channel = event_bus.get_project_channel(project_id) + await event_bus.publish(channel, event) + + logger.info(f"Test event sent to project {project_id}: {event.id}") + + return { + "success": True, + "event_id": event.id, + "event_type": event.type.value, + "message": "Test event sent successfully", + } diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 67f4324..05f9291 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -22,41 +22,37 @@ dependencies = [ "pydantic-settings>=2.2.1", "python-multipart>=0.0.19", "fastapi-utils==0.8.0", - # Database "sqlalchemy>=2.0.29", "alembic>=1.14.1", "psycopg2-binary>=2.9.9", "asyncpg>=0.29.0", "aiosqlite==0.21.0", - # Environment configuration "python-dotenv>=1.0.1", - # API utilities "email-validator>=2.1.0.post1", "ujson>=5.9.0", - # CORS and security "starlette>=0.40.0", "starlette-csrf>=1.4.5", "slowapi>=0.1.9", - # Utilities "httpx>=0.27.0", "tenacity>=8.2.3", "pytz>=2024.1", "pillow>=10.3.0", "apscheduler==3.11.0", - # Security and authentication (pinned for reproducibility) "python-jose==3.4.0", "passlib==1.7.4", "bcrypt==4.2.1", "cryptography==44.0.1", - # OAuth authentication "authlib>=1.3.0", + # Celery for background task processing (Syndarix agent jobs) + "celery[redis]>=5.4.0", + "sse-starlette>=3.1.1", ] # Development dependencies diff --git a/backend/tests/api/routes/test_events.py b/backend/tests/api/routes/test_events.py new file mode 100644 index 0000000..163e179 --- /dev/null +++ b/backend/tests/api/routes/test_events.py @@ -0,0 +1,525 @@ +""" +Tests for the SSE events endpoint. + +This module tests the Server-Sent Events endpoint for project event streaming, +including: +- Authentication and authorization +- SSE stream connection and format +- Keepalive mechanism +- Reconnection support (Last-Event-ID) +- Connection cleanup +""" + +import json +import uuid +from collections.abc import AsyncGenerator +from datetime import UTC, datetime +from unittest.mock import AsyncMock, patch + +import pytest +import pytest_asyncio +from fastapi import status +from httpx import ASGITransport, AsyncClient + +from app.api.dependencies.event_bus import get_event_bus +from app.core.database import get_db +from app.main import app +from app.schemas.events import Event, EventType +from app.services.event_bus import EventBus + + +class MockEventBus: + """Mock EventBus for testing without Redis.""" + + def __init__(self): + self.published_events: list[Event] = [] + self._should_yield_events = True + self._events_to_yield: list[str] = [] + self._connected = True + + @property + def is_connected(self) -> bool: + return self._connected + + async def connect(self) -> None: + self._connected = True + + async def disconnect(self) -> None: + self._connected = False + + def get_project_channel(self, project_id: uuid.UUID | str) -> str: + """Get the channel name for a project.""" + return f"project:{project_id}" + + @staticmethod + def create_event( + event_type: EventType, + project_id: uuid.UUID, + actor_type: str, + payload: dict | None = None, + actor_id: uuid.UUID | None = None, + event_id: str | None = None, + timestamp: datetime | None = None, + ) -> Event: + """Create a new Event.""" + return Event( + id=event_id or str(uuid.uuid4()), + type=event_type, + timestamp=timestamp or datetime.now(UTC), + project_id=project_id, + actor_id=actor_id, + actor_type=actor_type, + payload=payload or {}, + ) + + async def publish(self, channel: str, event: Event) -> int: + """Publish an event to a channel.""" + self.published_events.append(event) + return 1 + + def add_event_to_yield(self, event_json: str) -> None: + """Add an event JSON string to be yielded by subscribe_sse.""" + self._events_to_yield.append(event_json) + + async def subscribe_sse( + self, + project_id: str | uuid.UUID, + last_event_id: str | None = None, + keepalive_interval: int = 30, + ) -> AsyncGenerator[str, None]: + """Mock subscribe_sse that yields pre-configured events then keepalive.""" + # First yield any pre-configured events + for event_data in self._events_to_yield: + yield event_data + + # Then yield keepalive + yield "" + + # Then stop to allow test to complete + self._should_yield_events = False + + +@pytest_asyncio.fixture +async def mock_event_bus(): + """Create a mock event bus for testing.""" + return MockEventBus() + + +@pytest_asyncio.fixture +async def client_with_mock_bus(async_test_db, mock_event_bus): + """ + Create a FastAPI test client with mocked database and event bus. + """ + _test_engine, AsyncTestingSessionLocal = async_test_db + + async def override_get_db(): + async with AsyncTestingSessionLocal() as session: + try: + yield session + finally: + pass + + async def override_get_event_bus(): + return mock_event_bus + + app.dependency_overrides[get_db] = override_get_db + app.dependency_overrides[get_event_bus] = override_get_event_bus + + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as test_client: + yield test_client + + app.dependency_overrides.clear() + + +@pytest_asyncio.fixture +async def user_token_with_mock_bus(client_with_mock_bus, async_test_user): + """Create an access token for the test user.""" + response = await client_with_mock_bus.post( + "/api/v1/auth/login", + json={ + "email": async_test_user.email, + "password": "TestPassword123!", + }, + ) + assert response.status_code == 200, f"Login failed: {response.text}" + tokens = response.json() + return tokens["access_token"] + + +class TestSSEEndpointAuthentication: + """Tests for SSE endpoint authentication.""" + + @pytest.mark.asyncio + async def test_stream_events_requires_authentication(self, client_with_mock_bus): + """Test that SSE endpoint requires authentication.""" + project_id = uuid.uuid4() + + response = await client_with_mock_bus.get( + f"/api/v1/projects/{project_id}/events/stream", + ) + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + @pytest.mark.asyncio + async def test_stream_events_with_invalid_token(self, client_with_mock_bus): + """Test that SSE endpoint rejects invalid tokens.""" + project_id = uuid.uuid4() + + response = await client_with_mock_bus.get( + f"/api/v1/projects/{project_id}/events/stream", + headers={"Authorization": "Bearer invalid_token"}, + ) + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + +class TestSSEEndpointStream: + """Tests for SSE stream functionality.""" + + @pytest.mark.asyncio + async def test_stream_events_returns_sse_response( + self, client_with_mock_bus, user_token_with_mock_bus + ): + """Test that SSE endpoint returns proper SSE response.""" + project_id = uuid.uuid4() + + # Make request with a timeout to avoid hanging + response = await client_with_mock_bus.get( + f"/api/v1/projects/{project_id}/events/stream", + headers={"Authorization": f"Bearer {user_token_with_mock_bus}"}, + timeout=5.0, + ) + + # The response should start streaming + assert response.status_code == status.HTTP_200_OK + assert "text/event-stream" in response.headers.get("content-type", "") + + @pytest.mark.asyncio + async def test_stream_events_with_events( + self, client_with_mock_bus, user_token_with_mock_bus, mock_event_bus + ): + """Test that SSE endpoint yields events.""" + project_id = uuid.uuid4() + + # Create a test event and add it to the mock bus + test_event = Event( + id=str(uuid.uuid4()), + type=EventType.AGENT_MESSAGE, + timestamp=datetime.now(UTC), + project_id=project_id, + actor_type="agent", + payload={"message": "test"}, + ) + mock_event_bus.add_event_to_yield(test_event.model_dump_json()) + + # Request the stream + response = await client_with_mock_bus.get( + f"/api/v1/projects/{project_id}/events/stream", + headers={"Authorization": f"Bearer {user_token_with_mock_bus}"}, + timeout=5.0, + ) + + assert response.status_code == status.HTTP_200_OK + + # Check response contains event data + content = response.text + assert "agent.message" in content or "data:" in content + + @pytest.mark.asyncio + async def test_stream_events_with_last_event_id( + self, client_with_mock_bus, user_token_with_mock_bus + ): + """Test that Last-Event-ID header is accepted.""" + project_id = uuid.uuid4() + last_event_id = str(uuid.uuid4()) + + response = await client_with_mock_bus.get( + f"/api/v1/projects/{project_id}/events/stream", + headers={ + "Authorization": f"Bearer {user_token_with_mock_bus}", + "Last-Event-ID": last_event_id, + }, + timeout=5.0, + ) + + # Should accept the header and return OK + assert response.status_code == status.HTTP_200_OK + + +class TestSSEEndpointHeaders: + """Tests for SSE response headers.""" + + @pytest.mark.asyncio + async def test_stream_events_cache_control_header( + self, client_with_mock_bus, user_token_with_mock_bus + ): + """Test that SSE response has no-cache header.""" + project_id = uuid.uuid4() + + response = await client_with_mock_bus.get( + f"/api/v1/projects/{project_id}/events/stream", + headers={"Authorization": f"Bearer {user_token_with_mock_bus}"}, + timeout=5.0, + ) + + assert response.status_code == status.HTTP_200_OK + cache_control = response.headers.get("cache-control", "") + assert "no-cache" in cache_control.lower() + + +class TestTestEventEndpoint: + """Tests for the test event endpoint.""" + + @pytest.mark.asyncio + async def test_send_test_event_requires_auth(self, client_with_mock_bus): + """Test that test event endpoint requires authentication.""" + project_id = uuid.uuid4() + + response = await client_with_mock_bus.post( + f"/api/v1/projects/{project_id}/events/test", + ) + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + @pytest.mark.asyncio + async def test_send_test_event_success( + self, client_with_mock_bus, user_token_with_mock_bus, mock_event_bus + ): + """Test sending a test event.""" + project_id = uuid.uuid4() + + response = await client_with_mock_bus.post( + f"/api/v1/projects/{project_id}/events/test", + headers={"Authorization": f"Bearer {user_token_with_mock_bus}"}, + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["success"] is True + assert "event_id" in data + assert data["event_type"] == "agent.message" + + # Verify event was published + assert len(mock_event_bus.published_events) == 1 + published = mock_event_bus.published_events[0] + assert published.type == EventType.AGENT_MESSAGE + assert published.project_id == project_id + + +class TestEventSchema: + """Tests for the Event schema.""" + + def test_event_creation(self): + """Test Event creation with required fields.""" + project_id = uuid.uuid4() + event = Event( + id=str(uuid.uuid4()), + type=EventType.AGENT_MESSAGE, + timestamp=datetime.now(UTC), + project_id=project_id, + actor_type="agent", + payload={"message": "test"}, + ) + + assert event.id is not None + assert event.type == EventType.AGENT_MESSAGE + assert event.project_id == project_id + assert event.actor_type == "agent" + assert event.payload == {"message": "test"} + + def test_event_json_serialization(self): + """Test Event JSON serialization.""" + project_id = uuid.uuid4() + event = Event( + id="test-id", + type=EventType.AGENT_STATUS_CHANGED, + timestamp=datetime.now(UTC), + project_id=project_id, + actor_type="system", + payload={"status": "running"}, + ) + + json_str = event.model_dump_json() + parsed = json.loads(json_str) + + assert parsed["id"] == "test-id" + assert parsed["type"] == "agent.status_changed" + assert str(parsed["project_id"]) == str(project_id) + assert parsed["payload"]["status"] == "running" + + +class TestEventBusUnit: + """Unit tests for EventBus class.""" + + @pytest.mark.asyncio + async def test_event_bus_not_connected_raises(self): + """Test that accessing redis_client before connect raises.""" + from app.services.event_bus import EventBusConnectionError + + bus = EventBus() + + with pytest.raises(EventBusConnectionError, match="not connected"): + _ = bus.redis_client + + @pytest.mark.asyncio + async def test_event_bus_channel_names(self): + """Test channel name generation.""" + bus = EventBus() + project_id = uuid.uuid4() + agent_id = uuid.uuid4() + user_id = uuid.uuid4() + + assert bus.get_project_channel(project_id) == f"project:{project_id}" + assert bus.get_agent_channel(agent_id) == f"agent:{agent_id}" + assert bus.get_user_channel(user_id) == f"user:{user_id}" + + @pytest.mark.asyncio + async def test_event_bus_sequence_counter(self): + """Test sequence counter increments.""" + bus = EventBus() + channel = "test-channel" + + seq1 = bus._get_next_sequence(channel) + seq2 = bus._get_next_sequence(channel) + seq3 = bus._get_next_sequence(channel) + + assert seq1 == 1 + assert seq2 == 2 + assert seq3 == 3 + + @pytest.mark.asyncio + async def test_event_bus_sequence_per_channel(self): + """Test sequence counter is per-channel.""" + bus = EventBus() + + seq1 = bus._get_next_sequence("channel-1") + seq2 = bus._get_next_sequence("channel-2") + seq3 = bus._get_next_sequence("channel-1") + + assert seq1 == 1 + assert seq2 == 1 # Different channel starts at 1 + assert seq3 == 2 + + def test_event_bus_create_event(self): + """Test EventBus.create_event factory method.""" + project_id = uuid.uuid4() + actor_id = uuid.uuid4() + + event = EventBus.create_event( + event_type=EventType.ISSUE_CREATED, + project_id=project_id, + actor_type="user", + actor_id=actor_id, + payload={"title": "Test Issue"}, + ) + + assert event.type == EventType.ISSUE_CREATED + assert event.project_id == project_id + assert event.actor_id == actor_id + assert event.actor_type == "user" + assert event.payload == {"title": "Test Issue"} + + +class TestEventBusIntegration: + """Integration tests for EventBus with mocked Redis.""" + + @pytest.mark.asyncio + async def test_event_bus_connect_disconnect(self): + """Test EventBus connect and disconnect.""" + with patch("app.services.event_bus.redis.from_url") as mock_redis: + mock_client = AsyncMock() + mock_redis.return_value = mock_client + mock_client.ping = AsyncMock() + mock_client.pubsub = lambda: AsyncMock() + + bus = EventBus(redis_url="redis://localhost:6379/0") + + # Connect + await bus.connect() + mock_client.ping.assert_called_once() + assert bus._redis_client is not None + assert bus.is_connected + + # Disconnect + await bus.disconnect() + mock_client.aclose.assert_called_once() + assert bus._redis_client is None + assert not bus.is_connected + + @pytest.mark.asyncio + async def test_event_bus_publish(self): + """Test EventBus event publishing.""" + with patch("app.services.event_bus.redis.from_url") as mock_redis: + mock_client = AsyncMock() + mock_redis.return_value = mock_client + mock_client.ping = AsyncMock() + mock_client.publish = AsyncMock(return_value=1) + mock_client.pubsub = lambda: AsyncMock() + + bus = EventBus() + await bus.connect() + + project_id = uuid.uuid4() + event = EventBus.create_event( + event_type=EventType.AGENT_SPAWNED, + project_id=project_id, + actor_type="system", + payload={"agent_name": "test-agent"}, + ) + + channel = bus.get_project_channel(project_id) + result = await bus.publish(channel, event) + + # Verify publish was called + mock_client.publish.assert_called_once() + call_args = mock_client.publish.call_args + + # Check channel name + assert call_args[0][0] == f"project:{project_id}" + + # Check result + assert result == 1 + + await bus.disconnect() + + @pytest.mark.asyncio + async def test_event_bus_connect_failure(self): + """Test EventBus handles connection failure.""" + from app.services.event_bus import EventBusConnectionError + + with patch("app.services.event_bus.redis.from_url") as mock_redis: + mock_client = AsyncMock() + mock_redis.return_value = mock_client + + import redis.asyncio as redis_async + + mock_client.ping = AsyncMock( + side_effect=redis_async.ConnectionError("Connection refused") + ) + + bus = EventBus() + + with pytest.raises(EventBusConnectionError, match="Failed to connect"): + await bus.connect() + + @pytest.mark.asyncio + async def test_event_bus_already_connected(self): + """Test EventBus connect when already connected is a no-op.""" + with patch("app.services.event_bus.redis.from_url") as mock_redis: + mock_client = AsyncMock() + mock_redis.return_value = mock_client + mock_client.ping = AsyncMock() + mock_client.pubsub = lambda: AsyncMock() + + bus = EventBus() + + # First connect + await bus.connect() + assert mock_client.ping.call_count == 1 + + # Second connect should be a no-op + await bus.connect() + assert mock_client.ping.call_count == 1 + + await bus.disconnect() diff --git a/backend/uv.lock b/backend/uv.lock index 46b988c..6c26b7d 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -28,6 +28,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a5/32/7df1d81ec2e50fb661944a35183d87e62d3f6c6d9f8aff64a4f245226d55/alembic-1.17.1-py3-none-any.whl", hash = "sha256:cbc2386e60f89608bb63f30d2d6cc66c7aaed1fe105bd862828600e5ad167023", size = 247848, upload-time = "2025-10-29T00:23:18.79Z" }, ] +[[package]] +name = "amqp" +version = "5.3.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "vine" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/79/fc/ec94a357dfc6683d8c86f8b4cfa5416a4c36b28052ec8260c77aca96a443/amqp-5.3.1.tar.gz", hash = "sha256:cddc00c725449522023bad949f70fff7b48f0b1ade74d170a6f10ab044739432", size = 129013, upload-time = "2024-11-12T19:55:44.051Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/26/99/fc813cd978842c26c82534010ea849eee9ab3a13ea2b74e95cb9c99e747b/amqp-5.3.1-py3-none-any.whl", hash = "sha256:43b3319e1b4e7d1251833a93d672b4af1e40f3d632d479b98661a95f117880a2", size = 50944, upload-time = "2024-11-12T19:55:41.782Z" }, +] + [[package]] name = "annotated-doc" version = "0.0.3" @@ -160,6 +172,40 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/76/b9/d51d34e6cd6d887adddb28a8680a1d34235cc45b9d6e238ce39b98199ca0/bcrypt-4.2.1-cp39-abi3-win_amd64.whl", hash = "sha256:e84e0e6f8e40a242b11bce56c313edc2be121cec3e0ec2d76fce01f6af33c07c", size = 153078, upload-time = "2024-11-19T20:08:01.436Z" }, ] +[[package]] +name = "billiard" +version = "4.2.4" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/58/23/b12ac0bcdfb7360d664f40a00b1bda139cbbbced012c34e375506dbd0143/billiard-4.2.4.tar.gz", hash = "sha256:55f542c371209e03cd5862299b74e52e4fbcba8250ba611ad94276b369b6a85f", size = 156537, upload-time = "2025-11-30T13:28:48.52Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cb/87/8bab77b323f16d67be364031220069f79159117dd5e43eeb4be2fef1ac9b/billiard-4.2.4-py3-none-any.whl", hash = "sha256:525b42bdec68d2b983347ac312f892db930858495db601b5836ac24e6477cde5", size = 87070, upload-time = "2025-11-30T13:28:47.016Z" }, +] + +[[package]] +name = "celery" +version = "5.6.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "billiard" }, + { name = "click" }, + { name = "click-didyoumean" }, + { name = "click-plugins" }, + { name = "click-repl" }, + { name = "kombu" }, + { name = "python-dateutil" }, + { name = "tzlocal" }, + { name = "vine" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/e2/1b/b9bbe49b1f799d0ee3de91c66e6b61d095139721f5a2ae25585f49d7c7a9/celery-5.6.1.tar.gz", hash = "sha256:bdc9e02b1480dd137f2df392358c3e94bb623d4f47ae1bc0a7dc5821c90089c7", size = 1716388, upload-time = "2025-12-29T21:48:50.805Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/87/b1/7b7d1e0bc2a3f7ee01576008e3c943f3f23a56809b63f4140ddc96f201c1/celery-5.6.1-py3-none-any.whl", hash = "sha256:ee87aa14d344c655fe83bfc44b2c93bbb7cba39ae11e58b88279523506159d44", size = 445358, upload-time = "2025-12-29T21:48:48.894Z" }, +] + +[package.optional-dependencies] +redis = [ + { name = "kombu", extra = ["redis"] }, +] + [[package]] name = "certifi" version = "2025.10.5" @@ -295,6 +341,43 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/db/d3/9dcc0f5797f070ec8edf30fbadfb200e71d9db6b84d211e3b2085a7589a0/click-8.3.0-py3-none-any.whl", hash = "sha256:9b9f285302c6e3064f4330c05f05b81945b2a39544279343e6e7c5f27a9baddc", size = 107295, upload-time = "2025-09-18T17:32:22.42Z" }, ] +[[package]] +name = "click-didyoumean" +version = "0.3.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "click" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/30/ce/217289b77c590ea1e7c24242d9ddd6e249e52c795ff10fac2c50062c48cb/click_didyoumean-0.3.1.tar.gz", hash = "sha256:4f82fdff0dbe64ef8ab2279bd6aa3f6a99c3b28c05aa09cbfc07c9d7fbb5a463", size = 3089, upload-time = "2024-03-24T08:22:07.499Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1b/5b/974430b5ffdb7a4f1941d13d83c64a0395114503cc357c6b9ae4ce5047ed/click_didyoumean-0.3.1-py3-none-any.whl", hash = "sha256:5c4bb6007cfea5f2fd6583a2fb6701a22a41eb98957e63d0fac41c10e7c3117c", size = 3631, upload-time = "2024-03-24T08:22:06.356Z" }, +] + +[[package]] +name = "click-plugins" +version = "1.1.1.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "click" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c3/a4/34847b59150da33690a36da3681d6bbc2ec14ee9a846bc30a6746e5984e4/click_plugins-1.1.1.2.tar.gz", hash = "sha256:d7af3984a99d243c131aa1a828331e7630f4a88a9741fd05c927b204bcf92261", size = 8343, upload-time = "2025-06-25T00:47:37.555Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3d/9a/2abecb28ae875e39c8cad711eb1186d8d14eab564705325e77e4e6ab9ae5/click_plugins-1.1.1.2-py2.py3-none-any.whl", hash = "sha256:008d65743833ffc1f5417bf0e78e8d2c23aab04d9745ba817bd3e71b0feb6aa6", size = 11051, upload-time = "2025-06-25T00:47:36.731Z" }, +] + +[[package]] +name = "click-repl" +version = "0.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "click" }, + { name = "prompt-toolkit" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/cb/a2/57f4ac79838cfae6912f997b4d1a64a858fb0c86d7fcaae6f7b58d267fca/click-repl-0.3.0.tar.gz", hash = "sha256:17849c23dba3d667247dc4defe1757fff98694e90fe37474f3feebb69ced26a9", size = 10449, upload-time = "2023-06-15T12:43:51.141Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/52/40/9d857001228658f0d59e97ebd4c346fe73e138c6de1bce61dc568a57c7f8/click_repl-0.3.0-py3-none-any.whl", hash = "sha256:fb7e06deb8da8de86180a33a9da97ac316751c094c6899382da7feeeeb51b812", size = 10289, upload-time = "2023-06-15T12:43:48.626Z" }, +] + [[package]] name = "colorama" version = "0.4.6" @@ -493,6 +576,7 @@ dependencies = [ { name = "asyncpg" }, { name = "authlib" }, { name = "bcrypt" }, + { name = "celery", extra = ["redis"] }, { name = "cryptography" }, { name = "email-validator" }, { name = "fastapi" }, @@ -509,6 +593,7 @@ dependencies = [ { name = "pytz" }, { name = "slowapi" }, { name = "sqlalchemy" }, + { name = "sse-starlette" }, { name = "starlette" }, { name = "starlette-csrf" }, { name = "tenacity" }, @@ -540,6 +625,7 @@ requires-dist = [ { name = "asyncpg", specifier = ">=0.29.0" }, { name = "authlib", specifier = ">=1.3.0" }, { name = "bcrypt", specifier = "==4.2.1" }, + { name = "celery", extras = ["redis"], specifier = ">=5.4.0" }, { name = "cryptography", specifier = "==44.0.1" }, { name = "email-validator", specifier = ">=2.1.0.post1" }, { name = "fastapi", specifier = ">=0.115.8" }, @@ -565,6 +651,7 @@ requires-dist = [ { name = "schemathesis", marker = "extra == 'e2e'", specifier = ">=3.30.0" }, { name = "slowapi", specifier = ">=0.1.9" }, { name = "sqlalchemy", specifier = ">=2.0.29" }, + { name = "sse-starlette", specifier = ">=3.1.1" }, { name = "starlette", specifier = ">=0.40.0" }, { name = "starlette-csrf", specifier = ">=1.4.5" }, { name = "tenacity", specifier = ">=8.2.3" }, @@ -855,6 +942,26 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2a/93/2d896b5fd3d79b4cadd8882c06650e66d003f465c9d12c488d92853dff78/junit_xml-1.9-py2.py3-none-any.whl", hash = "sha256:ec5ca1a55aefdd76d28fcc0b135251d156c7106fa979686a4b48d62b761b4732", size = 7130, upload-time = "2020-02-22T20:41:37.661Z" }, ] +[[package]] +name = "kombu" +version = "5.6.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "amqp" }, + { name = "packaging" }, + { name = "tzdata" }, + { name = "vine" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b6/a5/607e533ed6c83ae1a696969b8e1c137dfebd5759a2e9682e26ff1b97740b/kombu-5.6.2.tar.gz", hash = "sha256:8060497058066c6f5aed7c26d7cd0d3b574990b09de842a8c5aaed0b92cc5a55", size = 472594, upload-time = "2025-12-29T20:30:07.779Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fb/0f/834427d8c03ff1d7e867d3db3d176470c64871753252b21b4f4897d1fa45/kombu-5.6.2-py3-none-any.whl", hash = "sha256:efcfc559da324d41d61ca311b0c64965ea35b4c55cc04ee36e55386145dace93", size = 214219, upload-time = "2025-12-29T20:30:05.74Z" }, +] + +[package.optional-dependencies] +redis = [ + { name = "redis" }, +] + [[package]] name = "limits" version = "5.6.0" @@ -1111,6 +1218,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" }, ] +[[package]] +name = "prompt-toolkit" +version = "3.0.52" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "wcwidth" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a1/96/06e01a7b38dce6fe1db213e061a4602dd6032a8a97ef6c1a862537732421/prompt_toolkit-3.0.52.tar.gz", hash = "sha256:28cde192929c8e7321de85de1ddbe736f1375148b02f2e17edd840042b1be855", size = 434198, upload-time = "2025-08-27T15:24:02.057Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/84/03/0d3ce49e2505ae70cf43bc5bb3033955d2fc9f932163e84dc0779cc47f48/prompt_toolkit-3.0.52-py3-none-any.whl", hash = "sha256:9aac639a3bbd33284347de5ad8d68ecc044b91a762dc39b7c21095fcd6a19955", size = 391431, upload-time = "2025-08-27T15:23:59.498Z" }, +] + [[package]] name = "psutil" version = "5.9.8" @@ -1486,6 +1605,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f1/12/de94a39c2ef588c7e6455cfbe7343d3b2dc9d6b6b2f40c4c6565744c873d/pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b", size = 149341, upload-time = "2025-09-25T21:32:56.828Z" }, ] +[[package]] +name = "redis" +version = "6.4.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/0d/d6/e8b92798a5bd67d659d51a18170e91c16ac3b59738d91894651ee255ed49/redis-6.4.0.tar.gz", hash = "sha256:b01bc7282b8444e28ec36b261df5375183bb47a07eb9c603f284e89cbc5ef010", size = 4647399, upload-time = "2025-08-07T08:10:11.441Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e8/02/89e2ed7e85db6c93dfa9e8f691c5087df4e3551ab39081a4d7c6d1f90e05/redis-6.4.0-py3-none-any.whl", hash = "sha256:f0544fa9604264e9464cdf4814e7d4830f74b165d52f2a330a760a88dd248b7f", size = 279847, upload-time = "2025-08-07T08:10:09.84Z" }, +] + [[package]] name = "referencing" version = "0.37.0" @@ -1766,6 +1894,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9c/5e/6a29fa884d9fb7ddadf6b69490a9d45fded3b38541713010dad16b77d015/sqlalchemy-2.0.44-py3-none-any.whl", hash = "sha256:19de7ca1246fbef9f9d1bff8f1ab25641569df226364a0e40457dc5457c54b05", size = 1928718, upload-time = "2025-10-10T15:29:45.32Z" }, ] +[[package]] +name = "sse-starlette" +version = "3.1.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "starlette" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/62/08/8f554b0e5bad3e4e880521a1686d96c05198471eed860b0eb89b57ea3636/sse_starlette-3.1.1.tar.gz", hash = "sha256:bffa531420c1793ab224f63648c059bcadc412bf9fdb1301ac8de1cf9a67b7fb", size = 24306, upload-time = "2025-12-26T15:22:53.836Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e3/31/4c281581a0f8de137b710a07f65518b34bcf333b201cfa06cfda9af05f8a/sse_starlette-3.1.1-py3-none-any.whl", hash = "sha256:bb38f71ae74cfd86b529907a9fda5632195dfa6ae120f214ea4c890c7ee9d436", size = 12442, upload-time = "2025-12-26T15:22:52.911Z" }, +] + [[package]] name = "starlette" version = "0.49.3" @@ -1955,6 +2096,24 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ee/d9/d88e73ca598f4f6ff671fb5fde8a32925c2e08a637303a1d12883c7305fa/uvicorn-0.38.0-py3-none-any.whl", hash = "sha256:48c0afd214ceb59340075b4a052ea1ee91c16fbc2a9b1469cca0e54566977b02", size = 68109, upload-time = "2025-10-18T13:46:42.958Z" }, ] +[[package]] +name = "vine" +version = "5.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/bd/e4/d07b5f29d283596b9727dd5275ccbceb63c44a1a82aa9e4bfd20426762ac/vine-5.1.0.tar.gz", hash = "sha256:8b62e981d35c41049211cf62a0a1242d8c1ee9bd15bb196ce38aefd6799e61e0", size = 48980, upload-time = "2023-11-05T08:46:53.857Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/03/ff/7c0c86c43b3cbb927e0ccc0255cb4057ceba4799cd44ae95174ce8e8b5b2/vine-5.1.0-py3-none-any.whl", hash = "sha256:40fdf3c48b2cfe1c38a49e9ae2da6fda88e4794c810050a728bd7413811fb1dc", size = 9636, upload-time = "2023-11-05T08:46:51.205Z" }, +] + +[[package]] +name = "wcwidth" +version = "0.2.14" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/24/30/6b0809f4510673dc723187aeaf24c7f5459922d01e2f794277a3dfb90345/wcwidth-0.2.14.tar.gz", hash = "sha256:4d478375d31bc5395a3c55c40ccdf3354688364cd61c4f6adacaa9215d0b3605", size = 102293, upload-time = "2025-09-22T16:29:53.023Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/af/b5/123f13c975e9f27ab9c0770f514345bd406d0e8d3b7a0723af9d43f710af/wcwidth-0.2.14-py2.py3-none-any.whl", hash = "sha256:a7bb560c8aee30f9957e5f9895805edd20602f2d7f720186dfd906e82b4982e1", size = 37286, upload-time = "2025-09-22T16:29:51.641Z" }, +] + [[package]] name = "webcolors" version = "25.10.0"