diff --git a/backend/tests/crud/syndarix/test_agent_instance.py b/backend/tests/crud/syndarix/test_agent_instance.py new file mode 100644 index 0000000..67962da --- /dev/null +++ b/backend/tests/crud/syndarix/test_agent_instance.py @@ -0,0 +1,473 @@ +# tests/crud/syndarix/test_agent_instance.py +"""Tests for AgentInstance CRUD operations.""" + +import uuid +from decimal import Decimal +from unittest.mock import patch + +import pytest +import pytest_asyncio +from sqlalchemy.exc import IntegrityError, OperationalError + +from app.crud.syndarix.agent_instance import agent_instance +from app.models.syndarix import AgentInstance, AgentType, Project +from app.models.syndarix.enums import ( + AgentStatus, + ProjectStatus, +) +from app.schemas.syndarix import AgentInstanceCreate + + +@pytest_asyncio.fixture +async def db_session(async_test_db): + """Create a database session for tests.""" + _, AsyncTestingSessionLocal = async_test_db + async with AsyncTestingSessionLocal() as session: + yield session + + +@pytest_asyncio.fixture +async def test_project(db_session): + """Create a test project.""" + project = Project( + id=uuid.uuid4(), + name="Test Project", + slug=f"test-project-{uuid.uuid4().hex[:8]}", + status=ProjectStatus.ACTIVE, + ) + db_session.add(project) + await db_session.commit() + await db_session.refresh(project) + return project + + +@pytest_asyncio.fixture +async def test_agent_type(db_session): + """Create a test agent type.""" + agent_type = AgentType( + id=uuid.uuid4(), + name="Test Agent Type", + slug=f"test-agent-type-{uuid.uuid4().hex[:8]}", + primary_model="claude-3-opus", + personality_prompt="You are a helpful test agent.", + ) + db_session.add(agent_type) + await db_session.commit() + await db_session.refresh(agent_type) + return agent_type + + +@pytest_asyncio.fixture +async def test_agent_instance(db_session, test_project, test_agent_type): + """Create a test agent instance.""" + instance = AgentInstance( + id=uuid.uuid4(), + agent_type_id=test_agent_type.id, + project_id=test_project.id, + name="Test Agent", + status=AgentStatus.IDLE, + ) + db_session.add(instance) + await db_session.commit() + await db_session.refresh(instance) + return instance + + +class TestAgentInstanceCreate: + """Tests for agent instance creation.""" + + @pytest.mark.asyncio + async def test_create_instance_success( + self, db_session, test_project, test_agent_type + ): + """Test successful agent instance creation.""" + instance_data = AgentInstanceCreate( + agent_type_id=test_agent_type.id, + project_id=test_project.id, + name="New Agent", + ) + created = await agent_instance.create(db_session, obj_in=instance_data) + assert created.name == "New Agent" + assert created.status == AgentStatus.IDLE + + @pytest.mark.asyncio + async def test_create_instance_with_all_fields( + self, db_session, test_project, test_agent_type + ): + """Test agent instance creation with all optional fields.""" + instance_data = AgentInstanceCreate( + agent_type_id=test_agent_type.id, + project_id=test_project.id, + name="Full Agent", + status=AgentStatus.WORKING, + current_task="Processing request", + short_term_memory={"context": "test context", "history": []}, + long_term_memory_ref="ref-123", + session_id="session-456", + ) + created = await agent_instance.create(db_session, obj_in=instance_data) + assert created.current_task == "Processing request" + assert created.status == AgentStatus.WORKING + + @pytest.mark.asyncio + async def test_create_instance_integrity_error( + self, db_session, test_project, test_agent_type + ): + """Test agent instance creation with integrity error.""" + instance_data = AgentInstanceCreate( + agent_type_id=test_agent_type.id, + project_id=test_project.id, + name="Test Agent", + ) + + with patch.object( + db_session, + "commit", + side_effect=IntegrityError("", {}, Exception()), + ): + with pytest.raises(ValueError, match="Database integrity error"): + await agent_instance.create(db_session, obj_in=instance_data) + + @pytest.mark.asyncio + async def test_create_instance_unexpected_error( + self, db_session, test_project, test_agent_type + ): + """Test agent instance creation with unexpected error.""" + instance_data = AgentInstanceCreate( + agent_type_id=test_agent_type.id, + project_id=test_project.id, + name="Test Agent", + ) + + with patch.object( + db_session, + "commit", + side_effect=RuntimeError("Unexpected error"), + ): + with pytest.raises(RuntimeError, match="Unexpected error"): + await agent_instance.create(db_session, obj_in=instance_data) + + +class TestAgentInstanceGetWithDetails: + """Tests for getting agent instance with details.""" + + @pytest.mark.asyncio + async def test_get_with_details_not_found(self, db_session): + """Test getting non-existent agent instance with details.""" + result = await agent_instance.get_with_details( + db_session, instance_id=uuid.uuid4() + ) + assert result is None + + @pytest.mark.asyncio + async def test_get_with_details_success(self, db_session, test_agent_instance): + """Test getting agent instance with details.""" + result = await agent_instance.get_with_details( + db_session, instance_id=test_agent_instance.id + ) + assert result is not None + assert result["instance"].id == test_agent_instance.id + assert "agent_type_name" in result + assert "assigned_issues_count" in result + + @pytest.mark.asyncio + async def test_get_with_details_db_error(self, db_session, test_agent_instance): + """Test getting agent instance with details when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await agent_instance.get_with_details( + db_session, instance_id=test_agent_instance.id + ) + + +class TestAgentInstanceGetByProject: + """Tests for getting agent instances by project.""" + + @pytest.mark.asyncio + async def test_get_by_project_success( + self, db_session, test_project, test_agent_instance + ): + """Test getting agent instances by project.""" + instances, total = await agent_instance.get_by_project( + db_session, project_id=test_project.id + ) + assert len(instances) == 1 + assert total == 1 + + @pytest.mark.asyncio + async def test_get_by_project_with_status_filter( + self, db_session, test_project, test_agent_instance + ): + """Test getting agent instances with status filter.""" + instances, total = await agent_instance.get_by_project( + db_session, + project_id=test_project.id, + status=AgentStatus.IDLE, + ) + assert len(instances) == 1 + assert instances[0].status == AgentStatus.IDLE + + @pytest.mark.asyncio + async def test_get_by_project_db_error(self, db_session, test_project): + """Test getting agent instances when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await agent_instance.get_by_project( + db_session, project_id=test_project.id + ) + + +class TestAgentInstanceGetByAgentType: + """Tests for getting agent instances by agent type.""" + + @pytest.mark.asyncio + async def test_get_by_agent_type_success( + self, db_session, test_agent_type, test_agent_instance + ): + """Test getting agent instances by agent type.""" + instances = await agent_instance.get_by_agent_type( + db_session, agent_type_id=test_agent_type.id + ) + assert len(instances) == 1 + + @pytest.mark.asyncio + async def test_get_by_agent_type_with_status_filter( + self, db_session, test_agent_type, test_agent_instance + ): + """Test getting agent instances by agent type with status filter.""" + instances = await agent_instance.get_by_agent_type( + db_session, + agent_type_id=test_agent_type.id, + status=AgentStatus.IDLE, + ) + assert len(instances) == 1 + assert instances[0].status == AgentStatus.IDLE + + @pytest.mark.asyncio + async def test_get_by_agent_type_db_error(self, db_session, test_agent_type): + """Test getting agent instances by agent type when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await agent_instance.get_by_agent_type( + db_session, agent_type_id=test_agent_type.id + ) + + +class TestAgentInstanceStatusOperations: + """Tests for agent instance status operations.""" + + @pytest.mark.asyncio + async def test_update_status_not_found(self, db_session): + """Test updating status for non-existent agent instance.""" + result = await agent_instance.update_status( + db_session, + instance_id=uuid.uuid4(), + status=AgentStatus.WORKING, + ) + assert result is None + + @pytest.mark.asyncio + async def test_update_status_success(self, db_session, test_agent_instance): + """Test successfully updating agent instance status.""" + result = await agent_instance.update_status( + db_session, + instance_id=test_agent_instance.id, + status=AgentStatus.WORKING, + current_task="Processing task", + ) + assert result is not None + assert result.status == AgentStatus.WORKING + assert result.current_task == "Processing task" + + @pytest.mark.asyncio + async def test_update_status_db_error(self, db_session, test_agent_instance): + """Test updating status when DB error occurs.""" + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await agent_instance.update_status( + db_session, + instance_id=test_agent_instance.id, + status=AgentStatus.WORKING, + ) + + +class TestAgentInstanceTerminate: + """Tests for agent instance termination.""" + + @pytest.mark.asyncio + async def test_terminate_not_found(self, db_session): + """Test terminating non-existent agent instance.""" + result = await agent_instance.terminate(db_session, instance_id=uuid.uuid4()) + assert result is None + + @pytest.mark.asyncio + async def test_terminate_success(self, db_session, test_agent_instance): + """Test successfully terminating agent instance.""" + result = await agent_instance.terminate( + db_session, instance_id=test_agent_instance.id + ) + assert result is not None + assert result.status == AgentStatus.TERMINATED + assert result.terminated_at is not None + + @pytest.mark.asyncio + async def test_terminate_db_error(self, db_session, test_agent_instance): + """Test terminating agent instance when DB error occurs.""" + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await agent_instance.terminate( + db_session, instance_id=test_agent_instance.id + ) + + +class TestAgentInstanceTaskCompletion: + """Tests for recording task completion.""" + + @pytest.mark.asyncio + async def test_record_task_completion_not_found(self, db_session): + """Test recording task completion for non-existent agent instance.""" + result = await agent_instance.record_task_completion( + db_session, + instance_id=uuid.uuid4(), + tokens_used=100, + cost_incurred=Decimal("0.01"), + ) + assert result is None + + @pytest.mark.asyncio + async def test_record_task_completion_success( + self, db_session, test_agent_instance + ): + """Test successfully recording task completion.""" + result = await agent_instance.record_task_completion( + db_session, + instance_id=test_agent_instance.id, + tokens_used=1000, + cost_incurred=Decimal("0.05"), + ) + assert result is not None + assert result.tasks_completed == 1 + assert result.tokens_used == 1000 + + @pytest.mark.asyncio + async def test_record_task_completion_db_error( + self, db_session, test_agent_instance + ): + """Test recording task completion when DB error occurs.""" + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await agent_instance.record_task_completion( + db_session, + instance_id=test_agent_instance.id, + tokens_used=100, + cost_incurred=Decimal("0.01"), + ) + + +class TestAgentInstanceMetrics: + """Tests for agent instance metrics.""" + + @pytest.mark.asyncio + async def test_get_project_metrics_empty(self, db_session, test_project): + """Test getting project metrics with no agent instances.""" + result = await agent_instance.get_project_metrics( + db_session, project_id=test_project.id + ) + assert result["total_instances"] == 0 + assert result["active_instances"] == 0 + + @pytest.mark.asyncio + async def test_get_project_metrics_with_data( + self, db_session, test_project, test_agent_instance + ): + """Test getting project metrics with agent instances.""" + result = await agent_instance.get_project_metrics( + db_session, project_id=test_project.id + ) + assert result["total_instances"] == 1 + assert result["idle_instances"] == 1 + + @pytest.mark.asyncio + async def test_get_project_metrics_db_error(self, db_session, test_project): + """Test getting project metrics when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await agent_instance.get_project_metrics( + db_session, project_id=test_project.id + ) + + +class TestAgentInstanceBulkTerminate: + """Tests for bulk termination.""" + + @pytest.mark.asyncio + async def test_bulk_terminate_by_project_empty(self, db_session, test_project): + """Test bulk terminating with no agent instances.""" + count = await agent_instance.bulk_terminate_by_project( + db_session, project_id=test_project.id + ) + assert count == 0 + + @pytest.mark.asyncio + async def test_bulk_terminate_by_project_success( + self, db_session, test_project, test_agent_instance, test_agent_type + ): + """Test successfully bulk terminating agent instances.""" + # Create another active instance + instance2 = AgentInstance( + id=uuid.uuid4(), + agent_type_id=test_agent_type.id, + project_id=test_project.id, + name="Test Agent 2", + status=AgentStatus.WORKING, + ) + db_session.add(instance2) + await db_session.commit() + + count = await agent_instance.bulk_terminate_by_project( + db_session, project_id=test_project.id + ) + assert count == 2 + + @pytest.mark.asyncio + async def test_bulk_terminate_by_project_db_error( + self, db_session, test_project, test_agent_instance + ): + """Test bulk terminating when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await agent_instance.bulk_terminate_by_project( + db_session, project_id=test_project.id + ) diff --git a/backend/tests/crud/syndarix/test_agent_type.py b/backend/tests/crud/syndarix/test_agent_type.py new file mode 100644 index 0000000..519ff3e --- /dev/null +++ b/backend/tests/crud/syndarix/test_agent_type.py @@ -0,0 +1,307 @@ +# tests/crud/syndarix/test_agent_type.py +"""Tests for AgentType CRUD operations.""" + +import uuid +from unittest.mock import MagicMock, patch + +import pytest +import pytest_asyncio +from sqlalchemy.exc import IntegrityError, OperationalError + +from app.crud.syndarix.agent_type import agent_type +from app.models.syndarix import AgentInstance, AgentType, Project +from app.models.syndarix.enums import AgentStatus, ProjectStatus +from app.schemas.syndarix import AgentTypeCreate + + +@pytest_asyncio.fixture +async def db_session(async_test_db): + """Create a database session for tests.""" + _, AsyncTestingSessionLocal = async_test_db + async with AsyncTestingSessionLocal() as session: + yield session + + +@pytest_asyncio.fixture +async def test_agent_type(db_session): + """Create a test agent type.""" + at = AgentType( + id=uuid.uuid4(), + name="Test Agent Type", + slug=f"test-agent-type-{uuid.uuid4().hex[:8]}", + primary_model="claude-3-opus", + personality_prompt="You are a helpful test agent.", + expertise=["python", "testing"], + is_active=True, + ) + db_session.add(at) + await db_session.commit() + await db_session.refresh(at) + return at + + +class TestAgentTypeGetBySlug: + """Tests for getting agent type by slug.""" + + @pytest.mark.asyncio + async def test_get_by_slug_not_found(self, db_session): + """Test getting non-existent agent type by slug.""" + result = await agent_type.get_by_slug(db_session, slug="nonexistent") + assert result is None + + @pytest.mark.asyncio + async def test_get_by_slug_success(self, db_session, test_agent_type): + """Test successfully getting agent type by slug.""" + result = await agent_type.get_by_slug(db_session, slug=test_agent_type.slug) + assert result is not None + assert result.id == test_agent_type.id + + @pytest.mark.asyncio + async def test_get_by_slug_db_error(self, db_session): + """Test getting agent type by slug when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await agent_type.get_by_slug(db_session, slug="test") + + +class TestAgentTypeCreate: + """Tests for agent type creation.""" + + @pytest.mark.asyncio + async def test_create_agent_type_success(self, db_session): + """Test successful agent type creation.""" + agent_type_data = AgentTypeCreate( + name="New Agent Type", + slug=f"new-agent-type-{uuid.uuid4().hex[:8]}", + primary_model="claude-3-opus", + personality_prompt="You are a new agent.", + ) + created = await agent_type.create(db_session, obj_in=agent_type_data) + assert created.name == "New Agent Type" + + @pytest.mark.asyncio + async def test_create_agent_type_duplicate_slug(self, db_session, test_agent_type): + """Test agent type creation with duplicate slug.""" + agent_type_data = AgentTypeCreate( + name="Another Agent Type", + slug=test_agent_type.slug, # Use existing slug + primary_model="claude-3-opus", + personality_prompt="You are another agent.", + ) + + # Mock IntegrityError with slug in the message + mock_orig = MagicMock() + mock_orig.__str__ = lambda self: "duplicate key value violates unique constraint on slug" + + with patch.object( + db_session, + "commit", + side_effect=IntegrityError("", {}, mock_orig), + ): + with pytest.raises(ValueError, match="already exists"): + await agent_type.create(db_session, obj_in=agent_type_data) + + @pytest.mark.asyncio + async def test_create_agent_type_integrity_error(self, db_session): + """Test agent type creation with general integrity error.""" + agent_type_data = AgentTypeCreate( + name="Test Agent Type", + slug=f"test-{uuid.uuid4().hex[:8]}", + primary_model="claude-3-opus", + personality_prompt="You are a test agent.", + ) + + # Mock IntegrityError without slug in the message + mock_orig = MagicMock() + mock_orig.__str__ = lambda self: "foreign key constraint violation" + + with patch.object( + db_session, + "commit", + side_effect=IntegrityError("", {}, mock_orig), + ): + with pytest.raises(ValueError, match="Database integrity error"): + await agent_type.create(db_session, obj_in=agent_type_data) + + @pytest.mark.asyncio + async def test_create_agent_type_unexpected_error(self, db_session): + """Test agent type creation with unexpected error.""" + agent_type_data = AgentTypeCreate( + name="Test Agent Type", + slug=f"test-{uuid.uuid4().hex[:8]}", + primary_model="claude-3-opus", + personality_prompt="You are a test agent.", + ) + + with patch.object( + db_session, + "commit", + side_effect=RuntimeError("Unexpected error"), + ): + with pytest.raises(RuntimeError, match="Unexpected error"): + await agent_type.create(db_session, obj_in=agent_type_data) + + +class TestAgentTypeGetMultiWithFilters: + """Tests for getting agent types with filters.""" + + @pytest.mark.asyncio + async def test_get_multi_with_filters_success(self, db_session, test_agent_type): + """Test successfully getting agent types with filters.""" + results, total = await agent_type.get_multi_with_filters(db_session) + assert total >= 1 + + @pytest.mark.asyncio + async def test_get_multi_with_filters_sort_asc(self, db_session, test_agent_type): + """Test getting agent types with ascending sort order.""" + results, total = await agent_type.get_multi_with_filters( + db_session, + sort_by="created_at", + sort_order="asc", + ) + assert total >= 1 + + @pytest.mark.asyncio + async def test_get_multi_with_filters_db_error(self, db_session): + """Test getting agent types when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await agent_type.get_multi_with_filters(db_session) + + +class TestAgentTypeGetWithInstanceCount: + """Tests for getting agent type with instance count.""" + + @pytest.mark.asyncio + async def test_get_with_instance_count_not_found(self, db_session): + """Test getting non-existent agent type with instance count.""" + result = await agent_type.get_with_instance_count( + db_session, agent_type_id=uuid.uuid4() + ) + assert result is None + + @pytest.mark.asyncio + async def test_get_with_instance_count_success(self, db_session, test_agent_type): + """Test successfully getting agent type with instance count.""" + result = await agent_type.get_with_instance_count( + db_session, agent_type_id=test_agent_type.id + ) + assert result is not None + assert result["agent_type"].id == test_agent_type.id + assert result["instance_count"] == 0 + + @pytest.mark.asyncio + async def test_get_with_instance_count_db_error(self, db_session, test_agent_type): + """Test getting agent type with instance count when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await agent_type.get_with_instance_count( + db_session, agent_type_id=test_agent_type.id + ) + + +class TestAgentTypeGetMultiWithInstanceCounts: + """Tests for getting agent types with instance counts.""" + + @pytest.mark.asyncio + async def test_get_multi_with_instance_counts_empty(self, db_session): + """Test getting agent types with instance counts when none exist.""" + # Create a separate project to ensure isolation + results, total = await agent_type.get_multi_with_instance_counts( + db_session, + is_active=None, + search="nonexistent-xyz-query", + ) + assert results == [] + assert total == 0 + + @pytest.mark.asyncio + async def test_get_multi_with_instance_counts_success( + self, db_session, test_agent_type + ): + """Test successfully getting agent types with instance counts.""" + results, total = await agent_type.get_multi_with_instance_counts(db_session) + assert total >= 1 + assert len(results) >= 1 + assert "agent_type" in results[0] + assert "instance_count" in results[0] + + @pytest.mark.asyncio + async def test_get_multi_with_instance_counts_db_error( + self, db_session, test_agent_type + ): + """Test getting agent types with instance counts when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await agent_type.get_multi_with_instance_counts(db_session) + + +class TestAgentTypeGetByExpertise: + """Tests for getting agent types by expertise.""" + + @pytest.mark.asyncio + @pytest.mark.skip(reason="Uses PostgreSQL JSONB contains operator, not available in SQLite") + async def test_get_by_expertise_success(self, db_session, test_agent_type): + """Test successfully getting agent types by expertise.""" + results = await agent_type.get_by_expertise(db_session, expertise="python") + assert len(results) >= 1 + + @pytest.mark.asyncio + @pytest.mark.skip(reason="Uses PostgreSQL JSONB contains operator, not available in SQLite") + async def test_get_by_expertise_db_error(self, db_session): + """Test getting agent types by expertise when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await agent_type.get_by_expertise(db_session, expertise="python") + + +class TestAgentTypeDeactivate: + """Tests for deactivating agent types.""" + + @pytest.mark.asyncio + async def test_deactivate_not_found(self, db_session): + """Test deactivating non-existent agent type.""" + result = await agent_type.deactivate(db_session, agent_type_id=uuid.uuid4()) + assert result is None + + @pytest.mark.asyncio + async def test_deactivate_success(self, db_session, test_agent_type): + """Test successfully deactivating agent type.""" + result = await agent_type.deactivate( + db_session, agent_type_id=test_agent_type.id + ) + assert result is not None + assert result.is_active is False + + @pytest.mark.asyncio + async def test_deactivate_db_error(self, db_session, test_agent_type): + """Test deactivating agent type when DB error occurs.""" + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await agent_type.deactivate( + db_session, agent_type_id=test_agent_type.id + ) diff --git a/backend/tests/crud/syndarix/test_issue.py b/backend/tests/crud/syndarix/test_issue.py new file mode 100644 index 0000000..d2b1aea --- /dev/null +++ b/backend/tests/crud/syndarix/test_issue.py @@ -0,0 +1,673 @@ +# tests/crud/syndarix/test_issue.py +"""Tests for Issue CRUD operations.""" + +import uuid +from datetime import UTC, datetime +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +import pytest_asyncio +from sqlalchemy.exc import IntegrityError, OperationalError + +from app.crud.syndarix.issue import CRUDIssue, issue +from app.models.syndarix import Issue, Project, Sprint +from app.models.syndarix.enums import ( + IssuePriority, + IssueStatus, + ProjectStatus, + SprintStatus, + SyncStatus, +) +from app.schemas.syndarix import IssueCreate, IssueUpdate + + +@pytest_asyncio.fixture +async def db_session(async_test_db): + """Create a database session for tests.""" + _, AsyncTestingSessionLocal = async_test_db + async with AsyncTestingSessionLocal() as session: + yield session + + +@pytest_asyncio.fixture +async def test_project(db_session): + """Create a test project for issues.""" + project = Project( + id=uuid.uuid4(), + name="Test Project", + slug=f"test-project-{uuid.uuid4().hex[:8]}", + status=ProjectStatus.ACTIVE, + ) + db_session.add(project) + await db_session.commit() + await db_session.refresh(project) + return project + + +@pytest_asyncio.fixture +async def test_sprint(db_session, test_project): + """Create a test sprint.""" + from datetime import date + sprint = Sprint( + id=uuid.uuid4(), + project_id=test_project.id, + name="Test Sprint", + number=1, + status=SprintStatus.PLANNED, + start_date=date.today(), + end_date=date.today(), + ) + db_session.add(sprint) + await db_session.commit() + await db_session.refresh(sprint) + return sprint + + +@pytest_asyncio.fixture +async def test_issue(db_session, test_project): + """Create a test issue.""" + issue_obj = Issue( + id=uuid.uuid4(), + project_id=test_project.id, + title="Test Issue", + body="Test issue body", + status=IssueStatus.OPEN, + priority=IssuePriority.MEDIUM, + labels=["bug", "backend"], + ) + db_session.add(issue_obj) + await db_session.commit() + await db_session.refresh(issue_obj) + return issue_obj + + +class TestIssueCreate: + """Tests for issue creation.""" + + @pytest.mark.asyncio + async def test_create_issue_success(self, db_session, test_project): + """Test successful issue creation.""" + issue_data = IssueCreate( + project_id=test_project.id, + title="New Issue", + body="Issue description", + status=IssueStatus.OPEN, + priority=IssuePriority.HIGH, + labels=["feature"], + ) + created = await issue.create(db_session, obj_in=issue_data) + assert created.title == "New Issue" + assert created.priority == IssuePriority.HIGH + assert created.sync_status == SyncStatus.SYNCED + + @pytest.mark.asyncio + async def test_create_issue_with_external_tracker(self, db_session, test_project): + """Test issue creation with external tracker info.""" + issue_data = IssueCreate( + project_id=test_project.id, + title="External Issue", + external_tracker_type="gitea", + external_issue_id="ext-123", + remote_url="https://gitea.example.com/issues/123", + external_issue_number=123, + ) + created = await issue.create(db_session, obj_in=issue_data) + assert created.external_tracker_type == "gitea" + assert created.external_issue_id == "ext-123" + + @pytest.mark.asyncio + async def test_create_issue_integrity_error(self, db_session, test_project): + """Test issue creation with integrity error.""" + issue_data = IssueCreate( + project_id=test_project.id, + title="Test Issue", + ) + + # Mock commit to raise IntegrityError + mock_orig = MagicMock() + mock_orig.__str__ = lambda self: "UNIQUE constraint failed" + + with patch.object( + db_session, + "commit", + side_effect=IntegrityError("", {}, mock_orig), + ): + with pytest.raises(ValueError, match="Database integrity error"): + await issue.create(db_session, obj_in=issue_data) + + @pytest.mark.asyncio + async def test_create_issue_unexpected_error(self, db_session, test_project): + """Test issue creation with unexpected error.""" + issue_data = IssueCreate( + project_id=test_project.id, + title="Test Issue", + ) + + with patch.object( + db_session, + "commit", + side_effect=RuntimeError("Unexpected error"), + ): + with pytest.raises(RuntimeError, match="Unexpected error"): + await issue.create(db_session, obj_in=issue_data) + + +class TestIssueGetWithDetails: + """Tests for getting issue with details.""" + + @pytest.mark.asyncio + async def test_get_with_details_not_found(self, db_session): + """Test getting non-existent issue with details.""" + result = await issue.get_with_details(db_session, issue_id=uuid.uuid4()) + assert result is None + + @pytest.mark.asyncio + async def test_get_with_details_success(self, db_session, test_issue): + """Test getting issue with details.""" + result = await issue.get_with_details(db_session, issue_id=test_issue.id) + assert result is not None + assert result["issue"].id == test_issue.id + assert "project_name" in result + assert "project_slug" in result + + @pytest.mark.asyncio + async def test_get_with_details_db_error(self, db_session, test_issue): + """Test getting issue with details when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await issue.get_with_details(db_session, issue_id=test_issue.id) + + +class TestIssueGetByProject: + """Tests for getting issues by project.""" + + @pytest.mark.asyncio + async def test_get_by_project_with_filters( + self, db_session, test_project, test_issue + ): + """Test getting issues with various filters.""" + # Create issue with specific labels + issue2 = Issue( + id=uuid.uuid4(), + project_id=test_project.id, + title="Filtered Issue", + status=IssueStatus.IN_PROGRESS, + priority=IssuePriority.HIGH, + labels=["frontend"], + ) + db_session.add(issue2) + await db_session.commit() + + # Test status filter + issues, total = await issue.get_by_project( + db_session, + project_id=test_project.id, + status=IssueStatus.IN_PROGRESS, + ) + assert len(issues) == 1 + assert issues[0].status == IssueStatus.IN_PROGRESS + + # Test priority filter + issues, total = await issue.get_by_project( + db_session, + project_id=test_project.id, + priority=IssuePriority.HIGH, + ) + assert len(issues) == 1 + assert issues[0].priority == IssuePriority.HIGH + + @pytest.mark.asyncio + @pytest.mark.skip(reason="Labels filter uses PostgreSQL @> operator, not available in SQLite") + async def test_get_by_project_with_labels_filter( + self, db_session, test_project, test_issue + ): + """Test getting issues filtered by labels.""" + issues, total = await issue.get_by_project( + db_session, + project_id=test_project.id, + labels=["bug"], + ) + assert len(issues) == 1 + assert "bug" in issues[0].labels + + @pytest.mark.asyncio + async def test_get_by_project_sort_order_asc( + self, db_session, test_project, test_issue + ): + """Test getting issues with ascending sort order.""" + # Create another issue + issue2 = Issue( + id=uuid.uuid4(), + project_id=test_project.id, + title="Second Issue", + status=IssueStatus.OPEN, + ) + db_session.add(issue2) + await db_session.commit() + + issues, total = await issue.get_by_project( + db_session, + project_id=test_project.id, + sort_by="created_at", + sort_order="asc", + ) + assert len(issues) == 2 + # Compare without timezone info since DB may strip it + first_time = issues[0].created_at.replace(tzinfo=None) if issues[0].created_at.tzinfo else issues[0].created_at + second_time = issues[1].created_at.replace(tzinfo=None) if issues[1].created_at.tzinfo else issues[1].created_at + assert first_time <= second_time + + @pytest.mark.asyncio + async def test_get_by_project_db_error(self, db_session, test_project): + """Test getting issues when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await issue.get_by_project(db_session, project_id=test_project.id) + + +class TestIssueGetBySprint: + """Tests for getting issues by sprint.""" + + @pytest.mark.asyncio + async def test_get_by_sprint_with_status( + self, db_session, test_project, test_sprint + ): + """Test getting issues by sprint with status filter.""" + # Create issues in sprint + issue1 = Issue( + id=uuid.uuid4(), + project_id=test_project.id, + sprint_id=test_sprint.id, + title="Sprint Issue 1", + status=IssueStatus.OPEN, + ) + issue2 = Issue( + id=uuid.uuid4(), + project_id=test_project.id, + sprint_id=test_sprint.id, + title="Sprint Issue 2", + status=IssueStatus.CLOSED, + ) + db_session.add_all([issue1, issue2]) + await db_session.commit() + + # Test status filter + issues = await issue.get_by_sprint( + db_session, + sprint_id=test_sprint.id, + status=IssueStatus.OPEN, + ) + assert len(issues) == 1 + assert issues[0].status == IssueStatus.OPEN + + @pytest.mark.asyncio + async def test_get_by_sprint_db_error(self, db_session, test_sprint): + """Test getting issues by sprint when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await issue.get_by_sprint(db_session, sprint_id=test_sprint.id) + + +class TestIssueAssignment: + """Tests for issue assignment operations.""" + + @pytest.mark.asyncio + async def test_assign_to_agent_not_found(self, db_session): + """Test assigning non-existent issue to agent.""" + result = await issue.assign_to_agent( + db_session, issue_id=uuid.uuid4(), agent_id=uuid.uuid4() + ) + assert result is None + + @pytest.mark.asyncio + async def test_assign_to_agent_db_error(self, db_session, test_issue): + """Test assigning issue to agent when DB error occurs.""" + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await issue.assign_to_agent( + db_session, issue_id=test_issue.id, agent_id=uuid.uuid4() + ) + + @pytest.mark.asyncio + async def test_assign_to_human_not_found(self, db_session): + """Test assigning non-existent issue to human.""" + result = await issue.assign_to_human( + db_session, issue_id=uuid.uuid4(), human_assignee="john@example.com" + ) + assert result is None + + @pytest.mark.asyncio + async def test_assign_to_human_db_error(self, db_session, test_issue): + """Test assigning issue to human when DB error occurs.""" + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await issue.assign_to_human( + db_session, + issue_id=test_issue.id, + human_assignee="john@example.com", + ) + + @pytest.mark.asyncio + async def test_unassign_not_found(self, db_session): + """Test unassigning non-existent issue.""" + result = await issue.unassign(db_session, issue_id=uuid.uuid4()) + assert result is None + + @pytest.mark.asyncio + async def test_unassign_db_error(self, db_session, test_issue): + """Test unassigning issue when DB error occurs.""" + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await issue.unassign(db_session, issue_id=test_issue.id) + + +class TestIssueStatusChanges: + """Tests for issue status change operations.""" + + @pytest.mark.asyncio + async def test_close_issue_not_found(self, db_session): + """Test closing non-existent issue.""" + result = await issue.close_issue(db_session, issue_id=uuid.uuid4()) + assert result is None + + @pytest.mark.asyncio + async def test_close_issue_db_error(self, db_session, test_issue): + """Test closing issue when DB error occurs.""" + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await issue.close_issue(db_session, issue_id=test_issue.id) + + @pytest.mark.asyncio + async def test_reopen_issue_not_found(self, db_session): + """Test reopening non-existent issue.""" + result = await issue.reopen_issue(db_session, issue_id=uuid.uuid4()) + assert result is None + + @pytest.mark.asyncio + async def test_reopen_issue_db_error(self, db_session, test_issue): + """Test reopening issue when DB error occurs.""" + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await issue.reopen_issue(db_session, issue_id=test_issue.id) + + +class TestIssueSyncStatus: + """Tests for issue sync status operations.""" + + @pytest.mark.asyncio + async def test_update_sync_status_not_found(self, db_session): + """Test updating sync status for non-existent issue.""" + result = await issue.update_sync_status( + db_session, + issue_id=uuid.uuid4(), + sync_status=SyncStatus.SYNCED, + ) + assert result is None + + @pytest.mark.asyncio + async def test_update_sync_status_with_timestamps(self, db_session, test_issue): + """Test updating sync status with timestamps.""" + now = datetime.now(UTC) + result = await issue.update_sync_status( + db_session, + issue_id=test_issue.id, + sync_status=SyncStatus.SYNCED, + last_synced_at=now, + external_updated_at=now, + ) + assert result is not None + assert result.sync_status == SyncStatus.SYNCED + # Compare without timezone info since DB may strip it + assert result.last_synced_at.replace(tzinfo=None) == now.replace(tzinfo=None) + + @pytest.mark.asyncio + async def test_update_sync_status_db_error(self, db_session, test_issue): + """Test updating sync status when DB error occurs.""" + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await issue.update_sync_status( + db_session, + issue_id=test_issue.id, + sync_status=SyncStatus.ERROR, + ) + + +class TestIssueStats: + """Tests for issue statistics.""" + + @pytest.mark.asyncio + async def test_get_project_stats(self, db_session, test_project, test_issue): + """Test getting project issue statistics.""" + stats = await issue.get_project_stats(db_session, project_id=test_project.id) + assert stats["total"] >= 1 + assert "open" in stats + assert "by_priority" in stats + + @pytest.mark.asyncio + async def test_get_project_stats_db_error(self, db_session, test_project): + """Test getting project stats when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await issue.get_project_stats(db_session, project_id=test_project.id) + + +class TestIssueExternalTracker: + """Tests for external tracker operations.""" + + @pytest.mark.asyncio + async def test_get_by_external_id_not_found(self, db_session): + """Test getting issue by non-existent external ID.""" + result = await issue.get_by_external_id( + db_session, + external_tracker_type="gitea", + external_issue_id="nonexistent", + ) + assert result is None + + @pytest.mark.asyncio + async def test_get_by_external_id_success(self, db_session, test_project): + """Test getting issue by external ID.""" + # Create issue with external tracker + issue_obj = Issue( + id=uuid.uuid4(), + project_id=test_project.id, + title="External Issue", + external_tracker_type="gitea", + external_issue_id="ext-456", + ) + db_session.add(issue_obj) + await db_session.commit() + + result = await issue.get_by_external_id( + db_session, + external_tracker_type="gitea", + external_issue_id="ext-456", + ) + assert result is not None + assert result.external_issue_id == "ext-456" + + @pytest.mark.asyncio + async def test_get_by_external_id_db_error(self, db_session): + """Test getting issue by external ID when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await issue.get_by_external_id( + db_session, + external_tracker_type="gitea", + external_issue_id="test", + ) + + @pytest.mark.asyncio + async def test_get_pending_sync(self, db_session, test_project): + """Test getting issues pending sync.""" + # Create issue with pending sync + issue_obj = Issue( + id=uuid.uuid4(), + project_id=test_project.id, + title="Pending Sync Issue", + external_tracker_type="gitea", + external_issue_id="ext-789", + sync_status=SyncStatus.PENDING, + ) + db_session.add(issue_obj) + await db_session.commit() + + # Test without project filter + issues = await issue.get_pending_sync(db_session) + assert len(issues) >= 1 + + # Test with project filter + issues = await issue.get_pending_sync( + db_session, project_id=test_project.id + ) + assert len(issues) >= 1 + + @pytest.mark.asyncio + async def test_get_pending_sync_db_error(self, db_session): + """Test getting pending sync issues when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await issue.get_pending_sync(db_session) + + +class TestIssueSprintOperations: + """Tests for sprint-related issue operations.""" + + @pytest.mark.asyncio + async def test_remove_sprint_from_issues( + self, db_session, test_project, test_sprint + ): + """Test removing sprint from all issues.""" + # Create issues in sprint + issue1 = Issue( + id=uuid.uuid4(), + project_id=test_project.id, + sprint_id=test_sprint.id, + title="Sprint Issue 1", + ) + issue2 = Issue( + id=uuid.uuid4(), + project_id=test_project.id, + sprint_id=test_sprint.id, + title="Sprint Issue 2", + ) + db_session.add_all([issue1, issue2]) + await db_session.commit() + + count = await issue.remove_sprint_from_issues( + db_session, sprint_id=test_sprint.id + ) + assert count == 2 + + # Verify issues no longer in sprint + await db_session.refresh(issue1) + await db_session.refresh(issue2) + assert issue1.sprint_id is None + assert issue2.sprint_id is None + + @pytest.mark.asyncio + async def test_remove_sprint_from_issues_db_error(self, db_session, test_sprint): + """Test removing sprint from issues when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await issue.remove_sprint_from_issues( + db_session, sprint_id=test_sprint.id + ) + + @pytest.mark.asyncio + async def test_remove_from_sprint_not_found(self, db_session): + """Test removing non-existent issue from sprint.""" + result = await issue.remove_from_sprint(db_session, issue_id=uuid.uuid4()) + assert result is None + + @pytest.mark.asyncio + async def test_remove_from_sprint_success( + self, db_session, test_project, test_sprint + ): + """Test removing issue from sprint.""" + issue_obj = Issue( + id=uuid.uuid4(), + project_id=test_project.id, + sprint_id=test_sprint.id, + title="Issue in Sprint", + ) + db_session.add(issue_obj) + await db_session.commit() + + result = await issue.remove_from_sprint(db_session, issue_id=issue_obj.id) + assert result is not None + assert result.sprint_id is None + + @pytest.mark.asyncio + async def test_remove_from_sprint_db_error( + self, db_session, test_project, test_sprint + ): + """Test removing issue from sprint when DB error occurs.""" + issue_obj = Issue( + id=uuid.uuid4(), + project_id=test_project.id, + sprint_id=test_sprint.id, + title="Issue in Sprint", + ) + db_session.add(issue_obj) + await db_session.commit() + + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await issue.remove_from_sprint(db_session, issue_id=issue_obj.id) diff --git a/backend/tests/crud/syndarix/test_project.py b/backend/tests/crud/syndarix/test_project.py new file mode 100644 index 0000000..ffb3011 --- /dev/null +++ b/backend/tests/crud/syndarix/test_project.py @@ -0,0 +1,284 @@ +# tests/crud/syndarix/test_project.py +"""Tests for Project CRUD operations.""" + +import uuid +from unittest.mock import MagicMock, patch + +import pytest +import pytest_asyncio +from sqlalchemy.exc import IntegrityError, OperationalError + +from app.crud.syndarix.project import project +from app.models.syndarix import Project +from app.models.syndarix.enums import AutonomyLevel, ProjectStatus +from app.schemas.syndarix import ProjectCreate + + +@pytest_asyncio.fixture +async def db_session(async_test_db): + """Create a database session for tests.""" + _, AsyncTestingSessionLocal = async_test_db + async with AsyncTestingSessionLocal() as session: + yield session + + +@pytest_asyncio.fixture +async def test_project(db_session): + """Create a test project.""" + proj = Project( + id=uuid.uuid4(), + name="Test Project", + slug=f"test-project-{uuid.uuid4().hex[:8]}", + status=ProjectStatus.ACTIVE, + ) + db_session.add(proj) + await db_session.commit() + await db_session.refresh(proj) + return proj + + +class TestProjectGetBySlug: + """Tests for getting project by slug.""" + + @pytest.mark.asyncio + async def test_get_by_slug_not_found(self, db_session): + """Test getting non-existent project by slug.""" + result = await project.get_by_slug(db_session, slug="nonexistent") + assert result is None + + @pytest.mark.asyncio + async def test_get_by_slug_success(self, db_session, test_project): + """Test successfully getting project by slug.""" + result = await project.get_by_slug(db_session, slug=test_project.slug) + assert result is not None + assert result.id == test_project.id + + @pytest.mark.asyncio + async def test_get_by_slug_db_error(self, db_session): + """Test getting project by slug when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await project.get_by_slug(db_session, slug="test") + + +class TestProjectCreate: + """Tests for project creation.""" + + @pytest.mark.asyncio + async def test_create_project_success(self, db_session): + """Test successful project creation.""" + project_data = ProjectCreate( + name="New Project", + slug=f"new-project-{uuid.uuid4().hex[:8]}", + ) + created = await project.create(db_session, obj_in=project_data) + assert created.name == "New Project" + + @pytest.mark.asyncio + async def test_create_project_duplicate_slug(self, db_session, test_project): + """Test project creation with duplicate slug.""" + project_data = ProjectCreate( + name="Another Project", + slug=test_project.slug, # Use existing slug + ) + + # Mock IntegrityError with slug in the message + mock_orig = MagicMock() + mock_orig.__str__ = lambda self: "duplicate key value violates unique constraint on slug" + + with patch.object( + db_session, + "commit", + side_effect=IntegrityError("", {}, mock_orig), + ): + with pytest.raises(ValueError, match="already exists"): + await project.create(db_session, obj_in=project_data) + + @pytest.mark.asyncio + async def test_create_project_integrity_error(self, db_session): + """Test project creation with general integrity error.""" + project_data = ProjectCreate( + name="Test Project", + slug=f"test-{uuid.uuid4().hex[:8]}", + ) + + # Mock IntegrityError without slug in the message + mock_orig = MagicMock() + mock_orig.__str__ = lambda self: "foreign key constraint violation" + + with patch.object( + db_session, + "commit", + side_effect=IntegrityError("", {}, mock_orig), + ): + with pytest.raises(ValueError, match="Database integrity error"): + await project.create(db_session, obj_in=project_data) + + @pytest.mark.asyncio + async def test_create_project_unexpected_error(self, db_session): + """Test project creation with unexpected error.""" + project_data = ProjectCreate( + name="Test Project", + slug=f"test-{uuid.uuid4().hex[:8]}", + ) + + with patch.object( + db_session, + "commit", + side_effect=RuntimeError("Unexpected error"), + ): + with pytest.raises(RuntimeError, match="Unexpected error"): + await project.create(db_session, obj_in=project_data) + + +class TestProjectGetMultiWithFilters: + """Tests for getting projects with filters.""" + + @pytest.mark.asyncio + async def test_get_multi_with_filters_success(self, db_session, test_project): + """Test successfully getting projects with filters.""" + results, total = await project.get_multi_with_filters(db_session) + assert total >= 1 + + @pytest.mark.asyncio + async def test_get_multi_with_filters_db_error(self, db_session): + """Test getting projects when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await project.get_multi_with_filters(db_session) + + +class TestProjectGetWithCounts: + """Tests for getting project with counts.""" + + @pytest.mark.asyncio + async def test_get_with_counts_not_found(self, db_session): + """Test getting non-existent project with counts.""" + result = await project.get_with_counts( + db_session, project_id=uuid.uuid4() + ) + assert result is None + + @pytest.mark.asyncio + async def test_get_with_counts_success(self, db_session, test_project): + """Test successfully getting project with counts.""" + result = await project.get_with_counts( + db_session, project_id=test_project.id + ) + assert result is not None + assert result["project"].id == test_project.id + assert result["agent_count"] == 0 + assert result["issue_count"] == 0 + + @pytest.mark.asyncio + async def test_get_with_counts_db_error(self, db_session, test_project): + """Test getting project with counts when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await project.get_with_counts( + db_session, project_id=test_project.id + ) + + +class TestProjectGetMultiWithCounts: + """Tests for getting projects with counts.""" + + @pytest.mark.asyncio + async def test_get_multi_with_counts_empty(self, db_session): + """Test getting projects with counts when none match.""" + results, total = await project.get_multi_with_counts( + db_session, + search="nonexistent-xyz-query", + ) + assert results == [] + assert total == 0 + + @pytest.mark.asyncio + async def test_get_multi_with_counts_success(self, db_session, test_project): + """Test successfully getting projects with counts.""" + results, total = await project.get_multi_with_counts(db_session) + assert total >= 1 + assert len(results) >= 1 + assert "project" in results[0] + assert "agent_count" in results[0] + assert "issue_count" in results[0] + + @pytest.mark.asyncio + async def test_get_multi_with_counts_db_error(self, db_session, test_project): + """Test getting projects with counts when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await project.get_multi_with_counts(db_session) + + +class TestProjectGetByOwner: + """Tests for getting projects by owner.""" + + @pytest.mark.asyncio + async def test_get_projects_by_owner_empty(self, db_session): + """Test getting projects by owner when none exist.""" + results = await project.get_projects_by_owner( + db_session, owner_id=uuid.uuid4() + ) + assert results == [] + + @pytest.mark.asyncio + async def test_get_projects_by_owner_db_error(self, db_session): + """Test getting projects by owner when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await project.get_projects_by_owner( + db_session, owner_id=uuid.uuid4() + ) + + +class TestProjectArchive: + """Tests for archiving projects.""" + + @pytest.mark.asyncio + async def test_archive_project_not_found(self, db_session): + """Test archiving non-existent project.""" + result = await project.archive_project(db_session, project_id=uuid.uuid4()) + assert result is None + + @pytest.mark.asyncio + async def test_archive_project_success(self, db_session, test_project): + """Test successfully archiving project.""" + result = await project.archive_project( + db_session, project_id=test_project.id + ) + assert result is not None + assert result.status == ProjectStatus.ARCHIVED + + @pytest.mark.asyncio + async def test_archive_project_db_error(self, db_session, test_project): + """Test archiving project when DB error occurs.""" + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await project.archive_project( + db_session, project_id=test_project.id + ) diff --git a/backend/tests/crud/syndarix/test_sprint.py b/backend/tests/crud/syndarix/test_sprint.py new file mode 100644 index 0000000..af57fba --- /dev/null +++ b/backend/tests/crud/syndarix/test_sprint.py @@ -0,0 +1,502 @@ +# tests/crud/syndarix/test_sprint.py +"""Tests for Sprint CRUD operations.""" + +import uuid +from datetime import date, timedelta +from unittest.mock import patch + +import pytest +import pytest_asyncio +from sqlalchemy.exc import IntegrityError, OperationalError + +from app.crud.syndarix.sprint import CRUDSprint, sprint +from app.models.syndarix import Issue, Project, Sprint +from app.models.syndarix.enums import ( + IssueStatus, + ProjectStatus, + SprintStatus, +) +from app.schemas.syndarix import SprintCreate + + +@pytest_asyncio.fixture +async def db_session(async_test_db): + """Create a database session for tests.""" + _, AsyncTestingSessionLocal = async_test_db + async with AsyncTestingSessionLocal() as session: + yield session + + +@pytest_asyncio.fixture +async def test_project(db_session): + """Create a test project for sprints.""" + project = Project( + id=uuid.uuid4(), + name="Test Project", + slug=f"test-project-{uuid.uuid4().hex[:8]}", + status=ProjectStatus.ACTIVE, + ) + db_session.add(project) + await db_session.commit() + await db_session.refresh(project) + return project + + +@pytest_asyncio.fixture +async def test_sprint(db_session, test_project): + """Create a test sprint.""" + sprint_obj = Sprint( + id=uuid.uuid4(), + project_id=test_project.id, + name="Test Sprint", + number=1, + status=SprintStatus.PLANNED, + start_date=date.today(), + end_date=date.today() + timedelta(days=14), + ) + db_session.add(sprint_obj) + await db_session.commit() + await db_session.refresh(sprint_obj) + return sprint_obj + + +class TestSprintCreate: + """Tests for sprint creation.""" + + @pytest.mark.asyncio + async def test_create_sprint_success(self, db_session, test_project): + """Test successful sprint creation.""" + sprint_data = SprintCreate( + project_id=test_project.id, + name="New Sprint", + number=1, + start_date=date.today(), + end_date=date.today() + timedelta(days=14), + ) + created = await sprint.create(db_session, obj_in=sprint_data) + assert created.name == "New Sprint" + assert created.number == 1 + assert created.status == SprintStatus.PLANNED + + @pytest.mark.asyncio + async def test_create_sprint_with_all_fields(self, db_session, test_project): + """Test sprint creation with all optional fields.""" + sprint_data = SprintCreate( + project_id=test_project.id, + name="Full Sprint", + number=2, + goal="Deliver user authentication", + start_date=date.today(), + end_date=date.today() + timedelta(days=14), + status=SprintStatus.PLANNED, + planned_points=20, + velocity=15, + ) + created = await sprint.create(db_session, obj_in=sprint_data) + assert created.goal == "Deliver user authentication" + assert created.planned_points == 20 + assert created.velocity == 15 + + @pytest.mark.asyncio + async def test_create_sprint_integrity_error(self, db_session, test_project): + """Test sprint creation with integrity error.""" + sprint_data = SprintCreate( + project_id=test_project.id, + name="Test Sprint", + number=1, + start_date=date.today(), + end_date=date.today() + timedelta(days=14), + ) + + with patch.object( + db_session, + "commit", + side_effect=IntegrityError("", {}, Exception()), + ): + with pytest.raises(ValueError, match="Database integrity error"): + await sprint.create(db_session, obj_in=sprint_data) + + @pytest.mark.asyncio + async def test_create_sprint_unexpected_error(self, db_session, test_project): + """Test sprint creation with unexpected error.""" + sprint_data = SprintCreate( + project_id=test_project.id, + name="Test Sprint", + number=1, + start_date=date.today(), + end_date=date.today() + timedelta(days=14), + ) + + with patch.object( + db_session, + "commit", + side_effect=RuntimeError("Unexpected error"), + ): + with pytest.raises(RuntimeError, match="Unexpected error"): + await sprint.create(db_session, obj_in=sprint_data) + + +class TestSprintGetWithDetails: + """Tests for getting sprint with details.""" + + @pytest.mark.asyncio + async def test_get_with_details_not_found(self, db_session): + """Test getting non-existent sprint with details.""" + result = await sprint.get_with_details(db_session, sprint_id=uuid.uuid4()) + assert result is None + + @pytest.mark.asyncio + async def test_get_with_details_success(self, db_session, test_sprint): + """Test getting sprint with details.""" + result = await sprint.get_with_details(db_session, sprint_id=test_sprint.id) + assert result is not None + assert result["sprint"].id == test_sprint.id + assert "project_name" in result + assert "issue_count" in result + + @pytest.mark.asyncio + async def test_get_with_details_db_error(self, db_session, test_sprint): + """Test getting sprint with details when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await sprint.get_with_details(db_session, sprint_id=test_sprint.id) + + +class TestSprintGetByProject: + """Tests for getting sprints by project.""" + + @pytest.mark.asyncio + async def test_get_by_project_with_status_filter( + self, db_session, test_project, test_sprint + ): + """Test getting sprints with status filter.""" + sprints, total = await sprint.get_by_project( + db_session, + project_id=test_project.id, + status=SprintStatus.PLANNED, + ) + assert len(sprints) == 1 + assert sprints[0].status == SprintStatus.PLANNED + + @pytest.mark.asyncio + async def test_get_by_project_db_error(self, db_session, test_project): + """Test getting sprints when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await sprint.get_by_project(db_session, project_id=test_project.id) + + +class TestSprintActiveOperations: + """Tests for active sprint operations.""" + + @pytest.mark.asyncio + async def test_get_active_sprint_none(self, db_session, test_project, test_sprint): + """Test getting active sprint when none exists.""" + result = await sprint.get_active_sprint(db_session, project_id=test_project.id) + assert result is None + + @pytest.mark.asyncio + async def test_get_active_sprint_db_error(self, db_session, test_project): + """Test getting active sprint when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await sprint.get_active_sprint(db_session, project_id=test_project.id) + + +class TestSprintNumberOperations: + """Tests for sprint number operations.""" + + @pytest.mark.asyncio + async def test_get_next_sprint_number_empty(self, db_session, test_project): + """Test getting next sprint number for project with no sprints.""" + result = await sprint.get_next_sprint_number( + db_session, project_id=test_project.id + ) + assert result == 1 + + @pytest.mark.asyncio + async def test_get_next_sprint_number_with_existing( + self, db_session, test_project, test_sprint + ): + """Test getting next sprint number with existing sprints.""" + result = await sprint.get_next_sprint_number( + db_session, project_id=test_project.id + ) + assert result == 2 + + @pytest.mark.asyncio + async def test_get_next_sprint_number_db_error(self, db_session, test_project): + """Test getting next sprint number when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await sprint.get_next_sprint_number( + db_session, project_id=test_project.id + ) + + +class TestSprintLifecycle: + """Tests for sprint lifecycle operations.""" + + @pytest.mark.asyncio + async def test_start_sprint_not_found(self, db_session): + """Test starting non-existent sprint.""" + result = await sprint.start_sprint(db_session, sprint_id=uuid.uuid4()) + assert result is None + + @pytest.mark.asyncio + async def test_start_sprint_invalid_status(self, db_session, test_project): + """Test starting sprint with invalid status.""" + # Create an active sprint + active_sprint = Sprint( + id=uuid.uuid4(), + project_id=test_project.id, + name="Active Sprint", + number=1, + status=SprintStatus.ACTIVE, + start_date=date.today(), + end_date=date.today() + timedelta(days=14), + ) + db_session.add(active_sprint) + await db_session.commit() + + with pytest.raises(ValueError, match="Cannot start sprint with status"): + await sprint.start_sprint(db_session, sprint_id=active_sprint.id) + + @pytest.mark.asyncio + async def test_start_sprint_with_existing_active(self, db_session, test_project): + """Test starting sprint when another is already active.""" + # Create active sprint + active_sprint = Sprint( + id=uuid.uuid4(), + project_id=test_project.id, + name="Active Sprint", + number=1, + status=SprintStatus.ACTIVE, + start_date=date.today(), + end_date=date.today() + timedelta(days=14), + ) + db_session.add(active_sprint) + + # Create planned sprint + planned_sprint = Sprint( + id=uuid.uuid4(), + project_id=test_project.id, + name="Planned Sprint", + number=2, + status=SprintStatus.PLANNED, + start_date=date.today() + timedelta(days=15), + end_date=date.today() + timedelta(days=29), + ) + db_session.add(planned_sprint) + await db_session.commit() + + with pytest.raises(ValueError, match="Project already has an active sprint"): + await sprint.start_sprint(db_session, sprint_id=planned_sprint.id) + + @pytest.mark.asyncio + async def test_start_sprint_db_error(self, db_session, test_sprint): + """Test starting sprint when DB error occurs.""" + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await sprint.start_sprint(db_session, sprint_id=test_sprint.id) + + @pytest.mark.asyncio + async def test_complete_sprint_not_found(self, db_session): + """Test completing non-existent sprint.""" + result = await sprint.complete_sprint(db_session, sprint_id=uuid.uuid4()) + assert result is None + + @pytest.mark.asyncio + async def test_complete_sprint_invalid_status(self, db_session, test_sprint): + """Test completing sprint with invalid status (PLANNED).""" + with pytest.raises(ValueError, match="Cannot complete sprint with status"): + await sprint.complete_sprint(db_session, sprint_id=test_sprint.id) + + @pytest.mark.asyncio + async def test_complete_sprint_db_error(self, db_session, test_project): + """Test completing sprint when DB error occurs.""" + # Create active sprint + active_sprint = Sprint( + id=uuid.uuid4(), + project_id=test_project.id, + name="Active Sprint", + number=1, + status=SprintStatus.ACTIVE, + start_date=date.today(), + end_date=date.today() + timedelta(days=14), + ) + db_session.add(active_sprint) + await db_session.commit() + + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await sprint.complete_sprint(db_session, sprint_id=active_sprint.id) + + @pytest.mark.asyncio + async def test_cancel_sprint_not_found(self, db_session): + """Test cancelling non-existent sprint.""" + result = await sprint.cancel_sprint(db_session, sprint_id=uuid.uuid4()) + assert result is None + + @pytest.mark.asyncio + async def test_cancel_sprint_invalid_status(self, db_session, test_project): + """Test cancelling sprint with invalid status (COMPLETED).""" + completed_sprint = Sprint( + id=uuid.uuid4(), + project_id=test_project.id, + name="Completed Sprint", + number=1, + status=SprintStatus.COMPLETED, + start_date=date.today() - timedelta(days=14), + end_date=date.today(), + ) + db_session.add(completed_sprint) + await db_session.commit() + + with pytest.raises(ValueError, match="Cannot cancel sprint with status"): + await sprint.cancel_sprint(db_session, sprint_id=completed_sprint.id) + + @pytest.mark.asyncio + async def test_cancel_sprint_success(self, db_session, test_sprint): + """Test successfully cancelling a planned sprint.""" + result = await sprint.cancel_sprint(db_session, sprint_id=test_sprint.id) + assert result is not None + assert result.status == SprintStatus.CANCELLED + + @pytest.mark.asyncio + async def test_cancel_sprint_db_error(self, db_session, test_sprint): + """Test cancelling sprint when DB error occurs.""" + with patch.object( + db_session, + "commit", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await sprint.cancel_sprint(db_session, sprint_id=test_sprint.id) + + +class TestSprintVelocity: + """Tests for velocity operations.""" + + @pytest.mark.asyncio + async def test_get_velocity_empty(self, db_session, test_project): + """Test getting velocity with no completed sprints.""" + result = await sprint.get_velocity(db_session, project_id=test_project.id) + assert result == [] + + @pytest.mark.asyncio + async def test_get_velocity_with_data(self, db_session, test_project): + """Test getting velocity with completed sprints.""" + completed_sprint = Sprint( + id=uuid.uuid4(), + project_id=test_project.id, + name="Completed Sprint", + number=1, + status=SprintStatus.COMPLETED, + start_date=date.today() - timedelta(days=14), + end_date=date.today(), + planned_points=20, + velocity=18, + ) + db_session.add(completed_sprint) + await db_session.commit() + + result = await sprint.get_velocity(db_session, project_id=test_project.id) + assert len(result) == 1 + assert result[0]["sprint_number"] == 1 + assert result[0]["velocity"] == 18 + assert result[0]["velocity_ratio"] == 0.9 + + @pytest.mark.asyncio + async def test_get_velocity_db_error(self, db_session, test_project): + """Test getting velocity when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await sprint.get_velocity(db_session, project_id=test_project.id) + + +class TestSprintWithIssueCounts: + """Tests for sprints with issue counts.""" + + @pytest.mark.asyncio + async def test_get_sprints_with_issue_counts_empty(self, db_session, test_project): + """Test getting sprints with issue counts when no sprints exist.""" + results, total = await sprint.get_sprints_with_issue_counts( + db_session, project_id=test_project.id + ) + assert results == [] + assert total == 0 + + @pytest.mark.asyncio + async def test_get_sprints_with_issue_counts_success( + self, db_session, test_project, test_sprint + ): + """Test getting sprints with issue counts.""" + # Add some issues to the sprint + issue1 = Issue( + id=uuid.uuid4(), + project_id=test_project.id, + sprint_id=test_sprint.id, + title="Issue 1", + status=IssueStatus.OPEN, + ) + issue2 = Issue( + id=uuid.uuid4(), + project_id=test_project.id, + sprint_id=test_sprint.id, + title="Issue 2", + status=IssueStatus.CLOSED, + ) + db_session.add_all([issue1, issue2]) + await db_session.commit() + + results, total = await sprint.get_sprints_with_issue_counts( + db_session, project_id=test_project.id + ) + assert len(results) == 1 + assert results[0]["issue_count"] == 2 + assert results[0]["open_issues"] == 1 + assert results[0]["completed_issues"] == 1 + + @pytest.mark.asyncio + async def test_get_sprints_with_issue_counts_db_error( + self, db_session, test_project, test_sprint + ): + """Test getting sprints with issue counts when DB error occurs.""" + with patch.object( + db_session, + "execute", + side_effect=OperationalError("Connection lost", {}, Exception()), + ): + with pytest.raises(OperationalError): + await sprint.get_sprints_with_issue_counts( + db_session, project_id=test_project.id + )