# SPIKE-008: Workflow State Machine Architecture
**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #8
---
## Executive Summary
Syndarix requires durable state machine capabilities to orchestrate long-lived workflows spanning hours to weeks (sprint execution, story implementation, PR review cycles). After evaluating multiple approaches, we recommend a **hybrid architecture**:
1. **`transitions` library** for state machine logic (lightweight, Pythonic, well-tested)
2. **PostgreSQL** for state persistence with event sourcing for audit trail
3. **Celery** for task execution (already planned in SPIKE-004)
4. **Custom workflow engine** built on these primitives
This approach balances simplicity with durability, avoiding the operational complexity of dedicated workflow engines like Temporal while providing the reliability Syndarix requires.
---
## Research Questions & Findings
### 1. Best Python State Machine Libraries (2024-2025)
| Library | Stars | Last Update | Async | Persistence | Visualization | Best For |
|---------|-------|-------------|-------|-------------|---------------|----------|
| [transitions](https://github.com/pytransitions/transitions) | 5.5k+ | Active | Yes | Manual | Graphviz | General FSM |
| [python-statemachine](https://github.com/fgmacedo/python-statemachine) | 800+ | Active | Yes | Django mixin | Graphviz | Django projects |
| [sismic](https://github.com/AlexandreDecan/sismic) | 400+ | Active | No | Manual | PlantUML | Complex statecharts |
| [automat](https://github.com/glyph/automat) | 300+ | Mature | No | No | No | Protocol implementations |
**Recommendation:** `transitions` - Most mature, flexible, excellent documentation, supports hierarchical states and callbacks.
### 2. Framework Comparison
#### transitions (Recommended for State Logic)
**Pros:**
- Lightweight, no external dependencies
- Hierarchical (nested) states support
- Rich callback system (before/after/on_enter/on_exit)
- Machine factory for persistence
- Graphviz diagram generation
- Active maintenance, well-tested
**Cons:**
- No built-in persistence (by design - flexible)
- No distributed coordination
- Manual integration with task queues
```python
from transitions import Machine


class StoryWorkflow:
    states = ['analysis', 'design', 'implementation', 'review', 'testing', 'done']

    def __init__(self, story_id: str):
        self.story_id = story_id
        self.machine = Machine(model=self, states=self.states, initial='analysis')

        # Define transitions
        self.machine.add_transition('design_ready', 'analysis', 'design')
        self.machine.add_transition('implementation_ready', 'design', 'implementation')
        self.machine.add_transition('submit_for_review', 'implementation', 'review')
        self.machine.add_transition('request_changes', 'review', 'implementation')
        self.machine.add_transition('approve', 'review', 'testing')
        self.machine.add_transition('tests_pass', 'testing', 'done')
        self.machine.add_transition('tests_fail', 'testing', 'implementation')
```
#### Temporal (Considered but Not Recommended)
**Pros:**
- Durable execution out of the box
- Handles long-running workflows (months/years)
- Built-in retries, timeouts, versioning
- Excellent Python SDK with asyncio integration
- Automatic state persistence
**Cons:**
- **Heavy infrastructure requirement** (Temporal Server cluster)
- Vendor lock-in to Temporal's model
- Learning curve for Temporal-specific patterns
- Overkill for Syndarix's scale
- Additional operational burden
**When to Choose Temporal:**
- Mission-critical financial workflows
- 10,000+ concurrent workflows
- Team with dedicated infrastructure capacity
#### Prefect (Not Recommended for This Use Case)
**Pros:**
- Great for ETL/data pipelines
- Nice UI for workflow visualization
- Good scheduling capabilities
**Cons:**
- Designed for batch data processing, not interactive workflows
- State model doesn't map well to business workflows
- Would require significant adaptation
#### Custom + Celery (Recommended Approach)
Combine `transitions` state machine logic with Celery task execution:
- State machine handles workflow logic
- PostgreSQL persists state
- Celery executes tasks
- Redis Pub/Sub broadcasts state changes
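To make the wiring concrete, here is a minimal sketch (illustrative names — `PersistedWorkflow` and the `workflow_events` channel are assumptions, not existing code) of a `transitions` machine broadcasting every state change over Redis Pub/Sub via an `after_state_change` callback; the PostgreSQL write would hook into the same callback:
```python
import json

import redis
from transitions import Machine

r = redis.Redis()  # assumes a local Redis instance


class PersistedWorkflow:
    states = ['analysis', 'implementation', 'done']

    def __init__(self, entity_id: str):
        self.entity_id = entity_id
        self.machine = Machine(
            model=self,
            states=self.states,
            initial='analysis',
            after_state_change='broadcast_state',  # fires after every transition
        )
        self.machine.add_transition('implement', 'analysis', 'implementation')
        self.machine.add_transition('finish', 'implementation', 'done')

    def broadcast_state(self):
        # In Syndarix this is also where the new state would be written to
        # PostgreSQL; here we only publish the change over Redis Pub/Sub.
        r.publish('workflow_events', json.dumps(
            {'entity_id': self.entity_id, 'state': self.state}
        ))
```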
### 3. State Persistence Strategy
#### Database Schema
```python
# app/models/workflow.py
from enum import Enum

from sqlalchemy import Column, String, Enum as SQLEnum, JSON, ForeignKey, Integer
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship

from app.models.base import Base, TimestampMixin, UUIDMixin


class WorkflowType(str, Enum):
    SPRINT = "sprint"
    STORY = "story"
    PULL_REQUEST = "pull_request"
    AGENT_TASK = "agent_task"


class WorkflowInstance(Base, UUIDMixin, TimestampMixin):
    """Represents a running workflow instance."""

    __tablename__ = "workflow_instances"

    workflow_type = Column(SQLEnum(WorkflowType), nullable=False, index=True)
    current_state = Column(String(100), nullable=False, index=True)
    entity_id = Column(String(100), nullable=False, index=True)  # story_id, sprint_id, etc.
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
    context = Column(JSON, default=dict)  # Workflow-specific context
    error = Column(String(1000), nullable=True)
    retry_count = Column(Integer, default=0)

    # Relationships
    project = relationship("Project", back_populates="workflows")
    transitions = relationship(
        "WorkflowTransition",
        back_populates="workflow",
        order_by="WorkflowTransition.created_at",
    )


class WorkflowTransition(Base, UUIDMixin, TimestampMixin):
    """Event sourcing table for workflow state changes."""

    __tablename__ = "workflow_transitions"

    workflow_id = Column(UUID(as_uuid=True), ForeignKey("workflow_instances.id"), nullable=False, index=True)
    from_state = Column(String(100), nullable=False)
    to_state = Column(String(100), nullable=False)
    trigger = Column(String(100), nullable=False)  # The transition name
    triggered_by = Column(String(100), nullable=True)  # agent_id, user_id, or "system"
    # "metadata" is reserved by SQLAlchemy's declarative base, so the Python
    # attribute is transition_metadata while the column keeps the short name.
    transition_metadata = Column("metadata", JSON, default=dict)  # Additional context

    # Relationship
    workflow = relationship("WorkflowInstance", back_populates="transitions")
```
#### Migration Example
```python
# alembic/versions/xxx_add_workflow_tables.py
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql


def upgrade():
    op.create_table(
        'workflow_instances',
        sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column('workflow_type', sa.Enum('sprint', 'story', 'pull_request', 'agent_task', name='workflowtype'), nullable=False),
        sa.Column('current_state', sa.String(100), nullable=False),
        sa.Column('entity_id', sa.String(100), nullable=False),
        sa.Column('project_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('projects.id'), nullable=False),
        sa.Column('context', postgresql.JSON, server_default='{}'),
        sa.Column('error', sa.String(1000), nullable=True),
        sa.Column('retry_count', sa.Integer, server_default='0'),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
    )
    op.create_index('ix_workflow_instances_type_state', 'workflow_instances', ['workflow_type', 'current_state'])
    op.create_index('ix_workflow_instances_entity', 'workflow_instances', ['entity_id'])

    op.create_table(
        'workflow_transitions',
        sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column('workflow_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('workflow_instances.id'), nullable=False),
        sa.Column('from_state', sa.String(100), nullable=False),
        sa.Column('to_state', sa.String(100), nullable=False),
        sa.Column('trigger', sa.String(100), nullable=False),
        sa.Column('triggered_by', sa.String(100), nullable=True),
        sa.Column('metadata', postgresql.JSON, server_default='{}'),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
    )
    op.create_index('ix_workflow_transitions_workflow', 'workflow_transitions', ['workflow_id'])
```
### 4. Syndarix Workflow State Machines
#### Sprint Workflow
```mermaid
stateDiagram-v2
    [*] --> planning
    planning --> development: start
    development --> testing: complete_development
    development --> blocked: block
    blocked --> development: unblock
    testing --> demo: tests_pass
    testing --> development: tests_fail
    demo --> retrospective: demo_accepted
    demo --> development: demo_rejected
    retrospective --> completed: complete
    planning --> cancelled: cancel
    development --> cancelled: cancel
    blocked --> cancelled: cancel
    testing --> cancelled: cancel
    demo --> cancelled: cancel
    completed --> [*]
    cancelled --> [*]
```
```python
# app/workflows/sprint_workflow.py
from transitions import Machine


class SprintWorkflow:
    states = [
        'planning',
        'development',
        'blocked',
        'testing',
        'demo',
        'retrospective',
        'completed',
        'cancelled',
    ]

    transitions = [
        # Normal flow
        {'trigger': 'start', 'source': 'planning', 'dest': 'development'},
        {'trigger': 'complete_development', 'source': 'development', 'dest': 'testing'},
        {'trigger': 'tests_pass', 'source': 'testing', 'dest': 'demo'},
        {'trigger': 'demo_accepted', 'source': 'demo', 'dest': 'retrospective'},
        {'trigger': 'complete', 'source': 'retrospective', 'dest': 'completed'},
        # Blocking
        {'trigger': 'block', 'source': 'development', 'dest': 'blocked'},
        {'trigger': 'unblock', 'source': 'blocked', 'dest': 'development'},
        # Test failures
        {'trigger': 'tests_fail', 'source': 'testing', 'dest': 'development'},
        # Demo rejection
        {'trigger': 'demo_rejected', 'source': 'demo', 'dest': 'development'},
        # Cancellation (from any active state)
        {'trigger': 'cancel', 'source': ['planning', 'development', 'blocked', 'testing', 'demo'], 'dest': 'cancelled'},
    ]

    def __init__(self, sprint_id: str, project_id: str, initial_state: str = 'planning'):
        self.sprint_id = sprint_id
        self.project_id = project_id
        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial=initial_state,
            auto_transitions=False,
            send_event=True,  # Pass EventData to callbacks
        )
        # Register callbacks for persistence
        self.machine.on_enter_development(self._on_enter_development)
        self.machine.on_enter_completed(self._on_enter_completed)
        self.machine.on_enter_blocked(self._on_enter_blocked)

    def _on_enter_development(self, event):
        """Triggered when entering the development state."""
        # Could dispatch Celery task to notify agents
        pass

    def _on_enter_completed(self, event):
        """Triggered when the sprint is completed."""
        # Generate sprint report, notify stakeholders
        pass

    def _on_enter_blocked(self, event):
        """Triggered when the sprint is blocked."""
        # Alert human operator
        pass
```
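For reference, a short usage sketch (the IDs are illustrative): triggers become methods on the model, and an invalid trigger raises `MachineError`:
```python
from transitions import MachineError

wf = SprintWorkflow("sprint-42", "project-7")
assert wf.state == "planning"

wf.start()                  # planning -> development
wf.block()                  # development -> blocked
wf.unblock()                # blocked -> development
wf.complete_development()   # development -> testing
assert wf.state == "testing"

try:
    wf.complete()           # invalid: 'complete' only fires from retrospective
except MachineError:
    pass
```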
#### Story Workflow
```mermaid
stateDiagram-v2
    [*] --> analysis
    analysis --> design: ready
    design --> implementation: ready
    implementation --> review: submit
    review --> testing: approve
    review --> implementation: request_changes
    testing --> done: tests_pass
    testing --> implementation: tests_fail
    done --> [*]
```
```python
# app/workflows/story_workflow.py
from transitions import Machine


class StoryWorkflow:
    states = [
        {'name': 'backlog', 'on_enter': '_notify_backlog'},
        {'name': 'analysis', 'on_enter': '_start_analysis_task'},
        {'name': 'design', 'on_enter': '_start_design_task'},
        {'name': 'implementation', 'on_enter': '_start_implementation_task'},
        {'name': 'review', 'on_enter': '_create_pr'},
        {'name': 'testing', 'on_enter': '_run_tests'},
        {'name': 'done', 'on_enter': '_notify_completion'},
        {'name': 'blocked', 'on_enter': '_escalate_block'},
    ]

    transitions = [
        # Happy path
        {'trigger': 'start_analysis', 'source': 'backlog', 'dest': 'analysis'},
        {'trigger': 'analysis_complete', 'source': 'analysis', 'dest': 'design'},
        {'trigger': 'design_complete', 'source': 'design', 'dest': 'implementation'},
        {'trigger': 'submit_for_review', 'source': 'implementation', 'dest': 'review'},
        {'trigger': 'approve', 'source': 'review', 'dest': 'testing'},
        {'trigger': 'tests_pass', 'source': 'testing', 'dest': 'done'},
        # Revision loops
        {'trigger': 'request_changes', 'source': 'review', 'dest': 'implementation'},
        {'trigger': 'tests_fail', 'source': 'testing', 'dest': 'implementation'},
        # Blocking (from any active state)
        {'trigger': 'block', 'source': ['analysis', 'design', 'implementation', 'review', 'testing'], 'dest': 'blocked'},
        {'trigger': 'unblock', 'source': 'blocked', 'dest': 'implementation', 'before': '_restore_previous_state'},
        # Skip to done (for trivial stories)
        {'trigger': 'skip_to_done', 'source': '*', 'dest': 'done', 'conditions': '_is_trivial'},
    ]

    def __init__(self, story_id: str, project_id: str, initial_state: str = 'backlog'):
        self.story_id = story_id
        self.project_id = project_id
        self.previous_state = None
        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial=initial_state,
            auto_transitions=False,
        )

    def _is_trivial(self):
        """Condition: check if the story is marked as trivial."""
        return False  # Would check story metadata

    def _start_analysis_task(self):
        """Dispatch analysis to BA agent via Celery."""
        from app.tasks.agent_tasks import run_agent_action
        run_agent_action.delay(
            agent_type="business_analyst",
            project_id=self.project_id,
            action="analyze_story",
            context={"story_id": self.story_id},
        )

    def _start_design_task(self):
        """Dispatch design to Architect agent via Celery."""
        from app.tasks.agent_tasks import run_agent_action
        run_agent_action.delay(
            agent_type="architect",
            project_id=self.project_id,
            action="design_solution",
            context={"story_id": self.story_id},
        )

    def _start_implementation_task(self):
        """Dispatch implementation to Engineer agent via Celery."""
        from app.tasks.agent_tasks import run_agent_action
        run_agent_action.delay(
            agent_type="engineer",
            project_id=self.project_id,
            action="implement_story",
            context={"story_id": self.story_id},
        )

    def _create_pr(self):
        """Create pull request for review."""
        pass

    def _run_tests(self):
        """Trigger test suite via Celery."""
        from app.tasks.cicd_tasks import run_test_suite
        run_test_suite.delay(
            project_id=self.project_id,
            story_id=self.story_id,
        )

    def _notify_completion(self):
        """Notify stakeholders of story completion."""
        pass

    def _escalate_block(self):
        """Escalate blocked story to human."""
        pass

    def _notify_backlog(self):
        """Story added to backlog notification."""
        pass

    def _restore_previous_state(self):
        """Restore state before block."""
        pass
```
#### PR Workflow
```mermaid
stateDiagram-v2
    [*] --> created
    created --> review: submit
    review --> approved: approve
    review --> changes_requested: request_changes
    changes_requested --> review: resubmit
    approved --> merged: merge
    merged --> [*]
```
```python
# app/workflows/pr_workflow.py
from transitions import Machine


class PRWorkflow:
    states = ['created', 'review', 'changes_requested', 'approved', 'merged', 'closed']

    transitions = [
        {'trigger': 'submit_for_review', 'source': 'created', 'dest': 'review'},
        {'trigger': 'request_changes', 'source': 'review', 'dest': 'changes_requested'},
        {'trigger': 'resubmit', 'source': 'changes_requested', 'dest': 'review'},
        {'trigger': 'approve', 'source': 'review', 'dest': 'approved'},
        {'trigger': 'merge', 'source': 'approved', 'dest': 'merged'},
        {'trigger': 'close', 'source': ['created', 'review', 'changes_requested', 'approved'], 'dest': 'closed'},
    ]

    def __init__(self, pr_id: str, project_id: str, initial_state: str = 'created'):
        self.pr_id = pr_id
        self.project_id = project_id
        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial=initial_state,
        )
```
#### Agent Task Workflow
```mermaid
stateDiagram-v2
    [*] --> assigned
    assigned --> in_progress: start
    in_progress --> completed: complete
    in_progress --> blocked: block / fail
    blocked --> in_progress: retry
    blocked --> escalated: escalate
    completed --> [*]
```
```python
# app/workflows/agent_task_workflow.py
from transitions import Machine


class AgentTaskWorkflow:
    states = ['assigned', 'in_progress', 'blocked', 'completed', 'failed', 'escalated']

    transitions = [
        {'trigger': 'start', 'source': 'assigned', 'dest': 'in_progress'},
        {'trigger': 'complete', 'source': 'in_progress', 'dest': 'completed'},
        {'trigger': 'block', 'source': 'in_progress', 'dest': 'blocked'},
        {'trigger': 'fail', 'source': 'in_progress', 'dest': 'failed'},
        {'trigger': 'retry', 'source': ['blocked', 'failed'], 'dest': 'in_progress', 'conditions': '_can_retry'},
        {'trigger': 'escalate', 'source': ['blocked', 'failed'], 'dest': 'escalated'},
    ]

    def __init__(self, task_id: str, agent_id: str, initial_state: str = 'assigned'):
        self.task_id = task_id
        self.agent_id = agent_id
        self.retry_count = 0
        self.max_retries = 3
        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial=initial_state,
        )

    def _can_retry(self):
        """Check if retry is allowed."""
        return self.retry_count < self.max_retries
```
### 5. Durable Workflow Engine
```python
# app/services/workflow_engine.py
from typing import Optional
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.workflow import WorkflowInstance, WorkflowTransition, WorkflowType
from app.services.events import EventBus
from app.workflows.agent_task_workflow import AgentTaskWorkflow
from app.workflows.pr_workflow import PRWorkflow
from app.workflows.sprint_workflow import SprintWorkflow
from app.workflows.story_workflow import StoryWorkflow

WORKFLOW_CLASSES = {
    WorkflowType.SPRINT: SprintWorkflow,
    WorkflowType.STORY: StoryWorkflow,
    WorkflowType.PULL_REQUEST: PRWorkflow,
    WorkflowType.AGENT_TASK: AgentTaskWorkflow,
}


class WorkflowEngine:
    """
    Durable workflow engine that persists state to PostgreSQL.

    Usage:
        engine = WorkflowEngine(session, event_bus)

        # Create new workflow
        workflow = await engine.create(
            workflow_type=WorkflowType.STORY,
            entity_id="story-123",
            project_id=project.id
        )

        # Load existing workflow
        workflow = await engine.load(workflow_id)

        # Trigger transition
        await engine.trigger(workflow_id, "start_analysis", triggered_by="agent-456")
    """

    def __init__(self, session: AsyncSession, event_bus: Optional[EventBus] = None):
        self.session = session
        self.event_bus = event_bus

    async def create(
        self,
        workflow_type: WorkflowType,
        entity_id: str,
        project_id: UUID,
        initial_context: dict = None,
    ) -> WorkflowInstance:
        """Create a new workflow instance."""
        workflow_class = WORKFLOW_CLASSES[workflow_type]
        initial_state = workflow_class.states[0]
        if isinstance(initial_state, dict):
            initial_state = initial_state['name']

        instance = WorkflowInstance(
            workflow_type=workflow_type,
            current_state=initial_state,
            entity_id=entity_id,
            project_id=project_id,
            context=initial_context or {},
        )
        self.session.add(instance)
        await self.session.commit()
        await self.session.refresh(instance)

        # Publish creation event
        if self.event_bus:
            await self.event_bus.publish(f"project:{project_id}", {
                "type": "workflow_created",
                "workflow_id": str(instance.id),
                "workflow_type": workflow_type.value,
                "entity_id": entity_id,
                "state": initial_state,
            })
        return instance

    async def load(self, workflow_id: UUID) -> Optional[WorkflowInstance]:
        """Load a workflow instance from the database."""
        return await self.session.get(WorkflowInstance, workflow_id)

    async def get_machine(self, instance: WorkflowInstance):
        """Reconstruct the workflow model (and its state machine) from a persisted instance."""
        workflow_class = WORKFLOW_CLASSES[instance.workflow_type]
        # Assumes every workflow class takes (entity_id, project_id, initial_state)
        # positionally; AgentTaskWorkflow would need its agent id from context.
        return workflow_class(
            instance.entity_id,
            str(instance.project_id),
            initial_state=instance.current_state,
        )

    async def trigger(
        self,
        workflow_id: UUID,
        trigger: str,
        triggered_by: str = "system",
        metadata: dict = None,
    ) -> bool:
        """
        Trigger a state transition on a workflow.

        Returns True if the transition succeeded, False if it was not valid.
        """
        instance = await self.load(workflow_id)
        if not instance:
            raise ValueError(f"Workflow {workflow_id} not found")

        workflow = await self.get_machine(instance)
        from_state = instance.current_state

        # Check that the trigger exists and is allowed from the current state
        trigger_method = getattr(workflow, trigger, None)
        if not trigger_method or not callable(trigger_method):
            return False
        if trigger not in workflow.machine.get_triggers(from_state):
            return False

        # Execute transition
        try:
            trigger_method()
            to_state = workflow.state
        except Exception as e:
            # Transition failed
            instance.error = str(e)
            await self.session.commit()
            return False

        # Persist new state
        instance.current_state = to_state
        instance.error = None

        # Record transition (event sourcing)
        transition = WorkflowTransition(
            workflow_id=instance.id,
            from_state=from_state,
            to_state=to_state,
            trigger=trigger,
            triggered_by=triggered_by,
            transition_metadata=metadata or {},
        )
        self.session.add(transition)
        await self.session.commit()

        # Publish state change event
        if self.event_bus:
            await self.event_bus.publish(f"project:{instance.project_id}", {
                "type": "workflow_state_changed",
                "workflow_id": str(instance.id),
                "workflow_type": instance.workflow_type.value,
                "entity_id": instance.entity_id,
                "from_state": from_state,
                "to_state": to_state,
                "trigger": trigger,
                "triggered_by": triggered_by,
            })
        return True

    async def get_history(self, workflow_id: UUID) -> list[WorkflowTransition]:
        """Get the full transition history for a workflow."""
        instance = await self.load(workflow_id)
        if not instance:
            raise ValueError(f"Workflow {workflow_id} not found")
        await self.session.refresh(instance, ["transitions"])
        return instance.transitions

    async def get_active_by_type(
        self,
        project_id: UUID,
        workflow_type: WorkflowType,
    ) -> list[WorkflowInstance]:
        """Get all active (non-terminal) workflows of a type for a project."""
        terminal_states = ['completed', 'cancelled', 'merged', 'closed', 'done']
        result = await self.session.execute(
            select(WorkflowInstance)
            .where(WorkflowInstance.project_id == project_id)
            .where(WorkflowInstance.workflow_type == workflow_type)
            .where(~WorkflowInstance.current_state.in_(terminal_states))
        )
        return result.scalars().all()
```
### 6. Retry and Compensation Patterns
#### Retry Configuration
```python
# app/workflows/retry_config.py
import random
from dataclasses import dataclass


@dataclass
class RetryPolicy:
    """Configuration for retry behavior."""
    max_retries: int = 3
    initial_delay: float = 1.0      # seconds
    max_delay: float = 300.0        # 5 minutes
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_errors: tuple = (ConnectionError, TimeoutError)


class RetryableWorkflow:
    """Mixin for workflows with retry support."""
    retry_policy: RetryPolicy = RetryPolicy()

    def calculate_retry_delay(self, attempt: int) -> float:
        """Calculate the delay before the next retry attempt (exponential backoff)."""
        delay = self.retry_policy.initial_delay * (self.retry_policy.exponential_base ** attempt)
        delay = min(delay, self.retry_policy.max_delay)
        if self.retry_policy.jitter:
            delay = delay * (0.5 + random.random())
        return delay

    def should_retry(self, error: Exception, attempt: int) -> bool:
        """Determine whether the error should trigger a retry."""
        if attempt >= self.retry_policy.max_retries:
            return False
        return isinstance(error, self.retry_policy.retryable_errors)
```
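For intuition, the default policy produces the following backoff schedule (jitter disabled here so the numbers are deterministic):
```python
class DemoWorkflow(RetryableWorkflow):
    retry_policy = RetryPolicy(jitter=False)

demo = DemoWorkflow()
for attempt in range(4):
    print(attempt, demo.calculate_retry_delay(attempt))
# 0 1.0
# 1 2.0
# 2 4.0
# 3 8.0

# A TimeoutError is retryable, but only until max_retries is exhausted
assert demo.should_retry(TimeoutError(), attempt=0)
assert not demo.should_retry(TimeoutError(), attempt=3)
```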
#### Saga Pattern for Compensation
```python
# app/workflows/saga.py
import inspect
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class SagaStep:
    """A single step in a saga with its compensation."""
    name: str
    action: Callable[..., Any]
    compensation: Callable[..., Any]


async def _call(fn: Callable[..., Any], ctx: dict) -> Any:
    """Invoke a step callable, awaiting the result if it is a coroutine."""
    result = fn(ctx)
    if inspect.isawaitable(result):
        result = await result
    return result


class Saga:
    """
    Implements the Saga pattern for distributed transactions.
    If a step fails, compensating actions are executed in reverse order.
    """

    def __init__(self, steps: list[SagaStep]):
        self.steps = steps
        self.completed_steps: list[SagaStep] = []
        self.context: dict = {}

    async def execute(self, initial_context: dict = None) -> dict:
        """Execute the saga, with automatic compensation on failure."""
        self.context = initial_context or {}
        for step in self.steps:
            try:
                result = await _call(step.action, self.context)
                self.context.update(result or {})
                self.completed_steps.append(step)
            except Exception as e:
                # Compensate in reverse order
                await self._compensate()
                raise SagaFailure(
                    failed_step=step.name,
                    original_error=e,
                    compensation_results=self.context.get('_compensation_results', []),
                )
        return self.context

    async def _compensate(self):
        """Execute compensation for all completed steps, newest first."""
        compensation_results = []
        for step in reversed(self.completed_steps):
            try:
                await _call(step.compensation, self.context)
                compensation_results.append({
                    'step': step.name,
                    'status': 'compensated',
                })
            except Exception as e:
                compensation_results.append({
                    'step': step.name,
                    'status': 'failed',
                    'error': str(e),
                })
        self.context['_compensation_results'] = compensation_results


class SagaFailure(Exception):
    """Raised when a saga fails and compensation has been executed."""

    def __init__(self, failed_step: str, original_error: Exception, compensation_results: list):
        self.failed_step = failed_step
        self.original_error = original_error
        self.compensation_results = compensation_results
        super().__init__(f"Saga failed at step '{failed_step}': {original_error}")


# Example: Story implementation saga
def create_story_implementation_saga(story_id: str, project_id: str) -> Saga:
    """Create a saga for implementing a story, with compensation for each step."""
    steps = [
        SagaStep(
            name="create_branch",
            action=lambda ctx: create_feature_branch(ctx['story_id']),
            compensation=lambda ctx: delete_branch(ctx.get('branch_name')),
        ),
        SagaStep(
            name="implement_code",
            action=lambda ctx: generate_code(ctx['story_id'], ctx['branch_name']),
            compensation=lambda ctx: revert_commits(ctx.get('commit_shas', [])),
        ),
        SagaStep(
            name="run_tests",
            action=lambda ctx: run_test_suite(ctx['branch_name']),
            compensation=lambda ctx: None,  # Tests don't need compensation
        ),
        SagaStep(
            name="create_pr",
            action=lambda ctx: create_pull_request(ctx['branch_name'], ctx['story_id']),
            compensation=lambda ctx: close_pull_request(ctx.get('pr_id')),
        ),
    ]
    return Saga(steps)
```
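A small self-contained run (dummy steps, no real Git operations) showing that only completed steps are compensated, in reverse order:
```python
import asyncio

async def demo():
    log = []

    async def create(ctx):
        log.append("created branch")
        return {"branch_name": "feat/story-1"}

    async def fail(ctx):
        raise RuntimeError("code generation failed")

    async def undo_create(ctx):
        log.append(f"deleted branch {ctx['branch_name']}")

    saga = Saga([
        SagaStep("create_branch", action=create, compensation=undo_create),
        SagaStep("implement_code", action=fail, compensation=lambda ctx: None),
    ])
    try:
        await saga.execute()
    except SagaFailure as e:
        print(e.failed_step)  # implement_code
        print(log)            # ['created branch', 'deleted branch feat/story-1']

asyncio.run(demo())
```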
### 7. Celery Integration
```python
# app/tasks/workflow_tasks.py
import asyncio
from uuid import UUID

from app.core.celery import celery_app
from app.core.database import async_session_maker
from app.services.events import EventBus
from app.services.workflow_engine import WorkflowEngine

# Note: Celery does not natively execute `async def` tasks, so each task body
# is synchronous and drives its coroutine with asyncio.run().


@celery_app.task(bind=True)
def trigger_workflow_transition(
    self,
    workflow_id: str,
    trigger: str,
    triggered_by: str = "system",
    metadata: dict = None,
):
    """
    Trigger a workflow transition as a background task.
    Used when transitions need to happen asynchronously.
    """

    async def _run():
        async with async_session_maker() as session:
            engine = WorkflowEngine(session, EventBus())
            return await engine.trigger(
                workflow_id=UUID(workflow_id),
                trigger=trigger,
                triggered_by=triggered_by,
                metadata=metadata,
            )

    success = asyncio.run(_run())
    if not success:
        raise ValueError(f"Transition '{trigger}' failed for workflow {workflow_id}")
    return {"workflow_id": workflow_id, "trigger": trigger, "success": True}


@celery_app.task(bind=True)
def process_story_workflow_step(self, workflow_id: str, step: str, context: dict):
    """
    Process a single step in a story workflow.
    This task runs the actual work for each state.
    """

    async def _run():
        async with async_session_maker() as session:
            engine = WorkflowEngine(session, EventBus())
            instance = await engine.load(UUID(workflow_id))
            if not instance:
                raise ValueError(f"Workflow {workflow_id} not found")

            # Execute step-specific logic
            if step == "analysis":
                await run_analysis(instance.entity_id, context)
                await engine.trigger(UUID(workflow_id), "analysis_complete", "agent:ba")
            elif step == "design":
                await run_design(instance.entity_id, context)
                await engine.trigger(UUID(workflow_id), "design_complete", "agent:architect")
            elif step == "implementation":
                await run_implementation(instance.entity_id, context)
                await engine.trigger(UUID(workflow_id), "submit_for_review", "agent:engineer")

    asyncio.run(_run())
    return {"workflow_id": workflow_id, "step_completed": step}


@celery_app.task
def check_stalled_workflows():
    """
    Periodic task to check for stalled workflows.
    Runs via Celery Beat to identify workflows stuck in non-terminal states.
    """
    # Consider workflows stalled if they have had no transition in 1 hour
    # (implementation would compare updated_at against a threshold and escalate).
    pass
```
### 8. Visualization Approach
#### Graphviz Diagram Generation
```python
# app/services/workflow_visualizer.py
import io
from typing import Optional

from transitions.extensions import GraphMachine


class WorkflowVisualizer:
    """Generate visual diagrams of workflow state machines."""

    @staticmethod
    def generate_diagram(
        workflow_class,
        current_state: Optional[str] = None,
        format: str = 'svg',
    ) -> bytes:
        """
        Generate a diagram for a workflow.

        Args:
            workflow_class: The workflow class to visualize
            current_state: Highlight current state (optional)
            format: Output format ('svg', 'png', 'pdf')

        Returns:
            Diagram as bytes
        """

        class DiagramModel:
            pass

        model = DiagramModel()
        initial = workflow_class.states[0]
        if isinstance(initial, dict):
            initial = initial['name']

        machine = GraphMachine(
            model=model,
            states=workflow_class.states,
            transitions=workflow_class.transitions,
            # GraphMachine styles the model's current state as active, so we
            # start the throwaway model in the state we want highlighted.
            initial=current_state or initial,
            show_conditions=True,
            show_state_attributes=True,
            title=workflow_class.__name__,
        )

        # Render the graph to bytes
        graph = model.get_graph()
        output = io.BytesIO()
        graph.draw(output, format=format, prog='dot')
        return output.getvalue()

    @staticmethod
    def get_mermaid_definition(workflow_class) -> str:
        """
        Generate a Mermaid.js state diagram definition.
        Useful for embedding in markdown documentation or web UIs.
        """
        lines = ["stateDiagram-v2"]

        # Initial state
        initial = workflow_class.states[0]
        if isinstance(initial, dict):
            initial = initial['name']
        lines.append(f"    [*] --> {initial}")

        # Transitions
        for t in workflow_class.transitions:
            sources = t['source'] if isinstance(t['source'], list) else [t['source']]
            for source in sources:
                if source == '*':
                    continue
                lines.append(f"    {source} --> {t['dest']}: {t['trigger']}")

        # Mark terminal states
        terminal_states = ['completed', 'cancelled', 'done', 'merged', 'closed']
        for state in workflow_class.states:
            state_name = state if isinstance(state, str) else state['name']
            if state_name in terminal_states:
                lines.append(f"    {state_name} --> [*]")

        return "\n".join(lines)


# API endpoints for diagram generation
# app/api/v1/workflows.py
from typing import Optional

from fastapi import APIRouter
from fastapi.responses import Response

from app.models.workflow import WorkflowType
from app.services.workflow_engine import WORKFLOW_CLASSES
from app.services.workflow_visualizer import WorkflowVisualizer

router = APIRouter()


@router.get("/workflows/{workflow_type}/diagram")
async def get_workflow_diagram(
    workflow_type: WorkflowType,
    format: str = "svg",
    current_state: Optional[str] = None,
):
    """Get a visual diagram of a workflow state machine."""
    workflow_class = WORKFLOW_CLASSES[workflow_type]
    diagram_bytes = WorkflowVisualizer.generate_diagram(
        workflow_class,
        current_state=current_state,
        format=format,
    )
    media_types = {
        'svg': 'image/svg+xml',
        'png': 'image/png',
        'pdf': 'application/pdf',
    }
    return Response(
        content=diagram_bytes,
        media_type=media_types.get(format, 'application/octet-stream'),
    )


@router.get("/workflows/{workflow_type}/mermaid")
async def get_workflow_mermaid(workflow_type: WorkflowType):
    """Get the Mermaid.js definition for a workflow."""
    workflow_class = WORKFLOW_CLASSES[workflow_type]
    mermaid_def = WorkflowVisualizer.get_mermaid_definition(workflow_class)
    return {"mermaid": mermaid_def}
```
#### Frontend Visualization Component
```typescript
// frontend/components/WorkflowDiagram.tsx
import React from 'react';
import mermaid from 'mermaid';

interface WorkflowDiagramProps {
  workflowType: 'sprint' | 'story' | 'pull_request' | 'agent_task';
  currentState?: string;
}

export function WorkflowDiagram({ workflowType, currentState }: WorkflowDiagramProps) {
  const [diagram, setDiagram] = React.useState<string>('');

  React.useEffect(() => {
    async function fetchDiagram() {
      const response = await fetch(`/api/v1/workflows/${workflowType}/mermaid`);
      const data = await response.json();

      // Highlight current state
      let mermaidDef = data.mermaid;
      if (currentState) {
        mermaidDef += `\n    style ${currentState} fill:#90EE90`;
      }

      const { svg } = await mermaid.render('workflow-diagram', mermaidDef);
      setDiagram(svg);
    }
    fetchDiagram();
  }, [workflowType, currentState]);

  return (
    <div
      className="workflow-diagram"
      dangerouslySetInnerHTML={{ __html: diagram }}
    />
  );
}
```
### 9. Long-Running Workflow Patterns
#### Handling Hours/Days Duration
```python
# app/services/long_running_workflow.py
from datetime import datetime, timedelta
from typing import Optional
from uuid import UUID

from sqlalchemy import select

from app.models.workflow import WorkflowInstance, WorkflowType
from app.services.workflow_engine import WorkflowEngine


class LongRunningWorkflowManager:
    """
    Manager for workflows that span hours or days.

    Key patterns:
    1. Checkpoint persistence - Save progress frequently
    2. Heartbeat monitoring - Detect stalled workflows
    3. Resume capability - Continue from last checkpoint
    4. Timeout handling - Auto-escalate on SLA breach
    """

    def __init__(self, workflow_engine: WorkflowEngine):
        self.engine = workflow_engine
        self.sla_configs = {
            WorkflowType.SPRINT: timedelta(weeks=2),         # 2-week sprint
            WorkflowType.STORY: timedelta(days=5),           # 5 days for a story
            WorkflowType.PULL_REQUEST: timedelta(hours=24),  # 24h for PR review
            WorkflowType.AGENT_TASK: timedelta(hours=1),     # 1h for an agent task
        }

    async def check_sla_breaches(self):
        """Check for workflows that have breached their SLA."""
        breached = []
        for workflow_type, sla in self.sla_configs.items():
            threshold = datetime.utcnow() - sla
            # Find active workflows created before the threshold
            result = await self.engine.session.execute(
                select(WorkflowInstance)
                .where(WorkflowInstance.workflow_type == workflow_type)
                .where(WorkflowInstance.created_at < threshold)
                .where(~WorkflowInstance.current_state.in_([
                    'completed', 'cancelled', 'done', 'merged', 'closed'
                ]))
            )
            for instance in result.scalars():
                breached.append({
                    'workflow_id': instance.id,
                    'type': workflow_type.value,
                    'entity_id': instance.entity_id,
                    'current_state': instance.current_state,
                    'age': datetime.utcnow() - instance.created_at,
                    'sla': sla,
                })
        return breached

    async def create_checkpoint(self, workflow_id: UUID, checkpoint_data: dict):
        """
        Save a checkpoint for a long-running workflow.
        Allows resumption from this point if the workflow is interrupted.
        """
        instance = await self.engine.load(workflow_id)
        if instance:
            instance.context = {
                **instance.context,
                '_checkpoint': {
                    'data': checkpoint_data,
                    'timestamp': datetime.utcnow().isoformat(),
                },
            }
            await self.engine.session.commit()

    async def resume_from_checkpoint(self, workflow_id: UUID) -> Optional[dict]:
        """
        Resume a workflow from its last checkpoint.
        Returns checkpoint data if available.
        """
        instance = await self.engine.load(workflow_id)
        if instance and instance.context.get('_checkpoint'):
            return instance.context['_checkpoint']['data']
        return None
```
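SLA checks only help if something runs them; a hedged sketch of a Celery beat entry that would invoke the check periodically (the schedule name and task path are assumptions):
```python
# app/core/celery.py (sketch)
celery_app.conf.beat_schedule = {
    "check-workflow-sla-breaches": {
        # Task that calls LongRunningWorkflowManager.check_sla_breaches
        # and escalates any results; the path is illustrative.
        "task": "app.tasks.workflow_tasks.check_stalled_workflows",
        "schedule": 15 * 60.0,  # every 15 minutes, in seconds
    },
}
```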
#### Sprint Workflow with Checkpoints
```python
# app/workflows/sprint_workflow_runner.py
from datetime import datetime
from uuid import UUID

from celery import group

from app.tasks.workflow_tasks import trigger_workflow_transition


class SprintWorkflowRunner:
    """
    Orchestrates a full sprint lifecycle.

    A sprint runs for ~2 weeks, with daily standups and continuous work.
    This runner manages the long-duration process with checkpoints.
    """

    def __init__(self, sprint_id: str, project_id: str, workflow_id: str, engine):
        self.sprint_id = sprint_id
        self.project_id = project_id
        self.workflow_id = workflow_id
        self.engine = engine  # WorkflowEngine, used for history/metrics

    async def start_sprint(self, stories: list[str]):
        """
        Start the sprint with initial stories.
        Creates story workflows for each story and begins development.
        """
        # Transition sprint to development. Note: .delay() is fire-and-forget,
        # not awaitable -- it returns an AsyncResult immediately.
        trigger_workflow_transition.delay(
            self.workflow_id,
            trigger="start",
            triggered_by="system",
            metadata={"stories": stories},
        )

        # Create story workflows for each story, in parallel
        # (create_story_workflow is a Celery task defined elsewhere)
        story_tasks = [
            create_story_workflow.s(story_id, self.project_id)
            for story_id in stories
        ]
        group(story_tasks).apply_async()

    async def run_daily_standup(self):
        """
        Daily standup checkpoint.
        Collects status from all active story workflows.
        """
        # get_active_stories / save_checkpoint are thin wrappers around
        # WorkflowEngine.get_active_by_type and the checkpoint manager
        active_stories = await self.get_active_stories()
        report = {
            'date': datetime.utcnow().isoformat(),
            'sprint_id': self.sprint_id,
            'stories': [],
        }
        for story in active_stories:
            report['stories'].append({
                'story_id': story.entity_id,
                'state': story.current_state,
                'blocked': story.current_state == 'blocked',
            })
        # Save checkpoint
        await self.save_checkpoint(report)
        return report

    async def complete_sprint(self):
        """Complete the sprint and generate retrospective data."""
        # Collect all transitions for analysis
        history = await self.engine.get_history(UUID(self.workflow_id))

        # Calculate metrics
        metrics = {
            'total_transitions': len(history),
            'duration': (datetime.utcnow() - history[0].created_at).days,
            'blocks_encountered': sum(1 for t in history if t.to_state == 'blocked'),
        }

        trigger_workflow_transition.delay(
            self.workflow_id,
            trigger="complete",
            triggered_by="system",
            metadata={"metrics": metrics},
        )
```
---
## Dependencies
Add to `pyproject.toml`:
```toml
[project]
dependencies = [
    "transitions>=0.9,<1.0",
    "graphviz>=0.20.1",      # For diagram generation
]

[project.optional-dependencies]
diagrams = [
    "pygraphviz>=1.11",      # Alternative Graphviz binding
]
```
---
## Implementation Roadmap
### Phase 1: Foundation (Week 1)
1. Add `transitions` library dependency
2. Create workflow database models and migrations
3. Implement basic `WorkflowEngine` class
4. Write unit tests for state machines
### Phase 2: Core Workflows (Week 2)
1. Implement `StoryWorkflow` with all transitions
2. Implement `SprintWorkflow` with checkpoints
3. Implement `PRWorkflow` for code review
4. Integrate with Celery tasks
### Phase 3: Durability (Week 3)
1. Add retry and compensation patterns
2. Implement SLA monitoring
3. Add checkpoint/resume capability
4. Integrate with EventBus for real-time updates
### Phase 4: Visualization (Week 4)
1. Add diagram generation endpoints
2. Create frontend visualization component
3. Add workflow monitoring dashboard
4. Documentation and examples
---
## Testing Strategy
```python
# tests/workflows/test_story_workflow.py
# NOTE: on_enter callbacks dispatch Celery tasks; these tests assume the
# tasks are mocked or Celery runs in eager mode.
import pytest
from transitions import MachineError

from app.workflows.story_workflow import StoryWorkflow


class TestStoryWorkflow:
    def test_happy_path(self):
        """Test normal story progression."""
        workflow = StoryWorkflow("story-1", "project-1")
        assert workflow.state == "backlog"

        workflow.start_analysis()
        assert workflow.state == "analysis"

        workflow.analysis_complete()
        assert workflow.state == "design"

        workflow.design_complete()
        assert workflow.state == "implementation"

        workflow.submit_for_review()
        assert workflow.state == "review"

        workflow.approve()
        assert workflow.state == "testing"

        workflow.tests_pass()
        assert workflow.state == "done"

    def test_review_rejection(self):
        """Test the review rejection loop."""
        workflow = StoryWorkflow("story-1", "project-1", initial_state="review")

        workflow.request_changes()
        assert workflow.state == "implementation"

        workflow.submit_for_review()
        assert workflow.state == "review"

    def test_invalid_transition(self):
        """Test that invalid transitions are rejected."""
        workflow = StoryWorkflow("story-1", "project-1")

        # Can't go from backlog to review
        with pytest.raises(MachineError):
            workflow.approve()
        assert workflow.state == "backlog"  # State unchanged

    def test_blocking(self):
        """Test blocking from any active state."""
        for initial_state in ['analysis', 'design', 'implementation']:
            workflow = StoryWorkflow("story-1", "project-1", initial_state=initial_state)
            workflow.block()
            assert workflow.state == "blocked"
```
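The state machines alone don't exercise persistence, so an engine-level test is worth adding too. A sketch, assuming pytest-asyncio plus `session` and `project` fixtures backed by a test database:
```python
# tests/services/test_workflow_engine.py (sketch)
import pytest

from app.models.workflow import WorkflowType
from app.services.workflow_engine import WorkflowEngine


@pytest.mark.asyncio
async def test_trigger_persists_state_and_history(session, project):
    engine = WorkflowEngine(session)

    instance = await engine.create(
        workflow_type=WorkflowType.STORY,
        entity_id="story-1",
        project_id=project.id,
    )
    assert instance.current_state == "backlog"

    assert await engine.trigger(instance.id, "start_analysis", triggered_by="test")

    reloaded = await engine.load(instance.id)
    assert reloaded.current_state == "analysis"

    history = await engine.get_history(instance.id)
    assert [(t.from_state, t.to_state) for t in history] == [("backlog", "analysis")]
```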
---
## Risks and Mitigations
| Risk | Impact | Likelihood | Mitigation |
|------|--------|------------|------------|
| State corruption on crash | High | Low | Event sourcing allows state reconstruction |
| Long-running task timeout | Medium | Medium | Celery soft limits + checkpointing |
| Race conditions on concurrent transitions | High | Medium | PostgreSQL row-level locking |
| Complex workflow debugging | Medium | High | Comprehensive logging + visualization |
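The row-locking mitigation is not yet reflected in the `WorkflowEngine` sketch above; one way to implement it is to load the instance with `SELECT ... FOR UPDATE` inside `trigger`, so concurrent triggers serialize on the row instead of racing (a sketch, not final code):
```python
from sqlalchemy import select

from app.models.workflow import WorkflowInstance

# Inside WorkflowEngine.trigger: lock the instance row for the duration of
# the transaction so two concurrent triggers cannot both read the same
# current_state and commit conflicting transitions.
result = await self.session.execute(
    select(WorkflowInstance)
    .where(WorkflowInstance.id == workflow_id)
    .with_for_update()
)
instance = result.scalar_one_or_none()
```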
---
## Decision
**Adopt `transitions` library + PostgreSQL persistence** for Syndarix workflow state machines.
**Rationale:**
1. **Simplicity** - No additional infrastructure (vs Temporal)
2. **Flexibility** - Full control over persistence and task execution
3. **Integration** - Natural fit with existing Celery + Redis stack (SPIKE-004)
4. **Durability** - Event sourcing provides audit trail and recovery
5. **Visualization** - Built-in Graphviz support + Mermaid for web
**Trade-offs Accepted:**
- More custom code vs using Temporal's built-in features
- Manual handling of distributed coordination
- Custom SLA monitoring implementation
---
## References
- [transitions Documentation](https://github.com/pytransitions/transitions)
- [Temporal Python SDK](https://github.com/temporalio/sdk-python)
- [Managing Long-Running Workflows with Temporal](https://temporal.io/blog/very-long-running-workflows)
- [Saga Pattern](https://microservices.io/patterns/data/saga.html)
- [Event Sourcing](https://martinfowler.com/eaaDev/EventSourcing.html)
- [Celery Dyrygent](https://github.com/ovh/celery-dyrygent) - Complex workflow orchestration
- [Selinon](https://github.com/selinon/selinon) - Advanced flow management on Celery
- [Toptal Celery Orchestration Guide](https://www.toptal.com/python/orchestrating-celery-python-background-jobs)
---
*Spike completed. Findings will inform ADR-008: Workflow State Machine Architecture.*