- Added tests for OAuth provider admin and consent endpoints covering edge cases.
- Extended agent-related tests to handle incorrect project associations and lifecycle state transitions.
- Introduced tests for sprint status transitions and validation checks.
- Improved multiline formatting consistency across all test functions.
541 lines
19 KiB
Python
541 lines
19 KiB
Python
# tests/crud/syndarix/test_sprint_crud.py
|
|
"""
|
|
Tests for Sprint CRUD operations.
|
|
"""
|
|
|
|
import uuid
|
|
from datetime import date, timedelta
|
|
|
|
import pytest
|
|
|
|
from app.crud.syndarix import sprint as sprint_crud
|
|
from app.models.syndarix import SprintStatus
|
|
from app.schemas.syndarix import SprintCreate, SprintUpdate
|
|
|
|
|
|
class TestSprintCreate:
    """Creation tests for the sprint CRUD layer."""

    @pytest.mark.asyncio
    async def test_create_sprint_success(self, async_test_db, test_project_crud):
        """A sprint created with every field supplied echoes those values back."""
        _engine, session_factory = async_test_db

        first_day = date.today()
        payload = SprintCreate(
            project_id=test_project_crud.id,
            name="Sprint 1",
            number=1,
            goal="Complete initial setup",
            start_date=first_day,
            end_date=first_day + timedelta(days=14),
            status=SprintStatus.PLANNED,
            planned_points=21,
        )
        expected_fields = {
            "name": "Sprint 1",
            "number": 1,
            "goal": "Complete initial setup",
            "status": SprintStatus.PLANNED,
            "planned_points": 21,
        }

        async with session_factory() as session:
            created = await sprint_crud.create(session, obj_in=payload)

            assert created.id is not None
            for field_name, expected in expected_fields.items():
                assert getattr(created, field_name) == expected

    @pytest.mark.asyncio
    async def test_create_sprint_minimal(self, async_test_db, test_project_crud):
        """A sprint created without status, goal or points gets the defaults."""
        _engine, session_factory = async_test_db

        first_day = date.today()
        payload = SprintCreate(
            project_id=test_project_crud.id,
            name="Minimal Sprint",
            number=1,
            start_date=first_day,
            end_date=first_day + timedelta(days=14),
        )

        async with session_factory() as session:
            created = await sprint_crud.create(session, obj_in=payload)

            assert created.name == "Minimal Sprint"
            # Status was not supplied, so the default is expected here.
            assert created.status == SprintStatus.PLANNED
            assert created.goal is None
            assert created.planned_points is None
|
|
|
|
|
|
class TestSprintRead:
    """Read-path tests for the sprint CRUD layer."""

    @pytest.mark.asyncio
    async def test_get_sprint_by_id(self, async_test_db, test_sprint_crud):
        """Looking up the fixture sprint by its stringified ID returns it."""
        _engine, session_factory = async_test_db
        lookup_id = str(test_sprint_crud.id)

        async with session_factory() as session:
            fetched = await sprint_crud.get(session, id=lookup_id)

            assert fetched is not None
            assert fetched.id == test_sprint_crud.id

    @pytest.mark.asyncio
    async def test_get_sprint_by_id_not_found(self, async_test_db):
        """Looking up a freshly generated random UUID yields None."""
        _engine, session_factory = async_test_db
        unknown_id = str(uuid.uuid4())

        async with session_factory() as session:
            fetched = await sprint_crud.get(session, id=unknown_id)
            assert fetched is None

    @pytest.mark.asyncio
    async def test_get_with_details(self, async_test_db, test_sprint_crud):
        """The detailed lookup returns the sprint plus project and issue data."""
        _engine, session_factory = async_test_db

        async with session_factory() as session:
            details = await sprint_crud.get_with_details(
                session,
                sprint_id=test_sprint_crud.id,
            )

            assert details is not None
            assert details["sprint"].id == test_sprint_crud.id
            assert details["project_name"] is not None
            # Only the presence of the counters is checked, not their values.
            for counter_key in ("issue_count", "open_issues", "completed_issues"):
                assert counter_key in details
|
|
|
|
|
|
class TestSprintUpdate:
    """Update-path tests for the sprint CRUD layer."""

    @pytest.mark.asyncio
    async def test_update_sprint_basic_fields(self, async_test_db, test_sprint_crud):
        """Changing name and goal is reflected on the returned sprint."""
        _engine, session_factory = async_test_db
        changes = SprintUpdate(
            name="Updated Sprint Name",
            goal="Updated goal",
        )

        async with session_factory() as session:
            existing = await sprint_crud.get(session, id=str(test_sprint_crud.id))
            updated = await sprint_crud.update(
                session, db_obj=existing, obj_in=changes
            )

            assert updated.name == "Updated Sprint Name"
            assert updated.goal == "Updated goal"

    @pytest.mark.asyncio
    async def test_update_sprint_dates(self, async_test_db, test_sprint_crud):
        """Changing start and end dates is reflected on the returned sprint."""
        _engine, session_factory = async_test_db

        base_day = date.today()
        new_start = base_day + timedelta(days=1)
        new_end = base_day + timedelta(days=21)
        changes = SprintUpdate(
            start_date=new_start,
            end_date=new_end,
        )

        async with session_factory() as session:
            existing = await sprint_crud.get(session, id=str(test_sprint_crud.id))
            updated = await sprint_crud.update(
                session, db_obj=existing, obj_in=changes
            )

            assert updated.start_date == new_start
            assert updated.end_date == new_end
|
|
|
|
|
|
class TestSprintLifecycle:
    """Tests for the start / complete / cancel sprint transitions."""

    @staticmethod
    async def _persist_sprint(session_factory, **fields):
        """Create a sprint in its own session and return the new sprint's ID.

        ``fields`` are forwarded unchanged to ``SprintCreate``.
        """
        async with session_factory() as session:
            created = await sprint_crud.create(session, obj_in=SprintCreate(**fields))
            return created.id

    @pytest.mark.asyncio
    async def test_start_sprint(self, async_test_db, test_sprint_crud):
        """Starting the fixture sprint returns it with ACTIVE status."""
        _engine, session_factory = async_test_db

        async with session_factory() as session:
            started = await sprint_crud.start_sprint(
                session,
                sprint_id=test_sprint_crud.id,
            )

            assert started is not None
            assert started.status == SprintStatus.ACTIVE

    @pytest.mark.asyncio
    async def test_start_sprint_with_custom_date(
        self, async_test_db, test_project_crud
    ):
        """Starting with an explicit start_date stores that date on the sprint."""
        _engine, session_factory = async_test_db
        base_day = date.today()

        # Set up a planned sprint in a separate session.
        sprint_id = await self._persist_sprint(
            session_factory,
            project_id=test_project_crud.id,
            name="Start Date Sprint",
            number=10,
            start_date=base_day,
            end_date=base_day + timedelta(days=14),
            status=SprintStatus.PLANNED,
        )

        # Start it with a date that differs from the one it was created with.
        custom_start = base_day + timedelta(days=2)
        async with session_factory() as session:
            started = await sprint_crud.start_sprint(
                session,
                sprint_id=sprint_id,
                start_date=custom_start,
            )

            assert started.status == SprintStatus.ACTIVE
            assert started.start_date == custom_start

    @pytest.mark.asyncio
    async def test_start_sprint_already_active_fails(
        self, async_test_db, test_project_crud
    ):
        """Starting a sprint that was created as ACTIVE raises ValueError."""
        _engine, session_factory = async_test_db
        base_day = date.today()

        # The sprint is created directly in the ACTIVE state.
        sprint_id = await self._persist_sprint(
            session_factory,
            project_id=test_project_crud.id,
            name="Already Active Sprint",
            number=20,
            start_date=base_day,
            end_date=base_day + timedelta(days=14),
            status=SprintStatus.ACTIVE,
        )

        async with session_factory() as session:
            with pytest.raises(ValueError) as exc_info:
                await sprint_crud.start_sprint(session, sprint_id=sprint_id)

            message = str(exc_info.value).lower()
            assert "cannot start sprint" in message

    @pytest.mark.asyncio
    async def test_complete_sprint(self, async_test_db, test_project_crud):
        """Completing an ACTIVE sprint returns it with COMPLETED status."""
        _engine, session_factory = async_test_db
        base_day = date.today()

        sprint_id = await self._persist_sprint(
            session_factory,
            project_id=test_project_crud.id,
            name="Complete Me Sprint",
            number=30,
            start_date=base_day - timedelta(days=14),
            end_date=base_day,
            status=SprintStatus.ACTIVE,
            planned_points=21,
        )

        async with session_factory() as session:
            finished = await sprint_crud.complete_sprint(session, sprint_id=sprint_id)

            assert finished is not None
            assert finished.status == SprintStatus.COMPLETED

    @pytest.mark.asyncio
    async def test_complete_planned_sprint_fails(
        self, async_test_db, test_project_crud
    ):
        """Completing a sprint that is still PLANNED raises ValueError."""
        _engine, session_factory = async_test_db
        base_day = date.today()

        sprint_id = await self._persist_sprint(
            session_factory,
            project_id=test_project_crud.id,
            name="Planned Sprint",
            number=40,
            start_date=base_day,
            end_date=base_day + timedelta(days=14),
            status=SprintStatus.PLANNED,
        )

        async with session_factory() as session:
            with pytest.raises(ValueError) as exc_info:
                await sprint_crud.complete_sprint(session, sprint_id=sprint_id)

            message = str(exc_info.value).lower()
            assert "cannot complete sprint" in message

    @pytest.mark.asyncio
    async def test_cancel_sprint(self, async_test_db, test_project_crud):
        """Cancelling an ACTIVE sprint returns it with CANCELLED status."""
        _engine, session_factory = async_test_db
        base_day = date.today()

        sprint_id = await self._persist_sprint(
            session_factory,
            project_id=test_project_crud.id,
            name="Cancel Me Sprint",
            number=50,
            start_date=base_day,
            end_date=base_day + timedelta(days=14),
            status=SprintStatus.ACTIVE,
        )

        async with session_factory() as session:
            cancelled = await sprint_crud.cancel_sprint(session, sprint_id=sprint_id)

            assert cancelled is not None
            assert cancelled.status == SprintStatus.CANCELLED

    @pytest.mark.asyncio
    async def test_cancel_completed_sprint_fails(
        self, async_test_db, test_project_crud
    ):
        """Cancelling a sprint that was created as COMPLETED raises ValueError."""
        _engine, session_factory = async_test_db
        base_day = date.today()

        sprint_id = await self._persist_sprint(
            session_factory,
            project_id=test_project_crud.id,
            name="Completed Sprint",
            number=60,
            start_date=base_day - timedelta(days=14),
            end_date=base_day,
            status=SprintStatus.COMPLETED,
        )

        async with session_factory() as session:
            with pytest.raises(ValueError) as exc_info:
                await sprint_crud.cancel_sprint(session, sprint_id=sprint_id)

            message = str(exc_info.value).lower()
            assert "cannot cancel sprint" in message
|
|
|
|
|
|
class TestSprintByProject:
    """Tests for listing a project's sprints via ``get_by_project``."""

    @pytest.mark.asyncio
    async def test_get_by_project(
        self, async_test_db, test_project_crud, test_sprint_crud
    ):
        """Listing by project returns at least one sprint, all of that project.

        ``test_sprint_crud`` is requested only so that a sprint exists before
        the query runs; its value is not used directly.
        """
        _test_engine, AsyncTestingSessionLocal = async_test_db

        async with AsyncTestingSessionLocal() as session:
            sprints, total = await sprint_crud.get_by_project(
                session,
                project_id=test_project_crud.id,
            )

            assert total >= 1
            assert all(s.project_id == test_project_crud.id for s in sprints)

    @pytest.mark.asyncio
    async def test_get_by_project_with_status(self, async_test_db, test_project_crud):
        """Filtering by status returns only sprints with that status."""
        _test_engine, AsyncTestingSessionLocal = async_test_db

        today = date.today()

        # Create one PLANNED and one ACTIVE sprint so the filter has something
        # to include and something to exclude.
        async with AsyncTestingSessionLocal() as session:
            planned_sprint = SprintCreate(
                project_id=test_project_crud.id,
                name="Planned Filter Sprint",
                number=70,
                start_date=today,
                end_date=today + timedelta(days=14),
                status=SprintStatus.PLANNED,
            )
            await sprint_crud.create(session, obj_in=planned_sprint)

            active_sprint = SprintCreate(
                project_id=test_project_crud.id,
                name="Active Filter Sprint",
                number=71,
                start_date=today,
                end_date=today + timedelta(days=14),
                status=SprintStatus.ACTIVE,
            )
            await sprint_crud.create(session, obj_in=active_sprint)

        async with AsyncTestingSessionLocal() as session:
            sprints, _ = await sprint_crud.get_by_project(
                session,
                project_id=test_project_crud.id,
                status=SprintStatus.ACTIVE,
            )

            # all() is True for an empty iterable, so without this check a
            # filter that returned nothing would still pass the test.
            assert len(sprints) >= 1
            assert all(s.status == SprintStatus.ACTIVE for s in sprints)
|
|
|
|
|
|
class TestSprintActiveSprint:
    """Tests for looking up a project's active sprint."""

    @pytest.mark.asyncio
    async def test_get_active_sprint(self, async_test_db, test_project_crud):
        """After creating an ACTIVE sprint, the lookup returns an ACTIVE sprint."""
        _engine, session_factory = async_test_db

        base_day = date.today()
        payload = SprintCreate(
            project_id=test_project_crud.id,
            name="Active Sprint",
            number=80,
            start_date=base_day,
            end_date=base_day + timedelta(days=14),
            status=SprintStatus.ACTIVE,
        )

        # Persist the active sprint in its own session.
        async with session_factory() as session:
            await sprint_crud.create(session, obj_in=payload)

        async with session_factory() as session:
            active = await sprint_crud.get_active_sprint(
                session,
                project_id=test_project_crud.id,
            )

            assert active is not None
            assert active.status == SprintStatus.ACTIVE

    @pytest.mark.asyncio
    async def test_get_active_sprint_none(self, async_test_db, test_project_crud):
        """Look up the active sprint without creating one in this test."""
        _engine, session_factory = async_test_db

        # This test creates no sprint itself and does not request the
        # test_sprint_crud fixture.

        async with session_factory() as session:
            active = await sprint_crud.get_active_sprint(
                session,
                project_id=test_project_crud.id,
            )

            # The result may or may not be None depending on other tests, so
            # only the status of a returned sprint is verified.
            if active is not None:
                assert active.status == SprintStatus.ACTIVE
|
|
|
|
|
|
class TestSprintNextNumber:
    """Tests for computing the next sprint number of a project."""

    @pytest.mark.asyncio
    async def test_get_next_sprint_number(self, async_test_db, test_project_crud):
        """With sprints numbered 1 to 3 present, the next number is at least 4."""
        _engine, session_factory = async_test_db

        base_day = date.today()
        last_day = base_day + timedelta(days=14)

        # Seed three sprints, numbered 1, 2 and 3, in a single session.
        async with session_factory() as session:
            for sprint_number in (1, 2, 3):
                payload = SprintCreate(
                    project_id=test_project_crud.id,
                    name=f"Number Sprint {sprint_number}",
                    number=sprint_number,
                    start_date=base_day,
                    end_date=last_day,
                )
                await sprint_crud.create(session, obj_in=payload)

        async with session_factory() as session:
            upcoming = await sprint_crud.get_next_sprint_number(
                session,
                project_id=test_project_crud.id,
            )

            assert upcoming >= 4
|
|
|
|
|
|
class TestSprintVelocity:
    """Tests for the sprint velocity report."""

    @pytest.mark.asyncio
    async def test_get_velocity(self, async_test_db, test_project_crud):
        """Velocity data for COMPLETED sprints carries the expected keys."""
        _engine, session_factory = async_test_db

        base_day = date.today()
        sprint_length = 14
        expected_keys = (
            "sprint_number",
            "sprint_name",
            "planned_points",
            "velocity",
            "velocity_ratio",
        )

        # Seed three back-to-back completed sprints ending today or earlier.
        async with session_factory() as session:
            for index in (1, 2, 3):
                payload = SprintCreate(
                    project_id=test_project_crud.id,
                    name=f"Velocity Sprint {index}",
                    number=100 + index,
                    start_date=base_day - timedelta(days=sprint_length * index),
                    end_date=base_day - timedelta(days=sprint_length * (index - 1)),
                    status=SprintStatus.COMPLETED,
                    planned_points=20,
                    velocity=15 + index,
                )
                await sprint_crud.create(session, obj_in=payload)

        async with session_factory() as session:
            report = await sprint_crud.get_velocity(
                session,
                project_id=test_project_crud.id,
                limit=5,
            )

            assert len(report) >= 1
            # Only the presence of each key is checked, not the values.
            for entry in report:
                for key in expected_keys:
                    assert key in entry
|
|
|
|
|
|
class TestSprintWithIssueCounts:
    """Tests for listing sprints together with their issue counters."""

    @pytest.mark.asyncio
    async def test_get_sprints_with_issue_counts(
        self, async_test_db, test_project_crud, test_sprint_crud
    ):
        """Each returned row exposes the sprint and its three issue counters."""
        _engine, session_factory = async_test_db
        expected_keys = ("sprint", "issue_count", "open_issues", "completed_issues")

        async with session_factory() as session:
            rows, total = await sprint_crud.get_sprints_with_issue_counts(
                session,
                project_id=test_project_crud.id,
            )

            assert total >= 1
            # Only the presence of each key is checked, not the values.
            for row in rows:
                for key in expected_keys:
                    assert key in row
|