# SPIKE-008: Workflow State Machine Architecture

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #8

---

## Executive Summary

Syndarix requires durable state machine capabilities to orchestrate long-lived workflows spanning hours to days (sprint execution, story implementation, PR review cycles). After evaluating multiple approaches, we recommend a **hybrid architecture**:

1. **`transitions` library** for state machine logic (lightweight, Pythonic, well-tested)
2. **PostgreSQL** for state persistence, with event sourcing for an audit trail
3. **Celery** for task execution (already planned in SPIKE-004)
4. **A custom workflow engine** built on these primitives

This approach balances simplicity with durability, avoiding the operational complexity of dedicated workflow engines like Temporal while providing the reliability Syndarix requires.

---

## Research Questions & Findings

### 1. Best Python State Machine Libraries (2024-2025)

| Library | Stars | Last Update | Async | Persistence | Visualization | Best For |
|---------|-------|-------------|-------|-------------|---------------|----------|
| [transitions](https://github.com/pytransitions/transitions) | 5.5k+ | Active | Yes | Manual | Graphviz | General FSM |
| [python-statemachine](https://github.com/fgmacedo/python-statemachine) | 800+ | Active | Yes | Django mixin | Graphviz | Django projects |
| [sismic](https://github.com/AlexandreDecan/sismic) | 400+ | Active | No | Manual | PlantUML | Complex statecharts |
| [automat](https://github.com/glyph/automat) | 300+ | Mature | No | No | No | Protocol implementations |

**Recommendation:** `transitions`. It is the most mature and flexible of the four, with excellent documentation and support for hierarchical states and callbacks.

### 2. Framework Comparison

#### transitions (Recommended for State Logic)

**Pros:**
- Lightweight, no external dependencies
- Support for hierarchical (nested) states
- Rich callback system (before/after/on_enter/on_exit)
- Machine factory for persistence
- Graphviz diagram generation
- Actively maintained and well-tested

**Cons:**
- No built-in persistence (by design, for flexibility)
- No distributed coordination
- Manual integration with task queues

```python
from transitions import Machine

class StoryWorkflow:
    states = ['analysis', 'design', 'implementation', 'review', 'testing', 'done']

    def __init__(self, story_id: str):
        self.story_id = story_id
        self.machine = Machine(model=self, states=self.states, initial='analysis')

        # Define transitions
        self.machine.add_transition('design_ready', 'analysis', 'design')
        self.machine.add_transition('implementation_ready', 'design', 'implementation')
        self.machine.add_transition('submit_for_review', 'implementation', 'review')
        self.machine.add_transition('request_changes', 'review', 'implementation')
        self.machine.add_transition('approve', 'review', 'testing')
        self.machine.add_transition('tests_pass', 'testing', 'done')
        self.machine.add_transition('tests_fail', 'testing', 'implementation')
```

#### Temporal (Considered but Not Recommended)

**Pros:**
- Durable execution out of the box
- Handles long-running workflows (months/years)
- Built-in retries, timeouts, versioning
- Excellent Python SDK with asyncio integration
- Automatic state persistence

**Cons:**
- **Heavy infrastructure requirement** (Temporal Server cluster)
- Vendor lock-in to Temporal's model
- Learning curve for Temporal-specific patterns
- Overkill for Syndarix's scale
- Additional operational burden

**When to Choose Temporal:**
- Mission-critical financial workflows
- 10,000+ concurrent workflows
- Team with dedicated infrastructure capacity

#### Prefect (Not Recommended for This Use Case)

**Pros:**
- Great for ETL/data pipelines
- Nice UI for workflow visualization
- Good scheduling capabilities

**Cons:**
- Designed for batch data processing, not interactive workflows
- State model doesn't map well to business workflows
- Would require significant adaptation

#### Custom + Celery (Recommended Approach)

Combine `transitions` state machine logic with Celery task execution (see the sketch after this list):
- State machine handles workflow logic
- PostgreSQL persists state
- Celery executes tasks
- Redis Pub/Sub broadcasts state changes

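A minimal sketch of how the four pieces cooperate. This is hypothetical glue code (the session, event bus, and Celery task are assumptions); the durable engine in section 5 is the real implementation:

```python
# Hypothetical glue code. `session`, `event_bus`, and `celery_task` are
# assumed to be provided; StoryWorkflow is the section 4 variant.
async def advance(instance, trigger_name: str, session, event_bus, celery_task):
    machine = StoryWorkflow(instance.entity_id, str(instance.project_id),
                            initial_state=instance.current_state)
    getattr(machine, trigger_name)()          # 1. transitions enforces legality
    instance.current_state = machine.state    # 2. PostgreSQL persists the result
    await session.commit()
    celery_task.delay(instance.entity_id)     # 3. Celery executes the work
    await event_bus.publish(                  # 4. Redis Pub/Sub broadcasts it
        f"project:{instance.project_id}",
        {"type": "workflow_state_changed", "to_state": machine.state},
    )
```
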
### 3. State Persistence Strategy

#### Database Schema

```python
# app/models/workflow.py
from enum import Enum

from sqlalchemy import Column, String, Enum as SQLEnum, JSON, ForeignKey, Integer
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship

from app.models.base import Base, TimestampMixin, UUIDMixin

class WorkflowType(str, Enum):
    SPRINT = "sprint"
    STORY = "story"
    PULL_REQUEST = "pull_request"
    AGENT_TASK = "agent_task"

class WorkflowInstance(Base, UUIDMixin, TimestampMixin):
    """Represents a running workflow instance."""
    __tablename__ = "workflow_instances"

    workflow_type = Column(SQLEnum(WorkflowType), nullable=False, index=True)
    current_state = Column(String(100), nullable=False, index=True)
    entity_id = Column(String(100), nullable=False, index=True)  # story_id, sprint_id, etc.
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
    context = Column(JSON, default=dict)  # Workflow-specific context
    error = Column(String(1000), nullable=True)
    retry_count = Column(Integer, default=0)

    # Relationships
    project = relationship("Project", back_populates="workflows")
    transitions = relationship(
        "WorkflowTransition",
        back_populates="workflow",
        order_by="WorkflowTransition.created_at",
    )

class WorkflowTransition(Base, UUIDMixin, TimestampMixin):
    """Event sourcing table for workflow state changes."""
    __tablename__ = "workflow_transitions"

    workflow_id = Column(UUID(as_uuid=True), ForeignKey("workflow_instances.id"), nullable=False, index=True)
    from_state = Column(String(100), nullable=False)
    to_state = Column(String(100), nullable=False)
    trigger = Column(String(100), nullable=False)  # The transition name
    triggered_by = Column(String(100), nullable=True)  # agent_id, user_id, or "system"
    # "metadata" is reserved on SQLAlchemy declarative models, so the Python
    # attribute is `meta` while the database column keeps the name "metadata".
    meta = Column("metadata", JSON, default=dict)  # Additional context

    # Relationship
    workflow = relationship("WorkflowInstance", back_populates="transitions")
```

#### Migration Example

```python
# alembic/versions/xxx_add_workflow_tables.py
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

def upgrade():
    op.create_table(
        'workflow_instances',
        sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column('workflow_type', sa.Enum('sprint', 'story', 'pull_request', 'agent_task', name='workflowtype'), nullable=False),
        sa.Column('current_state', sa.String(100), nullable=False),
        sa.Column('entity_id', sa.String(100), nullable=False),
        sa.Column('project_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('projects.id'), nullable=False),
        sa.Column('context', postgresql.JSON, server_default='{}'),
        sa.Column('error', sa.String(1000), nullable=True),
        sa.Column('retry_count', sa.Integer, server_default='0'),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
    )
    op.create_index('ix_workflow_instances_type_state', 'workflow_instances', ['workflow_type', 'current_state'])
    op.create_index('ix_workflow_instances_entity', 'workflow_instances', ['entity_id'])

    op.create_table(
        'workflow_transitions',
        sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column('workflow_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('workflow_instances.id'), nullable=False),
        sa.Column('from_state', sa.String(100), nullable=False),
        sa.Column('to_state', sa.String(100), nullable=False),
        sa.Column('trigger', sa.String(100), nullable=False),
        sa.Column('triggered_by', sa.String(100), nullable=True),
        sa.Column('metadata', postgresql.JSON, server_default='{}'),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
    )
    op.create_index('ix_workflow_transitions_workflow', 'workflow_transitions', ['workflow_id'])
```

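Because every state change appends a row to `workflow_transitions`, the current state can be rebuilt by replaying the log, which is what makes this table an event source. A minimal replay sketch over the models above:

```python
from sqlalchemy import select

from app.models.workflow import WorkflowTransition

async def rebuild_state(session, workflow_id, initial_state: str) -> str:
    """Fold the transition log to recover the latest state for a workflow."""
    result = await session.execute(
        select(WorkflowTransition)
        .where(WorkflowTransition.workflow_id == workflow_id)
        .order_by(WorkflowTransition.created_at)
    )
    state = initial_state
    for transition in result.scalars():
        assert transition.from_state == state  # the log must be contiguous
        state = transition.to_state
    return state
```
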
### 4. Syndarix Workflow State Machines

#### Sprint Workflow

```
┌──────────┐  start   ┌─────────────┐  complete   ┌─────────┐
│ Planning │ ───────► │ Development │ ──────────► │ Testing │
└──────────┘          └─────────────┘             └─────────┘
     │                       │                         │
     │                 block │                         │ tests_pass
     │                       ▼                         ▼
     │                 ┌─────────┐               ┌──────────┐
     │                 │ Blocked │               │   Demo   │
     │                 └─────────┘               └──────────┘
     │                       │                         │
     │               unblock │                         │ demo_accepted
     │                       ▼                         ▼
     │                ┌─────────────┐          ┌───────────────┐
     │                │ Development │          │ Retrospective │
     │                └─────────────┘          └───────────────┘
     │ cancel                                          │
     ▼                                                 │ complete
┌───────────┐                                          ▼
│ Cancelled │                                   ┌───────────┐
└───────────┘                                   │ Completed │
                                                └───────────┘
```

```python
# app/workflows/sprint_workflow.py
from transitions import Machine

class SprintWorkflow:
    states = [
        'planning',
        'development',
        'blocked',
        'testing',
        'demo',
        'retrospective',
        'completed',
        'cancelled'
    ]

    transitions = [
        # Normal flow
        {'trigger': 'start', 'source': 'planning', 'dest': 'development'},
        {'trigger': 'complete_development', 'source': 'development', 'dest': 'testing'},
        {'trigger': 'tests_pass', 'source': 'testing', 'dest': 'demo'},
        {'trigger': 'demo_accepted', 'source': 'demo', 'dest': 'retrospective'},
        {'trigger': 'complete', 'source': 'retrospective', 'dest': 'completed'},

        # Blocking
        {'trigger': 'block', 'source': 'development', 'dest': 'blocked'},
        {'trigger': 'unblock', 'source': 'blocked', 'dest': 'development'},

        # Test failures
        {'trigger': 'tests_fail', 'source': 'testing', 'dest': 'development'},

        # Demo rejection
        {'trigger': 'demo_rejected', 'source': 'demo', 'dest': 'development'},

        # Cancellation (from any active state)
        {'trigger': 'cancel', 'source': ['planning', 'development', 'blocked', 'testing', 'demo'], 'dest': 'cancelled'},
    ]

    def __init__(self, sprint_id: str, project_id: str, initial_state: str = 'planning'):
        self.sprint_id = sprint_id
        self.project_id = project_id
        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial=initial_state,
            auto_transitions=False,
            send_event=True,  # Pass EventData to callbacks
        )

        # Register callbacks for persistence
        self.machine.on_enter_development(self._on_enter_development)
        self.machine.on_enter_completed(self._on_enter_completed)
        self.machine.on_enter_blocked(self._on_enter_blocked)

    def _on_enter_development(self, event):
        """Trigger when entering development state."""
        # Could dispatch Celery task to notify agents
        pass

    def _on_enter_completed(self, event):
        """Trigger when sprint is completed."""
        # Generate sprint report, notify stakeholders
        pass

    def _on_enter_blocked(self, event):
        """Trigger when sprint is blocked."""
        # Alert human operator
        pass
```

#### Story Workflow

```
┌──────────┐  ready   ┌────────┐  ready   ┌────────────────┐
│ Analysis │ ───────► │ Design │ ───────► │ Implementation │
└──────────┘          └────────┘          └────────────────┘
                                                  │
                                                  │ submit
                                                  ▼
┌──────┐  tests_pass  ┌─────────┐  approve  ┌────────┐
│ Done │ ◄─────────── │ Testing │ ◄──────── │ Review │
└──────┘              └─────────┘           └────────┘
                           │                     │
                           │ tests_fail          │ request_changes
                           ▼                     ▼
                  ┌────────────────┐   ┌────────────────┐
                  │ Implementation │   │ Implementation │
                  └────────────────┘   └────────────────┘
```

```python
# app/workflows/story_workflow.py
from transitions import Machine

class StoryWorkflow:
    states = [
        {'name': 'backlog', 'on_enter': '_notify_backlog'},
        {'name': 'analysis', 'on_enter': '_start_analysis_task'},
        {'name': 'design', 'on_enter': '_start_design_task'},
        {'name': 'implementation', 'on_enter': '_start_implementation_task'},
        {'name': 'review', 'on_enter': '_create_pr'},
        {'name': 'testing', 'on_enter': '_run_tests'},
        {'name': 'done', 'on_enter': '_notify_completion'},
        {'name': 'blocked', 'on_enter': '_escalate_block'},
    ]

    transitions = [
        # Happy path
        {'trigger': 'start_analysis', 'source': 'backlog', 'dest': 'analysis'},
        {'trigger': 'analysis_complete', 'source': 'analysis', 'dest': 'design'},
        {'trigger': 'design_complete', 'source': 'design', 'dest': 'implementation'},
        {'trigger': 'submit_for_review', 'source': 'implementation', 'dest': 'review'},
        {'trigger': 'approve', 'source': 'review', 'dest': 'testing'},
        {'trigger': 'tests_pass', 'source': 'testing', 'dest': 'done'},

        # Revision loops
        {'trigger': 'request_changes', 'source': 'review', 'dest': 'implementation'},
        {'trigger': 'tests_fail', 'source': 'testing', 'dest': 'implementation'},

        # Blocking (from any active state)
        {'trigger': 'block', 'source': ['analysis', 'design', 'implementation', 'review', 'testing'], 'dest': 'blocked'},
        {'trigger': 'unblock', 'source': 'blocked', 'dest': 'implementation', 'before': '_restore_previous_state'},

        # Skip to done (for trivial stories)
        {'trigger': 'skip_to_done', 'source': '*', 'dest': 'done', 'conditions': '_is_trivial'},
    ]

    def __init__(self, story_id: str, project_id: str, initial_state: str = 'backlog'):
        self.story_id = story_id
        self.project_id = project_id
        self.previous_state = None

        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial=initial_state,
            auto_transitions=False,
        )

    def _is_trivial(self):
        """Condition: Check if story is marked as trivial."""
        return False  # Would check story metadata

    def _start_analysis_task(self):
        """Dispatch analysis to BA agent via Celery."""
        from app.tasks.agent_tasks import run_agent_action
        run_agent_action.delay(
            agent_type="business_analyst",
            project_id=self.project_id,
            action="analyze_story",
            context={"story_id": self.story_id}
        )

    def _start_design_task(self):
        """Dispatch design to Architect agent via Celery."""
        from app.tasks.agent_tasks import run_agent_action
        run_agent_action.delay(
            agent_type="architect",
            project_id=self.project_id,
            action="design_solution",
            context={"story_id": self.story_id}
        )

    def _start_implementation_task(self):
        """Dispatch implementation to Engineer agent via Celery."""
        from app.tasks.agent_tasks import run_agent_action
        run_agent_action.delay(
            agent_type="engineer",
            project_id=self.project_id,
            action="implement_story",
            context={"story_id": self.story_id}
        )

    def _create_pr(self):
        """Create pull request for review."""
        pass

    def _run_tests(self):
        """Trigger test suite via Celery."""
        from app.tasks.cicd_tasks import run_test_suite
        run_test_suite.delay(
            project_id=self.project_id,
            story_id=self.story_id
        )

    def _notify_completion(self):
        """Notify stakeholders of story completion."""
        pass

    def _escalate_block(self):
        """Escalate blocked story to human."""
        pass

    def _notify_backlog(self):
        """Story added to backlog notification."""
        pass

    def _restore_previous_state(self):
        """Restore state before block."""
        pass
```

#### PR Workflow

```
┌─────────┐  submit   ┌────────┐  approve   ┌──────────┐
│ Created │ ────────► │ Review │ ─────────► │ Approved │
└─────────┘           └────────┘            └──────────┘
                           │                      │
           request_changes │                      │ merge
                           ▼                      ▼
                ┌───────────────────┐        ┌────────┐
                │ Changes Requested │        │ Merged │
                └───────────────────┘        └────────┘
                           │
                           │ resubmit
                           ▼
                      ┌────────┐
                      │ Review │
                      └────────┘
```

```python
# app/workflows/pr_workflow.py
from transitions import Machine

class PRWorkflow:
    states = ['created', 'review', 'changes_requested', 'approved', 'merged', 'closed']

    transitions = [
        {'trigger': 'submit_for_review', 'source': 'created', 'dest': 'review'},
        {'trigger': 'request_changes', 'source': 'review', 'dest': 'changes_requested'},
        {'trigger': 'resubmit', 'source': 'changes_requested', 'dest': 'review'},
        {'trigger': 'approve', 'source': 'review', 'dest': 'approved'},
        {'trigger': 'merge', 'source': 'approved', 'dest': 'merged'},
        {'trigger': 'close', 'source': ['created', 'review', 'changes_requested', 'approved'], 'dest': 'closed'},
    ]

    def __init__(self, pr_id: str, project_id: str, initial_state: str = 'created'):
        self.pr_id = pr_id
        self.project_id = project_id
        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial=initial_state,
        )
```

#### Agent Task Workflow

```
┌──────────┐  start  ┌─────────────┐  complete  ┌───────────┐
│ Assigned │ ──────► │ In Progress │ ─────────► │ Completed │
└──────────┘         └─────────────┘            └───────────┘
                            │
                            │ block / fail
                            ▼
                      ┌─────────┐
                      │ Blocked │
                      └─────────┘
                            │
                            │ retry / escalate
                            ▼
               ┌─────────────┐  or  ┌───────────┐
               │ In Progress │      │ Escalated │
               └─────────────┘      └───────────┘
```

```python
# app/workflows/agent_task_workflow.py
from transitions import Machine

class AgentTaskWorkflow:
    states = ['assigned', 'in_progress', 'blocked', 'completed', 'failed', 'escalated']

    transitions = [
        {'trigger': 'start', 'source': 'assigned', 'dest': 'in_progress'},
        {'trigger': 'complete', 'source': 'in_progress', 'dest': 'completed'},
        {'trigger': 'block', 'source': 'in_progress', 'dest': 'blocked'},
        {'trigger': 'fail', 'source': 'in_progress', 'dest': 'failed'},
        {'trigger': 'retry', 'source': ['blocked', 'failed'], 'dest': 'in_progress',
         'conditions': '_can_retry', 'after': '_count_retry'},
        {'trigger': 'escalate', 'source': ['blocked', 'failed'], 'dest': 'escalated'},
    ]

    def __init__(self, task_id: str, agent_id: str, initial_state: str = 'assigned'):
        self.task_id = task_id
        self.agent_id = agent_id
        self.retry_count = 0
        self.max_retries = 3

        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial=initial_state,
        )

    def _can_retry(self):
        """Check if retry is allowed."""
        return self.retry_count < self.max_retries

    def _count_retry(self):
        """Track attempts so _can_retry can cap them."""
        self.retry_count += 1
```

### 5. Durable Workflow Engine

```python
# app/services/workflow_engine.py
from typing import Optional
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.workflow import WorkflowInstance, WorkflowTransition, WorkflowType
from app.services.events import EventBus
from app.workflows.agent_task_workflow import AgentTaskWorkflow
from app.workflows.pr_workflow import PRWorkflow
from app.workflows.sprint_workflow import SprintWorkflow
from app.workflows.story_workflow import StoryWorkflow

WORKFLOW_CLASSES = {
    WorkflowType.SPRINT: SprintWorkflow,
    WorkflowType.STORY: StoryWorkflow,
    WorkflowType.PULL_REQUEST: PRWorkflow,
    WorkflowType.AGENT_TASK: AgentTaskWorkflow,
}

class WorkflowEngine:
    """
    Durable workflow engine that persists state to PostgreSQL.

    Usage:
        engine = WorkflowEngine(session, event_bus)

        # Create a new workflow
        workflow = await engine.create(
            workflow_type=WorkflowType.STORY,
            entity_id="story-123",
            project_id=project.id
        )

        # Load an existing workflow
        workflow = await engine.load(workflow_id)

        # Trigger a transition
        await engine.trigger(workflow_id, "start_analysis", triggered_by="agent-456")
    """

    def __init__(self, session: AsyncSession, event_bus: Optional[EventBus] = None):
        self.session = session
        self.event_bus = event_bus

    async def create(
        self,
        workflow_type: WorkflowType,
        entity_id: str,
        project_id: UUID,
        initial_context: dict = None
    ) -> WorkflowInstance:
        """Create a new workflow instance."""

        workflow_class = WORKFLOW_CLASSES[workflow_type]
        initial_state = workflow_class.states[0]
        if isinstance(initial_state, dict):
            initial_state = initial_state['name']

        instance = WorkflowInstance(
            workflow_type=workflow_type,
            current_state=initial_state,
            entity_id=entity_id,
            project_id=project_id,
            context=initial_context or {}
        )

        self.session.add(instance)
        await self.session.commit()
        await self.session.refresh(instance)

        # Publish creation event
        if self.event_bus:
            await self.event_bus.publish(f"project:{project_id}", {
                "type": "workflow_created",
                "workflow_id": str(instance.id),
                "workflow_type": workflow_type.value,
                "entity_id": entity_id,
                "state": initial_state
            })

        return instance

    async def load(self, workflow_id: UUID) -> Optional[WorkflowInstance]:
        """Load a workflow instance from the database."""
        return await self.session.get(WorkflowInstance, workflow_id)

    async def get_machine(self, instance: WorkflowInstance):
        """
        Reconstruct the state machine model from a persisted instance.

        The workflow classes take their entity id positionally (story_id,
        sprint_id, pr_id). AgentTaskWorkflow expects an agent_id second, so
        a full implementation would special-case it.
        """
        workflow_class = WORKFLOW_CLASSES[instance.workflow_type]
        return workflow_class(
            instance.entity_id,
            str(instance.project_id),
            initial_state=instance.current_state
        )

    async def trigger(
        self,
        workflow_id: UUID,
        trigger: str,
        triggered_by: str = "system",
        metadata: dict = None
    ) -> bool:
        """
        Trigger a state transition on a workflow.

        Returns True if the transition succeeded, False if it is not valid.
        """
        instance = await self.load(workflow_id)
        if not instance:
            raise ValueError(f"Workflow {workflow_id} not found")

        workflow = await self.get_machine(instance)
        from_state = instance.current_state

        # Check that the trigger exists and is allowed from the current state
        if trigger not in workflow.machine.get_triggers(from_state):
            return False

        # Execute the transition
        try:
            getattr(workflow, trigger)()
            to_state = workflow.state
        except Exception as e:
            # Transition failed
            instance.error = str(e)
            await self.session.commit()
            return False

        # Persist the new state
        instance.current_state = to_state
        instance.error = None

        # Record the transition (event sourcing)
        transition = WorkflowTransition(
            workflow_id=instance.id,
            from_state=from_state,
            to_state=to_state,
            trigger=trigger,
            triggered_by=triggered_by,
            meta=metadata or {}
        )
        self.session.add(transition)
        await self.session.commit()

        # Publish the state change event
        if self.event_bus:
            await self.event_bus.publish(f"project:{instance.project_id}", {
                "type": "workflow_state_changed",
                "workflow_id": str(instance.id),
                "workflow_type": instance.workflow_type.value,
                "entity_id": instance.entity_id,
                "from_state": from_state,
                "to_state": to_state,
                "trigger": trigger,
                "triggered_by": triggered_by
            })

        return True

    async def get_history(self, workflow_id: UUID) -> list[WorkflowTransition]:
        """Get the full transition history for a workflow."""
        instance = await self.load(workflow_id)
        if not instance:
            raise ValueError(f"Workflow {workflow_id} not found")

        await self.session.refresh(instance, ["transitions"])
        return instance.transitions

    async def get_active_by_type(
        self,
        project_id: UUID,
        workflow_type: WorkflowType
    ) -> list[WorkflowInstance]:
        """Get all active workflows of a given type for a project."""
        terminal_states = ['completed', 'cancelled', 'merged', 'closed', 'done']

        result = await self.session.execute(
            select(WorkflowInstance)
            .where(WorkflowInstance.project_id == project_id)
            .where(WorkflowInstance.workflow_type == workflow_type)
            .where(~WorkflowInstance.current_state.in_(terminal_states))
        )
        return list(result.scalars().all())
```

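Note that `trigger()` reads the row, mutates it, and commits, so two concurrent triggers on the same workflow could race. The row-level locking called out in the risks section can be added with a `FOR UPDATE` load; a sketch of one way to wire it in:

```python
from sqlalchemy import select

from app.models.workflow import WorkflowInstance

class LockingWorkflowEngine(WorkflowEngine):
    """Sketch: serialize concurrent transitions on a single workflow row."""

    async def load(self, workflow_id):
        # SELECT ... FOR UPDATE blocks a second trigger() on the same row
        # until this transaction commits, removing the read-modify-write race.
        result = await self.session.execute(
            select(WorkflowInstance)
            .where(WorkflowInstance.id == workflow_id)
            .with_for_update()
        )
        return result.scalar_one_or_none()
```
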
### 6. Retry and Compensation Patterns

#### Retry Configuration

```python
# app/workflows/retry_config.py
import random
from dataclasses import dataclass

@dataclass
class RetryPolicy:
    """Configuration for retry behavior."""
    max_retries: int = 3
    initial_delay: float = 1.0   # seconds
    max_delay: float = 300.0     # 5 minutes
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_errors: tuple = (ConnectionError, TimeoutError)

class RetryableWorkflow:
    """Mixin for workflows with retry support."""

    retry_policy: RetryPolicy = RetryPolicy()

    def calculate_retry_delay(self, attempt: int) -> float:
        """Calculate the delay for the next retry attempt (exponential backoff)."""
        delay = self.retry_policy.initial_delay * (self.retry_policy.exponential_base ** attempt)
        delay = min(delay, self.retry_policy.max_delay)

        if self.retry_policy.jitter:
            # Randomize within [0.5x, 1.5x] to avoid thundering herds
            delay = delay * (0.5 + random.random())

        return delay

    def should_retry(self, error: Exception, attempt: int) -> bool:
        """Determine whether an error should trigger a retry."""
        if attempt >= self.retry_policy.max_retries:
            return False
        return isinstance(error, self.retry_policy.retryable_errors)
```

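For illustration, the backoff schedule this default policy produces (jitter disabled so the numbers are deterministic):

```python
from app.workflows.retry_config import RetryPolicy, RetryableWorkflow

class Demo(RetryableWorkflow):
    retry_policy = RetryPolicy(jitter=False)  # deterministic for the example

demo = Demo()
print([demo.calculate_retry_delay(attempt) for attempt in range(5)])
# [1.0, 2.0, 4.0, 8.0, 16.0]  -- later attempts are capped at max_delay (300s)
```
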
#### Saga Pattern for Compensation

```python
# app/workflows/saga.py
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class SagaStep:
    """A single step in a saga, paired with its compensation."""
    name: str
    action: Callable[..., Any]
    compensation: Callable[..., Any]

class Saga:
    """
    Implements the Saga pattern for distributed transactions.

    If a step fails, compensating actions are executed in reverse order.
    """

    def __init__(self, steps: list[SagaStep]):
        self.steps = steps
        self.completed_steps: list[SagaStep] = []
        self.context: dict = {}

    async def execute(self, initial_context: dict = None) -> dict:
        """Execute the saga, with automatic compensation on failure."""
        self.context = initial_context or {}

        for step in self.steps:
            try:
                result = await step.action(self.context)
                self.context.update(result or {})
                self.completed_steps.append(step)
            except Exception as e:
                # Compensate in reverse order
                await self._compensate()
                raise SagaFailure(
                    failed_step=step.name,
                    original_error=e,
                    compensation_results=self.context.get('_compensation_results', [])
                )

        return self.context

    async def _compensate(self):
        """Execute compensation for all completed steps."""
        compensation_results = []

        for step in reversed(self.completed_steps):
            try:
                await step.compensation(self.context)
                compensation_results.append({
                    'step': step.name,
                    'status': 'compensated'
                })
            except Exception as e:
                compensation_results.append({
                    'step': step.name,
                    'status': 'failed',
                    'error': str(e)
                })

        self.context['_compensation_results'] = compensation_results

class SagaFailure(Exception):
    """Raised when a saga fails and compensation has been executed."""

    def __init__(self, failed_step: str, original_error: Exception, compensation_results: list):
        self.failed_step = failed_step
        self.original_error = original_error
        self.compensation_results = compensation_results
        super().__init__(f"Saga failed at step '{failed_step}': {original_error}")

async def _noop(_ctx: dict) -> None:
    """Async no-op for steps that need no compensation (a bare None cannot be awaited)."""

# Example: story implementation saga. The step helpers (create_feature_branch,
# generate_code, run_test_suite, etc.) are assumed async functions defined elsewhere.
def create_story_implementation_saga(story_id: str, project_id: str) -> Saga:
    """Create the saga for implementing a story, with compensation."""

    steps = [
        SagaStep(
            name="create_branch",
            action=lambda ctx: create_feature_branch(ctx['story_id']),
            compensation=lambda ctx: delete_branch(ctx.get('branch_name'))
        ),
        SagaStep(
            name="implement_code",
            action=lambda ctx: generate_code(ctx['story_id'], ctx['branch_name']),
            compensation=lambda ctx: revert_commits(ctx.get('commit_shas', []))
        ),
        SagaStep(
            name="run_tests",
            action=lambda ctx: run_test_suite(ctx['branch_name']),
            compensation=_noop  # Tests don't need compensation
        ),
        SagaStep(
            name="create_pr",
            action=lambda ctx: create_pull_request(ctx['branch_name'], ctx['story_id']),
            compensation=lambda ctx: close_pull_request(ctx.get('pr_id'))
        ),
    ]

    return Saga(steps)
```

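Driving the saga then looks like this sketch (helper names as assumed above):

```python
import asyncio

async def implement_story(story_id: str, project_id: str) -> None:
    saga = create_story_implementation_saga(story_id, project_id)
    try:
        context = await saga.execute({'story_id': story_id})
        print("PR created:", context.get('pr_id'))
    except SagaFailure as failure:
        # Completed steps were already rolled back in reverse order
        print(f"Rolled back after '{failure.failed_step}':",
              failure.compensation_results)

# asyncio.run(implement_story("story-123", "project-1"))
```
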
### 7. Celery Integration

```python
# app/tasks/workflow_tasks.py
import asyncio
from uuid import UUID

from celery import Task

from app.core.celery import celery_app
from app.core.database import async_session_maker
from app.services.events import EventBus
from app.services.workflow_engine import WorkflowEngine

class WorkflowTask(Task):
    """Base task for workflow operations with a shared EventBus."""

    _event_bus = None

    @property
    def event_bus(self):
        if self._event_bus is None:
            self._event_bus = EventBus()
        return self._event_bus

@celery_app.task(bind=True, base=WorkflowTask)
def trigger_workflow_transition(
    self,
    workflow_id: str,
    trigger: str,
    triggered_by: str = "system",
    metadata: dict = None
):
    """
    Trigger a workflow transition as a background task.

    Used when transitions need to happen asynchronously. Celery task
    functions are synchronous, so the async engine call is driven
    with asyncio.run().
    """

    async def _run() -> None:
        async with async_session_maker() as session:
            engine = WorkflowEngine(session, self.event_bus)
            success = await engine.trigger(
                workflow_id=UUID(workflow_id),
                trigger=trigger,
                triggered_by=triggered_by,
                metadata=metadata
            )
            if not success:
                raise ValueError(f"Transition '{trigger}' failed for workflow {workflow_id}")

    asyncio.run(_run())
    return {"workflow_id": workflow_id, "trigger": trigger, "success": True}

@celery_app.task(bind=True, base=WorkflowTask)
def process_story_workflow_step(
    self,
    workflow_id: str,
    step: str,
    context: dict
):
    """
    Process a single step in a story workflow.

    This task runs the actual work for each state. run_analysis, run_design,
    and run_implementation are placeholders for the agent invocations.
    """

    async def _run() -> dict:
        async with async_session_maker() as session:
            engine = WorkflowEngine(session, self.event_bus)
            instance = await engine.load(UUID(workflow_id))

            if not instance:
                raise ValueError(f"Workflow {workflow_id} not found")

            # Execute step-specific logic
            if step == "analysis":
                await run_analysis(instance.entity_id, context)
                await engine.trigger(UUID(workflow_id), "analysis_complete", "agent:ba")

            elif step == "design":
                await run_design(instance.entity_id, context)
                await engine.trigger(UUID(workflow_id), "design_complete", "agent:architect")

            elif step == "implementation":
                await run_implementation(instance.entity_id, context)
                await engine.trigger(UUID(workflow_id), "submit_for_review", "agent:engineer")

            return {"workflow_id": workflow_id, "step_completed": step}

    return asyncio.run(_run())

@celery_app.task
def check_stalled_workflows():
    """
    Periodic task to check for stalled workflows.

    Runs via Celery Beat to identify workflows stuck in non-terminal states.
    """
    from datetime import datetime, timedelta

    # Consider workflows stalled if there has been no transition in 1 hour
    stale_threshold = datetime.utcnow() - timedelta(hours=1)

    # Query for potentially stalled workflows
    # (implementation would compare updated_at against stale_threshold and escalate)
```

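The Celery Beat wiring is not shown above; a minimal sketch, assuming the task module path used here (the 15-minute cadence is an arbitrary choice):

```python
# app/core/celery.py (excerpt) -- hypothetical schedule, not existing config
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "check-stalled-workflows": {
        "task": "app.tasks.workflow_tasks.check_stalled_workflows",
        "schedule": crontab(minute="*/15"),  # every 15 minutes
    },
}
```
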
### 8. Visualization Approach

#### Graphviz Diagram Generation

```python
# app/services/workflow_visualizer.py
import io
from typing import Optional

from transitions.extensions import GraphMachine

class WorkflowVisualizer:
    """Generate visual diagrams of workflow state machines."""

    @staticmethod
    def generate_diagram(
        workflow_class,
        current_state: Optional[str] = None,
        format: str = 'svg'
    ) -> bytes:
        """
        Generate a diagram for a workflow.

        Args:
            workflow_class: The workflow class to visualize
            current_state: Highlight current state (optional)
            format: Output format ('svg', 'png', 'pdf')

        Returns:
            Diagram as bytes
        """
        class DiagramModel:
            pass

        model = DiagramModel()
        machine = GraphMachine(
            model=model,
            states=workflow_class.states,
            transitions=workflow_class.transitions,
            initial=workflow_class.states[0] if isinstance(workflow_class.states[0], str)
                    else workflow_class.states[0]['name'],
            show_conditions=True,
            show_state_attributes=True,
            title=workflow_class.__name__
        )

        # Highlight the current state. This reaches into the diagrams
        # extension's per-model graph; verify against the installed
        # transitions version.
        if current_state:
            machine.model_graphs[id(model)].custom_styles['node'][current_state] = {
                'fillcolor': '#90EE90',
                'style': 'filled'
            }

        # Generate and render the graph to bytes
        graph = machine.get_graph()
        output = io.BytesIO()
        graph.draw(output, format=format, prog='dot')
        return output.getvalue()

    @staticmethod
    def get_mermaid_definition(workflow_class) -> str:
        """
        Generate a Mermaid.js state diagram definition.

        Useful for embedding in markdown documentation or web UIs.
        """
        lines = ["stateDiagram-v2"]

        # Get the initial state
        initial = workflow_class.states[0]
        if isinstance(initial, dict):
            initial = initial['name']
        lines.append(f"    [*] --> {initial}")

        # Add transitions
        for t in workflow_class.transitions:
            sources = t['source'] if isinstance(t['source'], list) else [t['source']]
            for source in sources:
                if source == '*':
                    continue
                lines.append(f"    {source} --> {t['dest']}: {t['trigger']}")

        # Mark terminal states
        terminal_states = ['completed', 'cancelled', 'done', 'merged', 'closed']
        for state in workflow_class.states:
            state_name = state if isinstance(state, str) else state['name']
            if state_name in terminal_states:
                lines.append(f"    {state_name} --> [*]")

        return "\n".join(lines)

# API endpoints for diagram generation
# app/api/v1/workflows.py
from typing import Optional

from fastapi import APIRouter
from fastapi.responses import Response

from app.models.workflow import WorkflowType
from app.services.workflow_engine import WORKFLOW_CLASSES
from app.services.workflow_visualizer import WorkflowVisualizer

router = APIRouter()

@router.get("/workflows/{workflow_type}/diagram")
async def get_workflow_diagram(
    workflow_type: WorkflowType,
    format: str = "svg",
    current_state: Optional[str] = None
):
    """Get a visual diagram of a workflow state machine."""

    workflow_class = WORKFLOW_CLASSES[workflow_type]
    diagram_bytes = WorkflowVisualizer.generate_diagram(
        workflow_class,
        current_state=current_state,
        format=format
    )

    media_types = {
        'svg': 'image/svg+xml',
        'png': 'image/png',
        'pdf': 'application/pdf'
    }

    return Response(
        content=diagram_bytes,
        media_type=media_types.get(format, 'application/octet-stream')
    )

@router.get("/workflows/{workflow_type}/mermaid")
async def get_workflow_mermaid(workflow_type: WorkflowType):
    """Get the Mermaid.js definition for a workflow."""

    workflow_class = WORKFLOW_CLASSES[workflow_type]
    mermaid_def = WorkflowVisualizer.get_mermaid_definition(workflow_class)

    return {"mermaid": mermaid_def}
```

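For reference, running `get_mermaid_definition` against `PRWorkflow` as defined above yields:

```python
from app.services.workflow_visualizer import WorkflowVisualizer
from app.workflows.pr_workflow import PRWorkflow

print(WorkflowVisualizer.get_mermaid_definition(PRWorkflow))
# stateDiagram-v2
#     [*] --> created
#     created --> review: submit_for_review
#     review --> changes_requested: request_changes
#     changes_requested --> review: resubmit
#     review --> approved: approve
#     approved --> merged: merge
#     created --> closed: close
#     review --> closed: close
#     changes_requested --> closed: close
#     approved --> closed: close
#     merged --> [*]
#     closed --> [*]
```
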
#### Frontend Visualization Component

```typescript
// frontend/components/WorkflowDiagram.tsx
import React from 'react';
import mermaid from 'mermaid';

interface WorkflowDiagramProps {
  workflowType: 'sprint' | 'story' | 'pull_request' | 'agent_task';
  currentState?: string;
}

export function WorkflowDiagram({ workflowType, currentState }: WorkflowDiagramProps) {
  const [diagram, setDiagram] = React.useState<string>('');

  React.useEffect(() => {
    async function fetchDiagram() {
      const response = await fetch(`/api/v1/workflows/${workflowType}/mermaid`);
      const data = await response.json();

      // Highlight the current state
      let mermaidDef = data.mermaid;
      if (currentState) {
        mermaidDef += `\n    style ${currentState} fill:#90EE90`;
      }

      const { svg } = await mermaid.render('workflow-diagram', mermaidDef);
      setDiagram(svg);
    }

    fetchDiagram();
  }, [workflowType, currentState]);

  return (
    <div
      className="workflow-diagram"
      dangerouslySetInnerHTML={{ __html: diagram }}
    />
  );
}
```

### 9. Long-Running Workflow Patterns

#### Handling Hours/Days Duration

```python
# app/services/long_running_workflow.py
from datetime import datetime, timedelta
from typing import Optional
from uuid import UUID

from app.models.workflow import WorkflowType
from app.services.workflow_engine import WorkflowEngine

class LongRunningWorkflowManager:
    """
    Manager for workflows that span hours or days.

    Key patterns:
    1. Checkpoint persistence - save progress frequently
    2. Heartbeat monitoring - detect stalled workflows
    3. Resume capability - continue from the last checkpoint
    4. Timeout handling - auto-escalate on SLA breach
    """

    def __init__(self, workflow_engine: WorkflowEngine):
        self.engine = workflow_engine
        self.sla_configs = {
            WorkflowType.SPRINT: timedelta(weeks=2),         # 2-week sprint
            WorkflowType.STORY: timedelta(days=5),           # 5 days for a story
            WorkflowType.PULL_REQUEST: timedelta(hours=24),  # 24h for PR review
            WorkflowType.AGENT_TASK: timedelta(hours=1),     # 1h for an agent task
        }

    async def check_sla_breaches(self) -> list[dict]:
        """Check for workflows that have breached their SLA."""
        from sqlalchemy import select
        from app.models.workflow import WorkflowInstance

        breached = []

        for workflow_type, sla in self.sla_configs.items():
            threshold = datetime.utcnow() - sla

            # Find active workflows created before the threshold
            # with no recent transitions
            result = await self.engine.session.execute(
                select(WorkflowInstance)
                .where(WorkflowInstance.workflow_type == workflow_type)
                .where(WorkflowInstance.created_at < threshold)
                .where(~WorkflowInstance.current_state.in_([
                    'completed', 'cancelled', 'done', 'merged', 'closed'
                ]))
            )

            for instance in result.scalars():
                breached.append({
                    'workflow_id': instance.id,
                    'type': workflow_type.value,
                    'entity_id': instance.entity_id,
                    'current_state': instance.current_state,
                    'age': datetime.utcnow() - instance.created_at,
                    'sla': sla
                })

        return breached

    async def create_checkpoint(
        self,
        workflow_id: UUID,
        checkpoint_data: dict
    ):
        """
        Save a checkpoint for a long-running workflow.

        Allows resumption from this point if the workflow is interrupted.
        """
        instance = await self.engine.load(workflow_id)
        if instance:
            # Reassign (rather than mutate) so the JSON column change is tracked
            instance.context = {
                **instance.context,
                '_checkpoint': {
                    'data': checkpoint_data,
                    'timestamp': datetime.utcnow().isoformat()
                }
            }
            await self.engine.session.commit()

    async def resume_from_checkpoint(self, workflow_id: UUID) -> Optional[dict]:
        """
        Resume a workflow from its last checkpoint.

        Returns checkpoint data if available.
        """
        instance = await self.engine.load(workflow_id)
        if instance and instance.context.get('_checkpoint'):
            return instance.context['_checkpoint']['data']
        return None
```

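Usage from a long-running task might look like this sketch (the work loop and helper names are hypothetical):

```python
manager = LongRunningWorkflowManager(engine)

# On worker restart, pick up where the last checkpoint left off
checkpoint = await manager.resume_from_checkpoint(workflow_id)
start = checkpoint["next_index"] if checkpoint else 0

for i in range(start, len(work_items)):
    await process_item(work_items[i])  # hypothetical long-running unit of work
    await manager.create_checkpoint(workflow_id, {"next_index": i + 1})
```
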
#### Sprint Workflow with Checkpoints

```python
# app/workflows/sprint_workflow_runner.py
from datetime import datetime
from uuid import UUID

from celery import group

from app.tasks.workflow_tasks import trigger_workflow_transition

class SprintWorkflowRunner:
    """
    Orchestrates a full sprint lifecycle.

    A sprint runs for ~2 weeks, with daily standups and continuous work.
    This runner manages the long-duration process with checkpoints.
    (create_story_workflow, get_active_stories, save_checkpoint, and
    self.engine are assumed to be provided elsewhere.)
    """

    def __init__(self, sprint_id: str, project_id: str, workflow_id: str):
        self.sprint_id = sprint_id
        self.project_id = project_id
        self.workflow_id = workflow_id

    async def start_sprint(self, stories: list[str]):
        """
        Start the sprint with initial stories.

        Creates story workflows for each story and begins development.
        """
        # Transition the sprint to development (.delay() enqueues; no await needed)
        trigger_workflow_transition.delay(
            self.workflow_id,
            trigger="start",
            triggered_by="system",
            metadata={"stories": stories}
        )

        # Create story workflows for each story
        story_tasks = [
            create_story_workflow.s(story_id, self.project_id)
            for story_id in stories
        ]

        # Execute story creations in parallel
        group(story_tasks).apply_async()

    async def run_daily_standup(self):
        """
        Daily standup checkpoint.

        Collects status from all active story workflows.
        """
        # Get all active story workflows
        active_stories = await self.get_active_stories()

        report = {
            'date': datetime.utcnow().isoformat(),
            'sprint_id': self.sprint_id,
            'stories': [
                {
                    'story_id': story.entity_id,
                    'state': story.current_state,
                    'blocked': story.current_state == 'blocked'
                }
                for story in active_stories
            ]
        }

        # Save checkpoint
        await self.save_checkpoint(report)

        return report

    async def complete_sprint(self):
        """Complete the sprint and generate retrospective data."""
        # Collect all transitions for analysis
        history = await self.engine.get_history(UUID(self.workflow_id))

        # Calculate metrics
        metrics = {
            'total_transitions': len(history),
            'duration': (datetime.utcnow() - history[0].created_at).days,
            'blocks_encountered': sum(1 for t in history if t.to_state == 'blocked'),
        }

        trigger_workflow_transition.delay(
            self.workflow_id,
            trigger="complete",
            triggered_by="system",
            metadata={"metrics": metrics}
        )
```

---

## Dependencies

Add to `pyproject.toml`:

```toml
[project]
dependencies = [
    "transitions>=0.9.0",
    "graphviz>=0.20.1",  # For diagram generation
]

[project.optional-dependencies]
diagrams = [
    "pygraphviz>=1.11",  # Alternative Graphviz binding
]
```

---

## Implementation Roadmap

### Phase 1: Foundation (Week 1)
1. Add the `transitions` library dependency
2. Create workflow database models and migrations
3. Implement the basic `WorkflowEngine` class
4. Write unit tests for state machines

### Phase 2: Core Workflows (Week 2)
1. Implement `StoryWorkflow` with all transitions
2. Implement `SprintWorkflow` with checkpoints
3. Implement `PRWorkflow` for code review
4. Integrate with Celery tasks

### Phase 3: Durability (Week 3)
1. Add retry and compensation patterns
2. Implement SLA monitoring
3. Add checkpoint/resume capability
4. Integrate with EventBus for real-time updates

### Phase 4: Visualization (Week 4)
1. Add diagram generation endpoints
2. Create frontend visualization component
3. Add workflow monitoring dashboard
4. Documentation and examples

---

## Testing Strategy

```python
# tests/workflows/test_story_workflow.py
import pytest
from transitions import MachineError

from app.workflows.story_workflow import StoryWorkflow

class TestStoryWorkflow:
    def test_happy_path(self):
        """Test normal story progression."""
        workflow = StoryWorkflow("story-1", "project-1")

        assert workflow.state == "backlog"

        workflow.start_analysis()
        assert workflow.state == "analysis"

        workflow.analysis_complete()
        assert workflow.state == "design"

        workflow.design_complete()
        assert workflow.state == "implementation"

        workflow.submit_for_review()
        assert workflow.state == "review"

        workflow.approve()
        assert workflow.state == "testing"

        workflow.tests_pass()
        assert workflow.state == "done"

    def test_review_rejection(self):
        """Test the review rejection loop."""
        workflow = StoryWorkflow("story-1", "project-1", initial_state="review")

        workflow.request_changes()
        assert workflow.state == "implementation"

        workflow.submit_for_review()
        assert workflow.state == "review"

    def test_invalid_transition(self):
        """Test that invalid transitions are rejected."""
        workflow = StoryWorkflow("story-1", "project-1")

        # 'approve' is not a valid trigger from backlog
        with pytest.raises(MachineError):
            workflow.approve()

        assert workflow.state == "backlog"  # State unchanged

    def test_blocking(self):
        """Test blocking from any active state."""
        for initial_state in ['analysis', 'design', 'implementation']:
            workflow = StoryWorkflow("story-1", "project-1", initial_state=initial_state)
            workflow.block()
            assert workflow.state == "blocked"
```

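Beyond pure state machine tests, an engine-level test can verify persistence and the event-sourced log. A sketch assuming pytest-asyncio plus `session` and `project` fixtures (an `AsyncSession` and a persisted project row):

```python
import pytest

from app.models.workflow import WorkflowType
from app.services.workflow_engine import WorkflowEngine

@pytest.mark.asyncio
async def test_engine_persists_transitions(session, project):
    engine = WorkflowEngine(session)

    instance = await engine.create(
        workflow_type=WorkflowType.PULL_REQUEST,
        entity_id="pr-1",
        project_id=project.id,
    )
    assert instance.current_state == "created"

    # A valid transition succeeds; an illegal one is refused without side effects
    assert await engine.trigger(instance.id, "submit_for_review")
    assert not await engine.trigger(instance.id, "merge")  # not legal from 'review'

    history = await engine.get_history(instance.id)
    assert [(t.from_state, t.to_state) for t in history] == [("created", "review")]
```
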
---

## Risks and Mitigations

| Risk | Impact | Likelihood | Mitigation |
|------|--------|------------|------------|
| State corruption on crash | High | Low | Event sourcing allows state reconstruction |
| Long-running task timeout | Medium | Medium | Celery soft limits + checkpointing |
| Race conditions on concurrent transitions | High | Medium | PostgreSQL row-level locking |
| Complex workflow debugging | Medium | High | Comprehensive logging + visualization |

---

## Decision

**Adopt the `transitions` library with PostgreSQL persistence** for Syndarix workflow state machines.

**Rationale:**
1. **Simplicity** - no additional infrastructure (vs. Temporal)
2. **Flexibility** - full control over persistence and task execution
3. **Integration** - a natural fit with the existing Celery + Redis stack (SPIKE-004)
4. **Durability** - event sourcing provides an audit trail and recovery
5. **Visualization** - built-in Graphviz support, plus Mermaid for the web

**Trade-offs Accepted:**
- More custom code vs. using Temporal's built-in features
- Manual handling of distributed coordination
- A custom SLA monitoring implementation

---

## References

- [transitions Documentation](https://github.com/pytransitions/transitions)
- [Temporal Python SDK](https://github.com/temporalio/sdk-python)
- [Managing Long-Running Workflows with Temporal](https://temporal.io/blog/very-long-running-workflows)
- [Saga Pattern](https://microservices.io/patterns/data/saga.html)
- [Event Sourcing](https://martinfowler.com/eaaDev/EventSourcing.html)
- [Celery Dyrygent](https://github.com/ovh/celery-dyrygent) - complex workflow orchestration
- [Selinon](https://github.com/selinon/selinon) - advanced flow management on Celery
- [Toptal Celery Orchestration Guide](https://www.toptal.com/python/orchestrating-celery-python-background-jobs)

---

*Spike completed. Findings will inform ADR-008: Workflow State Machine Architecture.*