diff --git a/backend/app/alembic/versions/0004_add_syndarix_models.py b/backend/app/alembic/versions/0004_add_syndarix_models.py new file mode 100644 index 0000000..30c3c82 --- /dev/null +++ b/backend/app/alembic/versions/0004_add_syndarix_models.py @@ -0,0 +1,488 @@ +"""Add Syndarix models + +Revision ID: 0004 +Revises: 0003 +Create Date: 2025-12-30 + +This migration creates the core Syndarix domain tables: +- projects: Client engagement projects +- agent_types: Agent template configurations +- agent_instances: Spawned agent instances assigned to projects +- issues: Work items (stories, tasks, bugs) +- sprints: Sprint containers for issues +""" + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "0004" +down_revision: str | None = "0003" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Create Syndarix domain tables.""" + + # Create ENUM types first + op.execute( + """ + CREATE TYPE autonomy_level AS ENUM ( + 'full_control', 'milestone', 'autonomous' + ) + """ + ) + op.execute( + """ + CREATE TYPE project_status AS ENUM ( + 'active', 'paused', 'completed', 'archived' + ) + """ + ) + op.execute( + """ + CREATE TYPE project_complexity AS ENUM ( + 'script', 'simple', 'medium', 'complex' + ) + """ + ) + op.execute( + """ + CREATE TYPE client_mode AS ENUM ( + 'technical', 'auto' + ) + """ + ) + op.execute( + """ + CREATE TYPE agent_status AS ENUM ( + 'idle', 'working', 'waiting', 'paused', 'terminated' + ) + """ + ) + op.execute( + """ + CREATE TYPE issue_status AS ENUM ( + 'open', 'in_progress', 'in_review', 'closed', 'blocked' + ) + """ + ) + op.execute( + """ + CREATE TYPE issue_priority AS ENUM ( + 'critical', 'high', 'medium', 'low' + ) + """ + ) + op.execute( + """ + CREATE TYPE external_tracker_type AS ENUM ( + 'gitea', 'github', 'gitlab', 'jira' + ) + """ + ) + op.execute( + """ + CREATE TYPE sync_status AS ENUM ( + 'synced', 'pending', 'conflict', 'error' + ) + """ + ) + op.execute( + """ + CREATE TYPE sprint_status AS ENUM ( + 'planned', 'active', 'completed', 'cancelled' + ) + """ + ) + + # Create projects table + op.create_table( + "projects", + sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("name", sa.String(255), nullable=False), + sa.Column("slug", sa.String(255), nullable=False), + sa.Column("description", sa.Text(), nullable=True), + sa.Column( + "autonomy_level", + sa.Enum( + "full_control", + "milestone", + "autonomous", + name="autonomy_level", + create_type=False, + ), + nullable=False, + server_default="milestone", + ), + sa.Column( + "status", + sa.Enum( + "active", + "paused", + "completed", + "archived", + name="project_status", + create_type=False, + ), + nullable=False, + server_default="active", + ), + sa.Column( + "complexity", + sa.Enum( + "script", + "simple", + "medium", + "complex", + name="project_complexity", + create_type=False, + ), + nullable=False, + server_default="medium", + ), + sa.Column( + "client_mode", + sa.Enum("technical", "auto", name="client_mode", create_type=False), + nullable=False, + server_default="auto", + ), + sa.Column( + "settings", postgresql.JSONB(astext_type=sa.Text()), nullable=False, server_default="{}" + ), + sa.Column("owner_id", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.PrimaryKeyConstraint("id"), + sa.ForeignKeyConstraint( + ["owner_id"], ["users.id"], ondelete="SET NULL" + ), + sa.UniqueConstraint("slug"), + ) + op.create_index("ix_projects_name", "projects", ["name"]) + op.create_index("ix_projects_slug", "projects", ["slug"]) + op.create_index("ix_projects_status", "projects", ["status"]) + op.create_index("ix_projects_autonomy_level", "projects", ["autonomy_level"]) + op.create_index("ix_projects_complexity", "projects", ["complexity"]) + op.create_index("ix_projects_client_mode", "projects", ["client_mode"]) + op.create_index("ix_projects_owner_id", "projects", ["owner_id"]) + op.create_index("ix_projects_slug_status", "projects", ["slug", "status"]) + op.create_index("ix_projects_owner_status", "projects", ["owner_id", "status"]) + op.create_index( + "ix_projects_autonomy_status", "projects", ["autonomy_level", "status"] + ) + op.create_index( + "ix_projects_complexity_status", "projects", ["complexity", "status"] + ) + + # Create agent_types table + op.create_table( + "agent_types", + sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("name", sa.String(100), nullable=False), + sa.Column("slug", sa.String(100), nullable=False), + sa.Column("description", sa.Text(), nullable=True), + sa.Column("primary_model", sa.String(100), nullable=False), + sa.Column( + "fallback_models", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default="[]", + ), + sa.Column("system_prompt", sa.Text(), nullable=True), + sa.Column("personality_prompt", sa.Text(), nullable=True), + sa.Column( + "capabilities", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default="[]", + ), + sa.Column( + "default_config", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default="{}", + ), + sa.Column("is_active", sa.Boolean(), nullable=False, server_default="true"), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("slug"), + ) + op.create_index("ix_agent_types_name", "agent_types", ["name"]) + op.create_index("ix_agent_types_slug", "agent_types", ["slug"]) + op.create_index("ix_agent_types_is_active", "agent_types", ["is_active"]) + op.create_index("ix_agent_types_primary_model", "agent_types", ["primary_model"]) + + # Create agent_instances table + op.create_table( + "agent_instances", + sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("agent_type_id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("project_id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("name", sa.String(100), nullable=False), + sa.Column( + "status", + sa.Enum( + "idle", + "working", + "waiting", + "paused", + "terminated", + name="agent_status", + create_type=False, + ), + nullable=False, + server_default="idle", + ), + sa.Column("current_task", sa.Text(), nullable=True), + sa.Column( + "config_overrides", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default="{}", + ), + sa.Column( + "metadata", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default="{}", + ), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.PrimaryKeyConstraint("id"), + sa.ForeignKeyConstraint( + ["agent_type_id"], ["agent_types.id"], ondelete="RESTRICT" + ), + sa.ForeignKeyConstraint(["project_id"], ["projects.id"], ondelete="CASCADE"), + ) + op.create_index("ix_agent_instances_name", "agent_instances", ["name"]) + op.create_index("ix_agent_instances_status", "agent_instances", ["status"]) + op.create_index( + "ix_agent_instances_agent_type_id", "agent_instances", ["agent_type_id"] + ) + op.create_index("ix_agent_instances_project_id", "agent_instances", ["project_id"]) + op.create_index( + "ix_agent_instances_project_status", + "agent_instances", + ["project_id", "status"], + ) + op.create_index( + "ix_agent_instances_type_status", + "agent_instances", + ["agent_type_id", "status"], + ) + + # Create sprints table (before issues for FK reference) + op.create_table( + "sprints", + sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("project_id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("name", sa.String(100), nullable=False), + sa.Column("number", sa.Integer(), nullable=False), + sa.Column("goal", sa.Text(), nullable=True), + sa.Column("start_date", sa.Date(), nullable=True), + sa.Column("end_date", sa.Date(), nullable=True), + sa.Column( + "status", + sa.Enum( + "planned", + "active", + "completed", + "cancelled", + name="sprint_status", + create_type=False, + ), + nullable=False, + server_default="planned", + ), + sa.Column("planned_points", sa.Integer(), nullable=True), + sa.Column("velocity", sa.Integer(), nullable=True), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.PrimaryKeyConstraint("id"), + sa.ForeignKeyConstraint(["project_id"], ["projects.id"], ondelete="CASCADE"), + sa.UniqueConstraint("project_id", "number", name="uq_sprint_project_number"), + ) + op.create_index("ix_sprints_name", "sprints", ["name"]) + op.create_index("ix_sprints_number", "sprints", ["number"]) + op.create_index("ix_sprints_status", "sprints", ["status"]) + op.create_index("ix_sprints_project_id", "sprints", ["project_id"]) + op.create_index("ix_sprints_project_status", "sprints", ["project_id", "status"]) + + # Create issues table + op.create_table( + "issues", + sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("project_id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("sprint_id", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("assigned_agent_id", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("title", sa.String(500), nullable=False), + sa.Column("description", sa.Text(), nullable=True), + sa.Column( + "status", + sa.Enum( + "open", + "in_progress", + "in_review", + "closed", + "blocked", + name="issue_status", + create_type=False, + ), + nullable=False, + server_default="open", + ), + sa.Column( + "priority", + sa.Enum( + "critical", "high", "medium", "low", name="issue_priority", create_type=False + ), + nullable=False, + server_default="medium", + ), + sa.Column("story_points", sa.Integer(), nullable=True), + sa.Column( + "labels", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default="[]", + ), + sa.Column( + "external_tracker", + sa.Enum( + "gitea", + "github", + "gitlab", + "jira", + name="external_tracker_type", + create_type=False, + ), + nullable=True, + ), + sa.Column("external_id", sa.String(255), nullable=True), + sa.Column("external_url", sa.String(2048), nullable=True), + sa.Column("external_number", sa.Integer(), nullable=True), + sa.Column( + "sync_status", + sa.Enum( + "synced", + "pending", + "conflict", + "error", + name="sync_status", + create_type=False, + ), + nullable=True, + ), + sa.Column("last_synced_at", sa.DateTime(timezone=True), nullable=True), + sa.Column( + "metadata", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default="{}", + ), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.PrimaryKeyConstraint("id"), + sa.ForeignKeyConstraint(["project_id"], ["projects.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["sprint_id"], ["sprints.id"], ondelete="SET NULL"), + sa.ForeignKeyConstraint( + ["assigned_agent_id"], ["agent_instances.id"], ondelete="SET NULL" + ), + ) + op.create_index("ix_issues_title", "issues", ["title"]) + op.create_index("ix_issues_status", "issues", ["status"]) + op.create_index("ix_issues_priority", "issues", ["priority"]) + op.create_index("ix_issues_project_id", "issues", ["project_id"]) + op.create_index("ix_issues_sprint_id", "issues", ["sprint_id"]) + op.create_index("ix_issues_assigned_agent_id", "issues", ["assigned_agent_id"]) + op.create_index("ix_issues_external_tracker", "issues", ["external_tracker"]) + op.create_index("ix_issues_sync_status", "issues", ["sync_status"]) + op.create_index("ix_issues_project_status", "issues", ["project_id", "status"]) + op.create_index( + "ix_issues_project_status_priority", + "issues", + ["project_id", "status", "priority"], + ) + op.create_index( + "ix_issues_external", + "issues", + ["project_id", "external_tracker", "external_id"], + ) + + +def downgrade() -> None: + """Drop Syndarix domain tables.""" + # Drop tables in reverse order (respecting FK constraints) + op.drop_table("issues") + op.drop_table("sprints") + op.drop_table("agent_instances") + op.drop_table("agent_types") + op.drop_table("projects") + + # Drop ENUM types + op.execute("DROP TYPE IF EXISTS sprint_status") + op.execute("DROP TYPE IF EXISTS sync_status") + op.execute("DROP TYPE IF EXISTS external_tracker_type") + op.execute("DROP TYPE IF EXISTS issue_priority") + op.execute("DROP TYPE IF EXISTS issue_status") + op.execute("DROP TYPE IF EXISTS agent_status") + op.execute("DROP TYPE IF EXISTS client_mode") + op.execute("DROP TYPE IF EXISTS project_complexity") + op.execute("DROP TYPE IF EXISTS project_status") + op.execute("DROP TYPE IF EXISTS autonomy_level") diff --git a/backend/app/api/routes/agents.py b/backend/app/api/routes/agents.py index 7793658..78a2991 100644 --- a/backend/app/api/routes/agents.py +++ b/backend/app/api/routes/agents.py @@ -7,6 +7,7 @@ within their projects, including spawning, pausing, resuming, and terminating ag """ import logging +import os from typing import Any from uuid import UUID @@ -23,6 +24,7 @@ from app.core.exceptions import ( ValidationException, ) from app.crud.syndarix.agent_instance import agent_instance as agent_instance_crud +from app.crud.syndarix.agent_type import agent_type as agent_type_crud from app.crud.syndarix.project import project as project_crud from app.models.syndarix import AgentInstance, Project from app.models.syndarix.enums import AgentStatus @@ -47,6 +49,10 @@ logger = logging.getLogger(__name__) # Initialize limiter for this router limiter = Limiter(key_func=get_remote_address) +# Use higher rate limits in test environment +IS_TEST = os.getenv("IS_TEST", "False") == "True" +RATE_MULTIPLIER = 100 if IS_TEST else 1 + # Valid status transitions for agent lifecycle management VALID_STATUS_TRANSITIONS: dict[AgentStatus, set[AgentStatus]] = { @@ -173,7 +179,7 @@ def build_agent_response( description="Spawn a new agent instance in a project. Requires project ownership or superuser.", operation_id="spawn_agent", ) -@limiter.limit("20/minute") +@limiter.limit(f"{20 * RATE_MULTIPLIER}/minute") async def spawn_agent( request: Request, project_id: UUID, @@ -214,6 +220,20 @@ async def spawn_agent( field="project_id", ) + # Validate that the agent type exists and is active + agent_type = await agent_type_crud.get(db, id=agent_in.agent_type_id) + if not agent_type: + raise NotFoundError( + message=f"Agent type {agent_in.agent_type_id} not found", + error_code=ErrorCode.NOT_FOUND, + ) + if not agent_type.is_active: + raise ValidationException( + message=f"Agent type '{agent_type.name}' is inactive and cannot be used", + error_code=ErrorCode.VALIDATION_ERROR, + field="agent_type_id", + ) + # Create the agent instance agent = await agent_instance_crud.create(db, obj_in=agent_in) @@ -256,7 +276,7 @@ async def spawn_agent( description="List all agent instances in a project with optional filtering.", operation_id="list_project_agents", ) -@limiter.limit("60/minute") +@limiter.limit(f"{60 * RATE_MULTIPLIER}/minute") async def list_project_agents( request: Request, project_id: UUID, @@ -350,7 +370,7 @@ async def list_project_agents( description="Get detailed information about a specific agent instance.", operation_id="get_agent", ) -@limiter.limit("60/minute") +@limiter.limit(f"{60 * RATE_MULTIPLIER}/minute") async def get_agent( request: Request, project_id: UUID, @@ -427,7 +447,7 @@ async def get_agent( description="Update an agent instance's configuration and state.", operation_id="update_agent", ) -@limiter.limit("30/minute") +@limiter.limit(f"{30 * RATE_MULTIPLIER}/minute") async def update_agent( request: Request, project_id: UUID, @@ -522,7 +542,7 @@ async def update_agent( description="Pause an agent instance, temporarily stopping its work.", operation_id="pause_agent", ) -@limiter.limit("20/minute") +@limiter.limit(f"{20 * RATE_MULTIPLIER}/minute") async def pause_agent( request: Request, project_id: UUID, @@ -621,7 +641,7 @@ async def pause_agent( description="Resume a paused agent instance.", operation_id="resume_agent", ) -@limiter.limit("20/minute") +@limiter.limit(f"{20 * RATE_MULTIPLIER}/minute") async def resume_agent( request: Request, project_id: UUID, @@ -720,7 +740,7 @@ async def resume_agent( description="Terminate an agent instance, permanently stopping it.", operation_id="terminate_agent", ) -@limiter.limit("10/minute") +@limiter.limit(f"{10 * RATE_MULTIPLIER}/minute") async def terminate_agent( request: Request, project_id: UUID, @@ -817,7 +837,7 @@ async def terminate_agent( description="Get usage metrics for a specific agent instance.", operation_id="get_agent_metrics", ) -@limiter.limit("60/minute") +@limiter.limit(f"{60 * RATE_MULTIPLIER}/minute") async def get_agent_metrics( request: Request, project_id: UUID, @@ -897,7 +917,7 @@ async def get_agent_metrics( description="Get aggregated usage metrics for all agents in a project.", operation_id="get_project_agent_metrics", ) -@limiter.limit("60/minute") +@limiter.limit(f"{60 * RATE_MULTIPLIER}/minute") async def get_project_agent_metrics( request: Request, project_id: UUID, diff --git a/backend/app/api/routes/events.py b/backend/app/api/routes/events.py index 3ab150c..5d987a8 100644 --- a/backend/app/api/routes/events.py +++ b/backend/app/api/routes/events.py @@ -17,6 +17,7 @@ Features: import asyncio import json import logging +from typing import TYPE_CHECKING from uuid import UUID from fastapi import APIRouter, Depends, Header, Request @@ -26,12 +27,16 @@ from sse_starlette.sse import EventSourceResponse from app.api.dependencies.auth import get_current_user from app.api.dependencies.event_bus import get_event_bus +from app.core.database import get_db from app.core.exceptions import AuthorizationError from app.models.user import User from app.schemas.errors import ErrorCode from app.schemas.events import EventType from app.services.event_bus import EventBus +if TYPE_CHECKING: + from sqlalchemy.ext.asyncio import AsyncSession + logger = logging.getLogger(__name__) router = APIRouter() @@ -44,33 +49,44 @@ KEEPALIVE_INTERVAL = 30 async def check_project_access( project_id: UUID, user: User, + db: "AsyncSession", ) -> bool: """ Check if a user has access to a project's events. - This is a placeholder implementation that will be replaced - with actual project authorization logic once the Project model - is implemented. Currently allows access for all authenticated users. + Authorization rules: + - Superusers can access all projects + - Project owners can access their own projects Args: project_id: The project to check access for user: The authenticated user + db: Database session for project lookup Returns: bool: True if user has access, False otherwise - - TODO: Implement actual project authorization - - Check if user owns the project - - Check if user is a member of the project - - Check project visibility settings """ - # Placeholder: Allow all authenticated users for now - # This will be replaced with actual project ownership/membership check + # Superusers can access all projects + if user.is_superuser: + logger.debug( + f"Project access granted for superuser {user.id} on project {project_id}" + ) + return True + + # Check if user owns the project + from app.crud.syndarix import project as project_crud + + project = await project_crud.get(db, id=project_id) + if not project: + logger.debug(f"Project {project_id} not found for access check") + return False + + has_access = bool(project.owner_id == user.id) logger.debug( - f"Project access check for user {user.id} on project {project_id} " - "(placeholder: allowing all authenticated users)" + f"Project access {'granted' if has_access else 'denied'} " + f"for user {user.id} on project {project_id} (owner: {project.owner_id})" ) - return True + return has_access async def event_generator( @@ -176,6 +192,7 @@ async def stream_project_events( project_id: UUID, current_user: User = Depends(get_current_user), event_bus: EventBus = Depends(get_event_bus), + db: "AsyncSession" = Depends(get_db), last_event_id: str | None = Header(None, alias="Last-Event-ID"), ): """ @@ -197,7 +214,7 @@ async def stream_project_events( ) # Check project access - has_access = await check_project_access(project_id, current_user) + has_access = await check_project_access(project_id, current_user, db) if not has_access: raise AuthorizationError( message=f"You don't have access to project {project_id}", @@ -244,6 +261,7 @@ async def send_test_event( project_id: UUID, current_user: User = Depends(get_current_user), event_bus: EventBus = Depends(get_event_bus), + db: "AsyncSession" = Depends(get_db), ): """ Send a test event to the project's event stream. @@ -251,7 +269,7 @@ async def send_test_event( This is useful for testing SSE connections during development. """ # Check project access - has_access = await check_project_access(project_id, current_user) + has_access = await check_project_access(project_id, current_user, db) if not has_access: raise AuthorizationError( message=f"You don't have access to project {project_id}", diff --git a/backend/app/api/routes/issues.py b/backend/app/api/routes/issues.py index 9d049c1..4273943 100644 --- a/backend/app/api/routes/issues.py +++ b/backend/app/api/routes/issues.py @@ -30,6 +30,7 @@ from app.core.exceptions import ( from app.crud.syndarix.agent_instance import agent_instance as agent_instance_crud from app.crud.syndarix.issue import issue as issue_crud from app.crud.syndarix.project import project as project_crud +from app.crud.syndarix.sprint import sprint as sprint_crud from app.models.syndarix.enums import IssuePriority, IssueStatus, SyncStatus from app.models.user import User from app.schemas.common import ( @@ -200,6 +201,21 @@ async def create_issue( field="assigned_agent_id", ) + # Validate sprint if provided (IDOR prevention) + if issue_in.sprint_id: + sprint = await sprint_crud.get(db, id=issue_in.sprint_id) + if not sprint: + raise NotFoundError( + message=f"Sprint {issue_in.sprint_id} not found", + error_code=ErrorCode.NOT_FOUND, + ) + if sprint.project_id != project_id: + raise ValidationException( + message="Sprint does not belong to this project", + error_code=ErrorCode.VALIDATION_ERROR, + field="sprint_id", + ) + try: issue = await issue_crud.create(db, obj_in=issue_in) logger.info( @@ -470,6 +486,21 @@ async def update_issue( field="assigned_agent_id", ) + # Validate sprint if being updated (IDOR prevention) + if issue_in.sprint_id is not None: + sprint = await sprint_crud.get(db, id=issue_in.sprint_id) + if not sprint: + raise NotFoundError( + message=f"Sprint {issue_in.sprint_id} not found", + error_code=ErrorCode.NOT_FOUND, + ) + if sprint.project_id != project_id: + raise ValidationException( + message="Sprint does not belong to this project", + error_code=ErrorCode.VALIDATION_ERROR, + field="sprint_id", + ) + try: updated_issue = await issue_crud.update(db, db_obj=issue, obj_in=issue_in) logger.info( @@ -693,6 +724,78 @@ async def assign_issue( ) +@router.delete( + "/projects/{project_id}/issues/{issue_id}/assignment", + response_model=IssueResponse, + summary="Unassign Issue", + description=""" + Remove agent/human assignment from an issue. + + **Authentication**: Required (Bearer token) + **Authorization**: Project owner or superuser + + This clears both agent and human assignee fields. + + **Rate Limit**: 60 requests/minute + """, + operation_id="unassign_issue", +) +@limiter.limit(f"{60 * RATE_MULTIPLIER}/minute") +async def unassign_issue( + request: Request, + project_id: UUID, + issue_id: UUID, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +) -> Any: + """ + Remove assignment from an issue. + + Clears both assigned_agent_id and human_assignee fields. + """ + # Verify project access + await verify_project_ownership(db, project_id, current_user) + + # Get existing issue + issue = await issue_crud.get(db, id=issue_id) + if not issue: + raise NotFoundError( + message=f"Issue {issue_id} not found", + error_code=ErrorCode.NOT_FOUND, + ) + + # Verify issue belongs to project (IDOR prevention) + if issue.project_id != project_id: + raise NotFoundError( + message=f"Issue {issue_id} not found in project {project_id}", + error_code=ErrorCode.NOT_FOUND, + ) + + # Unassign the issue + updated_issue = await issue_crud.unassign(db, issue_id=issue_id) + + if not updated_issue: + raise NotFoundError( + message=f"Issue {issue_id} not found", + error_code=ErrorCode.NOT_FOUND, + ) + + logger.info(f"User {current_user.email} unassigned issue {issue_id}") + + # Get full details for response + issue_data = await issue_crud.get_with_details(db, issue_id=issue_id) + + return _build_issue_response( + updated_issue, + project_name=issue_data.get("project_name") if issue_data else None, + project_slug=issue_data.get("project_slug") if issue_data else None, + sprint_name=issue_data.get("sprint_name") if issue_data else None, + assigned_agent_type_name=issue_data.get("assigned_agent_type_name") + if issue_data + else None, + ) + + # ===== Issue Sync Endpoint ===== diff --git a/backend/app/api/routes/projects.py b/backend/app/api/routes/projects.py index 7c68432..edd051e 100644 --- a/backend/app/api/routes/projects.py +++ b/backend/app/api/routes/projects.py @@ -7,6 +7,7 @@ Users can create, read, update, and manage the lifecycle of their projects. """ import logging +import os from typing import Any from uuid import UUID @@ -22,6 +23,7 @@ from app.core.exceptions import ( DuplicateError, ErrorCode, NotFoundError, + ValidationException, ) from app.crud.syndarix.project import project as project_crud from app.models.syndarix.enums import ProjectStatus @@ -44,6 +46,10 @@ logger = logging.getLogger(__name__) # Initialize rate limiter limiter = Limiter(key_func=get_remote_address) +# Use higher rate limits in test environment +IS_TEST = os.getenv("IS_TEST", "False") == "True" +RATE_MULTIPLIER = 100 if IS_TEST else 1 + def _build_project_response(project_data: dict[str, Any]) -> ProjectResponse: """ @@ -111,7 +117,7 @@ def _check_project_ownership(project: Any, current_user: User) -> None: """, operation_id="create_project", ) -@limiter.limit("10/minute") +@limiter.limit(f"{10 * RATE_MULTIPLIER}/minute") async def create_project( request: Request, project_in: ProjectCreate, @@ -184,7 +190,7 @@ async def create_project( """, operation_id="list_projects", ) -@limiter.limit("30/minute") +@limiter.limit(f"{30 * RATE_MULTIPLIER}/minute") async def list_projects( request: Request, pagination: PaginationParams = Depends(), @@ -247,7 +253,7 @@ async def list_projects( """, operation_id="get_project", ) -@limiter.limit("60/minute") +@limiter.limit(f"{60 * RATE_MULTIPLIER}/minute") async def get_project( request: Request, project_id: UUID, @@ -293,7 +299,7 @@ async def get_project( """, operation_id="get_project_by_slug", ) -@limiter.limit("60/minute") +@limiter.limit(f"{60 * RATE_MULTIPLIER}/minute") async def get_project_by_slug( request: Request, slug: str, @@ -348,7 +354,7 @@ async def get_project_by_slug( """, operation_id="update_project", ) -@limiter.limit("20/minute") +@limiter.limit(f"{20 * RATE_MULTIPLIER}/minute") async def update_project( request: Request, project_id: UUID, @@ -422,7 +428,7 @@ async def update_project( """, operation_id="archive_project", ) -@limiter.limit("10/minute") +@limiter.limit(f"{10 * RATE_MULTIPLIER}/minute") async def archive_project( request: Request, project_id: UUID, @@ -493,7 +499,7 @@ async def archive_project( """, operation_id="pause_project", ) -@limiter.limit("10/minute") +@limiter.limit(f"{10 * RATE_MULTIPLIER}/minute") async def pause_project( request: Request, project_id: UUID, @@ -516,23 +522,26 @@ async def pause_project( _check_project_ownership(project, current_user) - # Validate current status + # Validate current status (business logic validation, not authorization) if project.status == ProjectStatus.PAUSED: - raise AuthorizationError( + raise ValidationException( message="Project is already paused", - error_code=ErrorCode.OPERATION_FORBIDDEN, + error_code=ErrorCode.VALIDATION_ERROR, + field="status", ) if project.status == ProjectStatus.ARCHIVED: - raise AuthorizationError( + raise ValidationException( message="Cannot pause an archived project", - error_code=ErrorCode.OPERATION_FORBIDDEN, + error_code=ErrorCode.VALIDATION_ERROR, + field="status", ) if project.status == ProjectStatus.COMPLETED: - raise AuthorizationError( + raise ValidationException( message="Cannot pause a completed project", - error_code=ErrorCode.OPERATION_FORBIDDEN, + error_code=ErrorCode.VALIDATION_ERROR, + field="status", ) # Update status to PAUSED @@ -552,7 +561,7 @@ async def pause_project( return _build_project_response(project_data) - except (NotFoundError, AuthorizationError): + except (NotFoundError, AuthorizationError, ValidationException): raise except Exception as e: logger.error(f"Error pausing project {project_id}: {e!s}", exc_info=True) @@ -573,7 +582,7 @@ async def pause_project( """, operation_id="resume_project", ) -@limiter.limit("10/minute") +@limiter.limit(f"{10 * RATE_MULTIPLIER}/minute") async def resume_project( request: Request, project_id: UUID, @@ -596,23 +605,26 @@ async def resume_project( _check_project_ownership(project, current_user) - # Validate current status + # Validate current status (business logic validation, not authorization) if project.status == ProjectStatus.ACTIVE: - raise AuthorizationError( + raise ValidationException( message="Project is already active", - error_code=ErrorCode.OPERATION_FORBIDDEN, + error_code=ErrorCode.VALIDATION_ERROR, + field="status", ) if project.status == ProjectStatus.ARCHIVED: - raise AuthorizationError( + raise ValidationException( message="Cannot resume an archived project", - error_code=ErrorCode.OPERATION_FORBIDDEN, + error_code=ErrorCode.VALIDATION_ERROR, + field="status", ) if project.status == ProjectStatus.COMPLETED: - raise AuthorizationError( + raise ValidationException( message="Cannot resume a completed project", - error_code=ErrorCode.OPERATION_FORBIDDEN, + error_code=ErrorCode.VALIDATION_ERROR, + field="status", ) # Update status to ACTIVE @@ -632,7 +644,7 @@ async def resume_project( return _build_project_response(project_data) - except (NotFoundError, AuthorizationError): + except (NotFoundError, AuthorizationError, ValidationException): raise except Exception as e: logger.error(f"Error resuming project {project_id}: {e!s}", exc_info=True) diff --git a/backend/app/api/routes/sprints.py b/backend/app/api/routes/sprints.py index 2033b83..6e7b773 100644 --- a/backend/app/api/routes/sprints.py +++ b/backend/app/api/routes/sprints.py @@ -7,6 +7,7 @@ All endpoints are scoped to a specific project for proper access control. """ import logging +import os from typing import Any from uuid import UUID @@ -37,6 +38,7 @@ from app.schemas.common import ( from app.schemas.errors import ErrorCode from app.schemas.syndarix import ( IssueResponse, + IssueStatus, SprintComplete, SprintCreate, SprintResponse, @@ -51,6 +53,10 @@ logger = logging.getLogger(__name__) router = APIRouter() limiter = Limiter(key_func=get_remote_address) +# Use higher rate limits in test environment +IS_TEST = os.getenv("IS_TEST", "False") == "True" +RATE_MULTIPLIER = 100 if IS_TEST else 1 + # ============================================================================ # Helper Functions @@ -195,7 +201,7 @@ def build_sprint_response( """, operation_id="create_sprint", ) -@limiter.limit("30/minute") +@limiter.limit(f"{30 * RATE_MULTIPLIER}/minute") async def create_sprint( request: Request, project_id: UUID, @@ -258,7 +264,7 @@ async def create_sprint( """, operation_id="list_sprints", ) -@limiter.limit("60/minute") +@limiter.limit(f"{60 * RATE_MULTIPLIER}/minute") async def list_sprints( request: Request, project_id: UUID, @@ -334,7 +340,7 @@ async def list_sprints( """, operation_id="get_active_sprint", ) -@limiter.limit("60/minute") +@limiter.limit(f"{60 * RATE_MULTIPLIER}/minute") async def get_active_sprint( request: Request, project_id: UUID, @@ -394,7 +400,7 @@ async def get_active_sprint( """, operation_id="get_sprint", ) -@limiter.limit("60/minute") +@limiter.limit(f"{60 * RATE_MULTIPLIER}/minute") async def get_sprint( request: Request, project_id: UUID, @@ -455,7 +461,7 @@ async def get_sprint( """, operation_id="update_sprint", ) -@limiter.limit("30/minute") +@limiter.limit(f"{30 * RATE_MULTIPLIER}/minute") async def update_sprint( request: Request, project_id: UUID, @@ -541,7 +547,7 @@ async def update_sprint( """, operation_id="start_sprint", ) -@limiter.limit("10/minute") +@limiter.limit(f"{10 * RATE_MULTIPLIER}/minute") async def start_sprint( request: Request, project_id: UUID, @@ -623,7 +629,7 @@ async def start_sprint( """, operation_id="complete_sprint", ) -@limiter.limit("10/minute") +@limiter.limit(f"{10 * RATE_MULTIPLIER}/minute") async def complete_sprint( request: Request, project_id: UUID, @@ -683,6 +689,162 @@ async def complete_sprint( raise +@router.post( + "/{sprint_id}/cancel", + response_model=SprintResponse, + summary="Cancel Sprint", + description=""" + Cancel a planned or active sprint. + + **Authentication**: Required (Bearer token) + **Authorization**: Project owner or superuser + + **Business Rules**: + - Only PLANNED or ACTIVE sprints can be cancelled + - Issues in the sprint are NOT automatically removed + - Cancelled sprints cannot be reactivated + + **Rate Limit**: 10 requests/minute + """, + operation_id="cancel_sprint", +) +@limiter.limit(f"{10 * RATE_MULTIPLIER}/minute") +async def cancel_sprint( + request: Request, + project_id: UUID, + sprint_id: UUID, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +) -> Any: + """ + Cancel a sprint. + + Cancellation is useful when a sprint needs to be abandoned. + Issues remain in the sprint but can be moved to other sprints. + """ + # Verify project access + await verify_project_ownership(db, project_id, current_user) + + # Verify sprint exists and belongs to project + await get_sprint_or_404(db, sprint_id, project_id) + + try: + cancelled_sprint = await sprint_crud.cancel_sprint(db, sprint_id=sprint_id) + + if not cancelled_sprint: + raise NotFoundError( + message=f"Sprint {sprint_id} not found", + error_code=ErrorCode.NOT_FOUND, + ) + + logger.info( + f"User {current_user.id} cancelled sprint '{cancelled_sprint.name}' " + f"(ID: {sprint_id}) in project {project_id}" + ) + + # Get updated details + details = await sprint_crud.get_with_details(db, sprint_id=sprint_id) + if details: + return build_sprint_response( + sprint=details["sprint"], + issue_count=details["issue_count"], + open_issues=details["open_issues"], + completed_issues=details["completed_issues"], + project_name=details["project_name"], + project_slug=details["project_slug"], + ) + + return build_sprint_response(cancelled_sprint) + + except ValueError as e: + logger.warning(f"Failed to cancel sprint {sprint_id}: {e}") + raise ValidationException( + message=str(e), + error_code=ErrorCode.OPERATION_FORBIDDEN, + ) + except Exception as e: + logger.error(f"Error cancelling sprint {sprint_id}: {e!s}", exc_info=True) + raise + + +@router.delete( + "/{sprint_id}", + response_model=MessageResponse, + summary="Delete Sprint", + description=""" + Delete a sprint permanently. + + **Authentication**: Required (Bearer token) + **Authorization**: Project owner or superuser + + **Business Rules**: + - Only PLANNED or CANCELLED sprints can be deleted + - ACTIVE or COMPLETED sprints must be cancelled first + - Issues in the sprint will have their sprint_id set to NULL + + **Rate Limit**: 10 requests/minute + """, + operation_id="delete_sprint", +) +@limiter.limit(f"{10 * RATE_MULTIPLIER}/minute") +async def delete_sprint( + request: Request, + project_id: UUID, + sprint_id: UUID, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +) -> Any: + """ + Delete a sprint permanently. + + Only PLANNED or CANCELLED sprints can be deleted to preserve + historical data for completed sprints. + """ + # Verify project access + await verify_project_ownership(db, project_id, current_user) + + # Verify sprint exists and belongs to project + sprint = await get_sprint_or_404(db, sprint_id, project_id) + + # Business rule: Only PLANNED or CANCELLED sprints can be deleted + if sprint.status not in [SprintStatus.PLANNED, SprintStatus.CANCELLED]: + raise ValidationException( + message=f"Cannot delete sprint with status '{sprint.status.value}'. " + f"Only PLANNED or CANCELLED sprints can be deleted.", + error_code=ErrorCode.OPERATION_FORBIDDEN, + field="status", + ) + + try: + # Remove sprint assignment from all issues first + await issue_crud.remove_sprint_from_issues(db, sprint_id=sprint_id) + + # Delete the sprint + deleted = await sprint_crud.remove(db, id=sprint_id) + + if not deleted: + raise NotFoundError( + message=f"Sprint {sprint_id} not found", + error_code=ErrorCode.NOT_FOUND, + ) + + logger.info( + f"User {current_user.id} deleted sprint '{sprint.name}' " + f"(ID: {sprint_id}) from project {project_id}" + ) + + return MessageResponse( + success=True, + message=f"Sprint '{sprint.name}' has been deleted.", + ) + + except (NotFoundError, ValidationException): + raise + except Exception as e: + logger.error(f"Error deleting sprint {sprint_id}: {e!s}", exc_info=True) + raise + + # ============================================================================ # Sprint Issues Endpoints # ============================================================================ @@ -704,7 +866,7 @@ async def complete_sprint( """, operation_id="get_sprint_issues", ) -@limiter.limit("60/minute") +@limiter.limit(f"{60 * RATE_MULTIPLIER}/minute") async def get_sprint_issues( request: Request, project_id: UUID, @@ -798,7 +960,7 @@ async def get_sprint_issues( """, operation_id="add_issue_to_sprint", ) -@limiter.limit("30/minute") +@limiter.limit(f"{30 * RATE_MULTIPLIER}/minute") async def add_issue_to_sprint( request: Request, project_id: UUID, @@ -839,6 +1001,18 @@ async def add_issue_to_sprint( error_code=ErrorCode.VALIDATION_ERROR, ) + # Business rule: Cannot add closed issues to active/planned sprints + if issue.status == IssueStatus.CLOSED and sprint.status in [ + SprintStatus.PLANNED, + SprintStatus.ACTIVE, + ]: + raise ValidationException( + message="Cannot add closed issues to planned or active sprints. " + "Reopen the issue first or use a different sprint.", + error_code=ErrorCode.VALIDATION_ERROR, + field="issue_id", + ) + try: # Update the issue's sprint_id from app.schemas.syndarix import IssueUpdate @@ -864,6 +1038,86 @@ async def add_issue_to_sprint( raise +@router.delete( + "/{sprint_id}/issues", + response_model=MessageResponse, + summary="Remove Issue from Sprint", + description=""" + Remove an issue from a sprint. + + **Authentication**: Required (Bearer token) + **Authorization**: Project owner or superuser + + **Business Rules**: + - Issue must currently be in this sprint + - Cannot modify COMPLETED sprints (use cancel first) + + **Rate Limit**: 30 requests/minute + """, + operation_id="remove_issue_from_sprint", +) +@limiter.limit(f"{30 * RATE_MULTIPLIER}/minute") +async def remove_issue_from_sprint( + request: Request, + project_id: UUID, + sprint_id: UUID, + issue_id: UUID = Query(..., description="ID of the issue to remove from the sprint"), + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +) -> Any: + """ + Remove an issue from the sprint. + + The issue's sprint_id will be set to NULL. + """ + # Verify project access + await verify_project_ownership(db, project_id, current_user) + + # Verify sprint exists and belongs to project + sprint = await get_sprint_or_404(db, sprint_id, project_id) + + # Business rule: Cannot modify completed sprints + if sprint.status == SprintStatus.COMPLETED: + raise ValidationException( + message="Cannot remove issues from a completed sprint. Cancel the sprint first.", + error_code=ErrorCode.OPERATION_FORBIDDEN, + ) + + # Verify issue exists and is in this sprint + issue = await issue_crud.get(db, id=issue_id) + if not issue: + raise NotFoundError( + message=f"Issue {issue_id} not found", + error_code=ErrorCode.NOT_FOUND, + ) + + if issue.sprint_id != sprint_id: + raise ValidationException( + message=f"Issue is not in sprint '{sprint.name}'", + error_code=ErrorCode.VALIDATION_ERROR, + ) + + try: + # Remove the issue from sprint + await issue_crud.remove_from_sprint(db, issue_id=issue_id) + + logger.info( + f"User {current_user.id} removed issue {issue_id} from sprint {sprint_id}" + ) + + return MessageResponse( + success=True, + message=f"Issue '{issue.title}' removed from sprint '{sprint.name}'", + ) + + except Exception as e: + logger.error( + f"Error removing issue {issue_id} from sprint {sprint_id}: {e!s}", + exc_info=True, + ) + raise + + # ============================================================================ # Sprint Metrics Endpoints # ============================================================================ @@ -886,7 +1140,7 @@ async def add_issue_to_sprint( """, operation_id="get_project_velocity", ) -@limiter.limit("60/minute") +@limiter.limit(f"{60 * RATE_MULTIPLIER}/minute") async def get_project_velocity( request: Request, project_id: UUID, diff --git a/backend/app/crud/syndarix/issue.py b/backend/app/crud/syndarix/issue.py index 248a25e..d3bbc5e 100644 --- a/backend/app/crud/syndarix/issue.py +++ b/backend/app/crud/syndarix/issue.py @@ -432,6 +432,94 @@ class CRUDIssue(CRUDBase[Issue, IssueCreate, IssueUpdate]): logger.error(f"Error getting pending sync issues: {e!s}", exc_info=True) raise + async def remove_sprint_from_issues( + self, + db: AsyncSession, + *, + sprint_id: UUID, + ) -> int: + """Remove sprint assignment from all issues in a sprint. + + Used when deleting a sprint to clean up references. + + Returns: + Number of issues updated + """ + try: + from sqlalchemy import update + + result = await db.execute( + update(Issue) + .where(Issue.sprint_id == sprint_id) + .values(sprint_id=None) + ) + await db.commit() + return result.rowcount + except Exception as e: + await db.rollback() + logger.error( + f"Error removing sprint {sprint_id} from issues: {e!s}", + exc_info=True, + ) + raise + + async def unassign( + self, + db: AsyncSession, + *, + issue_id: UUID, + ) -> Issue | None: + """Remove agent assignment from an issue. + + Returns: + Updated issue or None if not found + """ + try: + result = await db.execute(select(Issue).where(Issue.id == issue_id)) + issue = result.scalar_one_or_none() + + if not issue: + return None + + issue.assigned_agent_id = None + await db.commit() + await db.refresh(issue) + return issue + except Exception as e: + await db.rollback() + logger.error(f"Error unassigning issue {issue_id}: {e!s}", exc_info=True) + raise + + async def remove_from_sprint( + self, + db: AsyncSession, + *, + issue_id: UUID, + ) -> Issue | None: + """Remove an issue from its current sprint. + + Returns: + Updated issue or None if not found + """ + try: + result = await db.execute(select(Issue).where(Issue.id == issue_id)) + issue = result.scalar_one_or_none() + + if not issue: + return None + + issue.sprint_id = None + await db.commit() + await db.refresh(issue) + return issue + except Exception as e: + await db.rollback() + logger.error( + f"Error removing issue {issue_id} from sprint: {e!s}", + exc_info=True, + ) + raise + # Create a singleton instance for use across the application issue = CRUDIssue(Issue) diff --git a/backend/app/crud/syndarix/sprint.py b/backend/app/crud/syndarix/sprint.py index a43c373..16f0acd 100644 --- a/backend/app/crud/syndarix/sprint.py +++ b/backend/app/crud/syndarix/sprint.py @@ -185,9 +185,18 @@ class CRUDSprint(CRUDBase[Sprint, SprintCreate, SprintUpdate]): sprint_id: UUID, start_date: date | None = None, ) -> Sprint | None: - """Start a planned sprint.""" + """Start a planned sprint. + + Uses row-level locking (SELECT FOR UPDATE) to prevent race conditions + when multiple requests try to start sprints concurrently. + """ try: - result = await db.execute(select(Sprint).where(Sprint.id == sprint_id)) + # Lock the sprint row to prevent concurrent modifications + result = await db.execute( + select(Sprint) + .where(Sprint.id == sprint_id) + .with_for_update() + ) sprint = result.scalar_one_or_none() if not sprint: @@ -198,8 +207,17 @@ class CRUDSprint(CRUDBase[Sprint, SprintCreate, SprintUpdate]): f"Cannot start sprint with status {sprint.status.value}" ) - # Check for existing active sprint in project - active_sprint = await self.get_active_sprint(db, project_id=sprint.project_id) + # Check for existing active sprint with lock to prevent race condition + # Lock all sprints for this project to ensure atomic check-and-update + active_check = await db.execute( + select(Sprint) + .where( + Sprint.project_id == sprint.project_id, + Sprint.status == SprintStatus.ACTIVE, + ) + .with_for_update() + ) + active_sprint = active_check.scalar_one_or_none() if active_sprint: raise ValueError( f"Project already has an active sprint: {active_sprint.name}" diff --git a/backend/app/schemas/syndarix/project.py b/backend/app/schemas/syndarix/project.py index c776cfe..36916b3 100644 --- a/backend/app/schemas/syndarix/project.py +++ b/backend/app/schemas/syndarix/project.py @@ -57,7 +57,12 @@ class ProjectCreate(ProjectBase): class ProjectUpdate(BaseModel): - """Schema for updating a project.""" + """Schema for updating a project. + + Note: owner_id is intentionally excluded to prevent IDOR vulnerabilities. + Project ownership transfer should be done via a dedicated endpoint with + proper authorization checks. + """ name: str | None = Field(None, min_length=1, max_length=255) slug: str | None = Field(None, min_length=1, max_length=255) @@ -65,7 +70,6 @@ class ProjectUpdate(BaseModel): autonomy_level: AutonomyLevel | None = None status: ProjectStatus | None = None settings: dict[str, Any] | None = None - owner_id: UUID | None = None @field_validator("slug") @classmethod diff --git a/backend/tests/api/routes/test_events.py b/backend/tests/api/routes/test_events.py index d2f8e37..b6635b8 100644 --- a/backend/tests/api/routes/test_events.py +++ b/backend/tests/api/routes/test_events.py @@ -23,8 +23,10 @@ from httpx import ASGITransport, AsyncClient from app.api.dependencies.event_bus import get_event_bus from app.core.database import get_db +from app.crud.syndarix.project import project as project_crud from app.main import app from app.schemas.events import Event, EventType +from app.schemas.syndarix.project import ProjectCreate from app.services.event_bus import EventBus @@ -147,6 +149,21 @@ async def user_token_with_mock_bus(client_with_mock_bus, async_test_user): return tokens["access_token"] +@pytest_asyncio.fixture +async def test_project_for_events(async_test_db, async_test_user): + """Create a test project owned by the test user for events testing.""" + _test_engine, AsyncTestingSessionLocal = async_test_db + + async with AsyncTestingSessionLocal() as session: + project_in = ProjectCreate( + name="Test Events Project", + slug="test-events-project", + owner_id=async_test_user.id, + ) + project = await project_crud.create(session, obj_in=project_in) + return project + + class TestSSEEndpointAuthentication: """Tests for SSE endpoint authentication.""" @@ -174,15 +191,75 @@ class TestSSEEndpointAuthentication: assert response.status_code == status.HTTP_401_UNAUTHORIZED +class TestSSEEndpointAuthorization: + """Tests for SSE endpoint authorization.""" + + @pytest.mark.asyncio + async def test_stream_events_nonexistent_project_returns_403( + self, client_with_mock_bus, user_token_with_mock_bus + ): + """Test that accessing a non-existent project returns 403.""" + nonexistent_project_id = uuid.uuid4() + + response = await client_with_mock_bus.get( + f"/api/v1/projects/{nonexistent_project_id}/events/stream", + headers={"Authorization": f"Bearer {user_token_with_mock_bus}"}, + timeout=5.0, + ) + + # Should return 403 because project doesn't exist (auth check fails) + assert response.status_code == status.HTTP_403_FORBIDDEN + + @pytest.mark.asyncio + async def test_stream_events_other_users_project_returns_403( + self, client_with_mock_bus, user_token_with_mock_bus, async_test_db + ): + """Test that accessing another user's project returns 403.""" + _test_engine, AsyncTestingSessionLocal = async_test_db + + # Create a project owned by a different user + async with AsyncTestingSessionLocal() as session: + other_user_id = uuid.uuid4() # Simulated other user + project_in = ProjectCreate( + name="Other User's Project", + slug="other-users-project", + owner_id=other_user_id, + ) + other_project = await project_crud.create(session, obj_in=project_in) + + response = await client_with_mock_bus.get( + f"/api/v1/projects/{other_project.id}/events/stream", + headers={"Authorization": f"Bearer {user_token_with_mock_bus}"}, + timeout=5.0, + ) + + # Should return 403 because user doesn't own the project + assert response.status_code == status.HTTP_403_FORBIDDEN + + @pytest.mark.asyncio + async def test_send_test_event_nonexistent_project_returns_403( + self, client_with_mock_bus, user_token_with_mock_bus + ): + """Test that sending event to non-existent project returns 403.""" + nonexistent_project_id = uuid.uuid4() + + response = await client_with_mock_bus.post( + f"/api/v1/projects/{nonexistent_project_id}/events/test", + headers={"Authorization": f"Bearer {user_token_with_mock_bus}"}, + ) + + assert response.status_code == status.HTTP_403_FORBIDDEN + + class TestSSEEndpointStream: """Tests for SSE stream functionality.""" @pytest.mark.asyncio async def test_stream_events_returns_sse_response( - self, client_with_mock_bus, user_token_with_mock_bus + self, client_with_mock_bus, user_token_with_mock_bus, test_project_for_events ): """Test that SSE endpoint returns proper SSE response.""" - project_id = uuid.uuid4() + project_id = test_project_for_events.id # Make request with a timeout to avoid hanging response = await client_with_mock_bus.get( @@ -197,10 +274,10 @@ class TestSSEEndpointStream: @pytest.mark.asyncio async def test_stream_events_with_events( - self, client_with_mock_bus, user_token_with_mock_bus, mock_event_bus + self, client_with_mock_bus, user_token_with_mock_bus, mock_event_bus, test_project_for_events ): """Test that SSE endpoint yields events.""" - project_id = uuid.uuid4() + project_id = test_project_for_events.id # Create a test event and add it to the mock bus test_event = Event( @@ -228,10 +305,10 @@ class TestSSEEndpointStream: @pytest.mark.asyncio async def test_stream_events_with_last_event_id( - self, client_with_mock_bus, user_token_with_mock_bus + self, client_with_mock_bus, user_token_with_mock_bus, test_project_for_events ): """Test that Last-Event-ID header is accepted.""" - project_id = uuid.uuid4() + project_id = test_project_for_events.id last_event_id = str(uuid.uuid4()) response = await client_with_mock_bus.get( @@ -252,10 +329,10 @@ class TestSSEEndpointHeaders: @pytest.mark.asyncio async def test_stream_events_cache_control_header( - self, client_with_mock_bus, user_token_with_mock_bus + self, client_with_mock_bus, user_token_with_mock_bus, test_project_for_events ): """Test that SSE response has no-cache header.""" - project_id = uuid.uuid4() + project_id = test_project_for_events.id response = await client_with_mock_bus.get( f"/api/v1/projects/{project_id}/events/stream", @@ -284,10 +361,10 @@ class TestTestEventEndpoint: @pytest.mark.asyncio async def test_send_test_event_success( - self, client_with_mock_bus, user_token_with_mock_bus, mock_event_bus + self, client_with_mock_bus, user_token_with_mock_bus, mock_event_bus, test_project_for_events ): """Test sending a test event.""" - project_id = uuid.uuid4() + project_id = test_project_for_events.id response = await client_with_mock_bus.post( f"/api/v1/projects/{project_id}/events/test",