SPIKE-004: Celery + Redis Integration
Status: Completed
Date: 2025-12-29
Author: Architecture Team
Related Issue: #4
Objective
Research best practices for integrating Celery with FastAPI for background task processing, focusing on agent orchestration, long-running workflows, and task monitoring.
Research Questions
- How to properly integrate Celery with async FastAPI?
- What is the optimal task queue architecture for Syndarix?
- How to handle long-running agent tasks?
- What monitoring and visibility patterns should we use?
Findings
1. Celery + FastAPI Integration Pattern
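Challenge: Celery tasks run synchronously in separate worker processes, while FastAPI request handlers are async, so a handler must not block while waiting for a task to finish.
Solution: Dispatch the task, return its ID immediately, and check on it later through celery.result.AsyncResult, using async polling or callbacks.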
# app/core/celery.py
from celery import Celery

from app.core.config import settings

celery_app = Celery(
    "syndarix",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    include=[
        "app.tasks.agent_tasks",
        "app.tasks.git_tasks",
        "app.tasks.sync_tasks",
    ],
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=3600,  # 1 hour max
    task_soft_time_limit=3300,  # 55 min soft limit
    worker_prefetch_multiplier=1,  # One task at a time for LLM tasks
    task_acks_late=True,  # Acknowledge after completion
    task_reject_on_worker_lost=True,  # Retry if worker dies
)
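The "async polling" half of the solution above can be sketched without tying it to Celery internals. This is a minimal sketch, not the project's implementation: `wait_for_task`, its `get_state` callable, and the default `interval` and `timeout` values are all hypothetical. The blocking state lookup is pushed to a thread so the event loop stays responsive.

```python
import asyncio
from typing import Callable

# Celery's built-in states after which a task will not change again.
TERMINAL_STATES = frozenset({"SUCCESS", "FAILURE", "REVOKED"})


async def wait_for_task(
    get_state: Callable[[], str],
    interval: float = 0.5,
    timeout: float = 30.0,
) -> str:
    """Poll a blocking state lookup until the task finishes or time runs out."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        # Run the blocking lookup in a worker thread so the event loop stays free.
        state = await asyncio.to_thread(get_state)
        if state in TERMINAL_STATES:
            return state
        if loop.time() >= deadline:
            raise TimeoutError(f"task still {state} after {timeout}s")
        await asyncio.sleep(interval)
```

In a FastAPI handler this might be called as `await wait_for_task(lambda: AsyncResult(task_id, app=celery_app).state)`, although for long-running agent tasks returning the task ID and letting the client poll is usually the better fit.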
2. Task Queue Architecture
┌─────────────────────────────────────────────────────────────────┐
│                         FastAPI Backend                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
│  │  API Layer  │  │  Services   │  │   Events    │              │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘              │
│         │                │                │                     │
│         └────────────────┼────────────────┘                     │
│                          │                                      │
│                          ▼                                      │
│         ┌────────────────────────────────┐                      │
│         │        Task Dispatcher         │                      │
│         │       (Celery send_task)       │                      │
│         └────────────────┬───────────────┘                      │
└──────────────────────────┼──────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Redis (Broker + Backend)                     │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │ agent_queue  │  │  git_queue   │  │  sync_queue  │           │
│  │  (priority)  │  │              │  │              │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
└─────────────────────────────────────────────────────────────────┘
                           │
           ┌───────────────┼───────────────┐
           │               │               │
           ▼               ▼               ▼
     ┌────────────┐  ┌────────────┐  ┌────────────┐
     │   Worker   │  │   Worker   │  │   Worker   │
     │  (agents)  │  │   (git)    │  │   (sync)   │
     │ prefetch=1 │  │ prefetch=4 │  │ prefetch=4 │
     └────────────┘  └────────────┘  └────────────┘
3. Queue Configuration
# app/core/celery.py
from kombu import Queue  # Queue comes from kombu, Celery's messaging library

celery_app.conf.task_queues = [
    Queue("agent_queue", routing_key="agent.#"),
    Queue("git_queue", routing_key="git.#"),
    Queue("sync_queue", routing_key="sync.#"),
    Queue("cicd_queue", routing_key="cicd.#"),
]

# Route tasks to queues by glob pattern on the task name
celery_app.conf.task_routes = {
    "app.tasks.agent_tasks.*": {"queue": "agent_queue"},
    "app.tasks.git_tasks.*": {"queue": "git_queue"},
    "app.tasks.sync_tasks.*": {"queue": "sync_queue"},
    "app.tasks.cicd_tasks.*": {"queue": "cicd_queue"},
}
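To make the routing table easier to reason about, the sketch below approximates how a glob-style route table maps a task name to a queue. It is an illustration only and not Celery's actual router: `resolve_queue` is a hypothetical helper, the task names in the usage lines are made up, and the fallback assumes Celery's default queue name `celery`.

```python
from fnmatch import fnmatchcase

# Same patterns as the task_routes mapping above, flattened to pattern -> queue.
TASK_ROUTES = {
    "app.tasks.agent_tasks.*": "agent_queue",
    "app.tasks.git_tasks.*": "git_queue",
    "app.tasks.sync_tasks.*": "sync_queue",
    "app.tasks.cicd_tasks.*": "cicd_queue",
}

DEFAULT_QUEUE = "celery"  # assumed fallback when no pattern matches


def resolve_queue(task_name: str, routes: dict = TASK_ROUTES) -> str:
    """Return the queue of the first glob pattern that matches the task name."""
    for pattern, queue in routes.items():
        if fnmatchcase(task_name, pattern):
            return queue
    return DEFAULT_QUEUE


# Hypothetical task names, used only to exercise the lookup.
print(resolve_queue("app.tasks.git_tasks.clone_repo"))
print(resolve_queue("app.tasks.maintenance.cleanup_old_tasks"))
```

The second lookup matches none of the four patterns, which is a reminder that any task module outside them lands on the default queue and needs a worker consuming it.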
4. Agent Task Implementation
# app/tasks/agent_tasks.py
from celery import Task

from app.core.celery import celery_app
from app.services.agent_runner import AgentRunner
from app.services.events import EventBus


class AgentTask(Task):
    """Base class for agent tasks with retry and monitoring."""

    autoretry_for = (ConnectionError, TimeoutError)
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True
    max_retries = 3

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle task failure."""
        project_id = kwargs.get("project_id")
        agent_id = kwargs.get("agent_id")
        EventBus().publish(f"project:{project_id}", {
            "type": "agent_error",
            "agent_id": agent_id,
            "error": str(exc)
        })


@celery_app.task(bind=True, base=AgentTask)
def run_agent_action(
    self,
    agent_id: str,
    project_id: str,
    action: str,
    context: dict
) -> dict:
    """
    Execute an agent action as a background task.

    Args:
        agent_id: The agent instance ID
        project_id: The project context
        action: The action to perform
        context: Action-specific context

    Returns:
        Action result dictionary
    """
    runner = AgentRunner(agent_id, project_id)

    # Update task state for monitoring
    self.update_state(
        state="RUNNING",
        meta={"agent_id": agent_id, "action": action}
    )

    # Publish start event
    EventBus().publish(f"project:{project_id}", {
        "type": "agent_started",
        "agent_id": agent_id,
        "action": action,
        "task_id": self.request.id
    })

    try:
        result = runner.execute(action, context)

        # Publish completion event
        EventBus().publish(f"project:{project_id}", {
            "type": "agent_completed",
            "agent_id": agent_id,
            "action": action,
            "result_summary": result.get("summary")
        })
        return result
    except Exception:
        # Will trigger on_failure
        raise
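The retry settings on `AgentTask` translate into concrete delays. The sketch below reproduces the schedule as I understand Celery's documented behaviour: with `retry_backoff=True` the delay starts at 1 second and doubles on each retry, `retry_backoff_max` caps it, and `retry_jitter` draws the actual delay at random between 0 and that value. `backoff_delay` is a hypothetical helper written for illustration, not a Celery API.

```python
import random


def backoff_delay(
    retries: int,
    factor: int = 1,      # retry_backoff=True behaves like a factor of 1 second
    maximum: int = 600,   # retry_backoff_max
    jitter: bool = True,  # retry_jitter
) -> int:
    """Seconds to wait before the next retry, given the retries done so far."""
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick any whole second from 0 up to the computed delay.
        delay = random.randrange(delay + 1)
    return delay


# Without jitter, the three retries allowed by max_retries=3 wait 1, 2 and 4 seconds.
print([backoff_delay(n, jitter=False) for n in range(3)])
```

With only three retries the 600-second cap is never reached, so a transient outage longer than a few seconds still ends in `on_failure`. Raising `max_retries` or using a numeric `retry_backoff` factor would stretch the window.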
5. Long-Running Task Patterns
Progress Reporting:
@celery_app.task(bind=True)
def implement_story(self, story_id: str, agent_id: str, project_id: str):
    """Implement a user story with progress reporting."""
    steps = [
        ("analyzing", "Analyzing requirements"),
        ("designing", "Designing solution"),
        ("implementing", "Writing code"),
        ("testing", "Running tests"),
        ("documenting", "Updating documentation"),
    ]

    for i, (state, description) in enumerate(steps):
        self.update_state(
            state="PROGRESS",
            meta={
                "current": i + 1,
                "total": len(steps),
                "status": description
            }
        )

        # Do the actual work
        execute_step(state, story_id, agent_id)

        # Publish progress event
        EventBus().publish(f"project:{project_id}", {
            "type": "agent_progress",
            "agent_id": agent_id,
            "step": i + 1,
            "total": len(steps),
            "description": description
        })

    return {"status": "completed", "story_id": story_id}
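A consumer of the `PROGRESS` metadata might turn it into a display string. One detail worth noting is that the state is written before the step runs, so `current` names the step in progress and only `current - 1` steps are finished. The helper below is a hypothetical sketch built on the `current`, `total` and `status` keys used above.

```python
def describe_progress(meta: dict) -> str:
    """Format PROGRESS metadata, counting only steps that have finished."""
    current, total = meta["current"], meta["total"]
    # The state is set before the step executes, so current - 1 steps are done.
    percent = round(100 * (current - 1) / total) if total else 0
    return f"Step {current}/{total}: {meta['status']} ({percent}% done)"


print(describe_progress({"current": 2, "total": 5, "status": "Designing solution"}))
# Step 2/5: Designing solution (20% done)
```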
Task Chaining:
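from celery import chain, group

# Sequential workflow: each task's return value becomes the
# first argument of the next task
workflow = chain(
    analyze_requirements.s(story_id),
    design_solution.s(),
    implement_code.s(),
    run_tests.s(),
    create_pr.s()
)

# Parallel execution
parallel_tests = group(
    run_unit_tests.s(project_id),
    run_integration_tests.s(project_id),
    run_linting.s(project_id)
)

# Building a chain or group only describes the work; nothing is queued
# until it is dispatched, e.g. workflow.apply_async()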
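The data flow through a chain and a group can be illustrated with plain functions. This is a conceptual stand-in, not Celery code: `run_chain`, `run_group` and the three step functions are hypothetical, and everything runs sequentially in-process, whereas Celery would run each step on a worker and the group members in parallel.

```python
from functools import reduce
from typing import Any, Callable, List, Sequence


def run_chain(initial: Any, steps: Sequence[Callable[[Any], Any]]) -> Any:
    """Feed each step's return value into the next step, as a chain does."""
    return reduce(lambda result, step: step(result), steps, initial)


def run_group(arg: Any, tasks: Sequence[Callable[[Any], Any]]) -> List[Any]:
    """Call every task with the same argument and collect results in order."""
    return [task(arg) for task in tasks]


# Hypothetical stand-ins for the workflow tasks.
def analyze(story_id: str) -> dict:
    return {"story_id": story_id, "done": ["analyze"]}


def design(state: dict) -> dict:
    return {**state, "done": state["done"] + ["design"]}


def implement(state: dict) -> dict:
    return {**state, "done": state["done"] + ["implement"]}


final = run_chain("STORY-1", [analyze, design, implement])
print(final["done"])  # ['analyze', 'design', 'implement']
```

Because each step receives the previous result as its first argument, every task in a chain has to return something JSON-serializable under the `json` serializer configured earlier.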
6. FastAPI Integration
# app/api/v1/agents.py
from celery.result import AsyncResult
from fastapi import APIRouter

from app.core.celery import celery_app
from app.tasks.agent_tasks import run_agent_action

# AgentActionRequest is the request model (project_id, action, context);
# it is defined elsewhere in the project and its import is omitted here.

router = APIRouter()


@router.post("/agents/{agent_id}/actions")
async def trigger_agent_action(
    agent_id: str,
    action: AgentActionRequest,
):
    """Trigger an agent action as a background task."""
    # Dispatch to Celery; keyword arguments are required because
    # AgentTask.on_failure reads project_id and agent_id from kwargs
    task = run_agent_action.delay(
        agent_id=agent_id,
        project_id=action.project_id,
        action=action.action,
        context=action.context
    )
    return {
        "task_id": task.id,
        "status": "queued"
    }


@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Get the status of a background task."""
    # Bind the lookup to our app so the configured Redis backend is used
    result = AsyncResult(task_id, app=celery_app)
    state = result.state  # read once to avoid repeated backend lookups

    if state == "PENDING":
        # Celery also reports PENDING for task IDs it has never seen
        return {"status": "pending"}
    elif state == "RUNNING":
        return {"status": "running", **(result.info or {})}
    elif state == "PROGRESS":
        return {"status": "progress", **(result.info or {})}
    elif state == "SUCCESS":
        return {"status": "completed", "result": result.result}
    elif state == "FAILURE":
        return {"status": "failed", "error": str(result.result)}
    return {"status": state}
7. Worker Configuration
# Run different workers for different queues

# Agent worker (4 processes, each reserving one task at a time for LLM rate limiting)
celery -A app.core.celery worker \
    -Q agent_queue \
    -c 4 \
    --prefetch-multiplier=1 \
    -n agent_worker@%h

# Git worker (can handle multiple concurrent tasks)
celery -A app.core.celery worker \
    -Q git_queue \
    -c 8 \
    --prefetch-multiplier=4 \
    -n git_worker@%h

# Sync worker
celery -A app.core.celery worker \
    -Q sync_queue \
    -c 4 \
    --prefetch-multiplier=4 \
    -n sync_worker@%h
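The concurrency and prefetch flags combine into an upper bound on how many tasks each worker can hold at once, since a worker reserves up to concurrency × prefetch multiplier messages. The short calculation below applies that to the three commands above. The dictionary simply restates their flags and `max_reserved` is a hypothetical helper.

```python
# (concurrency, prefetch multiplier) taken from the worker commands above.
WORKERS = {
    "agent_worker": (4, 1),
    "git_worker": (8, 4),
    "sync_worker": (4, 4),
}


def max_reserved(concurrency: int, prefetch_multiplier: int) -> int:
    """Upper bound on tasks a worker holds (running plus prefetched)."""
    return concurrency * prefetch_multiplier


for name, (concurrency, multiplier) in WORKERS.items():
    print(f"{name}: up to {max_reserved(concurrency, multiplier)} reserved tasks")
# agent_worker: up to 4 reserved tasks
# git_worker: up to 32 reserved tasks
# sync_worker: up to 16 reserved tasks
```

For the agent worker this means up to four LLM-bound tasks run concurrently per worker instance, so a stricter rate limit would call for a lower `-c` value rather than a lower prefetch multiplier.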
8. Monitoring with Flower
# docker-compose.yml
services:
  flower:
    image: mher/flower:latest
    command: celery flower --broker=redis://redis:6379/0
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - FLOWER_BASIC_AUTH=admin:password
9. Task Scheduling (Celery Beat)
# app/core/celery.py
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    # Sync issues every minute
    "sync-external-issues": {
        "task": "app.tasks.sync_tasks.sync_all_issues",
        "schedule": 60.0,
    },
    # Health check every 5 minutes
    "agent-health-check": {
        "task": "app.tasks.agent_tasks.health_check_all_agents",
        "schedule": 300.0,
    },
    # Daily cleanup at midnight
    "cleanup-old-tasks": {
        "task": "app.tasks.maintenance.cleanup_old_tasks",
        "schedule": crontab(hour=0, minute=0),
    },
}
Best Practices
- One task per LLM call - Avoid rate limiting issues
- Progress reporting - Update state for long-running tasks
- Idempotent tasks - Handle retries gracefully
- Separate queues - Isolate slow tasks from fast ones
- Task result expiry - Set result_expires to avoid Redis bloat
- Soft time limits - Allow graceful shutdown before hard kill
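The "idempotent tasks" practice matters here because `task_acks_late` and `task_reject_on_worker_lost` can deliver the same task twice. One common approach is to claim an idempotency key before doing the work. The sketch below uses an in-memory set as a stand-in for an atomic store such as a Redis set-if-absent operation. `InMemoryKeyStore`, `run_once` and the key format are hypothetical.

```python
from typing import Callable, Optional, Set, TypeVar

T = TypeVar("T")


class InMemoryKeyStore:
    """Stand-in for an atomic key store; a real deployment might use Redis."""

    def __init__(self) -> None:
        self._keys: Set[str] = set()

    def claim(self, key: str) -> bool:
        """Return True only for the first caller to claim the key."""
        if key in self._keys:
            return False
        self._keys.add(key)
        return True

    def release(self, key: str) -> None:
        self._keys.discard(key)


def run_once(store: InMemoryKeyStore, key: str, action: Callable[[], T]) -> Optional[T]:
    """Run the action unless the key was already claimed by an earlier delivery."""
    if not store.claim(key):
        return None  # duplicate delivery: skip the side effect
    try:
        return action()
    except Exception:
        # Release the claim so a retry can attempt the work again.
        store.release(key)
        raise


store = InMemoryKeyStore()
created = []
run_once(store, "create_pr:STORY-1", lambda: created.append("PR"))
run_once(store, "create_pr:STORY-1", lambda: created.append("PR"))  # skipped
print(created)  # ['PR']
```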
Recommendations
- Use Celery for all long-running operations
  - Agent actions
  - Git operations
  - External sync
  - CI/CD triggers
- Use Redis as both broker and backend
  - Simplifies infrastructure
  - Fast enough for our scale
- Configure separate queues
  - agent_queue with prefetch=1
  - git_queue with prefetch=4
  - sync_queue with prefetch=4
- Implement proper monitoring
  - Flower for web UI
  - Prometheus metrics export
  - Dead letter queue for failed tasks
Decision
Adopt Celery + Redis for all background task processing with queue-based routing and progress reporting via Redis Pub/Sub events.
Spike completed. Findings will inform ADR-003: Background Task Architecture.