forked from cardosofelipe/pragma-stack
Reformatted multiline function calls, object definitions, and queries for improved code readability and consistency. Adjusted imports and constraints where necessary.
363 lines
12 KiB
Python
363 lines
12 KiB
Python
# app/crud/syndarix/project.py
|
|
"""Async CRUD operations for Project model using SQLAlchemy 2.0 patterns."""
|
|
|
|
import logging
|
|
from datetime import UTC, datetime
|
|
from typing import Any
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import func, or_, select, update
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.crud.base import CRUDBase
|
|
from app.models.syndarix import AgentInstance, Issue, Project, Sprint
|
|
from app.models.syndarix.enums import AgentStatus, ProjectStatus, SprintStatus
|
|
from app.schemas.syndarix import ProjectCreate, ProjectUpdate
|
|
|
|
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CRUDProject(CRUDBase[Project, ProjectCreate, ProjectUpdate]):
    """Async CRUD operations for the Project model.

    Adds project-specific lookups, filtered/paginated listings, per-project
    aggregate counts, and archival with cascading cleanup on top of the
    generic ``CRUDBase``.
    """

    @staticmethod
    def _contains_pattern(term: str) -> str:
        """Build a LIKE pattern that matches *term* literally as a substring.

        Backslash, ``%`` and ``_`` are prefixed with a backslash so that
        caller-supplied text cannot act as a wildcard. The pattern must be
        used with a backslash ``escape`` character.
        """
        escaped = (
            term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        return f"%{escaped}%"

    async def get_by_slug(self, db: AsyncSession, *, slug: str) -> Project | None:
        """Get a project by its slug.

        Args:
            db: Async database session used to run the query.
            slug: Value compared against ``Project.slug``.

        Returns:
            The matching project, or ``None`` when no row matches.

        Raises:
            Exception: Any error from the query is logged and re-raised.
        """
        try:
            result = await db.execute(select(Project).where(Project.slug == slug))
            return result.scalar_one_or_none()
        except Exception as e:
            logger.error(f"Error getting project by slug {slug}: {e!s}", exc_info=True)
            raise

    async def create(self, db: AsyncSession, *, obj_in: ProjectCreate) -> Project:
        """Create a new project, commit it, and return the refreshed row.

        Args:
            db: Async database session.
            obj_in: Creation payload; ``settings`` falls back to ``{}`` when
                it is falsy.

        Returns:
            The newly persisted project.

        Raises:
            ValueError: On an ``IntegrityError``. The message names the slug
                when the database error text mentions "slug"; otherwise it
                carries the raw database error text. The original
                ``IntegrityError`` is attached as ``__cause__``.
            Exception: Any other error is logged and re-raised after rollback.
        """
        try:
            db_obj = Project(
                name=obj_in.name,
                slug=obj_in.slug,
                description=obj_in.description,
                autonomy_level=obj_in.autonomy_level,
                status=obj_in.status,
                settings=obj_in.settings or {},
                owner_id=obj_in.owner_id,
            )
            db.add(db_obj)
            await db.commit()
            await db.refresh(db_obj)
            return db_obj
        except IntegrityError as e:
            await db.rollback()
            error_msg = str(e.orig) if hasattr(e, "orig") else str(e)
            if "slug" in error_msg.lower():
                logger.warning(f"Duplicate slug attempted: {obj_in.slug}")
                raise ValueError(
                    f"Project with slug '{obj_in.slug}' already exists"
                ) from e
            logger.error(f"Integrity error creating project: {error_msg}")
            raise ValueError(f"Database integrity error: {error_msg}") from e
        except Exception as e:
            await db.rollback()
            logger.error(f"Unexpected error creating project: {e!s}", exc_info=True)
            raise

    async def get_multi_with_filters(
        self,
        db: AsyncSession,
        *,
        skip: int = 0,
        limit: int = 100,
        status: ProjectStatus | None = None,
        owner_id: UUID | None = None,
        search: str | None = None,
        sort_by: str = "created_at",
        sort_order: str = "desc",
    ) -> tuple[list[Project], int]:
        """Get multiple projects with filtering, searching, and sorting.

        Args:
            db: Async database session.
            skip: Number of rows to skip (OFFSET).
            limit: Maximum number of rows to return (LIMIT).
            status: When given, only projects with this status.
            owner_id: When given, only projects owned by this user.
            search: When non-empty, case-insensitive substring match against
                name, slug, or description. ``%`` and ``_`` in the term are
                matched literally.
            sort_by: Name of a mapped column attribute of ``Project``; any
                other value falls back to ``created_at``.
            sort_order: ``"desc"`` sorts descending; any other value sorts
                ascending.

        Returns:
            Tuple of (projects list, total count). The total is computed
            with the filters applied but before pagination.

        Raises:
            Exception: Any error from the queries is logged and re-raised.
        """
        try:
            query = select(Project)

            # Apply filters
            if status is not None:
                query = query.where(Project.status == status)

            if owner_id is not None:
                query = query.where(Project.owner_id == owner_id)

            if search:
                # Escape LIKE metacharacters so user input is matched literally.
                pattern = self._contains_pattern(search)
                query = query.where(
                    or_(
                        Project.name.ilike(pattern, escape="\\"),
                        Project.slug.ilike(pattern, escape="\\"),
                        Project.description.ilike(pattern, escape="\\"),
                    )
                )

            # Get total count before pagination
            count_query = select(func.count()).select_from(query.subquery())
            count_result = await db.execute(count_query)
            total = count_result.scalar_one()

            # Apply sorting. Only mapped column attributes are accepted: a
            # bare getattr() would also hand back relationships, methods or
            # dunder attributes, which cannot be used in ORDER BY.
            if sort_by in Project.__mapper__.column_attrs:
                sort_column = getattr(Project, sort_by)
            else:
                sort_column = Project.created_at

            if sort_order == "desc":
                query = query.order_by(sort_column.desc())
            else:
                query = query.order_by(sort_column.asc())

            # Apply pagination
            query = query.offset(skip).limit(limit)
            result = await db.execute(query)
            projects = list(result.scalars().all())

            return projects, total
        except Exception as e:
            logger.error(f"Error getting projects with filters: {e!s}", exc_info=True)
            raise

    async def get_with_counts(
        self,
        db: AsyncSession,
        *,
        project_id: UUID,
    ) -> dict[str, Any] | None:
        """Get a single project with agent and issue counts.

        Args:
            db: Async database session.
            project_id: Primary key of the project to load.

        Returns:
            ``None`` when the project does not exist; otherwise a dictionary
            with keys ``project``, ``agent_count``, ``issue_count`` and
            ``active_sprint_name`` (``None`` when no sprint is ACTIVE).

        Raises:
            Exception: Any error from the queries is logged and re-raised.
        """
        try:
            # Get project
            result = await db.execute(select(Project).where(Project.id == project_id))
            project = result.scalar_one_or_none()

            if not project:
                return None

            # Get agent count
            agent_count_result = await db.execute(
                select(func.count(AgentInstance.id)).where(
                    AgentInstance.project_id == project_id
                )
            )
            agent_count = agent_count_result.scalar_one()

            # Get issue count
            issue_count_result = await db.execute(
                select(func.count(Issue.id)).where(Issue.project_id == project_id)
            )
            issue_count = issue_count_result.scalar_one()

            # Get active sprint name. LIMIT 1 keeps scalar_one_or_none() from
            # raising if more than one sprint is ACTIVE, matching the bulk
            # listing which also tolerates that situation.
            active_sprint_result = await db.execute(
                select(Sprint.name)
                .where(
                    Sprint.project_id == project_id,
                    Sprint.status == SprintStatus.ACTIVE,
                )
                .limit(1)
            )
            active_sprint_name = active_sprint_result.scalar_one_or_none()

            return {
                "project": project,
                "agent_count": agent_count,
                "issue_count": issue_count,
                "active_sprint_name": active_sprint_name,
            }
        except Exception as e:
            logger.error(
                f"Error getting project with counts {project_id}: {e!s}", exc_info=True
            )
            raise

    async def get_multi_with_counts(
        self,
        db: AsyncSession,
        *,
        skip: int = 0,
        limit: int = 100,
        status: ProjectStatus | None = None,
        owner_id: UUID | None = None,
        search: str | None = None,
    ) -> tuple[list[dict[str, Any]], int]:
        """Get projects with agent/issue counts using bulk queries.

        The page of projects comes from ``get_multi_with_filters`` (with its
        default sorting); counts and active sprint names are then fetched
        with one grouped query each instead of one query per project.

        Args:
            db: Async database session.
            skip: Number of rows to skip (OFFSET).
            limit: Maximum number of rows to return (LIMIT).
            status: Optional status filter.
            owner_id: Optional owner filter.
            search: Optional search term.

        Returns:
            Tuple of (list of dicts with ``project``, ``agent_count``,
            ``issue_count``, ``active_sprint_name``; total count). The total
            reflects all matching projects even when the requested page is
            empty.

        Raises:
            Exception: Any error from the queries is logged and re-raised.
        """
        try:
            # Get filtered projects
            projects, total = await self.get_multi_with_filters(
                db,
                skip=skip,
                limit=limit,
                status=status,
                owner_id=owner_id,
                search=search,
            )

            if not projects:
                # An empty page (e.g. skip past the end) must still report
                # the real number of matching projects.
                return [], total

            project_ids = [p.id for p in projects]

            # Get agent counts in bulk
            agent_counts_result = await db.execute(
                select(
                    AgentInstance.project_id,
                    func.count(AgentInstance.id).label("count"),
                )
                .where(AgentInstance.project_id.in_(project_ids))
                .group_by(AgentInstance.project_id)
            )
            # Unpack rows positionally; "count" is also a tuple method name.
            agent_counts = {pid: total_agents for pid, total_agents in agent_counts_result}

            # Get issue counts in bulk
            issue_counts_result = await db.execute(
                select(
                    Issue.project_id,
                    func.count(Issue.id).label("count"),
                )
                .where(Issue.project_id.in_(project_ids))
                .group_by(Issue.project_id)
            )
            issue_counts = {pid: total_issues for pid, total_issues in issue_counts_result}

            # Get active sprint names
            active_sprints_result = await db.execute(
                select(Sprint.project_id, Sprint.name).where(
                    Sprint.project_id.in_(project_ids),
                    Sprint.status == SprintStatus.ACTIVE,
                )
            )
            active_sprints = {pid: name for pid, name in active_sprints_result}

            # Combine results; projects without rows get 0 / None.
            results = [
                {
                    "project": project,
                    "agent_count": agent_counts.get(project.id, 0),
                    "issue_count": issue_counts.get(project.id, 0),
                    "active_sprint_name": active_sprints.get(project.id),
                }
                for project in projects
            ]

            return results, total
        except Exception as e:
            logger.error(f"Error getting projects with counts: {e!s}", exc_info=True)
            raise

    async def get_projects_by_owner(
        self,
        db: AsyncSession,
        *,
        owner_id: UUID,
        status: ProjectStatus | None = None,
    ) -> list[Project]:
        """Get all projects owned by a specific user, newest first.

        Args:
            db: Async database session.
            owner_id: Owner whose projects are returned.
            status: When given, only projects with this status.

        Returns:
            Projects ordered by ``created_at`` descending.

        Raises:
            Exception: Any error from the query is logged and re-raised.
        """
        try:
            query = select(Project).where(Project.owner_id == owner_id)

            if status is not None:
                query = query.where(Project.status == status)

            query = query.order_by(Project.created_at.desc())
            result = await db.execute(query)
            return list(result.scalars().all())
        except Exception as e:
            logger.error(
                f"Error getting projects by owner {owner_id}: {e!s}", exc_info=True
            )
            raise

    async def archive_project(
        self,
        db: AsyncSession,
        *,
        project_id: UUID,
    ) -> Project | None:
        """Archive a project by setting status to ARCHIVED.

        This also performs cascading cleanup, all within one commit:
        - Unassigns issues from the agents about to be terminated
        - Terminates all agent instances that are not already TERMINATED
        - Cancels all planned/active sprints

        Args:
            db: Async database session.
            project_id: Primary key of the project to archive.

        Returns:
            The refreshed, archived project, or ``None`` when it does not
            exist.

        Raises:
            Exception: Any error triggers a rollback, is logged, and is
                re-raised.
        """
        try:
            result = await db.execute(select(Project).where(Project.id == project_id))
            project = result.scalar_one_or_none()

            if not project:
                return None

            now = datetime.now(UTC)

            # 1. Get all agent IDs that will be terminated
            agents_to_terminate = await db.execute(
                select(AgentInstance.id).where(
                    AgentInstance.project_id == project_id,
                    AgentInstance.status != AgentStatus.TERMINATED,
                )
            )
            agent_ids = list(agents_to_terminate.scalars().all())

            # 2. Unassign issues from these agents to prevent orphaned assignments
            if agent_ids:
                await db.execute(
                    update(Issue)
                    .where(Issue.assigned_agent_id.in_(agent_ids))
                    .values(assigned_agent_id=None)
                )

            # 3. Terminate all active agents
            await db.execute(
                update(AgentInstance)
                .where(
                    AgentInstance.project_id == project_id,
                    AgentInstance.status != AgentStatus.TERMINATED,
                )
                .values(
                    status=AgentStatus.TERMINATED,
                    terminated_at=now,
                    current_task=None,
                    session_id=None,
                    updated_at=now,
                )
            )

            # 4. Cancel all planned/active sprints
            await db.execute(
                update(Sprint)
                .where(
                    Sprint.project_id == project_id,
                    Sprint.status.in_([SprintStatus.PLANNED, SprintStatus.ACTIVE]),
                )
                .values(
                    status=SprintStatus.CANCELLED,
                    updated_at=now,
                )
            )

            # 5. Archive the project
            project.status = ProjectStatus.ARCHIVED
            await db.commit()
            await db.refresh(project)

            logger.info(
                f"Archived project {project_id}: terminated agents={len(agent_ids)}"
            )

            return project
        except Exception as e:
            await db.rollback()
            logger.error(f"Error archiving project {project_id}: {e!s}", exc_info=True)
            raise
|
|
|
|
|
|
# Create a singleton instance for use across the application
|
|
# Module-level CRUDProject instance bound to the Project model.
project = CRUDProject(Project)
|