SPIKE-008: Workflow State Machine Architecture
Status: Completed | Date: 2025-12-29 | Author: Architecture Team | Related Issue: #8
Executive Summary
Syndarix requires durable state machine capabilities to orchestrate long-lived workflows spanning hours to days (sprint execution, story implementation, PR review cycles). After evaluating multiple approaches, we recommend a hybrid architecture:
- `transitions` library for state machine logic (lightweight, Pythonic, well-tested)
- PostgreSQL for state persistence with event sourcing for audit trail
- Celery for task execution (already planned in SPIKE-004)
- Custom workflow engine built on these primitives
This approach balances simplicity with durability, avoiding the operational complexity of dedicated workflow engines like Temporal while providing the reliability Syndarix requires.
Research Questions & Findings
1. Best Python State Machine Libraries (2024-2025)
| Library | Stars | Last Update | Async | Persistence | Visualization | Best For |
|---|---|---|---|---|---|---|
| transitions | 5.5k+ | Active | Yes | Manual | Graphviz | General FSM |
| python-statemachine | 800+ | Active | Yes | Django mixin | Graphviz | Django projects |
| sismic | 400+ | Active | No | Manual | PlantUML | Complex statecharts |
| automat | 300+ | Mature | No | No | No | Protocol implementations |
Recommendation: transitions - Most mature, flexible, excellent documentation, supports hierarchical states and callbacks.
2. Framework Comparison
transitions (Recommended for State Logic)
Pros:
- Lightweight, no external dependencies
- Hierarchical (nested) states support
- Rich callback system (before/after/on_enter/on_exit)
- Machine factory for persistence
- Graphviz diagram generation
- Active maintenance, well-tested
Cons:
- No built-in persistence (by design - flexible)
- No distributed coordination
- Manual integration with task queues
from transitions import Machine
class StoryWorkflow:
states = ['analysis', 'design', 'implementation', 'review', 'testing', 'done']
def __init__(self, story_id: str):
self.story_id = story_id
self.machine = Machine(model=self, states=self.states, initial='analysis')
# Define transitions
self.machine.add_transition('design_ready', 'analysis', 'design')
self.machine.add_transition('implementation_ready', 'design', 'implementation')
self.machine.add_transition('submit_for_review', 'implementation', 'review')
self.machine.add_transition('request_changes', 'review', 'implementation')
self.machine.add_transition('approve', 'review', 'testing')
self.machine.add_transition('tests_pass', 'testing', 'done')
self.machine.add_transition('tests_fail', 'testing', 'implementation')
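A quick usage sketch (the story ID is hypothetical): `Machine` attaches the trigger methods and a `state` attribute to the model, so driving the workflow is just method calls.

# Usage sketch for the class above
wf = StoryWorkflow("story-42")      # Hypothetical story ID
assert wf.state == "analysis"       # Initial state
wf.design_ready()                   # Trigger methods are generated on the model
assert wf.state == "design"
wf.implementation_ready()
wf.submit_for_review()
assert wf.state == "review"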
Temporal (Considered but Not Recommended)
Pros:
- Durable execution out of the box
- Handles long-running workflows (months/years)
- Built-in retries, timeouts, versioning
- Excellent Python SDK with asyncio integration
- Automatic state persistence
Cons:
- Heavy infrastructure requirement (Temporal Server cluster)
- Vendor lock-in to Temporal's model
- Learning curve for Temporal-specific patterns
- Overkill for Syndarix's scale
- Additional operational burden
When to Choose Temporal:
- Mission-critical financial workflows
- 10,000+ concurrent workflows
- Team with dedicated infrastructure capacity
Prefect (Not Recommended for This Use Case)
Pros:
- Great for ETL/data pipelines
- Nice UI for workflow visualization
- Good scheduling capabilities
Cons:
- Designed for batch data processing, not interactive workflows
- State model doesn't map well to business workflows
- Would require significant adaptation
Custom + Celery (Recommended Approach)
Combine transitions state machine logic with Celery task execution:
- State machine handles workflow logic
- PostgreSQL persists state
- Celery executes tasks
- Redis Pub/Sub broadcasts state changes
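As a sketch of the broadcast piece, assuming redis-py's asyncio client and the `project:<id>` channel convention used later in this document (connection settings elided):

# Hedged sketch: broadcast a workflow state change over Redis Pub/Sub
import json
import redis.asyncio as redis

async def broadcast_state_change(project_id: str, payload: dict) -> None:
    """Publish a state-change event to the project's channel."""
    client = redis.Redis()  # Connection settings elided
    try:
        await client.publish(f"project:{project_id}", json.dumps(payload))
    finally:
        await client.aclose()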
3. State Persistence Strategy
Database Schema
# app/models/workflow.py
from enum import Enum
from sqlalchemy import Column, String, Enum as SQLEnum, JSON, ForeignKey, Integer
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app.models.base import Base, TimestampMixin, UUIDMixin
class WorkflowType(str, Enum):
SPRINT = "sprint"
STORY = "story"
PULL_REQUEST = "pull_request"
AGENT_TASK = "agent_task"
class WorkflowInstance(Base, UUIDMixin, TimestampMixin):
"""Represents a running workflow instance."""
__tablename__ = "workflow_instances"
workflow_type = Column(SQLEnum(WorkflowType), nullable=False, index=True)
current_state = Column(String(100), nullable=False, index=True)
entity_id = Column(String(100), nullable=False, index=True) # story_id, sprint_id, etc.
project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
context = Column(JSON, default=dict) # Workflow-specific context
error = Column(String(1000), nullable=True)
retry_count = Column(Integer, default=0)
# Relationships
project = relationship("Project", back_populates="workflows")
transitions = relationship("WorkflowTransition", back_populates="workflow", order_by="WorkflowTransition.created_at")
class WorkflowTransition(Base, UUIDMixin, TimestampMixin):
"""Event sourcing table for workflow state changes."""
__tablename__ = "workflow_transitions"
workflow_id = Column(UUID(as_uuid=True), ForeignKey("workflow_instances.id"), nullable=False, index=True)
from_state = Column(String(100), nullable=False)
to_state = Column(String(100), nullable=False)
trigger = Column(String(100), nullable=False) # The transition name
triggered_by = Column(String(100), nullable=True) # agent_id, user_id, or "system"
    # "metadata" is reserved by SQLAlchemy's Declarative API, so map the column explicitly
    transition_metadata = Column("metadata", JSON, default=dict)  # Additional context
# Relationship
workflow = relationship("WorkflowInstance", back_populates="transitions")
Migration Example
# alembic/versions/xxx_add_workflow_tables.py
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
def upgrade():
op.create_table(
'workflow_instances',
sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column('workflow_type', sa.Enum('sprint', 'story', 'pull_request', 'agent_task', name='workflowtype'), nullable=False),
sa.Column('current_state', sa.String(100), nullable=False),
sa.Column('entity_id', sa.String(100), nullable=False),
sa.Column('project_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('projects.id'), nullable=False),
sa.Column('context', postgresql.JSON, server_default='{}'),
sa.Column('error', sa.String(1000), nullable=True),
sa.Column('retry_count', sa.Integer, server_default='0'),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
)
op.create_index('ix_workflow_instances_type_state', 'workflow_instances', ['workflow_type', 'current_state'])
op.create_index('ix_workflow_instances_entity', 'workflow_instances', ['entity_id'])
op.create_table(
'workflow_transitions',
sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column('workflow_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('workflow_instances.id'), nullable=False),
sa.Column('from_state', sa.String(100), nullable=False),
sa.Column('to_state', sa.String(100), nullable=False),
sa.Column('trigger', sa.String(100), nullable=False),
sa.Column('triggered_by', sa.String(100), nullable=True),
sa.Column('metadata', postgresql.JSON, server_default='{}'),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
)
op.create_index('ix_workflow_transitions_workflow', 'workflow_transitions', ['workflow_id'])
4. Syndarix Workflow State Machines
Sprint Workflow
┌──────────┐ start ┌───────────┐ complete ┌─────────┐
│ Planning │ ─────────────► │Development│ ──────────────► │ Testing │
└──────────┘ └───────────┘ └─────────┘
│ │ │
│ │ block │ pass
│ ▼ ▼
│ ┌─────────┐ ┌──────────┐
│ │ Blocked │ │ Demo │
│ └─────────┘ └──────────┘
│ │ unblock │
│ ▼ │ accept
│ ┌───────────┐ ▼
│ │Development│ ┌──────────────┐
│ └───────────┘ │Retrospective │
│ └──────────────┘
│ cancel │
▼ │ complete
┌───────────┐ ▼
│ Cancelled │ ┌───────────┐
└───────────┘ │ Completed │
└───────────┘
# app/workflows/sprint_workflow.py
from transitions import Machine
from typing import Optional
from app.models.workflow import WorkflowInstance, WorkflowTransition
class SprintWorkflow:
states = [
'planning',
'development',
'blocked',
'testing',
'demo',
'retrospective',
'completed',
'cancelled'
]
transitions = [
# Normal flow
{'trigger': 'start', 'source': 'planning', 'dest': 'development'},
{'trigger': 'complete_development', 'source': 'development', 'dest': 'testing'},
{'trigger': 'tests_pass', 'source': 'testing', 'dest': 'demo'},
{'trigger': 'demo_accepted', 'source': 'demo', 'dest': 'retrospective'},
{'trigger': 'complete', 'source': 'retrospective', 'dest': 'completed'},
# Blocking
{'trigger': 'block', 'source': 'development', 'dest': 'blocked'},
{'trigger': 'unblock', 'source': 'blocked', 'dest': 'development'},
# Test failures
{'trigger': 'tests_fail', 'source': 'testing', 'dest': 'development'},
# Demo rejection
{'trigger': 'demo_rejected', 'source': 'demo', 'dest': 'development'},
# Cancellation (from any active state)
{'trigger': 'cancel', 'source': ['planning', 'development', 'blocked', 'testing', 'demo'], 'dest': 'cancelled'},
]
def __init__(self, sprint_id: str, project_id: str, initial_state: str = 'planning'):
self.sprint_id = sprint_id
self.project_id = project_id
self.machine = Machine(
model=self,
states=self.states,
transitions=self.transitions,
initial=initial_state,
auto_transitions=False,
send_event=True, # Pass EventData to callbacks
)
# Register callbacks for persistence
self.machine.on_enter_development(self._on_enter_development)
self.machine.on_enter_completed(self._on_enter_completed)
self.machine.on_enter_blocked(self._on_enter_blocked)
def _on_enter_development(self, event):
"""Trigger when entering development state."""
# Could dispatch Celery task to notify agents
pass
def _on_enter_completed(self, event):
"""Trigger when sprint is completed."""
# Generate sprint report, notify stakeholders
pass
def _on_enter_blocked(self, event):
"""Trigger when sprint is blocked."""
# Alert human operator
pass
Story Workflow
┌──────────┐ ready ┌────────┐ ready ┌────────────────┐
│ Analysis │ ────────► │ Design │ ────────► │ Implementation │
└──────────┘ └────────┘ └────────────────┘
│
│ submit
▼
┌───────┐ tests_pass ┌─────────┐ approve ┌────────┐
│ Done │ ◄─────────── │ Testing │ ◄──────── │ Review │
└───────┘ └─────────┘ └────────┘
│ │
│ tests_fail │ request_changes
▼ ▼
┌────────────────┐ ┌────────────────┐
│ Implementation │ │ Implementation │
└────────────────┘ └────────────────┘
# app/workflows/story_workflow.py
from transitions import Machine
class StoryWorkflow:
states = [
{'name': 'backlog', 'on_enter': '_notify_backlog'},
{'name': 'analysis', 'on_enter': '_start_analysis_task'},
{'name': 'design', 'on_enter': '_start_design_task'},
{'name': 'implementation', 'on_enter': '_start_implementation_task'},
{'name': 'review', 'on_enter': '_create_pr'},
{'name': 'testing', 'on_enter': '_run_tests'},
{'name': 'done', 'on_enter': '_notify_completion'},
{'name': 'blocked', 'on_enter': '_escalate_block'},
]
transitions = [
# Happy path
{'trigger': 'start_analysis', 'source': 'backlog', 'dest': 'analysis'},
{'trigger': 'analysis_complete', 'source': 'analysis', 'dest': 'design'},
{'trigger': 'design_complete', 'source': 'design', 'dest': 'implementation'},
{'trigger': 'submit_for_review', 'source': 'implementation', 'dest': 'review'},
{'trigger': 'approve', 'source': 'review', 'dest': 'testing'},
{'trigger': 'tests_pass', 'source': 'testing', 'dest': 'done'},
# Revision loops
{'trigger': 'request_changes', 'source': 'review', 'dest': 'implementation'},
{'trigger': 'tests_fail', 'source': 'testing', 'dest': 'implementation'},
# Blocking (from any active state)
{'trigger': 'block', 'source': ['analysis', 'design', 'implementation', 'review', 'testing'], 'dest': 'blocked'},
{'trigger': 'unblock', 'source': 'blocked', 'dest': 'implementation', 'before': '_restore_previous_state'},
# Skip to done (for trivial stories)
{'trigger': 'skip_to_done', 'source': '*', 'dest': 'done', 'conditions': '_is_trivial'},
]
def __init__(self, story_id: str, project_id: str, initial_state: str = 'backlog'):
self.story_id = story_id
self.project_id = project_id
self.previous_state = None
self.machine = Machine(
model=self,
states=self.states,
transitions=self.transitions,
initial=initial_state,
auto_transitions=False,
)
def _is_trivial(self):
"""Condition: Check if story is marked as trivial."""
return False # Would check story metadata
def _start_analysis_task(self):
"""Dispatch analysis to BA agent via Celery."""
from app.tasks.agent_tasks import run_agent_action
run_agent_action.delay(
agent_type="business_analyst",
project_id=self.project_id,
action="analyze_story",
context={"story_id": self.story_id}
)
def _start_design_task(self):
"""Dispatch design to Architect agent via Celery."""
from app.tasks.agent_tasks import run_agent_action
run_agent_action.delay(
agent_type="architect",
project_id=self.project_id,
action="design_solution",
context={"story_id": self.story_id}
)
def _start_implementation_task(self):
"""Dispatch implementation to Engineer agent via Celery."""
from app.tasks.agent_tasks import run_agent_action
run_agent_action.delay(
agent_type="engineer",
project_id=self.project_id,
action="implement_story",
context={"story_id": self.story_id}
)
def _create_pr(self):
"""Create pull request for review."""
pass
def _run_tests(self):
"""Trigger test suite via Celery."""
from app.tasks.cicd_tasks import run_test_suite
run_test_suite.delay(
project_id=self.project_id,
story_id=self.story_id
)
def _notify_completion(self):
"""Notify stakeholders of story completion."""
pass
def _escalate_block(self):
"""Escalate blocked story to human."""
pass
def _notify_backlog(self):
"""Story added to backlog notification."""
pass
def _restore_previous_state(self):
"""Restore state before block."""
pass
PR Workflow
┌─────────┐ submit ┌────────┐ approve ┌──────────┐
│ Created │ ────────► │ Review │ ─────────► │ Approved │
└─────────┘ └────────┘ └──────────┘
│ │
│ request_changes │ merge
▼ ▼
┌───────────────────┐ ┌────────┐
│ Changes Requested │ │ Merged │
└───────────────────┘ └────────┘
│
│ resubmit
▼
┌────────┐
│ Review │
└────────┘
# app/workflows/pr_workflow.py
from transitions import Machine
class PRWorkflow:
states = ['created', 'review', 'changes_requested', 'approved', 'merged', 'closed']
transitions = [
{'trigger': 'submit_for_review', 'source': 'created', 'dest': 'review'},
{'trigger': 'request_changes', 'source': 'review', 'dest': 'changes_requested'},
{'trigger': 'resubmit', 'source': 'changes_requested', 'dest': 'review'},
{'trigger': 'approve', 'source': 'review', 'dest': 'approved'},
{'trigger': 'merge', 'source': 'approved', 'dest': 'merged'},
{'trigger': 'close', 'source': ['created', 'review', 'changes_requested', 'approved'], 'dest': 'closed'},
]
def __init__(self, pr_id: str, project_id: str, initial_state: str = 'created'):
self.pr_id = pr_id
self.project_id = project_id
self.machine = Machine(
model=self,
states=self.states,
transitions=self.transitions,
initial=initial_state,
)
Agent Task Workflow
┌──────────┐ start ┌─────────────┐ complete ┌───────────┐
│ Assigned │ ───────► │ In Progress │ ──────────► │ Completed │
└──────────┘ └─────────────┘ └───────────┘
│
│ block / fail
▼
┌─────────┐
│ Blocked │
└─────────┘
│
│ retry / escalate
▼
┌─────────────┐ or ┌───────────┐
│ In Progress │ │ Escalated │
└─────────────┘ └───────────┘
# app/workflows/agent_task_workflow.py
from transitions import Machine
class AgentTaskWorkflow:
states = ['assigned', 'in_progress', 'blocked', 'completed', 'failed', 'escalated']
transitions = [
{'trigger': 'start', 'source': 'assigned', 'dest': 'in_progress'},
{'trigger': 'complete', 'source': 'in_progress', 'dest': 'completed'},
{'trigger': 'block', 'source': 'in_progress', 'dest': 'blocked'},
{'trigger': 'fail', 'source': 'in_progress', 'dest': 'failed'},
{'trigger': 'retry', 'source': ['blocked', 'failed'], 'dest': 'in_progress', 'conditions': '_can_retry'},
{'trigger': 'escalate', 'source': ['blocked', 'failed'], 'dest': 'escalated'},
]
def __init__(self, task_id: str, agent_id: str, initial_state: str = 'assigned'):
self.task_id = task_id
self.agent_id = agent_id
self.retry_count = 0
self.max_retries = 3
self.machine = Machine(
model=self,
states=self.states,
transitions=self.transitions,
initial=initial_state,
)
def _can_retry(self):
"""Check if retry is allowed."""
return self.retry_count < self.max_retries
5. Durable Workflow Engine
# app/services/workflow_engine.py
from typing import Type, Optional
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from transitions import Machine
from app.models.workflow import WorkflowInstance, WorkflowTransition, WorkflowType
from app.workflows.sprint_workflow import SprintWorkflow
from app.workflows.story_workflow import StoryWorkflow
from app.workflows.pr_workflow import PRWorkflow
from app.workflows.agent_task_workflow import AgentTaskWorkflow
from app.services.events import EventBus
WORKFLOW_CLASSES = {
WorkflowType.SPRINT: SprintWorkflow,
WorkflowType.STORY: StoryWorkflow,
WorkflowType.PULL_REQUEST: PRWorkflow,
WorkflowType.AGENT_TASK: AgentTaskWorkflow,
}
class WorkflowEngine:
"""
Durable workflow engine that persists state to PostgreSQL.
Usage:
engine = WorkflowEngine(session, event_bus)
# Create new workflow
workflow = await engine.create(
workflow_type=WorkflowType.STORY,
entity_id="story-123",
project_id=project.id
)
# Load existing workflow
workflow = await engine.load(workflow_id)
# Trigger transition
await engine.trigger(workflow_id, "start_analysis", triggered_by="agent-456")
"""
def __init__(self, session: AsyncSession, event_bus: Optional[EventBus] = None):
self.session = session
self.event_bus = event_bus
async def create(
self,
workflow_type: WorkflowType,
entity_id: str,
project_id: UUID,
initial_context: dict = None
) -> WorkflowInstance:
"""Create a new workflow instance."""
workflow_class = WORKFLOW_CLASSES[workflow_type]
initial_state = workflow_class.states[0]
if isinstance(initial_state, dict):
initial_state = initial_state['name']
instance = WorkflowInstance(
workflow_type=workflow_type,
current_state=initial_state,
entity_id=entity_id,
project_id=project_id,
context=initial_context or {}
)
self.session.add(instance)
await self.session.commit()
await self.session.refresh(instance)
# Publish creation event
if self.event_bus:
await self.event_bus.publish(f"project:{project_id}", {
"type": "workflow_created",
"workflow_id": str(instance.id),
"workflow_type": workflow_type.value,
"entity_id": entity_id,
"state": initial_state
})
return instance
async def load(self, workflow_id: UUID) -> Optional[WorkflowInstance]:
"""Load a workflow instance from the database."""
return await self.session.get(WorkflowInstance, workflow_id)
    async def get_machine(self, instance: WorkflowInstance):
        """Reconstruct the workflow model (and its state machine) from a persisted instance."""
        workflow_class = WORKFLOW_CLASSES[instance.workflow_type]
        # Workflow constructors take the entity id positionally (story_id, sprint_id, ...)
        workflow = workflow_class(
            instance.entity_id,
            str(instance.project_id),
            initial_state=instance.current_state
        )
        return workflow
async def trigger(
self,
workflow_id: UUID,
trigger: str,
triggered_by: str = "system",
metadata: dict = None
) -> bool:
"""
Trigger a state transition on a workflow.
Returns True if transition succeeded, False if not valid.
"""
instance = await self.load(workflow_id)
if not instance:
raise ValueError(f"Workflow {workflow_id} not found")
workflow = await self.get_machine(instance)
from_state = instance.current_state
# Check if transition is valid
trigger_method = getattr(workflow, trigger, None)
if not trigger_method or not callable(trigger_method):
return False
        # Check if the trigger is allowed from the current state
        if trigger not in workflow.machine.get_triggers(workflow.state):
            return False
# Execute transition
try:
trigger_method()
to_state = workflow.state
except Exception as e:
# Transition failed
instance.error = str(e)
await self.session.commit()
return False
# Persist new state
instance.current_state = to_state
instance.error = None
# Record transition (event sourcing)
transition = WorkflowTransition(
workflow_id=instance.id,
from_state=from_state,
to_state=to_state,
trigger=trigger,
triggered_by=triggered_by,
            transition_metadata=metadata or {}
)
self.session.add(transition)
await self.session.commit()
# Publish state change event
if self.event_bus:
await self.event_bus.publish(f"project:{instance.project_id}", {
"type": "workflow_state_changed",
"workflow_id": str(instance.id),
"workflow_type": instance.workflow_type.value,
"entity_id": instance.entity_id,
"from_state": from_state,
"to_state": to_state,
"trigger": trigger,
"triggered_by": triggered_by
})
return True
async def get_history(self, workflow_id: UUID) -> list[WorkflowTransition]:
"""Get full transition history for a workflow."""
instance = await self.load(workflow_id)
if not instance:
raise ValueError(f"Workflow {workflow_id} not found")
await self.session.refresh(instance, ["transitions"])
return instance.transitions
async def get_active_by_type(
self,
project_id: UUID,
workflow_type: WorkflowType
) -> list[WorkflowInstance]:
"""Get all active workflows of a type for a project."""
from sqlalchemy import select
terminal_states = ['completed', 'cancelled', 'merged', 'closed', 'done']
result = await self.session.execute(
select(WorkflowInstance)
.where(WorkflowInstance.project_id == project_id)
.where(WorkflowInstance.workflow_type == workflow_type)
.where(~WorkflowInstance.current_state.in_(terminal_states))
)
return result.scalars().all()
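Because every transition is persisted, the current state can also be cross-checked by replaying the event log; a sketch using the engine's own API:

# Sketch: rebuild a workflow's state by replaying its transition log,
# e.g. as a consistency check against current_state after a crash.
from uuid import UUID
from app.services.workflow_engine import WorkflowEngine

async def replay_state(engine: WorkflowEngine, workflow_id: UUID) -> str:
    history = await engine.get_history(workflow_id)  # Ordered by created_at
    if not history:
        instance = await engine.load(workflow_id)
        return instance.current_state  # No transitions yet: still in initial state
    return history[-1].to_state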
6. Retry and Compensation Patterns
Retry Configuration
# app/workflows/retry_config.py
from dataclasses import dataclass
from typing import Callable, Optional
@dataclass
class RetryPolicy:
"""Configuration for retry behavior."""
max_retries: int = 3
initial_delay: float = 1.0 # seconds
max_delay: float = 300.0 # 5 minutes
exponential_base: float = 2.0
jitter: bool = True
retryable_errors: tuple = (ConnectionError, TimeoutError)
class RetryableWorkflow:
"""Mixin for workflows with retry support."""
retry_policy: RetryPolicy = RetryPolicy()
def calculate_retry_delay(self, attempt: int) -> float:
"""Calculate delay for next retry attempt."""
delay = self.retry_policy.initial_delay * (self.retry_policy.exponential_base ** attempt)
delay = min(delay, self.retry_policy.max_delay)
if self.retry_policy.jitter:
import random
delay = delay * (0.5 + random.random())
return delay
def should_retry(self, error: Exception, attempt: int) -> bool:
"""Determine if error should trigger retry."""
if attempt >= self.retry_policy.max_retries:
return False
return isinstance(error, self.retry_policy.retryable_errors)
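A sketch of how `RetryPolicy` might wrap a flaky async step (`step` stands in for whatever unit of work the workflow retries):

import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

async def run_with_retries(workflow: "RetryableWorkflow", step: Callable[[], Awaitable[T]]) -> T:
    """Run an async step under the workflow's retry policy."""
    attempt = 0
    while True:
        try:
            return await step()
        except Exception as error:
            if not workflow.should_retry(error, attempt):
                raise
            await asyncio.sleep(workflow.calculate_retry_delay(attempt))
            attempt += 1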
Saga Pattern for Compensation
# app/workflows/saga.py
import asyncio
from dataclasses import dataclass
from typing import Callable, Any, Optional
from abc import ABC, abstractmethod
@dataclass
class SagaStep:
"""A single step in a saga with its compensation."""
name: str
action: Callable[..., Any]
compensation: Callable[..., Any]
class Saga:
"""
Implements the Saga pattern for distributed transactions.
If a step fails, compensating actions are executed in reverse order.
"""
def __init__(self, steps: list[SagaStep]):
self.steps = steps
self.completed_steps: list[SagaStep] = []
self.context: dict = {}
async def execute(self, initial_context: dict = None) -> dict:
"""Execute the saga, with automatic compensation on failure."""
self.context = initial_context or {}
for step in self.steps:
try:
result = await step.action(self.context)
self.context.update(result or {})
self.completed_steps.append(step)
except Exception as e:
# Compensate in reverse order
await self._compensate()
raise SagaFailure(
failed_step=step.name,
original_error=e,
compensation_results=self.context.get('_compensation_results', [])
)
return self.context
async def _compensate(self):
"""Execute compensation for all completed steps."""
compensation_results = []
for step in reversed(self.completed_steps):
            try:
                result = step.compensation(self.context)
                if asyncio.iscoroutine(result):
                    await result  # Support plain (or no-op) as well as async compensations
compensation_results.append({
'step': step.name,
'status': 'compensated'
})
except Exception as e:
compensation_results.append({
'step': step.name,
'status': 'failed',
'error': str(e)
})
self.context['_compensation_results'] = compensation_results
class SagaFailure(Exception):
"""Raised when a saga fails and compensation is executed."""
def __init__(self, failed_step: str, original_error: Exception, compensation_results: list):
self.failed_step = failed_step
self.original_error = original_error
self.compensation_results = compensation_results
super().__init__(f"Saga failed at step '{failed_step}': {original_error}")
# Example: Story implementation saga
async def create_story_implementation_saga(story_id: str, project_id: str) -> Saga:
"""Create saga for implementing a story with compensation."""
steps = [
SagaStep(
name="create_branch",
action=lambda ctx: create_feature_branch(ctx['story_id']),
compensation=lambda ctx: delete_branch(ctx.get('branch_name'))
),
SagaStep(
name="implement_code",
action=lambda ctx: generate_code(ctx['story_id'], ctx['branch_name']),
compensation=lambda ctx: revert_commits(ctx.get('commit_shas', []))
),
SagaStep(
name="run_tests",
action=lambda ctx: run_test_suite(ctx['branch_name']),
compensation=lambda ctx: None # Tests don't need compensation
),
SagaStep(
name="create_pr",
action=lambda ctx: create_pull_request(ctx['branch_name'], ctx['story_id']),
compensation=lambda ctx: close_pull_request(ctx.get('pr_id'))
),
]
return Saga(steps)
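Executing the saga could then look like this (sketch; the logging/escalation path is an assumption):

async def implement_story(story_id: str, project_id: str) -> dict:
    saga = await create_story_implementation_saga(story_id, project_id)
    try:
        return await saga.execute({"story_id": story_id, "project_id": project_id})
    except SagaFailure as failure:
        # Compensations have already run in reverse order at this point
        print(f"Rolled back after '{failure.failed_step}': {failure.original_error}")
        raise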
7. Celery Integration
# app/tasks/workflow_tasks.py
from celery import Task
from app.core.celery import celery_app
from app.core.database import async_session_maker
from app.services.workflow_engine import WorkflowEngine
from app.services.events import EventBus
from app.models.workflow import WorkflowType
class WorkflowTask(Task):
"""Base task for workflow operations with database session."""
_session = None
_event_bus = None
@property
def session(self):
if self._session is None:
self._session = async_session_maker()
return self._session
@property
def event_bus(self):
if self._event_bus is None:
self._event_bus = EventBus()
return self._event_bus
def after_return(self, status, retval, task_id, args, kwargs, einfo):
"""Cleanup after task completion."""
if self._session:
self._session.close()
@celery_app.task(bind=True, base=WorkflowTask)
async def trigger_workflow_transition(
self,
workflow_id: str,
trigger: str,
triggered_by: str = "system",
metadata: dict = None
):
"""
Trigger a workflow transition as a background task.
Used when transitions need to happen asynchronously.
"""
from uuid import UUID
async with self.session as session:
engine = WorkflowEngine(session, self.event_bus)
success = await engine.trigger(
workflow_id=UUID(workflow_id),
trigger=trigger,
triggered_by=triggered_by,
metadata=metadata
)
if not success:
raise ValueError(f"Transition '{trigger}' failed for workflow {workflow_id}")
return {"workflow_id": workflow_id, "new_trigger": trigger, "success": True}
@celery_app.task(bind=True, base=WorkflowTask)
async def process_story_workflow_step(
self,
workflow_id: str,
step: str,
context: dict
):
"""
Process a single step in a story workflow.
This task runs the actual work for each state.
"""
from uuid import UUID
async with self.session as session:
engine = WorkflowEngine(session, self.event_bus)
instance = await engine.load(UUID(workflow_id))
if not instance:
raise ValueError(f"Workflow {workflow_id} not found")
# Execute step-specific logic
if step == "analysis":
await run_analysis(instance.entity_id, context)
await engine.trigger(UUID(workflow_id), "analysis_complete", "agent:ba")
elif step == "design":
await run_design(instance.entity_id, context)
await engine.trigger(UUID(workflow_id), "design_complete", "agent:architect")
elif step == "implementation":
await run_implementation(instance.entity_id, context)
await engine.trigger(UUID(workflow_id), "submit_for_review", "agent:engineer")
return {"workflow_id": workflow_id, "step_completed": step}
@celery_app.task
def check_stalled_workflows():
"""
Periodic task to check for stalled workflows.
Runs via Celery Beat to identify workflows stuck in non-terminal states.
"""
from datetime import datetime, timedelta
from sqlalchemy import select
from app.models.workflow import WorkflowInstance
# Consider workflows stalled if no transition in 1 hour
stale_threshold = datetime.utcnow() - timedelta(hours=1)
# Query for potentially stalled workflows
# (Implementation would check updated_at and escalate)
pass
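The query behind that check might look like the following sketch (assumes a synchronous `session`, since Celery Beat tasks run outside the async app):

# Sketch of the stalled-workflow query
from datetime import datetime, timedelta
from sqlalchemy import select
from app.models.workflow import WorkflowInstance

TERMINAL_STATES = ('completed', 'cancelled', 'done', 'merged', 'closed')

def find_stalled(session, max_idle: timedelta = timedelta(hours=1)):
    """Return active workflows with no state change within max_idle."""
    threshold = datetime.utcnow() - max_idle
    stmt = (
        select(WorkflowInstance)
        .where(WorkflowInstance.updated_at < threshold)
        .where(~WorkflowInstance.current_state.in_(TERMINAL_STATES))
    )
    return session.execute(stmt).scalars().all()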
8. Visualization Approach
Graphviz Diagram Generation
# app/services/workflow_visualizer.py
from transitions.extensions import GraphMachine
from typing import Optional
import io
class WorkflowVisualizer:
"""Generate visual diagrams of workflow state machines."""
@staticmethod
def generate_diagram(
workflow_class,
current_state: Optional[str] = None,
format: str = 'svg'
) -> bytes:
"""
Generate a diagram for a workflow.
Args:
workflow_class: The workflow class to visualize
current_state: Highlight current state (optional)
format: Output format ('svg', 'png', 'pdf')
Returns:
Diagram as bytes
"""
class DiagramModel:
pass
model = DiagramModel()
machine = GraphMachine(
model=model,
states=workflow_class.states,
transitions=workflow_class.transitions,
initial=workflow_class.states[0] if isinstance(workflow_class.states[0], str) else workflow_class.states[0]['name'],
show_conditions=True,
show_state_attributes=True,
title=workflow_class.__name__
)
# Highlight current state
if current_state:
machine.model_graphs[id(model)].custom_styles['node'][current_state] = {
'fillcolor': '#90EE90',
'style': 'filled'
}
        # Generate graph (GraphMachine attaches get_graph() to the model)
        graph = model.get_graph()
# Render to bytes
output = io.BytesIO()
graph.draw(output, format=format, prog='dot')
return output.getvalue()
@staticmethod
def get_mermaid_definition(workflow_class) -> str:
"""
Generate Mermaid.js state diagram definition.
Useful for embedding in markdown documentation or web UIs.
"""
lines = ["stateDiagram-v2"]
# Get initial state
initial = workflow_class.states[0]
if isinstance(initial, dict):
initial = initial['name']
lines.append(f" [*] --> {initial}")
# Add transitions
for t in workflow_class.transitions:
sources = t['source'] if isinstance(t['source'], list) else [t['source']]
for source in sources:
if source == '*':
continue
lines.append(f" {source} --> {t['dest']}: {t['trigger']}")
# Mark terminal states
terminal_states = ['completed', 'cancelled', 'done', 'merged', 'closed']
for state in workflow_class.states:
state_name = state if isinstance(state, str) else state['name']
if state_name in terminal_states:
lines.append(f" {state_name} --> [*]")
return "\n".join(lines)
# API endpoint for diagram generation
# app/api/v1/workflows.py
from typing import Optional
from fastapi import APIRouter
from fastapi.responses import Response
from app.models.workflow import WorkflowType
from app.services.workflow_engine import WORKFLOW_CLASSES
from app.services.workflow_visualizer import WorkflowVisualizer
router = APIRouter()
@router.get("/workflows/{workflow_type}/diagram")
async def get_workflow_diagram(
workflow_type: WorkflowType,
format: str = "svg",
current_state: Optional[str] = None
):
"""Get visual diagram of a workflow state machine."""
workflow_class = WORKFLOW_CLASSES[workflow_type]
diagram_bytes = WorkflowVisualizer.generate_diagram(
workflow_class,
current_state=current_state,
format=format
)
media_types = {
'svg': 'image/svg+xml',
'png': 'image/png',
'pdf': 'application/pdf'
}
return Response(
content=diagram_bytes,
media_type=media_types.get(format, 'application/octet-stream')
)
@router.get("/workflows/{workflow_type}/mermaid")
async def get_workflow_mermaid(workflow_type: WorkflowType):
"""Get Mermaid.js definition for a workflow."""
workflow_class = WORKFLOW_CLASSES[workflow_type]
mermaid_def = WorkflowVisualizer.get_mermaid_definition(workflow_class)
return {"mermaid": mermaid_def}
Frontend Visualization Component
// frontend/components/WorkflowDiagram.tsx
import React from 'react';
import mermaid from 'mermaid';
interface WorkflowDiagramProps {
workflowType: 'sprint' | 'story' | 'pull_request' | 'agent_task';
currentState?: string;
}
export function WorkflowDiagram({ workflowType, currentState }: WorkflowDiagramProps) {
const [diagram, setDiagram] = React.useState<string>('');
React.useEffect(() => {
async function fetchDiagram() {
const response = await fetch(`/api/v1/workflows/${workflowType}/mermaid`);
const data = await response.json();
// Highlight current state
let mermaidDef = data.mermaid;
if (currentState) {
mermaidDef += `\n style ${currentState} fill:#90EE90`;
}
const { svg } = await mermaid.render('workflow-diagram', mermaidDef);
setDiagram(svg);
}
fetchDiagram();
}, [workflowType, currentState]);
return (
<div
className="workflow-diagram"
dangerouslySetInnerHTML={{ __html: diagram }}
/>
);
}
9. Long-Running Workflow Patterns
Handling Hours/Days Duration
# app/services/long_running_workflow.py
from datetime import datetime, timedelta
from typing import Optional
from uuid import UUID
from app.models.workflow import WorkflowType
from app.services.workflow_engine import WorkflowEngine
class LongRunningWorkflowManager:
"""
Manager for workflows that span hours or days.
Key patterns:
1. Checkpoint persistence - Save progress frequently
2. Heartbeat monitoring - Detect stalled workflows
3. Resume capability - Continue from last checkpoint
4. Timeout handling - Auto-escalate on SLA breach
"""
def __init__(self, workflow_engine: WorkflowEngine):
self.engine = workflow_engine
self.sla_configs = {
WorkflowType.SPRINT: timedelta(weeks=2), # 2-week sprint
WorkflowType.STORY: timedelta(days=5), # 5 days for a story
WorkflowType.PULL_REQUEST: timedelta(hours=24), # 24h for PR review
WorkflowType.AGENT_TASK: timedelta(hours=1), # 1h for agent task
}
async def check_sla_breaches(self):
"""Check for workflows that have breached their SLA."""
from sqlalchemy import select
from app.models.workflow import WorkflowInstance
breached = []
for workflow_type, sla in self.sla_configs.items():
threshold = datetime.utcnow() - sla
# Find active workflows created before threshold
# with no recent transitions
result = await self.engine.session.execute(
select(WorkflowInstance)
.where(WorkflowInstance.workflow_type == workflow_type)
.where(WorkflowInstance.created_at < threshold)
.where(~WorkflowInstance.current_state.in_([
'completed', 'cancelled', 'done', 'merged', 'closed'
]))
)
for instance in result.scalars():
breached.append({
'workflow_id': instance.id,
'type': workflow_type.value,
'entity_id': instance.entity_id,
'current_state': instance.current_state,
'age': datetime.utcnow() - instance.created_at,
'sla': sla
})
return breached
async def create_checkpoint(
self,
workflow_id: UUID,
checkpoint_data: dict
):
"""
Save a checkpoint for long-running workflow.
Allows resumption from this point if workflow is interrupted.
"""
instance = await self.engine.load(workflow_id)
if instance:
instance.context = {
**instance.context,
'_checkpoint': {
'data': checkpoint_data,
'timestamp': datetime.utcnow().isoformat()
}
}
await self.engine.session.commit()
async def resume_from_checkpoint(self, workflow_id: UUID) -> Optional[dict]:
"""
Resume a workflow from its last checkpoint.
Returns checkpoint data if available.
"""
instance = await self.engine.load(workflow_id)
if instance and instance.context.get('_checkpoint'):
return instance.context['_checkpoint']['data']
return None
Sprint Workflow with Checkpoints
# app/workflows/sprint_workflow_runner.py
from datetime import datetime
from uuid import UUID
from celery import group
from app.services.workflow_engine import WorkflowEngine
from app.tasks.workflow_tasks import trigger_workflow_transition
class SprintWorkflowRunner:
"""
Orchestrates a full sprint lifecycle.
A sprint runs for ~2 weeks, with daily standups and continuous work.
This runner manages the long-duration process with checkpoints.
"""
    def __init__(self, sprint_id: str, project_id: str, workflow_id: str, engine: WorkflowEngine):
        self.sprint_id = sprint_id
        self.project_id = project_id
        self.workflow_id = workflow_id
        self.engine = engine  # Needed for history queries in complete_sprint()
async def start_sprint(self, stories: list[str]):
"""
Start the sprint with initial stories.
Creates story workflows for each story and begins development.
"""
        # Transition sprint to development (.delay() enqueues the task; it is not awaitable)
        trigger_workflow_transition.delay(
self.workflow_id,
trigger="start",
triggered_by="system",
metadata={"stories": stories}
)
# Create story workflows for each story
story_tasks = []
for story_id in stories:
story_tasks.append(
create_story_workflow.s(story_id, self.project_id)
)
# Execute story creations in parallel
group(story_tasks).apply_async()
async def run_daily_standup(self):
"""
Daily standup checkpoint.
Collects status from all active story workflows.
"""
# Get all active story workflows
active_stories = await self.get_active_stories()
report = {
'date': datetime.utcnow().isoformat(),
'sprint_id': self.sprint_id,
'stories': []
}
for story in active_stories:
report['stories'].append({
'story_id': story.entity_id,
'state': story.current_state,
'blocked': story.current_state == 'blocked'
})
# Save checkpoint
await self.save_checkpoint(report)
return report
async def complete_sprint(self):
"""
Complete the sprint and generate retrospective data.
"""
# Collect all transitions for analysis
history = await self.engine.get_history(UUID(self.workflow_id))
# Calculate metrics
metrics = {
'total_transitions': len(history),
'duration': (datetime.utcnow() - history[0].created_at).days,
'blocks_encountered': sum(1 for t in history if t.to_state == 'blocked'),
}
        trigger_workflow_transition.delay(
self.workflow_id,
trigger="complete",
triggered_by="system",
metadata={"metrics": metrics}
)
Dependencies
Add to pyproject.toml:
[project]
dependencies = [
    "transitions>=0.9.0",
    "graphviz>=0.20.1",  # For diagram generation
]

[project.optional-dependencies]
diagrams = [
    "pygraphviz>=1.11",  # Alternative Graphviz binding
]
Implementation Roadmap
Phase 1: Foundation (Week 1)
- Add `transitions` library dependency
- Create workflow database models and migrations
- Implement basic `WorkflowEngine` class
- Write unit tests for state machines
Phase 2: Core Workflows (Week 2)
- Implement `StoryWorkflow` with all transitions
- Implement `SprintWorkflow` with checkpoints
- Implement `PRWorkflow` for code review
- Integrate with Celery tasks
Phase 3: Durability (Week 3)
- Add retry and compensation patterns
- Implement SLA monitoring
- Add checkpoint/resume capability
- Integrate with EventBus for real-time updates
Phase 4: Visualization (Week 4)
- Add diagram generation endpoints
- Create frontend visualization component
- Add workflow monitoring dashboard
- Documentation and examples
Testing Strategy
# tests/workflows/test_story_workflow.py
import pytest
from transitions.core import MachineError
from app.workflows.story_workflow import StoryWorkflow
class TestStoryWorkflow:
def test_happy_path(self):
"""Test normal story progression."""
workflow = StoryWorkflow("story-1", "project-1")
assert workflow.state == "backlog"
workflow.start_analysis()
assert workflow.state == "analysis"
workflow.analysis_complete()
assert workflow.state == "design"
workflow.design_complete()
assert workflow.state == "implementation"
workflow.submit_for_review()
assert workflow.state == "review"
workflow.approve()
assert workflow.state == "testing"
workflow.tests_pass()
assert workflow.state == "done"
def test_review_rejection(self):
"""Test review rejection loop."""
workflow = StoryWorkflow("story-1", "project-1", initial_state="review")
workflow.request_changes()
assert workflow.state == "implementation"
workflow.submit_for_review()
assert workflow.state == "review"
def test_invalid_transition(self):
"""Test that invalid transitions are rejected."""
workflow = StoryWorkflow("story-1", "project-1")
        # 'approve' is only valid from 'review', not from 'backlog'
        with pytest.raises(MachineError):
workflow.approve()
assert workflow.state == "backlog" # State unchanged
def test_blocking(self):
"""Test blocking from any active state."""
for initial_state in ['analysis', 'design', 'implementation']:
workflow = StoryWorkflow("story-1", "project-1", initial_state=initial_state)
workflow.block()
assert workflow.state == "blocked"
Risks and Mitigations
| Risk | Impact | Likelihood | Mitigation |
|---|---|---|---|
| State corruption on crash | High | Low | Event sourcing allows state reconstruction |
| Long-running task timeout | Medium | Medium | Celery soft limits + checkpointing |
| Race conditions on concurrent transitions | High | Medium | PostgreSQL row-level locking |
| Complex workflow debugging | Medium | High | Comprehensive logging + visualization |
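For the race-condition row, the mitigation could hook into `WorkflowEngine.trigger` by loading the instance with `SELECT ... FOR UPDATE`; a sketch:

# Sketch: serialize concurrent transitions on a single workflow row
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.workflow import WorkflowInstance

async def load_for_update(session: AsyncSession, workflow_id):
    result = await session.execute(
        select(WorkflowInstance)
        .where(WorkflowInstance.id == workflow_id)
        .with_for_update()  # Row lock held until the transaction commits
    )
    return result.scalar_one_or_none()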
Decision
Adopt transitions library + PostgreSQL persistence for Syndarix workflow state machines.
Rationale:
- Simplicity - No additional infrastructure (vs Temporal)
- Flexibility - Full control over persistence and task execution
- Integration - Natural fit with existing Celery + Redis stack (SPIKE-004)
- Durability - Event sourcing provides audit trail and recovery
- Visualization - Built-in Graphviz support + Mermaid for web
Trade-offs Accepted:
- More custom code vs using Temporal's built-in features
- Manual handling of distributed coordination
- Custom SLA monitoring implementation
References
- transitions Documentation
- Temporal Python SDK
- Managing Long-Running Workflows with Temporal
- Saga Pattern
- Event Sourcing
- Celery Dyrygent - Complex workflow orchestration
- Selinon - Advanced flow management on Celery
- Toptal Celery Orchestration Guide
Spike completed. Findings will inform ADR-008: Workflow State Machine Architecture.