# SPIKE-004: Celery + Redis Integration

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #4

---

## Objective

Research best practices for integrating Celery with FastAPI for background task processing, focusing on agent orchestration, long-running workflows, and task monitoring.

## Research Questions

1. How do we properly integrate Celery with async FastAPI?
2. What is the optimal task queue architecture for Syndarix?
3. How do we handle long-running agent tasks?
4. What monitoring and visibility patterns should we use?

## Findings

### 1. Celery + FastAPI Integration Pattern

**Challenge:** Celery is synchronous, FastAPI is async.

**Solution:** Use `celery.result.AsyncResult` with async polling or callbacks.

```python
# app/core/celery.py
from celery import Celery

from app.core.config import settings

celery_app = Celery(
    "syndarix",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    include=[
        "app.tasks.agent_tasks",
        "app.tasks.git_tasks",
        "app.tasks.sync_tasks",
        "app.tasks.cicd_tasks",  # registered so cicd_queue routing (below) resolves
    ],
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=3600,             # 1 hour hard limit
    task_soft_time_limit=3300,        # 55 min soft limit
    worker_prefetch_multiplier=1,     # One task at a time for LLM tasks
    task_acks_late=True,              # Acknowledge after completion
    task_reject_on_worker_lost=True,  # Requeue if the worker dies
)
```

### 2. Task Queue Architecture

```
┌─────────────────────────────────────────────────────────────────┐
│                         FastAPI Backend                         │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐            │
│  │  API Layer  │   │  Services   │   │   Events    │            │
│  └──────┬──────┘   └──────┬──────┘   └──────┬──────┘            │
│         │                 │                 │                   │
│         └─────────────────┼─────────────────┘                   │
│                           │                                     │
│                           ▼                                     │
│          ┌────────────────────────────────┐                     │
│          │        Task Dispatcher         │                     │
│          │      (Celery send_task)        │                     │
│          └────────────────┬───────────────┘                     │
└───────────────────────────┼─────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Redis (Broker + Backend)                     │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │ agent_queue  │  │  git_queue   │  │  sync_queue  │           │
│  │  (priority)  │  │              │  │              │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
└─────────────────────────────────────────────────────────────────┘
                            │
            ┌───────────────┼───────────────┐
            │               │               │
            ▼               ▼               ▼
      ┌────────────┐  ┌────────────┐  ┌────────────┐
      │   Worker   │  │   Worker   │  │   Worker   │
      │  (agents)  │  │   (git)    │  │   (sync)   │
      │ prefetch=1 │  │ prefetch=4 │  │ prefetch=4 │
      └────────────┘  └────────────┘  └────────────┘
```

### 3. Queue Configuration

```python
# app/core/celery.py
from kombu import Queue

celery_app.conf.task_queues = [
    Queue("agent_queue", routing_key="agent.#"),
    Queue("git_queue", routing_key="git.#"),
    Queue("sync_queue", routing_key="sync.#"),
    Queue("cicd_queue", routing_key="cicd.#"),
]

celery_app.conf.task_routes = {
    "app.tasks.agent_tasks.*": {"queue": "agent_queue"},
    "app.tasks.git_tasks.*": {"queue": "git_queue"},
    "app.tasks.sync_tasks.*": {"queue": "sync_queue"},
    "app.tasks.cicd_tasks.*": {"queue": "cicd_queue"},
}
```
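
Because `task_routes` matches on the fully qualified task name, callers never need to name a queue at dispatch time. A minimal sketch (the `clone_repository` task name and its arguments are hypothetical):

```python
from app.core.celery import celery_app

# "app.tasks.git_tasks.*" matches this name, so the message is
# routed to git_queue without the caller specifying a queue.
result = celery_app.send_task(
    "app.tasks.git_tasks.clone_repository",  # hypothetical task
    kwargs={"repo_url": "https://example.com/repo.git"},
)
print(result.id)  # task ID, usable later with AsyncResult
```

Dispatching by name via `send_task` also spares the API process from importing the task modules themselves.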
### 4. Agent Task Implementation

```python
# app/tasks/agent_tasks.py
from celery import Task

from app.core.celery import celery_app
from app.services.agent_runner import AgentRunner
from app.services.events import EventBus


class AgentTask(Task):
    """Base class for agent tasks with retry and monitoring."""

    autoretry_for = (ConnectionError, TimeoutError)
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True
    max_retries = 3

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle task failure by notifying the project channel."""
        project_id = kwargs.get("project_id")
        agent_id = kwargs.get("agent_id")
        EventBus().publish(f"project:{project_id}", {
            "type": "agent_error",
            "agent_id": agent_id,
            "error": str(exc),
        })


@celery_app.task(bind=True, base=AgentTask)
def run_agent_action(
    self,
    agent_id: str,
    project_id: str,
    action: str,
    context: dict,
) -> dict:
    """
    Execute an agent action as a background task.

    Args:
        agent_id: The agent instance ID
        project_id: The project context
        action: The action to perform
        context: Action-specific context

    Returns:
        Action result dictionary
    """
    runner = AgentRunner(agent_id, project_id)

    # Update task state for monitoring
    self.update_state(
        state="RUNNING",
        meta={"agent_id": agent_id, "action": action},
    )

    # Publish start event
    EventBus().publish(f"project:{project_id}", {
        "type": "agent_started",
        "agent_id": agent_id,
        "action": action,
        "task_id": self.request.id,
    })

    try:
        result = runner.execute(action, context)
    except Exception:
        # Re-raise so Celery marks the task FAILURE and calls on_failure
        raise

    # Publish completion event
    EventBus().publish(f"project:{project_id}", {
        "type": "agent_completed",
        "agent_id": agent_id,
        "action": action,
        "result_summary": result.get("summary"),
    })

    return result
```

### 5. Long-Running Task Patterns

**Progress Reporting:**

```python
@celery_app.task(bind=True)
def implement_story(self, story_id: str, agent_id: str, project_id: str):
    """Implement a user story with progress reporting."""
    steps = [
        ("analyzing", "Analyzing requirements"),
        ("designing", "Designing solution"),
        ("implementing", "Writing code"),
        ("testing", "Running tests"),
        ("documenting", "Updating documentation"),
    ]

    for i, (state, description) in enumerate(steps):
        self.update_state(
            state="PROGRESS",
            meta={
                "current": i + 1,
                "total": len(steps),
                "status": description,
            },
        )

        # Do the actual work (execute_step is project code, not shown here)
        execute_step(state, story_id, agent_id)

        # Publish progress event
        EventBus().publish(f"project:{project_id}", {
            "type": "agent_progress",
            "agent_id": agent_id,
            "step": i + 1,
            "total": len(steps),
            "description": description,
        })

    return {"status": "completed", "story_id": story_id}
```

**Task Chaining:**

```python
from celery import chain, group

# Sequential workflow: each task receives the previous task's result
workflow = chain(
    analyze_requirements.s(story_id),
    design_solution.s(),
    implement_code.s(),
    run_tests.s(),
    create_pr.s(),
)

# Parallel execution
parallel_tests = group(
    run_unit_tests.s(project_id),
    run_integration_tests.s(project_id),
    run_linting.s(project_id),
)
```
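
A `chord` combines the two: it runs a group in parallel, then invokes a callback with the collected results. A minimal sketch reusing the `parallel_tests` group above, with a hypothetical `report_results` aggregation task:

```python
from celery import chord

from app.core.celery import celery_app


@celery_app.task
def report_results(results: list) -> dict:
    """Hypothetical callback: receives the group's results as a list,
    in the order the tasks were declared."""
    return {"all_passed": all(r.get("passed") for r in results)}


# report_results fires once all three test tasks have completed
async_result = chord(parallel_tests)(report_results.s())
```

Chords need a result backend to collect the group's results; the Redis backend configured in Section 1 supports this.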
### 6. FastAPI Integration

```python
# app/api/v1/agents.py
from celery.result import AsyncResult
from fastapi import APIRouter

from app.core.celery import celery_app
from app.schemas.agents import AgentActionRequest  # request schema (module path assumed)
from app.tasks.agent_tasks import run_agent_action

router = APIRouter()


@router.post("/agents/{agent_id}/actions")
async def trigger_agent_action(agent_id: str, action: AgentActionRequest):
    """Trigger an agent action as a background task."""
    # Dispatch to Celery; .delay() only enqueues and does not block
    task = run_agent_action.delay(
        agent_id=agent_id,
        project_id=action.project_id,
        action=action.action,
        context=action.context,
    )

    return {
        "task_id": task.id,
        "status": "queued",
    }


@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Get the status of a background task."""
    # Note: result-backend access is synchronous; under heavy polling,
    # wrap these calls in asyncio.to_thread() to keep the event loop free.
    result = AsyncResult(task_id, app=celery_app)

    if result.state == "PENDING":
        return {"status": "pending"}
    elif result.state == "RUNNING":
        return {"status": "running", **result.info}
    elif result.state == "PROGRESS":
        return {"status": "progress", **result.info}
    elif result.state == "SUCCESS":
        return {"status": "completed", "result": result.result}
    elif result.state == "FAILURE":
        return {"status": "failed", "error": str(result.result)}
    return {"status": result.state}
```

Note that FastAPI's own `BackgroundTasks` is not needed here: dispatch goes straight to Celery, which survives API process restarts.

### 7. Worker Configuration

```bash
# Run a dedicated worker per queue

# Agent worker: 4 processes, each prefetching a single task
# (keeps LLM-bound work serialized per process for rate limiting)
celery -A app.core.celery worker \
  -Q agent_queue \
  -c 4 \
  --prefetch-multiplier=1 \
  -n agent_worker@%h

# Git worker (can handle multiple concurrent tasks)
celery -A app.core.celery worker \
  -Q git_queue \
  -c 8 \
  --prefetch-multiplier=4 \
  -n git_worker@%h

# Sync worker
celery -A app.core.celery worker \
  -Q sync_queue \
  -c 4 \
  --prefetch-multiplier=4 \
  -n sync_worker@%h
```

### 8. Monitoring with Flower

```yaml
# docker-compose.yml
services:
  flower:
    image: mher/flower:latest
    command: celery --broker=redis://redis:6379/0 flower
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - FLOWER_BASIC_AUTH=admin:password
```

### 9. Task Scheduling (Celery Beat)

```python
# app/core/celery.py
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    # Sync issues every minute
    "sync-external-issues": {
        "task": "app.tasks.sync_tasks.sync_all_issues",
        "schedule": 60.0,
    },
    # Health check every 5 minutes
    "agent-health-check": {
        "task": "app.tasks.agent_tasks.health_check_all_agents",
        "schedule": 300.0,
    },
    # Daily cleanup at midnight
    "cleanup-old-tasks": {
        "task": "app.tasks.maintenance.cleanup_old_tasks",
        "schedule": crontab(hour=0, minute=0),
    },
}
```

## Best Practices

1. **One task per LLM call** - avoids provider rate-limit issues
2. **Progress reporting** - update task state for long-running work
3. **Idempotent tasks** - handle retries and redelivery gracefully (see the sketch after this list)
4. **Separate queues** - isolate slow tasks from fast ones
5. **Task result expiry** - set `result_expires` to avoid Redis bloat
6. **Soft time limits** - allow graceful shutdown before the hard kill
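
Practices 3 and 5 are worth a concrete sketch. The `create_pr` body and its `find_open_pr_for_story` / `open_pull_request` helpers are hypothetical:

```python
from app.core.celery import celery_app

# Result expiry: cap how long stored results live so the Redis
# result backend does not grow without bound.
celery_app.conf.result_expires = 86400  # seconds (24 hours)


# Idempotency: with task_acks_late=True a task can be redelivered
# after a worker crash, so it must be safe to run twice. Check for
# an existing side effect before repeating it.
@celery_app.task
def create_pr(story_id: str) -> dict:
    existing = find_open_pr_for_story(story_id)  # hypothetical lookup
    if existing is not None:
        return {"pr_url": existing, "deduplicated": True}
    return {"pr_url": open_pull_request(story_id)}  # hypothetical call
```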
## Recommendations

1. **Use Celery for all long-running operations**
   - Agent actions
   - Git operations
   - External sync
   - CI/CD triggers

2. **Use Redis as both broker and backend**
   - Simplifies infrastructure
   - Fast enough for our scale

3. **Configure separate queues**
   - `agent_queue` with prefetch=1
   - `git_queue` with prefetch=4
   - `sync_queue` with prefetch=4

4. **Implement proper monitoring**
   - Flower for the web UI
   - Prometheus metrics export
   - Dead-letter queue for failed tasks

## References

- [Celery Documentation](https://docs.celeryq.dev/)
- [FastAPI Background Tasks](https://fastapi.tiangolo.com/tutorial/background-tasks/)
- [Celery Best Practices](https://docs.celeryq.dev/en/stable/userguide/tasks.html#tips-and-best-practices)

## Decision

**Adopt Celery + Redis** for all background task processing, with queue-based routing and progress reporting via Redis Pub/Sub events.

---

*Spike completed. Findings will inform ADR-003: Background Task Architecture.*