Files
fast-next-template/backend/app/api/routes/users.py
Felipe Cardoso 2c600290a1 Enhance user management, improve API structure, add database optimizations, and update Docker setup
- Introduced endpoints for user management, including CRUD operations, pagination, and password management.
- Added new schema validations for user updates, password strength, pagination, and standardized error responses.
- Integrated custom exception handling for a consistent API error experience.
- Refined CORS settings: restricted methods and allowed headers, added header exposure, and preflight caching.
- Optimized database: added indexes on `is_active` and `is_superuser` fields, updated column types, enforced constraints, and set defaults.
- Updated `Dockerfile` to improve security by using a non-root user and adding a health check for the application.
- Enhanced tests for database initialization, user operations, and exception handling to ensure better coverage.
2025-10-30 15:43:52 +01:00

371 lines
10 KiB
Python

"""
User management endpoints for CRUD operations.
"""
import logging
from typing import Any
from uuid import UUID
from fastapi import APIRouter, Depends, Query, status, Request
from sqlalchemy.orm import Session
from slowapi import Limiter
from slowapi.util import get_remote_address
from app.api.dependencies.auth import get_current_user, get_current_superuser
from app.core.database import get_db
from app.crud.user import user as user_crud
from app.models.user import User
from app.schemas.users import UserResponse, UserUpdate, PasswordChange
from app.schemas.common import (
PaginationParams,
PaginatedResponse,
MessageResponse,
create_pagination_meta
)
from app.services.auth_service import AuthService
from app.core.exceptions import (
NotFoundError,
AuthorizationError,
ErrorCode
)
logger = logging.getLogger(__name__)
router = APIRouter()
limiter = Limiter(key_func=get_remote_address)
@router.get(
"",
response_model=PaginatedResponse[UserResponse],
summary="List Users",
description="""
List all users with pagination (admin only).
**Authentication**: Required (Bearer token)
**Authorization**: Superuser only
**Rate Limit**: 60 requests/minute
""",
operation_id="list_users"
)
def list_users(
pagination: PaginationParams = Depends(),
current_user: User = Depends(get_current_superuser),
db: Session = Depends(get_db)
) -> Any:
"""
List all users with pagination.
Only accessible by superusers.
"""
try:
# Get paginated users with total count
users, total = user_crud.get_multi_with_total(
db,
skip=pagination.offset,
limit=pagination.limit
)
# Create pagination metadata
pagination_meta = create_pagination_meta(
total=total,
page=pagination.page,
limit=pagination.limit,
items_count=len(users)
)
return PaginatedResponse(
data=users,
pagination=pagination_meta
)
except Exception as e:
logger.error(f"Error listing users: {str(e)}", exc_info=True)
raise
@router.get(
"/me",
response_model=UserResponse,
summary="Get Current User",
description="""
Get the current authenticated user's profile.
**Authentication**: Required (Bearer token)
**Rate Limit**: 60 requests/minute
""",
operation_id="get_current_user_profile"
)
def get_current_user_profile(
current_user: User = Depends(get_current_user)
) -> Any:
"""Get current user's profile."""
return current_user
@router.patch(
"/me",
response_model=UserResponse,
summary="Update Current User",
description="""
Update the current authenticated user's profile.
Users can update their own profile information (except is_superuser).
**Authentication**: Required (Bearer token)
**Rate Limit**: 30 requests/minute
""",
operation_id="update_current_user"
)
def update_current_user(
user_update: UserUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
) -> Any:
"""
Update current user's profile.
Users cannot elevate their own permissions (is_superuser).
"""
# Prevent users from making themselves superuser
if user_update.is_superuser is not None:
logger.warning(f"User {current_user.id} attempted to modify is_superuser field")
raise AuthorizationError(
message="Cannot modify superuser status",
error_code=ErrorCode.INSUFFICIENT_PERMISSIONS
)
try:
updated_user = user_crud.update(
db,
db_obj=current_user,
obj_in=user_update
)
logger.info(f"User {current_user.id} updated their profile")
return updated_user
except ValueError as e:
logger.error(f"Error updating user {current_user.id}: {str(e)}")
raise
except Exception as e:
logger.error(f"Unexpected error updating user {current_user.id}: {str(e)}", exc_info=True)
raise
@router.get(
"/{user_id}",
response_model=UserResponse,
summary="Get User by ID",
description="""
Get a specific user by their ID.
**Authentication**: Required (Bearer token)
**Authorization**:
- Regular users: Can only access their own profile
- Superusers: Can access any profile
**Rate Limit**: 60 requests/minute
""",
operation_id="get_user_by_id"
)
def get_user_by_id(
user_id: UUID,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
) -> Any:
"""
Get user by ID.
Users can only view their own profile unless they are superusers.
"""
# Check permissions
if str(user_id) != str(current_user.id) and not current_user.is_superuser:
logger.warning(
f"User {current_user.id} attempted to access user {user_id} without permission"
)
raise AuthorizationError(
message="Not enough permissions to view this user",
error_code=ErrorCode.INSUFFICIENT_PERMISSIONS
)
# Get user
user = user_crud.get(db, id=str(user_id))
if not user:
raise NotFoundError(
message=f"User with id {user_id} not found",
error_code=ErrorCode.USER_NOT_FOUND
)
return user
@router.patch(
"/{user_id}",
response_model=UserResponse,
summary="Update User",
description="""
Update a specific user by their ID.
**Authentication**: Required (Bearer token)
**Authorization**:
- Regular users: Can only update their own profile (except is_superuser)
- Superusers: Can update any profile
**Rate Limit**: 30 requests/minute
""",
operation_id="update_user"
)
def update_user(
user_id: UUID,
user_update: UserUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
) -> Any:
"""
Update user by ID.
Users can update their own profile. Superusers can update any profile.
Regular users cannot modify is_superuser field.
"""
# Check permissions
is_own_profile = str(user_id) == str(current_user.id)
if not is_own_profile and not current_user.is_superuser:
logger.warning(
f"User {current_user.id} attempted to update user {user_id} without permission"
)
raise AuthorizationError(
message="Not enough permissions to update this user",
error_code=ErrorCode.INSUFFICIENT_PERMISSIONS
)
# Get user
user = user_crud.get(db, id=str(user_id))
if not user:
raise NotFoundError(
message=f"User with id {user_id} not found",
error_code=ErrorCode.USER_NOT_FOUND
)
# Prevent non-superusers from modifying superuser status
if user_update.is_superuser is not None and not current_user.is_superuser:
logger.warning(f"User {current_user.id} attempted to modify is_superuser field")
raise AuthorizationError(
message="Cannot modify superuser status",
error_code=ErrorCode.INSUFFICIENT_PERMISSIONS
)
try:
updated_user = user_crud.update(db, db_obj=user, obj_in=user_update)
logger.info(f"User {user_id} updated by {current_user.id}")
return updated_user
except ValueError as e:
logger.error(f"Error updating user {user_id}: {str(e)}")
raise
except Exception as e:
logger.error(f"Unexpected error updating user {user_id}: {str(e)}", exc_info=True)
raise
@router.patch(
"/me/password",
response_model=MessageResponse,
summary="Change Current User Password",
description="""
Change the current authenticated user's password.
Requires the current password for verification.
**Authentication**: Required (Bearer token)
**Rate Limit**: 5 requests/minute
""",
operation_id="change_current_user_password"
)
@limiter.limit("5/minute")
def change_current_user_password(
request: Request,
password_change: PasswordChange,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
) -> Any:
"""
Change current user's password.
Requires current password for verification.
"""
try:
success = AuthService.change_password(
db=db,
user_id=current_user.id,
current_password=password_change.current_password,
new_password=password_change.new_password
)
if success:
logger.info(f"User {current_user.id} changed their password")
return MessageResponse(
success=True,
message="Password changed successfully"
)
except Exception as e:
logger.error(f"Error changing password for user {current_user.id}: {str(e)}")
raise
@router.delete(
"/{user_id}",
status_code=status.HTTP_200_OK,
response_model=MessageResponse,
summary="Delete User",
description="""
Delete a specific user by their ID.
**Authentication**: Required (Bearer token)
**Authorization**: Superuser only
**Rate Limit**: 10 requests/minute
**Note**: This performs a hard delete. Consider implementing soft deletes for production.
""",
operation_id="delete_user"
)
def delete_user(
user_id: UUID,
current_user: User = Depends(get_current_superuser),
db: Session = Depends(get_db)
) -> Any:
"""
Delete user by ID (superuser only).
This is a hard delete operation.
"""
# Prevent self-deletion
if str(user_id) == str(current_user.id):
raise AuthorizationError(
message="Cannot delete your own account",
error_code=ErrorCode.INSUFFICIENT_PERMISSIONS
)
# Get user
user = user_crud.get(db, id=str(user_id))
if not user:
raise NotFoundError(
message=f"User with id {user_id} not found",
error_code=ErrorCode.USER_NOT_FOUND
)
try:
user_crud.remove(db, id=str(user_id))
logger.info(f"User {user_id} deleted by {current_user.id}")
return MessageResponse(
success=True,
message=f"User {user_id} deleted successfully"
)
except ValueError as e:
logger.error(f"Error deleting user {user_id}: {str(e)}")
raise
except Exception as e:
logger.error(f"Unexpected error deleting user {user_id}: {str(e)}", exc_info=True)
raise