SPIKE-004: Celery + Redis Integration

Status: Completed
Date: 2025-12-29
Author: Architecture Team
Related Issue: #4


Objective

Research best practices for integrating Celery with FastAPI for background task processing, focusing on agent orchestration, long-running workflows, and task monitoring.

Research Questions

  1. How to properly integrate Celery with async FastAPI?
  2. What is the optimal task queue architecture for Syndarix?
  3. How to handle long-running agent tasks?
  4. What monitoring and visibility patterns should we use?

Findings

1. Celery + FastAPI Integration Pattern

Challenge: Celery is synchronous, FastAPI is async.

Solution: Use celery.result.AsyncResult with async polling (sketched after the configuration below) or event callbacks.

# app/core/celery.py
from celery import Celery
from app.core.config import settings

celery_app = Celery(
    "syndarix",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    include=[
        "app.tasks.agent_tasks",
        "app.tasks.git_tasks",
        "app.tasks.sync_tasks",
    ]
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=3600,  # 1 hour max
    task_soft_time_limit=3300,  # 55 min soft limit
    worker_prefetch_multiplier=1,  # One task at a time for LLM tasks
    task_acks_late=True,  # Acknowledge after completion
    task_reject_on_worker_lost=True,  # Retry if worker dies
)
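
To make the "async polling" half of that solution concrete, the API layer can await task completion without blocking the event loop. A minimal sketch, assuming a hypothetical helper module (wait_for_task and its defaults are not part of the codebase):

# app/core/task_utils.py (hypothetical)
import asyncio

from celery.result import AsyncResult

from app.core.celery import celery_app

async def wait_for_task(task_id: str, poll_interval: float = 0.5,
                        timeout: float = 3600.0) -> dict:
    """Await a Celery result from async code without blocking the event loop."""
    result = AsyncResult(task_id, app=celery_app)
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while not result.ready():  # cheap Redis lookup on each iteration
        if loop.time() > deadline:
            raise TimeoutError(f"task {task_id} exceeded {timeout}s")
        await asyncio.sleep(poll_interval)
    return result.get()  # re-raises the task's exception on failure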

2. Task Queue Architecture

┌──────────────────────────────────────────────────────────┐
│                     FastAPI Backend                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐       │
│  │  API Layer  │  │   Services  │  │   Events    │       │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘       │
│         │                │                │              │
│         └────────────────┼────────────────┘              │
│                          │                               │
│                          ▼                               │
│         ┌────────────────────────────────┐               │
│         │        Task Dispatcher         │               │
│         │       (Celery send_task)       │               │
│         └────────────────┬───────────────┘               │
└──────────────────────────┼───────────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────────┐
│                 Redis (Broker + Backend)                 │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐    │
│  │ agent_queue  │  │  git_queue   │  │  sync_queue  │    │
│  │ (priority)   │  │              │  │              │    │
│  └──────────────┘  └──────────────┘  └──────────────┘    │
└──────────────────────────────────────────────────────────┘
                           │
           ┌───────────────┼───────────────┐
           │               │               │
           ▼               ▼               ▼
     ┌────────────┐  ┌────────────┐  ┌────────────┐
     │   Worker   │  │   Worker   │  │   Worker   │
     │  (agents)  │  │   (git)    │  │   (sync)   │
     │ prefetch=1 │  │ prefetch=4 │  │ prefetch=4 │
     └────────────┘  └────────────┘  └────────────┘

3. Queue Configuration

# app/core/celery.py
from kombu import Queue

celery_app.conf.task_queues = [
    Queue("agent_queue", routing_key="agent.#"),
    Queue("git_queue", routing_key="git.#"),
    Queue("sync_queue", routing_key="sync.#"),
    Queue("cicd_queue", routing_key="cicd.#"),
]

celery_app.conf.task_routes = {
    "app.tasks.agent_tasks.*": {"queue": "agent_queue"},
    "app.tasks.git_tasks.*": {"queue": "git_queue"},
    "app.tasks.sync_tasks.*": {"queue": "sync_queue"},
    "app.tasks.cicd_tasks.*": {"queue": "cicd_queue"},
}
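
With these routes in place, any task defined under app.tasks.* lands on its queue automatically; the dispatcher can also target a queue explicitly by task name, as the diagram above shows. A sketch (the argument values are illustrative only):

# Dispatch by name with an explicit queue, e.g. from the Task Dispatcher
celery_app.send_task(
    "app.tasks.agent_tasks.run_agent_action",
    kwargs={"agent_id": "a1", "project_id": "p1",
            "action": "review", "context": {}},
    queue="agent_queue",
)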

4. Agent Task Implementation

# app/tasks/agent_tasks.py
from celery import Task
from app.core.celery import celery_app
from app.services.agent_runner import AgentRunner
from app.services.events import EventBus

class AgentTask(Task):
    """Base class for agent tasks with retry and monitoring."""

    autoretry_for = (ConnectionError, TimeoutError)
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True
    max_retries = 3

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle task failure."""
        project_id = kwargs.get("project_id")
        agent_id = kwargs.get("agent_id")
        EventBus().publish(f"project:{project_id}", {
            "type": "agent_error",
            "agent_id": agent_id,
            "error": str(exc)
        })

@celery_app.task(bind=True, base=AgentTask)
def run_agent_action(
    self,
    agent_id: str,
    project_id: str,
    action: str,
    context: dict
) -> dict:
    """
    Execute an agent action as a background task.

    Args:
        agent_id: The agent instance ID
        project_id: The project context
        action: The action to perform
        context: Action-specific context

    Returns:
        Action result dictionary
    """
    runner = AgentRunner(agent_id, project_id)

    # Update task state for monitoring
    self.update_state(
        state="RUNNING",
        meta={"agent_id": agent_id, "action": action}
    )

    # Publish start event
    EventBus().publish(f"project:{project_id}", {
        "type": "agent_started",
        "agent_id": agent_id,
        "action": action,
        "task_id": self.request.id
    })

    try:
        result = runner.execute(action, context)

        # Publish completion event
        EventBus().publish(f"project:{project_id}", {
            "type": "agent_completed",
            "agent_id": agent_id,
            "action": action,
            "result_summary": result.get("summary")
        })

        return result
    except Exception:
        # Re-raise so Celery marks the task FAILED and invokes on_failure
        raise
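
EventBus itself is not defined in this spike; per the Decision section it is assumed to publish JSON events over Redis Pub/Sub. A minimal sketch of that assumption (the real interface may differ):

# app/services/events.py (sketch)
import json

import redis

from app.core.config import settings

class EventBus:
    """Publish JSON events to a Redis Pub/Sub channel."""

    def __init__(self) -> None:
        self._redis = redis.Redis.from_url(settings.REDIS_URL)

    def publish(self, channel: str, event: dict) -> None:
        # Subscribers (e.g. a real-time updates endpoint) receive this payload
        self._redis.publish(channel, json.dumps(event))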

5. Long-Running Task Patterns

Progress Reporting:

@celery_app.task(bind=True)
def implement_story(self, story_id: str, agent_id: str, project_id: str):
    """Implement a user story with progress reporting."""

    steps = [
        ("analyzing", "Analyzing requirements"),
        ("designing", "Designing solution"),
        ("implementing", "Writing code"),
        ("testing", "Running tests"),
        ("documenting", "Updating documentation"),
    ]

    for i, (state, description) in enumerate(steps):
        self.update_state(
            state="PROGRESS",
            meta={
                "current": i + 1,
                "total": len(steps),
                "status": description
            }
        )

        # Do the actual work (execute_step is an app-level helper, not shown here)
        execute_step(state, story_id, agent_id)

        # Publish progress event
        EventBus().publish(f"project:{project_id}", {
            "type": "agent_progress",
            "agent_id": agent_id,
            "step": i + 1,
            "total": len(steps),
            "description": description
        })

    return {"status": "completed", "story_id": story_id}

Task Chaining:

from celery import chain, group

# Sequential workflow
workflow = chain(
    analyze_requirements.s(story_id),
    design_solution.s(),
    implement_code.s(),
    run_tests.s(),
    create_pr.s()
)

# Parallel execution
parallel_tests = group(
    run_unit_tests.s(project_id),
    run_integration_tests.s(project_id),
    run_linting.s(project_id)
)
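
Chains and groups compose: a chord runs a callback once every task in a group has finished. A sketch reusing the signatures above (note the callback receives the list of group results as its first argument):

from celery import chord

# Run all checks in parallel, then open the PR once they all succeed
pipeline = chord(parallel_tests)(create_pr.s())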

6. FastAPI Integration

# app/api/v1/agents.py
from fastapi import APIRouter
from app.tasks.agent_tasks import run_agent_action
from celery.result import AsyncResult

router = APIRouter()

@router.post("/agents/{agent_id}/actions")
async def trigger_agent_action(
    agent_id: str,
    action: AgentActionRequest  # Pydantic request schema, defined elsewhere
):
    """Trigger an agent action as a background task."""

    # Dispatch to Celery
    task = run_agent_action.delay(
        agent_id=agent_id,
        project_id=action.project_id,
        action=action.action,
        context=action.context
    )

    return {
        "task_id": task.id,
        "status": "queued"
    }

@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Get the status of a background task."""

    result = AsyncResult(task_id)

    if result.state == "PENDING":
        return {"status": "pending"}
    elif result.state == "RUNNING":
        return {"status": "running", **result.info}
    elif result.state == "PROGRESS":
        return {"status": "progress", **result.info}
    elif result.state == "SUCCESS":
        return {"status": "completed", "result": result.result}
    elif result.state == "FAILURE":
        return {"status": "failed", "error": str(result.result)}

    return {"status": result.state}

7. Worker Configuration

# Run different workers for different queues

# Agent worker (each process prefetches one task at a time, for LLM rate limiting)
celery -A app.core.celery worker \
    -Q agent_queue \
    -c 4 \
    --prefetch-multiplier=1 \
    -n agent_worker@%h

# Git worker (can handle multiple concurrent tasks)
celery -A app.core.celery worker \
    -Q git_queue \
    -c 8 \
    --prefetch-multiplier=4 \
    -n git_worker@%h

# Sync worker
celery -A app.core.celery worker \
    -Q sync_queue \
    -c 4 \
    --prefetch-multiplier=4 \
    -n sync_worker@%h
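
In deployment, these commands map naturally onto one container per worker type. A compose sketch for the agent worker (the build context and service layout are assumptions):

# docker-compose.yml (sketch)
services:
  agent-worker:
    build: ./backend
    command: >
      celery -A app.core.celery worker
      -Q agent_queue -c 4 --prefetch-multiplier=1
      -n agent_worker@%h
    depends_on:
      - redis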

8. Monitoring with Flower

# docker-compose.yml
services:
  flower:
    image: mher/flower:latest
    # In Celery 5, global options like --broker precede the subcommand
    command: celery --broker=redis://redis:6379/0 flower
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - FLOWER_BASIC_AUTH=admin:password  # replace outside local dev

9. Task Scheduling (Celery Beat)

# app/core/celery.py
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    # Sync issues every minute
    "sync-external-issues": {
        "task": "app.tasks.sync_tasks.sync_all_issues",
        "schedule": 60.0,
    },
    # Health check every 5 minutes
    "agent-health-check": {
        "task": "app.tasks.agent_tasks.health_check_all_agents",
        "schedule": 300.0,
    },
    # Daily cleanup at midnight
    "cleanup-old-tasks": {
        "task": "app.tasks.maintenance.cleanup_old_tasks",
        "schedule": crontab(hour=0, minute=0),
    },
}
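
Beat is a separate scheduler process that only enqueues these tasks; it runs alongside the workers:

celery -A app.core.celery beat --loglevel=info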

Best Practices

  1. One task per LLM call - Avoid rate limiting issues
  2. Progress reporting - Update state for long-running tasks
  3. Idempotent tasks - Handle retries gracefully
  4. Separate queues - Isolate slow tasks from fast ones
  5. Task result expiry - Set result_expires to avoid Redis bloat (see the one-liner after this list)
  6. Soft time limits - Allow graceful shutdown before hard kill
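
Note that item 5 is not yet reflected in the section 1 configuration; the addition is one line (the retention value is an assumption):

celery_app.conf.result_expires = 3600  # purge task results after 1 hour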

Recommendations

  1. Use Celery for all long-running operations

    • Agent actions
    • Git operations
    • External sync
    • CI/CD triggers
  2. Use Redis as both broker and backend

    • Simplifies infrastructure
    • Fast enough for our scale
  3. Configure separate queues

    • agent_queue with prefetch=1
    • git_queue with prefetch=4
    • sync_queue with prefetch=4
  4. Implement proper monitoring

    • Flower for web UI
    • Prometheus metrics export
    • Dead letter queue for failed tasks

Decision

Adopt Celery + Redis for all background task processing with queue-based routing and progress reporting via Redis Pub/Sub events.


Spike completed. Findings will inform ADR-003: Background Task Architecture.