Files
fast-next-template/backend/tests/crud/syndarix/test_sprint.py
Felipe Cardoso 63066c50ba test(crud): add comprehensive Syndarix CRUD tests for 95% coverage
Added CRUD layer tests for all Syndarix domain modules:
- test_issue.py: 37 tests covering issue CRUD operations
- test_sprint.py: 31 tests covering sprint CRUD operations
- test_agent_instance.py: 28 tests covering agent instance CRUD
- test_agent_type.py: 19 tests covering agent type CRUD
- test_project.py: 20 tests covering project CRUD operations

Each test file covers:
- Successful CRUD operations
- Not found cases
- Exception handling paths (IntegrityError, OperationalError)
- Filter and pagination operations
- PostgreSQL-specific tests marked as skip for SQLite

Coverage improvements:
- issue.py: 65% → 99%
- sprint.py: 74% → 100%
- agent_instance.py: 73% → 100%
- agent_type.py: 71% → 93%
- project.py: 79% → 100%

Total backend coverage: 89% → 92%

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-31 14:30:05 +01:00

503 lines
18 KiB
Python

# tests/crud/syndarix/test_sprint.py
"""Tests for Sprint CRUD operations."""
import uuid
from datetime import date, timedelta
from unittest.mock import patch
import pytest
import pytest_asyncio
from sqlalchemy.exc import IntegrityError, OperationalError
from app.crud.syndarix.sprint import CRUDSprint, sprint
from app.models.syndarix import Issue, Project, Sprint
from app.models.syndarix.enums import (
IssueStatus,
ProjectStatus,
SprintStatus,
)
from app.schemas.syndarix import SprintCreate
@pytest_asyncio.fixture
async def db_session(async_test_db):
    """Yield a session opened from the test database fixture.

    ``async_test_db`` is unpacked as a two-item tuple; only its second item
    is used, and it is called to open the session. The session is released
    when the ``async with`` block exits after the consuming test finishes.
    """
    _unused, session_factory = async_test_db
    async with session_factory() as active_session:
        yield active_session
@pytest_asyncio.fixture
async def test_project(db_session):
    """Persist and return an ACTIVE project for sprint tests to attach to.

    The slug carries a random eight-character hex suffix (presumably to keep
    slugs distinct between tests).
    """
    project_fields = {
        "id": uuid.uuid4(),
        "name": "Test Project",
        "slug": f"test-project-{uuid.uuid4().hex[:8]}",
        "status": ProjectStatus.ACTIVE,
    }
    record = Project(**project_fields)

    db_session.add(record)
    await db_session.commit()
    # Reload the row after the commit before handing it to the test.
    await db_session.refresh(record)
    return record
@pytest_asyncio.fixture
async def test_sprint(db_session, test_project):
    """Persist and return a PLANNED sprint (number 1) on ``test_project``.

    The sprint starts today and ends fourteen days later.
    """
    sprint_fields = {
        "id": uuid.uuid4(),
        "project_id": test_project.id,
        "name": "Test Sprint",
        "number": 1,
        "status": SprintStatus.PLANNED,
        "start_date": date.today(),
        "end_date": date.today() + timedelta(days=14),
    }
    record = Sprint(**sprint_fields)

    db_session.add(record)
    await db_session.commit()
    # Reload the row after the commit before handing it to the test.
    await db_session.refresh(record)
    return record
class TestSprintCreate:
    """Covers ``sprint.create``: successful inserts and failing commits."""

    @staticmethod
    def _payload(project_id, *, name, number=1, **optional_fields):
        """Build a ``SprintCreate`` that starts today and lasts fourteen days."""
        return SprintCreate(
            project_id=project_id,
            name=name,
            number=number,
            start_date=date.today(),
            end_date=date.today() + timedelta(days=14),
            **optional_fields,
        )

    @pytest.mark.asyncio
    async def test_create_sprint_success(self, db_session, test_project):
        """A payload without a status yields a sprint reported as PLANNED."""
        payload = self._payload(test_project.id, name="New Sprint")

        new_sprint = await sprint.create(db_session, obj_in=payload)

        assert new_sprint.name == "New Sprint"
        assert new_sprint.number == 1
        assert new_sprint.status == SprintStatus.PLANNED

    @pytest.mark.asyncio
    async def test_create_sprint_with_all_fields(self, db_session, test_project):
        """Optional fields supplied on creation are present on the result."""
        payload = self._payload(
            test_project.id,
            name="Full Sprint",
            number=2,
            goal="Deliver user authentication",
            status=SprintStatus.PLANNED,
            planned_points=20,
            velocity=15,
        )

        new_sprint = await sprint.create(db_session, obj_in=payload)

        assert new_sprint.goal == "Deliver user authentication"
        assert new_sprint.planned_points == 20
        assert new_sprint.velocity == 15

    @pytest.mark.asyncio
    async def test_create_sprint_integrity_error(self, db_session, test_project):
        """An ``IntegrityError`` on commit surfaces as a ``ValueError``."""
        payload = self._payload(test_project.id, name="Test Sprint")
        failing_commit = patch.object(
            db_session,
            "commit",
            side_effect=IntegrityError("", {}, Exception()),
        )
        expected_failure = pytest.raises(
            ValueError, match="Database integrity error"
        )

        with failing_commit, expected_failure:
            await sprint.create(db_session, obj_in=payload)

    @pytest.mark.asyncio
    async def test_create_sprint_unexpected_error(self, db_session, test_project):
        """A non-database exception raised on commit reaches the caller."""
        payload = self._payload(test_project.id, name="Test Sprint")
        failing_commit = patch.object(
            db_session,
            "commit",
            side_effect=RuntimeError("Unexpected error"),
        )
        expected_failure = pytest.raises(RuntimeError, match="Unexpected error")

        with failing_commit, expected_failure:
            await sprint.create(db_session, obj_in=payload)
class TestSprintGetWithDetails:
    """Covers ``sprint.get_with_details`` for missing, found and failing cases."""

    @pytest.mark.asyncio
    async def test_get_with_details_not_found(self, db_session):
        """An unknown sprint id yields ``None``."""
        unknown_id = uuid.uuid4()

        details = await sprint.get_with_details(db_session, sprint_id=unknown_id)

        assert details is None

    @pytest.mark.asyncio
    async def test_get_with_details_success(self, db_session, test_sprint):
        """An existing sprint comes back in a mapping with the extra keys."""
        details = await sprint.get_with_details(db_session, sprint_id=test_sprint.id)

        assert details is not None
        assert details["sprint"].id == test_sprint.id
        for expected_key in ("project_name", "issue_count"):
            assert expected_key in details

    @pytest.mark.asyncio
    async def test_get_with_details_db_error(self, db_session, test_sprint):
        """An ``OperationalError`` raised by ``execute`` reaches the caller."""
        outage = OperationalError("Connection lost", {}, Exception())
        broken_execute = patch.object(db_session, "execute", side_effect=outage)

        with broken_execute, pytest.raises(OperationalError):
            await sprint.get_with_details(db_session, sprint_id=test_sprint.id)
class TestSprintGetByProject:
    """Covers ``sprint.get_by_project``: status filtering and DB failures."""

    @pytest.mark.asyncio
    async def test_get_by_project_with_status_filter(
        self, db_session, test_project, test_sprint
    ):
        """Filtering by PLANNED returns exactly the one planned fixture sprint."""
        # The second tuple element is not checked by this test.
        matching, _total = await sprint.get_by_project(
            db_session,
            project_id=test_project.id,
            status=SprintStatus.PLANNED,
        )

        assert [item.status for item in matching] == [SprintStatus.PLANNED]

    @pytest.mark.asyncio
    async def test_get_by_project_db_error(self, db_session, test_project):
        """An ``OperationalError`` raised by ``execute`` reaches the caller."""
        outage = OperationalError("Connection lost", {}, Exception())
        broken_execute = patch.object(db_session, "execute", side_effect=outage)

        with broken_execute, pytest.raises(OperationalError):
            await sprint.get_by_project(db_session, project_id=test_project.id)
class TestSprintActiveOperations:
    """Covers ``sprint.get_active_sprint``."""

    @pytest.mark.asyncio
    async def test_get_active_sprint_none(self, db_session, test_project, test_sprint):
        """A project whose only sprint is PLANNED has no active sprint."""
        active = await sprint.get_active_sprint(db_session, project_id=test_project.id)

        assert active is None

    @pytest.mark.asyncio
    async def test_get_active_sprint_db_error(self, db_session, test_project):
        """An ``OperationalError`` raised by ``execute`` reaches the caller."""
        outage = OperationalError("Connection lost", {}, Exception())
        broken_execute = patch.object(db_session, "execute", side_effect=outage)

        with broken_execute, pytest.raises(OperationalError):
            await sprint.get_active_sprint(db_session, project_id=test_project.id)
class TestSprintNumberOperations:
    """Covers ``sprint.get_next_sprint_number``."""

    @pytest.mark.asyncio
    async def test_get_next_sprint_number_empty(self, db_session, test_project):
        """A project without sprints gets 1 as its next sprint number."""
        next_number = await sprint.get_next_sprint_number(
            db_session,
            project_id=test_project.id,
        )

        assert next_number == 1

    @pytest.mark.asyncio
    async def test_get_next_sprint_number_with_existing(
        self, db_session, test_project, test_sprint
    ):
        """With sprint number 1 already stored, the next number is 2."""
        next_number = await sprint.get_next_sprint_number(
            db_session,
            project_id=test_project.id,
        )

        assert next_number == 2

    @pytest.mark.asyncio
    async def test_get_next_sprint_number_db_error(self, db_session, test_project):
        """An ``OperationalError`` raised by ``execute`` reaches the caller."""
        outage = OperationalError("Connection lost", {}, Exception())
        broken_execute = patch.object(db_session, "execute", side_effect=outage)

        with broken_execute, pytest.raises(OperationalError):
            await sprint.get_next_sprint_number(
                db_session,
                project_id=test_project.id,
            )
class TestSprintLifecycle:
    """Covers the start, complete and cancel transitions of a sprint."""

    @staticmethod
    def _make_sprint(project_id, *, name, status, number=1, offset_days=0):
        """Build an unsaved fourteen-day sprint starting ``offset_days`` from today."""
        return Sprint(
            id=uuid.uuid4(),
            project_id=project_id,
            name=name,
            number=number,
            status=status,
            start_date=date.today() + timedelta(days=offset_days),
            end_date=date.today() + timedelta(days=offset_days + 14),
        )

    @staticmethod
    def _failing_commit(db_session):
        """Return a patcher that makes ``commit`` raise ``OperationalError``."""
        outage = OperationalError("Connection lost", {}, Exception())
        return patch.object(db_session, "commit", side_effect=outage)

    # --- start_sprint -----------------------------------------------------

    @pytest.mark.asyncio
    async def test_start_sprint_not_found(self, db_session):
        """Starting an unknown sprint id yields ``None``."""
        outcome = await sprint.start_sprint(db_session, sprint_id=uuid.uuid4())

        assert outcome is None

    @pytest.mark.asyncio
    async def test_start_sprint_invalid_status(self, db_session, test_project):
        """A sprint that is already ACTIVE cannot be started."""
        running = self._make_sprint(
            test_project.id,
            name="Active Sprint",
            status=SprintStatus.ACTIVE,
        )
        db_session.add(running)
        await db_session.commit()

        with pytest.raises(ValueError, match="Cannot start sprint with status"):
            await sprint.start_sprint(db_session, sprint_id=running.id)

    @pytest.mark.asyncio
    async def test_start_sprint_with_existing_active(self, db_session, test_project):
        """A PLANNED sprint cannot start while the project has an ACTIVE one."""
        running = self._make_sprint(
            test_project.id,
            name="Active Sprint",
            status=SprintStatus.ACTIVE,
        )
        upcoming = self._make_sprint(
            test_project.id,
            name="Planned Sprint",
            status=SprintStatus.PLANNED,
            number=2,
            offset_days=15,
        )
        db_session.add_all([running, upcoming])
        await db_session.commit()

        with pytest.raises(ValueError, match="Project already has an active sprint"):
            await sprint.start_sprint(db_session, sprint_id=upcoming.id)

    @pytest.mark.asyncio
    async def test_start_sprint_db_error(self, db_session, test_sprint):
        """An ``OperationalError`` on commit reaches the caller of ``start_sprint``."""
        with self._failing_commit(db_session), pytest.raises(OperationalError):
            await sprint.start_sprint(db_session, sprint_id=test_sprint.id)

    # --- complete_sprint --------------------------------------------------

    @pytest.mark.asyncio
    async def test_complete_sprint_not_found(self, db_session):
        """Completing an unknown sprint id yields ``None``."""
        outcome = await sprint.complete_sprint(db_session, sprint_id=uuid.uuid4())

        assert outcome is None

    @pytest.mark.asyncio
    async def test_complete_sprint_invalid_status(self, db_session, test_sprint):
        """A PLANNED sprint cannot be completed."""
        with pytest.raises(ValueError, match="Cannot complete sprint with status"):
            await sprint.complete_sprint(db_session, sprint_id=test_sprint.id)

    @pytest.mark.asyncio
    async def test_complete_sprint_db_error(self, db_session, test_project):
        """An ``OperationalError`` on commit reaches the caller of ``complete_sprint``."""
        running = self._make_sprint(
            test_project.id,
            name="Active Sprint",
            status=SprintStatus.ACTIVE,
        )
        db_session.add(running)
        # Persist the sprint first; only the later commit is made to fail.
        await db_session.commit()

        with self._failing_commit(db_session), pytest.raises(OperationalError):
            await sprint.complete_sprint(db_session, sprint_id=running.id)

    # --- cancel_sprint ----------------------------------------------------

    @pytest.mark.asyncio
    async def test_cancel_sprint_not_found(self, db_session):
        """Cancelling an unknown sprint id yields ``None``."""
        outcome = await sprint.cancel_sprint(db_session, sprint_id=uuid.uuid4())

        assert outcome is None

    @pytest.mark.asyncio
    async def test_cancel_sprint_invalid_status(self, db_session, test_project):
        """A COMPLETED sprint cannot be cancelled."""
        finished = self._make_sprint(
            test_project.id,
            name="Completed Sprint",
            status=SprintStatus.COMPLETED,
            offset_days=-14,
        )
        db_session.add(finished)
        await db_session.commit()

        with pytest.raises(ValueError, match="Cannot cancel sprint with status"):
            await sprint.cancel_sprint(db_session, sprint_id=finished.id)

    @pytest.mark.asyncio
    async def test_cancel_sprint_success(self, db_session, test_sprint):
        """Cancelling a PLANNED sprint returns it with status CANCELLED."""
        outcome = await sprint.cancel_sprint(db_session, sprint_id=test_sprint.id)

        assert outcome is not None
        assert outcome.status == SprintStatus.CANCELLED

    @pytest.mark.asyncio
    async def test_cancel_sprint_db_error(self, db_session, test_sprint):
        """An ``OperationalError`` on commit reaches the caller of ``cancel_sprint``."""
        with self._failing_commit(db_session), pytest.raises(OperationalError):
            await sprint.cancel_sprint(db_session, sprint_id=test_sprint.id)
class TestSprintVelocity:
    """Tests for ``sprint.get_velocity``."""

    @pytest.mark.asyncio
    async def test_get_velocity_empty(self, db_session, test_project):
        """A project with no completed sprints yields an empty list."""
        result = await sprint.get_velocity(db_session, project_id=test_project.id)

        assert result == []

    @pytest.mark.asyncio
    async def test_get_velocity_with_data(self, db_session, test_project):
        """A completed sprint is reported with its number, velocity and ratio."""
        completed_sprint = Sprint(
            id=uuid.uuid4(),
            project_id=test_project.id,
            name="Completed Sprint",
            number=1,
            status=SprintStatus.COMPLETED,
            start_date=date.today() - timedelta(days=14),
            end_date=date.today(),
            planned_points=20,
            velocity=18,
        )
        db_session.add(completed_sprint)
        await db_session.commit()

        result = await sprint.get_velocity(db_session, project_id=test_project.id)

        assert len(result) == 1
        assert result[0]["sprint_number"] == 1
        assert result[0]["velocity"] == 18
        # The ratio is a computed float (18 / 20 here); compare with a
        # tolerance so the test does not depend on exact float representation.
        assert result[0]["velocity_ratio"] == pytest.approx(0.9)

    @pytest.mark.asyncio
    async def test_get_velocity_db_error(self, db_session, test_project):
        """An ``OperationalError`` raised by ``execute`` reaches the caller."""
        outage = OperationalError("Connection lost", {}, Exception())
        broken_execute = patch.object(db_session, "execute", side_effect=outage)

        with broken_execute, pytest.raises(OperationalError):
            await sprint.get_velocity(db_session, project_id=test_project.id)
class TestSprintWithIssueCounts:
    """Tests for ``sprint.get_sprints_with_issue_counts``."""

    @pytest.mark.asyncio
    async def test_get_sprints_with_issue_counts_empty(self, db_session, test_project):
        """A project without sprints yields an empty list and a total of 0."""
        results, total = await sprint.get_sprints_with_issue_counts(
            db_session, project_id=test_project.id
        )

        assert results == []
        assert total == 0

    @pytest.mark.asyncio
    async def test_get_sprints_with_issue_counts_success(
        self, db_session, test_project, test_sprint
    ):
        """One sprint with an OPEN and a CLOSED issue reports matching counts."""
        # Attach one open and one closed issue to the fixture sprint.
        open_issue = Issue(
            id=uuid.uuid4(),
            project_id=test_project.id,
            sprint_id=test_sprint.id,
            title="Issue 1",
            status=IssueStatus.OPEN,
        )
        closed_issue = Issue(
            id=uuid.uuid4(),
            project_id=test_project.id,
            sprint_id=test_sprint.id,
            title="Issue 2",
            status=IssueStatus.CLOSED,
        )
        db_session.add_all([open_issue, closed_issue])
        await db_session.commit()

        results, total = await sprint.get_sprints_with_issue_counts(
            db_session, project_id=test_project.id
        )

        assert len(results) == 1
        # The total was previously unpacked but never checked; with a single
        # sprint in the project it should agree with the number of results.
        assert total == 1
        assert results[0]["issue_count"] == 2
        assert results[0]["open_issues"] == 1
        assert results[0]["completed_issues"] == 1

    @pytest.mark.asyncio
    async def test_get_sprints_with_issue_counts_db_error(
        self, db_session, test_project, test_sprint
    ):
        """An ``OperationalError`` raised by ``execute`` reaches the caller."""
        outage = OperationalError("Connection lost", {}, Exception())
        broken_execute = patch.object(db_session, "execute", side_effect=outage)

        with broken_execute, pytest.raises(OperationalError):
            await sprint.get_sprints_with_issue_counts(
                db_session, project_id=test_project.id
            )