- Added tests for OAuth provider admin and consent endpoints covering edge cases. - Extended agent-related tests to handle incorrect project associations and lifecycle state transitions. - Introduced tests for sprint status transitions and validation checks. - Improved multiline formatting consistency across all test functions.
273 lines
9.7 KiB
Python
273 lines
9.7 KiB
Python
# tests/crud/syndarix/test_project.py
|
|
"""Tests for Project CRUD operations."""
|
|
|
|
import uuid
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from sqlalchemy.exc import IntegrityError, OperationalError
|
|
|
|
from app.crud.syndarix.project import project
|
|
from app.models.syndarix import Project
|
|
from app.models.syndarix.enums import ProjectStatus
|
|
from app.schemas.syndarix import ProjectCreate
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def db_session(async_test_db):
|
|
"""Create a database session for tests."""
|
|
_, AsyncTestingSessionLocal = async_test_db
|
|
async with AsyncTestingSessionLocal() as session:
|
|
yield session
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def test_project(db_session):
|
|
"""Create a test project."""
|
|
proj = Project(
|
|
id=uuid.uuid4(),
|
|
name="Test Project",
|
|
slug=f"test-project-{uuid.uuid4().hex[:8]}",
|
|
status=ProjectStatus.ACTIVE,
|
|
)
|
|
db_session.add(proj)
|
|
await db_session.commit()
|
|
await db_session.refresh(proj)
|
|
return proj
|
|
|
|
|
|
class TestProjectGetBySlug:
|
|
"""Tests for getting project by slug."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_by_slug_not_found(self, db_session):
|
|
"""Test getting non-existent project by slug."""
|
|
result = await project.get_by_slug(db_session, slug="nonexistent")
|
|
assert result is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_by_slug_success(self, db_session, test_project):
|
|
"""Test successfully getting project by slug."""
|
|
result = await project.get_by_slug(db_session, slug=test_project.slug)
|
|
assert result is not None
|
|
assert result.id == test_project.id
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_by_slug_db_error(self, db_session):
|
|
"""Test getting project by slug when DB error occurs."""
|
|
with patch.object(
|
|
db_session,
|
|
"execute",
|
|
side_effect=OperationalError("Connection lost", {}, Exception()),
|
|
):
|
|
with pytest.raises(OperationalError):
|
|
await project.get_by_slug(db_session, slug="test")
|
|
|
|
|
|
class TestProjectCreate:
|
|
"""Tests for project creation."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_project_success(self, db_session):
|
|
"""Test successful project creation."""
|
|
project_data = ProjectCreate(
|
|
name="New Project",
|
|
slug=f"new-project-{uuid.uuid4().hex[:8]}",
|
|
)
|
|
created = await project.create(db_session, obj_in=project_data)
|
|
assert created.name == "New Project"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_project_duplicate_slug(self, db_session, test_project):
|
|
"""Test project creation with duplicate slug."""
|
|
project_data = ProjectCreate(
|
|
name="Another Project",
|
|
slug=test_project.slug, # Use existing slug
|
|
)
|
|
|
|
# Mock IntegrityError with slug in the message
|
|
mock_orig = MagicMock()
|
|
mock_orig.__str__ = (
|
|
lambda self: "duplicate key value violates unique constraint on slug"
|
|
)
|
|
|
|
with patch.object(
|
|
db_session,
|
|
"commit",
|
|
side_effect=IntegrityError("", {}, mock_orig),
|
|
):
|
|
with pytest.raises(ValueError, match="already exists"):
|
|
await project.create(db_session, obj_in=project_data)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_project_integrity_error(self, db_session):
|
|
"""Test project creation with general integrity error."""
|
|
project_data = ProjectCreate(
|
|
name="Test Project",
|
|
slug=f"test-{uuid.uuid4().hex[:8]}",
|
|
)
|
|
|
|
# Mock IntegrityError without slug in the message
|
|
mock_orig = MagicMock()
|
|
mock_orig.__str__ = lambda self: "foreign key constraint violation"
|
|
|
|
with patch.object(
|
|
db_session,
|
|
"commit",
|
|
side_effect=IntegrityError("", {}, mock_orig),
|
|
):
|
|
with pytest.raises(ValueError, match="Database integrity error"):
|
|
await project.create(db_session, obj_in=project_data)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_project_unexpected_error(self, db_session):
|
|
"""Test project creation with unexpected error."""
|
|
project_data = ProjectCreate(
|
|
name="Test Project",
|
|
slug=f"test-{uuid.uuid4().hex[:8]}",
|
|
)
|
|
|
|
with patch.object(
|
|
db_session,
|
|
"commit",
|
|
side_effect=RuntimeError("Unexpected error"),
|
|
):
|
|
with pytest.raises(RuntimeError, match="Unexpected error"):
|
|
await project.create(db_session, obj_in=project_data)
|
|
|
|
|
|
class TestProjectGetMultiWithFilters:
|
|
"""Tests for getting projects with filters."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_multi_with_filters_success(self, db_session, test_project):
|
|
"""Test successfully getting projects with filters."""
|
|
_results, total = await project.get_multi_with_filters(db_session)
|
|
assert total >= 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_multi_with_filters_db_error(self, db_session):
|
|
"""Test getting projects when DB error occurs."""
|
|
with patch.object(
|
|
db_session,
|
|
"execute",
|
|
side_effect=OperationalError("Connection lost", {}, Exception()),
|
|
):
|
|
with pytest.raises(OperationalError):
|
|
await project.get_multi_with_filters(db_session)
|
|
|
|
|
|
class TestProjectGetWithCounts:
|
|
"""Tests for getting project with counts."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_with_counts_not_found(self, db_session):
|
|
"""Test getting non-existent project with counts."""
|
|
result = await project.get_with_counts(db_session, project_id=uuid.uuid4())
|
|
assert result is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_with_counts_success(self, db_session, test_project):
|
|
"""Test successfully getting project with counts."""
|
|
result = await project.get_with_counts(db_session, project_id=test_project.id)
|
|
assert result is not None
|
|
assert result["project"].id == test_project.id
|
|
assert result["agent_count"] == 0
|
|
assert result["issue_count"] == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_with_counts_db_error(self, db_session, test_project):
|
|
"""Test getting project with counts when DB error occurs."""
|
|
with patch.object(
|
|
db_session,
|
|
"execute",
|
|
side_effect=OperationalError("Connection lost", {}, Exception()),
|
|
):
|
|
with pytest.raises(OperationalError):
|
|
await project.get_with_counts(db_session, project_id=test_project.id)
|
|
|
|
|
|
class TestProjectGetMultiWithCounts:
|
|
"""Tests for getting projects with counts."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_multi_with_counts_empty(self, db_session):
|
|
"""Test getting projects with counts when none match."""
|
|
results, total = await project.get_multi_with_counts(
|
|
db_session,
|
|
search="nonexistent-xyz-query",
|
|
)
|
|
assert results == []
|
|
assert total == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_multi_with_counts_success(self, db_session, test_project):
|
|
"""Test successfully getting projects with counts."""
|
|
results, total = await project.get_multi_with_counts(db_session)
|
|
assert total >= 1
|
|
assert len(results) >= 1
|
|
assert "project" in results[0]
|
|
assert "agent_count" in results[0]
|
|
assert "issue_count" in results[0]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_multi_with_counts_db_error(self, db_session, test_project):
|
|
"""Test getting projects with counts when DB error occurs."""
|
|
with patch.object(
|
|
db_session,
|
|
"execute",
|
|
side_effect=OperationalError("Connection lost", {}, Exception()),
|
|
):
|
|
with pytest.raises(OperationalError):
|
|
await project.get_multi_with_counts(db_session)
|
|
|
|
|
|
class TestProjectGetByOwner:
|
|
"""Tests for getting projects by owner."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_projects_by_owner_empty(self, db_session):
|
|
"""Test getting projects by owner when none exist."""
|
|
results = await project.get_projects_by_owner(db_session, owner_id=uuid.uuid4())
|
|
assert results == []
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_projects_by_owner_db_error(self, db_session):
|
|
"""Test getting projects by owner when DB error occurs."""
|
|
with patch.object(
|
|
db_session,
|
|
"execute",
|
|
side_effect=OperationalError("Connection lost", {}, Exception()),
|
|
):
|
|
with pytest.raises(OperationalError):
|
|
await project.get_projects_by_owner(db_session, owner_id=uuid.uuid4())
|
|
|
|
|
|
class TestProjectArchive:
|
|
"""Tests for archiving projects."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_archive_project_not_found(self, db_session):
|
|
"""Test archiving non-existent project."""
|
|
result = await project.archive_project(db_session, project_id=uuid.uuid4())
|
|
assert result is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_archive_project_success(self, db_session, test_project):
|
|
"""Test successfully archiving project."""
|
|
result = await project.archive_project(db_session, project_id=test_project.id)
|
|
assert result is not None
|
|
assert result.status == ProjectStatus.ARCHIVED
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_archive_project_db_error(self, db_session, test_project):
|
|
"""Test archiving project when DB error occurs."""
|
|
with patch.object(
|
|
db_session,
|
|
"commit",
|
|
side_effect=OperationalError("Connection lost", {}, Exception()),
|
|
):
|
|
with pytest.raises(OperationalError):
|
|
await project.archive_project(db_session, project_id=test_project.id)
|