319 lines
11 KiB
Python
319 lines
11 KiB
Python
from datetime import datetime, timezone
|
|
from typing import List, Optional, Dict, Any, Union
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import asc, desc
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.crud.base import CRUDBase
|
|
from app.models import gift as gift_models
|
|
from app.models.gift import GiftItem, GiftCategory, GiftPurchase, GiftStatus, EventGiftCategory
|
|
from app.models.guest import Guest
|
|
from app.models.event import Event
|
|
from app.schemas import gifts as gift_schemas
|
|
from app.schemas.gifts import (
|
|
GiftItemCreate, GiftItemUpdate,
|
|
GiftCategoryCreate, GiftCategoryUpdate,
|
|
GiftPurchaseCreate, GiftPurchaseUpdate,
|
|
EventGiftCategoryCreate, EventGiftCategoryUpdate
|
|
)
|
|
|
|
|
|
class CRUDGiftItem(CRUDBase[GiftItem, GiftItemCreate, GiftItemUpdate]):
|
|
def create(self, db: Session, *, obj_in: GiftItemCreate) -> GiftItem:
|
|
"""Create a new gift item with appropriate type conversions"""
|
|
obj_data = obj_in.model_dump(exclude_unset=True)
|
|
|
|
# Ensure UUID fields are properly converted if they're strings
|
|
for field in ["event_id", "added_by", "category_id"]:
|
|
if field in obj_data and obj_data[field] is not None:
|
|
if isinstance(obj_data[field], str):
|
|
obj_data[field] = UUID(obj_data[field])
|
|
|
|
# Set initial status change time
|
|
obj_data["last_status_change"] = datetime.now(timezone.utc)
|
|
|
|
db_obj = GiftItem(**obj_data)
|
|
db.add(db_obj)
|
|
db.commit()
|
|
db.refresh(db_obj)
|
|
return db_obj
|
|
|
|
def get_multi_by_event(
|
|
self, db: Session, event_id: UUID, skip: int = 0, limit: int = 100,
|
|
include_hidden: bool = False, category_id: Optional[UUID] = None
|
|
) -> List[GiftItem]:
|
|
"""Get gift items for a specific event with filtering options"""
|
|
query = db.query(self.model).filter(GiftItem.event_id == event_id)
|
|
|
|
if not include_hidden:
|
|
query = query.filter(GiftItem.is_visible == True)
|
|
|
|
if category_id:
|
|
query = query.filter(GiftItem.category_id == category_id)
|
|
|
|
# Order by display_order then name
|
|
query = query.order_by(asc(GiftItem.display_order), asc(GiftItem.name))
|
|
|
|
return query.offset(skip).limit(limit).all()
|
|
|
|
def update_status(
|
|
self, db: Session, *, gift_id: UUID, new_status: GiftStatus
|
|
) -> GiftItem:
|
|
"""Update the status of a gift item and record the change time"""
|
|
gift = self.get(db, gift_id)
|
|
if gift:
|
|
gift.status = new_status
|
|
gift.last_status_change = datetime.now(timezone.utc)
|
|
db.commit()
|
|
db.refresh(gift)
|
|
return gift
|
|
|
|
def update_quantity_received(
|
|
self, db: Session, *, gift_id: UUID, quantity: int
|
|
) -> GiftItem:
|
|
"""Update the quantity received and adjust status if needed"""
|
|
gift = self.get(db, gift_id)
|
|
if gift:
|
|
gift.quantity_received = quantity
|
|
|
|
# Automatically update status if fully received
|
|
if gift.quantity_received >= gift.quantity_requested:
|
|
gift.status = GiftStatus.RECEIVED
|
|
gift.last_status_change = datetime.now(timezone.utc)
|
|
|
|
db.commit()
|
|
db.refresh(gift)
|
|
return gift
|
|
|
|
def reserve_gift(
|
|
self, db: Session, *, gift_id: UUID, guest_id: UUID, notes: Optional[str] = None
|
|
) -> GiftItem:
|
|
"""Reserve a gift for a guest"""
|
|
|
|
gift = self.get(db, gift_id)
|
|
if gift and gift.status == GiftStatus.AVAILABLE:
|
|
# Add to the association table using the SQLAlchemy Core Table directly
|
|
from app.models.guest import guest_gifts
|
|
|
|
stmt = guest_gifts.insert().values(
|
|
gift_id=gift_id,
|
|
guest_id=guest_id,
|
|
reserved_at=datetime.now(timezone.utc),
|
|
notes=notes
|
|
)
|
|
db.execute(stmt)
|
|
|
|
# Update gift status
|
|
gift.status = GiftStatus.RESERVED
|
|
gift.last_status_change = datetime.now(timezone.utc)
|
|
|
|
db.commit()
|
|
db.refresh(gift)
|
|
return gift
|
|
|
|
def cancel_reservation(
|
|
self, db: Session, *, gift_id: UUID, guest_id: UUID
|
|
) -> GiftItem:
|
|
"""Cancel a gift reservation"""
|
|
gift = self.get(db, gift_id)
|
|
guest = db.query(Guest).get(guest_id)
|
|
|
|
if gift and gift.status == GiftStatus.RESERVED:
|
|
# Using the ORM relationship approach
|
|
if guest in gift.reserved_by:
|
|
gift.reserved_by.remove(guest)
|
|
|
|
# Update gift status
|
|
gift.status = GiftStatus.AVAILABLE
|
|
gift.last_status_change = datetime.now(timezone.utc)
|
|
|
|
db.commit()
|
|
db.refresh(gift)
|
|
|
|
return gift
|
|
|
|
|
|
class CRUDEventGiftCategory:
|
|
def create(self, db: Session, *, obj_in: EventGiftCategoryCreate) -> EventGiftCategory:
|
|
"""Create a new event-category association"""
|
|
obj_data = obj_in.model_dump(exclude_unset=True)
|
|
|
|
# Ensure UUID fields are properly converted if they're strings
|
|
for field in ["event_id", "category_id"]:
|
|
if field in obj_data and obj_data[field] is not None:
|
|
if isinstance(obj_data[field], str):
|
|
obj_data[field] = UUID(obj_data[field])
|
|
|
|
db_obj = EventGiftCategory(**obj_data)
|
|
db.add(db_obj)
|
|
db.commit()
|
|
db.refresh(db_obj)
|
|
return db_obj
|
|
|
|
def get(self, db: Session, *, event_id: UUID, category_id: UUID) -> Optional[EventGiftCategory]:
|
|
"""Get a specific event-category association"""
|
|
return db.query(EventGiftCategory).filter(
|
|
EventGiftCategory.event_id == event_id,
|
|
EventGiftCategory.category_id == category_id
|
|
).first()
|
|
|
|
def update(
|
|
self, db: Session, *, db_obj: EventGiftCategory, obj_in: Union[EventGiftCategoryUpdate, Dict[str, Any]]
|
|
) -> EventGiftCategory:
|
|
"""Update an event-category association"""
|
|
if isinstance(obj_in, dict):
|
|
update_data = obj_in
|
|
else:
|
|
update_data = obj_in.model_dump(exclude_unset=True)
|
|
|
|
for field in update_data:
|
|
setattr(db_obj, field, update_data[field])
|
|
|
|
db.add(db_obj)
|
|
db.commit()
|
|
db.refresh(db_obj)
|
|
return db_obj
|
|
|
|
def remove(self, db: Session, *, event_id: UUID, category_id: UUID) -> Optional[EventGiftCategory]:
|
|
"""Remove an event-category association"""
|
|
obj = self.get(db, event_id=event_id, category_id=category_id)
|
|
if obj:
|
|
db.delete(obj)
|
|
db.commit()
|
|
return obj
|
|
|
|
def get_categories_by_event(
|
|
self, db: Session, *, event_id: UUID, skip: int = 0, limit: int = 100,
|
|
include_hidden: bool = False
|
|
) -> List[GiftCategory]:
|
|
"""Get categories for a specific event with filtering options"""
|
|
query = db.query(GiftCategory).join(
|
|
EventGiftCategory,
|
|
GiftCategory.id == EventGiftCategory.category_id
|
|
).filter(EventGiftCategory.event_id == event_id)
|
|
|
|
if not include_hidden:
|
|
query = query.filter(EventGiftCategory.is_visible == True)
|
|
|
|
# Order by display_order then name
|
|
query = query.order_by(
|
|
asc(EventGiftCategory.display_order),
|
|
asc(GiftCategory.name)
|
|
)
|
|
|
|
return query.offset(skip).limit(limit).all()
|
|
|
|
def get_events_by_category(
|
|
self, db: Session, *, category_id: UUID, skip: int = 0, limit: int = 100
|
|
) -> List[Event]:
|
|
"""Get events for a specific category"""
|
|
query = db.query(Event).join(
|
|
EventGiftCategory,
|
|
Event.id == EventGiftCategory.event_id
|
|
).filter(EventGiftCategory.category_id == category_id)
|
|
|
|
return query.offset(skip).limit(limit).all()
|
|
|
|
def reorder_categories(
|
|
self, db: Session, *, event_id: UUID, category_orders: Dict[UUID, int]
|
|
) -> List[EventGiftCategory]:
|
|
"""Update display order of multiple categories for an event"""
|
|
associations = db.query(EventGiftCategory).filter(
|
|
EventGiftCategory.event_id == event_id
|
|
).all()
|
|
|
|
for assoc in associations:
|
|
if assoc.category_id in category_orders:
|
|
assoc.display_order = category_orders[assoc.category_id]
|
|
|
|
db.commit()
|
|
return associations
|
|
|
|
|
|
class CRUDGiftCategory(CRUDBase[GiftCategory, GiftCategoryCreate, GiftCategoryUpdate]):
|
|
def create(self, db: Session, *, obj_in: GiftCategoryCreate) -> GiftCategory:
|
|
"""Create a new gift category with appropriate type conversions"""
|
|
obj_data = obj_in.model_dump(exclude_unset=True)
|
|
|
|
# Ensure UUID fields are properly converted if they're strings
|
|
for field in ["created_by"]:
|
|
if field in obj_data and obj_data[field] is not None:
|
|
if isinstance(obj_data[field], str):
|
|
obj_data[field] = UUID(obj_data[field])
|
|
|
|
db_obj = GiftCategory(**obj_data)
|
|
db.add(db_obj)
|
|
db.commit()
|
|
db.refresh(db_obj)
|
|
return db_obj
|
|
|
|
def reorder_gifts(
|
|
self, db: Session, *, category_id: UUID, gift_orders: Dict[UUID, int]
|
|
) -> GiftCategory:
|
|
"""Update display order of gifts within a category"""
|
|
category = self.get(db, category_id)
|
|
if category:
|
|
for gift in category.gifts:
|
|
if gift.id in gift_orders:
|
|
gift.display_order = gift_orders[gift.id]
|
|
|
|
db.commit()
|
|
db.refresh(category)
|
|
return category
|
|
|
|
|
|
class CRUDGiftPurchase(CRUDBase[GiftPurchase, GiftPurchaseCreate, GiftPurchaseUpdate]):
|
|
def create(self, db: Session, *, obj_in: GiftPurchaseCreate) -> GiftPurchase:
|
|
"""Create a new gift purchase with appropriate type conversions"""
|
|
obj_data = obj_in.model_dump(exclude_unset=True)
|
|
|
|
# Ensure UUID fields are properly converted if they're strings
|
|
for field in ["gift_id", "guest_id"]:
|
|
if field in obj_data and obj_data[field] is not None:
|
|
if isinstance(obj_data[field], str):
|
|
obj_data[field] = UUID(obj_data[field])
|
|
|
|
# Set purchase time
|
|
obj_data["purchased_at"] = datetime.now(timezone.utc)
|
|
|
|
db_obj = GiftPurchase(**obj_data)
|
|
db.add(db_obj)
|
|
|
|
# Update the gift status
|
|
gift = db.query(GiftItem).filter(GiftItem.id == db_obj.gift_id).first()
|
|
if gift:
|
|
gift.status = GiftStatus.PURCHASED
|
|
gift.last_status_change = datetime.now(timezone.utc)
|
|
|
|
# Update quantity received if not specified
|
|
if "quantity_received" not in obj_data:
|
|
gift.quantity_received += db_obj.quantity
|
|
|
|
db.commit()
|
|
db.refresh(db_obj)
|
|
return db_obj
|
|
|
|
def get_by_gift(
|
|
self, db: Session, *, gift_id: UUID
|
|
) -> List[GiftPurchase]:
|
|
"""Get all purchases for a specific gift"""
|
|
return db.query(self.model).filter(
|
|
GiftPurchase.gift_id == gift_id
|
|
).order_by(desc(GiftPurchase.purchased_at)).all()
|
|
|
|
def get_by_guest(
|
|
self, db: Session, *, guest_id: UUID
|
|
) -> List[GiftPurchase]:
|
|
"""Get all purchases made by a specific guest"""
|
|
return db.query(self.model).filter(
|
|
GiftPurchase.guest_id == guest_id
|
|
).order_by(desc(GiftPurchase.purchased_at)).all()
|
|
|
|
|
|
# Create CRUD instances
|
|
gift_item_crud = CRUDGiftItem(GiftItem)
|
|
gift_category_crud = CRUDGiftCategory(GiftCategory)
|
|
gift_purchase_crud = CRUDGiftPurchase(GiftPurchase)
|
|
event_gift_category_crud = CRUDEventGiftCategory()
|