feat(backend): Add Syndarix domain models with CRUD operations
- Add Project model with slug, description, autonomy level, and settings - Add AgentType model for agent templates with model config and failover - Add AgentInstance model for running agents with status and memory - Add Issue model with external tracker sync (Gitea/GitHub/GitLab) - Add Sprint model with velocity tracking and lifecycle management - Add comprehensive Pydantic schemas with validation - Add full CRUD operations for all models with filtering/sorting - Add 280+ tests for models, schemas, and CRUD operations Implements #23, #24, #25, #26, #27 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2
backend/tests/models/syndarix/__init__.py
Normal file
2
backend/tests/models/syndarix/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
# tests/models/syndarix/__init__.py
|
||||
"""Syndarix model unit tests."""
|
||||
192
backend/tests/models/syndarix/conftest.py
Normal file
192
backend/tests/models/syndarix/conftest.py
Normal file
@@ -0,0 +1,192 @@
|
||||
# tests/models/syndarix/conftest.py
|
||||
"""
|
||||
Shared fixtures for Syndarix model tests.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from datetime import date, timedelta
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
from app.models.syndarix import (
|
||||
AgentInstance,
|
||||
AgentStatus,
|
||||
AgentType,
|
||||
AutonomyLevel,
|
||||
Issue,
|
||||
IssuePriority,
|
||||
IssueStatus,
|
||||
Project,
|
||||
ProjectStatus,
|
||||
Sprint,
|
||||
SprintStatus,
|
||||
SyncStatus,
|
||||
)
|
||||
from app.models.user import User
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_project_data():
|
||||
"""Return sample project data for testing."""
|
||||
return {
|
||||
"name": "Test Project",
|
||||
"slug": "test-project",
|
||||
"description": "A test project for unit testing",
|
||||
"autonomy_level": AutonomyLevel.MILESTONE,
|
||||
"status": ProjectStatus.ACTIVE,
|
||||
"settings": {"mcp_servers": ["gitea", "slack"]},
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_agent_type_data():
|
||||
"""Return sample agent type data for testing."""
|
||||
return {
|
||||
"name": "Backend Engineer",
|
||||
"slug": "backend-engineer",
|
||||
"description": "Specialized in backend development",
|
||||
"expertise": ["python", "fastapi", "postgresql"],
|
||||
"personality_prompt": "You are an expert backend engineer...",
|
||||
"primary_model": "claude-opus-4-5-20251101",
|
||||
"fallback_models": ["claude-sonnet-4-20250514"],
|
||||
"model_params": {"temperature": 0.7, "max_tokens": 4096},
|
||||
"mcp_servers": ["gitea", "file-system"],
|
||||
"tool_permissions": {"allowed": ["*"], "denied": []},
|
||||
"is_active": True,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_sprint_data():
|
||||
"""Return sample sprint data for testing."""
|
||||
today = date.today()
|
||||
return {
|
||||
"name": "Sprint 1",
|
||||
"number": 1,
|
||||
"goal": "Complete initial setup and core features",
|
||||
"start_date": today,
|
||||
"end_date": today + timedelta(days=14),
|
||||
"status": SprintStatus.PLANNED,
|
||||
"planned_points": 21,
|
||||
"completed_points": 0,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_issue_data():
|
||||
"""Return sample issue data for testing."""
|
||||
return {
|
||||
"title": "Implement user authentication",
|
||||
"body": "As a user, I want to log in securely...",
|
||||
"status": IssueStatus.OPEN,
|
||||
"priority": IssuePriority.HIGH,
|
||||
"labels": ["backend", "security"],
|
||||
"story_points": 5,
|
||||
}
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def test_owner(async_test_db):
|
||||
"""Create a test user to be used as project owner."""
|
||||
from app.core.auth import get_password_hash
|
||||
|
||||
_test_engine, AsyncTestingSessionLocal = async_test_db
|
||||
async with AsyncTestingSessionLocal() as session:
|
||||
user = User(
|
||||
id=uuid.uuid4(),
|
||||
email="owner@example.com",
|
||||
password_hash=get_password_hash("TestPassword123!"),
|
||||
first_name="Test",
|
||||
last_name="Owner",
|
||||
is_active=True,
|
||||
is_superuser=False,
|
||||
)
|
||||
session.add(user)
|
||||
await session.commit()
|
||||
await session.refresh(user)
|
||||
return user
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def test_project(async_test_db, test_owner, sample_project_data):
|
||||
"""Create a test project in the database."""
|
||||
_test_engine, AsyncTestingSessionLocal = async_test_db
|
||||
async with AsyncTestingSessionLocal() as session:
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
owner_id=test_owner.id,
|
||||
**sample_project_data,
|
||||
)
|
||||
session.add(project)
|
||||
await session.commit()
|
||||
await session.refresh(project)
|
||||
return project
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def test_agent_type(async_test_db, sample_agent_type_data):
|
||||
"""Create a test agent type in the database."""
|
||||
_test_engine, AsyncTestingSessionLocal = async_test_db
|
||||
async with AsyncTestingSessionLocal() as session:
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
**sample_agent_type_data,
|
||||
)
|
||||
session.add(agent_type)
|
||||
await session.commit()
|
||||
await session.refresh(agent_type)
|
||||
return agent_type
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def test_agent_instance(async_test_db, test_project, test_agent_type):
|
||||
"""Create a test agent instance in the database."""
|
||||
_test_engine, AsyncTestingSessionLocal = async_test_db
|
||||
async with AsyncTestingSessionLocal() as session:
|
||||
agent_instance = AgentInstance(
|
||||
id=uuid.uuid4(),
|
||||
agent_type_id=test_agent_type.id,
|
||||
project_id=test_project.id,
|
||||
status=AgentStatus.IDLE,
|
||||
current_task=None,
|
||||
short_term_memory={},
|
||||
long_term_memory_ref=None,
|
||||
session_id=None,
|
||||
)
|
||||
session.add(agent_instance)
|
||||
await session.commit()
|
||||
await session.refresh(agent_instance)
|
||||
return agent_instance
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def test_sprint(async_test_db, test_project, sample_sprint_data):
|
||||
"""Create a test sprint in the database."""
|
||||
_test_engine, AsyncTestingSessionLocal = async_test_db
|
||||
async with AsyncTestingSessionLocal() as session:
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=test_project.id,
|
||||
**sample_sprint_data,
|
||||
)
|
||||
session.add(sprint)
|
||||
await session.commit()
|
||||
await session.refresh(sprint)
|
||||
return sprint
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def test_issue(async_test_db, test_project, sample_issue_data):
|
||||
"""Create a test issue in the database."""
|
||||
_test_engine, AsyncTestingSessionLocal = async_test_db
|
||||
async with AsyncTestingSessionLocal() as session:
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=test_project.id,
|
||||
**sample_issue_data,
|
||||
)
|
||||
session.add(issue)
|
||||
await session.commit()
|
||||
await session.refresh(issue)
|
||||
return issue
|
||||
424
backend/tests/models/syndarix/test_agent_instance.py
Normal file
424
backend/tests/models/syndarix/test_agent_instance.py
Normal file
@@ -0,0 +1,424 @@
|
||||
# tests/models/syndarix/test_agent_instance.py
|
||||
"""
|
||||
Unit tests for the AgentInstance model.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from datetime import UTC, datetime
|
||||
from decimal import Decimal
|
||||
|
||||
import pytest
|
||||
|
||||
from app.models.syndarix import (
|
||||
AgentInstance,
|
||||
AgentStatus,
|
||||
AgentType,
|
||||
Project,
|
||||
)
|
||||
|
||||
|
||||
class TestAgentInstanceModel:
|
||||
"""Tests for AgentInstance model creation and fields."""
|
||||
|
||||
def test_create_agent_instance_with_required_fields(self, db_session):
|
||||
"""Test creating an agent instance with only required fields."""
|
||||
# First create dependencies
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Test Project",
|
||||
slug="test-project-instance",
|
||||
)
|
||||
db_session.add(project)
|
||||
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Test Agent",
|
||||
slug="test-agent-instance",
|
||||
personality_prompt="Test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
# Create agent instance
|
||||
instance = AgentInstance(
|
||||
id=uuid.uuid4(),
|
||||
agent_type_id=agent_type.id,
|
||||
project_id=project.id,
|
||||
)
|
||||
db_session.add(instance)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentInstance).filter_by(project_id=project.id).first()
|
||||
|
||||
assert retrieved is not None
|
||||
assert retrieved.agent_type_id == agent_type.id
|
||||
assert retrieved.project_id == project.id
|
||||
assert retrieved.status == AgentStatus.IDLE # Default
|
||||
assert retrieved.current_task is None
|
||||
assert retrieved.short_term_memory == {}
|
||||
assert retrieved.long_term_memory_ref is None
|
||||
assert retrieved.session_id is None
|
||||
assert retrieved.tasks_completed == 0
|
||||
assert retrieved.tokens_used == 0
|
||||
assert retrieved.cost_incurred == Decimal("0")
|
||||
|
||||
def test_create_agent_instance_with_all_fields(self, db_session):
|
||||
"""Test creating an agent instance with all optional fields."""
|
||||
# First create dependencies
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Full Project",
|
||||
slug="full-project-instance",
|
||||
)
|
||||
db_session.add(project)
|
||||
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Full Agent",
|
||||
slug="full-agent-instance",
|
||||
personality_prompt="Test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
instance_id = uuid.uuid4()
|
||||
now = datetime.now(UTC)
|
||||
|
||||
instance = AgentInstance(
|
||||
id=instance_id,
|
||||
agent_type_id=agent_type.id,
|
||||
project_id=project.id,
|
||||
status=AgentStatus.WORKING,
|
||||
current_task="Implementing user authentication",
|
||||
short_term_memory={"context": "Working on auth", "recent_files": ["auth.py"]},
|
||||
long_term_memory_ref="project-123/agent-456",
|
||||
session_id="session-abc-123",
|
||||
last_activity_at=now,
|
||||
tasks_completed=5,
|
||||
tokens_used=10000,
|
||||
cost_incurred=Decimal("0.5000"),
|
||||
)
|
||||
db_session.add(instance)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentInstance).filter_by(id=instance_id).first()
|
||||
|
||||
assert retrieved.status == AgentStatus.WORKING
|
||||
assert retrieved.current_task == "Implementing user authentication"
|
||||
assert retrieved.short_term_memory == {"context": "Working on auth", "recent_files": ["auth.py"]}
|
||||
assert retrieved.long_term_memory_ref == "project-123/agent-456"
|
||||
assert retrieved.session_id == "session-abc-123"
|
||||
assert retrieved.tasks_completed == 5
|
||||
assert retrieved.tokens_used == 10000
|
||||
assert retrieved.cost_incurred == Decimal("0.5000")
|
||||
|
||||
def test_agent_instance_timestamps(self, db_session):
|
||||
"""Test that timestamps are automatically set."""
|
||||
project = Project(id=uuid.uuid4(), name="Timestamp Project", slug="timestamp-project-ai")
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Timestamp Agent",
|
||||
slug="timestamp-agent-ai",
|
||||
personality_prompt="Test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
instance = AgentInstance(
|
||||
id=uuid.uuid4(),
|
||||
agent_type_id=agent_type.id,
|
||||
project_id=project.id,
|
||||
)
|
||||
db_session.add(instance)
|
||||
db_session.commit()
|
||||
|
||||
assert isinstance(instance.created_at, datetime)
|
||||
assert isinstance(instance.updated_at, datetime)
|
||||
|
||||
def test_agent_instance_string_representation(self, db_session):
|
||||
"""Test the string representation of an agent instance."""
|
||||
project = Project(id=uuid.uuid4(), name="Repr Project", slug="repr-project-ai")
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Repr Agent",
|
||||
slug="repr-agent-ai",
|
||||
personality_prompt="Test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
instance_id = uuid.uuid4()
|
||||
instance = AgentInstance(
|
||||
id=instance_id,
|
||||
agent_type_id=agent_type.id,
|
||||
project_id=project.id,
|
||||
status=AgentStatus.IDLE,
|
||||
)
|
||||
|
||||
repr_str = repr(instance)
|
||||
assert str(instance_id) in repr_str
|
||||
assert str(agent_type.id) in repr_str
|
||||
assert str(project.id) in repr_str
|
||||
assert "idle" in repr_str
|
||||
|
||||
|
||||
class TestAgentInstanceStatus:
|
||||
"""Tests for AgentInstance status transitions."""
|
||||
|
||||
def test_all_agent_statuses(self, db_session):
|
||||
"""Test that all agent statuses can be stored."""
|
||||
project = Project(id=uuid.uuid4(), name="Status Project", slug="status-project-ai")
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Status Agent",
|
||||
slug="status-agent-ai",
|
||||
personality_prompt="Test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
for status in AgentStatus:
|
||||
instance = AgentInstance(
|
||||
id=uuid.uuid4(),
|
||||
agent_type_id=agent_type.id,
|
||||
project_id=project.id,
|
||||
status=status,
|
||||
)
|
||||
db_session.add(instance)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentInstance).filter_by(id=instance.id).first()
|
||||
assert retrieved.status == status
|
||||
|
||||
def test_status_update(self, db_session):
|
||||
"""Test updating agent instance status."""
|
||||
project = Project(id=uuid.uuid4(), name="Update Status Project", slug="update-status-project-ai")
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Update Status Agent",
|
||||
slug="update-status-agent-ai",
|
||||
personality_prompt="Test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
instance = AgentInstance(
|
||||
id=uuid.uuid4(),
|
||||
agent_type_id=agent_type.id,
|
||||
project_id=project.id,
|
||||
status=AgentStatus.IDLE,
|
||||
)
|
||||
db_session.add(instance)
|
||||
db_session.commit()
|
||||
|
||||
# Update to WORKING
|
||||
instance.status = AgentStatus.WORKING
|
||||
instance.current_task = "Processing feature request"
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentInstance).filter_by(id=instance.id).first()
|
||||
assert retrieved.status == AgentStatus.WORKING
|
||||
assert retrieved.current_task == "Processing feature request"
|
||||
|
||||
def test_terminate_agent_instance(self, db_session):
|
||||
"""Test terminating an agent instance."""
|
||||
project = Project(id=uuid.uuid4(), name="Terminate Project", slug="terminate-project-ai")
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Terminate Agent",
|
||||
slug="terminate-agent-ai",
|
||||
personality_prompt="Test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
instance = AgentInstance(
|
||||
id=uuid.uuid4(),
|
||||
agent_type_id=agent_type.id,
|
||||
project_id=project.id,
|
||||
status=AgentStatus.WORKING,
|
||||
current_task="Working on something",
|
||||
session_id="active-session",
|
||||
)
|
||||
db_session.add(instance)
|
||||
db_session.commit()
|
||||
|
||||
# Terminate
|
||||
now = datetime.now(UTC)
|
||||
instance.status = AgentStatus.TERMINATED
|
||||
instance.terminated_at = now
|
||||
instance.current_task = None
|
||||
instance.session_id = None
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentInstance).filter_by(id=instance.id).first()
|
||||
assert retrieved.status == AgentStatus.TERMINATED
|
||||
assert retrieved.terminated_at is not None
|
||||
assert retrieved.current_task is None
|
||||
assert retrieved.session_id is None
|
||||
|
||||
|
||||
class TestAgentInstanceMetrics:
|
||||
"""Tests for AgentInstance usage metrics."""
|
||||
|
||||
def test_increment_metrics(self, db_session):
|
||||
"""Test incrementing usage metrics."""
|
||||
project = Project(id=uuid.uuid4(), name="Metrics Project", slug="metrics-project-ai")
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Metrics Agent",
|
||||
slug="metrics-agent-ai",
|
||||
personality_prompt="Test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
instance = AgentInstance(
|
||||
id=uuid.uuid4(),
|
||||
agent_type_id=agent_type.id,
|
||||
project_id=project.id,
|
||||
)
|
||||
db_session.add(instance)
|
||||
db_session.commit()
|
||||
|
||||
# Record task completion
|
||||
instance.tasks_completed += 1
|
||||
instance.tokens_used += 1500
|
||||
instance.cost_incurred += Decimal("0.0150")
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentInstance).filter_by(id=instance.id).first()
|
||||
assert retrieved.tasks_completed == 1
|
||||
assert retrieved.tokens_used == 1500
|
||||
assert retrieved.cost_incurred == Decimal("0.0150")
|
||||
|
||||
# Record another task
|
||||
retrieved.tasks_completed += 1
|
||||
retrieved.tokens_used += 2500
|
||||
retrieved.cost_incurred += Decimal("0.0250")
|
||||
db_session.commit()
|
||||
|
||||
updated = db_session.query(AgentInstance).filter_by(id=instance.id).first()
|
||||
assert updated.tasks_completed == 2
|
||||
assert updated.tokens_used == 4000
|
||||
assert updated.cost_incurred == Decimal("0.0400")
|
||||
|
||||
def test_large_token_count(self, db_session):
|
||||
"""Test handling large token counts."""
|
||||
project = Project(id=uuid.uuid4(), name="Large Tokens Project", slug="large-tokens-project-ai")
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Large Tokens Agent",
|
||||
slug="large-tokens-agent-ai",
|
||||
personality_prompt="Test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
instance = AgentInstance(
|
||||
id=uuid.uuid4(),
|
||||
agent_type_id=agent_type.id,
|
||||
project_id=project.id,
|
||||
tokens_used=10_000_000_000, # 10 billion tokens
|
||||
cost_incurred=Decimal("100000.0000"), # $100,000
|
||||
)
|
||||
db_session.add(instance)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentInstance).filter_by(id=instance.id).first()
|
||||
assert retrieved.tokens_used == 10_000_000_000
|
||||
assert retrieved.cost_incurred == Decimal("100000.0000")
|
||||
|
||||
|
||||
class TestAgentInstanceShortTermMemory:
|
||||
"""Tests for AgentInstance short-term memory JSON field."""
|
||||
|
||||
def test_store_complex_memory(self, db_session):
|
||||
"""Test storing complex short-term memory."""
|
||||
project = Project(id=uuid.uuid4(), name="Memory Project", slug="memory-project-ai")
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Memory Agent",
|
||||
slug="memory-agent-ai",
|
||||
personality_prompt="Test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
memory = {
|
||||
"conversation_history": [
|
||||
{"role": "user", "content": "Implement feature X"},
|
||||
{"role": "assistant", "content": "I'll start by..."},
|
||||
],
|
||||
"recent_files": ["auth.py", "models.py", "test_auth.py"],
|
||||
"decisions": {
|
||||
"architecture": "Use repository pattern",
|
||||
"testing": "TDD approach",
|
||||
},
|
||||
"blockers": [],
|
||||
"context_tokens": 2048,
|
||||
}
|
||||
|
||||
instance = AgentInstance(
|
||||
id=uuid.uuid4(),
|
||||
agent_type_id=agent_type.id,
|
||||
project_id=project.id,
|
||||
short_term_memory=memory,
|
||||
)
|
||||
db_session.add(instance)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentInstance).filter_by(id=instance.id).first()
|
||||
assert retrieved.short_term_memory == memory
|
||||
assert len(retrieved.short_term_memory["conversation_history"]) == 2
|
||||
assert "auth.py" in retrieved.short_term_memory["recent_files"]
|
||||
|
||||
def test_update_memory(self, db_session):
|
||||
"""Test updating short-term memory."""
|
||||
project = Project(id=uuid.uuid4(), name="Update Memory Project", slug="update-memory-project-ai")
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Update Memory Agent",
|
||||
slug="update-memory-agent-ai",
|
||||
personality_prompt="Test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
instance = AgentInstance(
|
||||
id=uuid.uuid4(),
|
||||
agent_type_id=agent_type.id,
|
||||
project_id=project.id,
|
||||
short_term_memory={"initial": "state"},
|
||||
)
|
||||
db_session.add(instance)
|
||||
db_session.commit()
|
||||
|
||||
# Update memory
|
||||
instance.short_term_memory = {"updated": "state", "new_key": "new_value"}
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentInstance).filter_by(id=instance.id).first()
|
||||
assert "initial" not in retrieved.short_term_memory
|
||||
assert retrieved.short_term_memory["updated"] == "state"
|
||||
assert retrieved.short_term_memory["new_key"] == "new_value"
|
||||
315
backend/tests/models/syndarix/test_agent_type.py
Normal file
315
backend/tests/models/syndarix/test_agent_type.py
Normal file
@@ -0,0 +1,315 @@
|
||||
# tests/models/syndarix/test_agent_type.py
|
||||
"""
|
||||
Unit tests for the AgentType model.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from app.models.syndarix import AgentType
|
||||
|
||||
|
||||
class TestAgentTypeModel:
|
||||
"""Tests for AgentType model creation and fields."""
|
||||
|
||||
def test_create_agent_type_with_required_fields(self, db_session):
|
||||
"""Test creating an agent type with only required fields."""
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Test Agent",
|
||||
slug="test-agent",
|
||||
personality_prompt="You are a helpful assistant.",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentType).filter_by(slug="test-agent").first()
|
||||
|
||||
assert retrieved is not None
|
||||
assert retrieved.name == "Test Agent"
|
||||
assert retrieved.slug == "test-agent"
|
||||
assert retrieved.personality_prompt == "You are a helpful assistant."
|
||||
assert retrieved.primary_model == "claude-opus-4-5-20251101"
|
||||
assert retrieved.is_active is True # Default
|
||||
assert retrieved.expertise == [] # Default empty list
|
||||
assert retrieved.fallback_models == [] # Default empty list
|
||||
assert retrieved.model_params == {} # Default empty dict
|
||||
assert retrieved.mcp_servers == [] # Default empty list
|
||||
assert retrieved.tool_permissions == {} # Default empty dict
|
||||
|
||||
def test_create_agent_type_with_all_fields(self, db_session):
|
||||
"""Test creating an agent type with all optional fields."""
|
||||
agent_type_id = uuid.uuid4()
|
||||
|
||||
agent_type = AgentType(
|
||||
id=agent_type_id,
|
||||
name="Full Agent Type",
|
||||
slug="full-agent-type",
|
||||
description="A fully configured agent type",
|
||||
expertise=["python", "fastapi", "testing"],
|
||||
personality_prompt="You are an expert Python developer...",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
fallback_models=["claude-sonnet-4-20250514", "gpt-4o"],
|
||||
model_params={"temperature": 0.7, "max_tokens": 4096},
|
||||
mcp_servers=["gitea", "file-system", "slack"],
|
||||
tool_permissions={"allowed": ["*"], "denied": ["dangerous_tool"]},
|
||||
is_active=True,
|
||||
)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentType).filter_by(id=agent_type_id).first()
|
||||
|
||||
assert retrieved.name == "Full Agent Type"
|
||||
assert retrieved.description == "A fully configured agent type"
|
||||
assert retrieved.expertise == ["python", "fastapi", "testing"]
|
||||
assert retrieved.fallback_models == ["claude-sonnet-4-20250514", "gpt-4o"]
|
||||
assert retrieved.model_params == {"temperature": 0.7, "max_tokens": 4096}
|
||||
assert retrieved.mcp_servers == ["gitea", "file-system", "slack"]
|
||||
assert retrieved.tool_permissions == {"allowed": ["*"], "denied": ["dangerous_tool"]}
|
||||
assert retrieved.is_active is True
|
||||
|
||||
def test_agent_type_unique_slug_constraint(self, db_session):
|
||||
"""Test that agent types cannot have duplicate slugs."""
|
||||
agent_type1 = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Agent One",
|
||||
slug="duplicate-agent-slug",
|
||||
personality_prompt="First agent",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(agent_type1)
|
||||
db_session.commit()
|
||||
|
||||
agent_type2 = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Agent Two",
|
||||
slug="duplicate-agent-slug", # Same slug
|
||||
personality_prompt="Second agent",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(agent_type2)
|
||||
|
||||
with pytest.raises(IntegrityError):
|
||||
db_session.commit()
|
||||
|
||||
db_session.rollback()
|
||||
|
||||
def test_agent_type_timestamps(self, db_session):
|
||||
"""Test that timestamps are automatically set."""
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Timestamp Agent",
|
||||
slug="timestamp-agent",
|
||||
personality_prompt="Test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentType).filter_by(slug="timestamp-agent").first()
|
||||
|
||||
assert isinstance(retrieved.created_at, datetime)
|
||||
assert isinstance(retrieved.updated_at, datetime)
|
||||
|
||||
def test_agent_type_update(self, db_session):
|
||||
"""Test updating agent type fields."""
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Original Agent",
|
||||
slug="original-agent",
|
||||
personality_prompt="Original prompt",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
is_active=True,
|
||||
)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
original_created_at = agent_type.created_at
|
||||
|
||||
# Update fields
|
||||
agent_type.name = "Updated Agent"
|
||||
agent_type.is_active = False
|
||||
agent_type.expertise = ["new", "skills"]
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentType).filter_by(slug="original-agent").first()
|
||||
|
||||
assert retrieved.name == "Updated Agent"
|
||||
assert retrieved.is_active is False
|
||||
assert retrieved.expertise == ["new", "skills"]
|
||||
assert retrieved.created_at == original_created_at
|
||||
assert retrieved.updated_at > original_created_at
|
||||
|
||||
def test_agent_type_delete(self, db_session):
|
||||
"""Test deleting an agent type."""
|
||||
agent_type_id = uuid.uuid4()
|
||||
agent_type = AgentType(
|
||||
id=agent_type_id,
|
||||
name="Delete Me",
|
||||
slug="delete-me-agent",
|
||||
personality_prompt="Delete test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
db_session.delete(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
deleted = db_session.query(AgentType).filter_by(id=agent_type_id).first()
|
||||
assert deleted is None
|
||||
|
||||
def test_agent_type_string_representation(self, db_session):
|
||||
"""Test the string representation of an agent type."""
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Repr Agent",
|
||||
slug="repr-agent",
|
||||
personality_prompt="Test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
is_active=True,
|
||||
)
|
||||
|
||||
assert str(agent_type) == "<AgentType Repr Agent (repr-agent) active=True>"
|
||||
assert repr(agent_type) == "<AgentType Repr Agent (repr-agent) active=True>"
|
||||
|
||||
|
||||
class TestAgentTypeJsonFields:
|
||||
"""Tests for AgentType JSON fields."""
|
||||
|
||||
def test_complex_expertise_list(self, db_session):
|
||||
"""Test storing a list of expertise areas."""
|
||||
expertise = ["python", "fastapi", "sqlalchemy", "postgresql", "redis", "docker"]
|
||||
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Expert Agent",
|
||||
slug="expert-agent",
|
||||
personality_prompt="Prompt",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
expertise=expertise,
|
||||
)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentType).filter_by(slug="expert-agent").first()
|
||||
assert retrieved.expertise == expertise
|
||||
assert "python" in retrieved.expertise
|
||||
assert len(retrieved.expertise) == 6
|
||||
|
||||
def test_complex_model_params(self, db_session):
|
||||
"""Test storing complex model parameters."""
|
||||
model_params = {
|
||||
"temperature": 0.7,
|
||||
"max_tokens": 4096,
|
||||
"top_p": 0.9,
|
||||
"frequency_penalty": 0.1,
|
||||
"presence_penalty": 0.1,
|
||||
"stop_sequences": ["###", "END"],
|
||||
}
|
||||
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Params Agent",
|
||||
slug="params-agent",
|
||||
personality_prompt="Prompt",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
model_params=model_params,
|
||||
)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentType).filter_by(slug="params-agent").first()
|
||||
assert retrieved.model_params == model_params
|
||||
assert retrieved.model_params["temperature"] == 0.7
|
||||
assert retrieved.model_params["stop_sequences"] == ["###", "END"]
|
||||
|
||||
def test_complex_tool_permissions(self, db_session):
|
||||
"""Test storing complex tool permissions."""
|
||||
tool_permissions = {
|
||||
"allowed": ["file:read", "file:write", "git:commit"],
|
||||
"denied": ["file:delete", "system:exec"],
|
||||
"require_approval": ["git:push", "gitea:create_pr"],
|
||||
"limits": {
|
||||
"file:write": {"max_size_mb": 10},
|
||||
"git:commit": {"require_message": True},
|
||||
},
|
||||
}
|
||||
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Permissions Agent",
|
||||
slug="permissions-agent",
|
||||
personality_prompt="Prompt",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
tool_permissions=tool_permissions,
|
||||
)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentType).filter_by(slug="permissions-agent").first()
|
||||
assert retrieved.tool_permissions == tool_permissions
|
||||
assert "file:read" in retrieved.tool_permissions["allowed"]
|
||||
assert retrieved.tool_permissions["limits"]["file:write"]["max_size_mb"] == 10
|
||||
|
||||
def test_empty_json_fields_default(self, db_session):
|
||||
"""Test that JSON fields default to empty structures."""
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Empty JSON Agent",
|
||||
slug="empty-json-agent",
|
||||
personality_prompt="Prompt",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentType).filter_by(slug="empty-json-agent").first()
|
||||
assert retrieved.expertise == []
|
||||
assert retrieved.fallback_models == []
|
||||
assert retrieved.model_params == {}
|
||||
assert retrieved.mcp_servers == []
|
||||
assert retrieved.tool_permissions == {}
|
||||
|
||||
|
||||
class TestAgentTypeIsActive:
|
||||
"""Tests for AgentType is_active field."""
|
||||
|
||||
def test_default_is_active(self, db_session):
|
||||
"""Test that is_active defaults to True."""
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Default Active",
|
||||
slug="default-active",
|
||||
personality_prompt="Prompt",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentType).filter_by(slug="default-active").first()
|
||||
assert retrieved.is_active is True
|
||||
|
||||
def test_deactivate_agent_type(self, db_session):
|
||||
"""Test deactivating an agent type."""
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Deactivate Me",
|
||||
slug="deactivate-me",
|
||||
personality_prompt="Prompt",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
is_active=True,
|
||||
)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
agent_type.is_active = False
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(AgentType).filter_by(slug="deactivate-me").first()
|
||||
assert retrieved.is_active is False
|
||||
463
backend/tests/models/syndarix/test_issue.py
Normal file
463
backend/tests/models/syndarix/test_issue.py
Normal file
@@ -0,0 +1,463 @@
|
||||
# tests/models/syndarix/test_issue.py
|
||||
"""
|
||||
Unit tests for the Issue model.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
import pytest
|
||||
|
||||
from app.models.syndarix import (
|
||||
AgentInstance,
|
||||
AgentType,
|
||||
Issue,
|
||||
IssuePriority,
|
||||
IssueStatus,
|
||||
Project,
|
||||
Sprint,
|
||||
SprintStatus,
|
||||
SyncStatus,
|
||||
)
|
||||
|
||||
|
||||
class TestIssueModel:
|
||||
"""Tests for Issue model creation and fields."""
|
||||
|
||||
def test_create_issue_with_required_fields(self, db_session):
|
||||
"""Test creating an issue with only required fields."""
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Issue Project",
|
||||
slug="issue-project",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title="Test Issue",
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Issue).filter_by(title="Test Issue").first()
|
||||
|
||||
assert retrieved is not None
|
||||
assert retrieved.title == "Test Issue"
|
||||
assert retrieved.body == "" # Default empty string
|
||||
assert retrieved.status == IssueStatus.OPEN # Default
|
||||
assert retrieved.priority == IssuePriority.MEDIUM # Default
|
||||
assert retrieved.labels == [] # Default empty list
|
||||
assert retrieved.story_points is None
|
||||
assert retrieved.assigned_agent_id is None
|
||||
assert retrieved.human_assignee is None
|
||||
assert retrieved.sprint_id is None
|
||||
assert retrieved.sync_status == SyncStatus.SYNCED # Default
|
||||
|
||||
def test_create_issue_with_all_fields(self, db_session):
|
||||
"""Test creating an issue with all optional fields."""
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Full Issue Project",
|
||||
slug="full-issue-project",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
issue_id = uuid.uuid4()
|
||||
now = datetime.now(UTC)
|
||||
|
||||
issue = Issue(
|
||||
id=issue_id,
|
||||
project_id=project.id,
|
||||
title="Full Issue",
|
||||
body="A complete issue with all fields set",
|
||||
status=IssueStatus.IN_PROGRESS,
|
||||
priority=IssuePriority.CRITICAL,
|
||||
labels=["bug", "security", "urgent"],
|
||||
story_points=8,
|
||||
human_assignee="john.doe@example.com",
|
||||
external_tracker="gitea",
|
||||
external_id="gitea-123",
|
||||
external_url="https://gitea.example.com/issues/123",
|
||||
external_number=123,
|
||||
sync_status=SyncStatus.SYNCED,
|
||||
last_synced_at=now,
|
||||
external_updated_at=now,
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Issue).filter_by(id=issue_id).first()
|
||||
|
||||
assert retrieved.title == "Full Issue"
|
||||
assert retrieved.body == "A complete issue with all fields set"
|
||||
assert retrieved.status == IssueStatus.IN_PROGRESS
|
||||
assert retrieved.priority == IssuePriority.CRITICAL
|
||||
assert retrieved.labels == ["bug", "security", "urgent"]
|
||||
assert retrieved.story_points == 8
|
||||
assert retrieved.human_assignee == "john.doe@example.com"
|
||||
assert retrieved.external_tracker == "gitea"
|
||||
assert retrieved.external_id == "gitea-123"
|
||||
assert retrieved.external_number == 123
|
||||
assert retrieved.sync_status == SyncStatus.SYNCED
|
||||
|
||||
def test_issue_timestamps(self, db_session):
|
||||
"""Test that timestamps are automatically set."""
|
||||
project = Project(id=uuid.uuid4(), name="Timestamp Issue Project", slug="timestamp-issue-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title="Timestamp Issue",
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
assert isinstance(issue.created_at, datetime)
|
||||
assert isinstance(issue.updated_at, datetime)
|
||||
|
||||
def test_issue_string_representation(self, db_session):
|
||||
"""Test the string representation of an issue."""
|
||||
project = Project(id=uuid.uuid4(), name="Repr Issue Project", slug="repr-issue-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title="This is a very long issue title that should be truncated in repr",
|
||||
status=IssueStatus.OPEN,
|
||||
priority=IssuePriority.HIGH,
|
||||
)
|
||||
|
||||
repr_str = repr(issue)
|
||||
assert "This is a very long issue tit" in repr_str # First 30 chars
|
||||
assert "open" in repr_str
|
||||
assert "high" in repr_str
|
||||
|
||||
|
||||
class TestIssueStatus:
|
||||
"""Tests for Issue status field."""
|
||||
|
||||
def test_all_issue_statuses(self, db_session):
|
||||
"""Test that all issue statuses can be stored."""
|
||||
project = Project(id=uuid.uuid4(), name="Status Issue Project", slug="status-issue-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
for status in IssueStatus:
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title=f"Issue {status.value}",
|
||||
status=status,
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Issue).filter_by(id=issue.id).first()
|
||||
assert retrieved.status == status
|
||||
|
||||
|
||||
class TestIssuePriority:
|
||||
"""Tests for Issue priority field."""
|
||||
|
||||
def test_all_issue_priorities(self, db_session):
|
||||
"""Test that all issue priorities can be stored."""
|
||||
project = Project(id=uuid.uuid4(), name="Priority Issue Project", slug="priority-issue-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
for priority in IssuePriority:
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title=f"Issue {priority.value}",
|
||||
priority=priority,
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Issue).filter_by(id=issue.id).first()
|
||||
assert retrieved.priority == priority
|
||||
|
||||
|
||||
class TestIssueSyncStatus:
|
||||
"""Tests for Issue sync status field."""
|
||||
|
||||
def test_all_sync_statuses(self, db_session):
|
||||
"""Test that all sync statuses can be stored."""
|
||||
project = Project(id=uuid.uuid4(), name="Sync Issue Project", slug="sync-issue-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
for sync_status in SyncStatus:
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title=f"Issue {sync_status.value}",
|
||||
external_tracker="gitea",
|
||||
external_id=f"ext-{sync_status.value}",
|
||||
sync_status=sync_status,
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Issue).filter_by(id=issue.id).first()
|
||||
assert retrieved.sync_status == sync_status
|
||||
|
||||
|
||||
class TestIssueLabels:
|
||||
"""Tests for Issue labels JSON field."""
|
||||
|
||||
def test_store_labels(self, db_session):
|
||||
"""Test storing labels list."""
|
||||
project = Project(id=uuid.uuid4(), name="Labels Issue Project", slug="labels-issue-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
labels = ["bug", "security", "high-priority", "needs-review"]
|
||||
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title="Issue with Labels",
|
||||
labels=labels,
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Issue).filter_by(title="Issue with Labels").first()
|
||||
assert retrieved.labels == labels
|
||||
assert "security" in retrieved.labels
|
||||
|
||||
def test_update_labels(self, db_session):
|
||||
"""Test updating labels."""
|
||||
project = Project(id=uuid.uuid4(), name="Update Labels Project", slug="update-labels-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title="Update Labels Issue",
|
||||
labels=["initial"],
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
issue.labels = ["updated", "new-label"]
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Issue).filter_by(title="Update Labels Issue").first()
|
||||
assert "initial" not in retrieved.labels
|
||||
assert "updated" in retrieved.labels
|
||||
|
||||
|
||||
class TestIssueAssignment:
|
||||
"""Tests for Issue assignment fields."""
|
||||
|
||||
def test_assign_to_agent(self, db_session):
|
||||
"""Test assigning an issue to an agent."""
|
||||
project = Project(id=uuid.uuid4(), name="Agent Assign Project", slug="agent-assign-project")
|
||||
agent_type = AgentType(
|
||||
id=uuid.uuid4(),
|
||||
name="Test Agent Type",
|
||||
slug="test-agent-type-assign",
|
||||
personality_prompt="Test",
|
||||
primary_model="claude-opus-4-5-20251101",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.add(agent_type)
|
||||
db_session.commit()
|
||||
|
||||
agent_instance = AgentInstance(
|
||||
id=uuid.uuid4(),
|
||||
agent_type_id=agent_type.id,
|
||||
project_id=project.id,
|
||||
)
|
||||
db_session.add(agent_instance)
|
||||
db_session.commit()
|
||||
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title="Agent Assignment Issue",
|
||||
assigned_agent_id=agent_instance.id,
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Issue).filter_by(title="Agent Assignment Issue").first()
|
||||
assert retrieved.assigned_agent_id == agent_instance.id
|
||||
assert retrieved.human_assignee is None
|
||||
|
||||
def test_assign_to_human(self, db_session):
|
||||
"""Test assigning an issue to a human."""
|
||||
project = Project(id=uuid.uuid4(), name="Human Assign Project", slug="human-assign-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title="Human Assignment Issue",
|
||||
human_assignee="developer@example.com",
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Issue).filter_by(title="Human Assignment Issue").first()
|
||||
assert retrieved.human_assignee == "developer@example.com"
|
||||
assert retrieved.assigned_agent_id is None
|
||||
|
||||
|
||||
class TestIssueSprintAssociation:
|
||||
"""Tests for Issue sprint association."""
|
||||
|
||||
def test_assign_issue_to_sprint(self, db_session):
|
||||
"""Test assigning an issue to a sprint."""
|
||||
project = Project(id=uuid.uuid4(), name="Sprint Assign Project", slug="sprint-assign-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
from datetime import date
|
||||
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Sprint 1",
|
||||
number=1,
|
||||
start_date=date.today(),
|
||||
end_date=date.today() + timedelta(days=14),
|
||||
status=SprintStatus.ACTIVE,
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title="Sprint Issue",
|
||||
sprint_id=sprint.id,
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Issue).filter_by(title="Sprint Issue").first()
|
||||
assert retrieved.sprint_id == sprint.id
|
||||
|
||||
|
||||
class TestIssueExternalTracker:
|
||||
"""Tests for Issue external tracker integration."""
|
||||
|
||||
def test_gitea_integration(self, db_session):
|
||||
"""Test Gitea external tracker fields."""
|
||||
project = Project(id=uuid.uuid4(), name="Gitea Project", slug="gitea-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
now = datetime.now(UTC)
|
||||
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title="Gitea Synced Issue",
|
||||
external_tracker="gitea",
|
||||
external_id="abc123xyz",
|
||||
external_url="https://gitea.example.com/org/repo/issues/42",
|
||||
external_number=42,
|
||||
sync_status=SyncStatus.SYNCED,
|
||||
last_synced_at=now,
|
||||
external_updated_at=now,
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Issue).filter_by(title="Gitea Synced Issue").first()
|
||||
assert retrieved.external_tracker == "gitea"
|
||||
assert retrieved.external_id == "abc123xyz"
|
||||
assert retrieved.external_number == 42
|
||||
assert "/issues/42" in retrieved.external_url
|
||||
|
||||
def test_github_integration(self, db_session):
|
||||
"""Test GitHub external tracker fields."""
|
||||
project = Project(id=uuid.uuid4(), name="GitHub Project", slug="github-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title="GitHub Synced Issue",
|
||||
external_tracker="github",
|
||||
external_id="gh-12345",
|
||||
external_url="https://github.com/org/repo/issues/100",
|
||||
external_number=100,
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Issue).filter_by(title="GitHub Synced Issue").first()
|
||||
assert retrieved.external_tracker == "github"
|
||||
assert retrieved.external_number == 100
|
||||
|
||||
|
||||
class TestIssueLifecycle:
|
||||
"""Tests for Issue lifecycle operations."""
|
||||
|
||||
def test_close_issue(self, db_session):
|
||||
"""Test closing an issue."""
|
||||
project = Project(id=uuid.uuid4(), name="Close Issue Project", slug="close-issue-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title="Issue to Close",
|
||||
status=IssueStatus.OPEN,
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
# Close the issue
|
||||
now = datetime.now(UTC)
|
||||
issue.status = IssueStatus.CLOSED
|
||||
issue.closed_at = now
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Issue).filter_by(title="Issue to Close").first()
|
||||
assert retrieved.status == IssueStatus.CLOSED
|
||||
assert retrieved.closed_at is not None
|
||||
|
||||
def test_reopen_issue(self, db_session):
|
||||
"""Test reopening a closed issue."""
|
||||
project = Project(id=uuid.uuid4(), name="Reopen Issue Project", slug="reopen-issue-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
now = datetime.now(UTC)
|
||||
issue = Issue(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
title="Issue to Reopen",
|
||||
status=IssueStatus.CLOSED,
|
||||
closed_at=now,
|
||||
)
|
||||
db_session.add(issue)
|
||||
db_session.commit()
|
||||
|
||||
# Reopen the issue
|
||||
issue.status = IssueStatus.OPEN
|
||||
issue.closed_at = None
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Issue).filter_by(title="Issue to Reopen").first()
|
||||
assert retrieved.status == IssueStatus.OPEN
|
||||
assert retrieved.closed_at is None
|
||||
262
backend/tests/models/syndarix/test_project.py
Normal file
262
backend/tests/models/syndarix/test_project.py
Normal file
@@ -0,0 +1,262 @@
|
||||
# tests/models/syndarix/test_project.py
|
||||
"""
|
||||
Unit tests for the Project model.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from app.models.syndarix import (
|
||||
AutonomyLevel,
|
||||
Project,
|
||||
ProjectStatus,
|
||||
)
|
||||
|
||||
|
||||
class TestProjectModel:
|
||||
"""Tests for Project model creation and fields."""
|
||||
|
||||
def test_create_project_with_required_fields(self, db_session):
|
||||
"""Test creating a project with only required fields."""
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Test Project",
|
||||
slug="test-project",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Project).filter_by(slug="test-project").first()
|
||||
|
||||
assert retrieved is not None
|
||||
assert retrieved.name == "Test Project"
|
||||
assert retrieved.slug == "test-project"
|
||||
assert retrieved.autonomy_level == AutonomyLevel.MILESTONE # Default
|
||||
assert retrieved.status == ProjectStatus.ACTIVE # Default
|
||||
assert retrieved.settings == {} # Default empty dict
|
||||
assert retrieved.description is None
|
||||
assert retrieved.owner_id is None
|
||||
|
||||
def test_create_project_with_all_fields(self, db_session):
|
||||
"""Test creating a project with all optional fields."""
|
||||
project_id = uuid.uuid4()
|
||||
owner_id = uuid.uuid4()
|
||||
|
||||
project = Project(
|
||||
id=project_id,
|
||||
name="Full Project",
|
||||
slug="full-project",
|
||||
description="A complete project with all fields",
|
||||
autonomy_level=AutonomyLevel.AUTONOMOUS,
|
||||
status=ProjectStatus.PAUSED,
|
||||
settings={"webhook_url": "https://example.com/webhook"},
|
||||
owner_id=owner_id,
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Project).filter_by(id=project_id).first()
|
||||
|
||||
assert retrieved.name == "Full Project"
|
||||
assert retrieved.slug == "full-project"
|
||||
assert retrieved.description == "A complete project with all fields"
|
||||
assert retrieved.autonomy_level == AutonomyLevel.AUTONOMOUS
|
||||
assert retrieved.status == ProjectStatus.PAUSED
|
||||
assert retrieved.settings == {"webhook_url": "https://example.com/webhook"}
|
||||
assert retrieved.owner_id == owner_id
|
||||
|
||||
def test_project_unique_slug_constraint(self, db_session):
|
||||
"""Test that projects cannot have duplicate slugs."""
|
||||
project1 = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Project One",
|
||||
slug="duplicate-slug",
|
||||
)
|
||||
db_session.add(project1)
|
||||
db_session.commit()
|
||||
|
||||
project2 = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Project Two",
|
||||
slug="duplicate-slug", # Same slug
|
||||
)
|
||||
db_session.add(project2)
|
||||
|
||||
with pytest.raises(IntegrityError):
|
||||
db_session.commit()
|
||||
|
||||
db_session.rollback()
|
||||
|
||||
def test_project_timestamps(self, db_session):
|
||||
"""Test that timestamps are automatically set."""
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Timestamp Project",
|
||||
slug="timestamp-project",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Project).filter_by(slug="timestamp-project").first()
|
||||
|
||||
assert isinstance(retrieved.created_at, datetime)
|
||||
assert isinstance(retrieved.updated_at, datetime)
|
||||
|
||||
def test_project_update(self, db_session):
|
||||
"""Test updating project fields."""
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Original Name",
|
||||
slug="original-slug",
|
||||
status=ProjectStatus.ACTIVE,
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
original_created_at = project.created_at
|
||||
|
||||
# Update fields
|
||||
project.name = "Updated Name"
|
||||
project.status = ProjectStatus.COMPLETED
|
||||
project.settings = {"new_setting": "value"}
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Project).filter_by(slug="original-slug").first()
|
||||
|
||||
assert retrieved.name == "Updated Name"
|
||||
assert retrieved.status == ProjectStatus.COMPLETED
|
||||
assert retrieved.settings == {"new_setting": "value"}
|
||||
assert retrieved.created_at == original_created_at
|
||||
assert retrieved.updated_at > original_created_at
|
||||
|
||||
def test_project_delete(self, db_session):
|
||||
"""Test deleting a project."""
|
||||
project_id = uuid.uuid4()
|
||||
project = Project(
|
||||
id=project_id,
|
||||
name="Delete Me",
|
||||
slug="delete-me",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
db_session.delete(project)
|
||||
db_session.commit()
|
||||
|
||||
deleted = db_session.query(Project).filter_by(id=project_id).first()
|
||||
assert deleted is None
|
||||
|
||||
def test_project_string_representation(self, db_session):
|
||||
"""Test the string representation of a project."""
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Repr Project",
|
||||
slug="repr-project",
|
||||
status=ProjectStatus.ACTIVE,
|
||||
)
|
||||
|
||||
assert str(project) == "<Project Repr Project (repr-project) status=active>"
|
||||
assert repr(project) == "<Project Repr Project (repr-project) status=active>"
|
||||
|
||||
|
||||
class TestProjectEnums:
|
||||
"""Tests for Project enum fields."""
|
||||
|
||||
def test_all_autonomy_levels(self, db_session):
|
||||
"""Test that all autonomy levels can be stored."""
|
||||
for level in AutonomyLevel:
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name=f"Project {level.value}",
|
||||
slug=f"project-{level.value}",
|
||||
autonomy_level=level,
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Project).filter_by(slug=f"project-{level.value}").first()
|
||||
assert retrieved.autonomy_level == level
|
||||
|
||||
def test_all_project_statuses(self, db_session):
|
||||
"""Test that all project statuses can be stored."""
|
||||
for status in ProjectStatus:
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name=f"Project {status.value}",
|
||||
slug=f"project-status-{status.value}",
|
||||
status=status,
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Project).filter_by(slug=f"project-status-{status.value}").first()
|
||||
assert retrieved.status == status
|
||||
|
||||
|
||||
class TestProjectSettings:
|
||||
"""Tests for Project JSON settings field."""
|
||||
|
||||
def test_complex_json_settings(self, db_session):
|
||||
"""Test storing complex JSON in settings."""
|
||||
complex_settings = {
|
||||
"mcp_servers": ["gitea", "slack", "file-system"],
|
||||
"webhook_urls": {
|
||||
"on_issue_created": "https://example.com/issue",
|
||||
"on_sprint_completed": "https://example.com/sprint",
|
||||
},
|
||||
"notification_settings": {
|
||||
"email": True,
|
||||
"slack_channel": "#syndarix-updates",
|
||||
},
|
||||
"tags": ["important", "client-a"],
|
||||
}
|
||||
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Complex Settings Project",
|
||||
slug="complex-settings",
|
||||
settings=complex_settings,
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Project).filter_by(slug="complex-settings").first()
|
||||
|
||||
assert retrieved.settings == complex_settings
|
||||
assert retrieved.settings["mcp_servers"] == ["gitea", "slack", "file-system"]
|
||||
assert retrieved.settings["webhook_urls"]["on_issue_created"] == "https://example.com/issue"
|
||||
assert "important" in retrieved.settings["tags"]
|
||||
|
||||
def test_empty_settings(self, db_session):
|
||||
"""Test that empty settings defaults correctly."""
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Empty Settings",
|
||||
slug="empty-settings",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Project).filter_by(slug="empty-settings").first()
|
||||
assert retrieved.settings == {}
|
||||
|
||||
def test_update_settings(self, db_session):
|
||||
"""Test updating settings field."""
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Update Settings",
|
||||
slug="update-settings",
|
||||
settings={"initial": "value"},
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
# Update settings
|
||||
project.settings = {"updated": "new_value", "additional": "data"}
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Project).filter_by(slug="update-settings").first()
|
||||
assert retrieved.settings == {"updated": "new_value", "additional": "data"}
|
||||
507
backend/tests/models/syndarix/test_sprint.py
Normal file
507
backend/tests/models/syndarix/test_sprint.py
Normal file
@@ -0,0 +1,507 @@
|
||||
# tests/models/syndarix/test_sprint.py
|
||||
"""
|
||||
Unit tests for the Sprint model.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from datetime import date, datetime, timedelta
|
||||
|
||||
import pytest
|
||||
|
||||
from app.models.syndarix import (
|
||||
Project,
|
||||
Sprint,
|
||||
SprintStatus,
|
||||
)
|
||||
|
||||
|
||||
class TestSprintModel:
|
||||
"""Tests for Sprint model creation and fields."""
|
||||
|
||||
def test_create_sprint_with_required_fields(self, db_session):
|
||||
"""Test creating a sprint with only required fields."""
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Sprint Project",
|
||||
slug="sprint-project",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Sprint 1",
|
||||
number=1,
|
||||
start_date=today,
|
||||
end_date=today + timedelta(days=14),
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(name="Sprint 1").first()
|
||||
|
||||
assert retrieved is not None
|
||||
assert retrieved.name == "Sprint 1"
|
||||
assert retrieved.number == 1
|
||||
assert retrieved.start_date == today
|
||||
assert retrieved.end_date == today + timedelta(days=14)
|
||||
assert retrieved.status == SprintStatus.PLANNED # Default
|
||||
assert retrieved.goal is None
|
||||
assert retrieved.planned_points is None
|
||||
assert retrieved.completed_points is None
|
||||
|
||||
def test_create_sprint_with_all_fields(self, db_session):
|
||||
"""Test creating a sprint with all optional fields."""
|
||||
project = Project(
|
||||
id=uuid.uuid4(),
|
||||
name="Full Sprint Project",
|
||||
slug="full-sprint-project",
|
||||
)
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint_id = uuid.uuid4()
|
||||
|
||||
sprint = Sprint(
|
||||
id=sprint_id,
|
||||
project_id=project.id,
|
||||
name="Full Sprint",
|
||||
number=5,
|
||||
goal="Complete all authentication features",
|
||||
start_date=today,
|
||||
end_date=today + timedelta(days=14),
|
||||
status=SprintStatus.ACTIVE,
|
||||
planned_points=34,
|
||||
completed_points=21,
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(id=sprint_id).first()
|
||||
|
||||
assert retrieved.name == "Full Sprint"
|
||||
assert retrieved.number == 5
|
||||
assert retrieved.goal == "Complete all authentication features"
|
||||
assert retrieved.status == SprintStatus.ACTIVE
|
||||
assert retrieved.planned_points == 34
|
||||
assert retrieved.completed_points == 21
|
||||
|
||||
def test_sprint_timestamps(self, db_session):
|
||||
"""Test that timestamps are automatically set."""
|
||||
project = Project(id=uuid.uuid4(), name="Timestamp Sprint Project", slug="timestamp-sprint-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Timestamp Sprint",
|
||||
number=1,
|
||||
start_date=today,
|
||||
end_date=today + timedelta(days=14),
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
assert isinstance(sprint.created_at, datetime)
|
||||
assert isinstance(sprint.updated_at, datetime)
|
||||
|
||||
def test_sprint_string_representation(self, db_session):
|
||||
"""Test the string representation of a sprint."""
|
||||
project = Project(id=uuid.uuid4(), name="Repr Sprint Project", slug="repr-sprint-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Sprint Alpha",
|
||||
number=3,
|
||||
start_date=today,
|
||||
end_date=today + timedelta(days=14),
|
||||
status=SprintStatus.ACTIVE,
|
||||
)
|
||||
|
||||
repr_str = repr(sprint)
|
||||
assert "Sprint Alpha" in repr_str
|
||||
assert "#3" in repr_str
|
||||
assert str(project.id) in repr_str
|
||||
assert "active" in repr_str
|
||||
|
||||
|
||||
class TestSprintStatus:
|
||||
"""Tests for Sprint status field."""
|
||||
|
||||
def test_all_sprint_statuses(self, db_session):
|
||||
"""Test that all sprint statuses can be stored."""
|
||||
project = Project(id=uuid.uuid4(), name="Status Sprint Project", slug="status-sprint-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
for idx, status in enumerate(SprintStatus):
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name=f"Sprint {status.value}",
|
||||
number=idx + 1,
|
||||
start_date=today,
|
||||
end_date=today + timedelta(days=14),
|
||||
status=status,
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(id=sprint.id).first()
|
||||
assert retrieved.status == status
|
||||
|
||||
|
||||
class TestSprintLifecycle:
|
||||
"""Tests for Sprint lifecycle operations."""
|
||||
|
||||
def test_start_sprint(self, db_session):
|
||||
"""Test starting a planned sprint."""
|
||||
project = Project(id=uuid.uuid4(), name="Start Sprint Project", slug="start-sprint-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Sprint to Start",
|
||||
number=1,
|
||||
start_date=today,
|
||||
end_date=today + timedelta(days=14),
|
||||
status=SprintStatus.PLANNED,
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
# Start the sprint
|
||||
sprint.status = SprintStatus.ACTIVE
|
||||
sprint.planned_points = 21
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(name="Sprint to Start").first()
|
||||
assert retrieved.status == SprintStatus.ACTIVE
|
||||
assert retrieved.planned_points == 21
|
||||
|
||||
def test_complete_sprint(self, db_session):
|
||||
"""Test completing an active sprint."""
|
||||
project = Project(id=uuid.uuid4(), name="Complete Sprint Project", slug="complete-sprint-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Sprint to Complete",
|
||||
number=1,
|
||||
start_date=today - timedelta(days=14),
|
||||
end_date=today,
|
||||
status=SprintStatus.ACTIVE,
|
||||
planned_points=21,
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
# Complete the sprint
|
||||
sprint.status = SprintStatus.COMPLETED
|
||||
sprint.completed_points = 18
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(name="Sprint to Complete").first()
|
||||
assert retrieved.status == SprintStatus.COMPLETED
|
||||
assert retrieved.completed_points == 18
|
||||
|
||||
def test_cancel_sprint(self, db_session):
|
||||
"""Test cancelling a sprint."""
|
||||
project = Project(id=uuid.uuid4(), name="Cancel Sprint Project", slug="cancel-sprint-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Sprint to Cancel",
|
||||
number=1,
|
||||
start_date=today,
|
||||
end_date=today + timedelta(days=14),
|
||||
status=SprintStatus.ACTIVE,
|
||||
planned_points=21,
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
# Cancel the sprint
|
||||
sprint.status = SprintStatus.CANCELLED
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(name="Sprint to Cancel").first()
|
||||
assert retrieved.status == SprintStatus.CANCELLED
|
||||
|
||||
|
||||
class TestSprintDates:
|
||||
"""Tests for Sprint date fields."""
|
||||
|
||||
def test_sprint_date_range(self, db_session):
|
||||
"""Test storing sprint date range."""
|
||||
project = Project(id=uuid.uuid4(), name="Date Range Project", slug="date-range-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
start = date(2024, 1, 1)
|
||||
end = date(2024, 1, 14)
|
||||
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Date Range Sprint",
|
||||
number=1,
|
||||
start_date=start,
|
||||
end_date=end,
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(name="Date Range Sprint").first()
|
||||
assert retrieved.start_date == start
|
||||
assert retrieved.end_date == end
|
||||
|
||||
def test_one_day_sprint(self, db_session):
|
||||
"""Test creating a one-day sprint."""
|
||||
project = Project(id=uuid.uuid4(), name="One Day Project", slug="one-day-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="One Day Sprint",
|
||||
number=1,
|
||||
start_date=today,
|
||||
end_date=today, # Same day
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(name="One Day Sprint").first()
|
||||
assert retrieved.start_date == retrieved.end_date
|
||||
|
||||
def test_long_sprint(self, db_session):
|
||||
"""Test creating a long sprint (e.g., 4 weeks)."""
|
||||
project = Project(id=uuid.uuid4(), name="Long Sprint Project", slug="long-sprint-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Long Sprint",
|
||||
number=1,
|
||||
start_date=today,
|
||||
end_date=today + timedelta(days=28), # 4 weeks
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(name="Long Sprint").first()
|
||||
delta = retrieved.end_date - retrieved.start_date
|
||||
assert delta.days == 28
|
||||
|
||||
|
||||
class TestSprintPoints:
|
||||
"""Tests for Sprint story points fields."""
|
||||
|
||||
def test_sprint_with_zero_points(self, db_session):
|
||||
"""Test sprint with zero planned points."""
|
||||
project = Project(id=uuid.uuid4(), name="Zero Points Project", slug="zero-points-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Zero Points Sprint",
|
||||
number=1,
|
||||
start_date=today,
|
||||
end_date=today + timedelta(days=14),
|
||||
planned_points=0,
|
||||
completed_points=0,
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(name="Zero Points Sprint").first()
|
||||
assert retrieved.planned_points == 0
|
||||
assert retrieved.completed_points == 0
|
||||
|
||||
def test_sprint_velocity_calculation(self, db_session):
|
||||
"""Test that we can calculate velocity from points."""
|
||||
project = Project(id=uuid.uuid4(), name="Velocity Project", slug="velocity-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Velocity Sprint",
|
||||
number=1,
|
||||
start_date=today,
|
||||
end_date=today + timedelta(days=14),
|
||||
status=SprintStatus.COMPLETED,
|
||||
planned_points=21,
|
||||
completed_points=18,
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(name="Velocity Sprint").first()
|
||||
|
||||
# Calculate velocity
|
||||
velocity = retrieved.completed_points / retrieved.planned_points
|
||||
assert velocity == pytest.approx(18 / 21, rel=0.01)
|
||||
|
||||
def test_sprint_overdelivery(self, db_session):
|
||||
"""Test sprint where completed > planned (stretch goals)."""
|
||||
project = Project(id=uuid.uuid4(), name="Overdelivery Project", slug="overdelivery-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Overdelivery Sprint",
|
||||
number=1,
|
||||
start_date=today,
|
||||
end_date=today + timedelta(days=14),
|
||||
status=SprintStatus.COMPLETED,
|
||||
planned_points=20,
|
||||
completed_points=25, # Completed more than planned
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(name="Overdelivery Sprint").first()
|
||||
assert retrieved.completed_points > retrieved.planned_points
|
||||
|
||||
|
||||
class TestSprintNumber:
|
||||
"""Tests for Sprint number field."""
|
||||
|
||||
def test_sequential_sprint_numbers(self, db_session):
|
||||
"""Test creating sprints with sequential numbers."""
|
||||
project = Project(id=uuid.uuid4(), name="Sequential Project", slug="sequential-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
for i in range(1, 6):
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name=f"Sprint {i}",
|
||||
number=i,
|
||||
start_date=today + timedelta(days=(i - 1) * 14),
|
||||
end_date=today + timedelta(days=i * 14 - 1),
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
sprints = db_session.query(Sprint).filter_by(project_id=project.id).order_by(Sprint.number).all()
|
||||
assert len(sprints) == 5
|
||||
for i, sprint in enumerate(sprints, 1):
|
||||
assert sprint.number == i
|
||||
|
||||
def test_large_sprint_number(self, db_session):
|
||||
"""Test sprint with large number (e.g., long-running project)."""
|
||||
project = Project(id=uuid.uuid4(), name="Large Number Project", slug="large-number-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Sprint 100",
|
||||
number=100,
|
||||
start_date=today,
|
||||
end_date=today + timedelta(days=14),
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(name="Sprint 100").first()
|
||||
assert retrieved.number == 100
|
||||
|
||||
|
||||
class TestSprintUpdate:
|
||||
"""Tests for Sprint update operations."""
|
||||
|
||||
def test_update_sprint_goal(self, db_session):
|
||||
"""Test updating sprint goal."""
|
||||
project = Project(id=uuid.uuid4(), name="Update Goal Project", slug="update-goal-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Update Goal Sprint",
|
||||
number=1,
|
||||
start_date=today,
|
||||
end_date=today + timedelta(days=14),
|
||||
goal="Original goal",
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
original_created_at = sprint.created_at
|
||||
|
||||
sprint.goal = "Updated goal with more detail"
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(name="Update Goal Sprint").first()
|
||||
assert retrieved.goal == "Updated goal with more detail"
|
||||
assert retrieved.created_at == original_created_at
|
||||
assert retrieved.updated_at > original_created_at
|
||||
|
||||
def test_update_sprint_dates(self, db_session):
|
||||
"""Test updating sprint dates."""
|
||||
project = Project(id=uuid.uuid4(), name="Update Dates Project", slug="update-dates-project")
|
||||
db_session.add(project)
|
||||
db_session.commit()
|
||||
|
||||
today = date.today()
|
||||
sprint = Sprint(
|
||||
id=uuid.uuid4(),
|
||||
project_id=project.id,
|
||||
name="Update Dates Sprint",
|
||||
number=1,
|
||||
start_date=today,
|
||||
end_date=today + timedelta(days=14),
|
||||
)
|
||||
db_session.add(sprint)
|
||||
db_session.commit()
|
||||
|
||||
# Extend sprint by a week
|
||||
sprint.end_date = today + timedelta(days=21)
|
||||
db_session.commit()
|
||||
|
||||
retrieved = db_session.query(Sprint).filter_by(name="Update Dates Sprint").first()
|
||||
delta = retrieved.end_date - retrieved.start_date
|
||||
assert delta.days == 21
|
||||
Reference in New Issue
Block a user