# SPIKE-004: Celery + Redis Integration

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #4

---

## Objective

Research best practices for integrating Celery with FastAPI for background task processing, focusing on agent orchestration, long-running workflows, and task monitoring.

## Research Questions

1. How to properly integrate Celery with async FastAPI?
2. What is the optimal task queue architecture for Syndarix?
3. How to handle long-running agent tasks?
4. What monitoring and visibility patterns should we use?

## Findings

### 1. Celery + FastAPI Integration Pattern

**Challenge:** Celery tasks run in synchronous worker processes, while FastAPI handlers are async.

**Solution:** Use `celery.result.AsyncResult` with async polling or callbacks.

```python
# app/core/celery.py
from celery import Celery

from app.core.config import settings

celery_app = Celery(
    "syndarix",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    include=[
        "app.tasks.agent_tasks",
        "app.tasks.git_tasks",
        "app.tasks.sync_tasks",
    ],
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=3600,             # 1 hour hard limit
    task_soft_time_limit=3300,        # 55 min soft limit
    worker_prefetch_multiplier=1,     # one task at a time for LLM tasks
    task_acks_late=True,              # acknowledge after completion
    task_reject_on_worker_lost=True,  # re-queue if a worker dies
)
```
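
Because `AsyncResult.get()` blocks, async handlers should poll rather than wait. A minimal polling helper, as a sketch (the module path and interval are illustrative):

```python
# app/services/task_polling.py (illustrative path)
import asyncio

from celery.result import AsyncResult

from app.core.celery import celery_app


async def wait_for_task(task_id: str, poll_interval: float = 1.0) -> dict:
    """Poll a Celery task from async code without blocking the event loop."""
    result = AsyncResult(task_id, app=celery_app)
    while not result.ready():  # cheap backend check; avoid result.get(), which blocks
        await asyncio.sleep(poll_interval)
    if result.failed():
        raise RuntimeError(f"Task {task_id} failed: {result.result}")
    return result.result
```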

### 2. Task Queue Architecture

```
┌─────────────────────────────────────────────────────────┐
│                     FastAPI Backend                     │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐    │
│  │  API Layer  │   │  Services   │   │   Events    │    │
│  └──────┬──────┘   └──────┬──────┘   └──────┬──────┘    │
│         │                 │                 │           │
│         └─────────────────┼─────────────────┘           │
│                           │                             │
│                           ▼                             │
│          ┌────────────────────────────────┐             │
│          │        Task Dispatcher         │             │
│          │       (Celery send_task)       │             │
│          └────────────────┬───────────────┘             │
└───────────────────────────┼─────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────┐
│                Redis (Broker + Backend)                 │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐ │
│  │ agent_queue  │   │  git_queue   │   │  sync_queue  │ │
│  │  (priority)  │   │              │   │              │ │
│  └──────────────┘   └──────────────┘   └──────────────┘ │
└─────────────────────────────────────────────────────────┘
                            │
            ┌───────────────┼───────────────┐
            │               │               │
            ▼               ▼               ▼
      ┌────────────┐  ┌────────────┐  ┌────────────┐
      │   Worker   │  │   Worker   │  │   Worker   │
      │  (agents)  │  │   (git)    │  │   (sync)   │
      │ prefetch=1 │  │ prefetch=4 │  │ prefetch=4 │
      └────────────┘  └────────────┘  └────────────┘
```
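
In this design the API layer hands work to Celery via `send_task`, which enqueues by task name, so the web process never has to import worker-side code. A minimal dispatcher sketch (the module path is illustrative):

```python
# app/services/dispatcher.py (illustrative path)
from app.core.celery import celery_app


def dispatch_agent_action(agent_id: str, project_id: str, action: str, context: dict) -> str:
    """Enqueue an agent task by name and return the task ID."""
    result = celery_app.send_task(
        "app.tasks.agent_tasks.run_agent_action",
        kwargs={
            "agent_id": agent_id,
            "project_id": project_id,
            "action": action,
            "context": context,
        },
    )
    return result.id
```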

### 3. Queue Configuration

```python
# app/core/celery.py
from kombu import Queue  # Celery queue declarations come from kombu

celery_app.conf.task_queues = [
    Queue("agent_queue", routing_key="agent.#"),
    Queue("git_queue", routing_key="git.#"),
    Queue("sync_queue", routing_key="sync.#"),
    Queue("cicd_queue", routing_key="cicd.#"),
]

celery_app.conf.task_routes = {
    "app.tasks.agent_tasks.*": {"queue": "agent_queue"},
    "app.tasks.git_tasks.*": {"queue": "git_queue"},
    "app.tasks.sync_tasks.*": {"queue": "sync_queue"},
    "app.tasks.cicd_tasks.*": {"queue": "cicd_queue"},
}
```
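
The diagram marks `agent_queue` as a priority queue. With Redis, Celery only emulates priorities by splitting each queue into sub-queues, enabled through the broker transport options; a sketch, to be verified against the Celery Redis-transport docs:

```python
# app/core/celery.py
celery_app.conf.broker_transport_options = {
    "queue_order_strategy": "priority",  # consume emulated priority sub-queues in order
}

# Callers can then hint a per-dispatch priority:
# run_agent_action.apply_async(kwargs={...}, priority=0)  # on Redis, lower runs first
```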

### 4. Agent Task Implementation

```python
# app/tasks/agent_tasks.py
from celery import Task

from app.core.celery import celery_app
from app.services.agent_runner import AgentRunner
from app.services.events import EventBus


class AgentTask(Task):
    """Base class for agent tasks with retry and monitoring."""

    autoretry_for = (ConnectionError, TimeoutError)
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True
    max_retries = 3

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Publish a failure event so the UI can surface the error."""
        project_id = kwargs.get("project_id")
        agent_id = kwargs.get("agent_id")
        EventBus().publish(f"project:{project_id}", {
            "type": "agent_error",
            "agent_id": agent_id,
            "error": str(exc),
        })


@celery_app.task(bind=True, base=AgentTask)
def run_agent_action(
    self,
    agent_id: str,
    project_id: str,
    action: str,
    context: dict,
) -> dict:
    """
    Execute an agent action as a background task.

    Args:
        agent_id: The agent instance ID
        project_id: The project context
        action: The action to perform
        context: Action-specific context

    Returns:
        Action result dictionary
    """
    runner = AgentRunner(agent_id, project_id)

    # Update task state for monitoring
    self.update_state(
        state="RUNNING",
        meta={"agent_id": agent_id, "action": action},
    )

    # Publish start event
    EventBus().publish(f"project:{project_id}", {
        "type": "agent_started",
        "agent_id": agent_id,
        "action": action,
        "task_id": self.request.id,
    })

    try:
        result = runner.execute(action, context)

        # Publish completion event
        EventBus().publish(f"project:{project_id}", {
            "type": "agent_completed",
            "agent_id": agent_id,
            "action": action,
            "result_summary": result.get("summary"),
        })

        return result
    except Exception:
        # Re-raise so Celery marks the task failed and on_failure fires
        raise
```
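
Retries triggered by `autoretry_for` can be surfaced to the UI the same way as failures by also overriding Celery's `on_retry` hook on the base class. A sketch (the `agent_retrying` event type is illustrative):

```python
class AgentTask(Task):
    # ... retry settings and on_failure as above ...

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Called by Celery before a retry is scheduled."""
        EventBus().publish(f"project:{kwargs.get('project_id')}", {
            "type": "agent_retrying",
            "agent_id": kwargs.get("agent_id"),
            "error": str(exc),
            "retry": self.request.retries + 1,
        })
```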

### 5. Long-Running Task Patterns

**Progress Reporting:**

```python
@celery_app.task(bind=True)
def implement_story(self, story_id: str, agent_id: str, project_id: str):
    """Implement a user story with progress reporting."""

    steps = [
        ("analyzing", "Analyzing requirements"),
        ("designing", "Designing solution"),
        ("implementing", "Writing code"),
        ("testing", "Running tests"),
        ("documenting", "Updating documentation"),
    ]

    for i, (state, description) in enumerate(steps):
        self.update_state(
            state="PROGRESS",
            meta={
                "current": i + 1,
                "total": len(steps),
                "status": description,
            },
        )

        # Do the actual work
        execute_step(state, story_id, agent_id)

        # Publish progress event
        EventBus().publish(f"project:{project_id}", {
            "type": "agent_progress",
            "agent_id": agent_id,
            "step": i + 1,
            "total": len(steps),
            "description": description,
        })

    return {"status": "completed", "story_id": story_id}
```
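
The `task_soft_time_limit` set in the Celery config raises `SoftTimeLimitExceeded` inside the task, leaving roughly five minutes before the hard `task_time_limit` kill; long-running tasks can use it to checkpoint. A sketch (`save_partial_progress` is a hypothetical helper):

```python
from celery.exceptions import SoftTimeLimitExceeded


@celery_app.task(bind=True)
def long_agent_step(self, story_id: str, agent_id: str, project_id: str):
    try:
        return execute_step("implementing", story_id, agent_id)
    except SoftTimeLimitExceeded:
        # Soft limit hit: persist partial work before the hard limit kills the process
        save_partial_progress(story_id)
        raise
```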

**Task Chaining:**

```python
from celery import chain, group

# Sequential workflow: each task receives the previous task's result
workflow = chain(
    analyze_requirements.s(story_id),
    design_solution.s(),
    implement_code.s(),
    run_tests.s(),
    create_pr.s(),
)

# Parallel execution
parallel_tests = group(
    run_unit_tests.s(project_id),
    run_integration_tests.s(project_id),
    run_linting.s(project_id),
)
```
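
A `group` only fans out; to fan back in, for example to aggregate test results before creating a PR, Celery's `chord` attaches a callback that runs once every member finishes. A sketch (`report_results` is a hypothetical task):

```python
from celery import chord

# Run the test tasks in parallel, then aggregate their results
chord(parallel_tests)(report_results.s())

# Canvases compose: enqueue the whole sequential workflow in one call
workflow.delay()
```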

### 6. FastAPI Integration

```python
# app/api/v1/agents.py
from celery.result import AsyncResult
from fastapi import APIRouter

from app.core.celery import celery_app
from app.schemas.agents import AgentActionRequest  # assumed schema location
from app.tasks.agent_tasks import run_agent_action

router = APIRouter()


@router.post("/agents/{agent_id}/actions")
async def trigger_agent_action(agent_id: str, action: AgentActionRequest):
    """Trigger an agent action as a background task."""

    # Dispatch to Celery; the HTTP request returns immediately
    task = run_agent_action.delay(
        agent_id=agent_id,
        project_id=action.project_id,
        action=action.action,
        context=action.context,
    )

    return {
        "task_id": task.id,
        "status": "queued",
    }


@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Get the status of a background task."""

    result = AsyncResult(task_id, app=celery_app)

    if result.state == "PENDING":
        # NOTE: Celery also reports unknown task IDs as PENDING
        return {"status": "pending"}
    elif result.state == "RUNNING":
        return {"status": "running", **result.info}
    elif result.state == "PROGRESS":
        return {"status": "progress", **result.info}
    elif result.state == "SUCCESS":
        return {"status": "completed", "result": result.result}
    elif result.state == "FAILURE":
        return {"status": "failed", "error": str(result.result)}

    return {"status": result.state}
```

### 7. Worker Configuration

```bash
# Run dedicated workers per queue

# Agent worker: 4 processes, each reserving one task at a time
# (keeps LLM calls serialized per process for rate limiting)
celery -A app.core.celery worker \
    -Q agent_queue \
    -c 4 \
    --prefetch-multiplier=1 \
    -n agent_worker@%h

# Git worker (can handle multiple concurrent tasks)
celery -A app.core.celery worker \
    -Q git_queue \
    -c 8 \
    --prefetch-multiplier=4 \
    -n git_worker@%h

# Sync worker
celery -A app.core.celery worker \
    -Q sync_queue \
    -c 4 \
    --prefetch-multiplier=4 \
    -n sync_worker@%h
```

### 8. Monitoring with Flower

```yaml
# docker-compose.yml
services:
  flower:
    image: mher/flower:latest
    command: celery flower --broker=redis://redis:6379/0
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - FLOWER_BASIC_AUTH=admin:password
```

### 9. Task Scheduling (Celery Beat)

```python
# app/core/celery.py
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    # Sync issues every minute
    "sync-external-issues": {
        "task": "app.tasks.sync_tasks.sync_all_issues",
        "schedule": 60.0,
    },
    # Health check every 5 minutes
    "agent-health-check": {
        "task": "app.tasks.agent_tasks.health_check_all_agents",
        "schedule": 300.0,
    },
    # Daily cleanup at midnight
    "cleanup-old-tasks": {
        "task": "app.tasks.maintenance.cleanup_old_tasks",
        "schedule": crontab(hour=0, minute=0),
    },
}
```
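
These entries only fire while a beat scheduler process runs alongside the workers (e.g. `celery -A app.core.celery beat`); beat publishes the scheduled tasks to their queues, and the normal workers execute them.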

## Best Practices

1. **One task per LLM call** - Avoid rate limiting issues
2. **Progress reporting** - Update state for long-running tasks
3. **Idempotent tasks** - Handle retries gracefully
4. **Separate queues** - Isolate slow tasks from fast ones
5. **Task result expiry** - Set `result_expires` to avoid Redis bloat (see the sketch after this list)
6. **Soft time limits** - Allow graceful shutdown before hard kill
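
Items 3 and 5 are small in practice. A minimal sketch of the expiry setting plus an idempotency guard (the `already_processed` helper is hypothetical):

```python
# app/core/celery.py
celery_app.conf.result_expires = 3600  # purge task results from Redis after 1 hour

# app/tasks/sync_tasks.py -- idempotency guard so retries are safe to repeat
@celery_app.task(bind=True)
def sync_issue(self, issue_id: str):
    if already_processed(issue_id):  # hypothetical dedup check (e.g. Redis SETNX)
        return {"status": "skipped", "issue_id": issue_id}
    ...  # perform the sync exactly once
```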

## Recommendations

1. **Use Celery for all long-running operations**
   - Agent actions
   - Git operations
   - External sync
   - CI/CD triggers

2. **Use Redis as both broker and backend**
   - Simplifies infrastructure
   - Fast enough for our scale

3. **Configure separate queues**
   - `agent_queue` with prefetch=1
   - `git_queue` with prefetch=4
   - `sync_queue` with prefetch=4

4. **Implement proper monitoring**
   - Flower for web UI
   - Prometheus metrics export
   - Dead letter queue for failed tasks

## References

- [Celery Documentation](https://docs.celeryq.dev/)
- [FastAPI Background Tasks](https://fastapi.tiangolo.com/tutorial/background-tasks/)
- [Celery Best Practices](https://docs.celeryq.dev/en/stable/userguide/tasks.html#tips-and-best-practices)

## Decision

**Adopt Celery + Redis** for all background task processing, with queue-based routing and progress reporting via Redis Pub/Sub events.

---

*Spike completed. Findings will inform ADR-003: Background Task Architecture.*