From 11da0d57a87c38fdae5ce144efc1a7c9911b671f Mon Sep 17 00:00:00 2001 From: Felipe Cardoso Date: Tue, 30 Dec 2025 02:08:14 +0100 Subject: [PATCH] feat(backend): Add Celery worker infrastructure with task stubs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add Celery app configuration with Redis broker/backend - Add task modules: agent, workflow, cost, git, sync - Add task stubs for: - Agent execution (spawn, heartbeat, terminate) - Workflow orchestration (start sprint, checkpoint, code review) - Cost tracking (record usage, calculate, generate report) - Git operations (clone, commit, push, sync) - External sync (import issues, export updates) - Add task tests directory structure - Configure for production-ready Celery setup Implements #18 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- backend/app/celery_app.py | 110 ++++++ backend/app/tasks/__init__.py | 23 ++ backend/app/tasks/agent.py | 150 ++++++++ backend/app/tasks/cost.py | 201 +++++++++++ backend/app/tasks/git.py | 225 ++++++++++++ backend/app/tasks/sync.py | 198 +++++++++++ backend/app/tasks/workflow.py | 213 ++++++++++++ backend/tests/tasks/__init__.py | 11 + backend/tests/tasks/test_agent_tasks.py | 358 +++++++++++++++++++ backend/tests/tasks/test_celery_config.py | 321 +++++++++++++++++ backend/tests/tasks/test_cost_tasks.py | 379 +++++++++++++++++++++ backend/tests/tasks/test_git_tasks.py | 301 ++++++++++++++++ backend/tests/tasks/test_sync_tasks.py | 309 +++++++++++++++++ backend/tests/tasks/test_workflow_tasks.py | 350 +++++++++++++++++++ 14 files changed, 3149 insertions(+) create mode 100644 backend/app/celery_app.py create mode 100644 backend/app/tasks/__init__.py create mode 100644 backend/app/tasks/agent.py create mode 100644 backend/app/tasks/cost.py create mode 100644 backend/app/tasks/git.py create mode 100644 backend/app/tasks/sync.py create mode 100644 backend/app/tasks/workflow.py create mode 100644 backend/tests/tasks/__init__.py create mode 100644 backend/tests/tasks/test_agent_tasks.py create mode 100644 backend/tests/tasks/test_celery_config.py create mode 100644 backend/tests/tasks/test_cost_tasks.py create mode 100644 backend/tests/tasks/test_git_tasks.py create mode 100644 backend/tests/tasks/test_sync_tasks.py create mode 100644 backend/tests/tasks/test_workflow_tasks.py diff --git a/backend/app/celery_app.py b/backend/app/celery_app.py new file mode 100644 index 0000000..fe1f87a --- /dev/null +++ b/backend/app/celery_app.py @@ -0,0 +1,110 @@ +# app/celery_app.py +""" +Celery application configuration for Syndarix. 
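+
+Example invocation (a sketch; assumes this module path and a reachable Redis):
+
+    celery -A app.celery_app worker -Q agent,git,sync,default -l info
+    celery -A app.celery_app beat -l info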
+
+This module configures the Celery app for background task processing:
+- Agent execution tasks (LLM calls, tool execution)
+- Git operations (clone, commit, push, PR creation)
+- Issue synchronization with external trackers
+- Workflow state management
+- Cost tracking and budget monitoring
+
+Architecture:
+- Redis as message broker and result backend
+- Queue routing for task isolation
+- JSON serialization for cross-language compatibility
+- Beat scheduler for periodic tasks
+"""
+
+from celery import Celery
+from celery.schedules import crontab
+
+from app.core.config import settings
+
+# Create Celery application instance
+celery_app = Celery(
+    "syndarix",
+    broker=settings.celery_broker_url,
+    backend=settings.celery_result_backend,
+)
+
+# Define task queues with their own exchanges and routing keys
+TASK_QUEUES = {
+    "agent": {"exchange": "agent", "routing_key": "agent"},
+    "git": {"exchange": "git", "routing_key": "git"},
+    "sync": {"exchange": "sync", "routing_key": "sync"},
+    "default": {"exchange": "default", "routing_key": "default"},
+}
+
+# Configure Celery
+celery_app.conf.update(
+    # Serialization
+    task_serializer="json",
+    accept_content=["json"],
+    result_serializer="json",
+    # Timezone
+    timezone="UTC",
+    enable_utc=True,
+    # Task imports for auto-discovery
+    imports=("app.tasks",),
+    # Default queue
+    task_default_queue="default",
+    # Task queues configuration
+    task_queues=TASK_QUEUES,
+    # Task routing - route tasks to appropriate queues
+    task_routes={
+        "app.tasks.agent.*": {"queue": "agent"},
+        "app.tasks.git.*": {"queue": "git"},
+        "app.tasks.sync.*": {"queue": "sync"},
+        "app.tasks.*": {"queue": "default"},
+    },
+    # Time limits per ADR-003
+    task_soft_time_limit=300,  # 5 minutes soft limit
+    task_time_limit=600,  # 10 minutes hard limit
+    # Result expiration - 24 hours
+    result_expires=86400,
+    # Broker connection retry
+    broker_connection_retry_on_startup=True,
+    # Beat schedule for periodic tasks
+    beat_schedule={
+        # Cost aggregation every hour per ADR-012
+        "aggregate-daily-costs": {
+            "task": "app.tasks.cost.aggregate_daily_costs",
+            "schedule": 3600.0,  # 1 hour in seconds
+        },
+        # Reset daily budget counters at midnight UTC
+        "reset-daily-budget-counters": {
+            "task": "app.tasks.cost.reset_daily_budget_counters",
+            # crontab pins this to midnight UTC; a plain 86400s interval
+            # would drift with beat restarts
+            "schedule": crontab(minute=0, hour=0),
+        },
+        # Check for stale workflows every 5 minutes
+        "recover-stale-workflows": {
+            "task": "app.tasks.workflow.recover_stale_workflows",
+            "schedule": 300.0,  # 5 minutes in seconds
+        },
+        # Incremental issue sync every minute per ADR-011
+        "sync-issues-incremental": {
+            "task": "app.tasks.sync.sync_issues_incremental",
+            "schedule": 60.0,  # 1 minute in seconds
+        },
+        # Full issue reconciliation every 15 minutes per ADR-011
+        "sync-issues-full": {
+            "task": "app.tasks.sync.sync_issues_full",
+            "schedule": 900.0,  # 15 minutes in seconds
+        },
+    },
+    # Task execution settings
+    task_acks_late=True,  # Acknowledge tasks after execution
+    task_reject_on_worker_lost=True,  # Reject tasks if worker dies
+    worker_prefetch_multiplier=1,  # Fair task distribution
+)
+
+# Auto-discover tasks from task modules.
+# related_name=None imports the listed modules themselves; the default
+# ("tasks") would look for nonexistent app.tasks.<module>.tasks modules.
+celery_app.autodiscover_tasks(
+    [
+        "app.tasks.agent",
+        "app.tasks.git",
+        "app.tasks.sync",
+        "app.tasks.workflow",
+        "app.tasks.cost",
+    ],
+    related_name=None,
+)
diff --git a/backend/app/tasks/__init__.py b/backend/app/tasks/__init__.py
new file mode 100644
index 0000000..8cf9c2f
--- /dev/null
+++ b/backend/app/tasks/__init__.py
@@ -0,0 +1,23 @@
+# app/tasks/__init__.py
+"""
+Celery background tasks for Syndarix. 
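+
+Example dispatch (a sketch; the UUIDs are hypothetical placeholders):
+
+    from app.tasks.agent import spawn_agent
+
+    # task_routes in app.celery_app sends this to the 'agent' queue
+    spawn_agent.delay("agent-type-uuid", "project-uuid", {"goal": "demo"})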
+ +This package contains all Celery tasks organized by domain: + +Modules: + agent: Agent execution tasks (run_agent_step, spawn_agent, terminate_agent) + git: Git operation tasks (clone, commit, branch, push, PR) + sync: Issue synchronization tasks (incremental/full sync, webhooks) + workflow: Workflow state management tasks + cost: Cost tracking and budget monitoring tasks +""" + +from app.tasks import agent, cost, git, sync, workflow + +__all__ = [ + "agent", + "cost", + "git", + "sync", + "workflow", +] diff --git a/backend/app/tasks/agent.py b/backend/app/tasks/agent.py new file mode 100644 index 0000000..c072b33 --- /dev/null +++ b/backend/app/tasks/agent.py @@ -0,0 +1,150 @@ +# app/tasks/agent.py +""" +Agent execution tasks for Syndarix. + +These tasks handle the lifecycle of AI agent instances: +- Spawning new agent instances from agent types +- Executing agent steps (LLM calls, tool execution) +- Terminating agent instances + +Tasks are routed to the 'agent' queue for dedicated processing. +""" + +import logging +from typing import Any + +from app.celery_app import celery_app + +logger = logging.getLogger(__name__) + + +@celery_app.task(bind=True, name="app.tasks.agent.run_agent_step") +def run_agent_step( + self, + agent_instance_id: str, + context: dict[str, Any], +) -> dict[str, Any]: + """ + Execute a single step of an agent's workflow. + + This task performs one iteration of the agent loop: + 1. Load agent instance state + 2. Call LLM with context and available tools + 3. Execute tool calls if any + 4. Update agent state + 5. Return result for next step or completion + + Args: + agent_instance_id: UUID of the agent instance + context: Current execution context including: + - messages: Conversation history + - tools: Available tool definitions + - state: Agent state data + - metadata: Project/task metadata + + Returns: + dict with status and agent_instance_id + """ + logger.info( + f"Running agent step for instance {agent_instance_id} with context keys: {list(context.keys())}" + ) + + # TODO: Implement actual agent step execution + # This will involve: + # 1. Loading agent instance from database + # 2. Calling LLM provider (via litellm or anthropic SDK) + # 3. Processing tool calls through MCP servers + # 4. Updating agent state in database + # 5. Scheduling next step if needed + + return { + "status": "pending", + "agent_instance_id": agent_instance_id, + } + + +@celery_app.task(bind=True, name="app.tasks.agent.spawn_agent") +def spawn_agent( + self, + agent_type_id: str, + project_id: str, + initial_context: dict[str, Any], +) -> dict[str, Any]: + """ + Spawn a new agent instance from an agent type. + + This task creates a new agent instance: + 1. Load agent type configuration (model, expertise, personality) + 2. Create agent instance record in database + 3. Initialize agent state with project context + 4. Start first agent step + + Args: + agent_type_id: UUID of the agent type template + project_id: UUID of the project this agent will work on + initial_context: Starting context including: + - goal: High-level objective + - constraints: Any limitations or requirements + - assigned_issues: Issues to work on + - autonomy_level: FULL_CONTROL, MILESTONE, or AUTONOMOUS + + Returns: + dict with status, agent_type_id, and project_id + """ + logger.info( + f"Spawning agent of type {agent_type_id} for project {project_id}" + ) + + # TODO: Implement agent spawning + # This will involve: + # 1. Loading agent type from database + # 2. Creating agent instance record + # 3. 
Setting up MCP tool access
+    # 4. Initializing agent state
+    # 5. Kicking off first step
+
+    return {
+        "status": "spawned",
+        "agent_type_id": agent_type_id,
+        "project_id": project_id,
+    }
+
+
+@celery_app.task(bind=True, name="app.tasks.agent.terminate_agent")
+def terminate_agent(
+    self,
+    agent_instance_id: str,
+    reason: str,
+) -> dict[str, Any]:
+    """
+    Terminate an agent instance.
+
+    This task gracefully shuts down an agent:
+    1. Mark agent instance as terminated
+    2. Save final state for audit
+    3. Release any held resources
+    4. Notify relevant subscribers
+
+    Args:
+        agent_instance_id: UUID of the agent instance
+        reason: Reason for termination (completion, error, manual, budget)
+
+    Returns:
+        dict with status and agent_instance_id
+    """
+    logger.info(
+        f"Terminating agent instance {agent_instance_id} with reason: {reason}"
+    )
+
+    # TODO: Implement agent termination
+    # This will involve:
+    # 1. Loading agent instance
+    # 2. Updating status to terminated
+    # 3. Saving termination reason
+    # 4. Cleaning up any pending tasks
+    # 5. Sending termination event
+
+    return {
+        "status": "terminated",
+        "agent_instance_id": agent_instance_id,
+    }
diff --git a/backend/app/tasks/cost.py b/backend/app/tasks/cost.py
new file mode 100644
index 0000000..6c158d4
--- /dev/null
+++ b/backend/app/tasks/cost.py
@@ -0,0 +1,201 @@
+# app/tasks/cost.py
+"""
+Cost tracking and budget management tasks for Syndarix.
+
+These tasks implement multi-layered cost tracking per ADR-012:
+- Per-agent token usage tracking
+- Project budget monitoring
+- Daily cost aggregation
+- Budget threshold alerts
+- Cost reporting
+
+Costs are tracked in real-time in Redis for speed,
+then aggregated to PostgreSQL for durability.
+"""
+
+import logging
+from typing import Any
+
+from app.celery_app import celery_app
+
+logger = logging.getLogger(__name__)
+
+
+@celery_app.task(bind=True, name="app.tasks.cost.aggregate_daily_costs")
+def aggregate_daily_costs(self) -> dict[str, Any]:
+    """
+    Aggregate daily costs from Redis to PostgreSQL.
+
+    This periodic task (runs hourly, per the beat schedule):
+    1. Read accumulated costs from Redis
+    2. Aggregate by project, agent, and model
+    3. Store in PostgreSQL cost_records table
+    4. Clear Redis counters that were aggregated (the daily reset is a
+       separate task)
+
+    Returns:
+        dict with status
+    """
+    logger.info("Starting daily cost aggregation")
+
+    # TODO: Implement cost aggregation
+    # This will involve:
+    # 1. Fetching cost data from Redis
+    # 2. Grouping by project_id, agent_id, model
+    # 3. Inserting into PostgreSQL cost tables
+    # 4. Resetting Redis counters
+
+    return {
+        "status": "pending",
+    }
+
+
+@celery_app.task(bind=True, name="app.tasks.cost.check_budget_thresholds")
+def check_budget_thresholds(
+    self,
+    project_id: str,
+) -> dict[str, Any]:
+    """
+    Check if a project has exceeded budget thresholds.
+
+    This task checks budget limits:
+    1. Get current spend from Redis counters
+    2. Compare against project budget limits
+    3. Send alerts if thresholds exceeded
+    4. Pause agents if hard limit reached
+
+    Args:
+        project_id: UUID of the project
+
+    Returns:
+        dict with status and project_id
+    """
+    logger.info(f"Checking budget thresholds for project {project_id}")
+
+    # TODO: Implement budget checking
+    # This will involve:
+    # 1. Loading project budget configuration
+    # 2. Getting current spend from Redis
+    # 3. Comparing against soft/hard limits
+    # 4. 
Sending alerts or pausing agents + + return { + "status": "pending", + "project_id": project_id, + } + + +@celery_app.task(bind=True, name="app.tasks.cost.record_llm_usage") +def record_llm_usage( + self, + agent_id: str, + project_id: str, + model: str, + prompt_tokens: int, + completion_tokens: int, + cost_usd: float, +) -> dict[str, Any]: + """ + Record LLM usage from an agent call. + + This task tracks each LLM API call: + 1. Increment Redis counters for real-time tracking + 2. Store raw usage event for audit + 3. Trigger budget check if threshold approaching + + Args: + agent_id: UUID of the agent instance + project_id: UUID of the project + model: Model identifier (e.g., claude-opus-4-5-20251101) + prompt_tokens: Number of input tokens + completion_tokens: Number of output tokens + cost_usd: Calculated cost in USD + + Returns: + dict with status, agent_id, project_id, and cost_usd + """ + logger.debug( + f"Recording LLM usage for model {model}: " + f"{prompt_tokens} prompt + {completion_tokens} completion tokens = ${cost_usd}" + ) + + # TODO: Implement usage recording + # This will involve: + # 1. Incrementing Redis counters + # 2. Storing usage event + # 3. Checking if near budget threshold + + return { + "status": "pending", + "agent_id": agent_id, + "project_id": project_id, + "cost_usd": cost_usd, + } + + +@celery_app.task(bind=True, name="app.tasks.cost.generate_cost_report") +def generate_cost_report( + self, + project_id: str, + start_date: str, + end_date: str, +) -> dict[str, Any]: + """ + Generate a cost report for a project. + + This task creates a detailed cost breakdown: + 1. Query cost records for date range + 2. Group by agent, model, and day + 3. Calculate totals and trends + 4. Format report for display + + Args: + project_id: UUID of the project + start_date: Report start date (YYYY-MM-DD) + end_date: Report end date (YYYY-MM-DD) + + Returns: + dict with status, project_id, and date range + """ + logger.info( + f"Generating cost report for project {project_id} from {start_date} to {end_date}" + ) + + # TODO: Implement report generation + # This will involve: + # 1. Querying PostgreSQL for cost records + # 2. Aggregating by various dimensions + # 3. Calculating totals and averages + # 4. Formatting report data + + return { + "status": "pending", + "project_id": project_id, + "start_date": start_date, + "end_date": end_date, + } + + +@celery_app.task(bind=True, name="app.tasks.cost.reset_daily_budget_counters") +def reset_daily_budget_counters(self) -> dict[str, Any]: + """ + Reset daily budget counters in Redis. + + This periodic task (runs daily at midnight UTC): + 1. Archive current day's counters + 2. Reset all daily budget counters + 3. Prepare for new day's tracking + + Returns: + dict with status + """ + logger.info("Resetting daily budget counters") + + # TODO: Implement counter reset + # This will involve: + # 1. Getting all daily counter keys from Redis + # 2. Archiving current values + # 3. Resetting counters to zero + + return { + "status": "pending", + } diff --git a/backend/app/tasks/git.py b/backend/app/tasks/git.py new file mode 100644 index 0000000..fbabf99 --- /dev/null +++ b/backend/app/tasks/git.py @@ -0,0 +1,225 @@ +# app/tasks/git.py +""" +Git operation tasks for Syndarix. + +These tasks handle Git operations for projects: +- Cloning repositories +- Creating branches +- Committing changes +- Pushing to remotes +- Creating pull requests + +Tasks are routed to the 'git' queue for dedicated processing. 
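+
+Example dispatch (a sketch; the project ID is a hypothetical placeholder):
+
+    from app.tasks.git import clone_repository
+
+    clone_repository.delay("project-uuid", "https://git.example.com/org/repo.git")
+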
+All operations are scoped by project_id for multi-tenancy. +""" + +import logging +from typing import Any + +from app.celery_app import celery_app + +logger = logging.getLogger(__name__) + + +@celery_app.task(bind=True, name="app.tasks.git.clone_repository") +def clone_repository( + self, + project_id: str, + repo_url: str, + branch: str = "main", +) -> dict[str, Any]: + """ + Clone a repository for a project. + + This task clones a Git repository to the project workspace: + 1. Prepare workspace directory + 2. Clone repository with credentials + 3. Checkout specified branch + 4. Update project metadata + + Args: + project_id: UUID of the project + repo_url: Git repository URL (HTTPS or SSH) + branch: Branch to checkout (default: main) + + Returns: + dict with status and project_id + """ + logger.info( + f"Cloning repository {repo_url} for project {project_id} on branch {branch}" + ) + + # TODO: Implement repository cloning + # This will involve: + # 1. Getting project credentials from secrets store + # 2. Creating workspace directory + # 3. Running git clone with proper auth + # 4. Checking out the target branch + # 5. Updating project record with clone status + + return { + "status": "pending", + "project_id": project_id, + } + + +@celery_app.task(bind=True, name="app.tasks.git.commit_changes") +def commit_changes( + self, + project_id: str, + message: str, + files: list[str] | None = None, +) -> dict[str, Any]: + """ + Commit changes in a project repository. + + This task creates a Git commit: + 1. Stage specified files (or all if None) + 2. Create commit with message + 3. Update commit history record + + Args: + project_id: UUID of the project + message: Commit message (follows conventional commits) + files: List of files to stage, or None for all staged + + Returns: + dict with status and project_id + """ + logger.info( + f"Committing changes for project {project_id}: {message}" + ) + + # TODO: Implement commit operation + # This will involve: + # 1. Loading project workspace path + # 2. Running git add for specified files + # 3. Running git commit with message + # 4. Recording commit hash in database + + return { + "status": "pending", + "project_id": project_id, + } + + +@celery_app.task(bind=True, name="app.tasks.git.create_branch") +def create_branch( + self, + project_id: str, + branch_name: str, + from_ref: str = "HEAD", +) -> dict[str, Any]: + """ + Create a new branch in a project repository. + + This task creates a Git branch: + 1. Checkout from reference + 2. Create new branch + 3. Update branch tracking + + Args: + project_id: UUID of the project + branch_name: Name of the new branch (e.g., feature/123-description) + from_ref: Reference to branch from (default: HEAD) + + Returns: + dict with status and project_id + """ + logger.info( + f"Creating branch {branch_name} from {from_ref} for project {project_id}" + ) + + # TODO: Implement branch creation + # This will involve: + # 1. Loading project workspace + # 2. Running git checkout -b from_ref + # 3. Recording branch in database + + return { + "status": "pending", + "project_id": project_id, + } + + +@celery_app.task(bind=True, name="app.tasks.git.create_pull_request") +def create_pull_request( + self, + project_id: str, + title: str, + body: str, + head_branch: str, + base_branch: str = "main", +) -> dict[str, Any]: + """ + Create a pull request for a project. + + This task creates a PR on the external Git provider: + 1. Push branch if needed + 2. Create PR via API (Gitea, GitHub, GitLab) + 3. 
Store PR reference
+
+    Args:
+        project_id: UUID of the project
+        title: PR title
+        body: PR description (markdown)
+        head_branch: Branch with changes
+        base_branch: Target branch (default: main)
+
+    Returns:
+        dict with status and project_id
+    """
+    logger.info(
+        f"Creating PR '{title}' from {head_branch} to {base_branch} for project {project_id}"
+    )
+
+    # TODO: Implement PR creation
+    # This will involve:
+    # 1. Loading project and Git provider config
+    # 2. Ensuring head_branch is pushed
+    # 3. Calling provider API to create PR
+    # 4. Storing PR URL and number
+
+    return {
+        "status": "pending",
+        "project_id": project_id,
+    }
+
+
+@celery_app.task(bind=True, name="app.tasks.git.push_changes")
+def push_changes(
+    self,
+    project_id: str,
+    branch: str,
+    force: bool = False,
+) -> dict[str, Any]:
+    """
+    Push changes to remote repository.
+
+    This task pushes commits to the remote:
+    1. Verify authentication
+    2. Push branch to remote
+    3. Handle push failures
+
+    Args:
+        project_id: UUID of the project
+        branch: Branch to push
+        force: Whether to force push (use with caution)
+
+    Returns:
+        dict with status and project_id
+    """
+    logger.info(
+        f"Pushing branch {branch} for project {project_id} (force={force})"
+    )
+
+    # TODO: Implement push operation
+    # This will involve:
+    # 1. Loading project credentials
+    # 2. Running git push (with --force if specified)
+    # 3. Handling authentication and conflicts
+
+    return {
+        "status": "pending",
+        "project_id": project_id,
+    }
diff --git a/backend/app/tasks/sync.py b/backend/app/tasks/sync.py
new file mode 100644
index 0000000..43a0aab
--- /dev/null
+++ b/backend/app/tasks/sync.py
@@ -0,0 +1,198 @@
+# app/tasks/sync.py
+"""
+Issue synchronization tasks for Syndarix.
+
+These tasks handle bidirectional issue synchronization:
+- Incremental sync (polling for recent changes)
+- Full reconciliation (comprehensive sync every 15 minutes)
+- Webhook event processing
+- Pushing local changes to external trackers
+
+Tasks are routed to the 'sync' queue for dedicated processing.
+Per ADR-011, sync follows a master/replica model with configurable direction.
+"""
+
+import logging
+from typing import Any
+
+from app.celery_app import celery_app
+
+logger = logging.getLogger(__name__)
+
+
+@celery_app.task(bind=True, name="app.tasks.sync.sync_issues_incremental")
+def sync_issues_incremental(self) -> dict[str, Any]:
+    """
+    Perform incremental issue synchronization across all projects.
+
+    This periodic task (runs every minute, per the beat schedule):
+    1. Query each project's external tracker for recent changes
+    2. Compare with local issue cache
+    3. Apply updates to local database
+    4. Handle conflicts based on sync direction config
+
+    Returns:
+        dict with status and type
+    """
+    logger.info("Starting incremental issue sync across all projects")
+
+    # TODO: Implement incremental sync
+    # This will involve:
+    # 1. Loading all active projects with sync enabled
+    # 2. For each project, querying external tracker since last_sync_at
+    # 3. Upserting issues into local database
+    # 4. Updating last_sync_at timestamp
+
+    return {
+        "status": "pending",
+        "type": "incremental",
+    }
+
+
+@celery_app.task(bind=True, name="app.tasks.sync.sync_issues_full")
+def sync_issues_full(self) -> dict[str, Any]:
+    """
+    Perform full issue reconciliation across all projects.
+
+    This periodic task (runs every 15 minutes, per the beat schedule):
+    1. Fetch all issues from external trackers
+    2. Compare with local database
+    3. Handle orphaned issues
+    4. 
Resolve any drift between systems + + Returns: + dict with status and type + """ + logger.info("Starting full issue reconciliation across all projects") + + # TODO: Implement full sync + # This will involve: + # 1. Loading all active projects + # 2. Fetching complete issue lists from external trackers + # 3. Comparing with local database + # 4. Handling deletes and orphans + # 5. Resolving conflicts based on sync config + + return { + "status": "pending", + "type": "full", + } + + +@celery_app.task(bind=True, name="app.tasks.sync.process_webhook_event") +def process_webhook_event( + self, + provider: str, + event_type: str, + payload: dict[str, Any], +) -> dict[str, Any]: + """ + Process a webhook event from an external Git provider. + + This task handles real-time updates from: + - Gitea: issue.created, issue.updated, pull_request.*, etc. + - GitHub: issues, pull_request, push, etc. + - GitLab: issue events, merge request events, etc. + + Args: + provider: Git provider name (gitea, github, gitlab) + event_type: Event type from provider + payload: Raw webhook payload + + Returns: + dict with status, provider, and event_type + """ + logger.info(f"Processing webhook event from {provider}: {event_type}") + + # TODO: Implement webhook processing + # This will involve: + # 1. Validating webhook signature + # 2. Parsing provider-specific payload + # 3. Mapping to internal event format + # 4. Updating local database + # 5. Triggering any dependent workflows + + return { + "status": "pending", + "provider": provider, + "event_type": event_type, + } + + +@celery_app.task(bind=True, name="app.tasks.sync.sync_project_issues") +def sync_project_issues( + self, + project_id: str, + full: bool = False, +) -> dict[str, Any]: + """ + Synchronize issues for a specific project. + + This task can be triggered manually or by webhooks: + 1. Connect to project's external tracker + 2. Fetch issues (incremental or full) + 3. Update local database + + Args: + project_id: UUID of the project + full: Whether to do full sync or incremental + + Returns: + dict with status and project_id + """ + logger.info( + f"Syncing issues for project {project_id} (full={full})" + ) + + # TODO: Implement project-specific sync + # This will involve: + # 1. Loading project configuration + # 2. Connecting to external tracker + # 3. Fetching issues based on full flag + # 4. Upserting to database + + return { + "status": "pending", + "project_id": project_id, + } + + +@celery_app.task(bind=True, name="app.tasks.sync.push_issue_to_external") +def push_issue_to_external( + self, + project_id: str, + issue_id: str, + operation: str, +) -> dict[str, Any]: + """ + Push a local issue change to the external tracker. + + This task handles outbound sync when Syndarix is the master: + - create: Create new issue in external tracker + - update: Update existing issue + - close: Close issue in external tracker + + Args: + project_id: UUID of the project + issue_id: UUID of the local issue + operation: Operation type (create, update, close) + + Returns: + dict with status, issue_id, and operation + """ + logger.info( + f"Pushing {operation} for issue {issue_id} in project {project_id}" + ) + + # TODO: Implement outbound sync + # This will involve: + # 1. Loading issue and project config + # 2. Mapping to external tracker format + # 3. Calling provider API + # 4. 
Updating external_id mapping + + return { + "status": "pending", + "issue_id": issue_id, + "operation": operation, + } diff --git a/backend/app/tasks/workflow.py b/backend/app/tasks/workflow.py new file mode 100644 index 0000000..4cf3b58 --- /dev/null +++ b/backend/app/tasks/workflow.py @@ -0,0 +1,213 @@ +# app/tasks/workflow.py +""" +Workflow state management tasks for Syndarix. + +These tasks manage workflow execution and state transitions: +- Sprint workflows (planning -> implementation -> review -> done) +- Story workflows (todo -> in_progress -> review -> done) +- Approval checkpoints for autonomy levels +- Stale workflow recovery + +Per ADR-007 and ADR-010, workflow state is durable in PostgreSQL +with defined state transitions. +""" + +import logging +from typing import Any + +from app.celery_app import celery_app + +logger = logging.getLogger(__name__) + + +@celery_app.task(bind=True, name="app.tasks.workflow.recover_stale_workflows") +def recover_stale_workflows(self) -> dict[str, Any]: + """ + Recover workflows that have become stale. + + This periodic task (runs every 5 minutes): + 1. Find workflows stuck in intermediate states + 2. Check for timed-out agent operations + 3. Retry or escalate based on configuration + 4. Notify relevant users if needed + + Returns: + dict with status and recovered count + """ + logger.info("Checking for stale workflows to recover") + + # TODO: Implement stale workflow recovery + # This will involve: + # 1. Querying for workflows with last_updated > threshold + # 2. Checking if associated agents are still running + # 3. Retrying or resetting stuck workflows + # 4. Sending notifications for manual intervention + + return { + "status": "pending", + "recovered": 0, + } + + +@celery_app.task(bind=True, name="app.tasks.workflow.execute_workflow_step") +def execute_workflow_step( + self, + workflow_id: str, + transition: str, +) -> dict[str, Any]: + """ + Execute a state transition for a workflow. + + This task applies a transition to a workflow: + 1. Validate transition is allowed from current state + 2. Execute any pre-transition hooks + 3. Update workflow state + 4. Execute any post-transition hooks + 5. Trigger follow-up tasks + + Args: + workflow_id: UUID of the workflow + transition: Transition to execute (start, approve, reject, etc.) + + Returns: + dict with status, workflow_id, and transition + """ + logger.info( + f"Executing transition '{transition}' for workflow {workflow_id}" + ) + + # TODO: Implement workflow transition + # This will involve: + # 1. Loading workflow from database + # 2. Validating transition from current state + # 3. Running pre-transition hooks + # 4. Updating state in database + # 5. Running post-transition hooks + # 6. Scheduling follow-up tasks + + return { + "status": "pending", + "workflow_id": workflow_id, + "transition": transition, + } + + +@celery_app.task(bind=True, name="app.tasks.workflow.handle_approval_response") +def handle_approval_response( + self, + workflow_id: str, + approved: bool, + comment: str | None = None, +) -> dict[str, Any]: + """ + Handle a user approval response for a workflow checkpoint. + + This task processes approval decisions: + 1. Record approval decision with timestamp + 2. Update workflow state accordingly + 3. Resume or halt workflow execution + 4. 
Notify relevant parties + + Args: + workflow_id: UUID of the workflow + approved: Whether the checkpoint was approved + comment: Optional comment from approver + + Returns: + dict with status, workflow_id, and approved flag + """ + logger.info( + f"Handling approval response for workflow {workflow_id}: approved={approved}" + ) + + # TODO: Implement approval handling + # This will involve: + # 1. Loading workflow and approval checkpoint + # 2. Recording decision with user and timestamp + # 3. Transitioning workflow state + # 4. Resuming or stopping execution + # 5. Sending notifications + + return { + "status": "pending", + "workflow_id": workflow_id, + "approved": approved, + } + + +@celery_app.task(bind=True, name="app.tasks.workflow.start_sprint_workflow") +def start_sprint_workflow( + self, + project_id: str, + sprint_id: str, +) -> dict[str, Any]: + """ + Start a new sprint workflow. + + This task initializes sprint execution: + 1. Create sprint workflow record + 2. Set up sprint planning phase + 3. Spawn Product Owner agent for planning + 4. Begin story assignment + + Args: + project_id: UUID of the project + sprint_id: UUID of the sprint + + Returns: + dict with status and sprint_id + """ + logger.info( + f"Starting sprint workflow for sprint {sprint_id} in project {project_id}" + ) + + # TODO: Implement sprint workflow initialization + # This will involve: + # 1. Creating workflow record for sprint + # 2. Setting initial state to PLANNING + # 3. Spawning PO agent for sprint planning + # 4. Setting up monitoring and checkpoints + + return { + "status": "pending", + "sprint_id": sprint_id, + } + + +@celery_app.task(bind=True, name="app.tasks.workflow.start_story_workflow") +def start_story_workflow( + self, + project_id: str, + story_id: str, +) -> dict[str, Any]: + """ + Start a new story workflow. + + This task initializes story execution: + 1. Create story workflow record + 2. Spawn appropriate developer agent + 3. Set up implementation tracking + 4. Configure approval checkpoints based on autonomy level + + Args: + project_id: UUID of the project + story_id: UUID of the story/issue + + Returns: + dict with status and story_id + """ + logger.info( + f"Starting story workflow for story {story_id} in project {project_id}" + ) + + # TODO: Implement story workflow initialization + # This will involve: + # 1. Creating workflow record for story + # 2. Determining appropriate agent type + # 3. Spawning developer agent + # 4. Setting up checkpoints based on autonomy level + + return { + "status": "pending", + "story_id": story_id, + } diff --git a/backend/tests/tasks/__init__.py b/backend/tests/tasks/__init__.py new file mode 100644 index 0000000..c68d5fd --- /dev/null +++ b/backend/tests/tasks/__init__.py @@ -0,0 +1,11 @@ +# tests/tasks/__init__.py +""" +Tests for Celery background tasks. + +This module tests the Celery configuration and all task modules: +- agent: Agent execution tasks +- git: Git operation tasks +- sync: Issue synchronization tasks +- workflow: Workflow state management tasks +- cost: Cost tracking and aggregation tasks +""" diff --git a/backend/tests/tasks/test_agent_tasks.py b/backend/tests/tasks/test_agent_tasks.py new file mode 100644 index 0000000..d1525b8 --- /dev/null +++ b/backend/tests/tasks/test_agent_tasks.py @@ -0,0 +1,358 @@ +# tests/tasks/test_agent_tasks.py +""" +Tests for agent execution tasks. 
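+
+These stub tasks are invoked synchronously in the tests (my_task(args), not
+.delay), so no broker is required. For integration-style runs, eager mode is
+one option (a sketch):
+
+    celery_app.conf.task_always_eager = True  # .delay() then runs inline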
+
+These tests verify:
+- Task signatures are correctly defined
+- Tasks are bound (have access to self)
+- Tasks return expected structure
+- Tasks handle various input scenarios
+
+Note: These tests mock actual execution since they would require
+LLM calls and database access in production.
+"""
+
+import pytest
+from unittest.mock import patch, MagicMock
+import uuid
+
+
+class TestRunAgentStepTask:
+    """Tests for the run_agent_step task."""
+
+    def test_run_agent_step_task_exists(self):
+        """Test that run_agent_step task is registered."""
+        from app.celery_app import celery_app
+        import app.tasks.agent  # noqa: F401
+
+        assert "app.tasks.agent.run_agent_step" in celery_app.tasks
+
+    def test_run_agent_step_is_bound_task(self):
+        """Test that run_agent_step is a bound task (has access to self)."""
+        from app.tasks.agent import run_agent_step
+
+        # Celery sets Task.__bound__ when the task class is bound to an app,
+        # so it is True for every registered task; it does not by itself
+        # prove bind=True. The 'self' argument comes from bind=True in the
+        # @celery_app.task decorator.
+        assert run_agent_step.__bound__ is True
+
+    def test_run_agent_step_has_correct_name(self):
+        """Test that run_agent_step has the correct task name."""
+        from app.tasks.agent import run_agent_step
+
+        assert run_agent_step.name == "app.tasks.agent.run_agent_step"
+
+    def test_run_agent_step_returns_expected_structure(self):
+        """Test that run_agent_step returns the expected result structure."""
+        from app.tasks.agent import run_agent_step
+
+        agent_instance_id = str(uuid.uuid4())
+        context = {"messages": [], "tools": []}
+
+        # Call the task directly (synchronously for testing)
+        result = run_agent_step(agent_instance_id, context)
+
+        assert isinstance(result, dict)
+        assert "status" in result
+        assert "agent_instance_id" in result
+        assert result["agent_instance_id"] == agent_instance_id
+
+    def test_run_agent_step_with_empty_context(self):
+        """Test that run_agent_step handles empty context."""
+        from app.tasks.agent import run_agent_step
+
+        agent_instance_id = str(uuid.uuid4())
+        context = {}
+
+        result = run_agent_step(agent_instance_id, context)
+
+        assert result["status"] == "pending"
+        assert result["agent_instance_id"] == agent_instance_id
+
+    def test_run_agent_step_with_complex_context(self):
+        """Test that run_agent_step handles complex context data."""
+        from app.tasks.agent import run_agent_step
+
+        agent_instance_id = str(uuid.uuid4())
+        context = {
+            "messages": [
+                {"role": "user", "content": "Create a new feature"},
+                {"role": "assistant", "content": "I will create the feature."},
+            ],
+            "tools": ["create_file", "edit_file", "run_tests"],
+            "state": {"current_step": 3, "max_steps": 10},
+            "metadata": {"project_id": str(uuid.uuid4())},
+        }
+
+        result = run_agent_step(agent_instance_id, context)
+
+        assert result["status"] == "pending"
+        assert result["agent_instance_id"] == agent_instance_id
+
+
+class TestSpawnAgentTask:
+    """Tests for the spawn_agent task."""
+
+    def test_spawn_agent_task_exists(self):
+        """Test that spawn_agent task is registered."""
+        from app.celery_app import celery_app
+        import app.tasks.agent  # noqa: F401
+
+        assert "app.tasks.agent.spawn_agent" in celery_app.tasks
+
+    def test_spawn_agent_is_bound_task(self):
+        """Test that spawn_agent is a bound task."""
+        from app.tasks.agent import spawn_agent
+
+        assert spawn_agent.__bound__ is True
+
+    def test_spawn_agent_has_correct_name(self):
+        """Test that spawn_agent has the correct task name."""
+        from app.tasks.agent import spawn_agent
+
+        assert spawn_agent.name == "app.tasks.agent.spawn_agent"
+
+    def test_spawn_agent_returns_expected_structure(self):
+        """Test that 
spawn_agent returns the expected result structure.""" + from app.tasks.agent import spawn_agent + + agent_type_id = str(uuid.uuid4()) + project_id = str(uuid.uuid4()) + initial_context = {"goal": "Implement user story"} + + result = spawn_agent(agent_type_id, project_id, initial_context) + + assert isinstance(result, dict) + assert "status" in result + assert "agent_type_id" in result + assert "project_id" in result + assert result["status"] == "spawned" + assert result["agent_type_id"] == agent_type_id + assert result["project_id"] == project_id + + def test_spawn_agent_with_empty_initial_context(self): + """Test that spawn_agent handles empty initial context.""" + from app.tasks.agent import spawn_agent + + agent_type_id = str(uuid.uuid4()) + project_id = str(uuid.uuid4()) + initial_context = {} + + result = spawn_agent(agent_type_id, project_id, initial_context) + + assert result["status"] == "spawned" + + def test_spawn_agent_with_detailed_initial_context(self): + """Test that spawn_agent handles detailed initial context.""" + from app.tasks.agent import spawn_agent + + agent_type_id = str(uuid.uuid4()) + project_id = str(uuid.uuid4()) + initial_context = { + "goal": "Implement authentication", + "constraints": ["Must use JWT", "Must support MFA"], + "assigned_issues": [str(uuid.uuid4()), str(uuid.uuid4())], + "autonomy_level": "MILESTONE", + } + + result = spawn_agent(agent_type_id, project_id, initial_context) + + assert result["status"] == "spawned" + assert result["agent_type_id"] == agent_type_id + assert result["project_id"] == project_id + + +class TestTerminateAgentTask: + """Tests for the terminate_agent task.""" + + def test_terminate_agent_task_exists(self): + """Test that terminate_agent task is registered.""" + from app.celery_app import celery_app + import app.tasks.agent # noqa: F401 + + assert "app.tasks.agent.terminate_agent" in celery_app.tasks + + def test_terminate_agent_is_bound_task(self): + """Test that terminate_agent is a bound task.""" + from app.tasks.agent import terminate_agent + + assert terminate_agent.__bound__ is True + + def test_terminate_agent_has_correct_name(self): + """Test that terminate_agent has the correct task name.""" + from app.tasks.agent import terminate_agent + + assert terminate_agent.name == "app.tasks.agent.terminate_agent" + + def test_terminate_agent_returns_expected_structure(self): + """Test that terminate_agent returns the expected result structure.""" + from app.tasks.agent import terminate_agent + + agent_instance_id = str(uuid.uuid4()) + reason = "Task completed successfully" + + result = terminate_agent(agent_instance_id, reason) + + assert isinstance(result, dict) + assert "status" in result + assert "agent_instance_id" in result + assert result["status"] == "terminated" + assert result["agent_instance_id"] == agent_instance_id + + def test_terminate_agent_with_error_reason(self): + """Test that terminate_agent handles error termination reasons.""" + from app.tasks.agent import terminate_agent + + agent_instance_id = str(uuid.uuid4()) + reason = "Error: Budget limit exceeded" + + result = terminate_agent(agent_instance_id, reason) + + assert result["status"] == "terminated" + assert result["agent_instance_id"] == agent_instance_id + + def test_terminate_agent_with_empty_reason(self): + """Test that terminate_agent handles empty reason string.""" + from app.tasks.agent import terminate_agent + + agent_instance_id = str(uuid.uuid4()) + reason = "" + + result = terminate_agent(agent_instance_id, reason) + + assert 
result["status"] == "terminated" + + +class TestAgentTaskRouting: + """Tests for agent task queue routing.""" + + def test_agent_tasks_should_route_to_agent_queue(self): + """Test that agent tasks are configured to route to 'agent' queue.""" + from app.celery_app import celery_app + + routes = celery_app.conf.task_routes + agent_route = routes.get("app.tasks.agent.*") + + assert agent_route is not None + assert agent_route["queue"] == "agent" + + def test_run_agent_step_routing(self): + """Test that run_agent_step task routes to agent queue.""" + from app.tasks.agent import run_agent_step + from app.celery_app import celery_app + + # Get the routing configuration for this specific task + task_name = run_agent_step.name + routes = celery_app.conf.task_routes + + # The task name matches the pattern "app.tasks.agent.*" + assert task_name.startswith("app.tasks.agent.") + assert "app.tasks.agent.*" in routes + assert routes["app.tasks.agent.*"]["queue"] == "agent" + + +class TestAgentTaskSignatures: + """Tests for agent task signature creation (for async invocation).""" + + def test_run_agent_step_signature_creation(self): + """Test that run_agent_step signature can be created.""" + from app.tasks.agent import run_agent_step + + agent_instance_id = str(uuid.uuid4()) + context = {"messages": []} + + # Create a signature (delayed task) + sig = run_agent_step.s(agent_instance_id, context) + + assert sig is not None + assert sig.args == (agent_instance_id, context) + + def test_spawn_agent_signature_creation(self): + """Test that spawn_agent signature can be created.""" + from app.tasks.agent import spawn_agent + + agent_type_id = str(uuid.uuid4()) + project_id = str(uuid.uuid4()) + initial_context = {} + + sig = spawn_agent.s(agent_type_id, project_id, initial_context) + + assert sig is not None + assert sig.args == (agent_type_id, project_id, initial_context) + + def test_terminate_agent_signature_creation(self): + """Test that terminate_agent signature can be created.""" + from app.tasks.agent import terminate_agent + + agent_instance_id = str(uuid.uuid4()) + reason = "User requested termination" + + sig = terminate_agent.s(agent_instance_id, reason) + + assert sig is not None + assert sig.args == (agent_instance_id, reason) + + def test_agent_task_chain_creation(self): + """Test that agent tasks can be chained together.""" + from celery import chain + from app.tasks.agent import spawn_agent, run_agent_step, terminate_agent + + # Create a chain of tasks (this doesn't execute, just builds the chain) + agent_type_id = str(uuid.uuid4()) + project_id = str(uuid.uuid4()) + agent_instance_id = str(uuid.uuid4()) + + # Note: In real usage, the chain would pass results between tasks + workflow = chain( + spawn_agent.s(agent_type_id, project_id, {}), + # Further tasks would use the result from spawn_agent + ) + + assert workflow is not None + + +class TestAgentTaskLogging: + """Tests for agent task logging behavior.""" + + def test_run_agent_step_logs_execution(self): + """Test that run_agent_step logs when executed.""" + from app.tasks.agent import run_agent_step + import logging + + agent_instance_id = str(uuid.uuid4()) + context = {} + + with patch("app.tasks.agent.logger") as mock_logger: + run_agent_step(agent_instance_id, context) + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert agent_instance_id in call_args + + def test_spawn_agent_logs_execution(self): + """Test that spawn_agent logs when executed.""" + from app.tasks.agent import spawn_agent + + 
agent_type_id = str(uuid.uuid4()) + project_id = str(uuid.uuid4()) + + with patch("app.tasks.agent.logger") as mock_logger: + spawn_agent(agent_type_id, project_id, {}) + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert agent_type_id in call_args + assert project_id in call_args + + def test_terminate_agent_logs_execution(self): + """Test that terminate_agent logs when executed.""" + from app.tasks.agent import terminate_agent + + agent_instance_id = str(uuid.uuid4()) + reason = "Test termination" + + with patch("app.tasks.agent.logger") as mock_logger: + terminate_agent(agent_instance_id, reason) + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert agent_instance_id in call_args + assert reason in call_args diff --git a/backend/tests/tasks/test_celery_config.py b/backend/tests/tasks/test_celery_config.py new file mode 100644 index 0000000..41be887 --- /dev/null +++ b/backend/tests/tasks/test_celery_config.py @@ -0,0 +1,321 @@ +# tests/tasks/test_celery_config.py +""" +Tests for Celery application configuration. + +These tests verify: +- Celery app is properly configured +- Queue routing is correctly set up per ADR-003 +- Task discovery works for all task modules +- Beat schedule is configured for periodic tasks +""" + +import pytest +from unittest.mock import patch, MagicMock + + +class TestCeleryAppConfiguration: + """Tests for the Celery application instance configuration.""" + + def test_celery_app_is_created_with_correct_name(self): + """Test that the Celery app is created with 'syndarix' as the name.""" + from app.celery_app import celery_app + + assert celery_app.main == "syndarix" + + def test_celery_app_uses_redis_broker(self): + """Test that Celery is configured to use Redis as the broker.""" + from app.celery_app import celery_app + from app.core.config import settings + + # The broker URL should match the settings + assert celery_app.conf.broker_url == settings.celery_broker_url + + def test_celery_app_uses_redis_backend(self): + """Test that Celery is configured to use Redis as the result backend.""" + from app.celery_app import celery_app + from app.core.config import settings + + assert celery_app.conf.result_backend == settings.celery_result_backend + + def test_celery_uses_json_serialization(self): + """Test that Celery is configured to use JSON for serialization.""" + from app.celery_app import celery_app + + assert celery_app.conf.task_serializer == "json" + assert celery_app.conf.result_serializer == "json" + assert "json" in celery_app.conf.accept_content + + def test_celery_uses_utc_timezone(self): + """Test that Celery is configured to use UTC timezone.""" + from app.celery_app import celery_app + + assert celery_app.conf.timezone == "UTC" + assert celery_app.conf.enable_utc is True + + def test_celery_has_late_ack_enabled(self): + """Test that late acknowledgment is enabled for task reliability.""" + from app.celery_app import celery_app + + # Per ADR-003: Late ack for reliability + assert celery_app.conf.task_acks_late is True + assert celery_app.conf.task_reject_on_worker_lost is True + + def test_celery_prefetch_multiplier_is_one(self): + """Test that worker prefetch is set to 1 for fair task distribution.""" + from app.celery_app import celery_app + + assert celery_app.conf.worker_prefetch_multiplier == 1 + + def test_celery_result_expiration(self): + """Test that results expire after 24 hours.""" + from app.celery_app import celery_app + + # 86400 seconds = 24 hours + 
assert celery_app.conf.result_expires == 86400 + + def test_celery_has_time_limits_configured(self): + """Test that task time limits are configured per ADR-003.""" + from app.celery_app import celery_app + + # 5 minutes soft limit, 10 minutes hard limit + assert celery_app.conf.task_soft_time_limit == 300 + assert celery_app.conf.task_time_limit == 600 + + def test_celery_broker_connection_retry_enabled(self): + """Test that broker connection retry is enabled on startup.""" + from app.celery_app import celery_app + + assert celery_app.conf.broker_connection_retry_on_startup is True + + +class TestQueueRoutingConfiguration: + """Tests for Celery queue routing configuration per ADR-003.""" + + def test_default_queue_is_configured(self): + """Test that 'default' is set as the default queue.""" + from app.celery_app import celery_app + + assert celery_app.conf.task_default_queue == "default" + + def test_task_routes_are_configured(self): + """Test that task routes are properly configured.""" + from app.celery_app import celery_app + + routes = celery_app.conf.task_routes + assert routes is not None + assert isinstance(routes, dict) + + def test_agent_tasks_routed_to_agent_queue(self): + """Test that agent tasks are routed to the 'agent' queue.""" + from app.celery_app import celery_app + + routes = celery_app.conf.task_routes + assert "app.tasks.agent.*" in routes + assert routes["app.tasks.agent.*"]["queue"] == "agent" + + def test_git_tasks_routed_to_git_queue(self): + """Test that git tasks are routed to the 'git' queue.""" + from app.celery_app import celery_app + + routes = celery_app.conf.task_routes + assert "app.tasks.git.*" in routes + assert routes["app.tasks.git.*"]["queue"] == "git" + + def test_sync_tasks_routed_to_sync_queue(self): + """Test that sync tasks are routed to the 'sync' queue.""" + from app.celery_app import celery_app + + routes = celery_app.conf.task_routes + assert "app.tasks.sync.*" in routes + assert routes["app.tasks.sync.*"]["queue"] == "sync" + + def test_default_tasks_routed_to_default_queue(self): + """Test that unmatched tasks are routed to the 'default' queue.""" + from app.celery_app import celery_app + + routes = celery_app.conf.task_routes + assert "app.tasks.*" in routes + assert routes["app.tasks.*"]["queue"] == "default" + + def test_all_queues_are_defined(self): + """Test that all expected queues are defined in task_queues.""" + from app.celery_app import celery_app + + queues = celery_app.conf.task_queues + expected_queues = {"agent", "git", "sync", "default"} + + assert queues is not None + assert set(queues.keys()) == expected_queues + + def test_queue_exchanges_are_configured(self): + """Test that each queue has its own exchange configured.""" + from app.celery_app import celery_app + + queues = celery_app.conf.task_queues + + for queue_name in ["agent", "git", "sync", "default"]: + assert queue_name in queues + assert queues[queue_name]["exchange"] == queue_name + assert queues[queue_name]["routing_key"] == queue_name + + +class TestTaskDiscovery: + """Tests for Celery task auto-discovery.""" + + def test_task_imports_are_configured(self): + """Test that task imports are configured for auto-discovery.""" + from app.celery_app import celery_app + + imports = celery_app.conf.imports + assert imports is not None + assert "app.tasks" in imports + + def test_agent_tasks_are_discoverable(self): + """Test that agent tasks can be discovered and accessed.""" + from app.celery_app import celery_app + + # Force task registration by importing + import 
app.tasks.agent # noqa: F401 + + # Check that agent tasks are registered + registered_tasks = celery_app.tasks + + assert "app.tasks.agent.run_agent_step" in registered_tasks + assert "app.tasks.agent.spawn_agent" in registered_tasks + assert "app.tasks.agent.terminate_agent" in registered_tasks + + def test_git_tasks_are_discoverable(self): + """Test that git tasks can be discovered and accessed.""" + from app.celery_app import celery_app + + # Force task registration by importing + import app.tasks.git # noqa: F401 + + registered_tasks = celery_app.tasks + + assert "app.tasks.git.clone_repository" in registered_tasks + assert "app.tasks.git.commit_changes" in registered_tasks + assert "app.tasks.git.create_branch" in registered_tasks + assert "app.tasks.git.create_pull_request" in registered_tasks + assert "app.tasks.git.push_changes" in registered_tasks + + def test_sync_tasks_are_discoverable(self): + """Test that sync tasks can be discovered and accessed.""" + from app.celery_app import celery_app + + # Force task registration by importing + import app.tasks.sync # noqa: F401 + + registered_tasks = celery_app.tasks + + assert "app.tasks.sync.sync_issues_incremental" in registered_tasks + assert "app.tasks.sync.sync_issues_full" in registered_tasks + assert "app.tasks.sync.process_webhook_event" in registered_tasks + assert "app.tasks.sync.sync_project_issues" in registered_tasks + assert "app.tasks.sync.push_issue_to_external" in registered_tasks + + def test_workflow_tasks_are_discoverable(self): + """Test that workflow tasks can be discovered and accessed.""" + from app.celery_app import celery_app + + # Force task registration by importing + import app.tasks.workflow # noqa: F401 + + registered_tasks = celery_app.tasks + + assert "app.tasks.workflow.recover_stale_workflows" in registered_tasks + assert "app.tasks.workflow.execute_workflow_step" in registered_tasks + assert "app.tasks.workflow.handle_approval_response" in registered_tasks + assert "app.tasks.workflow.start_sprint_workflow" in registered_tasks + assert "app.tasks.workflow.start_story_workflow" in registered_tasks + + def test_cost_tasks_are_discoverable(self): + """Test that cost tasks can be discovered and accessed.""" + from app.celery_app import celery_app + + # Force task registration by importing + import app.tasks.cost # noqa: F401 + + registered_tasks = celery_app.tasks + + assert "app.tasks.cost.aggregate_daily_costs" in registered_tasks + assert "app.tasks.cost.check_budget_thresholds" in registered_tasks + assert "app.tasks.cost.record_llm_usage" in registered_tasks + assert "app.tasks.cost.generate_cost_report" in registered_tasks + assert "app.tasks.cost.reset_daily_budget_counters" in registered_tasks + + +class TestBeatSchedule: + """Tests for Celery Beat scheduled tasks configuration.""" + + def test_beat_schedule_is_configured(self): + """Test that beat_schedule is configured.""" + from app.celery_app import celery_app + + assert celery_app.conf.beat_schedule is not None + assert isinstance(celery_app.conf.beat_schedule, dict) + + def test_incremental_sync_is_scheduled(self): + """Test that incremental issue sync is scheduled per ADR-011.""" + from app.celery_app import celery_app + + schedule = celery_app.conf.beat_schedule + assert "sync-issues-incremental" in schedule + + task_config = schedule["sync-issues-incremental"] + assert task_config["task"] == "app.tasks.sync.sync_issues_incremental" + assert task_config["schedule"] == 60.0 # Every 60 seconds + + def test_full_sync_is_scheduled(self): + 
"""Test that full issue sync is scheduled per ADR-011.""" + from app.celery_app import celery_app + + schedule = celery_app.conf.beat_schedule + assert "sync-issues-full" in schedule + + task_config = schedule["sync-issues-full"] + assert task_config["task"] == "app.tasks.sync.sync_issues_full" + assert task_config["schedule"] == 900.0 # Every 15 minutes + + def test_stale_workflow_recovery_is_scheduled(self): + """Test that stale workflow recovery is scheduled per ADR-007.""" + from app.celery_app import celery_app + + schedule = celery_app.conf.beat_schedule + assert "recover-stale-workflows" in schedule + + task_config = schedule["recover-stale-workflows"] + assert task_config["task"] == "app.tasks.workflow.recover_stale_workflows" + assert task_config["schedule"] == 300.0 # Every 5 minutes + + def test_daily_cost_aggregation_is_scheduled(self): + """Test that daily cost aggregation is scheduled per ADR-012.""" + from app.celery_app import celery_app + + schedule = celery_app.conf.beat_schedule + assert "aggregate-daily-costs" in schedule + + task_config = schedule["aggregate-daily-costs"] + assert task_config["task"] == "app.tasks.cost.aggregate_daily_costs" + assert task_config["schedule"] == 3600.0 # Every hour + + +class TestTaskModuleExports: + """Tests for the task module __init__.py exports.""" + + def test_tasks_package_exports_all_modules(self): + """Test that the tasks package exports all task modules.""" + from app import tasks + + assert hasattr(tasks, "agent") + assert hasattr(tasks, "git") + assert hasattr(tasks, "sync") + assert hasattr(tasks, "workflow") + assert hasattr(tasks, "cost") + + def test_tasks_all_attribute_is_correct(self): + """Test that __all__ contains all expected module names.""" + from app import tasks + + expected_modules = ["agent", "git", "sync", "workflow", "cost"] + assert set(tasks.__all__) == set(expected_modules) diff --git a/backend/tests/tasks/test_cost_tasks.py b/backend/tests/tasks/test_cost_tasks.py new file mode 100644 index 0000000..55b75cd --- /dev/null +++ b/backend/tests/tasks/test_cost_tasks.py @@ -0,0 +1,379 @@ +# tests/tasks/test_cost_tasks.py +""" +Tests for cost tracking and budget management tasks. + +These tests verify: +- Task signatures are correctly defined +- Tasks are bound (have access to self) +- Tasks return expected structure +- Tasks follow ADR-012 (multi-layered cost tracking) + +Note: These tests mock actual execution since they would require +database access and Redis operations in production. 
+""" + +import pytest +from unittest.mock import patch +import uuid + + +class TestAggregateDailyCostsTask: + """Tests for the aggregate_daily_costs task.""" + + def test_aggregate_daily_costs_task_exists(self): + """Test that aggregate_daily_costs task is registered.""" + from app.celery_app import celery_app + import app.tasks.cost # noqa: F401 + + assert "app.tasks.cost.aggregate_daily_costs" in celery_app.tasks + + def test_aggregate_daily_costs_is_bound_task(self): + """Test that aggregate_daily_costs is a bound task.""" + from app.tasks.cost import aggregate_daily_costs + + assert aggregate_daily_costs.__bound__ is True + + def test_aggregate_daily_costs_has_correct_name(self): + """Test that aggregate_daily_costs has the correct task name.""" + from app.tasks.cost import aggregate_daily_costs + + assert aggregate_daily_costs.name == "app.tasks.cost.aggregate_daily_costs" + + def test_aggregate_daily_costs_returns_expected_structure(self): + """Test that aggregate_daily_costs returns expected result.""" + from app.tasks.cost import aggregate_daily_costs + + result = aggregate_daily_costs() + + assert isinstance(result, dict) + assert "status" in result + assert result["status"] == "pending" + + +class TestCheckBudgetThresholdsTask: + """Tests for the check_budget_thresholds task.""" + + def test_check_budget_thresholds_task_exists(self): + """Test that check_budget_thresholds task is registered.""" + from app.celery_app import celery_app + import app.tasks.cost # noqa: F401 + + assert "app.tasks.cost.check_budget_thresholds" in celery_app.tasks + + def test_check_budget_thresholds_is_bound_task(self): + """Test that check_budget_thresholds is a bound task.""" + from app.tasks.cost import check_budget_thresholds + + assert check_budget_thresholds.__bound__ is True + + def test_check_budget_thresholds_returns_expected_structure(self): + """Test that check_budget_thresholds returns expected result.""" + from app.tasks.cost import check_budget_thresholds + + project_id = str(uuid.uuid4()) + + result = check_budget_thresholds(project_id) + + assert isinstance(result, dict) + assert "status" in result + assert "project_id" in result + assert result["project_id"] == project_id + + +class TestRecordLlmUsageTask: + """Tests for the record_llm_usage task.""" + + def test_record_llm_usage_task_exists(self): + """Test that record_llm_usage task is registered.""" + from app.celery_app import celery_app + import app.tasks.cost # noqa: F401 + + assert "app.tasks.cost.record_llm_usage" in celery_app.tasks + + def test_record_llm_usage_is_bound_task(self): + """Test that record_llm_usage is a bound task.""" + from app.tasks.cost import record_llm_usage + + assert record_llm_usage.__bound__ is True + + def test_record_llm_usage_returns_expected_structure(self): + """Test that record_llm_usage returns expected result.""" + from app.tasks.cost import record_llm_usage + + agent_id = str(uuid.uuid4()) + project_id = str(uuid.uuid4()) + model = "claude-opus-4-5-20251101" + prompt_tokens = 1500 + completion_tokens = 500 + cost_usd = 0.0825 + + result = record_llm_usage( + agent_id, project_id, model, prompt_tokens, completion_tokens, cost_usd + ) + + assert isinstance(result, dict) + assert "status" in result + assert "agent_id" in result + assert "project_id" in result + assert "cost_usd" in result + assert result["agent_id"] == agent_id + assert result["project_id"] == project_id + assert result["cost_usd"] == cost_usd + + def test_record_llm_usage_with_different_models(self): + """Test that 
record_llm_usage handles different model types.""" + from app.tasks.cost import record_llm_usage + + agent_id = str(uuid.uuid4()) + project_id = str(uuid.uuid4()) + + models = [ + ("claude-opus-4-5-20251101", 0.015), + ("claude-sonnet-4-20250514", 0.003), + ("gpt-4-turbo", 0.01), + ("gemini-1.5-pro", 0.007), + ] + + for model, cost in models: + result = record_llm_usage( + agent_id, project_id, model, 1000, 500, cost + ) + assert result["status"] == "pending" + + def test_record_llm_usage_with_zero_tokens(self): + """Test that record_llm_usage handles zero token counts.""" + from app.tasks.cost import record_llm_usage + + agent_id = str(uuid.uuid4()) + project_id = str(uuid.uuid4()) + + result = record_llm_usage( + agent_id, project_id, "claude-opus-4-5-20251101", 0, 0, 0.0 + ) + + assert result["status"] == "pending" + + +class TestGenerateCostReportTask: + """Tests for the generate_cost_report task.""" + + def test_generate_cost_report_task_exists(self): + """Test that generate_cost_report task is registered.""" + from app.celery_app import celery_app + import app.tasks.cost # noqa: F401 + + assert "app.tasks.cost.generate_cost_report" in celery_app.tasks + + def test_generate_cost_report_is_bound_task(self): + """Test that generate_cost_report is a bound task.""" + from app.tasks.cost import generate_cost_report + + assert generate_cost_report.__bound__ is True + + def test_generate_cost_report_returns_expected_structure(self): + """Test that generate_cost_report returns expected result.""" + from app.tasks.cost import generate_cost_report + + project_id = str(uuid.uuid4()) + start_date = "2025-01-01" + end_date = "2025-01-31" + + result = generate_cost_report(project_id, start_date, end_date) + + assert isinstance(result, dict) + assert "status" in result + assert "project_id" in result + assert "start_date" in result + assert "end_date" in result + assert result["project_id"] == project_id + assert result["start_date"] == start_date + assert result["end_date"] == end_date + + def test_generate_cost_report_with_various_date_ranges(self): + """Test that generate_cost_report handles various date ranges.""" + from app.tasks.cost import generate_cost_report + + project_id = str(uuid.uuid4()) + + date_ranges = [ + ("2025-01-01", "2025-01-01"), # Single day + ("2025-01-01", "2025-01-07"), # Week + ("2025-01-01", "2025-12-31"), # Full year + ] + + for start, end in date_ranges: + result = generate_cost_report(project_id, start, end) + assert result["status"] == "pending" + + +class TestResetDailyBudgetCountersTask: + """Tests for the reset_daily_budget_counters task.""" + + def test_reset_daily_budget_counters_task_exists(self): + """Test that reset_daily_budget_counters task is registered.""" + from app.celery_app import celery_app + import app.tasks.cost # noqa: F401 + + assert "app.tasks.cost.reset_daily_budget_counters" in celery_app.tasks + + def test_reset_daily_budget_counters_is_bound_task(self): + """Test that reset_daily_budget_counters is a bound task.""" + from app.tasks.cost import reset_daily_budget_counters + + assert reset_daily_budget_counters.__bound__ is True + + def test_reset_daily_budget_counters_returns_expected_structure(self): + """Test that reset_daily_budget_counters returns expected result.""" + from app.tasks.cost import reset_daily_budget_counters + + result = reset_daily_budget_counters() + + assert isinstance(result, dict) + assert "status" in result + assert result["status"] == "pending" + + +class TestCostTaskRouting: + """Tests for cost task queue 
routing.""" + + def test_cost_tasks_route_to_default_queue(self): + """Test that cost tasks route to 'default' queue. + + Per the routing configuration, cost tasks match 'app.tasks.*' + which routes to the default queue. + """ + from app.celery_app import celery_app + + routes = celery_app.conf.task_routes + + # Cost tasks match the generic 'app.tasks.*' pattern + assert "app.tasks.*" in routes + assert routes["app.tasks.*"]["queue"] == "default" + + def test_all_cost_tasks_match_routing_pattern(self): + """Test that all cost task names match the routing pattern.""" + task_names = [ + "app.tasks.cost.aggregate_daily_costs", + "app.tasks.cost.check_budget_thresholds", + "app.tasks.cost.record_llm_usage", + "app.tasks.cost.generate_cost_report", + "app.tasks.cost.reset_daily_budget_counters", + ] + + for name in task_names: + assert name.startswith("app.tasks.") + + +class TestCostTaskLogging: + """Tests for cost task logging behavior.""" + + def test_aggregate_daily_costs_logs_execution(self): + """Test that aggregate_daily_costs logs when executed.""" + from app.tasks.cost import aggregate_daily_costs + + with patch("app.tasks.cost.logger") as mock_logger: + aggregate_daily_costs() + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert "cost" in call_args.lower() or "aggregat" in call_args.lower() + + def test_check_budget_thresholds_logs_execution(self): + """Test that check_budget_thresholds logs when executed.""" + from app.tasks.cost import check_budget_thresholds + + project_id = str(uuid.uuid4()) + + with patch("app.tasks.cost.logger") as mock_logger: + check_budget_thresholds(project_id) + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert project_id in call_args + + def test_record_llm_usage_logs_execution(self): + """Test that record_llm_usage logs when executed.""" + from app.tasks.cost import record_llm_usage + + agent_id = str(uuid.uuid4()) + project_id = str(uuid.uuid4()) + model = "claude-opus-4-5-20251101" + + with patch("app.tasks.cost.logger") as mock_logger: + record_llm_usage(agent_id, project_id, model, 100, 50, 0.01) + + # Uses debug level, not info + mock_logger.debug.assert_called_once() + call_args = mock_logger.debug.call_args[0][0] + assert model in call_args + + def test_generate_cost_report_logs_execution(self): + """Test that generate_cost_report logs when executed.""" + from app.tasks.cost import generate_cost_report + + project_id = str(uuid.uuid4()) + + with patch("app.tasks.cost.logger") as mock_logger: + generate_cost_report(project_id, "2025-01-01", "2025-01-31") + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert project_id in call_args + + def test_reset_daily_budget_counters_logs_execution(self): + """Test that reset_daily_budget_counters logs when executed.""" + from app.tasks.cost import reset_daily_budget_counters + + with patch("app.tasks.cost.logger") as mock_logger: + reset_daily_budget_counters() + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert "reset" in call_args.lower() or "counter" in call_args.lower() + + +class TestCostTaskSignatures: + """Tests for cost task signature creation.""" + + def test_record_llm_usage_signature_creation(self): + """Test that record_llm_usage signature can be created.""" + from app.tasks.cost import record_llm_usage + + agent_id = str(uuid.uuid4()) + project_id = str(uuid.uuid4()) + + sig = record_llm_usage.s( + agent_id, project_id, 
"claude-opus-4-5-20251101", 100, 50, 0.01 + ) + + assert sig is not None + assert len(sig.args) == 6 + + def test_check_budget_thresholds_signature_creation(self): + """Test that check_budget_thresholds signature can be created.""" + from app.tasks.cost import check_budget_thresholds + + project_id = str(uuid.uuid4()) + + sig = check_budget_thresholds.s(project_id) + + assert sig is not None + assert sig.args == (project_id,) + + def test_cost_task_chain_creation(self): + """Test that cost tasks can be chained together.""" + from celery import chain + from app.tasks.cost import record_llm_usage, check_budget_thresholds + + agent_id = str(uuid.uuid4()) + project_id = str(uuid.uuid4()) + + # Build a chain: record usage, then check thresholds + workflow = chain( + record_llm_usage.s( + agent_id, project_id, "claude-opus-4-5-20251101", 1000, 500, 0.05 + ), + check_budget_thresholds.s(project_id), + ) + + assert workflow is not None diff --git a/backend/tests/tasks/test_git_tasks.py b/backend/tests/tasks/test_git_tasks.py new file mode 100644 index 0000000..3d52e09 --- /dev/null +++ b/backend/tests/tasks/test_git_tasks.py @@ -0,0 +1,301 @@ +# tests/tasks/test_git_tasks.py +""" +Tests for git operation tasks. + +These tests verify: +- Task signatures are correctly defined +- Tasks are bound (have access to self) +- Tasks return expected structure +- Tasks are routed to the 'git' queue + +Note: These tests mock actual execution since they would require +Git operations and external APIs in production. +""" + +import pytest +from unittest.mock import patch +import uuid + + +class TestCloneRepositoryTask: + """Tests for the clone_repository task.""" + + def test_clone_repository_task_exists(self): + """Test that clone_repository task is registered.""" + from app.celery_app import celery_app + import app.tasks.git # noqa: F401 + + assert "app.tasks.git.clone_repository" in celery_app.tasks + + def test_clone_repository_is_bound_task(self): + """Test that clone_repository is a bound task.""" + from app.tasks.git import clone_repository + + assert clone_repository.__bound__ is True + + def test_clone_repository_has_correct_name(self): + """Test that clone_repository has the correct task name.""" + from app.tasks.git import clone_repository + + assert clone_repository.name == "app.tasks.git.clone_repository" + + def test_clone_repository_returns_expected_structure(self): + """Test that clone_repository returns the expected result structure.""" + from app.tasks.git import clone_repository + + project_id = str(uuid.uuid4()) + repo_url = "https://gitea.example.com/org/repo.git" + branch = "main" + + result = clone_repository(project_id, repo_url, branch) + + assert isinstance(result, dict) + assert "status" in result + assert "project_id" in result + assert result["project_id"] == project_id + + def test_clone_repository_with_default_branch(self): + """Test that clone_repository uses default branch when not specified.""" + from app.tasks.git import clone_repository + + project_id = str(uuid.uuid4()) + repo_url = "https://github.com/org/repo.git" + + # Call without specifying branch (should default to 'main') + result = clone_repository(project_id, repo_url) + + assert result["status"] == "pending" + + +class TestCommitChangesTask: + """Tests for the commit_changes task.""" + + def test_commit_changes_task_exists(self): + """Test that commit_changes task is registered.""" + from app.celery_app import celery_app + import app.tasks.git # noqa: F401 + + assert "app.tasks.git.commit_changes" in celery_app.tasks 
+ + def test_commit_changes_is_bound_task(self): + """Test that commit_changes is a bound task.""" + from app.tasks.git import commit_changes + + assert commit_changes.__bound__ is True + + def test_commit_changes_returns_expected_structure(self): + """Test that commit_changes returns the expected result structure.""" + from app.tasks.git import commit_changes + + project_id = str(uuid.uuid4()) + message = "feat: Add new feature" + files = ["src/feature.py", "tests/test_feature.py"] + + result = commit_changes(project_id, message, files) + + assert isinstance(result, dict) + assert "status" in result + assert "project_id" in result + + def test_commit_changes_without_files(self): + """Test that commit_changes handles None files (commit all staged).""" + from app.tasks.git import commit_changes + + project_id = str(uuid.uuid4()) + message = "chore: Update dependencies" + + result = commit_changes(project_id, message, None) + + assert result["status"] == "pending" + + +class TestCreateBranchTask: + """Tests for the create_branch task.""" + + def test_create_branch_task_exists(self): + """Test that create_branch task is registered.""" + from app.celery_app import celery_app + import app.tasks.git # noqa: F401 + + assert "app.tasks.git.create_branch" in celery_app.tasks + + def test_create_branch_is_bound_task(self): + """Test that create_branch is a bound task.""" + from app.tasks.git import create_branch + + assert create_branch.__bound__ is True + + def test_create_branch_returns_expected_structure(self): + """Test that create_branch returns the expected result structure.""" + from app.tasks.git import create_branch + + project_id = str(uuid.uuid4()) + branch_name = "feature/new-feature" + from_ref = "develop" + + result = create_branch(project_id, branch_name, from_ref) + + assert isinstance(result, dict) + assert "status" in result + assert "project_id" in result + + def test_create_branch_with_default_from_ref(self): + """Test that create_branch uses default from_ref when not specified.""" + from app.tasks.git import create_branch + + project_id = str(uuid.uuid4()) + branch_name = "feature/123-add-login" + + result = create_branch(project_id, branch_name) + + assert result["status"] == "pending" + + +class TestCreatePullRequestTask: + """Tests for the create_pull_request task.""" + + def test_create_pull_request_task_exists(self): + """Test that create_pull_request task is registered.""" + from app.celery_app import celery_app + import app.tasks.git # noqa: F401 + + assert "app.tasks.git.create_pull_request" in celery_app.tasks + + def test_create_pull_request_is_bound_task(self): + """Test that create_pull_request is a bound task.""" + from app.tasks.git import create_pull_request + + assert create_pull_request.__bound__ is True + + def test_create_pull_request_returns_expected_structure(self): + """Test that create_pull_request returns expected result structure.""" + from app.tasks.git import create_pull_request + + project_id = str(uuid.uuid4()) + title = "feat: Add authentication" + body = "## Summary\n- Added JWT auth\n- Added login endpoint" + head_branch = "feature/auth" + base_branch = "main" + + result = create_pull_request(project_id, title, body, head_branch, base_branch) + + assert isinstance(result, dict) + assert "status" in result + assert "project_id" in result + + def test_create_pull_request_with_default_base(self): + """Test that create_pull_request uses default base branch.""" + from app.tasks.git import create_pull_request + + project_id = str(uuid.uuid4()) + + 
result = create_pull_request( + project_id, "Fix bug", "Bug fix description", "fix/bug-123" + ) + + assert result["status"] == "pending" + + +class TestPushChangesTask: + """Tests for the push_changes task.""" + + def test_push_changes_task_exists(self): + """Test that push_changes task is registered.""" + from app.celery_app import celery_app + import app.tasks.git # noqa: F401 + + assert "app.tasks.git.push_changes" in celery_app.tasks + + def test_push_changes_is_bound_task(self): + """Test that push_changes is a bound task.""" + from app.tasks.git import push_changes + + assert push_changes.__bound__ is True + + def test_push_changes_returns_expected_structure(self): + """Test that push_changes returns the expected result structure.""" + from app.tasks.git import push_changes + + project_id = str(uuid.uuid4()) + branch = "feature/new-feature" + force = False + + result = push_changes(project_id, branch, force) + + assert isinstance(result, dict) + assert "status" in result + assert "project_id" in result + + def test_push_changes_with_force_option(self): + """Test that push_changes handles force push option.""" + from app.tasks.git import push_changes + + project_id = str(uuid.uuid4()) + branch = "feature/rebased-branch" + force = True + + result = push_changes(project_id, branch, force) + + assert result["status"] == "pending" + + +class TestGitTaskRouting: + """Tests for git task queue routing.""" + + def test_git_tasks_should_route_to_git_queue(self): + """Test that git tasks are configured to route to 'git' queue.""" + from app.celery_app import celery_app + + routes = celery_app.conf.task_routes + git_route = routes.get("app.tasks.git.*") + + assert git_route is not None + assert git_route["queue"] == "git" + + def test_all_git_tasks_match_routing_pattern(self): + """Test that all git task names match the routing pattern.""" + from app.tasks import git + + task_names = [ + "app.tasks.git.clone_repository", + "app.tasks.git.commit_changes", + "app.tasks.git.create_branch", + "app.tasks.git.create_pull_request", + "app.tasks.git.push_changes", + ] + + for name in task_names: + assert name.startswith("app.tasks.git.") + + +class TestGitTaskLogging: + """Tests for git task logging behavior.""" + + def test_clone_repository_logs_execution(self): + """Test that clone_repository logs when executed.""" + from app.tasks.git import clone_repository + + project_id = str(uuid.uuid4()) + repo_url = "https://github.com/org/repo.git" + + with patch("app.tasks.git.logger") as mock_logger: + clone_repository(project_id, repo_url) + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert repo_url in call_args + assert project_id in call_args + + def test_commit_changes_logs_execution(self): + """Test that commit_changes logs when executed.""" + from app.tasks.git import commit_changes + + project_id = str(uuid.uuid4()) + message = "test commit" + + with patch("app.tasks.git.logger") as mock_logger: + commit_changes(project_id, message) + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert message in call_args diff --git a/backend/tests/tasks/test_sync_tasks.py b/backend/tests/tasks/test_sync_tasks.py new file mode 100644 index 0000000..00f0a11 --- /dev/null +++ b/backend/tests/tasks/test_sync_tasks.py @@ -0,0 +1,309 @@ +# tests/tasks/test_sync_tasks.py +""" +Tests for issue synchronization tasks. 
+ +These tests verify: +- Task signatures are correctly defined +- Tasks are bound (have access to self) +- Tasks return expected structure +- Tasks are routed to the 'sync' queue per ADR-011 + +Note: These tests mock actual execution since they would require +external API calls in production. +""" + +import pytest +from unittest.mock import patch +import uuid + + +class TestSyncIssuesIncrementalTask: + """Tests for the sync_issues_incremental task.""" + + def test_sync_issues_incremental_task_exists(self): + """Test that sync_issues_incremental task is registered.""" + from app.celery_app import celery_app + import app.tasks.sync # noqa: F401 + + assert "app.tasks.sync.sync_issues_incremental" in celery_app.tasks + + def test_sync_issues_incremental_is_bound_task(self): + """Test that sync_issues_incremental is a bound task.""" + from app.tasks.sync import sync_issues_incremental + + assert sync_issues_incremental.__bound__ is True + + def test_sync_issues_incremental_has_correct_name(self): + """Test that sync_issues_incremental has the correct task name.""" + from app.tasks.sync import sync_issues_incremental + + assert sync_issues_incremental.name == "app.tasks.sync.sync_issues_incremental" + + def test_sync_issues_incremental_returns_expected_structure(self): + """Test that sync_issues_incremental returns expected result.""" + from app.tasks.sync import sync_issues_incremental + + result = sync_issues_incremental() + + assert isinstance(result, dict) + assert "status" in result + assert "type" in result + assert result["type"] == "incremental" + + +class TestSyncIssuesFullTask: + """Tests for the sync_issues_full task.""" + + def test_sync_issues_full_task_exists(self): + """Test that sync_issues_full task is registered.""" + from app.celery_app import celery_app + import app.tasks.sync # noqa: F401 + + assert "app.tasks.sync.sync_issues_full" in celery_app.tasks + + def test_sync_issues_full_is_bound_task(self): + """Test that sync_issues_full is a bound task.""" + from app.tasks.sync import sync_issues_full + + assert sync_issues_full.__bound__ is True + + def test_sync_issues_full_has_correct_name(self): + """Test that sync_issues_full has the correct task name.""" + from app.tasks.sync import sync_issues_full + + assert sync_issues_full.name == "app.tasks.sync.sync_issues_full" + + def test_sync_issues_full_returns_expected_structure(self): + """Test that sync_issues_full returns expected result.""" + from app.tasks.sync import sync_issues_full + + result = sync_issues_full() + + assert isinstance(result, dict) + assert "status" in result + assert "type" in result + assert result["type"] == "full" + + +class TestProcessWebhookEventTask: + """Tests for the process_webhook_event task.""" + + def test_process_webhook_event_task_exists(self): + """Test that process_webhook_event task is registered.""" + from app.celery_app import celery_app + import app.tasks.sync # noqa: F401 + + assert "app.tasks.sync.process_webhook_event" in celery_app.tasks + + def test_process_webhook_event_is_bound_task(self): + """Test that process_webhook_event is a bound task.""" + from app.tasks.sync import process_webhook_event + + assert process_webhook_event.__bound__ is True + + def test_process_webhook_event_returns_expected_structure(self): + """Test that process_webhook_event returns expected result.""" + from app.tasks.sync import process_webhook_event + + provider = "gitea" + event_type = "issue.created" + payload = { + "action": "opened", + "issue": {"number": 123, "title": "New issue"}, + } + + 
result = process_webhook_event(provider, event_type, payload) + + assert isinstance(result, dict) + assert "status" in result + assert "provider" in result + assert "event_type" in result + assert result["provider"] == provider + assert result["event_type"] == event_type + + def test_process_webhook_event_handles_github_provider(self): + """Test that process_webhook_event handles GitHub webhooks.""" + from app.tasks.sync import process_webhook_event + + result = process_webhook_event( + "github", "issues", {"action": "opened", "issue": {"number": 1}} + ) + + assert result["provider"] == "github" + + def test_process_webhook_event_handles_gitlab_provider(self): + """Test that process_webhook_event handles GitLab webhooks.""" + from app.tasks.sync import process_webhook_event + + result = process_webhook_event( + "gitlab", + "issue.created", + {"object_kind": "issue", "object_attributes": {"iid": 1}}, + ) + + assert result["provider"] == "gitlab" + + +class TestSyncProjectIssuesTask: + """Tests for the sync_project_issues task.""" + + def test_sync_project_issues_task_exists(self): + """Test that sync_project_issues task is registered.""" + from app.celery_app import celery_app + import app.tasks.sync # noqa: F401 + + assert "app.tasks.sync.sync_project_issues" in celery_app.tasks + + def test_sync_project_issues_is_bound_task(self): + """Test that sync_project_issues is a bound task.""" + from app.tasks.sync import sync_project_issues + + assert sync_project_issues.__bound__ is True + + def test_sync_project_issues_returns_expected_structure(self): + """Test that sync_project_issues returns expected result.""" + from app.tasks.sync import sync_project_issues + + project_id = str(uuid.uuid4()) + full = False + + result = sync_project_issues(project_id, full) + + assert isinstance(result, dict) + assert "status" in result + assert "project_id" in result + assert result["project_id"] == project_id + + def test_sync_project_issues_with_full_sync(self): + """Test that sync_project_issues handles full sync flag.""" + from app.tasks.sync import sync_project_issues + + project_id = str(uuid.uuid4()) + + result = sync_project_issues(project_id, full=True) + + assert result["status"] == "pending" + + +class TestPushIssueToExternalTask: + """Tests for the push_issue_to_external task.""" + + def test_push_issue_to_external_task_exists(self): + """Test that push_issue_to_external task is registered.""" + from app.celery_app import celery_app + import app.tasks.sync # noqa: F401 + + assert "app.tasks.sync.push_issue_to_external" in celery_app.tasks + + def test_push_issue_to_external_is_bound_task(self): + """Test that push_issue_to_external is a bound task.""" + from app.tasks.sync import push_issue_to_external + + assert push_issue_to_external.__bound__ is True + + def test_push_issue_to_external_returns_expected_structure(self): + """Test that push_issue_to_external returns expected result.""" + from app.tasks.sync import push_issue_to_external + + project_id = str(uuid.uuid4()) + issue_id = str(uuid.uuid4()) + operation = "create" + + result = push_issue_to_external(project_id, issue_id, operation) + + assert isinstance(result, dict) + assert "status" in result + assert "issue_id" in result + assert "operation" in result + assert result["issue_id"] == issue_id + assert result["operation"] == operation + + def test_push_issue_to_external_update_operation(self): + """Test that push_issue_to_external handles update operation.""" + from app.tasks.sync import push_issue_to_external + + project_id = 
str(uuid.uuid4()) + issue_id = str(uuid.uuid4()) + + result = push_issue_to_external(project_id, issue_id, "update") + + assert result["operation"] == "update" + + def test_push_issue_to_external_close_operation(self): + """Test that push_issue_to_external handles close operation.""" + from app.tasks.sync import push_issue_to_external + + project_id = str(uuid.uuid4()) + issue_id = str(uuid.uuid4()) + + result = push_issue_to_external(project_id, issue_id, "close") + + assert result["operation"] == "close" + + +class TestSyncTaskRouting: + """Tests for sync task queue routing.""" + + def test_sync_tasks_should_route_to_sync_queue(self): + """Test that sync tasks are configured to route to 'sync' queue.""" + from app.celery_app import celery_app + + routes = celery_app.conf.task_routes + sync_route = routes.get("app.tasks.sync.*") + + assert sync_route is not None + assert sync_route["queue"] == "sync" + + def test_all_sync_tasks_match_routing_pattern(self): + """Test that all sync task names match the routing pattern.""" + task_names = [ + "app.tasks.sync.sync_issues_incremental", + "app.tasks.sync.sync_issues_full", + "app.tasks.sync.process_webhook_event", + "app.tasks.sync.sync_project_issues", + "app.tasks.sync.push_issue_to_external", + ] + + for name in task_names: + assert name.startswith("app.tasks.sync.") + + +class TestSyncTaskLogging: + """Tests for sync task logging behavior.""" + + def test_sync_issues_incremental_logs_execution(self): + """Test that sync_issues_incremental logs when executed.""" + from app.tasks.sync import sync_issues_incremental + + with patch("app.tasks.sync.logger") as mock_logger: + sync_issues_incremental() + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert "incremental" in call_args.lower() + + def test_sync_issues_full_logs_execution(self): + """Test that sync_issues_full logs when executed.""" + from app.tasks.sync import sync_issues_full + + with patch("app.tasks.sync.logger") as mock_logger: + sync_issues_full() + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert "full" in call_args.lower() or "reconciliation" in call_args.lower() + + def test_process_webhook_event_logs_execution(self): + """Test that process_webhook_event logs when executed.""" + from app.tasks.sync import process_webhook_event + + provider = "gitea" + event_type = "issue.updated" + + with patch("app.tasks.sync.logger") as mock_logger: + process_webhook_event(provider, event_type, {}) + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert provider in call_args + assert event_type in call_args diff --git a/backend/tests/tasks/test_workflow_tasks.py b/backend/tests/tasks/test_workflow_tasks.py new file mode 100644 index 0000000..58c5748 --- /dev/null +++ b/backend/tests/tasks/test_workflow_tasks.py @@ -0,0 +1,350 @@ +# tests/tasks/test_workflow_tasks.py +""" +Tests for workflow state management tasks. + +These tests verify: +- Task signatures are correctly defined +- Tasks are bound (have access to self) +- Tasks return expected structure +- Tasks follow ADR-007 (transitions) and ADR-010 (PostgreSQL durability) + +Note: These tests mock actual execution since they would require +database access and state machine operations in production. 
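+
+To run these stubs synchronously without a broker, standard Celery eager
+mode can be enabled in a fixture (a sketch, not part of this patch):
+
+    celery_app.conf.task_always_eager = True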
+""" + +import pytest +from unittest.mock import patch +import uuid + + +class TestRecoverStaleWorkflowsTask: + """Tests for the recover_stale_workflows task.""" + + def test_recover_stale_workflows_task_exists(self): + """Test that recover_stale_workflows task is registered.""" + from app.celery_app import celery_app + import app.tasks.workflow # noqa: F401 + + assert "app.tasks.workflow.recover_stale_workflows" in celery_app.tasks + + def test_recover_stale_workflows_is_bound_task(self): + """Test that recover_stale_workflows is a bound task.""" + from app.tasks.workflow import recover_stale_workflows + + assert recover_stale_workflows.__bound__ is True + + def test_recover_stale_workflows_has_correct_name(self): + """Test that recover_stale_workflows has the correct task name.""" + from app.tasks.workflow import recover_stale_workflows + + assert ( + recover_stale_workflows.name == "app.tasks.workflow.recover_stale_workflows" + ) + + def test_recover_stale_workflows_returns_expected_structure(self): + """Test that recover_stale_workflows returns expected result.""" + from app.tasks.workflow import recover_stale_workflows + + result = recover_stale_workflows() + + assert isinstance(result, dict) + assert "status" in result + assert "recovered" in result + assert result["status"] == "pending" + assert result["recovered"] == 0 + + +class TestExecuteWorkflowStepTask: + """Tests for the execute_workflow_step task.""" + + def test_execute_workflow_step_task_exists(self): + """Test that execute_workflow_step task is registered.""" + from app.celery_app import celery_app + import app.tasks.workflow # noqa: F401 + + assert "app.tasks.workflow.execute_workflow_step" in celery_app.tasks + + def test_execute_workflow_step_is_bound_task(self): + """Test that execute_workflow_step is a bound task.""" + from app.tasks.workflow import execute_workflow_step + + assert execute_workflow_step.__bound__ is True + + def test_execute_workflow_step_returns_expected_structure(self): + """Test that execute_workflow_step returns expected result.""" + from app.tasks.workflow import execute_workflow_step + + workflow_id = str(uuid.uuid4()) + transition = "start_planning" + + result = execute_workflow_step(workflow_id, transition) + + assert isinstance(result, dict) + assert "status" in result + assert "workflow_id" in result + assert "transition" in result + assert result["workflow_id"] == workflow_id + assert result["transition"] == transition + + def test_execute_workflow_step_with_various_transitions(self): + """Test that execute_workflow_step handles various transition types.""" + from app.tasks.workflow import execute_workflow_step + + workflow_id = str(uuid.uuid4()) + transitions = [ + "start", + "complete_planning", + "begin_implementation", + "request_approval", + "approve", + "reject", + "complete", + ] + + for transition in transitions: + result = execute_workflow_step(workflow_id, transition) + assert result["transition"] == transition + + +class TestHandleApprovalResponseTask: + """Tests for the handle_approval_response task.""" + + def test_handle_approval_response_task_exists(self): + """Test that handle_approval_response task is registered.""" + from app.celery_app import celery_app + import app.tasks.workflow # noqa: F401 + + assert "app.tasks.workflow.handle_approval_response" in celery_app.tasks + + def test_handle_approval_response_is_bound_task(self): + """Test that handle_approval_response is a bound task.""" + from app.tasks.workflow import handle_approval_response + + assert 
handle_approval_response.__bound__ is True + + def test_handle_approval_response_returns_expected_structure(self): + """Test that handle_approval_response returns expected result.""" + from app.tasks.workflow import handle_approval_response + + workflow_id = str(uuid.uuid4()) + approved = True + comment = "LGTM! Proceeding with deployment." + + result = handle_approval_response(workflow_id, approved, comment) + + assert isinstance(result, dict) + assert "status" in result + assert "workflow_id" in result + assert "approved" in result + assert result["workflow_id"] == workflow_id + assert result["approved"] == approved + + def test_handle_approval_response_with_rejection(self): + """Test that handle_approval_response handles rejection.""" + from app.tasks.workflow import handle_approval_response + + workflow_id = str(uuid.uuid4()) + + result = handle_approval_response( + workflow_id, approved=False, comment="Needs more test coverage" + ) + + assert result["approved"] is False + + def test_handle_approval_response_without_comment(self): + """Test that handle_approval_response handles missing comment.""" + from app.tasks.workflow import handle_approval_response + + workflow_id = str(uuid.uuid4()) + + result = handle_approval_response(workflow_id, approved=True) + + assert result["status"] == "pending" + + +class TestStartSprintWorkflowTask: + """Tests for the start_sprint_workflow task.""" + + def test_start_sprint_workflow_task_exists(self): + """Test that start_sprint_workflow task is registered.""" + from app.celery_app import celery_app + import app.tasks.workflow # noqa: F401 + + assert "app.tasks.workflow.start_sprint_workflow" in celery_app.tasks + + def test_start_sprint_workflow_is_bound_task(self): + """Test that start_sprint_workflow is a bound task.""" + from app.tasks.workflow import start_sprint_workflow + + assert start_sprint_workflow.__bound__ is True + + def test_start_sprint_workflow_returns_expected_structure(self): + """Test that start_sprint_workflow returns expected result.""" + from app.tasks.workflow import start_sprint_workflow + + project_id = str(uuid.uuid4()) + sprint_id = str(uuid.uuid4()) + + result = start_sprint_workflow(project_id, sprint_id) + + assert isinstance(result, dict) + assert "status" in result + assert "sprint_id" in result + assert result["sprint_id"] == sprint_id + + +class TestStartStoryWorkflowTask: + """Tests for the start_story_workflow task.""" + + def test_start_story_workflow_task_exists(self): + """Test that start_story_workflow task is registered.""" + from app.celery_app import celery_app + import app.tasks.workflow # noqa: F401 + + assert "app.tasks.workflow.start_story_workflow" in celery_app.tasks + + def test_start_story_workflow_is_bound_task(self): + """Test that start_story_workflow is a bound task.""" + from app.tasks.workflow import start_story_workflow + + assert start_story_workflow.__bound__ is True + + def test_start_story_workflow_returns_expected_structure(self): + """Test that start_story_workflow returns expected result.""" + from app.tasks.workflow import start_story_workflow + + project_id = str(uuid.uuid4()) + story_id = str(uuid.uuid4()) + + result = start_story_workflow(project_id, story_id) + + assert isinstance(result, dict) + assert "status" in result + assert "story_id" in result + assert result["story_id"] == story_id + + +class TestWorkflowTaskRouting: + """Tests for workflow task queue routing.""" + + def test_workflow_tasks_route_to_default_queue(self): + """Test that workflow tasks route to 'default' 
queue. + + Per the routing configuration, workflow tasks match 'app.tasks.*' + which routes to the default queue. + """ + from app.celery_app import celery_app + + routes = celery_app.conf.task_routes + + # Workflow tasks match the generic 'app.tasks.*' pattern + # since there's no specific 'app.tasks.workflow.*' route + assert "app.tasks.*" in routes + assert routes["app.tasks.*"]["queue"] == "default" + + def test_all_workflow_tasks_match_routing_pattern(self): + """Test that all workflow task names match the routing pattern.""" + task_names = [ + "app.tasks.workflow.recover_stale_workflows", + "app.tasks.workflow.execute_workflow_step", + "app.tasks.workflow.handle_approval_response", + "app.tasks.workflow.start_sprint_workflow", + "app.tasks.workflow.start_story_workflow", + ] + + for name in task_names: + assert name.startswith("app.tasks.") + + +class TestWorkflowTaskLogging: + """Tests for workflow task logging behavior.""" + + def test_recover_stale_workflows_logs_execution(self): + """Test that recover_stale_workflows logs when executed.""" + from app.tasks.workflow import recover_stale_workflows + + with patch("app.tasks.workflow.logger") as mock_logger: + recover_stale_workflows() + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert "stale" in call_args.lower() or "recover" in call_args.lower() + + def test_execute_workflow_step_logs_execution(self): + """Test that execute_workflow_step logs when executed.""" + from app.tasks.workflow import execute_workflow_step + + workflow_id = str(uuid.uuid4()) + transition = "start_planning" + + with patch("app.tasks.workflow.logger") as mock_logger: + execute_workflow_step(workflow_id, transition) + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert transition in call_args + assert workflow_id in call_args + + def test_handle_approval_response_logs_execution(self): + """Test that handle_approval_response logs when executed.""" + from app.tasks.workflow import handle_approval_response + + workflow_id = str(uuid.uuid4()) + + with patch("app.tasks.workflow.logger") as mock_logger: + handle_approval_response(workflow_id, approved=True) + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert workflow_id in call_args + + def test_start_sprint_workflow_logs_execution(self): + """Test that start_sprint_workflow logs when executed.""" + from app.tasks.workflow import start_sprint_workflow + + project_id = str(uuid.uuid4()) + sprint_id = str(uuid.uuid4()) + + with patch("app.tasks.workflow.logger") as mock_logger: + start_sprint_workflow(project_id, sprint_id) + + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args[0][0] + assert sprint_id in call_args + + +class TestWorkflowTaskSignatures: + """Tests for workflow task signature creation.""" + + def test_execute_workflow_step_signature_creation(self): + """Test that execute_workflow_step signature can be created.""" + from app.tasks.workflow import execute_workflow_step + + workflow_id = str(uuid.uuid4()) + transition = "approve" + + sig = execute_workflow_step.s(workflow_id, transition) + + assert sig is not None + assert sig.args == (workflow_id, transition) + + def test_workflow_chain_creation(self): + """Test that workflow tasks can be chained together.""" + from celery import chain + from app.tasks.workflow import ( + start_sprint_workflow, + execute_workflow_step, + handle_approval_response, + ) + + project_id = str(uuid.uuid4()) + 
sprint_id = str(uuid.uuid4())
+        workflow_id = str(uuid.uuid4())
+
+        # Build a chain (doesn't execute, just creates the canvas object).
+        # The later steps use immutable signatures (.si) so they ignore the
+        # parent task's return value; a real workflow would instead consume
+        # the results of the preceding tasks.
+        workflow = chain(
+            start_sprint_workflow.s(project_id, sprint_id),
+            execute_workflow_step.si(workflow_id, "start"),
+            handle_approval_response.si(workflow_id, approved=True),
+        )
+
+        assert workflow is not None
+        assert len(workflow.tasks) == 3
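+
+
+class TestWorkflowTaskOptions:
+    """Illustrative checks for per-call execution options.
+
+    A minimal sketch beyond the original stub coverage: it relies only on
+    the standard Celery signature API (Signature.set / Signature.options)
+    and the queue names configured in app/celery_app.py.
+    """
+
+    def test_execute_workflow_step_signature_accepts_queue_override(self):
+        """Test that a signature can carry an explicit queue option."""
+        from app.tasks.workflow import execute_workflow_step
+
+        workflow_id = str(uuid.uuid4())
+
+        # .set() attaches execution options to the signature without
+        # sending it; 'default' matches the queue this task routes to.
+        sig = execute_workflow_step.s(workflow_id, "start").set(queue="default")
+
+        assert sig.options.get("queue") == "default"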