forked from cardosofelipe/fast-next-template
fix(backend): race condition fixes for task completion and sprint operations
## Changes ### agent_instance.py - Task Completion Counter Race Condition - Changed `record_task_completion()` from read-modify-write pattern to atomic SQL UPDATE - Previously: Read instance → increment in Python memory → write back - Now: Uses `UPDATE ... SET tasks_completed = tasks_completed + 1` - Prevents lost updates when multiple concurrent task completions occur ### sprint.py - Row-Level Locking for Sprint Operations - Added `with_for_update()` to `complete_sprint()` to prevent race conditions during velocity calculation - Added `with_for_update()` to `cancel_sprint()` for consistency - Ensures atomic check-and-update for sprint status changes ## Impact These fixes prevent: - Counter metrics being lost under concurrent load - Data corruption during sprint completion - Race conditions with concurrent sprint status changes 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -249,23 +249,35 @@ class CRUDAgentInstance(CRUDBase[AgentInstance, AgentInstanceCreate, AgentInstan
|
|||||||
tokens_used: int,
|
tokens_used: int,
|
||||||
cost_incurred: Decimal,
|
cost_incurred: Decimal,
|
||||||
) -> AgentInstance | None:
|
) -> AgentInstance | None:
|
||||||
"""Record a completed task and update metrics."""
|
"""Record a completed task and update metrics.
|
||||||
|
|
||||||
|
Uses atomic SQL UPDATE to prevent lost updates under concurrent load.
|
||||||
|
This avoids the read-modify-write race condition that occurs when
|
||||||
|
multiple task completions happen simultaneously.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
|
now = datetime.now(UTC)
|
||||||
|
|
||||||
|
# Use atomic SQL UPDATE to increment counters without race conditions
|
||||||
|
# This is safe for concurrent updates - no read-modify-write pattern
|
||||||
result = await db.execute(
|
result = await db.execute(
|
||||||
select(AgentInstance).where(AgentInstance.id == instance_id)
|
update(AgentInstance)
|
||||||
|
.where(AgentInstance.id == instance_id)
|
||||||
|
.values(
|
||||||
|
tasks_completed=AgentInstance.tasks_completed + 1,
|
||||||
|
tokens_used=AgentInstance.tokens_used + tokens_used,
|
||||||
|
cost_incurred=AgentInstance.cost_incurred + cost_incurred,
|
||||||
|
last_activity_at=now,
|
||||||
|
updated_at=now,
|
||||||
|
)
|
||||||
|
.returning(AgentInstance)
|
||||||
)
|
)
|
||||||
instance = result.scalar_one_or_none()
|
instance = result.scalar_one_or_none()
|
||||||
|
|
||||||
if not instance:
|
if not instance:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
instance.tasks_completed += 1
|
|
||||||
instance.tokens_used += tokens_used
|
|
||||||
instance.cost_incurred += cost_incurred
|
|
||||||
instance.last_activity_at = datetime.now(UTC)
|
|
||||||
|
|
||||||
await db.commit()
|
await db.commit()
|
||||||
await db.refresh(instance)
|
|
||||||
return instance
|
return instance
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
await db.rollback()
|
await db.rollback()
|
||||||
|
|||||||
@@ -249,9 +249,18 @@ class CRUDSprint(CRUDBase[Sprint, SprintCreate, SprintUpdate]):
|
|||||||
*,
|
*,
|
||||||
sprint_id: UUID,
|
sprint_id: UUID,
|
||||||
) -> Sprint | None:
|
) -> Sprint | None:
|
||||||
"""Complete an active sprint and calculate completed points."""
|
"""Complete an active sprint and calculate completed points.
|
||||||
|
|
||||||
|
Uses row-level locking (SELECT FOR UPDATE) to prevent race conditions
|
||||||
|
when velocity is being calculated and other operations might modify issues.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
result = await db.execute(select(Sprint).where(Sprint.id == sprint_id))
|
# Lock the sprint row to prevent concurrent modifications
|
||||||
|
result = await db.execute(
|
||||||
|
select(Sprint)
|
||||||
|
.where(Sprint.id == sprint_id)
|
||||||
|
.with_for_update()
|
||||||
|
)
|
||||||
sprint = result.scalar_one_or_none()
|
sprint = result.scalar_one_or_none()
|
||||||
|
|
||||||
if not sprint:
|
if not sprint:
|
||||||
@@ -265,6 +274,8 @@ class CRUDSprint(CRUDBase[Sprint, SprintCreate, SprintUpdate]):
|
|||||||
sprint.status = SprintStatus.COMPLETED
|
sprint.status = SprintStatus.COMPLETED
|
||||||
|
|
||||||
# Calculate velocity (completed points) from closed issues
|
# Calculate velocity (completed points) from closed issues
|
||||||
|
# Note: Issues are not locked, but sprint lock ensures this sprint's
|
||||||
|
# completion is atomic and prevents concurrent completion attempts
|
||||||
points_result = await db.execute(
|
points_result = await db.execute(
|
||||||
select(func.sum(Issue.story_points)).where(
|
select(func.sum(Issue.story_points)).where(
|
||||||
Issue.sprint_id == sprint_id,
|
Issue.sprint_id == sprint_id,
|
||||||
@@ -289,9 +300,18 @@ class CRUDSprint(CRUDBase[Sprint, SprintCreate, SprintUpdate]):
|
|||||||
*,
|
*,
|
||||||
sprint_id: UUID,
|
sprint_id: UUID,
|
||||||
) -> Sprint | None:
|
) -> Sprint | None:
|
||||||
"""Cancel a sprint (only PLANNED or ACTIVE sprints can be cancelled)."""
|
"""Cancel a sprint (only PLANNED or ACTIVE sprints can be cancelled).
|
||||||
|
|
||||||
|
Uses row-level locking to prevent race conditions with concurrent
|
||||||
|
sprint status modifications.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
result = await db.execute(select(Sprint).where(Sprint.id == sprint_id))
|
# Lock the sprint row to prevent concurrent modifications
|
||||||
|
result = await db.execute(
|
||||||
|
select(Sprint)
|
||||||
|
.where(Sprint.id == sprint_id)
|
||||||
|
.with_for_update()
|
||||||
|
)
|
||||||
sprint = result.scalar_one_or_none()
|
sprint = result.scalar_one_or_none()
|
||||||
|
|
||||||
if not sprint:
|
if not sprint:
|
||||||
|
|||||||
Reference in New Issue
Block a user