Add CRUD operations for users with tests and fixtures
All checks were successful
Build and Push Docker Images / changes (push) Successful in 4s
Build and Push Docker Images / build-backend (push) Successful in 54s
Build and Push Docker Images / build-frontend (push) Has been skipped

Implemented user CRUD operations including creation, retrieval, updating, and deletion through a generic CRUD base class. Enhanced user schemas with additional attributes and created tests to verify functionality, covering edge cases such as duplicates and pagination. Updated the test suite with new fixtures and methods to support the tests.
This commit is contained in:
2025-03-04 18:33:33 +01:00
parent c6fe040f21
commit 7d9f4ee4b2
7 changed files with 260 additions and 5 deletions

View File

62
backend/app/crud/base.py Normal file
View File

@@ -0,0 +1,62 @@
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.core.database import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
Parameters:
model: A SQLAlchemy model class
"""
self.model = model
def get(self, db: Session, id: str) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: str) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

56
backend/app/crud/user.py Normal file
View File

@@ -0,0 +1,56 @@
# app/crud/user.py
from typing import Optional, Union, Dict, Any
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.user import User
from app.schemas.users import UserCreate, UserUpdate
from app.core.auth import get_password_hash
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
password_hash=get_password_hash(obj_in.password),
first_name=obj_in.first_name,
last_name=obj_in.last_name,
phone_number=obj_in.phone_number if hasattr(obj_in, 'phone_number') else None,
is_superuser=obj_in.is_superuser if hasattr(obj_in, 'is_superuser') else False,
preferences={}
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: User,
obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
# Handle password separately if it exists in update data
if "password" in update_data:
update_data["password_hash"] = get_password_hash(update_data["password"])
del update_data["password"]
return super().update(db, db_obj=db_obj, obj_in=update_data)
def is_active(self, user: User) -> bool:
return user.is_active
def is_superuser(self, user: User) -> bool:
return user.is_superuser
# Create a singleton instance for use across the application
user = CRUDUser(User)

View File

@@ -4,7 +4,6 @@ from datetime import datetime
from typing import Optional, Dict, Any
from uuid import UUID
import pydantic
from pydantic import BaseModel, EmailStr, field_validator, ConfigDict
@@ -27,6 +26,7 @@ class UserBase(BaseModel):
class UserCreate(UserBase):
password: str
is_superuser: bool = False
@field_validator('password')
@classmethod
@@ -46,7 +46,7 @@ class UserUpdate(BaseModel):
last_name: Optional[str] = None
phone_number: Optional[str] = None
preferences: Optional[Dict[str, Any]] = None
is_active: Optional[bool] = True
@field_validator('phone_number')
def validate_phone_number(cls, v: Optional[str]) -> Optional[str]:
if v is None:
@@ -81,7 +81,6 @@ class UserUpdate(BaseModel):
return cleaned
class UserInDB(UserBase):
id: UUID
is_active: bool
@@ -147,4 +146,4 @@ class LoginRequest(BaseModel):
class RefreshTokenRequest(BaseModel):
refresh_token: str
refresh_token: str

View File

@@ -38,6 +38,18 @@ async def async_test_db():
yield test_engine, AsyncTestingSessionLocal
await teardown_async_test_db(test_engine)
@pytest.fixture
def user_create_data():
return {
"email": "newtest@example.com", # Changed to avoid conflict with mock_user
"password": "TestPassword123!",
"first_name": "Test",
"last_name": "User",
"phone_number": "+1234567890",
"is_superuser": False,
"preferences": None
}
@pytest.fixture
def mock_user(db_session):
@@ -50,7 +62,8 @@ def mock_user(db_session):
last_name="User",
phone_number="1234567890",
is_active=True,
is_superuser=False
is_superuser=False,
preferences=None,
)
db_session.add(mock_user)
db_session.commit()

View File

View File

@@ -0,0 +1,125 @@
import pytest
from app.crud.user import user as user_crud
from app.models.user import User
from app.schemas.users import UserCreate, UserUpdate
def test_create_user(db_session, user_create_data):
user_in = UserCreate(**user_create_data)
user_obj = user_crud.create(db_session, obj_in=user_in)
assert user_obj.email == user_create_data["email"]
assert user_obj.first_name == user_create_data["first_name"]
assert user_obj.last_name == user_create_data["last_name"]
assert user_obj.phone_number == user_create_data["phone_number"]
assert user_obj.is_superuser == user_create_data["is_superuser"]
assert user_obj.password_hash is not None
assert user_obj.id is not None
def test_get_user(db_session, mock_user):
# Using mock_user fixture instead of creating new user
stored_user = user_crud.get(db_session, id=mock_user.id)
assert stored_user
assert stored_user.id == mock_user.id
assert stored_user.email == mock_user.email
def test_get_user_by_email(db_session, mock_user):
stored_user = user_crud.get_by_email(db_session, email=mock_user.email)
assert stored_user
assert stored_user.id == mock_user.id
assert stored_user.email == mock_user.email
def test_update_user(db_session, mock_user):
update_data = UserUpdate(
first_name="Updated",
last_name="Name",
phone_number="+9876543210"
)
updated_user = user_crud.update(db_session, db_obj=mock_user, obj_in=update_data)
assert updated_user.first_name == "Updated"
assert updated_user.last_name == "Name"
assert updated_user.phone_number == "+9876543210"
assert updated_user.email == mock_user.email
def test_delete_user(db_session, mock_user):
user_crud.remove(db_session, id=mock_user.id)
deleted_user = user_crud.get(db_session, id=mock_user.id)
assert deleted_user is None
def test_get_multi_users(db_session, mock_user, user_create_data):
# Create additional users (mock_user is already in db)
users_data = [
{**user_create_data, "email": f"test{i}@example.com"}
for i in range(2) # Creating 2 more users + mock_user = 3 total
]
for user_data in users_data:
user_in = UserCreate(**user_data)
user_crud.create(db_session, obj_in=user_in)
users = user_crud.get_multi(db_session, skip=0, limit=10)
assert len(users) == 3
assert all(isinstance(user, User) for user in users)
def test_is_active(db_session, mock_user):
assert user_crud.is_active(mock_user) is True
# Test deactivating user
update_data = UserUpdate(is_active=False)
deactivated_user = user_crud.update(db_session, db_obj=mock_user, obj_in=update_data)
assert user_crud.is_active(deactivated_user) is False
def test_is_superuser(db_session, mock_user, user_create_data):
# mock_user is regular user
assert user_crud.is_superuser(mock_user) is False
# Create superuser
super_user_data = {**user_create_data, "email": "super@example.com", "is_superuser": True}
super_user_in = UserCreate(**super_user_data)
super_user = user_crud.create(db_session, obj_in=super_user_in)
assert user_crud.is_superuser(super_user) is True
# Additional test cases
def test_create_duplicate_email(db_session, mock_user):
user_data = UserCreate(
email=mock_user.email, # Try to create user with existing email
password="TestPassword123!",
first_name="Test",
last_name="User"
)
with pytest.raises(Exception): # Should raise an integrity error
user_crud.create(db_session, obj_in=user_data)
def test_update_user_preferences(db_session, mock_user):
preferences = {"theme": "dark", "notifications": True}
update_data = UserUpdate(preferences=preferences)
updated_user = user_crud.update(db_session, db_obj=mock_user, obj_in=update_data)
assert updated_user.preferences == preferences
def test_get_multi_users_pagination(db_session, user_create_data):
# Create 5 users
for i in range(5):
user_in = UserCreate(**{**user_create_data, "email": f"test{i}@example.com"})
user_crud.create(db_session, obj_in=user_in)
# Test pagination
first_page = user_crud.get_multi(db_session, skip=0, limit=2)
second_page = user_crud.get_multi(db_session, skip=2, limit=2)
assert len(first_page) == 2
assert len(second_page) == 2
assert first_page[0].id != second_page[0].id