# tests/tasks/test_sync_tasks.py
"""
Tests for issue synchronization tasks.

These tests verify:
- Task signatures are correctly defined
- Tasks are bound (have access to self)
- Tasks return expected structure
- Tasks are routed to the 'sync' queue per ADR-011

Note: These tests mock actual execution since they would require
external API calls in production.
"""

import uuid
from unittest.mock import patch

import pytest


class TestSyncIssuesIncrementalTask:
    """Checks covering the ``sync_issues_incremental`` task."""

    def test_sync_issues_incremental_task_exists(self):
        """The task name must appear in the Celery app's task registry."""
        from app.celery_app import celery_app
        import app.tasks.sync  # noqa: F401

        registered = celery_app.tasks
        assert "app.tasks.sync.sync_issues_incremental" in registered

    def test_sync_issues_incremental_is_bound_task(self):
        """The task object must report ``__bound__`` as ``True``."""
        from app.tasks.sync import sync_issues_incremental

        # NOTE(review): `__bound__` may reflect binding to the Celery app
        # rather than the `bind=True` decorator option — confirm against
        # Celery's Task implementation.
        bound_flag = sync_issues_incremental.__bound__
        assert bound_flag is True

    def test_sync_issues_incremental_has_correct_name(self):
        """The task's ``name`` must be its fully qualified dotted path."""
        from app.tasks.sync import sync_issues_incremental

        expected_name = "app.tasks.sync.sync_issues_incremental"
        assert sync_issues_incremental.name == expected_name

    def test_sync_issues_incremental_returns_expected_structure(self):
        """A direct call must return a dict tagged as an incremental sync."""
        from app.tasks.sync import sync_issues_incremental

        outcome = sync_issues_incremental()

        assert isinstance(outcome, dict)
        for key in ("status", "type"):
            assert key in outcome
        assert outcome["type"] == "incremental"


class TestSyncIssuesFullTask:
    """Checks covering the ``sync_issues_full`` task."""

    def test_sync_issues_full_task_exists(self):
        """The task name must appear in the Celery app's task registry."""
        from app.celery_app import celery_app
        import app.tasks.sync  # noqa: F401

        registered = celery_app.tasks
        assert "app.tasks.sync.sync_issues_full" in registered

    def test_sync_issues_full_is_bound_task(self):
        """The task object must report ``__bound__`` as ``True``."""
        from app.tasks.sync import sync_issues_full

        bound_flag = sync_issues_full.__bound__
        assert bound_flag is True

    def test_sync_issues_full_has_correct_name(self):
        """The task's ``name`` must be its fully qualified dotted path."""
        from app.tasks.sync import sync_issues_full

        expected_name = "app.tasks.sync.sync_issues_full"
        assert sync_issues_full.name == expected_name

    def test_sync_issues_full_returns_expected_structure(self):
        """A direct call must return a dict tagged as a full sync."""
        from app.tasks.sync import sync_issues_full

        outcome = sync_issues_full()

        assert isinstance(outcome, dict)
        for key in ("status", "type"):
            assert key in outcome
        assert outcome["type"] == "full"


class TestProcessWebhookEventTask:
    """Checks covering the ``process_webhook_event`` task."""

    def test_process_webhook_event_task_exists(self):
        """The task name must appear in the Celery app's task registry."""
        from app.celery_app import celery_app
        import app.tasks.sync  # noqa: F401

        registered = celery_app.tasks
        assert "app.tasks.sync.process_webhook_event" in registered

    def test_process_webhook_event_is_bound_task(self):
        """The task object must report ``__bound__`` as ``True``."""
        from app.tasks.sync import process_webhook_event

        bound_flag = process_webhook_event.__bound__
        assert bound_flag is True

    def test_process_webhook_event_returns_expected_structure(self):
        """The result must echo back the provider and event type passed in."""
        from app.tasks.sync import process_webhook_event

        provider = "gitea"
        event_type = "issue.created"
        payload = {
            "action": "opened",
            "issue": {"number": 123, "title": "New issue"},
        }

        outcome = process_webhook_event(provider, event_type, payload)

        assert isinstance(outcome, dict)
        for key in ("status", "provider", "event_type"):
            assert key in outcome
        assert outcome["provider"] == provider
        assert outcome["event_type"] == event_type

    def test_process_webhook_event_handles_github_provider(self):
        """A GitHub-shaped payload must yield a result labelled ``github``."""
        from app.tasks.sync import process_webhook_event

        github_payload = {"action": "opened", "issue": {"number": 1}}
        outcome = process_webhook_event("github", "issues", github_payload)

        assert outcome["provider"] == "github"

    def test_process_webhook_event_handles_gitlab_provider(self):
        """A GitLab-shaped payload must yield a result labelled ``gitlab``."""
        from app.tasks.sync import process_webhook_event

        gitlab_payload = {
            "object_kind": "issue",
            "object_attributes": {"iid": 1},
        }
        outcome = process_webhook_event(
            "gitlab", "issue.created", gitlab_payload
        )

        assert outcome["provider"] == "gitlab"


class TestSyncProjectIssuesTask:
    """Checks covering the ``sync_project_issues`` task."""

    def test_sync_project_issues_task_exists(self):
        """The task name must appear in the Celery app's task registry."""
        from app.celery_app import celery_app
        import app.tasks.sync  # noqa: F401

        registered = celery_app.tasks
        assert "app.tasks.sync.sync_project_issues" in registered

    def test_sync_project_issues_is_bound_task(self):
        """The task object must report ``__bound__`` as ``True``."""
        from app.tasks.sync import sync_project_issues

        bound_flag = sync_project_issues.__bound__
        assert bound_flag is True

    def test_sync_project_issues_returns_expected_structure(self):
        """The result must echo back the project id it was called with."""
        from app.tasks.sync import sync_project_issues

        project_id = str(uuid.uuid4())
        full = False

        outcome = sync_project_issues(project_id, full)

        assert isinstance(outcome, dict)
        for key in ("status", "project_id"):
            assert key in outcome
        assert outcome["project_id"] == project_id

    def test_sync_project_issues_with_full_sync(self):
        """Passing ``full=True`` must produce a ``pending`` status."""
        from app.tasks.sync import sync_project_issues

        project_id = str(uuid.uuid4())
        outcome = sync_project_issues(project_id, full=True)

        assert outcome["status"] == "pending"


class TestPushIssueToExternalTask:
    """Checks covering the ``push_issue_to_external`` task."""

    def test_push_issue_to_external_task_exists(self):
        """The task name must appear in the Celery app's task registry."""
        from app.celery_app import celery_app
        import app.tasks.sync  # noqa: F401

        registered = celery_app.tasks
        assert "app.tasks.sync.push_issue_to_external" in registered

    def test_push_issue_to_external_is_bound_task(self):
        """The task object must report ``__bound__`` as ``True``."""
        from app.tasks.sync import push_issue_to_external

        bound_flag = push_issue_to_external.__bound__
        assert bound_flag is True

    def test_push_issue_to_external_returns_expected_structure(self):
        """The result must echo back the issue id and operation passed in."""
        from app.tasks.sync import push_issue_to_external

        project_id = str(uuid.uuid4())
        issue_id = str(uuid.uuid4())
        operation = "create"

        outcome = push_issue_to_external(project_id, issue_id, operation)

        assert isinstance(outcome, dict)
        for key in ("status", "issue_id", "operation"):
            assert key in outcome
        assert outcome["issue_id"] == issue_id
        assert outcome["operation"] == operation

    def test_push_issue_to_external_update_operation(self):
        """An ``update`` request must be reported back as ``update``."""
        from app.tasks.sync import push_issue_to_external

        project_id = str(uuid.uuid4())
        issue_id = str(uuid.uuid4())

        outcome = push_issue_to_external(project_id, issue_id, "update")

        assert outcome["operation"] == "update"

    def test_push_issue_to_external_close_operation(self):
        """A ``close`` request must be reported back as ``close``."""
        from app.tasks.sync import push_issue_to_external

        project_id = str(uuid.uuid4())
        issue_id = str(uuid.uuid4())

        outcome = push_issue_to_external(project_id, issue_id, "close")

        assert outcome["operation"] == "close"


class TestSyncTaskRouting:
    """Checks covering queue routing for the sync tasks."""

    def test_sync_tasks_should_route_to_sync_queue(self):
        """The ``app.tasks.sync.*`` route must exist and target ``sync``."""
        from app.celery_app import celery_app

        sync_route = celery_app.conf.task_routes.get("app.tasks.sync.*")

        assert sync_route is not None
        assert sync_route["queue"] == "sync"

    def test_all_sync_tasks_match_routing_pattern(self):
        """Every listed sync task name must carry the routed module prefix."""
        # NOTE(review): this only checks the literals below against a literal
        # prefix; it does not consult the Celery registry or route table.
        expected_prefix = "app.tasks.sync."
        task_names = (
            "app.tasks.sync.sync_issues_incremental",
            "app.tasks.sync.sync_issues_full",
            "app.tasks.sync.process_webhook_event",
            "app.tasks.sync.sync_project_issues",
            "app.tasks.sync.push_issue_to_external",
        )

        for task_name in task_names:
            assert task_name.startswith(expected_prefix)


class TestSyncTaskLogging:
    """Checks covering what the sync tasks write to the module logger."""

    def test_sync_issues_incremental_logs_execution(self):
        """One ``info`` record mentioning "incremental" must be emitted."""
        from app.tasks.sync import sync_issues_incremental

        # Swap out the module logger so the emitted message can be inspected.
        with patch("app.tasks.sync.logger") as mock_logger:
            sync_issues_incremental()

        mock_logger.info.assert_called_once()
        message = mock_logger.info.call_args[0][0]
        assert "incremental" in message.lower()

    def test_sync_issues_full_logs_execution(self):
        """One ``info`` record mentioning a full sync must be emitted."""
        from app.tasks.sync import sync_issues_full

        with patch("app.tasks.sync.logger") as mock_logger:
            sync_issues_full()

        mock_logger.info.assert_called_once()
        message = mock_logger.info.call_args[0][0]
        # Either wording is accepted for the full-sync log line.
        assert "full" in message.lower() or "reconciliation" in message.lower()

    def test_process_webhook_event_logs_execution(self):
        """One ``info`` record naming provider and event must be emitted."""
        from app.tasks.sync import process_webhook_event

        provider = "gitea"
        event_type = "issue.updated"

        with patch("app.tasks.sync.logger") as mock_logger:
            process_webhook_event(provider, event_type, {})

        mock_logger.info.assert_called_once()
        message = mock_logger.info.call_args[0][0]
        assert provider in message
        assert event_type in message