Files
fast-next-template/backend/tests/crud/syndarix/test_issue.py
Felipe Cardoso 664415111a test(backend): add comprehensive tests for OAuth and agent endpoints
- Added tests for OAuth provider admin and consent endpoints covering edge cases.
- Extended agent-related tests to handle incorrect project associations and lifecycle state transitions.
- Introduced tests for sprint status transitions and validation checks.
- Improved multiline formatting consistency across all test functions.
2026-01-03 01:44:11 +01:00

683 lines
23 KiB
Python

# tests/crud/syndarix/test_issue.py
"""Tests for Issue CRUD operations."""
import uuid
from datetime import UTC, datetime
from unittest.mock import MagicMock, patch
import pytest
import pytest_asyncio
from sqlalchemy.exc import IntegrityError, OperationalError
from app.crud.syndarix.issue import issue
from app.models.syndarix import Issue, Project, Sprint
from app.models.syndarix.enums import (
IssuePriority,
IssueStatus,
ProjectStatus,
SprintStatus,
SyncStatus,
)
from app.schemas.syndarix import IssueCreate
@pytest_asyncio.fixture
async def db_session(async_test_db):
"""Create a database session for tests."""
_, AsyncTestingSessionLocal = async_test_db
async with AsyncTestingSessionLocal() as session:
yield session
@pytest_asyncio.fixture
async def test_project(db_session):
"""Create a test project for issues."""
project = Project(
id=uuid.uuid4(),
name="Test Project",
slug=f"test-project-{uuid.uuid4().hex[:8]}",
status=ProjectStatus.ACTIVE,
)
db_session.add(project)
await db_session.commit()
await db_session.refresh(project)
return project
@pytest_asyncio.fixture
async def test_sprint(db_session, test_project):
"""Create a test sprint."""
from datetime import date
sprint = Sprint(
id=uuid.uuid4(),
project_id=test_project.id,
name="Test Sprint",
number=1,
status=SprintStatus.PLANNED,
start_date=date.today(),
end_date=date.today(),
)
db_session.add(sprint)
await db_session.commit()
await db_session.refresh(sprint)
return sprint
@pytest_asyncio.fixture
async def test_issue(db_session, test_project):
"""Create a test issue."""
issue_obj = Issue(
id=uuid.uuid4(),
project_id=test_project.id,
title="Test Issue",
body="Test issue body",
status=IssueStatus.OPEN,
priority=IssuePriority.MEDIUM,
labels=["bug", "backend"],
)
db_session.add(issue_obj)
await db_session.commit()
await db_session.refresh(issue_obj)
return issue_obj
class TestIssueCreate:
"""Tests for issue creation."""
@pytest.mark.asyncio
async def test_create_issue_success(self, db_session, test_project):
"""Test successful issue creation."""
issue_data = IssueCreate(
project_id=test_project.id,
title="New Issue",
body="Issue description",
status=IssueStatus.OPEN,
priority=IssuePriority.HIGH,
labels=["feature"],
)
created = await issue.create(db_session, obj_in=issue_data)
assert created.title == "New Issue"
assert created.priority == IssuePriority.HIGH
assert created.sync_status == SyncStatus.SYNCED
@pytest.mark.asyncio
async def test_create_issue_with_external_tracker(self, db_session, test_project):
"""Test issue creation with external tracker info."""
issue_data = IssueCreate(
project_id=test_project.id,
title="External Issue",
external_tracker_type="gitea",
external_issue_id="ext-123",
remote_url="https://gitea.example.com/issues/123",
external_issue_number=123,
)
created = await issue.create(db_session, obj_in=issue_data)
assert created.external_tracker_type == "gitea"
assert created.external_issue_id == "ext-123"
@pytest.mark.asyncio
async def test_create_issue_integrity_error(self, db_session, test_project):
"""Test issue creation with integrity error."""
issue_data = IssueCreate(
project_id=test_project.id,
title="Test Issue",
)
# Mock commit to raise IntegrityError
mock_orig = MagicMock()
mock_orig.__str__ = lambda self: "UNIQUE constraint failed"
with patch.object(
db_session,
"commit",
side_effect=IntegrityError("", {}, mock_orig),
):
with pytest.raises(ValueError, match="Database integrity error"):
await issue.create(db_session, obj_in=issue_data)
@pytest.mark.asyncio
async def test_create_issue_unexpected_error(self, db_session, test_project):
"""Test issue creation with unexpected error."""
issue_data = IssueCreate(
project_id=test_project.id,
title="Test Issue",
)
with patch.object(
db_session,
"commit",
side_effect=RuntimeError("Unexpected error"),
):
with pytest.raises(RuntimeError, match="Unexpected error"):
await issue.create(db_session, obj_in=issue_data)
class TestIssueGetWithDetails:
"""Tests for getting issue with details."""
@pytest.mark.asyncio
async def test_get_with_details_not_found(self, db_session):
"""Test getting non-existent issue with details."""
result = await issue.get_with_details(db_session, issue_id=uuid.uuid4())
assert result is None
@pytest.mark.asyncio
async def test_get_with_details_success(self, db_session, test_issue):
"""Test getting issue with details."""
result = await issue.get_with_details(db_session, issue_id=test_issue.id)
assert result is not None
assert result["issue"].id == test_issue.id
assert "project_name" in result
assert "project_slug" in result
@pytest.mark.asyncio
async def test_get_with_details_db_error(self, db_session, test_issue):
"""Test getting issue with details when DB error occurs."""
with patch.object(
db_session,
"execute",
side_effect=OperationalError("Connection lost", {}, Exception()),
):
with pytest.raises(OperationalError):
await issue.get_with_details(db_session, issue_id=test_issue.id)
class TestIssueGetByProject:
"""Tests for getting issues by project."""
@pytest.mark.asyncio
async def test_get_by_project_with_filters(
self, db_session, test_project, test_issue
):
"""Test getting issues with various filters."""
# Create issue with specific labels
issue2 = Issue(
id=uuid.uuid4(),
project_id=test_project.id,
title="Filtered Issue",
status=IssueStatus.IN_PROGRESS,
priority=IssuePriority.HIGH,
labels=["frontend"],
)
db_session.add(issue2)
await db_session.commit()
# Test status filter
issues, _total = await issue.get_by_project(
db_session,
project_id=test_project.id,
status=IssueStatus.IN_PROGRESS,
)
assert len(issues) == 1
assert issues[0].status == IssueStatus.IN_PROGRESS
# Test priority filter
issues, _total = await issue.get_by_project(
db_session,
project_id=test_project.id,
priority=IssuePriority.HIGH,
)
assert len(issues) == 1
assert issues[0].priority == IssuePriority.HIGH
@pytest.mark.asyncio
@pytest.mark.skip(
reason="Labels filter uses PostgreSQL @> operator, not available in SQLite"
)
async def test_get_by_project_with_labels_filter(
self, db_session, test_project, test_issue
):
"""Test getting issues filtered by labels."""
issues, _total = await issue.get_by_project(
db_session,
project_id=test_project.id,
labels=["bug"],
)
assert len(issues) == 1
assert "bug" in issues[0].labels
@pytest.mark.asyncio
async def test_get_by_project_sort_order_asc(
self, db_session, test_project, test_issue
):
"""Test getting issues with ascending sort order."""
# Create another issue
issue2 = Issue(
id=uuid.uuid4(),
project_id=test_project.id,
title="Second Issue",
status=IssueStatus.OPEN,
)
db_session.add(issue2)
await db_session.commit()
issues, _total = await issue.get_by_project(
db_session,
project_id=test_project.id,
sort_by="created_at",
sort_order="asc",
)
assert len(issues) == 2
# Compare without timezone info since DB may strip it
first_time = (
issues[0].created_at.replace(tzinfo=None)
if issues[0].created_at.tzinfo
else issues[0].created_at
)
second_time = (
issues[1].created_at.replace(tzinfo=None)
if issues[1].created_at.tzinfo
else issues[1].created_at
)
assert first_time <= second_time
@pytest.mark.asyncio
async def test_get_by_project_db_error(self, db_session, test_project):
"""Test getting issues when DB error occurs."""
with patch.object(
db_session,
"execute",
side_effect=OperationalError("Connection lost", {}, Exception()),
):
with pytest.raises(OperationalError):
await issue.get_by_project(db_session, project_id=test_project.id)
class TestIssueGetBySprint:
"""Tests for getting issues by sprint."""
@pytest.mark.asyncio
async def test_get_by_sprint_with_status(
self, db_session, test_project, test_sprint
):
"""Test getting issues by sprint with status filter."""
# Create issues in sprint
issue1 = Issue(
id=uuid.uuid4(),
project_id=test_project.id,
sprint_id=test_sprint.id,
title="Sprint Issue 1",
status=IssueStatus.OPEN,
)
issue2 = Issue(
id=uuid.uuid4(),
project_id=test_project.id,
sprint_id=test_sprint.id,
title="Sprint Issue 2",
status=IssueStatus.CLOSED,
)
db_session.add_all([issue1, issue2])
await db_session.commit()
# Test status filter
issues = await issue.get_by_sprint(
db_session,
sprint_id=test_sprint.id,
status=IssueStatus.OPEN,
)
assert len(issues) == 1
assert issues[0].status == IssueStatus.OPEN
@pytest.mark.asyncio
async def test_get_by_sprint_db_error(self, db_session, test_sprint):
"""Test getting issues by sprint when DB error occurs."""
with patch.object(
db_session,
"execute",
side_effect=OperationalError("Connection lost", {}, Exception()),
):
with pytest.raises(OperationalError):
await issue.get_by_sprint(db_session, sprint_id=test_sprint.id)
class TestIssueAssignment:
"""Tests for issue assignment operations."""
@pytest.mark.asyncio
async def test_assign_to_agent_not_found(self, db_session):
"""Test assigning non-existent issue to agent."""
result = await issue.assign_to_agent(
db_session, issue_id=uuid.uuid4(), agent_id=uuid.uuid4()
)
assert result is None
@pytest.mark.asyncio
async def test_assign_to_agent_db_error(self, db_session, test_issue):
"""Test assigning issue to agent when DB error occurs."""
with patch.object(
db_session,
"commit",
side_effect=OperationalError("Connection lost", {}, Exception()),
):
with pytest.raises(OperationalError):
await issue.assign_to_agent(
db_session, issue_id=test_issue.id, agent_id=uuid.uuid4()
)
@pytest.mark.asyncio
async def test_assign_to_human_not_found(self, db_session):
"""Test assigning non-existent issue to human."""
result = await issue.assign_to_human(
db_session, issue_id=uuid.uuid4(), human_assignee="john@example.com"
)
assert result is None
@pytest.mark.asyncio
async def test_assign_to_human_db_error(self, db_session, test_issue):
"""Test assigning issue to human when DB error occurs."""
with patch.object(
db_session,
"commit",
side_effect=OperationalError("Connection lost", {}, Exception()),
):
with pytest.raises(OperationalError):
await issue.assign_to_human(
db_session,
issue_id=test_issue.id,
human_assignee="john@example.com",
)
@pytest.mark.asyncio
async def test_unassign_not_found(self, db_session):
"""Test unassigning non-existent issue."""
result = await issue.unassign(db_session, issue_id=uuid.uuid4())
assert result is None
@pytest.mark.asyncio
async def test_unassign_db_error(self, db_session, test_issue):
"""Test unassigning issue when DB error occurs."""
with patch.object(
db_session,
"commit",
side_effect=OperationalError("Connection lost", {}, Exception()),
):
with pytest.raises(OperationalError):
await issue.unassign(db_session, issue_id=test_issue.id)
class TestIssueStatusChanges:
"""Tests for issue status change operations."""
@pytest.mark.asyncio
async def test_close_issue_not_found(self, db_session):
"""Test closing non-existent issue."""
result = await issue.close_issue(db_session, issue_id=uuid.uuid4())
assert result is None
@pytest.mark.asyncio
async def test_close_issue_db_error(self, db_session, test_issue):
"""Test closing issue when DB error occurs."""
with patch.object(
db_session,
"commit",
side_effect=OperationalError("Connection lost", {}, Exception()),
):
with pytest.raises(OperationalError):
await issue.close_issue(db_session, issue_id=test_issue.id)
@pytest.mark.asyncio
async def test_reopen_issue_not_found(self, db_session):
"""Test reopening non-existent issue."""
result = await issue.reopen_issue(db_session, issue_id=uuid.uuid4())
assert result is None
@pytest.mark.asyncio
async def test_reopen_issue_db_error(self, db_session, test_issue):
"""Test reopening issue when DB error occurs."""
with patch.object(
db_session,
"commit",
side_effect=OperationalError("Connection lost", {}, Exception()),
):
with pytest.raises(OperationalError):
await issue.reopen_issue(db_session, issue_id=test_issue.id)
class TestIssueSyncStatus:
"""Tests for issue sync status operations."""
@pytest.mark.asyncio
async def test_update_sync_status_not_found(self, db_session):
"""Test updating sync status for non-existent issue."""
result = await issue.update_sync_status(
db_session,
issue_id=uuid.uuid4(),
sync_status=SyncStatus.SYNCED,
)
assert result is None
@pytest.mark.asyncio
async def test_update_sync_status_with_timestamps(self, db_session, test_issue):
"""Test updating sync status with timestamps."""
now = datetime.now(UTC)
result = await issue.update_sync_status(
db_session,
issue_id=test_issue.id,
sync_status=SyncStatus.SYNCED,
last_synced_at=now,
external_updated_at=now,
)
assert result is not None
assert result.sync_status == SyncStatus.SYNCED
# Compare without timezone info since DB may strip it
assert result.last_synced_at.replace(tzinfo=None) == now.replace(tzinfo=None)
@pytest.mark.asyncio
async def test_update_sync_status_db_error(self, db_session, test_issue):
"""Test updating sync status when DB error occurs."""
with patch.object(
db_session,
"commit",
side_effect=OperationalError("Connection lost", {}, Exception()),
):
with pytest.raises(OperationalError):
await issue.update_sync_status(
db_session,
issue_id=test_issue.id,
sync_status=SyncStatus.ERROR,
)
class TestIssueStats:
"""Tests for issue statistics."""
@pytest.mark.asyncio
async def test_get_project_stats(self, db_session, test_project, test_issue):
"""Test getting project issue statistics."""
stats = await issue.get_project_stats(db_session, project_id=test_project.id)
assert stats["total"] >= 1
assert "open" in stats
assert "by_priority" in stats
@pytest.mark.asyncio
async def test_get_project_stats_db_error(self, db_session, test_project):
"""Test getting project stats when DB error occurs."""
with patch.object(
db_session,
"execute",
side_effect=OperationalError("Connection lost", {}, Exception()),
):
with pytest.raises(OperationalError):
await issue.get_project_stats(db_session, project_id=test_project.id)
class TestIssueExternalTracker:
"""Tests for external tracker operations."""
@pytest.mark.asyncio
async def test_get_by_external_id_not_found(self, db_session):
"""Test getting issue by non-existent external ID."""
result = await issue.get_by_external_id(
db_session,
external_tracker_type="gitea",
external_issue_id="nonexistent",
)
assert result is None
@pytest.mark.asyncio
async def test_get_by_external_id_success(self, db_session, test_project):
"""Test getting issue by external ID."""
# Create issue with external tracker
issue_obj = Issue(
id=uuid.uuid4(),
project_id=test_project.id,
title="External Issue",
external_tracker_type="gitea",
external_issue_id="ext-456",
)
db_session.add(issue_obj)
await db_session.commit()
result = await issue.get_by_external_id(
db_session,
external_tracker_type="gitea",
external_issue_id="ext-456",
)
assert result is not None
assert result.external_issue_id == "ext-456"
@pytest.mark.asyncio
async def test_get_by_external_id_db_error(self, db_session):
"""Test getting issue by external ID when DB error occurs."""
with patch.object(
db_session,
"execute",
side_effect=OperationalError("Connection lost", {}, Exception()),
):
with pytest.raises(OperationalError):
await issue.get_by_external_id(
db_session,
external_tracker_type="gitea",
external_issue_id="test",
)
@pytest.mark.asyncio
async def test_get_pending_sync(self, db_session, test_project):
"""Test getting issues pending sync."""
# Create issue with pending sync
issue_obj = Issue(
id=uuid.uuid4(),
project_id=test_project.id,
title="Pending Sync Issue",
external_tracker_type="gitea",
external_issue_id="ext-789",
sync_status=SyncStatus.PENDING,
)
db_session.add(issue_obj)
await db_session.commit()
# Test without project filter
issues = await issue.get_pending_sync(db_session)
assert len(issues) >= 1
# Test with project filter
issues = await issue.get_pending_sync(db_session, project_id=test_project.id)
assert len(issues) >= 1
@pytest.mark.asyncio
async def test_get_pending_sync_db_error(self, db_session):
"""Test getting pending sync issues when DB error occurs."""
with patch.object(
db_session,
"execute",
side_effect=OperationalError("Connection lost", {}, Exception()),
):
with pytest.raises(OperationalError):
await issue.get_pending_sync(db_session)
class TestIssueSprintOperations:
"""Tests for sprint-related issue operations."""
@pytest.mark.asyncio
async def test_remove_sprint_from_issues(
self, db_session, test_project, test_sprint
):
"""Test removing sprint from all issues."""
# Create issues in sprint
issue1 = Issue(
id=uuid.uuid4(),
project_id=test_project.id,
sprint_id=test_sprint.id,
title="Sprint Issue 1",
)
issue2 = Issue(
id=uuid.uuid4(),
project_id=test_project.id,
sprint_id=test_sprint.id,
title="Sprint Issue 2",
)
db_session.add_all([issue1, issue2])
await db_session.commit()
count = await issue.remove_sprint_from_issues(
db_session, sprint_id=test_sprint.id
)
assert count == 2
# Verify issues no longer in sprint
await db_session.refresh(issue1)
await db_session.refresh(issue2)
assert issue1.sprint_id is None
assert issue2.sprint_id is None
@pytest.mark.asyncio
async def test_remove_sprint_from_issues_db_error(self, db_session, test_sprint):
"""Test removing sprint from issues when DB error occurs."""
with patch.object(
db_session,
"execute",
side_effect=OperationalError("Connection lost", {}, Exception()),
):
with pytest.raises(OperationalError):
await issue.remove_sprint_from_issues(
db_session, sprint_id=test_sprint.id
)
@pytest.mark.asyncio
async def test_remove_from_sprint_not_found(self, db_session):
"""Test removing non-existent issue from sprint."""
result = await issue.remove_from_sprint(db_session, issue_id=uuid.uuid4())
assert result is None
@pytest.mark.asyncio
async def test_remove_from_sprint_success(
self, db_session, test_project, test_sprint
):
"""Test removing issue from sprint."""
issue_obj = Issue(
id=uuid.uuid4(),
project_id=test_project.id,
sprint_id=test_sprint.id,
title="Issue in Sprint",
)
db_session.add(issue_obj)
await db_session.commit()
result = await issue.remove_from_sprint(db_session, issue_id=issue_obj.id)
assert result is not None
assert result.sprint_id is None
@pytest.mark.asyncio
async def test_remove_from_sprint_db_error(
self, db_session, test_project, test_sprint
):
"""Test removing issue from sprint when DB error occurs."""
issue_obj = Issue(
id=uuid.uuid4(),
project_id=test_project.id,
sprint_id=test_sprint.id,
title="Issue in Sprint",
)
db_session.add(issue_obj)
await db_session.commit()
with patch.object(
db_session,
"commit",
side_effect=OperationalError("Connection lost", {}, Exception()),
):
with pytest.raises(OperationalError):
await issue.remove_from_sprint(db_session, issue_id=issue_obj.id)