forked from cardosofelipe/pragma-stack
Reformatted multiline function calls, object definitions, and queries for improved code readability and consistency. Adjusted imports and constraints where necessary.
526 lines
17 KiB
Python
526 lines
17 KiB
Python
# app/crud/syndarix/issue.py
|
|
"""Async CRUD operations for Issue model using SQLAlchemy 2.0 patterns."""
|
|
|
|
import logging
|
|
from datetime import UTC, datetime
|
|
from typing import Any
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import func, or_, select
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
from app.crud.base import CRUDBase
|
|
from app.models.syndarix import AgentInstance, Issue
|
|
from app.models.syndarix.enums import IssuePriority, IssueStatus, SyncStatus
|
|
from app.schemas.syndarix import IssueCreate, IssueUpdate
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CRUDIssue(CRUDBase[Issue, IssueCreate, IssueUpdate]):
|
|
"""Async CRUD operations for Issue model."""
|
|
|
|
async def create(self, db: AsyncSession, *, obj_in: IssueCreate) -> Issue:
|
|
"""Create a new issue with error handling."""
|
|
try:
|
|
db_obj = Issue(
|
|
project_id=obj_in.project_id,
|
|
title=obj_in.title,
|
|
body=obj_in.body,
|
|
status=obj_in.status,
|
|
priority=obj_in.priority,
|
|
labels=obj_in.labels,
|
|
assigned_agent_id=obj_in.assigned_agent_id,
|
|
human_assignee=obj_in.human_assignee,
|
|
sprint_id=obj_in.sprint_id,
|
|
story_points=obj_in.story_points,
|
|
external_tracker_type=obj_in.external_tracker_type,
|
|
external_issue_id=obj_in.external_issue_id,
|
|
remote_url=obj_in.remote_url,
|
|
external_issue_number=obj_in.external_issue_number,
|
|
sync_status=SyncStatus.SYNCED,
|
|
)
|
|
db.add(db_obj)
|
|
await db.commit()
|
|
await db.refresh(db_obj)
|
|
return db_obj
|
|
except IntegrityError as e:
|
|
await db.rollback()
|
|
error_msg = str(e.orig) if hasattr(e, "orig") else str(e)
|
|
logger.error(f"Integrity error creating issue: {error_msg}")
|
|
raise ValueError(f"Database integrity error: {error_msg}")
|
|
except Exception as e:
|
|
await db.rollback()
|
|
logger.error(f"Unexpected error creating issue: {e!s}", exc_info=True)
|
|
raise
|
|
|
|
async def get_with_details(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
issue_id: UUID,
|
|
) -> dict[str, Any] | None:
|
|
"""
|
|
Get an issue with full details including related entity names.
|
|
|
|
Returns:
|
|
Dictionary with issue and related entity details
|
|
"""
|
|
try:
|
|
# Get issue with joined relationships
|
|
result = await db.execute(
|
|
select(Issue)
|
|
.options(
|
|
joinedload(Issue.project),
|
|
joinedload(Issue.sprint),
|
|
joinedload(Issue.assigned_agent).joinedload(
|
|
AgentInstance.agent_type
|
|
),
|
|
)
|
|
.where(Issue.id == issue_id)
|
|
)
|
|
issue = result.scalar_one_or_none()
|
|
|
|
if not issue:
|
|
return None
|
|
|
|
return {
|
|
"issue": issue,
|
|
"project_name": issue.project.name if issue.project else None,
|
|
"project_slug": issue.project.slug if issue.project else None,
|
|
"sprint_name": issue.sprint.name if issue.sprint else None,
|
|
"assigned_agent_type_name": (
|
|
issue.assigned_agent.agent_type.name
|
|
if issue.assigned_agent and issue.assigned_agent.agent_type
|
|
else None
|
|
),
|
|
}
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error getting issue with details {issue_id}: {e!s}", exc_info=True
|
|
)
|
|
raise
|
|
|
|
async def get_by_project(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
project_id: UUID,
|
|
status: IssueStatus | None = None,
|
|
priority: IssuePriority | None = None,
|
|
sprint_id: UUID | None = None,
|
|
assigned_agent_id: UUID | None = None,
|
|
labels: list[str] | None = None,
|
|
search: str | None = None,
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
sort_by: str = "created_at",
|
|
sort_order: str = "desc",
|
|
) -> tuple[list[Issue], int]:
|
|
"""Get issues for a specific project with filters."""
|
|
try:
|
|
query = select(Issue).where(Issue.project_id == project_id)
|
|
|
|
# Apply filters
|
|
if status is not None:
|
|
query = query.where(Issue.status == status)
|
|
|
|
if priority is not None:
|
|
query = query.where(Issue.priority == priority)
|
|
|
|
if sprint_id is not None:
|
|
query = query.where(Issue.sprint_id == sprint_id)
|
|
|
|
if assigned_agent_id is not None:
|
|
query = query.where(Issue.assigned_agent_id == assigned_agent_id)
|
|
|
|
if labels:
|
|
# Match any of the provided labels
|
|
for label in labels:
|
|
query = query.where(Issue.labels.contains([label.lower()]))
|
|
|
|
if search:
|
|
search_filter = or_(
|
|
Issue.title.ilike(f"%{search}%"),
|
|
Issue.body.ilike(f"%{search}%"),
|
|
)
|
|
query = query.where(search_filter)
|
|
|
|
# Get total count
|
|
count_query = select(func.count()).select_from(query.alias())
|
|
count_result = await db.execute(count_query)
|
|
total = count_result.scalar_one()
|
|
|
|
# Apply sorting
|
|
sort_column = getattr(Issue, sort_by, Issue.created_at)
|
|
if sort_order == "desc":
|
|
query = query.order_by(sort_column.desc())
|
|
else:
|
|
query = query.order_by(sort_column.asc())
|
|
|
|
# Apply pagination
|
|
query = query.offset(skip).limit(limit)
|
|
result = await db.execute(query)
|
|
issues = list(result.scalars().all())
|
|
|
|
return issues, total
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error getting issues by project {project_id}: {e!s}", exc_info=True
|
|
)
|
|
raise
|
|
|
|
async def get_by_sprint(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
sprint_id: UUID,
|
|
status: IssueStatus | None = None,
|
|
) -> list[Issue]:
|
|
"""Get all issues in a sprint."""
|
|
try:
|
|
query = select(Issue).where(Issue.sprint_id == sprint_id)
|
|
|
|
if status is not None:
|
|
query = query.where(Issue.status == status)
|
|
|
|
query = query.order_by(Issue.priority.desc(), Issue.created_at.asc())
|
|
result = await db.execute(query)
|
|
return list(result.scalars().all())
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error getting issues by sprint {sprint_id}: {e!s}", exc_info=True
|
|
)
|
|
raise
|
|
|
|
async def assign_to_agent(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
issue_id: UUID,
|
|
agent_id: UUID | None,
|
|
) -> Issue | None:
|
|
"""Assign an issue to an agent (or unassign if agent_id is None)."""
|
|
try:
|
|
result = await db.execute(select(Issue).where(Issue.id == issue_id))
|
|
issue = result.scalar_one_or_none()
|
|
|
|
if not issue:
|
|
return None
|
|
|
|
issue.assigned_agent_id = agent_id
|
|
issue.human_assignee = None # Clear human assignee when assigning to agent
|
|
await db.commit()
|
|
await db.refresh(issue)
|
|
return issue
|
|
except Exception as e:
|
|
await db.rollback()
|
|
logger.error(
|
|
f"Error assigning issue {issue_id} to agent {agent_id}: {e!s}",
|
|
exc_info=True,
|
|
)
|
|
raise
|
|
|
|
async def assign_to_human(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
issue_id: UUID,
|
|
human_assignee: str | None,
|
|
) -> Issue | None:
|
|
"""Assign an issue to a human (or unassign if human_assignee is None)."""
|
|
try:
|
|
result = await db.execute(select(Issue).where(Issue.id == issue_id))
|
|
issue = result.scalar_one_or_none()
|
|
|
|
if not issue:
|
|
return None
|
|
|
|
issue.human_assignee = human_assignee
|
|
issue.assigned_agent_id = None # Clear agent when assigning to human
|
|
await db.commit()
|
|
await db.refresh(issue)
|
|
return issue
|
|
except Exception as e:
|
|
await db.rollback()
|
|
logger.error(
|
|
f"Error assigning issue {issue_id} to human {human_assignee}: {e!s}",
|
|
exc_info=True,
|
|
)
|
|
raise
|
|
|
|
async def close_issue(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
issue_id: UUID,
|
|
) -> Issue | None:
|
|
"""Close an issue by setting status and closed_at timestamp."""
|
|
try:
|
|
result = await db.execute(select(Issue).where(Issue.id == issue_id))
|
|
issue = result.scalar_one_or_none()
|
|
|
|
if not issue:
|
|
return None
|
|
|
|
issue.status = IssueStatus.CLOSED
|
|
issue.closed_at = datetime.now(UTC)
|
|
await db.commit()
|
|
await db.refresh(issue)
|
|
return issue
|
|
except Exception as e:
|
|
await db.rollback()
|
|
logger.error(f"Error closing issue {issue_id}: {e!s}", exc_info=True)
|
|
raise
|
|
|
|
async def reopen_issue(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
issue_id: UUID,
|
|
) -> Issue | None:
|
|
"""Reopen a closed issue."""
|
|
try:
|
|
result = await db.execute(select(Issue).where(Issue.id == issue_id))
|
|
issue = result.scalar_one_or_none()
|
|
|
|
if not issue:
|
|
return None
|
|
|
|
issue.status = IssueStatus.OPEN
|
|
issue.closed_at = None
|
|
await db.commit()
|
|
await db.refresh(issue)
|
|
return issue
|
|
except Exception as e:
|
|
await db.rollback()
|
|
logger.error(f"Error reopening issue {issue_id}: {e!s}", exc_info=True)
|
|
raise
|
|
|
|
async def update_sync_status(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
issue_id: UUID,
|
|
sync_status: SyncStatus,
|
|
last_synced_at: datetime | None = None,
|
|
external_updated_at: datetime | None = None,
|
|
) -> Issue | None:
|
|
"""Update the sync status of an issue."""
|
|
try:
|
|
result = await db.execute(select(Issue).where(Issue.id == issue_id))
|
|
issue = result.scalar_one_or_none()
|
|
|
|
if not issue:
|
|
return None
|
|
|
|
issue.sync_status = sync_status
|
|
if last_synced_at:
|
|
issue.last_synced_at = last_synced_at
|
|
if external_updated_at:
|
|
issue.external_updated_at = external_updated_at
|
|
|
|
await db.commit()
|
|
await db.refresh(issue)
|
|
return issue
|
|
except Exception as e:
|
|
await db.rollback()
|
|
logger.error(
|
|
f"Error updating sync status for issue {issue_id}: {e!s}", exc_info=True
|
|
)
|
|
raise
|
|
|
|
async def get_project_stats(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
project_id: UUID,
|
|
) -> dict[str, Any]:
|
|
"""Get issue statistics for a project."""
|
|
try:
|
|
# Get counts by status
|
|
status_counts = await db.execute(
|
|
select(Issue.status, func.count(Issue.id).label("count"))
|
|
.where(Issue.project_id == project_id)
|
|
.group_by(Issue.status)
|
|
)
|
|
by_status = {row.status.value: row.count for row in status_counts}
|
|
|
|
# Get counts by priority
|
|
priority_counts = await db.execute(
|
|
select(Issue.priority, func.count(Issue.id).label("count"))
|
|
.where(Issue.project_id == project_id)
|
|
.group_by(Issue.priority)
|
|
)
|
|
by_priority = {row.priority.value: row.count for row in priority_counts}
|
|
|
|
# Get story points
|
|
points_result = await db.execute(
|
|
select(
|
|
func.sum(Issue.story_points).label("total"),
|
|
func.sum(Issue.story_points)
|
|
.filter(Issue.status == IssueStatus.CLOSED)
|
|
.label("completed"),
|
|
).where(Issue.project_id == project_id)
|
|
)
|
|
points_row = points_result.one()
|
|
|
|
total_issues = sum(by_status.values())
|
|
|
|
return {
|
|
"total": total_issues,
|
|
"open": by_status.get("open", 0),
|
|
"in_progress": by_status.get("in_progress", 0),
|
|
"in_review": by_status.get("in_review", 0),
|
|
"blocked": by_status.get("blocked", 0),
|
|
"closed": by_status.get("closed", 0),
|
|
"by_priority": by_priority,
|
|
"total_story_points": points_row.total,
|
|
"completed_story_points": points_row.completed,
|
|
}
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error getting issue stats for project {project_id}: {e!s}",
|
|
exc_info=True,
|
|
)
|
|
raise
|
|
|
|
async def get_by_external_id(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
external_tracker_type: str,
|
|
external_issue_id: str,
|
|
) -> Issue | None:
|
|
"""Get an issue by its external tracker ID."""
|
|
try:
|
|
result = await db.execute(
|
|
select(Issue).where(
|
|
Issue.external_tracker_type == external_tracker_type,
|
|
Issue.external_issue_id == external_issue_id,
|
|
)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error getting issue by external ID {external_tracker_type}:{external_issue_id}: {e!s}",
|
|
exc_info=True,
|
|
)
|
|
raise
|
|
|
|
async def get_pending_sync(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
project_id: UUID | None = None,
|
|
limit: int = 100,
|
|
) -> list[Issue]:
|
|
"""Get issues that need to be synced with external tracker."""
|
|
try:
|
|
query = select(Issue).where(
|
|
Issue.external_tracker_type.isnot(None),
|
|
Issue.sync_status.in_([SyncStatus.PENDING, SyncStatus.ERROR]),
|
|
)
|
|
|
|
if project_id:
|
|
query = query.where(Issue.project_id == project_id)
|
|
|
|
query = query.order_by(Issue.updated_at.asc()).limit(limit)
|
|
result = await db.execute(query)
|
|
return list(result.scalars().all())
|
|
except Exception as e:
|
|
logger.error(f"Error getting pending sync issues: {e!s}", exc_info=True)
|
|
raise
|
|
|
|
async def remove_sprint_from_issues(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
sprint_id: UUID,
|
|
) -> int:
|
|
"""Remove sprint assignment from all issues in a sprint.
|
|
|
|
Used when deleting a sprint to clean up references.
|
|
|
|
Returns:
|
|
Number of issues updated
|
|
"""
|
|
try:
|
|
from sqlalchemy import update
|
|
|
|
result = await db.execute(
|
|
update(Issue).where(Issue.sprint_id == sprint_id).values(sprint_id=None)
|
|
)
|
|
await db.commit()
|
|
return result.rowcount
|
|
except Exception as e:
|
|
await db.rollback()
|
|
logger.error(
|
|
f"Error removing sprint {sprint_id} from issues: {e!s}",
|
|
exc_info=True,
|
|
)
|
|
raise
|
|
|
|
async def unassign(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
issue_id: UUID,
|
|
) -> Issue | None:
|
|
"""Remove agent assignment from an issue.
|
|
|
|
Returns:
|
|
Updated issue or None if not found
|
|
"""
|
|
try:
|
|
result = await db.execute(select(Issue).where(Issue.id == issue_id))
|
|
issue = result.scalar_one_or_none()
|
|
|
|
if not issue:
|
|
return None
|
|
|
|
issue.assigned_agent_id = None
|
|
await db.commit()
|
|
await db.refresh(issue)
|
|
return issue
|
|
except Exception as e:
|
|
await db.rollback()
|
|
logger.error(f"Error unassigning issue {issue_id}: {e!s}", exc_info=True)
|
|
raise
|
|
|
|
async def remove_from_sprint(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
issue_id: UUID,
|
|
) -> Issue | None:
|
|
"""Remove an issue from its current sprint.
|
|
|
|
Returns:
|
|
Updated issue or None if not found
|
|
"""
|
|
try:
|
|
result = await db.execute(select(Issue).where(Issue.id == issue_id))
|
|
issue = result.scalar_one_or_none()
|
|
|
|
if not issue:
|
|
return None
|
|
|
|
issue.sprint_id = None
|
|
await db.commit()
|
|
await db.refresh(issue)
|
|
return issue
|
|
except Exception as e:
|
|
await db.rollback()
|
|
logger.error(
|
|
f"Error removing issue {issue_id} from sprint: {e!s}",
|
|
exc_info=True,
|
|
)
|
|
raise
|
|
|
|
|
|
# Create a singleton instance for use across the application
|
|
issue = CRUDIssue(Issue)
|