test(backend): add comprehensive tests for OAuth and agent endpoints

- Added tests for OAuth provider admin and consent endpoints covering edge cases.
- Extended agent-related tests to handle incorrect project associations and lifecycle state transitions.
- Introduced tests for sprint status transitions and validation checks.
- Improved multiline formatting consistency across all test functions.
This commit is contained in:
2026-01-03 01:44:11 +01:00
parent acd18ff694
commit 664415111a
28 changed files with 1530 additions and 216 deletions

View File

@@ -122,9 +122,7 @@ class TestSpawnAgent:
assert response.status_code == status.HTTP_404_NOT_FOUND
async def test_spawn_agent_nonexistent_type(
self, client, user_token, test_project
):
async def test_spawn_agent_nonexistent_type(self, client, user_token, test_project):
"""Test spawning agent with nonexistent agent type."""
project_id = test_project["id"]
fake_type_id = str(uuid.uuid4())
@@ -376,9 +374,7 @@ class TestUpdateAgent:
class TestAgentLifecycle:
"""Tests for agent lifecycle management endpoints."""
async def test_pause_agent(
self, client, user_token, test_project, test_agent_type
):
async def test_pause_agent(self, client, user_token, test_project, test_agent_type):
"""Test pausing an agent."""
project_id = test_project["id"]
agent_type_id = test_agent_type["id"]
@@ -617,3 +613,364 @@ class TestAgentAuthorization:
)
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.asyncio
class TestSpawnAgentEdgeCases:
"""Tests for agent spawn edge cases."""
async def test_spawn_agent_with_inactive_agent_type(
self, client, user_token, superuser_token, test_project
):
"""Test spawning agent with an inactive agent type fails."""
project_id = test_project["id"]
# Create an inactive agent type
unique_slug = f"inactive-agent-type-{uuid.uuid4().hex[:8]}"
create_response = await client.post(
"/api/v1/agent-types",
json={
"name": "Inactive Agent Type",
"slug": unique_slug,
"expertise": ["testing"],
"primary_model": "claude-3-opus",
"personality_prompt": "Test inactive agent.",
"description": "An inactive agent type for testing",
"is_active": False,
},
headers={"Authorization": f"Bearer {superuser_token}"},
)
assert create_response.status_code == status.HTTP_201_CREATED
inactive_type_id = create_response.json()["id"]
# Try to spawn agent with inactive type
response = await client.post(
f"/api/v1/projects/{project_id}/agents",
json={
"project_id": project_id,
"agent_type_id": inactive_type_id,
"name": "Agent With Inactive Type",
},
headers={"Authorization": f"Bearer {user_token}"},
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
# Error response uses standardized format with "errors" list
data = response.json()
assert "errors" in data
assert any("inactive" in err["message"].lower() for err in data["errors"])
@pytest.mark.asyncio
class TestAgentWrongProject:
"""Tests for agent operations when agent belongs to different project."""
@pytest_asyncio.fixture
async def two_projects_with_agent(
self, client, user_token, superuser_token, test_agent_type
):
"""Create two projects and an agent in project1."""
# Create project1
resp1 = await client.post(
"/api/v1/projects",
json={
"name": "Project One",
"slug": f"project-one-{uuid.uuid4().hex[:8]}",
},
headers={"Authorization": f"Bearer {user_token}"},
)
project1 = resp1.json()
# Create project2
resp2 = await client.post(
"/api/v1/projects",
json={
"name": "Project Two",
"slug": f"project-two-{uuid.uuid4().hex[:8]}",
},
headers={"Authorization": f"Bearer {user_token}"},
)
project2 = resp2.json()
# Create agent in project1
agent_resp = await client.post(
f"/api/v1/projects/{project1['id']}/agents",
json={
"project_id": project1["id"],
"agent_type_id": test_agent_type["id"],
"name": "Project1 Agent",
},
headers={"Authorization": f"Bearer {user_token}"},
)
agent = agent_resp.json()
return {"project1": project1, "project2": project2, "agent": agent}
async def test_get_agent_wrong_project(
self, client, user_token, two_projects_with_agent
):
"""Test getting an agent via wrong project returns 404."""
data = two_projects_with_agent
agent_id = data["agent"]["id"]
wrong_project_id = data["project2"]["id"]
response = await client.get(
f"/api/v1/projects/{wrong_project_id}/agents/{agent_id}",
headers={"Authorization": f"Bearer {user_token}"},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
async def test_update_agent_wrong_project(
self, client, user_token, two_projects_with_agent
):
"""Test updating an agent via wrong project returns 404."""
data = two_projects_with_agent
agent_id = data["agent"]["id"]
wrong_project_id = data["project2"]["id"]
response = await client.patch(
f"/api/v1/projects/{wrong_project_id}/agents/{agent_id}",
json={"current_task": "Test task"},
headers={"Authorization": f"Bearer {user_token}"},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
async def test_pause_agent_wrong_project(
self, client, user_token, two_projects_with_agent
):
"""Test pausing an agent via wrong project returns 404."""
data = two_projects_with_agent
agent_id = data["agent"]["id"]
wrong_project_id = data["project2"]["id"]
response = await client.post(
f"/api/v1/projects/{wrong_project_id}/agents/{agent_id}/pause",
headers={"Authorization": f"Bearer {user_token}"},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
async def test_resume_agent_wrong_project(
self, client, user_token, two_projects_with_agent
):
"""Test resuming an agent via wrong project returns 404."""
data = two_projects_with_agent
project1_id = data["project1"]["id"]
agent_id = data["agent"]["id"]
wrong_project_id = data["project2"]["id"]
# First pause the agent using correct project
await client.post(
f"/api/v1/projects/{project1_id}/agents/{agent_id}/pause",
headers={"Authorization": f"Bearer {user_token}"},
)
# Try to resume via wrong project
response = await client.post(
f"/api/v1/projects/{wrong_project_id}/agents/{agent_id}/resume",
headers={"Authorization": f"Bearer {user_token}"},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
async def test_terminate_agent_wrong_project(
self, client, user_token, two_projects_with_agent
):
"""Test terminating an agent via wrong project returns 404."""
data = two_projects_with_agent
agent_id = data["agent"]["id"]
wrong_project_id = data["project2"]["id"]
response = await client.delete(
f"/api/v1/projects/{wrong_project_id}/agents/{agent_id}",
headers={"Authorization": f"Bearer {user_token}"},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
async def test_get_agent_metrics_wrong_project(
self, client, user_token, two_projects_with_agent
):
"""Test getting agent metrics via wrong project returns 404."""
data = two_projects_with_agent
agent_id = data["agent"]["id"]
wrong_project_id = data["project2"]["id"]
response = await client.get(
f"/api/v1/projects/{wrong_project_id}/agents/{agent_id}/metrics",
headers={"Authorization": f"Bearer {user_token}"},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
@pytest.mark.asyncio
class TestAgentStatusTransitions:
"""Tests for invalid agent status transitions."""
async def test_terminate_already_terminated_agent(
self, client, user_token, test_project, test_agent_type
):
"""Test terminating an already terminated agent fails."""
project_id = test_project["id"]
agent_type_id = test_agent_type["id"]
# Create agent
create_response = await client.post(
f"/api/v1/projects/{project_id}/agents",
json={
"project_id": project_id,
"agent_type_id": agent_type_id,
"name": "Double Terminate Agent",
},
headers={"Authorization": f"Bearer {user_token}"},
)
agent_id = create_response.json()["id"]
# Terminate once
first_terminate = await client.delete(
f"/api/v1/projects/{project_id}/agents/{agent_id}",
headers={"Authorization": f"Bearer {user_token}"},
)
assert first_terminate.status_code == status.HTTP_200_OK
# Try to terminate again
response = await client.delete(
f"/api/v1/projects/{project_id}/agents/{agent_id}",
headers={"Authorization": f"Bearer {user_token}"},
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
data = response.json()
assert "errors" in data
assert any("terminated" in err["message"].lower() for err in data["errors"])
async def test_resume_idle_agent(
self, client, user_token, test_project, test_agent_type
):
"""Test resuming an agent that is not paused fails."""
project_id = test_project["id"]
agent_type_id = test_agent_type["id"]
# Create agent (starts in idle state)
create_response = await client.post(
f"/api/v1/projects/{project_id}/agents",
json={
"project_id": project_id,
"agent_type_id": agent_type_id,
"name": "Resume Idle Agent",
},
headers={"Authorization": f"Bearer {user_token}"},
)
agent_id = create_response.json()["id"]
# Try to resume without pausing first
response = await client.post(
f"/api/v1/projects/{project_id}/agents/{agent_id}/resume",
headers={"Authorization": f"Bearer {user_token}"},
)
# Should fail since agent is not paused
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
async def test_pause_already_paused_agent(
self, client, user_token, test_project, test_agent_type
):
"""Test pausing an already paused agent fails."""
project_id = test_project["id"]
agent_type_id = test_agent_type["id"]
# Create agent
create_response = await client.post(
f"/api/v1/projects/{project_id}/agents",
json={
"project_id": project_id,
"agent_type_id": agent_type_id,
"name": "Double Pause Agent",
},
headers={"Authorization": f"Bearer {user_token}"},
)
agent_id = create_response.json()["id"]
# Pause once
first_pause = await client.post(
f"/api/v1/projects/{project_id}/agents/{agent_id}/pause",
headers={"Authorization": f"Bearer {user_token}"},
)
assert first_pause.status_code == status.HTTP_200_OK
# Try to pause again
response = await client.post(
f"/api/v1/projects/{project_id}/agents/{agent_id}/pause",
headers={"Authorization": f"Bearer {user_token}"},
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
async def test_pause_terminated_agent(
self, client, user_token, test_project, test_agent_type
):
"""Test pausing a terminated agent fails."""
project_id = test_project["id"]
agent_type_id = test_agent_type["id"]
# Create agent
create_response = await client.post(
f"/api/v1/projects/{project_id}/agents",
json={
"project_id": project_id,
"agent_type_id": agent_type_id,
"name": "Pause Terminated Agent",
},
headers={"Authorization": f"Bearer {user_token}"},
)
agent_id = create_response.json()["id"]
# Terminate agent
await client.delete(
f"/api/v1/projects/{project_id}/agents/{agent_id}",
headers={"Authorization": f"Bearer {user_token}"},
)
# Try to pause terminated agent
response = await client.post(
f"/api/v1/projects/{project_id}/agents/{agent_id}/pause",
headers={"Authorization": f"Bearer {user_token}"},
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
async def test_resume_terminated_agent(
self, client, user_token, test_project, test_agent_type
):
"""Test resuming a terminated agent fails."""
project_id = test_project["id"]
agent_type_id = test_agent_type["id"]
# Create agent
create_response = await client.post(
f"/api/v1/projects/{project_id}/agents",
json={
"project_id": project_id,
"agent_type_id": agent_type_id,
"name": "Resume Terminated Agent",
},
headers={"Authorization": f"Bearer {user_token}"},
)
agent_id = create_response.json()["id"]
# Terminate agent
await client.delete(
f"/api/v1/projects/{project_id}/agents/{agent_id}",
headers={"Authorization": f"Bearer {user_token}"},
)
# Try to resume terminated agent
response = await client.post(
f"/api/v1/projects/{project_id}/agents/{agent_id}/resume",
headers={"Authorization": f"Bearer {user_token}"},
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY