# app/crud/syndarix/issue.py
"""Async CRUD operations for Issue model using SQLAlchemy 2.0 patterns."""

import logging
from datetime import UTC, datetime
from typing import Any
from uuid import UUID

from sqlalchemy import func, or_, select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload

from app.crud.base import CRUDBase
from app.models.syndarix import AgentInstance, Issue
from app.models.syndarix.enums import IssuePriority, IssueStatus, SyncStatus
from app.schemas.syndarix import IssueCreate, IssueUpdate

logger = logging.getLogger(__name__)

# Escape character handed to ILIKE so user-supplied wildcards match literally.
_LIKE_ESCAPE = "\\"


def _escape_like(term: str) -> str:
    """Return *term* with LIKE metacharacters escaped using ``_LIKE_ESCAPE``."""
    # The escape character itself must be doubled first, otherwise the
    # escapes inserted for "%" and "_" would be escaped a second time.
    return (
        term.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
        .replace("%", f"{_LIKE_ESCAPE}%")
        .replace("_", f"{_LIKE_ESCAPE}_")
    )


class CRUDIssue(CRUDBase[Issue, IssueCreate, IssueUpdate]):
    """Async CRUD operations for Issue model."""

    @staticmethod
    async def _fetch_issue(db: AsyncSession, issue_id: UUID) -> Issue | None:
        """Load a single issue by primary key, or None when it does not exist."""
        result = await db.execute(select(Issue).where(Issue.id == issue_id))
        return result.scalar_one_or_none()

    async def create(self, db: AsyncSession, *, obj_in: IssueCreate) -> Issue:
        """Create a new issue with error handling.

        Args:
            db: Active async session; committed on success, rolled back on error.
            obj_in: Validated creation payload.

        Returns:
            The persisted issue, refreshed from the database.

        Raises:
            ValueError: If the insert violates a database integrity constraint.
            Exception: Any other error is logged, rolled back and re-raised.
        """
        try:
            db_obj = Issue(
                project_id=obj_in.project_id,
                title=obj_in.title,
                body=obj_in.body,
                status=obj_in.status,
                priority=obj_in.priority,
                labels=obj_in.labels,
                assigned_agent_id=obj_in.assigned_agent_id,
                human_assignee=obj_in.human_assignee,
                sprint_id=obj_in.sprint_id,
                story_points=obj_in.story_points,
                external_tracker_type=obj_in.external_tracker_type,
                external_issue_id=obj_in.external_issue_id,
                remote_url=obj_in.remote_url,
                external_issue_number=obj_in.external_issue_number,
                # New issues are always stored as SYNCED, regardless of payload.
                sync_status=SyncStatus.SYNCED,
            )
            db.add(db_obj)
            await db.commit()
            await db.refresh(db_obj)
            return db_obj
        except IntegrityError as e:
            await db.rollback()
            error_msg = str(e.orig) if hasattr(e, "orig") else str(e)
            logger.error(f"Integrity error creating issue: {error_msg}")
            # Chain explicitly so the driver-level cause stays in the traceback.
            raise ValueError(f"Database integrity error: {error_msg}") from e
        except Exception as e:
            await db.rollback()
            logger.error(f"Unexpected error creating issue: {e!s}", exc_info=True)
            raise

    async def get_with_details(
        self,
        db: AsyncSession,
        *,
        issue_id: UUID,
    ) -> dict[str, Any] | None:
        """Get an issue with full details including related entity names.

        The project, sprint and assigned agent (with its agent type) are
        eagerly loaded in the same query via ``joinedload``.

        Args:
            db: Active async session.
            issue_id: Primary key of the issue.

        Returns:
            Dictionary with the issue under ``"issue"`` plus ``project_name``,
            ``project_slug``, ``sprint_name`` and ``assigned_agent_type_name``
            (each None when the relation is absent), or None if the issue does
            not exist.
        """
        try:
            # Get issue with joined relationships
            result = await db.execute(
                select(Issue)
                .options(
                    joinedload(Issue.project),
                    joinedload(Issue.sprint),
                    joinedload(Issue.assigned_agent).joinedload(
                        AgentInstance.agent_type
                    ),
                )
                .where(Issue.id == issue_id)
            )
            issue = result.scalar_one_or_none()

            if not issue:
                return None

            return {
                "issue": issue,
                "project_name": issue.project.name if issue.project else None,
                "project_slug": issue.project.slug if issue.project else None,
                "sprint_name": issue.sprint.name if issue.sprint else None,
                "assigned_agent_type_name": (
                    issue.assigned_agent.agent_type.name
                    if issue.assigned_agent and issue.assigned_agent.agent_type
                    else None
                ),
            }
        except Exception as e:
            logger.error(
                f"Error getting issue with details {issue_id}: {e!s}", exc_info=True
            )
            raise

    async def get_by_project(
        self,
        db: AsyncSession,
        *,
        project_id: UUID,
        status: IssueStatus | None = None,
        priority: IssuePriority | None = None,
        sprint_id: UUID | None = None,
        assigned_agent_id: UUID | None = None,
        labels: list[str] | None = None,
        search: str | None = None,
        skip: int = 0,
        limit: int = 100,
        sort_by: str = "created_at",
        sort_order: str = "desc",
    ) -> tuple[list[Issue], int]:
        """Get issues for a specific project with filters.

        Args:
            db: Active async session.
            project_id: Project whose issues are listed.
            status: Optional exact status filter.
            priority: Optional exact priority filter.
            sprint_id: Optional sprint filter.
            assigned_agent_id: Optional assigned-agent filter.
            labels: Optional labels; an issue must carry every one of them.
            search: Optional case-insensitive substring matched literally
                against title and body.
            skip: Number of rows to skip (pagination offset).
            limit: Maximum number of rows to return.
            sort_by: Name of a mapped Issue column; anything else falls back
                to ``created_at``.
            sort_order: ``"desc"`` for descending; any other value sorts
                ascending.

        Returns:
            Tuple of (page of issues, total number of matches before
            pagination).
        """
        try:
            query = select(Issue).where(Issue.project_id == project_id)

            # Apply filters
            if status is not None:
                query = query.where(Issue.status == status)

            if priority is not None:
                query = query.where(Issue.priority == priority)

            if sprint_id is not None:
                query = query.where(Issue.sprint_id == sprint_id)

            if assigned_agent_id is not None:
                query = query.where(Issue.assigned_agent_id == assigned_agent_id)

            if labels:
                # Successive where() calls are ANDed, so the issue must contain
                # ALL of the provided labels.
                # NOTE(review): labels are lower-cased before matching, which
                # presumes they are stored lower-case - confirm against writers.
                for label in labels:
                    query = query.where(Issue.labels.contains([label.lower()]))

            if search:
                # Escape %, _ and the escape character so the user's text is
                # matched literally instead of being interpreted as a pattern.
                pattern = f"%{_escape_like(search)}%"
                query = query.where(
                    or_(
                        Issue.title.ilike(pattern, escape=_LIKE_ESCAPE),
                        Issue.body.ilike(pattern, escape=_LIKE_ESCAPE),
                    )
                )

            # Get total count (before ordering and pagination are applied)
            count_query = select(func.count()).select_from(query.subquery())
            count_result = await db.execute(count_query)
            total = count_result.scalar_one()

            # Apply sorting. Only mapped column attributes are accepted: a
            # plain getattr() would also return relationships, methods or
            # `metadata`, none of which can be used in ORDER BY.
            if sort_by in Issue.__mapper__.column_attrs:
                sort_column = getattr(Issue, sort_by)
            else:
                sort_column = Issue.created_at

            if sort_order == "desc":
                query = query.order_by(sort_column.desc())
            else:
                query = query.order_by(sort_column.asc())

            # Apply pagination
            query = query.offset(skip).limit(limit)
            result = await db.execute(query)
            issues = list(result.scalars().all())

            return issues, total
        except Exception as e:
            logger.error(
                f"Error getting issues by project {project_id}: {e!s}", exc_info=True
            )
            raise

    async def get_by_sprint(
        self,
        db: AsyncSession,
        *,
        sprint_id: UUID,
        status: IssueStatus | None = None,
    ) -> list[Issue]:
        """Get all issues in a sprint.

        Args:
            db: Active async session.
            sprint_id: Sprint whose issues are listed.
            status: Optional exact status filter.

        Returns:
            Issues ordered by priority descending, then creation time
            ascending.
        """
        try:
            query = select(Issue).where(Issue.sprint_id == sprint_id)

            if status is not None:
                query = query.where(Issue.status == status)

            query = query.order_by(Issue.priority.desc(), Issue.created_at.asc())
            result = await db.execute(query)
            return list(result.scalars().all())
        except Exception as e:
            logger.error(
                f"Error getting issues by sprint {sprint_id}: {e!s}", exc_info=True
            )
            raise

    async def assign_to_agent(
        self,
        db: AsyncSession,
        *,
        issue_id: UUID,
        agent_id: UUID | None,
    ) -> Issue | None:
        """Assign an issue to an agent (or unassign if agent_id is None).

        The human assignee is cleared in either case.

        Returns:
            Updated issue or None if not found
        """
        try:
            issue = await self._fetch_issue(db, issue_id)
            if not issue:
                return None

            issue.assigned_agent_id = agent_id
            issue.human_assignee = None  # Clear human assignee when assigning to agent
            await db.commit()
            await db.refresh(issue)
            return issue
        except Exception as e:
            await db.rollback()
            logger.error(
                f"Error assigning issue {issue_id} to agent {agent_id}: {e!s}",
                exc_info=True,
            )
            raise

    async def assign_to_human(
        self,
        db: AsyncSession,
        *,
        issue_id: UUID,
        human_assignee: str | None,
    ) -> Issue | None:
        """Assign an issue to a human (or unassign if human_assignee is None).

        The assigned agent is cleared in either case.

        Returns:
            Updated issue or None if not found
        """
        try:
            issue = await self._fetch_issue(db, issue_id)
            if not issue:
                return None

            issue.human_assignee = human_assignee
            issue.assigned_agent_id = None  # Clear agent when assigning to human
            await db.commit()
            await db.refresh(issue)
            return issue
        except Exception as e:
            await db.rollback()
            logger.error(
                f"Error assigning issue {issue_id} to human {human_assignee}: {e!s}",
                exc_info=True,
            )
            raise

    async def close_issue(
        self,
        db: AsyncSession,
        *,
        issue_id: UUID,
    ) -> Issue | None:
        """Close an issue by setting status and closed_at timestamp.

        ``closed_at`` is set to the current timezone-aware UTC time.

        Returns:
            Updated issue or None if not found
        """
        try:
            issue = await self._fetch_issue(db, issue_id)
            if not issue:
                return None

            issue.status = IssueStatus.CLOSED
            issue.closed_at = datetime.now(UTC)
            await db.commit()
            await db.refresh(issue)
            return issue
        except Exception as e:
            await db.rollback()
            logger.error(f"Error closing issue {issue_id}: {e!s}", exc_info=True)
            raise

    async def reopen_issue(
        self,
        db: AsyncSession,
        *,
        issue_id: UUID,
    ) -> Issue | None:
        """Reopen a closed issue.

        Sets the status to OPEN and clears ``closed_at``; the previous status
        is not checked.

        Returns:
            Updated issue or None if not found
        """
        try:
            issue = await self._fetch_issue(db, issue_id)
            if not issue:
                return None

            issue.status = IssueStatus.OPEN
            issue.closed_at = None
            await db.commit()
            await db.refresh(issue)
            return issue
        except Exception as e:
            await db.rollback()
            logger.error(f"Error reopening issue {issue_id}: {e!s}", exc_info=True)
            raise

    async def update_sync_status(
        self,
        db: AsyncSession,
        *,
        issue_id: UUID,
        sync_status: SyncStatus,
        last_synced_at: datetime | None = None,
        external_updated_at: datetime | None = None,
    ) -> Issue | None:
        """Update the sync status of an issue.

        Args:
            db: Active async session; committed on success.
            issue_id: Primary key of the issue.
            sync_status: New sync status (always written).
            last_synced_at: Written only when provided; None leaves the stored
                value untouched.
            external_updated_at: Written only when provided; None leaves the
                stored value untouched.

        Returns:
            Updated issue or None if not found
        """
        try:
            issue = await self._fetch_issue(db, issue_id)
            if not issue:
                return None

            issue.sync_status = sync_status
            if last_synced_at is not None:
                issue.last_synced_at = last_synced_at
            if external_updated_at is not None:
                issue.external_updated_at = external_updated_at

            await db.commit()
            await db.refresh(issue)
            return issue
        except Exception as e:
            await db.rollback()
            logger.error(
                f"Error updating sync status for issue {issue_id}: {e!s}",
                exc_info=True,
            )
            raise

    async def get_project_stats(
        self,
        db: AsyncSession,
        *,
        project_id: UUID,
    ) -> dict[str, Any]:
        """Get issue statistics for a project.

        Returns:
            Dictionary with ``total``, one counter per known status
            (``open``, ``in_progress``, ``in_review``, ``blocked``,
            ``closed``), ``by_priority`` (priority value -> count),
            ``total_story_points`` and ``completed_story_points`` (both 0 when
            there is nothing to sum).
        """
        try:
            # Get counts by status.
            # Rows are unpacked positionally: a column labelled "count" cannot
            # be read as `row.count`, because Row is tuple-like and that name
            # resolves to the tuple `count()` method instead of the value.
            status_rows = await db.execute(
                select(Issue.status, func.count(Issue.id))
                .where(Issue.project_id == project_id)
                .group_by(Issue.status)
            )
            by_status = {
                status.value: issue_count for status, issue_count in status_rows
            }

            # Get counts by priority
            priority_rows = await db.execute(
                select(Issue.priority, func.count(Issue.id))
                .where(Issue.project_id == project_id)
                .group_by(Issue.priority)
            )
            by_priority = {
                priority.value: issue_count
                for priority, issue_count in priority_rows
            }

            # Get story points
            points_result = await db.execute(
                select(
                    func.sum(Issue.story_points),
                    func.sum(Issue.story_points).filter(
                        Issue.status == IssueStatus.CLOSED
                    ),
                ).where(Issue.project_id == project_id)
            )
            total_points, completed_points = points_result.one()

            return {
                "total": sum(by_status.values()),
                "open": by_status.get("open", 0),
                "in_progress": by_status.get("in_progress", 0),
                "in_review": by_status.get("in_review", 0),
                "blocked": by_status.get("blocked", 0),
                "closed": by_status.get("closed", 0),
                "by_priority": by_priority,
                # SUM() over zero rows (or only NULLs) is NULL; report 0.
                "total_story_points": total_points or 0,
                "completed_story_points": completed_points or 0,
            }
        except Exception as e:
            logger.error(
                f"Error getting issue stats for project {project_id}: {e!s}",
                exc_info=True,
            )
            raise

    async def get_by_external_id(
        self,
        db: AsyncSession,
        *,
        external_tracker_type: str,
        external_issue_id: str,
    ) -> Issue | None:
        """Get an issue by its external tracker ID.

        Returns:
            The matching issue or None. ``scalar_one_or_none`` raises if more
            than one row matches the (tracker type, external id) pair.
        """
        try:
            result = await db.execute(
                select(Issue).where(
                    Issue.external_tracker_type == external_tracker_type,
                    Issue.external_issue_id == external_issue_id,
                )
            )
            return result.scalar_one_or_none()
        except Exception as e:
            logger.error(
                f"Error getting issue by external ID {external_tracker_type}:{external_issue_id}: {e!s}",
                exc_info=True,
            )
            raise

    async def get_pending_sync(
        self,
        db: AsyncSession,
        *,
        project_id: UUID | None = None,
        limit: int = 100,
    ) -> list[Issue]:
        """Get issues that need to be synced with external tracker.

        Selects issues that have an external tracker type and whose sync
        status is PENDING or ERROR, least recently updated first.

        Args:
            db: Active async session.
            project_id: Optional project restriction.
            limit: Maximum number of issues to return.
        """
        try:
            query = select(Issue).where(
                Issue.external_tracker_type.isnot(None),
                Issue.sync_status.in_([SyncStatus.PENDING, SyncStatus.ERROR]),
            )

            if project_id is not None:
                query = query.where(Issue.project_id == project_id)

            query = query.order_by(Issue.updated_at.asc()).limit(limit)
            result = await db.execute(query)
            return list(result.scalars().all())
        except Exception as e:
            logger.error(f"Error getting pending sync issues: {e!s}", exc_info=True)
            raise

    async def remove_sprint_from_issues(
        self,
        db: AsyncSession,
        *,
        sprint_id: UUID,
    ) -> int:
        """Remove sprint assignment from all issues in a sprint.

        Used when deleting a sprint to clean up references. Issued as a single
        bulk UPDATE and committed immediately.

        Returns:
            Number of issues updated
        """
        try:
            result = await db.execute(
                update(Issue)
                .where(Issue.sprint_id == sprint_id)
                .values(sprint_id=None)
            )
            await db.commit()
            return result.rowcount
        except Exception as e:
            await db.rollback()
            logger.error(
                f"Error removing sprint {sprint_id} from issues: {e!s}",
                exc_info=True,
            )
            raise

    async def unassign(
        self,
        db: AsyncSession,
        *,
        issue_id: UUID,
    ) -> Issue | None:
        """Remove agent assignment from an issue.

        Only ``assigned_agent_id`` is cleared; the human assignee is left
        untouched.

        Returns:
            Updated issue or None if not found
        """
        try:
            issue = await self._fetch_issue(db, issue_id)
            if not issue:
                return None

            issue.assigned_agent_id = None
            await db.commit()
            await db.refresh(issue)
            return issue
        except Exception as e:
            await db.rollback()
            logger.error(f"Error unassigning issue {issue_id}: {e!s}", exc_info=True)
            raise

    async def remove_from_sprint(
        self,
        db: AsyncSession,
        *,
        issue_id: UUID,
    ) -> Issue | None:
        """Remove an issue from its current sprint.

        Returns:
            Updated issue or None if not found
        """
        try:
            issue = await self._fetch_issue(db, issue_id)
            if not issue:
                return None

            issue.sprint_id = None
            await db.commit()
            await db.refresh(issue)
            return issue
        except Exception as e:
            await db.rollback()
            logger.error(
                f"Error removing issue {issue_id} from sprint: {e!s}",
                exc_info=True,
            )
            raise


# Create a singleton instance for use across the application
issue = CRUDIssue(Issue)