This commit introduces a new RSVP module with endpoints for creating, reading, updating, and deleting RSVPs, along with a custom status update endpoint. The router is integrated into the events API, providing full RSVP management capabilities. Validation and error handling have been implemented to ensure data consistency.
408 lines
12 KiB
Python
408 lines
12 KiB
Python
import logging
import secrets
from typing import Any, Dict
from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends, Query
from fastapi import HTTPException, status
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from app.api.dependencies.auth import get_current_user, get_optional_current_user
from app.core.database import get_db
from app.crud.event import event_crud
from app.models import Guest
from app.models.event_manager import EventManager
from app.models.user import User
from app.schemas.common import PaginatedResponse
from app.schemas.events import (
    EventCreate,
    EventUpdate,
    EventResponse, Event,
)

from app.api.routes.events import guests
from app.api.routes.events import rsvps

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router that carries every event endpoint declared in this module.
events_router = APIRouter()

# Nested routers are mounted under their own prefix and OpenAPI tag.
events_router.include_router(guests.router, prefix="/guests", tags=["guests"])
events_router.include_router(rsvps.router, prefix="/rsvps", tags=["rsvps"])

def validate_event_access(
    *,
    db: Session,
    event_obj: Optional[Event],
    current_user: Optional[User],
    access_code: Optional[str] = None
) -> EventResponse:
    """Validate access permissions for an event.

    Access is granted, and ``event_obj`` is returned unchanged, as soon as
    one of these rules matches:

    * the caller created the event or is a superuser;
    * the event is public (no authentication needed);
    * ``access_code`` equals the event's own ``access_code``;
    * an ``EventManager`` row links the caller to the event;
    * a ``Guest`` row links the caller to the event.

    Rules that need no database round-trip are evaluated first. Every rule
    returns the same object, so the order only affects how many queries
    run, not the outcome.

    Args:
        db: Session used for the manager and guest lookups.
        event_obj: The event to check, or ``None`` if it was not found.
        current_user: The authenticated caller, or ``None`` when anonymous.
        access_code: Optional code supplied by the caller.

    Returns:
        ``event_obj`` when access is granted.

    Raises:
        HTTPException: 404 when ``event_obj`` is ``None``; 403 when no rule
            grants access.
    """
    if event_obj is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Event not found"
        )

    # Creator or superuser.
    if current_user and (
        event_obj.created_by == current_user.id or current_user.is_superuser
    ):
        return event_obj

    # Public event: anyone may read it, signed in or not.
    if event_obj.is_public:
        return event_obj

    # Invite / access code. The code comes straight from the request, so it
    # is compared in constant time; both sides are encoded to bytes because
    # compare_digest rejects non-ASCII str arguments.
    if access_code:
        stored_code = event_obj.access_code
        if (
            isinstance(stored_code, str)
            and isinstance(access_code, str)
            and secrets.compare_digest(
                stored_code.encode("utf-8"), access_code.encode("utf-8")
            )
        ):
            return event_obj

    # Remaining rules need a signed-in caller and one query each:
    # first the manager table, then the guest list.
    if current_user:
        membership = {"event_id": event_obj.id, "user_id": current_user.id}
        for model in (EventManager, Guest):
            if db.query(model).filter_by(**membership).first():
                return event_obj

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough permissions to access this event"
    )


@events_router.post(
    "/",
    response_model=EventResponse,
    status_code=status.HTTP_201_CREATED,
    operation_id="create_event"
)
def create_event(
    *,
    db: Session = Depends(get_db),
    event_in: EventCreate,
    current_user: User = Depends(get_current_user)
) -> EventResponse:
    """Create a new event owned by the authenticated user.

    Args:
        db: Database session.
        event_in: Payload describing the event to create.
        current_user: The authenticated caller; becomes the event owner.

    Returns:
        The event returned by ``event_crud.create_with_owner``.

    Raises:
        HTTPException: 401 when ``current_user`` is ``None``; 400 when an
            event with ``event_in.slug`` already exists; 500 when the
            database raises ``SQLAlchemyError``.
    """
    if current_user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        # Reject the request up front if the slug is already taken.
        if event_crud.get_by_slug(db, slug=event_in.slug):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="An event with this slug already exists"
            )

        created_event = event_crud.create_with_owner(
            db=db, obj_in=event_in, owner_id=current_user.id
        )
        logger.info(
            "Event created by %s: %s", current_user.email, created_event.slug
        )
        return created_event
    except SQLAlchemyError as exc:
        # Record the real cause before replacing it with a generic 500;
        # the client-facing detail stays deliberately vague.
        logger.exception("Database error while creating event")
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Database error occurred"
        ) from exc


@events_router.get(
    "/me",
    response_model=PaginatedResponse[EventResponse],
    operation_id="get_user_events"
)
def get_user_events(
    *,
    db: Session = Depends(get_db),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=500),
    include_inactive: bool = Query(False),
    current_user: User = Depends(get_current_user)
) -> Dict[str, Any]:
    """Get the current user's events with pagination.

    Args:
        db: Database session.
        skip: Number of rows to skip (>= 0).
        limit: Page size (1..500).
        include_inactive: Forwarded to the CRUD layer for both the count
            and the item query.
        current_user: The authenticated caller.

    Returns:
        A dict with ``total``, ``items``, ``page`` (1-based, derived from
        ``skip`` and ``limit``) and ``size``.

    Raises:
        HTTPException: 401 when ``current_user`` is ``None``; 500 when the
            database raises ``SQLAlchemyError``.
    """
    if current_user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        total = event_crud.count_user_events(
            db=db,
            user_id=current_user.id,
            include_inactive=include_inactive
        )
        items = event_crud.get_user_events(
            db=db,
            user_id=current_user.id,
            skip=skip,
            limit=limit,
            include_inactive=include_inactive
        )
    except SQLAlchemyError as exc:
        # Keep the underlying error in the logs; the client only sees a 500.
        logger.exception("Database error while retrieving user events")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error retrieving events"
        ) from exc

    # Query() enforces limit >= 1; the guard only protects direct callers.
    page = (skip // limit + 1) if limit > 0 else 1
    return {
        "total": total,
        "items": items,
        "page": page,
        "size": limit
    }


@events_router.get(
    "/upcoming",
    response_model=PaginatedResponse[EventResponse],
    operation_id="get_upcoming_events"
)
def get_upcoming_events(
    *,
    db: Session = Depends(get_db),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=500),
    current_user: User = Depends(get_current_user)
) -> Dict[str, Any]:
    """Get upcoming events with pagination (authenticated callers only).

    Which events count as "upcoming" is decided by
    ``event_crud.get_upcoming_events`` / ``count_upcoming_events``.

    Args:
        db: Database session.
        skip: Number of rows to skip (>= 0).
        limit: Page size (1..500).
        current_user: The authenticated caller.

    Returns:
        A dict with ``total``, ``items``, ``page`` (1-based, derived from
        ``skip`` and ``limit``) and ``size``.

    Raises:
        HTTPException: 401 when ``current_user`` is ``None``; 500 when the
            database raises ``SQLAlchemyError``.
    """
    if current_user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        items = event_crud.get_upcoming_events(db=db, skip=skip, limit=limit)
        # Total across all pages, needed by the pagination envelope.
        total = event_crud.count_upcoming_events(db=db)
    except SQLAlchemyError as exc:
        # Keep the underlying error in the logs; the client only sees a 500.
        logger.exception("Database error while retrieving upcoming events")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error retrieving upcoming events"
        ) from exc

    # Query() enforces limit >= 1; the guard only protects direct callers.
    page = (skip // limit + 1) if limit > 0 else 1
    return {
        "total": total,
        "items": items,
        "page": page,
        "size": limit
    }


@events_router.get(
    "/public",
    response_model=PaginatedResponse[EventResponse],
    operation_id="get_public_events"
)
def get_public_events(
    *,
    db: Session = Depends(get_db),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=500)
) -> Dict[str, Any]:
    """Get all public events with pagination; no authentication required.

    Args:
        db: Database session.
        skip: Number of rows to skip (>= 0).
        limit: Page size (1..500).

    Returns:
        A dict with ``total``, ``items``, ``page`` (1-based, derived from
        ``skip`` and ``limit``) and ``size``.

    Raises:
        HTTPException: 500 when the database raises ``SQLAlchemyError``.
    """
    try:
        items = event_crud.get_public_events(db=db, skip=skip, limit=limit)
        total = event_crud.count_public_events(db=db)
    except SQLAlchemyError as exc:
        # Keep the underlying error in the logs; the client only sees a 500.
        logger.exception("Database error while retrieving public events")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error retrieving public events"
        ) from exc

    # Query() enforces limit >= 1; the guard only protects direct callers.
    page = (skip // limit + 1) if limit > 0 else 1
    return {
        "total": total,
        "items": items,
        "page": page,
        "size": limit
    }


@events_router.get(
    "/{event_id}",
    response_model=EventResponse,
    operation_id="get_event"
)
def get_event(
    *,
    db: Session = Depends(get_db),
    event_id: UUID,
    access_code: Optional[str] = Query(None),
    current_user: Optional[User] = Depends(get_optional_current_user)
) -> EventResponse:
    """Get event by ID, subject to ``validate_event_access``.

    The caller is resolved with the optional-auth dependency so that
    requests without credentials can still reach the anonymous rules in
    ``validate_event_access`` (public event, matching access code).
    NOTE(review): assumes ``get_optional_current_user`` yields ``None`` for
    anonymous callers - confirm against app.api.dependencies.auth.

    Args:
        db: Database session.
        event_id: Identifier of the event to fetch.
        access_code: Optional access code passed as a query parameter.
        current_user: The caller, or ``None`` when not authenticated.

    Returns:
        The event, when ``validate_event_access`` grants access.

    Raises:
        HTTPException: 404 / 403 from ``validate_event_access``; 500 when
            the database raises ``SQLAlchemyError``.
    """
    try:
        event_obj = event_crud.get(db=db, id=event_id)
        return validate_event_access(
            db=db,
            event_obj=event_obj,
            current_user=current_user,
            access_code=access_code
        )
    except SQLAlchemyError as exc:
        # Keep the underlying error in the logs; the client only sees a 500.
        logger.exception("Database error while retrieving event %s", event_id)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error retrieving event",
        ) from exc


@events_router.get(
    "/by-slug/{slug}",
    response_model=EventResponse,
    operation_id="get_event_by_slug"
)
def get_event_by_slug(
    *,
    db: Session = Depends(get_db),
    slug: str,
    access_code: Optional[str] = Query(None),
    current_user: Optional[User] = Depends(get_optional_current_user)
) -> EventResponse:
    """Get event by slug, subject to ``validate_event_access``.

    The caller is resolved with the optional-auth dependency so that
    requests without credentials can still reach the anonymous rules in
    ``validate_event_access`` (public event, matching access code).
    NOTE(review): assumes ``get_optional_current_user`` yields ``None`` for
    anonymous callers - confirm against app.api.dependencies.auth.

    Args:
        db: Database session.
        slug: Slug of the event to fetch.
        access_code: Optional access code passed as a query parameter.
        current_user: The caller, or ``None`` when not authenticated.

    Returns:
        The event, when ``validate_event_access`` grants access.

    Raises:
        HTTPException: 404 / 403 from ``validate_event_access``; 500 when
            the database raises ``SQLAlchemyError``.
    """
    try:
        event_obj = event_crud.get_by_slug(db=db, slug=slug)
        return validate_event_access(
            db=db,
            event_obj=event_obj,
            current_user=current_user,
            access_code=access_code
        )
    except SQLAlchemyError as exc:
        # Keep the underlying error in the logs; the client only sees a 500.
        logger.exception("Database error while retrieving event by slug")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error retrieving event",
        ) from exc


@events_router.put(
    "/{event_id}",
    response_model=EventResponse,
    operation_id="update_event"
)
def update_event(
    *,
    db: Session = Depends(get_db),
    event_id: UUID,
    event_in: EventUpdate,
    current_user: User = Depends(get_current_user)
) -> EventResponse:
    """Update an event.

    The caller must be the event's creator, a superuser, or have an
    ``EventManager`` row for the event with ``can_edit`` set.

    Args:
        db: Database session.
        event_id: Identifier of the event to update.
        event_in: Fields to change.
        current_user: The authenticated caller.

    Returns:
        The result of ``event_crud.update``.

    Raises:
        HTTPException: 401 when ``current_user`` is ``None``; 404 when the
            event does not exist; 403 when the caller may not edit it; 400
            when the requested slug belongs to another event; 500 when the
            database raises ``SQLAlchemyError``.
    """
    if current_user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        event_obj = event_crud.get(db=db, id=event_id)
        if not event_obj:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Event not found"
            )

        # Creator and superusers may always edit; anyone else needs a
        # manager row with edit rights. The query only runs when needed.
        has_permission = (
            event_obj.created_by == current_user.id or current_user.is_superuser
        )
        if not has_permission:
            manager = db.query(EventManager).filter(
                EventManager.event_id == event_id,
                EventManager.user_id == current_user.id,
                # SQL expression, not a Python truth test.
                EventManager.can_edit == True  # noqa: E712
            ).first()
            has_permission = manager is not None

        if not has_permission:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions to edit this event"
            )

        # Only check availability when the slug is actually changing.
        if event_in.slug and event_in.slug != event_obj.slug:
            existing = event_crud.get_by_slug(db, slug=event_in.slug)
            if existing and existing.id != event_id:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="An event with this slug already exists"
                )

        return event_crud.update(db=db, db_obj=event_obj, obj_in=event_in)
    except SQLAlchemyError as exc:
        # Keep the underlying error in the logs; the client only sees a 500.
        logger.exception("Database error while updating event %s", event_id)
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error updating event"
        ) from exc


@events_router.delete(
    "/{event_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    operation_id="delete_event"
)
def delete_event(
    *,
    db: Session = Depends(get_db),
    event_id: UUID,
    current_user: User = Depends(get_current_user),
    hard_delete: bool = Query(False, description="Perform hard delete instead of soft delete")
):
    """Delete event (soft delete by default).

    A soft delete sets ``is_active`` to ``False`` through
    ``event_crud.update``; a hard delete calls ``event_crud.remove``.

    Args:
        db: Database session.
        event_id: Identifier of the event to delete.
        current_user: The authenticated caller.
        hard_delete: When true, remove the row instead of deactivating it.
            Only superusers may do this.

    Returns:
        ``None`` (the route responds with 204 No Content).

    Raises:
        HTTPException: 401 when ``current_user`` is ``None``; 404 when the
            event does not exist; 403 when the caller is neither creator
            nor superuser, or requests a hard delete without being a
            superuser; 500 when the database raises ``SQLAlchemyError``.
    """
    if current_user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        event_obj = event_crud.get(db=db, id=event_id)
        if not event_obj:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Event not found"
            )

        # Only creator or superuser can delete.
        is_creator = event_obj.created_by == current_user.id
        if not (is_creator or current_user.is_superuser):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions to delete this event"
            )

        if not hard_delete:
            # Soft delete: keep the row, mark it inactive.
            event_crud.update(db=db, db_obj=event_obj, obj_in={"is_active": False})
            return None  # 204 No Content

        # Hard delete is reserved for superusers, even for the creator.
        if not current_user.is_superuser:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Only administrators can perform hard delete"
            )
        event_crud.remove(db=db, id=event_id)
        return None  # 204 No Content
    except SQLAlchemyError as exc:
        # Keep the underlying error in the logs; the client only sees a 500.
        logger.exception("Database error while deleting event %s", event_id)
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error deleting event"
        ) from exc