Add event counting methods and generic pagination schema
Introduce methods to count user, public, and upcoming events to enhance CRUD functionality for events. Additionally, add a `PaginatedResponse` schema to simplify and standardize paginated API responses. These updates support improved data querying and response handling.
This commit is contained in:
@@ -2,7 +2,7 @@ from datetime import timezone
|
||||
from typing import List, Optional, Union, Dict, Any
|
||||
from uuid import UUID
|
||||
|
||||
from sqlalchemy import desc
|
||||
from sqlalchemy import desc, func
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.crud.base import CRUDBase
|
||||
@@ -32,6 +32,21 @@ class CRUDEvent(CRUDBase[Event, EventCreate, EventUpdate]):
|
||||
|
||||
return query.order_by(desc(Event.event_date)).offset(skip).limit(limit).all()
|
||||
|
||||
def count_user_events(
|
||||
self,
|
||||
db: Session,
|
||||
*,
|
||||
user_id: UUID,
|
||||
include_inactive: bool = False
|
||||
) -> int:
|
||||
"""Count total events created by a specific user."""
|
||||
query = db.query(func.count(Event.id)).filter(Event.created_by == user_id)
|
||||
|
||||
if not include_inactive:
|
||||
query = query.filter(Event.is_active == True)
|
||||
|
||||
return query.scalar()
|
||||
|
||||
def create_with_owner(
|
||||
self,
|
||||
db: Session,
|
||||
@@ -83,6 +98,51 @@ class CRUDEvent(CRUDBase[Event, EventCreate, EventUpdate]):
|
||||
.all()
|
||||
)
|
||||
|
||||
def count_upcoming_events(
|
||||
self,
|
||||
db: Session
|
||||
) -> int:
|
||||
"""Count total upcoming events."""
|
||||
from datetime import datetime
|
||||
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
return (
|
||||
db.query(func.count(Event.id))
|
||||
.filter(Event.is_active == True)
|
||||
.filter(Event.event_date >= now)
|
||||
.scalar()
|
||||
)
|
||||
|
||||
def get_public_events(
|
||||
self,
|
||||
db: Session,
|
||||
*,
|
||||
skip: int = 0,
|
||||
limit: int = 100
|
||||
) -> List[Event]:
|
||||
"""Get public active events."""
|
||||
return (
|
||||
db.query(Event)
|
||||
.filter(Event.is_active == True)
|
||||
.filter(Event.is_public == True)
|
||||
.order_by(Event.event_date)
|
||||
.offset(skip)
|
||||
.limit(limit)
|
||||
.all()
|
||||
)
|
||||
|
||||
def count_public_events(
|
||||
self,
|
||||
db: Session
|
||||
) -> int:
|
||||
"""Count total public events."""
|
||||
return (
|
||||
db.query(func.count(Event.id))
|
||||
.filter(Event.is_active == True)
|
||||
.filter(Event.is_public == True)
|
||||
.scalar()
|
||||
)
|
||||
|
||||
def get_public_event(
|
||||
self,
|
||||
db: Session,
|
||||
|
||||
22
backend/app/schemas/common.py
Normal file
22
backend/app/schemas/common.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from typing import Generic, List, TypeVar
|
||||
from pydantic import BaseModel
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
class PaginatedResponse(BaseModel, Generic[T]):
|
||||
"""
|
||||
Generic schema for paginated responses.
|
||||
|
||||
Attributes:
|
||||
total (int): Total number of items
|
||||
items (List[T]): List of items in the current page
|
||||
page (int): Current page number
|
||||
size (int): Number of items per page
|
||||
"""
|
||||
total: int
|
||||
items: List[T]
|
||||
page: int
|
||||
size: int
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
Reference in New Issue
Block a user