# tests/tasks/test_agent_tasks.py
"""
Tests for agent execution tasks.

These tests verify:
- Task signatures are correctly defined
- Tasks are bound (have access to self)
- Tasks return expected structure
- Tasks handle various input scenarios

Note: These tests mock actual execution since they would require
LLM calls and database access in production.
"""

import uuid
from unittest.mock import patch


class TestRunAgentStepTask:
    """Tests for the run_agent_step task."""

    def test_run_agent_step_task_exists(self):
        """Test that run_agent_step task is registered."""
        # Imported only for its side effect of registering the tasks.
        import app.tasks.agent  # noqa: F401
        from app.celery_app import celery_app

        assert "app.tasks.agent.run_agent_step" in celery_app.tasks

    def test_run_agent_step_is_bound_task(self):
        """Test that run_agent_step is a bound task (has access to self)."""
        from app.tasks.agent import run_agent_step

        # Bound tasks have __bound__=True, which means they receive 'self' as first arg
        assert run_agent_step.__bound__ is True

    def test_run_agent_step_has_correct_name(self):
        """Test that run_agent_step has the correct task name."""
        from app.tasks.agent import run_agent_step

        assert run_agent_step.name == "app.tasks.agent.run_agent_step"

    def test_run_agent_step_returns_expected_structure(self):
        """Test that run_agent_step returns the expected result structure."""
        from app.tasks.agent import run_agent_step

        agent_instance_id = str(uuid.uuid4())
        context = {"messages": [], "tools": []}

        # Call the task directly (synchronously for testing)
        result = run_agent_step(agent_instance_id, context)

        assert isinstance(result, dict)
        assert "status" in result
        assert "agent_instance_id" in result
        assert result["agent_instance_id"] == agent_instance_id

    def test_run_agent_step_with_empty_context(self):
        """Test that run_agent_step handles empty context."""
        from app.tasks.agent import run_agent_step

        agent_instance_id = str(uuid.uuid4())
        context = {}

        result = run_agent_step(agent_instance_id, context)

        assert result["status"] == "pending"
        assert result["agent_instance_id"] == agent_instance_id

    def test_run_agent_step_with_complex_context(self):
        """Test that run_agent_step handles complex context data."""
        from app.tasks.agent import run_agent_step

        agent_instance_id = str(uuid.uuid4())
        context = {
            "messages": [
                {"role": "user", "content": "Create a new feature"},
                {"role": "assistant", "content": "I will create the feature."},
            ],
            "tools": ["create_file", "edit_file", "run_tests"],
            "state": {"current_step": 3, "max_steps": 10},
            "metadata": {"project_id": str(uuid.uuid4())},
        }

        result = run_agent_step(agent_instance_id, context)

        assert result["status"] == "pending"
        assert result["agent_instance_id"] == agent_instance_id


class TestSpawnAgentTask:
    """Tests for the spawn_agent task."""

    def test_spawn_agent_task_exists(self):
        """Test that spawn_agent task is registered."""
        # Imported only for its side effect of registering the tasks.
        import app.tasks.agent  # noqa: F401
        from app.celery_app import celery_app

        assert "app.tasks.agent.spawn_agent" in celery_app.tasks

    def test_spawn_agent_is_bound_task(self):
        """Test that spawn_agent is a bound task."""
        from app.tasks.agent import spawn_agent

        assert spawn_agent.__bound__ is True

    def test_spawn_agent_has_correct_name(self):
        """Test that spawn_agent has the correct task name."""
        from app.tasks.agent import spawn_agent

        assert spawn_agent.name == "app.tasks.agent.spawn_agent"

    def test_spawn_agent_returns_expected_structure(self):
        """Test that spawn_agent returns the expected result structure."""
        from app.tasks.agent import spawn_agent

        agent_type_id = str(uuid.uuid4())
        project_id = str(uuid.uuid4())
        initial_context = {"goal": "Implement user story"}

        result = spawn_agent(agent_type_id, project_id, initial_context)

        assert isinstance(result, dict)
        assert "status" in result
        assert "agent_type_id" in result
        assert "project_id" in result
        assert result["status"] == "spawned"
        assert result["agent_type_id"] == agent_type_id
        assert result["project_id"] == project_id

    def test_spawn_agent_with_empty_initial_context(self):
        """Test that spawn_agent handles empty initial context."""
        from app.tasks.agent import spawn_agent

        agent_type_id = str(uuid.uuid4())
        project_id = str(uuid.uuid4())
        initial_context = {}

        result = spawn_agent(agent_type_id, project_id, initial_context)

        assert result["status"] == "spawned"

    def test_spawn_agent_with_detailed_initial_context(self):
        """Test that spawn_agent handles detailed initial context."""
        from app.tasks.agent import spawn_agent

        agent_type_id = str(uuid.uuid4())
        project_id = str(uuid.uuid4())
        initial_context = {
            "goal": "Implement authentication",
            "constraints": ["Must use JWT", "Must support MFA"],
            "assigned_issues": [str(uuid.uuid4()), str(uuid.uuid4())],
            "autonomy_level": "MILESTONE",
        }

        result = spawn_agent(agent_type_id, project_id, initial_context)

        assert result["status"] == "spawned"
        assert result["agent_type_id"] == agent_type_id
        assert result["project_id"] == project_id


class TestTerminateAgentTask:
    """Tests for the terminate_agent task."""

    def test_terminate_agent_task_exists(self):
        """Test that terminate_agent task is registered."""
        # Imported only for its side effect of registering the tasks.
        import app.tasks.agent  # noqa: F401
        from app.celery_app import celery_app

        assert "app.tasks.agent.terminate_agent" in celery_app.tasks

    def test_terminate_agent_is_bound_task(self):
        """Test that terminate_agent is a bound task."""
        from app.tasks.agent import terminate_agent

        assert terminate_agent.__bound__ is True

    def test_terminate_agent_has_correct_name(self):
        """Test that terminate_agent has the correct task name."""
        from app.tasks.agent import terminate_agent

        assert terminate_agent.name == "app.tasks.agent.terminate_agent"

    def test_terminate_agent_returns_expected_structure(self):
        """Test that terminate_agent returns the expected result structure."""
        from app.tasks.agent import terminate_agent

        agent_instance_id = str(uuid.uuid4())
        reason = "Task completed successfully"

        result = terminate_agent(agent_instance_id, reason)

        assert isinstance(result, dict)
        assert "status" in result
        assert "agent_instance_id" in result
        assert result["status"] == "terminated"
        assert result["agent_instance_id"] == agent_instance_id

    def test_terminate_agent_with_error_reason(self):
        """Test that terminate_agent handles error termination reasons."""
        from app.tasks.agent import terminate_agent

        agent_instance_id = str(uuid.uuid4())
        reason = "Error: Budget limit exceeded"

        result = terminate_agent(agent_instance_id, reason)

        assert result["status"] == "terminated"
        assert result["agent_instance_id"] == agent_instance_id

    def test_terminate_agent_with_empty_reason(self):
        """Test that terminate_agent handles empty reason string."""
        from app.tasks.agent import terminate_agent

        agent_instance_id = str(uuid.uuid4())
        reason = ""

        result = terminate_agent(agent_instance_id, reason)

        assert result["status"] == "terminated"


class TestAgentTaskRouting:
    """Tests for agent task queue routing."""

    def test_agent_tasks_should_route_to_agent_queue(self):
        """Test that agent tasks are configured to route to 'agent' queue."""
        from app.celery_app import celery_app

        routes = celery_app.conf.task_routes
        agent_route = routes.get("app.tasks.agent.*")

        assert agent_route is not None
        assert agent_route["queue"] == "agent"

    def test_run_agent_step_routing(self):
        """Test that run_agent_step task routes to agent queue."""
        from app.celery_app import celery_app
        from app.tasks.agent import run_agent_step

        # Get the routing configuration for this specific task
        task_name = run_agent_step.name
        routes = celery_app.conf.task_routes

        # The task name matches the pattern "app.tasks.agent.*"
        assert task_name.startswith("app.tasks.agent.")
        assert "app.tasks.agent.*" in routes
        assert routes["app.tasks.agent.*"]["queue"] == "agent"


class TestAgentTaskSignatures:
    """Tests for agent task signature creation (for async invocation)."""

    def test_run_agent_step_signature_creation(self):
        """Test that run_agent_step signature can be created."""
        from app.tasks.agent import run_agent_step

        agent_instance_id = str(uuid.uuid4())
        context = {"messages": []}

        # Create a signature (delayed task)
        sig = run_agent_step.s(agent_instance_id, context)

        assert sig is not None
        assert sig.args == (agent_instance_id, context)

    def test_spawn_agent_signature_creation(self):
        """Test that spawn_agent signature can be created."""
        from app.tasks.agent import spawn_agent

        agent_type_id = str(uuid.uuid4())
        project_id = str(uuid.uuid4())
        initial_context = {}

        sig = spawn_agent.s(agent_type_id, project_id, initial_context)

        assert sig is not None
        assert sig.args == (agent_type_id, project_id, initial_context)

    def test_terminate_agent_signature_creation(self):
        """Test that terminate_agent signature can be created."""
        from app.tasks.agent import terminate_agent

        agent_instance_id = str(uuid.uuid4())
        reason = "User requested termination"

        sig = terminate_agent.s(agent_instance_id, reason)

        assert sig is not None
        assert sig.args == (agent_instance_id, reason)

    def test_agent_task_chain_creation(self):
        """Test that agent tasks can be chained together."""
        from celery import chain

        from app.tasks.agent import spawn_agent

        # Create a chain of tasks (this doesn't execute, just builds the chain)
        agent_type_id = str(uuid.uuid4())
        project_id = str(uuid.uuid4())

        # Note: In real usage, the chain would pass results between tasks
        workflow = chain(
            spawn_agent.s(agent_type_id, project_id, {}),
            # Further tasks would use the result from spawn_agent
        )

        assert workflow is not None


class TestAgentTaskLogging:
    """Tests for agent task logging behavior."""

    def test_run_agent_step_logs_execution(self):
        """Test that run_agent_step logs when executed."""
        from app.tasks.agent import run_agent_step

        agent_instance_id = str(uuid.uuid4())
        context = {}

        with patch("app.tasks.agent.logger") as mock_logger:
            run_agent_step(agent_instance_id, context)

            mock_logger.info.assert_called_once()
            # First positional argument of the single info() call, i.e. the message.
            call_args = mock_logger.info.call_args[0][0]
            assert agent_instance_id in call_args

    def test_spawn_agent_logs_execution(self):
        """Test that spawn_agent logs when executed."""
        from app.tasks.agent import spawn_agent

        agent_type_id = str(uuid.uuid4())
        project_id = str(uuid.uuid4())

        with patch("app.tasks.agent.logger") as mock_logger:
            spawn_agent(agent_type_id, project_id, {})

            mock_logger.info.assert_called_once()
            # First positional argument of the single info() call, i.e. the message.
            call_args = mock_logger.info.call_args[0][0]
            assert agent_type_id in call_args
            assert project_id in call_args

    def test_terminate_agent_logs_execution(self):
        """Test that terminate_agent logs when executed."""
        from app.tasks.agent import terminate_agent

        agent_instance_id = str(uuid.uuid4())
        reason = "Test termination"

        with patch("app.tasks.agent.logger") as mock_logger:
            terminate_agent(agent_instance_id, reason)

            mock_logger.info.assert_called_once()
            # First positional argument of the single info() call, i.e. the message.
            call_args = mock_logger.info.call_args[0][0]
            assert agent_instance_id in call_args
            assert reason in call_args