# tests/crud/syndarix/test_sprint.py """Tests for Sprint CRUD operations.""" import uuid from datetime import date, timedelta from unittest.mock import patch import pytest import pytest_asyncio from sqlalchemy.exc import IntegrityError, OperationalError from app.crud.syndarix.sprint import CRUDSprint, sprint from app.models.syndarix import Issue, Project, Sprint from app.models.syndarix.enums import ( IssueStatus, ProjectStatus, SprintStatus, ) from app.schemas.syndarix import SprintCreate @pytest_asyncio.fixture async def db_session(async_test_db): """Create a database session for tests.""" _, AsyncTestingSessionLocal = async_test_db async with AsyncTestingSessionLocal() as session: yield session @pytest_asyncio.fixture async def test_project(db_session): """Create a test project for sprints.""" project = Project( id=uuid.uuid4(), name="Test Project", slug=f"test-project-{uuid.uuid4().hex[:8]}", status=ProjectStatus.ACTIVE, ) db_session.add(project) await db_session.commit() await db_session.refresh(project) return project @pytest_asyncio.fixture async def test_sprint(db_session, test_project): """Create a test sprint.""" sprint_obj = Sprint( id=uuid.uuid4(), project_id=test_project.id, name="Test Sprint", number=1, status=SprintStatus.PLANNED, start_date=date.today(), end_date=date.today() + timedelta(days=14), ) db_session.add(sprint_obj) await db_session.commit() await db_session.refresh(sprint_obj) return sprint_obj class TestSprintCreate: """Tests for sprint creation.""" @pytest.mark.asyncio async def test_create_sprint_success(self, db_session, test_project): """Test successful sprint creation.""" sprint_data = SprintCreate( project_id=test_project.id, name="New Sprint", number=1, start_date=date.today(), end_date=date.today() + timedelta(days=14), ) created = await sprint.create(db_session, obj_in=sprint_data) assert created.name == "New Sprint" assert created.number == 1 assert created.status == SprintStatus.PLANNED @pytest.mark.asyncio async def test_create_sprint_with_all_fields(self, db_session, test_project): """Test sprint creation with all optional fields.""" sprint_data = SprintCreate( project_id=test_project.id, name="Full Sprint", number=2, goal="Deliver user authentication", start_date=date.today(), end_date=date.today() + timedelta(days=14), status=SprintStatus.PLANNED, planned_points=20, velocity=15, ) created = await sprint.create(db_session, obj_in=sprint_data) assert created.goal == "Deliver user authentication" assert created.planned_points == 20 assert created.velocity == 15 @pytest.mark.asyncio async def test_create_sprint_integrity_error(self, db_session, test_project): """Test sprint creation with integrity error.""" sprint_data = SprintCreate( project_id=test_project.id, name="Test Sprint", number=1, start_date=date.today(), end_date=date.today() + timedelta(days=14), ) with patch.object( db_session, "commit", side_effect=IntegrityError("", {}, Exception()), ): with pytest.raises(ValueError, match="Database integrity error"): await sprint.create(db_session, obj_in=sprint_data) @pytest.mark.asyncio async def test_create_sprint_unexpected_error(self, db_session, test_project): """Test sprint creation with unexpected error.""" sprint_data = SprintCreate( project_id=test_project.id, name="Test Sprint", number=1, start_date=date.today(), end_date=date.today() + timedelta(days=14), ) with patch.object( db_session, "commit", side_effect=RuntimeError("Unexpected error"), ): with pytest.raises(RuntimeError, match="Unexpected error"): await sprint.create(db_session, obj_in=sprint_data) class TestSprintGetWithDetails: """Tests for getting sprint with details.""" @pytest.mark.asyncio async def test_get_with_details_not_found(self, db_session): """Test getting non-existent sprint with details.""" result = await sprint.get_with_details(db_session, sprint_id=uuid.uuid4()) assert result is None @pytest.mark.asyncio async def test_get_with_details_success(self, db_session, test_sprint): """Test getting sprint with details.""" result = await sprint.get_with_details(db_session, sprint_id=test_sprint.id) assert result is not None assert result["sprint"].id == test_sprint.id assert "project_name" in result assert "issue_count" in result @pytest.mark.asyncio async def test_get_with_details_db_error(self, db_session, test_sprint): """Test getting sprint with details when DB error occurs.""" with patch.object( db_session, "execute", side_effect=OperationalError("Connection lost", {}, Exception()), ): with pytest.raises(OperationalError): await sprint.get_with_details(db_session, sprint_id=test_sprint.id) class TestSprintGetByProject: """Tests for getting sprints by project.""" @pytest.mark.asyncio async def test_get_by_project_with_status_filter( self, db_session, test_project, test_sprint ): """Test getting sprints with status filter.""" sprints, total = await sprint.get_by_project( db_session, project_id=test_project.id, status=SprintStatus.PLANNED, ) assert len(sprints) == 1 assert sprints[0].status == SprintStatus.PLANNED @pytest.mark.asyncio async def test_get_by_project_db_error(self, db_session, test_project): """Test getting sprints when DB error occurs.""" with patch.object( db_session, "execute", side_effect=OperationalError("Connection lost", {}, Exception()), ): with pytest.raises(OperationalError): await sprint.get_by_project(db_session, project_id=test_project.id) class TestSprintActiveOperations: """Tests for active sprint operations.""" @pytest.mark.asyncio async def test_get_active_sprint_none(self, db_session, test_project, test_sprint): """Test getting active sprint when none exists.""" result = await sprint.get_active_sprint(db_session, project_id=test_project.id) assert result is None @pytest.mark.asyncio async def test_get_active_sprint_db_error(self, db_session, test_project): """Test getting active sprint when DB error occurs.""" with patch.object( db_session, "execute", side_effect=OperationalError("Connection lost", {}, Exception()), ): with pytest.raises(OperationalError): await sprint.get_active_sprint(db_session, project_id=test_project.id) class TestSprintNumberOperations: """Tests for sprint number operations.""" @pytest.mark.asyncio async def test_get_next_sprint_number_empty(self, db_session, test_project): """Test getting next sprint number for project with no sprints.""" result = await sprint.get_next_sprint_number( db_session, project_id=test_project.id ) assert result == 1 @pytest.mark.asyncio async def test_get_next_sprint_number_with_existing( self, db_session, test_project, test_sprint ): """Test getting next sprint number with existing sprints.""" result = await sprint.get_next_sprint_number( db_session, project_id=test_project.id ) assert result == 2 @pytest.mark.asyncio async def test_get_next_sprint_number_db_error(self, db_session, test_project): """Test getting next sprint number when DB error occurs.""" with patch.object( db_session, "execute", side_effect=OperationalError("Connection lost", {}, Exception()), ): with pytest.raises(OperationalError): await sprint.get_next_sprint_number( db_session, project_id=test_project.id ) class TestSprintLifecycle: """Tests for sprint lifecycle operations.""" @pytest.mark.asyncio async def test_start_sprint_not_found(self, db_session): """Test starting non-existent sprint.""" result = await sprint.start_sprint(db_session, sprint_id=uuid.uuid4()) assert result is None @pytest.mark.asyncio async def test_start_sprint_invalid_status(self, db_session, test_project): """Test starting sprint with invalid status.""" # Create an active sprint active_sprint = Sprint( id=uuid.uuid4(), project_id=test_project.id, name="Active Sprint", number=1, status=SprintStatus.ACTIVE, start_date=date.today(), end_date=date.today() + timedelta(days=14), ) db_session.add(active_sprint) await db_session.commit() with pytest.raises(ValueError, match="Cannot start sprint with status"): await sprint.start_sprint(db_session, sprint_id=active_sprint.id) @pytest.mark.asyncio async def test_start_sprint_with_existing_active(self, db_session, test_project): """Test starting sprint when another is already active.""" # Create active sprint active_sprint = Sprint( id=uuid.uuid4(), project_id=test_project.id, name="Active Sprint", number=1, status=SprintStatus.ACTIVE, start_date=date.today(), end_date=date.today() + timedelta(days=14), ) db_session.add(active_sprint) # Create planned sprint planned_sprint = Sprint( id=uuid.uuid4(), project_id=test_project.id, name="Planned Sprint", number=2, status=SprintStatus.PLANNED, start_date=date.today() + timedelta(days=15), end_date=date.today() + timedelta(days=29), ) db_session.add(planned_sprint) await db_session.commit() with pytest.raises(ValueError, match="Project already has an active sprint"): await sprint.start_sprint(db_session, sprint_id=planned_sprint.id) @pytest.mark.asyncio async def test_start_sprint_db_error(self, db_session, test_sprint): """Test starting sprint when DB error occurs.""" with patch.object( db_session, "commit", side_effect=OperationalError("Connection lost", {}, Exception()), ): with pytest.raises(OperationalError): await sprint.start_sprint(db_session, sprint_id=test_sprint.id) @pytest.mark.asyncio async def test_complete_sprint_not_found(self, db_session): """Test completing non-existent sprint.""" result = await sprint.complete_sprint(db_session, sprint_id=uuid.uuid4()) assert result is None @pytest.mark.asyncio async def test_complete_sprint_invalid_status(self, db_session, test_sprint): """Test completing sprint with invalid status (PLANNED).""" with pytest.raises(ValueError, match="Cannot complete sprint with status"): await sprint.complete_sprint(db_session, sprint_id=test_sprint.id) @pytest.mark.asyncio async def test_complete_sprint_db_error(self, db_session, test_project): """Test completing sprint when DB error occurs.""" # Create active sprint active_sprint = Sprint( id=uuid.uuid4(), project_id=test_project.id, name="Active Sprint", number=1, status=SprintStatus.ACTIVE, start_date=date.today(), end_date=date.today() + timedelta(days=14), ) db_session.add(active_sprint) await db_session.commit() with patch.object( db_session, "commit", side_effect=OperationalError("Connection lost", {}, Exception()), ): with pytest.raises(OperationalError): await sprint.complete_sprint(db_session, sprint_id=active_sprint.id) @pytest.mark.asyncio async def test_cancel_sprint_not_found(self, db_session): """Test cancelling non-existent sprint.""" result = await sprint.cancel_sprint(db_session, sprint_id=uuid.uuid4()) assert result is None @pytest.mark.asyncio async def test_cancel_sprint_invalid_status(self, db_session, test_project): """Test cancelling sprint with invalid status (COMPLETED).""" completed_sprint = Sprint( id=uuid.uuid4(), project_id=test_project.id, name="Completed Sprint", number=1, status=SprintStatus.COMPLETED, start_date=date.today() - timedelta(days=14), end_date=date.today(), ) db_session.add(completed_sprint) await db_session.commit() with pytest.raises(ValueError, match="Cannot cancel sprint with status"): await sprint.cancel_sprint(db_session, sprint_id=completed_sprint.id) @pytest.mark.asyncio async def test_cancel_sprint_success(self, db_session, test_sprint): """Test successfully cancelling a planned sprint.""" result = await sprint.cancel_sprint(db_session, sprint_id=test_sprint.id) assert result is not None assert result.status == SprintStatus.CANCELLED @pytest.mark.asyncio async def test_cancel_sprint_db_error(self, db_session, test_sprint): """Test cancelling sprint when DB error occurs.""" with patch.object( db_session, "commit", side_effect=OperationalError("Connection lost", {}, Exception()), ): with pytest.raises(OperationalError): await sprint.cancel_sprint(db_session, sprint_id=test_sprint.id) class TestSprintVelocity: """Tests for velocity operations.""" @pytest.mark.asyncio async def test_get_velocity_empty(self, db_session, test_project): """Test getting velocity with no completed sprints.""" result = await sprint.get_velocity(db_session, project_id=test_project.id) assert result == [] @pytest.mark.asyncio async def test_get_velocity_with_data(self, db_session, test_project): """Test getting velocity with completed sprints.""" completed_sprint = Sprint( id=uuid.uuid4(), project_id=test_project.id, name="Completed Sprint", number=1, status=SprintStatus.COMPLETED, start_date=date.today() - timedelta(days=14), end_date=date.today(), planned_points=20, velocity=18, ) db_session.add(completed_sprint) await db_session.commit() result = await sprint.get_velocity(db_session, project_id=test_project.id) assert len(result) == 1 assert result[0]["sprint_number"] == 1 assert result[0]["velocity"] == 18 assert result[0]["velocity_ratio"] == 0.9 @pytest.mark.asyncio async def test_get_velocity_db_error(self, db_session, test_project): """Test getting velocity when DB error occurs.""" with patch.object( db_session, "execute", side_effect=OperationalError("Connection lost", {}, Exception()), ): with pytest.raises(OperationalError): await sprint.get_velocity(db_session, project_id=test_project.id) class TestSprintWithIssueCounts: """Tests for sprints with issue counts.""" @pytest.mark.asyncio async def test_get_sprints_with_issue_counts_empty(self, db_session, test_project): """Test getting sprints with issue counts when no sprints exist.""" results, total = await sprint.get_sprints_with_issue_counts( db_session, project_id=test_project.id ) assert results == [] assert total == 0 @pytest.mark.asyncio async def test_get_sprints_with_issue_counts_success( self, db_session, test_project, test_sprint ): """Test getting sprints with issue counts.""" # Add some issues to the sprint issue1 = Issue( id=uuid.uuid4(), project_id=test_project.id, sprint_id=test_sprint.id, title="Issue 1", status=IssueStatus.OPEN, ) issue2 = Issue( id=uuid.uuid4(), project_id=test_project.id, sprint_id=test_sprint.id, title="Issue 2", status=IssueStatus.CLOSED, ) db_session.add_all([issue1, issue2]) await db_session.commit() results, total = await sprint.get_sprints_with_issue_counts( db_session, project_id=test_project.id ) assert len(results) == 1 assert results[0]["issue_count"] == 2 assert results[0]["open_issues"] == 1 assert results[0]["completed_issues"] == 1 @pytest.mark.asyncio async def test_get_sprints_with_issue_counts_db_error( self, db_session, test_project, test_sprint ): """Test getting sprints with issue counts when DB error occurs.""" with patch.object( db_session, "execute", side_effect=OperationalError("Connection lost", {}, Exception()), ): with pytest.raises(OperationalError): await sprint.get_sprints_with_issue_counts( db_session, project_id=test_project.id )