forked from cardosofelipe/fast-next-template
Reformatted multiline function calls, object definitions, and queries for improved code readability and consistency. Adjusted imports and constraints where necessary.
210 lines
5.8 KiB
Python
210 lines
5.8 KiB
Python
# app/tasks/workflow.py
|
|
"""
|
|
Workflow state management tasks for Syndarix.
|
|
|
|
These tasks manage workflow execution and state transitions:
|
|
- Sprint workflows (planning -> implementation -> review -> done)
|
|
- Story workflows (todo -> in_progress -> review -> done)
|
|
- Approval checkpoints for autonomy levels
|
|
- Stale workflow recovery
|
|
|
|
Per ADR-007 and ADR-010, workflow state is durable in PostgreSQL
|
|
with defined state transitions.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from app.celery_app import celery_app
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@celery_app.task(bind=True, name="app.tasks.workflow.recover_stale_workflows")
|
|
def recover_stale_workflows(self) -> dict[str, Any]:
|
|
"""
|
|
Recover workflows that have become stale.
|
|
|
|
This periodic task (runs every 5 minutes):
|
|
1. Find workflows stuck in intermediate states
|
|
2. Check for timed-out agent operations
|
|
3. Retry or escalate based on configuration
|
|
4. Notify relevant users if needed
|
|
|
|
Returns:
|
|
dict with status and recovered count
|
|
"""
|
|
logger.info("Checking for stale workflows to recover")
|
|
|
|
# TODO: Implement stale workflow recovery
|
|
# This will involve:
|
|
# 1. Querying for workflows with last_updated > threshold
|
|
# 2. Checking if associated agents are still running
|
|
# 3. Retrying or resetting stuck workflows
|
|
# 4. Sending notifications for manual intervention
|
|
|
|
return {
|
|
"status": "pending",
|
|
"recovered": 0,
|
|
}
|
|
|
|
|
|
@celery_app.task(bind=True, name="app.tasks.workflow.execute_workflow_step")
|
|
def execute_workflow_step(
|
|
self,
|
|
workflow_id: str,
|
|
transition: str,
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Execute a state transition for a workflow.
|
|
|
|
This task applies a transition to a workflow:
|
|
1. Validate transition is allowed from current state
|
|
2. Execute any pre-transition hooks
|
|
3. Update workflow state
|
|
4. Execute any post-transition hooks
|
|
5. Trigger follow-up tasks
|
|
|
|
Args:
|
|
workflow_id: UUID of the workflow
|
|
transition: Transition to execute (start, approve, reject, etc.)
|
|
|
|
Returns:
|
|
dict with status, workflow_id, and transition
|
|
"""
|
|
logger.info(f"Executing transition '{transition}' for workflow {workflow_id}")
|
|
|
|
# TODO: Implement workflow transition
|
|
# This will involve:
|
|
# 1. Loading workflow from database
|
|
# 2. Validating transition from current state
|
|
# 3. Running pre-transition hooks
|
|
# 4. Updating state in database
|
|
# 5. Running post-transition hooks
|
|
# 6. Scheduling follow-up tasks
|
|
|
|
return {
|
|
"status": "pending",
|
|
"workflow_id": workflow_id,
|
|
"transition": transition,
|
|
}
|
|
|
|
|
|
@celery_app.task(bind=True, name="app.tasks.workflow.handle_approval_response")
|
|
def handle_approval_response(
|
|
self,
|
|
workflow_id: str,
|
|
approved: bool,
|
|
comment: str | None = None,
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Handle a user approval response for a workflow checkpoint.
|
|
|
|
This task processes approval decisions:
|
|
1. Record approval decision with timestamp
|
|
2. Update workflow state accordingly
|
|
3. Resume or halt workflow execution
|
|
4. Notify relevant parties
|
|
|
|
Args:
|
|
workflow_id: UUID of the workflow
|
|
approved: Whether the checkpoint was approved
|
|
comment: Optional comment from approver
|
|
|
|
Returns:
|
|
dict with status, workflow_id, and approved flag
|
|
"""
|
|
logger.info(
|
|
f"Handling approval response for workflow {workflow_id}: approved={approved}"
|
|
)
|
|
|
|
# TODO: Implement approval handling
|
|
# This will involve:
|
|
# 1. Loading workflow and approval checkpoint
|
|
# 2. Recording decision with user and timestamp
|
|
# 3. Transitioning workflow state
|
|
# 4. Resuming or stopping execution
|
|
# 5. Sending notifications
|
|
|
|
return {
|
|
"status": "pending",
|
|
"workflow_id": workflow_id,
|
|
"approved": approved,
|
|
}
|
|
|
|
|
|
@celery_app.task(bind=True, name="app.tasks.workflow.start_sprint_workflow")
|
|
def start_sprint_workflow(
|
|
self,
|
|
project_id: str,
|
|
sprint_id: str,
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Start a new sprint workflow.
|
|
|
|
This task initializes sprint execution:
|
|
1. Create sprint workflow record
|
|
2. Set up sprint planning phase
|
|
3. Spawn Product Owner agent for planning
|
|
4. Begin story assignment
|
|
|
|
Args:
|
|
project_id: UUID of the project
|
|
sprint_id: UUID of the sprint
|
|
|
|
Returns:
|
|
dict with status and sprint_id
|
|
"""
|
|
logger.info(
|
|
f"Starting sprint workflow for sprint {sprint_id} in project {project_id}"
|
|
)
|
|
|
|
# TODO: Implement sprint workflow initialization
|
|
# This will involve:
|
|
# 1. Creating workflow record for sprint
|
|
# 2. Setting initial state to PLANNING
|
|
# 3. Spawning PO agent for sprint planning
|
|
# 4. Setting up monitoring and checkpoints
|
|
|
|
return {
|
|
"status": "pending",
|
|
"sprint_id": sprint_id,
|
|
}
|
|
|
|
|
|
@celery_app.task(bind=True, name="app.tasks.workflow.start_story_workflow")
|
|
def start_story_workflow(
|
|
self,
|
|
project_id: str,
|
|
story_id: str,
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Start a new story workflow.
|
|
|
|
This task initializes story execution:
|
|
1. Create story workflow record
|
|
2. Spawn appropriate developer agent
|
|
3. Set up implementation tracking
|
|
4. Configure approval checkpoints based on autonomy level
|
|
|
|
Args:
|
|
project_id: UUID of the project
|
|
story_id: UUID of the story/issue
|
|
|
|
Returns:
|
|
dict with status and story_id
|
|
"""
|
|
logger.info(f"Starting story workflow for story {story_id} in project {project_id}")
|
|
|
|
# TODO: Implement story workflow initialization
|
|
# This will involve:
|
|
# 1. Creating workflow record for story
|
|
# 2. Determining appropriate agent type
|
|
# 3. Spawning developer agent
|
|
# 4. Setting up checkpoints based on autonomy level
|
|
|
|
return {
|
|
"status": "pending",
|
|
"story_id": story_id,
|
|
}
|