SPIKE-008: Workflow State Machine Architecture

Status: Completed
Date: 2025-12-29
Author: Architecture Team
Related Issue: #8


Executive Summary

Syndarix requires durable state machine capabilities to orchestrate long-lived workflows spanning hours to days (sprint execution, story implementation, PR review cycles). After evaluating multiple approaches, we recommend a hybrid architecture:

  1. transitions library for state machine logic (lightweight, Pythonic, well-tested)
  2. PostgreSQL for state persistence with event sourcing for audit trail
  3. Celery for task execution (already planned in SPIKE-004)
  4. Custom workflow engine built on these primitives

This approach balances simplicity with durability, avoiding the operational complexity of dedicated workflow engines like Temporal while providing the reliability Syndarix requires.


Research Questions & Findings

1. Best Python State Machine Libraries (2024-2025)

| Library             | Stars | Last Update | Async | Persistence  | Visualization | Best For                 |
|---------------------|-------|-------------|-------|--------------|---------------|--------------------------|
| transitions         | 5.5k+ | Active      | Yes   | Manual       | Graphviz      | General FSM              |
| python-statemachine | 800+  | Active      | Yes   | Django mixin | Graphviz      | Django projects          |
| sismic              | 400+  | Active      | No    | Manual       | PlantUML      | Complex statecharts      |
| automat             | 300+  | Mature      | No    | No           | No            | Protocol implementations |

Recommendation: transitions - Most mature, flexible, excellent documentation, supports hierarchical states and callbacks.

2. Framework Comparison

transitions Library

Pros:

  • Lightweight, no external dependencies
  • Hierarchical (nested) states support
  • Rich callback system (before/after/on_enter/on_exit)
  • Machine factory for persistence
  • Graphviz diagram generation
  • Active maintenance, well-tested

Cons:

  • No built-in persistence (by design - flexible)
  • No distributed coordination
  • Manual integration with task queues
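
Example: a minimal story workflow defined with transitions:
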
from transitions import Machine

class StoryWorkflow:
    states = ['analysis', 'design', 'implementation', 'review', 'testing', 'done']

    def __init__(self, story_id: str):
        self.story_id = story_id
        self.machine = Machine(model=self, states=self.states, initial='analysis')

        # Define transitions
        self.machine.add_transition('design_ready', 'analysis', 'design')
        self.machine.add_transition('implementation_ready', 'design', 'implementation')
        self.machine.add_transition('submit_for_review', 'implementation', 'review')
        self.machine.add_transition('request_changes', 'review', 'implementation')
        self.machine.add_transition('approve', 'review', 'testing')
        self.machine.add_transition('tests_pass', 'testing', 'done')
        self.machine.add_transition('tests_fail', 'testing', 'implementation')

Temporal

Pros:

  • Durable execution out of the box
  • Handles long-running workflows (months/years)
  • Built-in retries, timeouts, versioning
  • Excellent Python SDK with asyncio integration
  • Automatic state persistence

Cons:

  • Heavy infrastructure requirement (Temporal Server cluster)
  • Vendor lock-in to Temporal's model
  • Learning curve for Temporal-specific patterns
  • Overkill for Syndarix's scale
  • Additional operational burden

When to Choose Temporal:

  • Mission-critical financial workflows
  • 10,000+ concurrent workflows
  • Team with dedicated infrastructure capacity

Data-Pipeline Orchestrators (Airflow / Prefect)

Pros:

  • Great for ETL/data pipelines
  • Nice UI for workflow visualization
  • Good scheduling capabilities

Cons:

  • Designed for batch data processing, not interactive workflows
  • State model doesn't map well to business workflows
  • Would require significant adaptation

Recommended: Hybrid Approach

Combine transitions state machine logic with Celery task execution; a minimal sketch follows the list below.

  • State machine handles workflow logic
  • PostgreSQL persists state
  • Celery executes tasks
  • Redis Pub/Sub broadcasts state changes
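
A minimal sketch of how these pieces interlock (names here are illustrative, not the final engine from Section 5): a transition callback broadcasts the state change over Redis Pub/Sub, and is the place where the real engine would persist state and enqueue the Celery task for the new state.

# Sketch only: illustrative hybrid wiring
import json
import redis
from transitions import Machine

r = redis.Redis()  # assumes a local Redis instance

class HybridStory:
    states = ['analysis', 'implementation', 'done']

    def __init__(self, story_id: str):
        self.story_id = story_id
        self.machine = Machine(model=self, states=self.states,
                               initial='analysis',
                               after_state_change='_broadcast')
        self.machine.add_transition('implement', 'analysis', 'implementation')
        self.machine.add_transition('finish', 'implementation', 'done')

    def _broadcast(self):
        # The real engine would also UPDATE workflow_instances in PostgreSQL
        # and dispatch the new state's work to Celery here
        r.publish(f"story:{self.story_id}", json.dumps({"state": self.state}))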

3. State Persistence Strategy

Database Schema

# app/models/workflow.py
from enum import Enum
from sqlalchemy import Column, String, Enum as SQLEnum, JSON, ForeignKey, Integer
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app.models.base import Base, TimestampMixin, UUIDMixin

class WorkflowType(str, Enum):
    SPRINT = "sprint"
    STORY = "story"
    PULL_REQUEST = "pull_request"
    AGENT_TASK = "agent_task"

class WorkflowInstance(Base, UUIDMixin, TimestampMixin):
    """Represents a running workflow instance."""
    __tablename__ = "workflow_instances"

    workflow_type = Column(SQLEnum(WorkflowType), nullable=False, index=True)
    current_state = Column(String(100), nullable=False, index=True)
    entity_id = Column(String(100), nullable=False, index=True)  # story_id, sprint_id, etc.
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
    context = Column(JSON, default=dict)  # Workflow-specific context
    error = Column(String(1000), nullable=True)
    retry_count = Column(Integer, default=0)

    # Relationships
    project = relationship("Project", back_populates="workflows")
    transitions = relationship("WorkflowTransition", back_populates="workflow", order_by="WorkflowTransition.created_at")

class WorkflowTransition(Base, UUIDMixin, TimestampMixin):
    """Event sourcing table for workflow state changes."""
    __tablename__ = "workflow_transitions"

    workflow_id = Column(UUID(as_uuid=True), ForeignKey("workflow_instances.id"), nullable=False, index=True)
    from_state = Column(String(100), nullable=False)
    to_state = Column(String(100), nullable=False)
    trigger = Column(String(100), nullable=False)  # The transition name
    triggered_by = Column(String(100), nullable=True)  # agent_id, user_id, or "system"
    # "metadata" is a reserved attribute name on SQLAlchemy declarative models,
    # so expose it as "meta" while keeping the column name "metadata"
    meta = Column("metadata", JSON, default=dict)  # Additional context

    # Relationship
    workflow = relationship("WorkflowInstance", back_populates="transitions")

Migration Example

# alembic/versions/xxx_add_workflow_tables.py
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

def upgrade():
    op.create_table(
        'workflow_instances',
        sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column('workflow_type', sa.Enum('sprint', 'story', 'pull_request', 'agent_task', name='workflowtype'), nullable=False),
        sa.Column('current_state', sa.String(100), nullable=False),
        sa.Column('entity_id', sa.String(100), nullable=False),
        sa.Column('project_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('projects.id'), nullable=False),
        sa.Column('context', postgresql.JSON, server_default='{}'),
        sa.Column('error', sa.String(1000), nullable=True),
        sa.Column('retry_count', sa.Integer, server_default='0'),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
    )
    op.create_index('ix_workflow_instances_type_state', 'workflow_instances', ['workflow_type', 'current_state'])
    op.create_index('ix_workflow_instances_entity', 'workflow_instances', ['entity_id'])

    op.create_table(
        'workflow_transitions',
        sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column('workflow_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('workflow_instances.id'), nullable=False),
        sa.Column('from_state', sa.String(100), nullable=False),
        sa.Column('to_state', sa.String(100), nullable=False),
        sa.Column('trigger', sa.String(100), nullable=False),
        sa.Column('triggered_by', sa.String(100), nullable=True),
        sa.Column('metadata', postgresql.JSON, server_default='{}'),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
    )
    op.create_index('ix_workflow_transitions_workflow', 'workflow_transitions', ['workflow_id'])
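
Because every transition is appended to workflow_transitions, the current state can always be rebuilt from the log. A sketch of that reconstruction, assuming an async session and the models above:

# Sketch: replay the event-sourced transition log to recover current state
from sqlalchemy import select
from app.models.workflow import WorkflowTransition

async def replay_state(session, workflow_id):
    rows = await session.execute(
        select(WorkflowTransition)
        .where(WorkflowTransition.workflow_id == workflow_id)
        .order_by(WorkflowTransition.created_at)
    )
    log = rows.scalars().all()
    # The last recorded to_state is the authoritative current state
    return log[-1].to_state if log else None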

4. Syndarix Workflow State Machines

Sprint Workflow

┌──────────┐     start      ┌───────────┐    complete     ┌─────────┐
│ Planning │ ─────────────► │Development│ ──────────────► │ Testing │
└──────────┘                └───────────┘                 └─────────┘
     │                           │                             │
     │                           │ block                       │ pass
     │                           ▼                             ▼
     │                      ┌─────────┐                  ┌──────────┐
     │                      │ Blocked │                  │   Demo   │
     │                      └─────────┘                  └──────────┘
     │                           │ unblock                     │
     │                           ▼                             │ accept
     │                      ┌───────────┐                      ▼
     │                      │Development│               ┌──────────────┐
     │                      └───────────┘               │Retrospective │
     │                                                  └──────────────┘
     │ cancel                                                  │
     ▼                                                         │ complete
┌───────────┐                                                  ▼
│ Cancelled │                                           ┌───────────┐
└───────────┘                                           │ Completed │
                                                        └───────────┘
# app/workflows/sprint_workflow.py
from transitions import Machine
from typing import Optional
from app.models.workflow import WorkflowInstance, WorkflowTransition

class SprintWorkflow:
    states = [
        'planning',
        'development',
        'blocked',
        'testing',
        'demo',
        'retrospective',
        'completed',
        'cancelled'
    ]

    transitions = [
        # Normal flow
        {'trigger': 'start', 'source': 'planning', 'dest': 'development'},
        {'trigger': 'complete_development', 'source': 'development', 'dest': 'testing'},
        {'trigger': 'tests_pass', 'source': 'testing', 'dest': 'demo'},
        {'trigger': 'demo_accepted', 'source': 'demo', 'dest': 'retrospective'},
        {'trigger': 'complete', 'source': 'retrospective', 'dest': 'completed'},

        # Blocking
        {'trigger': 'block', 'source': 'development', 'dest': 'blocked'},
        {'trigger': 'unblock', 'source': 'blocked', 'dest': 'development'},

        # Test failures
        {'trigger': 'tests_fail', 'source': 'testing', 'dest': 'development'},

        # Demo rejection
        {'trigger': 'demo_rejected', 'source': 'demo', 'dest': 'development'},

        # Cancellation (from any active state)
        {'trigger': 'cancel', 'source': ['planning', 'development', 'blocked', 'testing', 'demo'], 'dest': 'cancelled'},
    ]

    def __init__(self, sprint_id: str, project_id: str, initial_state: str = 'planning'):
        self.sprint_id = sprint_id
        self.project_id = project_id
        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial=initial_state,
            auto_transitions=False,
            send_event=True,  # Pass EventData to callbacks
        )

        # Register callbacks for persistence
        self.machine.on_enter_development(self._on_enter_development)
        self.machine.on_enter_completed(self._on_enter_completed)
        self.machine.on_enter_blocked(self._on_enter_blocked)

    def _on_enter_development(self, event):
        """Trigger when entering development state."""
        # Could dispatch Celery task to notify agents
        pass

    def _on_enter_completed(self, event):
        """Trigger when sprint is completed."""
        # Generate sprint report, notify stakeholders
        pass

    def _on_enter_blocked(self, event):
        """Trigger when sprint is blocked."""
        # Alert human operator
        pass
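
Driving the machine directly looks like this; trigger methods are generated by transitions, and send_event=True wraps any trigger arguments into an EventData object for the callbacks:

sprint = SprintWorkflow("sprint-42", "project-1")
sprint.start()                  # planning -> development, fires _on_enter_development
sprint.block()                  # development -> blocked, fires _on_enter_blocked
sprint.unblock()                # blocked -> development
sprint.complete_development()   # development -> testing
assert sprint.state == 'testing'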

Story Workflow

┌──────────┐   ready   ┌────────┐   ready   ┌────────────────┐
│ Analysis │ ────────► │ Design │ ────────► │ Implementation │
└──────────┘           └────────┘           └────────────────┘
                                                    │
                                                    │ submit
                                                    ▼
┌───────┐  tests_pass  ┌─────────┐  approve  ┌────────┐
│ Done  │ ◄─────────── │ Testing │ ◄──────── │ Review │
└───────┘              └─────────┘           └────────┘
                            │                    │
                            │ tests_fail         │ request_changes
                            ▼                    ▼
                       ┌────────────────┐   ┌────────────────┐
                       │ Implementation │   │ Implementation │
                       └────────────────┘   └────────────────┘
# app/workflows/story_workflow.py
from transitions import Machine

class StoryWorkflow:
    states = [
        {'name': 'backlog', 'on_enter': '_notify_backlog'},
        {'name': 'analysis', 'on_enter': '_start_analysis_task'},
        {'name': 'design', 'on_enter': '_start_design_task'},
        {'name': 'implementation', 'on_enter': '_start_implementation_task'},
        {'name': 'review', 'on_enter': '_create_pr'},
        {'name': 'testing', 'on_enter': '_run_tests'},
        {'name': 'done', 'on_enter': '_notify_completion'},
        {'name': 'blocked', 'on_enter': '_escalate_block'},
    ]

    transitions = [
        # Happy path
        {'trigger': 'start_analysis', 'source': 'backlog', 'dest': 'analysis'},
        {'trigger': 'analysis_complete', 'source': 'analysis', 'dest': 'design'},
        {'trigger': 'design_complete', 'source': 'design', 'dest': 'implementation'},
        {'trigger': 'submit_for_review', 'source': 'implementation', 'dest': 'review'},
        {'trigger': 'approve', 'source': 'review', 'dest': 'testing'},
        {'trigger': 'tests_pass', 'source': 'testing', 'dest': 'done'},

        # Revision loops
        {'trigger': 'request_changes', 'source': 'review', 'dest': 'implementation'},
        {'trigger': 'tests_fail', 'source': 'testing', 'dest': 'implementation'},

        # Blocking (from any active state)
        {'trigger': 'block', 'source': ['analysis', 'design', 'implementation', 'review', 'testing'], 'dest': 'blocked'},
        {'trigger': 'unblock', 'source': 'blocked', 'dest': 'implementation', 'before': '_restore_previous_state'},

        # Skip to done (for trivial stories)
        {'trigger': 'skip_to_done', 'source': '*', 'dest': 'done', 'conditions': '_is_trivial'},
    ]

    def __init__(self, story_id: str, project_id: str, initial_state: str = 'backlog'):
        self.story_id = story_id
        self.project_id = project_id
        self.previous_state = None

        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial=initial_state,
            auto_transitions=False,
        )

    def _is_trivial(self):
        """Condition: Check if story is marked as trivial."""
        return False  # Would check story metadata

    def _start_analysis_task(self):
        """Dispatch analysis to BA agent via Celery."""
        from app.tasks.agent_tasks import run_agent_action
        run_agent_action.delay(
            agent_type="business_analyst",
            project_id=self.project_id,
            action="analyze_story",
            context={"story_id": self.story_id}
        )

    def _start_design_task(self):
        """Dispatch design to Architect agent via Celery."""
        from app.tasks.agent_tasks import run_agent_action
        run_agent_action.delay(
            agent_type="architect",
            project_id=self.project_id,
            action="design_solution",
            context={"story_id": self.story_id}
        )

    def _start_implementation_task(self):
        """Dispatch implementation to Engineer agent via Celery."""
        from app.tasks.agent_tasks import run_agent_action
        run_agent_action.delay(
            agent_type="engineer",
            project_id=self.project_id,
            action="implement_story",
            context={"story_id": self.story_id}
        )

    def _create_pr(self):
        """Create pull request for review."""
        pass

    def _run_tests(self):
        """Trigger test suite via Celery."""
        from app.tasks.cicd_tasks import run_test_suite
        run_test_suite.delay(
            project_id=self.project_id,
            story_id=self.story_id
        )

    def _notify_completion(self):
        """Notify stakeholders of story completion."""
        pass

    def _escalate_block(self):
        """Escalate blocked story to human."""
        pass

    def _notify_backlog(self):
        """Story added to backlog notification."""
        pass

    def _restore_previous_state(self):
        """Restore state before block."""
        pass

PR Workflow

┌─────────┐  submit   ┌────────┐  approve   ┌──────────┐
│ Created │ ────────► │ Review │ ─────────► │ Approved │
└─────────┘           └────────┘            └──────────┘
                          │                      │
                          │ request_changes      │ merge
                          ▼                      ▼
                     ┌───────────────────┐  ┌────────┐
                     │ Changes Requested │  │ Merged │
                     └───────────────────┘  └────────┘
                          │
                          │ resubmit
                          ▼
                     ┌────────┐
                     │ Review │
                     └────────┘
# app/workflows/pr_workflow.py
from transitions import Machine

class PRWorkflow:
    states = ['created', 'review', 'changes_requested', 'approved', 'merged', 'closed']

    transitions = [
        {'trigger': 'submit_for_review', 'source': 'created', 'dest': 'review'},
        {'trigger': 'request_changes', 'source': 'review', 'dest': 'changes_requested'},
        {'trigger': 'resubmit', 'source': 'changes_requested', 'dest': 'review'},
        {'trigger': 'approve', 'source': 'review', 'dest': 'approved'},
        {'trigger': 'merge', 'source': 'approved', 'dest': 'merged'},
        {'trigger': 'close', 'source': ['created', 'review', 'changes_requested', 'approved'], 'dest': 'closed'},
    ]

    def __init__(self, pr_id: str, project_id: str, initial_state: str = 'created'):
        self.pr_id = pr_id
        self.project_id = project_id
        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial=initial_state,
        )

Agent Task Workflow

┌──────────┐  start   ┌─────────────┐  complete   ┌───────────┐
│ Assigned │ ───────► │ In Progress │ ──────────► │ Completed │
└──────────┘          └─────────────┘             └───────────┘
                            │
                            │ block / fail
                            ▼
                       ┌─────────┐
                       │ Blocked │
                       └─────────┘
                            │
                            │ retry / escalate
                            ▼
                      ┌─────────────┐ or ┌───────────┐
                      │ In Progress │    │ Escalated │
                      └─────────────┘    └───────────┘
# app/workflows/agent_task_workflow.py
from transitions import Machine

class AgentTaskWorkflow:
    states = ['assigned', 'in_progress', 'blocked', 'completed', 'failed', 'escalated']

    transitions = [
        {'trigger': 'start', 'source': 'assigned', 'dest': 'in_progress'},
        {'trigger': 'complete', 'source': 'in_progress', 'dest': 'completed'},
        {'trigger': 'block', 'source': 'in_progress', 'dest': 'blocked'},
        {'trigger': 'fail', 'source': 'in_progress', 'dest': 'failed'},
        {'trigger': 'retry', 'source': ['blocked', 'failed'], 'dest': 'in_progress', 'conditions': '_can_retry', 'after': '_increment_retry'},
        {'trigger': 'escalate', 'source': ['blocked', 'failed'], 'dest': 'escalated'},
    ]

    def __init__(self, task_id: str, agent_id: str, initial_state: str = 'assigned'):
        self.task_id = task_id
        self.agent_id = agent_id
        self.retry_count = 0
        self.max_retries = 3

        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial=initial_state,
        )

    def _can_retry(self):
        """Check if retry is allowed."""
        return self.retry_count < self.max_retries

    def _increment_retry(self):
        """Count the attempt so _can_retry can enforce max_retries."""
        self.retry_count += 1
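
For example (a trigger returns False when its condition blocks the transition, so callers can detect an exhausted retry budget):

task = AgentTaskWorkflow("task-1", "agent-7")
task.start()                  # assigned -> in_progress
task.fail()                   # in_progress -> failed
assert task.retry() is True   # failed -> in_progress, retry_count becomes 1
task.fail()
task.retry_count = task.max_retries
assert task.retry() is False  # _can_retry blocks; state stays 'failed'
task.escalate()               # failed -> escalated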

5. Durable Workflow Engine

# app/services/workflow_engine.py
from typing import Type, Optional
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from transitions import Machine

from app.models.workflow import WorkflowInstance, WorkflowTransition, WorkflowType
from app.workflows.sprint_workflow import SprintWorkflow
from app.workflows.story_workflow import StoryWorkflow
from app.workflows.pr_workflow import PRWorkflow
from app.workflows.agent_task_workflow import AgentTaskWorkflow
from app.services.events import EventBus

WORKFLOW_CLASSES = {
    WorkflowType.SPRINT: SprintWorkflow,
    WorkflowType.STORY: StoryWorkflow,
    WorkflowType.PULL_REQUEST: PRWorkflow,
    WorkflowType.AGENT_TASK: AgentTaskWorkflow,
}

class WorkflowEngine:
    """
    Durable workflow engine that persists state to PostgreSQL.

    Usage:
        engine = WorkflowEngine(session, event_bus)

        # Create new workflow
        workflow = await engine.create(
            workflow_type=WorkflowType.STORY,
            entity_id="story-123",
            project_id=project.id
        )

        # Load existing workflow
        workflow = await engine.load(workflow_id)

        # Trigger transition
        await engine.trigger(workflow_id, "start_analysis", triggered_by="agent-456")
    """

    def __init__(self, session: AsyncSession, event_bus: Optional[EventBus] = None):
        self.session = session
        self.event_bus = event_bus

    async def create(
        self,
        workflow_type: WorkflowType,
        entity_id: str,
        project_id: UUID,
        initial_context: dict = None
    ) -> WorkflowInstance:
        """Create a new workflow instance."""

        workflow_class = WORKFLOW_CLASSES[workflow_type]
        initial_state = workflow_class.states[0]
        if isinstance(initial_state, dict):
            initial_state = initial_state['name']

        instance = WorkflowInstance(
            workflow_type=workflow_type,
            current_state=initial_state,
            entity_id=entity_id,
            project_id=project_id,
            context=initial_context or {}
        )

        self.session.add(instance)
        await self.session.commit()
        await self.session.refresh(instance)

        # Publish creation event
        if self.event_bus:
            await self.event_bus.publish(f"project:{project_id}", {
                "type": "workflow_created",
                "workflow_id": str(instance.id),
                "workflow_type": workflow_type.value,
                "entity_id": entity_id,
                "state": initial_state
            })

        return instance

    async def load(self, workflow_id: UUID) -> Optional[WorkflowInstance]:
        """Load a workflow instance from the database."""
        return await self.session.get(WorkflowInstance, workflow_id)

    async def get_machine(self, instance: WorkflowInstance):
        """Reconstruct the workflow model (and its Machine) from a persisted instance."""
        workflow_class = WORKFLOW_CLASSES[instance.workflow_type]
        # Workflow constructors take the entity id positionally (sprint_id,
        # story_id, pr_id, task_id); AgentTaskWorkflow expects an agent id as
        # its second argument, which a fuller engine would special-case
        workflow = workflow_class(
            instance.entity_id,
            str(instance.project_id),
            initial_state=instance.current_state
        )
        return workflow

    async def trigger(
        self,
        workflow_id: UUID,
        trigger: str,
        triggered_by: str = "system",
        metadata: dict = None
    ) -> bool:
        """
        Trigger a state transition on a workflow.

        Returns True if transition succeeded, False if not valid.
        """
        instance = await self.load(workflow_id)
        if not instance:
            raise ValueError(f"Workflow {workflow_id} not found")

        workflow = await self.get_machine(instance)
        from_state = instance.current_state

        # Check if transition is valid
        trigger_method = getattr(workflow, trigger, None)
        if not trigger_method or not callable(trigger_method):
            return False

        # Check if transition is allowed from current state
        if trigger not in workflow.machine.get_triggers(from_state):
            return False

        # Execute transition
        try:
            trigger_method()
            to_state = workflow.state
        except Exception as e:
            # Transition failed
            instance.error = str(e)
            await self.session.commit()
            return False

        # Persist new state
        instance.current_state = to_state
        instance.error = None

        # Record transition (event sourcing)
        transition = WorkflowTransition(
            workflow_id=instance.id,
            from_state=from_state,
            to_state=to_state,
            trigger=trigger,
            triggered_by=triggered_by,
            meta=metadata or {}
        )
        self.session.add(transition)
        await self.session.commit()

        # Publish state change event
        if self.event_bus:
            await self.event_bus.publish(f"project:{instance.project_id}", {
                "type": "workflow_state_changed",
                "workflow_id": str(instance.id),
                "workflow_type": instance.workflow_type.value,
                "entity_id": instance.entity_id,
                "from_state": from_state,
                "to_state": to_state,
                "trigger": trigger,
                "triggered_by": triggered_by
            })

        return True

    async def get_history(self, workflow_id: UUID) -> list[WorkflowTransition]:
        """Get full transition history for a workflow."""
        instance = await self.load(workflow_id)
        if not instance:
            raise ValueError(f"Workflow {workflow_id} not found")

        await self.session.refresh(instance, ["transitions"])
        return instance.transitions

    async def get_active_by_type(
        self,
        project_id: UUID,
        workflow_type: WorkflowType
    ) -> list[WorkflowInstance]:
        """Get all active workflows of a type for a project."""
        from sqlalchemy import select

        terminal_states = ['completed', 'cancelled', 'merged', 'closed', 'done']

        result = await self.session.execute(
            select(WorkflowInstance)
            .where(WorkflowInstance.project_id == project_id)
            .where(WorkflowInstance.workflow_type == workflow_type)
            .where(~WorkflowInstance.current_state.in_(terminal_states))
        )
        return result.scalars().all()

6. Retry and Compensation Patterns

Retry Configuration

# app/workflows/retry_config.py
import random

from dataclasses import dataclass

@dataclass
class RetryPolicy:
    """Configuration for retry behavior."""
    max_retries: int = 3
    initial_delay: float = 1.0  # seconds
    max_delay: float = 300.0  # 5 minutes
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_errors: tuple = (ConnectionError, TimeoutError)

class RetryableWorkflow:
    """Mixin for workflows with retry support."""

    retry_policy: RetryPolicy = RetryPolicy()

    def calculate_retry_delay(self, attempt: int) -> float:
        """Calculate delay for next retry attempt."""
        delay = self.retry_policy.initial_delay * (self.retry_policy.exponential_base ** attempt)
        delay = min(delay, self.retry_policy.max_delay)

        if self.retry_policy.jitter:
            delay = delay * (0.5 + random.random())

        return delay

    def should_retry(self, error: Exception, attempt: int) -> bool:
        """Determine if error should trigger retry."""
        if attempt >= self.retry_policy.max_retries:
            return False
        return isinstance(error, self.retry_policy.retryable_errors)
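
With the default policy, backoff doubles per attempt up to the cap. For instance, with jitter disabled to make the numbers deterministic:

class DemoWorkflow(RetryableWorkflow):
    retry_policy = RetryPolicy(jitter=False)

wf = DemoWorkflow()
print([wf.calculate_retry_delay(a) for a in range(4)])
# [1.0, 2.0, 4.0, 8.0] -- from attempt 9 onward the delay is capped at 300.0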

Saga Pattern for Compensation

# app/workflows/saga.py
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class SagaStep:
    """A single step in a saga with its compensation."""
    name: str
    action: Callable[..., Any]
    compensation: Callable[..., Any]

class Saga:
    """
    Implements the Saga pattern for distributed transactions.

    If a step fails, compensating actions are executed in reverse order.
    """

    def __init__(self, steps: list[SagaStep]):
        self.steps = steps
        self.completed_steps: list[SagaStep] = []
        self.context: dict = {}

    async def execute(self, initial_context: dict = None) -> dict:
        """Execute the saga, with automatic compensation on failure."""
        self.context = initial_context or {}

        for step in self.steps:
            try:
                result = await step.action(self.context)
                self.context.update(result or {})
                self.completed_steps.append(step)
            except Exception as e:
                # Compensate in reverse order
                await self._compensate()
                raise SagaFailure(
                    failed_step=step.name,
                    original_error=e,
                    compensation_results=self.context.get('_compensation_results', [])
                )

        return self.context

    async def _compensate(self):
        """Execute compensation for all completed steps."""
        compensation_results = []

        for step in reversed(self.completed_steps):
            try:
                await step.compensation(self.context)
                compensation_results.append({
                    'step': step.name,
                    'status': 'compensated'
                })
            except Exception as e:
                compensation_results.append({
                    'step': step.name,
                    'status': 'failed',
                    'error': str(e)
                })

        self.context['_compensation_results'] = compensation_results

class SagaFailure(Exception):
    """Raised when a saga fails and compensation is executed."""

    def __init__(self, failed_step: str, original_error: Exception, compensation_results: list):
        self.failed_step = failed_step
        self.original_error = original_error
        self.compensation_results = compensation_results
        super().__init__(f"Saga failed at step '{failed_step}': {original_error}")

# Example: Story implementation saga
async def create_story_implementation_saga(story_id: str, project_id: str) -> Saga:
    """Create saga for implementing a story with compensation."""

    async def _no_compensation(ctx):
        # Saga awaits every compensation callable, so even no-op steps
        # must supply an awaitable rather than a bare None
        return None

    steps = [
        SagaStep(
            name="create_branch",
            action=lambda ctx: create_feature_branch(ctx['story_id']),
            compensation=lambda ctx: delete_branch(ctx.get('branch_name'))
        ),
        SagaStep(
            name="implement_code",
            action=lambda ctx: generate_code(ctx['story_id'], ctx['branch_name']),
            compensation=lambda ctx: revert_commits(ctx.get('commit_shas', []))
        ),
        SagaStep(
            name="run_tests",
            action=lambda ctx: run_test_suite(ctx['branch_name']),
            compensation=_no_compensation  # Tests don't need compensation
        ),
        SagaStep(
            name="create_pr",
            action=lambda ctx: create_pull_request(ctx['branch_name'], ctx['story_id']),
            compensation=lambda ctx: close_pull_request(ctx.get('pr_id'))
        ),
    ]

    return Saga(steps)
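
Driving it (the branch/PR helpers above are assumed async functions elided from this spike):

saga = await create_story_implementation_saga("story-123", "project-1")
try:
    result = await saga.execute({"story_id": "story-123"})
except SagaFailure as err:
    # Compensation has already run in reverse order; surface what happened
    print(err.failed_step, err.compensation_results)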

7. Celery Integration

# app/tasks/workflow_tasks.py
from celery import Task
from app.core.celery import celery_app
from app.core.database import async_session_maker
from app.services.workflow_engine import WorkflowEngine
from app.services.events import EventBus
from app.models.workflow import WorkflowType

class WorkflowTask(Task):
    """Base task for workflow operations with database session."""

    _session = None
    _event_bus = None

    @property
    def session(self):
        if self._session is None:
            self._session = async_session_maker()
        return self._session

    @property
    def event_bus(self):
        if self._event_bus is None:
            self._event_bus = EventBus()
        return self._event_bus

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        """Cleanup after task completion."""
        if self._session:
            self._session.close()

# NOTE: Celery does not await coroutine functions natively; in practice these
# task bodies would be wrapped with asyncio.run() or use a synchronous session.
@celery_app.task(bind=True, base=WorkflowTask)
async def trigger_workflow_transition(
    self,
    workflow_id: str,
    trigger: str,
    triggered_by: str = "system",
    metadata: dict = None
):
    """
    Trigger a workflow transition as a background task.

    Used when transitions need to happen asynchronously.
    """
    from uuid import UUID

    async with self.session as session:
        engine = WorkflowEngine(session, self.event_bus)
        success = await engine.trigger(
            workflow_id=UUID(workflow_id),
            trigger=trigger,
            triggered_by=triggered_by,
            metadata=metadata
        )

        if not success:
            raise ValueError(f"Transition '{trigger}' failed for workflow {workflow_id}")

        return {"workflow_id": workflow_id, "new_trigger": trigger, "success": True}

@celery_app.task(bind=True, base=WorkflowTask)
async def process_story_workflow_step(
    self,
    workflow_id: str,
    step: str,
    context: dict
):
    """
    Process a single step in a story workflow.

    This task runs the actual work for each state.
    """
    from uuid import UUID

    async with self.session as session:
        engine = WorkflowEngine(session, self.event_bus)
        instance = await engine.load(UUID(workflow_id))

        if not instance:
            raise ValueError(f"Workflow {workflow_id} not found")

        # Execute step-specific logic (run_analysis/run_design/run_implementation
        # are agent-facing helpers elided from this spike)
        if step == "analysis":
            await run_analysis(instance.entity_id, context)
            await engine.trigger(UUID(workflow_id), "analysis_complete", "agent:ba")

        elif step == "design":
            await run_design(instance.entity_id, context)
            await engine.trigger(UUID(workflow_id), "design_complete", "agent:architect")

        elif step == "implementation":
            await run_implementation(instance.entity_id, context)
            await engine.trigger(UUID(workflow_id), "submit_for_review", "agent:engineer")

        return {"workflow_id": workflow_id, "step_completed": step}

@celery_app.task
def check_stalled_workflows():
    """
    Periodic task to check for stalled workflows.

    Runs via Celery Beat to identify workflows stuck in non-terminal states.
    """
    from datetime import datetime, timedelta
    from sqlalchemy import select
    from app.models.workflow import WorkflowInstance

    # Consider workflows stalled if no transition in 1 hour
    stale_threshold = datetime.utcnow() - timedelta(hours=1)

    # Query for potentially stalled workflows
    # (Implementation would check updated_at and escalate)
    pass
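
A hedged sketch of the query itself, assuming a synchronous sync_session_maker (not defined in this spike), since Celery Beat tasks run outside the asyncio loop:

from datetime import datetime, timedelta
from sqlalchemy import select
from app.core.database import sync_session_maker  # assumed sync sessionmaker
from app.models.workflow import WorkflowInstance

TERMINAL_STATES = ['completed', 'cancelled', 'done', 'merged', 'closed']

def find_stalled_workflows(stale_after: timedelta = timedelta(hours=1)):
    """Return active workflows with no state change within stale_after."""
    threshold = datetime.utcnow() - stale_after
    with sync_session_maker() as session:
        rows = session.execute(
            select(WorkflowInstance)
            .where(WorkflowInstance.updated_at < threshold)
            .where(~WorkflowInstance.current_state.in_(TERMINAL_STATES))
        )
        return rows.scalars().all()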

8. Visualization Approach

Graphviz Diagram Generation

# app/services/workflow_visualizer.py
from transitions.extensions import GraphMachine
from typing import Optional
import io

class WorkflowVisualizer:
    """Generate visual diagrams of workflow state machines."""

    @staticmethod
    def generate_diagram(
        workflow_class,
        current_state: Optional[str] = None,
        format: str = 'svg'
    ) -> bytes:
        """
        Generate a diagram for a workflow.

        Args:
            workflow_class: The workflow class to visualize
            current_state: Highlight current state (optional)
            format: Output format ('svg', 'png', 'pdf')

        Returns:
            Diagram as bytes
        """
        class DiagramModel:
            pass

        model = DiagramModel()

        initial = workflow_class.states[0]
        if isinstance(initial, dict):
            initial = initial['name']

        machine = GraphMachine(
            model=model,
            states=workflow_class.states,
            transitions=workflow_class.transitions,
            # GraphMachine styles the model's active state in the rendered
            # graph, so starting the model in current_state highlights it
            initial=current_state or initial,
            show_conditions=True,
            show_state_attributes=True,
            title=workflow_class.__name__
        )

        # Generate the graph from the model (GraphMachine attaches get_graph)
        graph = model.get_graph()

        # Render to bytes
        output = io.BytesIO()
        graph.draw(output, format=format, prog='dot')
        return output.getvalue()

    @staticmethod
    def get_mermaid_definition(workflow_class) -> str:
        """
        Generate Mermaid.js state diagram definition.

        Useful for embedding in markdown documentation or web UIs.
        """
        lines = ["stateDiagram-v2"]

        # Get initial state
        initial = workflow_class.states[0]
        if isinstance(initial, dict):
            initial = initial['name']
        lines.append(f"    [*] --> {initial}")

        # Add transitions
        for t in workflow_class.transitions:
            sources = t['source'] if isinstance(t['source'], list) else [t['source']]
            for source in sources:
                if source == '*':
                    continue
                lines.append(f"    {source} --> {t['dest']}: {t['trigger']}")

        # Mark terminal states
        terminal_states = ['completed', 'cancelled', 'done', 'merged', 'closed']
        for state in workflow_class.states:
            state_name = state if isinstance(state, str) else state['name']
            if state_name in terminal_states:
                lines.append(f"    {state_name} --> [*]")

        return "\n".join(lines)

# API endpoint for diagram generation
# app/api/v1/workflows.py
from typing import Optional
from fastapi import APIRouter
from fastapi.responses import Response

from app.models.workflow import WorkflowType
from app.services.workflow_engine import WORKFLOW_CLASSES
from app.services.workflow_visualizer import WorkflowVisualizer

router = APIRouter()

@router.get("/workflows/{workflow_type}/diagram")
async def get_workflow_diagram(
    workflow_type: WorkflowType,
    format: str = "svg",
    current_state: Optional[str] = None
):
    """Get visual diagram of a workflow state machine."""

    workflow_class = WORKFLOW_CLASSES[workflow_type]
    diagram_bytes = WorkflowVisualizer.generate_diagram(
        workflow_class,
        current_state=current_state,
        format=format
    )

    media_types = {
        'svg': 'image/svg+xml',
        'png': 'image/png',
        'pdf': 'application/pdf'
    }

    return Response(
        content=diagram_bytes,
        media_type=media_types.get(format, 'application/octet-stream')
    )

@router.get("/workflows/{workflow_type}/mermaid")
async def get_workflow_mermaid(workflow_type: WorkflowType):
    """Get Mermaid.js definition for a workflow."""

    workflow_class = WORKFLOW_CLASSES[workflow_type]
    mermaid_def = WorkflowVisualizer.get_mermaid_definition(workflow_class)

    return {"mermaid": mermaid_def}

Frontend Visualization Component

// frontend/components/WorkflowDiagram.tsx
import React from 'react';
import mermaid from 'mermaid';

interface WorkflowDiagramProps {
  workflowType: 'sprint' | 'story' | 'pull_request' | 'agent_task';
  currentState?: string;
}

export function WorkflowDiagram({ workflowType, currentState }: WorkflowDiagramProps) {
  const [diagram, setDiagram] = React.useState<string>('');

  React.useEffect(() => {
    async function fetchDiagram() {
      const response = await fetch(`/api/v1/workflows/${workflowType}/mermaid`);
      const data = await response.json();

      // Highlight current state
      let mermaidDef = data.mermaid;
      if (currentState) {
        mermaidDef += `\n    style ${currentState} fill:#90EE90`;
      }

      const { svg } = await mermaid.render('workflow-diagram', mermaidDef);
      setDiagram(svg);
    }

    fetchDiagram();
  }, [workflowType, currentState]);

  return (
    <div
      className="workflow-diagram"
      dangerouslySetInnerHTML={{ __html: diagram }}
    />
  );
}

9. Long-Running Workflow Patterns

Handling Hours/Days Duration

# app/services/long_running_workflow.py
from datetime import datetime, timedelta
from typing import Optional
from uuid import UUID

from app.models.workflow import WorkflowType
from app.services.workflow_engine import WorkflowEngine

class LongRunningWorkflowManager:
    """
    Manager for workflows that span hours or days.

    Key patterns:
    1. Checkpoint persistence - Save progress frequently
    2. Heartbeat monitoring - Detect stalled workflows
    3. Resume capability - Continue from last checkpoint
    4. Timeout handling - Auto-escalate on SLA breach
    """

    def __init__(self, workflow_engine: WorkflowEngine):
        self.engine = workflow_engine
        self.sla_configs = {
            WorkflowType.SPRINT: timedelta(weeks=2),  # 2-week sprint
            WorkflowType.STORY: timedelta(days=5),    # 5 days for a story
            WorkflowType.PULL_REQUEST: timedelta(hours=24),  # 24h for PR review
            WorkflowType.AGENT_TASK: timedelta(hours=1),  # 1h for agent task
        }

    async def check_sla_breaches(self):
        """Check for workflows that have breached their SLA."""
        from sqlalchemy import select
        from app.models.workflow import WorkflowInstance

        breached = []

        for workflow_type, sla in self.sla_configs.items():
            threshold = datetime.utcnow() - sla

            # Find active workflows created before threshold
            # with no recent transitions
            result = await self.engine.session.execute(
                select(WorkflowInstance)
                .where(WorkflowInstance.workflow_type == workflow_type)
                .where(WorkflowInstance.created_at < threshold)
                .where(~WorkflowInstance.current_state.in_([
                    'completed', 'cancelled', 'done', 'merged', 'closed'
                ]))
            )

            for instance in result.scalars():
                breached.append({
                    'workflow_id': instance.id,
                    'type': workflow_type.value,
                    'entity_id': instance.entity_id,
                    'current_state': instance.current_state,
                    'age': datetime.utcnow() - instance.created_at,
                    'sla': sla
                })

        return breached

    async def create_checkpoint(
        self,
        workflow_id: UUID,
        checkpoint_data: dict
    ):
        """
        Save a checkpoint for long-running workflow.

        Allows resumption from this point if workflow is interrupted.
        """
        instance = await self.engine.load(workflow_id)
        if instance:
            instance.context = {
                **instance.context,
                '_checkpoint': {
                    'data': checkpoint_data,
                    'timestamp': datetime.utcnow().isoformat()
                }
            }
            await self.engine.session.commit()

    async def resume_from_checkpoint(self, workflow_id: UUID) -> Optional[dict]:
        """
        Resume a workflow from its last checkpoint.

        Returns checkpoint data if available.
        """
        instance = await self.engine.load(workflow_id)
        if instance and instance.context.get('_checkpoint'):
            return instance.context['_checkpoint']['data']
        return None
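
A checkpoint round-trip, assuming engine is a WorkflowEngine and workflow_id an existing instance id:

manager = LongRunningWorkflowManager(engine)
await manager.create_checkpoint(workflow_id, {"stories_done": 3, "phase": "development"})
# ...later, after a crash or worker restart...
data = await manager.resume_from_checkpoint(workflow_id)
# -> {"stories_done": 3, "phase": "development"}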

Sprint Workflow with Checkpoints

# app/workflows/sprint_workflow_runner.py
from datetime import datetime
from uuid import UUID

from celery import group
from app.services.workflow_engine import WorkflowEngine
from app.tasks.workflow_tasks import trigger_workflow_transition

class SprintWorkflowRunner:
    """
    Orchestrates a full sprint lifecycle.

    A sprint runs for ~2 weeks, with daily standups and continuous work.
    This runner manages the long-duration process with checkpoints.
    """

    def __init__(self, sprint_id: str, project_id: str, workflow_id: str, engine: WorkflowEngine):
        self.sprint_id = sprint_id
        self.project_id = project_id
        self.workflow_id = workflow_id
        self.engine = engine  # used by complete_sprint to read transition history
    async def start_sprint(self, stories: list[str]):
        """
        Start the sprint with initial stories.

        Creates story workflows for each story and begins development.
        """
        # Transition sprint to development (.delay() enqueues; it is not awaitable)
        trigger_workflow_transition.delay(
            self.workflow_id,
            trigger="start",
            triggered_by="system",
            metadata={"stories": stories}
        )

        # Create story workflows for each story (create_story_workflow is a
        # Celery task assumed to be defined alongside the other workflow tasks)
        story_tasks = []
        for story_id in stories:
            story_tasks.append(
                create_story_workflow.s(story_id, self.project_id)
            )

        # Execute story creations in parallel
        group(story_tasks).apply_async()

    async def run_daily_standup(self):
        """
        Daily standup checkpoint.

        Collects status from all active story workflows.
        """
        # Get all active story workflows (get_active_stories and save_checkpoint
        # are assumed helpers over WorkflowEngine.get_active_by_type and
        # LongRunningWorkflowManager.create_checkpoint)
        active_stories = await self.get_active_stories()

        report = {
            'date': datetime.utcnow().isoformat(),
            'sprint_id': self.sprint_id,
            'stories': []
        }

        for story in active_stories:
            report['stories'].append({
                'story_id': story.entity_id,
                'state': story.current_state,
                'blocked': story.current_state == 'blocked'
            })

        # Save checkpoint
        await self.save_checkpoint(report)

        return report

    async def complete_sprint(self):
        """
        Complete the sprint and generate retrospective data.
        """
        # Collect all transitions for analysis
        history = await self.engine.get_history(UUID(self.workflow_id))

        # Calculate metrics
        metrics = {
            'total_transitions': len(history),
            'duration': (datetime.utcnow() - history[0].created_at).days,
            'blocks_encountered': sum(1 for t in history if t.to_state == 'blocked'),
        }

        trigger_workflow_transition.delay(
            self.workflow_id,
            trigger="complete",
            triggered_by="system",
            metadata={"metrics": metrics}
        )

Dependencies

Add to pyproject.toml:

[project]
dependencies = [
    "transitions>=0.9.0",
    "graphviz>=0.20.1",  # For diagram generation
]

[project.optional-dependencies]
diagrams = [
    "pygraphviz>=1.11",  # Alternative Graphviz binding
]

Implementation Roadmap

Phase 1: Foundation (Week 1)

  1. Add transitions library dependency
  2. Create workflow database models and migrations
  3. Implement basic WorkflowEngine class
  4. Write unit tests for state machines

Phase 2: Core Workflows (Week 2)

  1. Implement StoryWorkflow with all transitions
  2. Implement SprintWorkflow with checkpoints
  3. Implement PRWorkflow for code review
  4. Integrate with Celery tasks

Phase 3: Durability (Week 3)

  1. Add retry and compensation patterns
  2. Implement SLA monitoring
  3. Add checkpoint/resume capability
  4. Integrate with EventBus for real-time updates

Phase 4: Visualization (Week 4)

  1. Add diagram generation endpoints
  2. Create frontend visualization component
  3. Add workflow monitoring dashboard
  4. Documentation and examples

Testing Strategy

# tests/workflows/test_story_workflow.py
import pytest
from transitions.core import MachineError

from app.workflows.story_workflow import StoryWorkflow

class TestStoryWorkflow:
    def test_happy_path(self):
        """Test normal story progression."""
        workflow = StoryWorkflow("story-1", "project-1")

        assert workflow.state == "backlog"

        workflow.start_analysis()
        assert workflow.state == "analysis"

        workflow.analysis_complete()
        assert workflow.state == "design"

        workflow.design_complete()
        assert workflow.state == "implementation"

        workflow.submit_for_review()
        assert workflow.state == "review"

        workflow.approve()
        assert workflow.state == "testing"

        workflow.tests_pass()
        assert workflow.state == "done"

    def test_review_rejection(self):
        """Test review rejection loop."""
        workflow = StoryWorkflow("story-1", "project-1", initial_state="review")

        workflow.request_changes()
        assert workflow.state == "implementation"

        workflow.submit_for_review()
        assert workflow.state == "review"

    def test_invalid_transition(self):
        """Test that invalid transitions are rejected."""
        workflow = StoryWorkflow("story-1", "project-1")

        # Can't go from backlog to review
        with pytest.raises(MachineError):
            workflow.approve()

        assert workflow.state == "backlog"  # State unchanged

    def test_blocking(self):
        """Test blocking from any active state."""
        for initial_state in ['analysis', 'design', 'implementation']:
            workflow = StoryWorkflow("story-1", "project-1", initial_state=initial_state)
            workflow.block()
            assert workflow.state == "blocked"

Risks and Mitigations

| Risk | Impact | Likelihood | Mitigation |
|------|--------|------------|------------|
| State corruption on crash | High | Low | Event sourcing allows state reconstruction |
| Long-running task timeout | Medium | Medium | Celery soft limits + checkpointing |
| Race conditions on concurrent transitions | High | Medium | PostgreSQL row-level locking |
| Complex workflow debugging | Medium | High | Comprehensive logging + visualization |

Decision

Adopt transitions library + PostgreSQL persistence for Syndarix workflow state machines.

Rationale:

  1. Simplicity - No additional infrastructure (vs Temporal)
  2. Flexibility - Full control over persistence and task execution
  3. Integration - Natural fit with existing Celery + Redis stack (SPIKE-004)
  4. Durability - Event sourcing provides audit trail and recovery
  5. Visualization - Built-in Graphviz support + Mermaid for web

Trade-offs Accepted:

  • More custom code vs using Temporal's built-in features
  • Manual handling of distributed coordination
  • Custom SLA monitoring implementation


Spike completed. Findings will inform ADR-008: Workflow State Machine Architecture.