# tests/api/routes/syndarix/test_edge_cases.py
"""
Edge case and bug-hunting tests for Syndarix API.

These tests focus on potential production issues:
- Assigning issues to terminated agents
- Sprint lifecycle edge cases
- Project archiving with active resources
- State consistency across operations
"""

import uuid
from datetime import date, timedelta

import pytest
import pytest_asyncio
from starlette import status


# ---------------------------------------------------------------------------
# Private request helpers (underscore-prefixed, so pytest does not collect them)
# ---------------------------------------------------------------------------


def _auth(token):
    """Return the bearer ``Authorization`` header dict for *token*."""
    return {"Authorization": f"Bearer {token}"}


async def _create_sprint(client, token, project_id, name, number):
    """
    Create a 14-day sprint starting today and return its JSON body.

    Asserts that the API answers 201 so callers can rely on the sprint
    existing.
    """
    # Read the clock once so start/end are derived from the same day even
    # if the test run crosses midnight.
    today = date.today()
    response = await client.post(
        f"/api/v1/projects/{project_id}/sprints",
        json={
            "project_id": project_id,
            "name": name,
            "number": number,
            "start_date": str(today),
            "end_date": str(today + timedelta(days=14)),
        },
        headers=_auth(token),
    )
    assert response.status_code == status.HTTP_201_CREATED
    return response.json()


async def _sprint_action(client, token, project_id, sprint_id, action):
    """
    POST to a sprint sub-resource (``start``, ``complete``, ``cancel``).

    Returns the raw response; the caller decides which status is expected.
    """
    return await client.post(
        f"/api/v1/projects/{project_id}/sprints/{sprint_id}/{action}",
        headers=_auth(token),
    )


async def _post_issue(client, token, project_id, **fields):
    """
    POST a new issue and return the raw response.

    ``project_id`` is always sent first in the payload, followed by
    *fields* in the order given.
    """
    return await client.post(
        f"/api/v1/projects/{project_id}/issues",
        json={"project_id": project_id, **fields},
        headers=_auth(token),
    )


async def _create_issue(client, token, project_id, **fields):
    """Create an issue, assert the API answers 201, and return its JSON body."""
    response = await _post_issue(client, token, project_id, **fields)
    assert response.status_code == status.HTTP_201_CREATED
    return response.json()


def _assert_rejected_as_terminated(response):
    """
    Assert *response* is a 422 whose error list mentions "terminated".

    Reads ``response.json()["errors"]`` and checks each entry's
    ``"message"`` case-insensitively.
    """
    assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
    errors = response.json()["errors"]
    assert any("terminated" in err["message"].lower() for err in errors)


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest_asyncio.fixture
async def test_project(client, user_token):
    """Create a test project."""
    response = await client.post(
        "/api/v1/projects",
        json={
            "name": f"Edge Case Test Project {uuid.uuid4().hex[:6]}",
            "slug": f"edge-case-project-{uuid.uuid4().hex[:8]}",
            "autonomy_level": "milestone",
        },
        headers=_auth(user_token),
    )
    assert response.status_code == status.HTTP_201_CREATED
    return response.json()


@pytest_asyncio.fixture
async def test_agent_type(client, superuser_token):
    """Create a test agent type (requested with the superuser token)."""
    response = await client.post(
        "/api/v1/agent-types",
        json={
            "name": f"Edge Case Agent Type {uuid.uuid4().hex[:6]}",
            "slug": f"edge-case-type-{uuid.uuid4().hex[:8]}",
            "expertise": ["testing"],
            "primary_model": "claude-3-opus",
            "personality_prompt": "A test agent for edge cases.",
        },
        headers=_auth(superuser_token),
    )
    assert response.status_code == status.HTTP_201_CREATED
    return response.json()


@pytest_asyncio.fixture
async def test_agent(client, user_token, test_project, test_agent_type):
    """Create a test agent in the project."""
    response = await client.post(
        f"/api/v1/projects/{test_project['id']}/agents",
        json={
            "project_id": test_project["id"],
            "agent_type_id": test_agent_type["id"],
            "name": f"Test Agent {uuid.uuid4().hex[:6]}",
        },
        headers=_auth(user_token),
    )
    assert response.status_code == status.HTTP_201_CREATED
    return response.json()


@pytest_asyncio.fixture
async def terminated_agent(client, user_token, test_project, test_agent):
    """Create and terminate an agent."""
    # Terminate the agent using DELETE endpoint
    response = await client.delete(
        f"/api/v1/projects/{test_project['id']}/agents/{test_agent['id']}",
        headers=_auth(user_token),
    )
    assert response.status_code == status.HTTP_200_OK, (
        f"Failed to terminate: {response.json()}"
    )

    # Return the creation payload with the status overridden locally; the
    # agent is not re-fetched from the API.
    return {**test_agent, "status": "terminated"}


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
class TestAssignIssueToTerminatedAgent:
    """
    Tests for assigning issues to terminated agents.

    POTENTIAL BUG: The system might allow assigning issues to terminated
    agents, which doesn't make sense since terminated agents cannot work
    on issues.
    """

    async def test_assign_issue_to_terminated_agent_should_fail(
        self, client, user_token, test_project, terminated_agent
    ):
        """
        Assigning an issue to a terminated agent should be rejected.

        Expected behavior: Return 422 validation error
        """
        project_id = test_project["id"]
        agent_id = terminated_agent["id"]

        # Create an issue first
        issue = await _create_issue(
            client,
            user_token,
            project_id,
            title="Test Issue for Terminated Agent",
            body="This issue should not be assignable to a terminated agent",
        )

        # Try to assign the issue to the terminated agent - should fail
        assign_response = await client.post(
            f"/api/v1/projects/{project_id}/issues/{issue['id']}/assign",
            json={"assigned_agent_id": agent_id},
            headers=_auth(user_token),
        )

        # Should fail with validation error
        _assert_rejected_as_terminated(assign_response)

    async def test_create_issue_with_terminated_agent_assignment(
        self, client, user_token, test_project, terminated_agent
    ):
        """Creating an issue pre-assigned to a terminated agent should fail."""
        project_id = test_project["id"]
        agent_id = terminated_agent["id"]

        # Try to create an issue with terminated agent assignment
        response = await _post_issue(
            client,
            user_token,
            project_id,
            title="Issue Pre-Assigned to Terminated Agent",
            body="This should fail",
            assigned_agent_id=agent_id,
        )

        # Should fail with validation error
        _assert_rejected_as_terminated(response)

    async def test_update_issue_to_terminated_agent(
        self, client, user_token, test_project, terminated_agent
    ):
        """Updating an issue to assign it to a terminated agent should fail."""
        project_id = test_project["id"]
        agent_id = terminated_agent["id"]

        # Create an issue
        issue = await _create_issue(
            client, user_token, project_id, title="Issue to be Updated"
        )

        # Try to update with terminated agent assignment - should fail
        update_response = await client.patch(
            f"/api/v1/projects/{project_id}/issues/{issue['id']}",
            json={"assigned_agent_id": agent_id},
            headers=_auth(user_token),
        )

        # Should fail with validation error
        _assert_rejected_as_terminated(update_response)


@pytest.mark.asyncio
class TestTerminateAgentWithAssignedIssues:
    """Tests for terminating agents that have assigned issues."""

    async def test_terminate_agent_with_assigned_issues(
        self, client, user_token, test_project, test_agent
    ):
        """
        When an agent is terminated, its assigned issues should be
        auto-unassigned.

        This prevents orphaned issue assignments to non-functional agents.
        """
        project_id = test_project["id"]
        agent_id = test_agent["id"]

        # Create and assign an issue to the agent
        issue = await _create_issue(
            client,
            user_token,
            project_id,
            title="Issue Assigned to Agent",
            assigned_agent_id=agent_id,
        )
        assert issue["assigned_agent_id"] == agent_id

        # Terminate the agent
        terminate_response = await client.delete(
            f"/api/v1/projects/{project_id}/agents/{agent_id}",
            headers=_auth(user_token),
        )
        assert terminate_response.status_code == status.HTTP_200_OK

        # Verify the issue was auto-unassigned
        issue_check = await client.get(
            f"/api/v1/projects/{project_id}/issues/{issue['id']}",
            headers=_auth(user_token),
        )
        assert issue_check.status_code == status.HTTP_200_OK
        updated_issue = issue_check.json()

        # Issue should be unassigned now
        assert updated_issue.get("assigned_agent_id") is None


@pytest.mark.asyncio
class TestSprintLifecycleEdgeCases:
    """
    Tests for sprint lifecycle edge cases.
    """

    async def test_start_sprint_twice(self, client, user_token, test_project):
        """
        Test that starting an already active sprint fails gracefully.
        """
        project_id = test_project["id"]

        # Create a sprint
        sprint = await _create_sprint(
            client, user_token, project_id, "Sprint to Start Twice", 1
        )

        # Start the sprint
        start_response = await _sprint_action(
            client, user_token, project_id, sprint["id"], "start"
        )
        assert start_response.status_code == status.HTTP_200_OK

        # Try to start it again
        second_start = await _sprint_action(
            client, user_token, project_id, sprint["id"], "start"
        )

        # Should fail with appropriate error
        assert second_start.status_code in [
            status.HTTP_400_BAD_REQUEST,
            status.HTTP_409_CONFLICT,
            status.HTTP_422_UNPROCESSABLE_ENTITY,
        ], f"Expected error for double-start, got: {second_start.status_code}"

    async def test_complete_planned_sprint(self, client, user_token, test_project):
        """
        Test that completing a PLANNED (not started) sprint fails.
        """
        project_id = test_project["id"]

        # Create a sprint (status: PLANNED)
        sprint = await _create_sprint(
            client, user_token, project_id, "Sprint Never Started", 99
        )

        # Try to complete without starting
        complete_response = await _sprint_action(
            client, user_token, project_id, sprint["id"], "complete"
        )

        # Should fail - can't complete a sprint that was never started
        assert complete_response.status_code in [
            status.HTTP_400_BAD_REQUEST,
            status.HTTP_422_UNPROCESSABLE_ENTITY,
        ], f"Should not complete PLANNED sprint, got: {complete_response.status_code}"

    async def test_reopen_cancelled_sprint(self, client, user_token, test_project):
        """
        Test that cancelled sprints cannot be restarted.
        """
        project_id = test_project["id"]

        # Create and cancel a sprint
        sprint = await _create_sprint(
            client, user_token, project_id, "Sprint to Cancel", 100
        )

        # Cancel the sprint
        cancel_response = await _sprint_action(
            client, user_token, project_id, sprint["id"], "cancel"
        )
        assert cancel_response.status_code == status.HTTP_200_OK

        # Try to start the cancelled sprint
        start_response = await _sprint_action(
            client, user_token, project_id, sprint["id"], "start"
        )

        # Should fail - cancelled sprints are terminal
        assert start_response.status_code in [
            status.HTTP_400_BAD_REQUEST,
            status.HTTP_422_UNPROCESSABLE_ENTITY,
        ], f"Should not start cancelled sprint, got: {start_response.status_code}"


@pytest.mark.asyncio
class TestProjectArchivingEdgeCases:
    """
    Tests for project archiving with active resources.

    QUESTION: What should happen to active agents/sprints when archiving?
    """

    async def test_archive_project_with_active_sprint(
        self, client, user_token, test_project
    ):
        """
        Test archiving a project that has an active sprint.

        Should this:
        A) Fail (require cancelling sprint first)?
        B) Auto-cancel the sprint?
        C) Archive with active sprint (inconsistent state)?

        Only outcome C fails the test. A non-200 archive response, or a
        non-200 sprint lookup afterwards, is accepted without further checks.
        """
        project_id = test_project["id"]

        # Create and start a sprint
        sprint = await _create_sprint(
            client, user_token, project_id, "Active Sprint", 1
        )

        # Start the sprint. The precondition must hold: without this check a
        # failed start would let the test pass with no active sprint at all.
        start_response = await _sprint_action(
            client, user_token, project_id, sprint["id"], "start"
        )
        assert start_response.status_code == status.HTTP_200_OK, (
            f"Could not start sprint before archiving: {start_response.status_code}"
        )

        # Try to archive the project
        archive_response = await client.post(
            f"/api/v1/projects/{project_id}/archive",
            headers=_auth(user_token),
        )

        # Document the behavior
        if archive_response.status_code != status.HTTP_200_OK:
            return  # Archiving was not performed; nothing to check.

        # Project was archived - check sprint status
        sprint_check = await client.get(
            f"/api/v1/projects/{project_id}/sprints/{sprint['id']}",
            headers=_auth(user_token),
        )
        if sprint_check.status_code != status.HTTP_200_OK:
            return

        if sprint_check.json().get("status") == "active":
            pytest.fail(
                "DESIGN ISSUE: Project archived with active sprint. "
                "Sprint should be cancelled or archiving should fail."
            )

    async def test_archive_project_with_working_agent(
        self, client, user_token, test_project, test_agent
    ):
        """
        Test archiving a project that has working agents.

        Fails only if the archive succeeds and the agent, when it can be
        fetched, is not reported as ``terminated``.
        """
        project_id = test_project["id"]
        agent_id = test_agent["id"]

        # Set agent to working status.
        # The status update might fail if not allowed, which is fine, so the
        # response is intentionally not inspected.
        await client.patch(
            f"/api/v1/projects/{project_id}/agents/{agent_id}/status",
            json={"status": "working", "current_task": "Processing something"},
            headers=_auth(user_token),
        )

        # Try to archive
        archive_response = await client.post(
            f"/api/v1/projects/{project_id}/archive",
            headers=_auth(user_token),
        )
        if archive_response.status_code != status.HTTP_200_OK:
            return

        # Check if agents were terminated
        agent_check = await client.get(
            f"/api/v1/projects/{project_id}/agents/{agent_id}",
            headers=_auth(user_token),
        )
        if agent_check.status_code != status.HTTP_200_OK:
            return

        agent_data = agent_check.json()
        if agent_data.get("status") != "terminated":
            pytest.fail(
                f"DESIGN ISSUE: Project archived but agent status is "
                f"'{agent_data.get('status')}', not 'terminated'"
            )


@pytest.mark.asyncio
class TestConcurrencyEdgeCases:
    """
    Tests for potential race conditions and concurrency issues.
    """

    async def test_start_two_sprints_simultaneously(
        self, client, user_token, test_project
    ):
        """
        Test that only one sprint can be active at a time.

        The two start requests are issued one after the other (not truly in
        parallel), so this exercises the "already an active sprint" check
        rather than a real race. Exactly one request should succeed.
        """
        project_id = test_project["id"]

        # Create two sprints
        sprints = [
            await _create_sprint(
                client, user_token, project_id, f"Concurrent Sprint {i}", 200 + i
            )
            for i in range(2)
        ]

        # Try to start both sprints (sequentially, see docstring)
        responses = [
            await _sprint_action(
                client, user_token, project_id, sprint["id"], "start"
            )
            for sprint in sprints
        ]

        # Exactly one should succeed
        rejected = (
            status.HTTP_400_BAD_REQUEST,
            status.HTTP_409_CONFLICT,
            status.HTTP_422_UNPROCESSABLE_ENTITY,
        )
        successes = sum(1 for r in responses if r.status_code == status.HTTP_200_OK)
        failures = sum(1 for r in responses if r.status_code in rejected)

        assert successes == 1, f"Expected exactly 1 success, got {successes}"
        assert failures == 1, f"Expected exactly 1 failure, got {failures}"


@pytest.mark.asyncio
class TestDataIntegrityEdgeCases:
    """
    Tests for data integrity and referential consistency.
    """

    async def test_delete_agent_type_with_active_instances(
        self, client, superuser_token, user_token, test_project, test_agent_type
    ):
        """
        Test deleting an agent type that has active instances.

        Should fail or cascade appropriately.
        """
        project_id = test_project["id"]
        agent_type_id = test_agent_type["id"]

        # Create an agent from this type
        agent_response = await client.post(
            f"/api/v1/projects/{project_id}/agents",
            json={
                "project_id": project_id,
                "agent_type_id": agent_type_id,
                "name": "Agent from Type",
            },
            headers=_auth(user_token),
        )
        assert agent_response.status_code == status.HTTP_201_CREATED

        # Try to delete the agent type
        delete_response = await client.delete(
            f"/api/v1/agent-types/{agent_type_id}",
            headers=_auth(superuser_token),
        )

        # A 200 is tolerated (could indicate a cascade delete or orphaned
        # instances; not verified here). Anything else must be an explicit
        # rejection of the deletion.
        if delete_response.status_code != status.HTTP_200_OK:
            assert delete_response.status_code in [
                status.HTTP_400_BAD_REQUEST,
                status.HTTP_409_CONFLICT,
            ]

    async def test_move_issue_to_nonexistent_sprint(
        self, client, user_token, test_project
    ):
        """
        Test assigning an issue to a sprint ID that doesn't exist.
        """
        project_id = test_project["id"]
        fake_sprint_id = str(uuid.uuid4())

        # Create an issue
        issue = await _create_issue(
            client, user_token, project_id, title="Issue for Fake Sprint"
        )

        # Try to assign to nonexistent sprint
        update_response = await client.patch(
            f"/api/v1/projects/{project_id}/issues/{issue['id']}",
            json={"sprint_id": fake_sprint_id},
            headers=_auth(user_token),
        )

        assert update_response.status_code == status.HTTP_404_NOT_FOUND

    async def test_assign_issue_to_other_projects_sprint(
        self, client, user_token
    ):
        """
        IDOR Test: Try to assign an issue to a sprint from a different project.
        """
        # Create two projects
        projects = []
        for n in (1, 2):
            project_response = await client.post(
                "/api/v1/projects",
                json={
                    "name": f"Project {n} {uuid.uuid4().hex[:6]}",
                    "slug": f"project-{n}-{uuid.uuid4().hex[:8]}",
                },
                headers=_auth(user_token),
            )
            assert project_response.status_code == status.HTTP_201_CREATED
            projects.append(project_response.json())
        p1, p2 = projects

        # Create a sprint in project 2
        sprint = await _create_sprint(
            client, user_token, p2["id"], "Sprint in Project 2", 1
        )

        # Create an issue in project 1
        issue = await _create_issue(
            client, user_token, p1["id"], title="Issue in Project 1"
        )

        # IDOR: Try to assign project 1's issue to project 2's sprint
        update_response = await client.patch(
            f"/api/v1/projects/{p1['id']}/issues/{issue['id']}",
            json={"sprint_id": sprint["id"]},
            headers=_auth(user_token),
        )

        # This MUST fail - sprint doesn't belong to this project
        assert update_response.status_code in [
            status.HTTP_400_BAD_REQUEST,
            status.HTTP_404_NOT_FOUND,
            status.HTTP_422_UNPROCESSABLE_ENTITY,
        ], (
            "IDOR BUG: Assigned issue to another project's sprint! "
            f"Status: {update_response.status_code}"
        )