# SPIKE-008: Workflow State Machine Architecture

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #8

---

## Executive Summary

Syndarix requires durable state machine capabilities to orchestrate long-lived workflows spanning hours to days (sprint execution, story implementation, PR review cycles). After evaluating multiple approaches, we recommend a **hybrid architecture**:

1. **`transitions` library** for state machine logic (lightweight, Pythonic, well-tested)
2. **PostgreSQL** for state persistence, with event sourcing for the audit trail
3. **Celery** for task execution (already planned in SPIKE-004)
4. **Custom workflow engine** built on these primitives

This approach balances simplicity with durability, avoiding the operational complexity of dedicated workflow engines like Temporal while providing the reliability Syndarix requires.

---

## Research Questions & Findings

### 1. Best Python State Machine Libraries (2024-2025)

| Library | Stars | Last Update | Async | Persistence | Visualization | Best For |
|---------|-------|-------------|-------|-------------|---------------|----------|
| [transitions](https://github.com/pytransitions/transitions) | 5.5k+ | Active | Yes | Manual | Graphviz | General FSM |
| [python-statemachine](https://github.com/fgmacedo/python-statemachine) | 800+ | Active | Yes | Django mixin | Graphviz | Django projects |
| [sismic](https://github.com/AlexandreDecan/sismic) | 400+ | Active | No | Manual | PlantUML | Complex statecharts |
| [automat](https://github.com/glyph/automat) | 300+ | Mature | No | No | No | Protocol implementations |

**Recommendation:** `transitions` - the most mature and flexible option, with excellent documentation and support for hierarchical states and callbacks.

### 2. Framework Comparison

#### transitions (Recommended for State Logic)

**Pros:**
- Lightweight, no external dependencies
- Hierarchical (nested) states support
- Rich callback system (before/after/on_enter/on_exit)
- Machine factory for persistence
- Graphviz diagram generation
- Active maintenance, well-tested

**Cons:**
- No built-in persistence (by design - flexible)
- No distributed coordination
- Manual integration with task queues

```python
from transitions import Machine


class StoryWorkflow:
    states = ['analysis', 'design', 'implementation', 'review', 'testing', 'done']

    def __init__(self, story_id: str):
        self.story_id = story_id
        self.machine = Machine(model=self, states=self.states, initial='analysis')

        # Define transitions
        self.machine.add_transition('design_ready', 'analysis', 'design')
        self.machine.add_transition('implementation_ready', 'design', 'implementation')
        self.machine.add_transition('submit_for_review', 'implementation', 'review')
        self.machine.add_transition('request_changes', 'review', 'implementation')
        self.machine.add_transition('approve', 'review', 'testing')
        self.machine.add_transition('tests_pass', 'testing', 'done')
        self.machine.add_transition('tests_fail', 'testing', 'implementation')
```

#### Temporal (Considered but Not Recommended)

**Pros:**
- Durable execution out of the box
- Handles long-running workflows (months/years)
- Built-in retries, timeouts, versioning
- Excellent Python SDK with asyncio integration
- Automatic state persistence

**Cons:**
- **Heavy infrastructure requirement** (Temporal Server cluster)
- Vendor lock-in to Temporal's model
- Learning curve for Temporal-specific patterns
- Overkill for Syndarix's scale
- Additional operational burden

**When to Choose Temporal:**
- Mission-critical financial workflows
- 10,000+ concurrent workflows
- Team with dedicated infrastructure capacity

#### Prefect (Not Recommended for This Use Case)

**Pros:**
- Great for ETL/data pipelines
- Nice UI for workflow visualization
- Good scheduling capabilities

**Cons:**
- Designed for batch data processing, not interactive workflows
- State model doesn't map well to business workflows
- Would require significant adaptation

#### Custom + Celery (Recommended Approach)

Combine `transitions` state machine logic with Celery task execution (a minimal event-bus sketch follows this list):
- State machine handles workflow logic
- PostgreSQL persists state
- Celery executes tasks
- Redis Pub/Sub broadcasts state changes
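To make the last bullet concrete, here is a minimal sketch of the event bus assumed throughout this spike. The module path and class name mirror the `app.services.events.EventBus` referenced later, but the body is illustrative only, assuming `redis-py`'s asyncio client and JSON-encoded payloads:

```python
# app/services/events.py - illustrative sketch, not a final implementation
import json

import redis.asyncio as redis


class EventBus:
    """Broadcasts workflow events over Redis Pub/Sub."""

    def __init__(self, url: str = "redis://localhost:6379/0"):
        self._redis = redis.from_url(url)

    async def publish(self, channel: str, payload: dict) -> None:
        # Subscribers (e.g., a WebSocket gateway) receive JSON-encoded events.
        await self._redis.publish(channel, json.dumps(payload, default=str))
```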
### 3. State Persistence Strategy

#### Database Schema

```python
# app/models/workflow.py
from enum import Enum

from sqlalchemy import Column, String, Enum as SQLEnum, JSON, ForeignKey, Integer
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship

from app.models.base import Base, TimestampMixin, UUIDMixin


class WorkflowType(str, Enum):
    SPRINT = "sprint"
    STORY = "story"
    PULL_REQUEST = "pull_request"
    AGENT_TASK = "agent_task"


class WorkflowInstance(Base, UUIDMixin, TimestampMixin):
    """Represents a running workflow instance."""

    __tablename__ = "workflow_instances"

    workflow_type = Column(SQLEnum(WorkflowType), nullable=False, index=True)
    current_state = Column(String(100), nullable=False, index=True)
    entity_id = Column(String(100), nullable=False, index=True)  # story_id, sprint_id, etc.
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
    context = Column(JSON, default=dict)  # Workflow-specific context
    error = Column(String(1000), nullable=True)
    retry_count = Column(Integer, default=0)

    # Relationships
    project = relationship("Project", back_populates="workflows")
    transitions = relationship(
        "WorkflowTransition",
        back_populates="workflow",
        order_by="WorkflowTransition.created_at",
    )


class WorkflowTransition(Base, UUIDMixin, TimestampMixin):
    """Event sourcing table for workflow state changes."""

    __tablename__ = "workflow_transitions"

    workflow_id = Column(
        UUID(as_uuid=True), ForeignKey("workflow_instances.id"), nullable=False, index=True
    )
    from_state = Column(String(100), nullable=False)
    to_state = Column(String(100), nullable=False)
    trigger = Column(String(100), nullable=False)  # The transition name
    triggered_by = Column(String(100), nullable=True)  # agent_id, user_id, or "system"
    # "metadata" is a reserved attribute on SQLAlchemy declarative models, so the
    # attribute is named transition_metadata while the column itself stays "metadata".
    transition_metadata = Column("metadata", JSON, default=dict)  # Additional context

    # Relationship
    workflow = relationship("WorkflowInstance", back_populates="transitions")
```

#### Migration Example

```python
# alembic/versions/xxx_add_workflow_tables.py
def upgrade():
    op.create_table(
        'workflow_instances',
        sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column('workflow_type',
                  sa.Enum('sprint', 'story', 'pull_request', 'agent_task', name='workflowtype'),
                  nullable=False),
        sa.Column('current_state', sa.String(100), nullable=False),
        sa.Column('entity_id', sa.String(100), nullable=False),
        sa.Column('project_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('projects.id'), nullable=False),
        sa.Column('context', postgresql.JSON, server_default='{}'),
        sa.Column('error', sa.String(1000), nullable=True),
        sa.Column('retry_count', sa.Integer, server_default='0'),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
    )
    op.create_index('ix_workflow_instances_type_state', 'workflow_instances',
                    ['workflow_type', 'current_state'])
    op.create_index('ix_workflow_instances_entity', 'workflow_instances', ['entity_id'])

    op.create_table(
        'workflow_transitions',
        sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column('workflow_id', postgresql.UUID(as_uuid=True),
                  sa.ForeignKey('workflow_instances.id'), nullable=False),
        sa.Column('from_state', sa.String(100), nullable=False),
        sa.Column('to_state', sa.String(100), nullable=False),
        sa.Column('trigger', sa.String(100), nullable=False),
        sa.Column('triggered_by', sa.String(100), nullable=True),
        sa.Column('metadata', postgresql.JSON, server_default='{}'),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
    )
    op.create_index('ix_workflow_transitions_workflow', 'workflow_transitions', ['workflow_id'])
```
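Because every transition is appended to `workflow_transitions`, the current state can be cross-checked or rebuilt by replaying the log. A minimal sketch, assuming the models above and an `AsyncSession`:

```python
# Illustrative: rebuild a workflow's state from its transition log.
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.workflow import WorkflowTransition


async def replay_state(session: AsyncSession, workflow_id: UUID, initial_state: str) -> str:
    """Fold the ordered transition events into the resulting state."""
    result = await session.execute(
        select(WorkflowTransition)
        .where(WorkflowTransition.workflow_id == workflow_id)
        .order_by(WorkflowTransition.created_at)
    )
    state = initial_state
    for event in result.scalars():
        state = event.to_state  # each event fully determines the next state
    return state
```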
### 4. Syndarix Workflow State Machines

#### Sprint Workflow

```
┌──────────┐ start ┌─────────────┐ complete ┌─────────┐ pass ┌──────┐ accept ┌───────────────┐ complete ┌───────────┐
│ Planning │ ────► │ Development │ ───────► │ Testing │ ───► │ Demo │ ─────► │ Retrospective │ ───────► │ Completed │
└──────────┘       └─────────────┘          └─────────┘      └──────┘        └───────────────┘          └───────────┘

Development ──block──► Blocked ──unblock──► Development
Testing ──tests_fail──► Development
Demo ──demo_rejected──► Development
Planning / Development / Blocked / Testing / Demo ──cancel──► Cancelled
```

```python
# app/workflows/sprint_workflow.py
from transitions import Machine


class SprintWorkflow:
    states = [
        'planning', 'development', 'blocked', 'testing',
        'demo', 'retrospective', 'completed', 'cancelled'
    ]

    transitions = [
        # Normal flow
        {'trigger': 'start', 'source': 'planning', 'dest': 'development'},
        {'trigger': 'complete_development', 'source': 'development', 'dest': 'testing'},
        {'trigger': 'tests_pass', 'source': 'testing', 'dest': 'demo'},
        {'trigger': 'demo_accepted', 'source': 'demo', 'dest': 'retrospective'},
        {'trigger': 'complete', 'source': 'retrospective', 'dest': 'completed'},

        # Blocking
        {'trigger': 'block', 'source': 'development', 'dest': 'blocked'},
        {'trigger': 'unblock', 'source': 'blocked', 'dest': 'development'},

        # Test failures
        {'trigger': 'tests_fail', 'source': 'testing', 'dest': 'development'},

        # Demo rejection
        {'trigger': 'demo_rejected', 'source': 'demo', 'dest': 'development'},

        # Cancellation (from any active state)
        {'trigger': 'cancel',
         'source': ['planning', 'development', 'blocked', 'testing', 'demo'],
         'dest': 'cancelled'},
    ]

    def __init__(self, sprint_id: str, project_id: str, initial_state: str = 'planning'):
        self.sprint_id = sprint_id
        self.project_id = project_id
        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial=initial_state,
            auto_transitions=False,
            send_event=True,  # Pass EventData to callbacks
        )

        # Register callbacks for persistence
        self.machine.on_enter_development(self._on_enter_development)
        self.machine.on_enter_completed(self._on_enter_completed)
        self.machine.on_enter_blocked(self._on_enter_blocked)

    def _on_enter_development(self, event):
        """Called when entering the development state."""
        # Could dispatch a Celery task to notify agents
        pass

    def _on_enter_completed(self, event):
        """Called when the sprint is completed."""
        # Generate sprint report, notify stakeholders
        pass

    def _on_enter_blocked(self, event):
        """Called when the sprint is blocked."""
        # Alert a human operator
        pass
```
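A quick usage sketch (with hypothetical IDs) showing how triggers drive the machine and how `transitions` rejects illegal triggers:

```python
from transitions import MachineError

from app.workflows.sprint_workflow import SprintWorkflow

sprint = SprintWorkflow(sprint_id="sprint-42", project_id="project-1")
assert sprint.state == "planning"

sprint.start()    # planning -> development (fires _on_enter_development)
sprint.block()    # development -> blocked
sprint.unblock()  # blocked -> development

try:
    sprint.tests_pass()  # not valid from 'development'
except MachineError:
    pass  # transitions raises MachineError for illegal triggers
```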
#### Story Workflow

```
┌──────────┐ ready ┌────────┐ ready ┌────────────────┐ submit ┌────────┐ approve ┌─────────┐ tests_pass ┌──────┐
│ Analysis │ ────► │ Design │ ────► │ Implementation │ ─────► │ Review │ ──────► │ Testing │ ─────────► │ Done │
└──────────┘       └────────┘       └────────────────┘        └────────┘         └─────────┘            └──────┘

Review ──request_changes──► Implementation
Testing ──tests_fail──► Implementation
```

```python
# app/workflows/story_workflow.py
from transitions import Machine


class StoryWorkflow:
    states = [
        {'name': 'backlog', 'on_enter': '_notify_backlog'},
        {'name': 'analysis', 'on_enter': '_start_analysis_task'},
        {'name': 'design', 'on_enter': '_start_design_task'},
        {'name': 'implementation', 'on_enter': '_start_implementation_task'},
        {'name': 'review', 'on_enter': '_create_pr'},
        {'name': 'testing', 'on_enter': '_run_tests'},
        {'name': 'done', 'on_enter': '_notify_completion'},
        {'name': 'blocked', 'on_enter': '_escalate_block'},
    ]

    transitions = [
        # Happy path
        {'trigger': 'start_analysis', 'source': 'backlog', 'dest': 'analysis'},
        {'trigger': 'analysis_complete', 'source': 'analysis', 'dest': 'design'},
        {'trigger': 'design_complete', 'source': 'design', 'dest': 'implementation'},
        {'trigger': 'submit_for_review', 'source': 'implementation', 'dest': 'review'},
        {'trigger': 'approve', 'source': 'review', 'dest': 'testing'},
        {'trigger': 'tests_pass', 'source': 'testing', 'dest': 'done'},

        # Revision loops
        {'trigger': 'request_changes', 'source': 'review', 'dest': 'implementation'},
        {'trigger': 'tests_fail', 'source': 'testing', 'dest': 'implementation'},

        # Blocking (from any active state)
        {'trigger': 'block',
         'source': ['analysis', 'design', 'implementation', 'review', 'testing'],
         'dest': 'blocked'},
        {'trigger': 'unblock', 'source': 'blocked', 'dest': 'implementation',
         'before': '_restore_previous_state'},

        # Skip to done (for trivial stories)
        {'trigger': 'skip_to_done', 'source': '*', 'dest': 'done', 'conditions': '_is_trivial'},
    ]

    def __init__(self, story_id: str, project_id: str, initial_state: str = 'backlog'):
        self.story_id = story_id
        self.project_id = project_id
        self.previous_state = None
        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial=initial_state,
            auto_transitions=False,
        )

    def _is_trivial(self):
        """Condition: Check if the story is marked as trivial."""
        return False  # Would check story metadata

    def _start_analysis_task(self):
        """Dispatch analysis to the BA agent via Celery."""
        from app.tasks.agent_tasks import run_agent_action
        run_agent_action.delay(
            agent_type="business_analyst",
            project_id=self.project_id,
            action="analyze_story",
            context={"story_id": self.story_id}
        )

    def _start_design_task(self):
        """Dispatch design to the Architect agent via Celery."""
        from app.tasks.agent_tasks import run_agent_action
        run_agent_action.delay(
            agent_type="architect",
            project_id=self.project_id,
            action="design_solution",
            context={"story_id": self.story_id}
        )

    def _start_implementation_task(self):
        """Dispatch implementation to the Engineer agent via Celery."""
        from app.tasks.agent_tasks import run_agent_action
        run_agent_action.delay(
            agent_type="engineer",
            project_id=self.project_id,
            action="implement_story",
            context={"story_id": self.story_id}
        )

    def _create_pr(self):
        """Create a pull request for review."""
        pass

    def _run_tests(self):
        """Trigger the test suite via Celery."""
        from app.tasks.cicd_tasks import run_test_suite
        run_test_suite.delay(
            project_id=self.project_id,
            story_id=self.story_id
        )

    def _notify_completion(self):
        """Notify stakeholders of story completion."""
        pass

    def _escalate_block(self):
        """Escalate a blocked story to a human."""
        pass

    def _notify_backlog(self):
        """Story added to backlog notification."""
        pass

    def _restore_previous_state(self):
        """Restore the state recorded before the block."""
        pass
```
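The `unblock` transition above always lands in `implementation`, while the `previous_state` attribute and `_restore_previous_state` stub suggest resuming wherever the story was blocked. One hedged way to express that with `transitions` is an internal transition plus `set_state`, sketched here as a standalone toy class rather than the spike's final design:

```python
# Illustrative: remember where a story was blocked and resume there.
from transitions import Machine


class ResumableStory:
    states = ['analysis', 'design', 'implementation', 'review', 'testing', 'blocked']

    def __init__(self):
        self.previous_state = None
        self.machine = Machine(model=self, states=self.states,
                               initial='analysis', auto_transitions=False)
        # Record the origin state just before entering 'blocked'.
        self.machine.add_transition(
            'block', [s for s in self.states if s != 'blocked'], 'blocked',
            before='_remember_origin')
        # dest=None makes 'unblock' an internal transition; the callback
        # then moves the model back to wherever it was blocked from.
        self.machine.add_transition('unblock', 'blocked', None, after='_resume')

    def _remember_origin(self):
        self.previous_state = self.state  # still the source state here

    def _resume(self):
        self.machine.set_state(self.previous_state)
```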
human.""" pass def _notify_backlog(self): """Story added to backlog notification.""" pass def _restore_previous_state(self): """Restore state before block.""" pass ``` #### PR Workflow ``` ┌─────────┐ submit ┌────────┐ approve ┌──────────┐ │ Created │ ────────► │ Review │ ─────────► │ Approved │ └─────────┘ └────────┘ └──────────┘ │ │ │ request_changes │ merge ▼ ▼ ┌───────────────────┐ ┌────────┐ │ Changes Requested │ │ Merged │ └───────────────────┘ └────────┘ │ │ resubmit ▼ ┌────────┐ │ Review │ └────────┘ ``` ```python # app/workflows/pr_workflow.py from transitions import Machine class PRWorkflow: states = ['created', 'review', 'changes_requested', 'approved', 'merged', 'closed'] transitions = [ {'trigger': 'submit_for_review', 'source': 'created', 'dest': 'review'}, {'trigger': 'request_changes', 'source': 'review', 'dest': 'changes_requested'}, {'trigger': 'resubmit', 'source': 'changes_requested', 'dest': 'review'}, {'trigger': 'approve', 'source': 'review', 'dest': 'approved'}, {'trigger': 'merge', 'source': 'approved', 'dest': 'merged'}, {'trigger': 'close', 'source': ['created', 'review', 'changes_requested', 'approved'], 'dest': 'closed'}, ] def __init__(self, pr_id: str, project_id: str, initial_state: str = 'created'): self.pr_id = pr_id self.project_id = project_id self.machine = Machine( model=self, states=self.states, transitions=self.transitions, initial=initial_state, ) ``` #### Agent Task Workflow ``` ┌──────────┐ start ┌─────────────┐ complete ┌───────────┐ │ Assigned │ ───────► │ In Progress │ ──────────► │ Completed │ └──────────┘ └─────────────┘ └───────────┘ │ │ block / fail ▼ ┌─────────┐ │ Blocked │ └─────────┘ │ │ retry / escalate ▼ ┌─────────────┐ or ┌───────────┐ │ In Progress │ │ Escalated │ └─────────────┘ └───────────┘ ``` ```python # app/workflows/agent_task_workflow.py from transitions import Machine class AgentTaskWorkflow: states = ['assigned', 'in_progress', 'blocked', 'completed', 'failed', 'escalated'] transitions = [ {'trigger': 'start', 'source': 'assigned', 'dest': 'in_progress'}, {'trigger': 'complete', 'source': 'in_progress', 'dest': 'completed'}, {'trigger': 'block', 'source': 'in_progress', 'dest': 'blocked'}, {'trigger': 'fail', 'source': 'in_progress', 'dest': 'failed'}, {'trigger': 'retry', 'source': ['blocked', 'failed'], 'dest': 'in_progress', 'conditions': '_can_retry'}, {'trigger': 'escalate', 'source': ['blocked', 'failed'], 'dest': 'escalated'}, ] def __init__(self, task_id: str, agent_id: str, initial_state: str = 'assigned'): self.task_id = task_id self.agent_id = agent_id self.retry_count = 0 self.max_retries = 3 self.machine = Machine( model=self, states=self.states, transitions=self.transitions, initial=initial_state, ) def _can_retry(self): """Check if retry is allowed.""" return self.retry_count < self.max_retries ``` ### 5. 
### 5. Durable Workflow Engine

```python
# app/services/workflow_engine.py
from typing import Optional
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.workflow import WorkflowInstance, WorkflowTransition, WorkflowType
from app.services.events import EventBus
from app.workflows.agent_task_workflow import AgentTaskWorkflow
from app.workflows.pr_workflow import PRWorkflow
from app.workflows.sprint_workflow import SprintWorkflow
from app.workflows.story_workflow import StoryWorkflow

WORKFLOW_CLASSES = {
    WorkflowType.SPRINT: SprintWorkflow,
    WorkflowType.STORY: StoryWorkflow,
    WorkflowType.PULL_REQUEST: PRWorkflow,
    WorkflowType.AGENT_TASK: AgentTaskWorkflow,
}


class WorkflowEngine:
    """
    Durable workflow engine that persists state to PostgreSQL.

    Usage:
        engine = WorkflowEngine(session, event_bus)

        # Create new workflow
        workflow = await engine.create(
            workflow_type=WorkflowType.STORY,
            entity_id="story-123",
            project_id=project.id
        )

        # Load existing workflow
        workflow = await engine.load(workflow_id)

        # Trigger transition
        await engine.trigger(workflow_id, "start_analysis", triggered_by="agent-456")
    """

    def __init__(self, session: AsyncSession, event_bus: Optional[EventBus] = None):
        self.session = session
        self.event_bus = event_bus

    async def create(
        self,
        workflow_type: WorkflowType,
        entity_id: str,
        project_id: UUID,
        initial_context: Optional[dict] = None
    ) -> WorkflowInstance:
        """Create a new workflow instance."""
        workflow_class = WORKFLOW_CLASSES[workflow_type]
        initial_state = workflow_class.states[0]
        if isinstance(initial_state, dict):
            initial_state = initial_state['name']

        instance = WorkflowInstance(
            workflow_type=workflow_type,
            current_state=initial_state,
            entity_id=entity_id,
            project_id=project_id,
            context=initial_context or {}
        )
        self.session.add(instance)
        await self.session.commit()
        await self.session.refresh(instance)

        # Publish creation event
        if self.event_bus:
            await self.event_bus.publish(f"project:{project_id}", {
                "type": "workflow_created",
                "workflow_id": str(instance.id),
                "workflow_type": workflow_type.value,
                "entity_id": entity_id,
                "state": initial_state
            })

        return instance

    async def load(self, workflow_id: UUID) -> Optional[WorkflowInstance]:
        """Load a workflow instance from the database."""
        return await self.session.get(WorkflowInstance, workflow_id)

    async def get_machine(self, instance: WorkflowInstance):
        """Reconstruct the in-memory state machine from a persisted instance."""
        workflow_class = WORKFLOW_CLASSES[instance.workflow_type]
        # Workflow constructors take (entity_id, project_id, initial_state)
        # positionally; AgentTaskWorkflow takes the agent id in the second slot,
        # so a production version would dispatch per workflow type.
        return workflow_class(
            instance.entity_id,
            str(instance.project_id),
            initial_state=instance.current_state
        )

    async def trigger(
        self,
        workflow_id: UUID,
        trigger: str,
        triggered_by: str = "system",
        metadata: Optional[dict] = None
    ) -> bool:
        """
        Trigger a state transition on a workflow.

        Returns True if the transition succeeded, False if it was not valid.
        """
        instance = await self.load(workflow_id)
        if not instance:
            raise ValueError(f"Workflow {workflow_id} not found")

        workflow = await self.get_machine(instance)
        from_state = instance.current_state

        # Check that the trigger exists on the model
        trigger_method = getattr(workflow, trigger, None)
        if not callable(trigger_method):
            return False

        # Check that the transition is allowed from the current state
        if trigger not in workflow.machine.get_triggers(from_state):
            return False

        # Execute the transition
        try:
            if not trigger_method():
                return False  # a guard condition rejected the transition
            to_state = workflow.state
        except Exception as e:
            # Transition failed
            instance.error = str(e)
            await self.session.commit()
            return False

        # Persist new state
        instance.current_state = to_state
        instance.error = None

        # Record transition (event sourcing)
        transition = WorkflowTransition(
            workflow_id=instance.id,
            from_state=from_state,
            to_state=to_state,
            trigger=trigger,
            triggered_by=triggered_by,
            transition_metadata=metadata or {}
        )
        self.session.add(transition)
        await self.session.commit()

        # Publish state change event
        if self.event_bus:
            await self.event_bus.publish(f"project:{instance.project_id}", {
                "type": "workflow_state_changed",
                "workflow_id": str(instance.id),
                "workflow_type": instance.workflow_type.value,
                "entity_id": instance.entity_id,
                "from_state": from_state,
                "to_state": to_state,
                "trigger": trigger,
                "triggered_by": triggered_by
            })

        return True

    async def get_history(self, workflow_id: UUID) -> list[WorkflowTransition]:
        """Get the full transition history for a workflow."""
        instance = await self.load(workflow_id)
        if not instance:
            raise ValueError(f"Workflow {workflow_id} not found")
        await self.session.refresh(instance, ["transitions"])
        return instance.transitions

    async def get_active_by_type(
        self,
        project_id: UUID,
        workflow_type: WorkflowType
    ) -> list[WorkflowInstance]:
        """Get all active workflows of a type for a project."""
        terminal_states = ['completed', 'cancelled', 'merged', 'closed', 'done']
        result = await self.session.execute(
            select(WorkflowInstance)
            .where(WorkflowInstance.project_id == project_id)
            .where(WorkflowInstance.workflow_type == workflow_type)
            .where(~WorkflowInstance.current_state.in_(terminal_states))
        )
        return result.scalars().all()
```
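The risks table later flags concurrent transitions on the same workflow; the mitigation is PostgreSQL row-level locking. A minimal sketch (an illustrative variant of `load`, not the final engine API) that holds the instance row for the duration of the transaction:

```python
# Illustrative: lock the workflow row so two workers cannot apply
# conflicting transitions concurrently.
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.workflow import WorkflowInstance


async def load_for_update(session: AsyncSession, workflow_id: UUID) -> WorkflowInstance | None:
    result = await session.execute(
        select(WorkflowInstance)
        .where(WorkflowInstance.id == workflow_id)
        .with_for_update()  # emits SELECT ... FOR UPDATE
    )
    return result.scalar_one_or_none()
```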
#### Saga Pattern for Compensation

```python
# app/workflows/saga.py
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class SagaStep:
    """A single step in a saga with its compensation."""
    name: str
    action: Callable[..., Any]
    compensation: Callable[..., Any]


class Saga:
    """
    Implements the Saga pattern for distributed transactions.

    If a step fails, compensating actions are executed in reverse order.
    """

    def __init__(self, steps: list[SagaStep]):
        self.steps = steps
        self.completed_steps: list[SagaStep] = []
        self.context: dict = {}

    async def execute(self, initial_context: dict = None) -> dict:
        """Execute the saga, with automatic compensation on failure."""
        self.context = initial_context or {}

        for step in self.steps:
            try:
                result = await step.action(self.context)
                self.context.update(result or {})
                self.completed_steps.append(step)
            except Exception as e:
                # Compensate in reverse order
                await self._compensate()
                raise SagaFailure(
                    failed_step=step.name,
                    original_error=e,
                    compensation_results=self.context.get('_compensation_results', [])
                )

        return self.context

    async def _compensate(self):
        """Execute compensation for all completed steps."""
        compensation_results = []
        for step in reversed(self.completed_steps):
            try:
                await step.compensation(self.context)
                compensation_results.append({
                    'step': step.name,
                    'status': 'compensated'
                })
            except Exception as e:
                compensation_results.append({
                    'step': step.name,
                    'status': 'failed',
                    'error': str(e)
                })
        self.context['_compensation_results'] = compensation_results


class SagaFailure(Exception):
    """Raised when a saga fails and compensation is executed."""

    def __init__(self, failed_step: str, original_error: Exception, compensation_results: list):
        self.failed_step = failed_step
        self.original_error = original_error
        self.compensation_results = compensation_results
        super().__init__(f"Saga failed at step '{failed_step}': {original_error}")


# Example: Story implementation saga. The branch/code/test/PR helpers
# referenced below are async functions defined elsewhere in the codebase.
async def _no_compensation(ctx: dict) -> None:
    """Tests do not need compensation."""
    return None


def create_story_implementation_saga(story_id: str, project_id: str) -> Saga:
    """Create a saga for implementing a story, with compensation."""
    steps = [
        SagaStep(
            name="create_branch",
            action=lambda ctx: create_feature_branch(ctx['story_id']),
            compensation=lambda ctx: delete_branch(ctx.get('branch_name'))
        ),
        SagaStep(
            name="implement_code",
            action=lambda ctx: generate_code(ctx['story_id'], ctx['branch_name']),
            compensation=lambda ctx: revert_commits(ctx.get('commit_shas', []))
        ),
        SagaStep(
            name="run_tests",
            action=lambda ctx: run_test_suite(ctx['branch_name']),
            compensation=_no_compensation
        ),
        SagaStep(
            name="create_pr",
            action=lambda ctx: create_pull_request(ctx['branch_name'], ctx['story_id']),
            compensation=lambda ctx: close_pull_request(ctx.get('pr_id'))
        ),
    ]
    # The caller seeds the context, e.g.:
    #   await saga.execute({'story_id': story_id, 'project_id': project_id})
    return Saga(steps)
```
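A small usage sketch of the saga machinery with toy steps; when the second step fails, the first step's compensation runs before `SagaFailure` reaches the caller:

```python
import asyncio

from app.workflows.saga import Saga, SagaFailure, SagaStep


async def reserve(ctx: dict) -> dict:
    return {"reservation_id": "res-1"}


async def release(ctx: dict) -> None:
    print(f"released {ctx['reservation_id']}")


async def explode(ctx: dict) -> dict:
    raise RuntimeError("boom")


async def noop(ctx: dict) -> None:
    return None


async def main() -> None:
    saga = Saga([
        SagaStep("reserve", reserve, release),
        SagaStep("doomed", explode, noop),
    ])
    try:
        await saga.execute()
    except SagaFailure as exc:
        # 'reserve' completed, so its compensation ran before the error surfaced.
        print(exc.failed_step, exc.compensation_results)


asyncio.run(main())
```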
### 7. Celery Integration

```python
# app/tasks/workflow_tasks.py
# A custom Task base class for session reuse was considered, but per-call
# async sessions are simpler and avoid event-loop lifecycle issues: Celery
# does not execute `async def` tasks natively, so each task wraps its async
# body in asyncio.run() inside a regular (sync) function.
import asyncio
from uuid import UUID

from app.core.celery import celery_app
from app.core.database import async_session_maker
from app.services.events import EventBus
from app.services.workflow_engine import WorkflowEngine


@celery_app.task(bind=True)
def trigger_workflow_transition(
    self,
    workflow_id: str,
    trigger: str,
    triggered_by: str = "system",
    metadata: dict = None,
):
    """
    Trigger a workflow transition as a background task.

    Used when transitions need to happen asynchronously.
    """

    async def _run() -> dict:
        async with async_session_maker() as session:
            engine = WorkflowEngine(session, EventBus())
            success = await engine.trigger(
                workflow_id=UUID(workflow_id),
                trigger=trigger,
                triggered_by=triggered_by,
                metadata=metadata,
            )
        if not success:
            raise ValueError(f"Transition '{trigger}' failed for workflow {workflow_id}")
        return {"workflow_id": workflow_id, "trigger": trigger, "success": True}

    return asyncio.run(_run())


@celery_app.task(bind=True)
def process_story_workflow_step(self, workflow_id: str, step: str, context: dict):
    """
    Process a single step in a story workflow.

    This task runs the actual work for each state. The run_analysis,
    run_design, and run_implementation helpers are agent entry points
    defined elsewhere.
    """

    async def _run() -> dict:
        async with async_session_maker() as session:
            engine = WorkflowEngine(session, EventBus())
            instance = await engine.load(UUID(workflow_id))
            if not instance:
                raise ValueError(f"Workflow {workflow_id} not found")

            # Execute step-specific logic
            if step == "analysis":
                await run_analysis(instance.entity_id, context)
                await engine.trigger(UUID(workflow_id), "analysis_complete", "agent:ba")
            elif step == "design":
                await run_design(instance.entity_id, context)
                await engine.trigger(UUID(workflow_id), "design_complete", "agent:architect")
            elif step == "implementation":
                await run_implementation(instance.entity_id, context)
                await engine.trigger(UUID(workflow_id), "submit_for_review", "agent:engineer")

        return {"workflow_id": workflow_id, "step_completed": step}

    return asyncio.run(_run())


@celery_app.task
def check_stalled_workflows():
    """
    Periodic task to check for stalled workflows.

    Runs via Celery Beat to identify workflows stuck in non-terminal states.
    """
    from datetime import datetime, timedelta

    # Consider workflows stalled if no transition in 1 hour
    stale_threshold = datetime.utcnow() - timedelta(hours=1)

    # Query for potentially stalled workflows
    # (Implementation would check updated_at and escalate)
    pass
```
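For completeness, a sketch of wiring the periodic check into Celery Beat; the cadence and configuration location are assumptions, and `celery_app` is the instance from `app/core/celery`:

```python
# Illustrative Beat schedule (e.g., alongside the app in app/core/celery.py)
from celery.schedules import crontab

from app.core.celery import celery_app

celery_app.conf.beat_schedule = {
    "check-stalled-workflows": {
        "task": "app.tasks.workflow_tasks.check_stalled_workflows",
        "schedule": crontab(minute="*/15"),  # assumed cadence: every 15 minutes
    },
}
```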
### 8. Visualization Approach

#### Graphviz Diagram Generation

```python
# app/services/workflow_visualizer.py
import io
from typing import Optional

from transitions.extensions import GraphMachine


class WorkflowVisualizer:
    """Generate visual diagrams of workflow state machines."""

    @staticmethod
    def generate_diagram(
        workflow_class,
        current_state: Optional[str] = None,
        format: str = 'svg'
    ) -> bytes:
        """
        Generate a diagram for a workflow.

        Args:
            workflow_class: The workflow class to visualize
            current_state: Highlight current state (optional)
            format: Output format ('svg', 'png', 'pdf')

        Returns:
            Diagram as bytes
        """

        class DiagramModel:
            pass

        model = DiagramModel()
        initial = workflow_class.states[0]
        if isinstance(initial, dict):
            initial = initial['name']

        machine = GraphMachine(
            model=model,
            states=workflow_class.states,
            transitions=workflow_class.transitions,
            initial=initial,
            show_conditions=True,
            show_state_attributes=True,
            title=workflow_class.__name__
        )

        # GraphMachine highlights the model's active state, so moving the
        # model into current_state is enough to highlight it.
        if current_state:
            machine.set_state(current_state, model=model)

        # Render to bytes (requires Graphviz; the pygraphviz backend
        # supports drawing to a binary stream)
        graph = model.get_graph()
        output = io.BytesIO()
        graph.draw(output, format=format, prog='dot')
        return output.getvalue()

    @staticmethod
    def get_mermaid_definition(workflow_class) -> str:
        """
        Generate a Mermaid.js state diagram definition.

        Useful for embedding in markdown documentation or web UIs.
        """
        lines = ["stateDiagram-v2"]

        # Get initial state
        initial = workflow_class.states[0]
        if isinstance(initial, dict):
            initial = initial['name']
        lines.append(f"    [*] --> {initial}")

        # Add transitions
        for t in workflow_class.transitions:
            sources = t['source'] if isinstance(t['source'], list) else [t['source']]
            for source in sources:
                if source == '*':
                    continue
                lines.append(f"    {source} --> {t['dest']}: {t['trigger']}")

        # Mark terminal states
        terminal_states = ['completed', 'cancelled', 'done', 'merged', 'closed']
        for state in workflow_class.states:
            state_name = state if isinstance(state, str) else state['name']
            if state_name in terminal_states:
                lines.append(f"    {state_name} --> [*]")

        return "\n".join(lines)
```

API endpoints for diagram generation:

```python
# app/api/v1/workflows.py
from typing import Optional

from fastapi import APIRouter
from fastapi.responses import Response

from app.models.workflow import WorkflowType
from app.services.workflow_engine import WORKFLOW_CLASSES
from app.services.workflow_visualizer import WorkflowVisualizer

router = APIRouter()


@router.get("/workflows/{workflow_type}/diagram")
async def get_workflow_diagram(
    workflow_type: WorkflowType,
    format: str = "svg",
    current_state: Optional[str] = None
):
    """Get a visual diagram of a workflow state machine."""
    workflow_class = WORKFLOW_CLASSES[workflow_type]
    diagram_bytes = WorkflowVisualizer.generate_diagram(
        workflow_class,
        current_state=current_state,
        format=format
    )
    media_types = {
        'svg': 'image/svg+xml',
        'png': 'image/png',
        'pdf': 'application/pdf'
    }
    return Response(
        content=diagram_bytes,
        media_type=media_types.get(format, 'application/octet-stream')
    )


@router.get("/workflows/{workflow_type}/mermaid")
async def get_workflow_mermaid(workflow_type: WorkflowType):
    """Get the Mermaid.js definition for a workflow."""
    workflow_class = WORKFLOW_CLASSES[workflow_type]
    mermaid_def = WorkflowVisualizer.get_mermaid_definition(workflow_class)
    return {"mermaid": mermaid_def}
```
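For illustration, `get_mermaid_definition(PRWorkflow)` yields a definition along these lines:

```
stateDiagram-v2
    [*] --> created
    created --> review: submit_for_review
    review --> changes_requested: request_changes
    changes_requested --> review: resubmit
    review --> approved: approve
    approved --> merged: merge
    created --> closed: close
    review --> closed: close
    changes_requested --> closed: close
    approved --> closed: close
    merged --> [*]
    closed --> [*]
```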
#### Frontend Visualization Component

```typescript
// frontend/components/WorkflowDiagram.tsx
import React from 'react';
import mermaid from 'mermaid';

interface WorkflowDiagramProps {
  workflowType: 'sprint' | 'story' | 'pull_request' | 'agent_task';
  currentState?: string;
}

export function WorkflowDiagram({ workflowType, currentState }: WorkflowDiagramProps) {
  const [diagram, setDiagram] = React.useState('');

  React.useEffect(() => {
    async function fetchDiagram() {
      const response = await fetch(`/api/v1/workflows/${workflowType}/mermaid`);
      const data = await response.json();

      // Highlight current state
      let mermaidDef = data.mermaid;
      if (currentState) {
        mermaidDef += `\n    style ${currentState} fill:#90EE90`;
      }

      const { svg } = await mermaid.render('workflow-diagram', mermaidDef);
      setDiagram(svg);
    }
    fetchDiagram();
  }, [workflowType, currentState]);

  // Render the SVG markup produced by mermaid.render()
  return <div dangerouslySetInnerHTML={{ __html: diagram }} />;
}
```
### 9. Long-Running Workflow Patterns

#### Handling Hours/Days Duration

```python
# app/services/long_running_workflow.py
from datetime import datetime, timedelta
from typing import Optional
from uuid import UUID

from sqlalchemy import select

from app.models.workflow import WorkflowInstance, WorkflowType
from app.services.workflow_engine import WorkflowEngine


class LongRunningWorkflowManager:
    """
    Manager for workflows that span hours or days.

    Key patterns:
    1. Checkpoint persistence - Save progress frequently
    2. Heartbeat monitoring - Detect stalled workflows
    3. Resume capability - Continue from the last checkpoint
    4. Timeout handling - Auto-escalate on SLA breach
    """

    def __init__(self, workflow_engine: WorkflowEngine):
        self.engine = workflow_engine
        self.sla_configs = {
            WorkflowType.SPRINT: timedelta(weeks=2),         # 2-week sprint
            WorkflowType.STORY: timedelta(days=5),           # 5 days for a story
            WorkflowType.PULL_REQUEST: timedelta(hours=24),  # 24h for PR review
            WorkflowType.AGENT_TASK: timedelta(hours=1),     # 1h for an agent task
        }

    async def check_sla_breaches(self):
        """Check for workflows that have breached their SLA."""
        breached = []
        for workflow_type, sla in self.sla_configs.items():
            threshold = datetime.utcnow() - sla

            # Find active workflows created before the threshold
            # with no recent transitions
            result = await self.engine.session.execute(
                select(WorkflowInstance)
                .where(WorkflowInstance.workflow_type == workflow_type)
                .where(WorkflowInstance.created_at < threshold)
                .where(~WorkflowInstance.current_state.in_([
                    'completed', 'cancelled', 'done', 'merged', 'closed'
                ]))
            )
            for instance in result.scalars():
                breached.append({
                    'workflow_id': instance.id,
                    'type': workflow_type.value,
                    'entity_id': instance.entity_id,
                    'current_state': instance.current_state,
                    'age': datetime.utcnow() - instance.created_at,
                    'sla': sla
                })

        return breached

    async def create_checkpoint(self, workflow_id: UUID, checkpoint_data: dict):
        """
        Save a checkpoint for a long-running workflow.

        Allows resumption from this point if the workflow is interrupted.
        """
        instance = await self.engine.load(workflow_id)
        if instance:
            instance.context = {
                **instance.context,
                '_checkpoint': {
                    'data': checkpoint_data,
                    'timestamp': datetime.utcnow().isoformat()
                }
            }
            await self.engine.session.commit()

    async def resume_from_checkpoint(self, workflow_id: UUID) -> Optional[dict]:
        """
        Resume a workflow from its last checkpoint.

        Returns checkpoint data if available.
        """
        instance = await self.engine.load(workflow_id)
        if instance and instance.context.get('_checkpoint'):
            return instance.context['_checkpoint']['data']
        return None
```
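A short usage sketch of the checkpoint API (hypothetical workflow ID and payload), assuming the manager above:

```python
from uuid import UUID

from app.services.long_running_workflow import LongRunningWorkflowManager


async def demo(manager: LongRunningWorkflowManager, workflow_id: UUID) -> None:
    # Persist progress so an interrupted workflow can pick up where it left off.
    await manager.create_checkpoint(workflow_id, {"stories_done": 3, "stories_total": 8})

    # Later (e.g., after a worker restart), resume from the saved data.
    checkpoint = await manager.resume_from_checkpoint(workflow_id)
    if checkpoint is not None:
        remaining = checkpoint["stories_total"] - checkpoint["stories_done"]
        print(f"Resuming sprint with {remaining} stories remaining")
```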
""" # Transition sprint to development await trigger_workflow_transition.delay( self.workflow_id, trigger="start", triggered_by="system", metadata={"stories": stories} ) # Create story workflows for each story story_tasks = [] for story_id in stories: story_tasks.append( create_story_workflow.s(story_id, self.project_id) ) # Execute story creations in parallel group(story_tasks).apply_async() async def run_daily_standup(self): """ Daily standup checkpoint. Collects status from all active story workflows. """ # Get all active story workflows active_stories = await self.get_active_stories() report = { 'date': datetime.utcnow().isoformat(), 'sprint_id': self.sprint_id, 'stories': [] } for story in active_stories: report['stories'].append({ 'story_id': story.entity_id, 'state': story.current_state, 'blocked': story.current_state == 'blocked' }) # Save checkpoint await self.save_checkpoint(report) return report async def complete_sprint(self): """ Complete the sprint and generate retrospective data. """ # Collect all transitions for analysis history = await self.engine.get_history(UUID(self.workflow_id)) # Calculate metrics metrics = { 'total_transitions': len(history), 'duration': (datetime.utcnow() - history[0].created_at).days, 'blocks_encountered': sum(1 for t in history if t.to_state == 'blocked'), } await trigger_workflow_transition.delay( self.workflow_id, trigger="complete", triggered_by="system", metadata={"metrics": metrics} ) ``` --- ## Dependencies Add to `pyproject.toml`: ```toml [project.dependencies] transitions = "^0.9.0" graphviz = "^0.20.1" # For diagram generation [project.optional-dependencies] diagrams = [ "pygraphviz>=1.11", # Alternative Graphviz binding ] ``` --- ## Implementation Roadmap ### Phase 1: Foundation (Week 1) 1. Add `transitions` library dependency 2. Create workflow database models and migrations 3. Implement basic `WorkflowEngine` class 4. Write unit tests for state machines ### Phase 2: Core Workflows (Week 2) 1. Implement `StoryWorkflow` with all transitions 2. Implement `SprintWorkflow` with checkpoints 3. Implement `PRWorkflow` for code review 4. Integrate with Celery tasks ### Phase 3: Durability (Week 3) 1. Add retry and compensation patterns 2. Implement SLA monitoring 3. Add checkpoint/resume capability 4. Integrate with EventBus for real-time updates ### Phase 4: Visualization (Week 4) 1. Add diagram generation endpoints 2. Create frontend visualization component 3. Add workflow monitoring dashboard 4. 
---

## Testing Strategy

```python
# tests/workflows/test_story_workflow.py
import pytest
from transitions import MachineError

from app.workflows.story_workflow import StoryWorkflow


class TestStoryWorkflow:
    def test_happy_path(self):
        """Test normal story progression."""
        workflow = StoryWorkflow("story-1", "project-1")
        assert workflow.state == "backlog"

        workflow.start_analysis()
        assert workflow.state == "analysis"

        workflow.analysis_complete()
        assert workflow.state == "design"

        workflow.design_complete()
        assert workflow.state == "implementation"

        workflow.submit_for_review()
        assert workflow.state == "review"

        workflow.approve()
        assert workflow.state == "testing"

        workflow.tests_pass()
        assert workflow.state == "done"

    def test_review_rejection(self):
        """Test the review rejection loop."""
        workflow = StoryWorkflow("story-1", "project-1", initial_state="review")

        workflow.request_changes()
        assert workflow.state == "implementation"

        workflow.submit_for_review()
        assert workflow.state == "review"

    def test_invalid_transition(self):
        """Test that invalid transitions are rejected."""
        workflow = StoryWorkflow("story-1", "project-1")

        # Can't go from backlog to review
        with pytest.raises(MachineError):
            workflow.approve()
        assert workflow.state == "backlog"  # State unchanged

    def test_blocking(self):
        """Test blocking from any active state."""
        for initial_state in ['analysis', 'design', 'implementation']:
            workflow = StoryWorkflow("story-1", "project-1", initial_state=initial_state)
            workflow.block()
            assert workflow.state == "blocked"
```

---

## Risks and Mitigations

| Risk | Impact | Likelihood | Mitigation |
|------|--------|------------|------------|
| State corruption on crash | High | Low | Event sourcing allows state reconstruction |
| Long-running task timeout | Medium | Medium | Celery soft limits + checkpointing |
| Race conditions on concurrent transitions | High | Medium | PostgreSQL row-level locking |
| Complex workflow debugging | Medium | High | Comprehensive logging + visualization |

---

## Decision

**Adopt the `transitions` library + PostgreSQL persistence** for Syndarix workflow state machines.

**Rationale:**
1. **Simplicity** - No additional infrastructure (vs Temporal)
2. **Flexibility** - Full control over persistence and task execution
3. **Integration** - Natural fit with the existing Celery + Redis stack (SPIKE-004)
4. **Durability** - Event sourcing provides an audit trail and recovery
5. **Visualization** - Built-in Graphviz support + Mermaid for the web

**Trade-offs Accepted:**
- More custom code vs using Temporal's built-in features
- Manual handling of distributed coordination
- Custom SLA monitoring implementation

---

## References

- [transitions Documentation](https://github.com/pytransitions/transitions)
- [Temporal Python SDK](https://github.com/temporalio/sdk-python)
- [Managing Long-Running Workflows with Temporal](https://temporal.io/blog/very-long-running-workflows)
- [Saga Pattern](https://microservices.io/patterns/data/saga.html)
- [Event Sourcing](https://martinfowler.com/eaaDev/EventSourcing.html)
- [Celery Dyrygent](https://github.com/ovh/celery-dyrygent) - Complex workflow orchestration
- [Selinon](https://github.com/selinon/selinon) - Advanced flow management on Celery
- [Toptal Celery Orchestration Guide](https://www.toptal.com/python/orchestrating-celery-python-background-jobs)

---

*Spike completed. Findings will inform ADR-008: Workflow State Machine Architecture.*