Files
eventspace/backend/app/services/auth_service.py
Felipe Cardoso c5478e45b6
All checks were successful
Build and Push Docker Images / changes (push) Successful in 5s
Build and Push Docker Images / build-backend (push) Successful in 52s
Build and Push Docker Images / build-frontend (push) Has been skipped
Refactor AuthService for better error handling and CRUD usage
Replaced raw database queries with CRUD operations for consistency and modularity. Enhanced error handling by adding detailed exception messages and logging for failed actions, such as authentication and registration. Updated tests to reflect new exception-based error handling approach.
2025-03-05 12:00:23 +01:00

197 lines
5.6 KiB
Python

# app/services/auth_service.py
import logging
from typing import Optional
from uuid import UUID
from sqlalchemy.orm import Session
from app.core.auth import (
verify_password,
get_password_hash,
create_access_token,
create_refresh_token,
TokenExpiredError,
TokenInvalidError
)
from app.models.user import User
from app.schemas.users import Token, UserCreate
from app.crud.user import user as crud_user
from app.core.auth import decode_token, get_token_data
logger = logging.getLogger(__name__)
class AuthenticationError(Exception):
    """Signal that an authentication-related operation failed.

    The reason is stored on ``message`` and is also handed to ``Exception``,
    so ``str(exc)`` and ``exc.args[0]`` both yield the same text.
    """

    def __init__(self, message: str = "Authentication failed"):
        # Initialise the base exception first, then expose the text as an attribute.
        super().__init__(message)
        self.message = message
class AuthService:
    """Service for handling authentication operations.

    Every method is a ``@staticmethod``; user lookups and creation are
    delegated to ``crud_user`` and operate on a caller-supplied ``Session``.
    """

    @staticmethod
    def authenticate_user(db: Session, email: str, password: str) -> User:
        """
        Authenticate a user with email and password.

        Args:
            db: Database session
            email: User email
            password: User password

        Returns:
            The authenticated user. This method never returns ``None``;
            every failure path raises.

        Raises:
            AuthenticationError: If no user exists for ``email``, the password
                does not verify, or the user is not active.
        """
        user = crud_user.get_by_email(db, email=email)
        if not user:
            logger.warning("Login attempt failed: user not found for email %s", email)
            # Same message as the wrong-password case below.
            raise AuthenticationError("Invalid email or password")

        if not verify_password(password, user.password_hash):
            logger.warning("Login attempt failed: invalid password for user %s", email)
            raise AuthenticationError("Invalid email or password")

        if not crud_user.is_active(user):
            logger.warning("Login attempt failed: inactive user %s", email)
            raise AuthenticationError("Inactive user")

        return user

    @staticmethod
    def create_user(db: Session, user_data: UserCreate) -> User:
        """
        Create a new user.

        Args:
            db: Database session
            user_data: User data

        Returns:
            Created user

        Raises:
            AuthenticationError: If the email is already registered, or if the
                underlying create call fails (the original exception is
                attached as ``__cause__``).
        """
        # Check if user already exists
        existing_user = crud_user.get_by_email(db, email=user_data.email)
        if existing_user:
            logger.warning(
                "Registration failed: email already registered %s", user_data.email
            )
            raise AuthenticationError("Email already registered")

        try:
            # Create new user using CRUD user
            user = crud_user.create(db, obj_in=user_data)
            logger.info("New user created: %s", user.email)
            return user
        except Exception as e:
            logger.error("User creation failed: %s", e)
            # Chain the cause so the real failure is not lost behind the generic message.
            raise AuthenticationError("Could not create user") from e

    @staticmethod
    def create_tokens(user: User) -> Token:
        """
        Create access and refresh tokens for a user.

        The access token carries ``is_superuser``, ``email`` and ``first_name``
        as extra claims; the refresh token is created with the subject only.

        Args:
            user: User to create tokens for

        Returns:
            Token object with access and refresh tokens
        """
        subject = str(user.id)

        # Extra claims embedded in the access token only.
        claims = {
            "is_superuser": user.is_superuser,
            "email": user.email,
            "first_name": user.first_name
        }

        access_token = create_access_token(subject=subject, claims=claims)
        refresh_token = create_refresh_token(subject=subject)

        return Token(
            access_token=access_token,
            refresh_token=refresh_token
        )

    @staticmethod
    def refresh_tokens(db: Session, refresh_token: str) -> Token:
        """
        Generate new tokens using a refresh token.

        Args:
            db: Database session
            refresh_token: Valid refresh token

        Returns:
            New access and refresh tokens

        Raises:
            TokenExpiredError: If refresh token has expired
            TokenInvalidError: If refresh token is invalid, its subject is not
                a well-formed UUID, or the user is missing or inactive
            AuthenticationError: If the token carries no user id
        """
        try:
            # Verify token is a refresh token
            decode_token(refresh_token, verify_type="refresh")

            # Get user ID from token
            token_data = get_token_data(refresh_token)
            user_id = token_data.user_id
            if not user_id:
                logger.warning("Token refresh failed: token has no user id")
                raise AuthenticationError("Invalid token")

            # A malformed subject is an invalid token, not an unexpected ValueError.
            try:
                user_uuid = UUID(str(user_id))
            except ValueError as e:
                raise TokenInvalidError("Invalid token") from e

            # Get user from database
            user: Optional[User] = crud_user.get(db, id=user_uuid)
            if not user or not user.is_active:
                raise TokenInvalidError("Invalid user or inactive account")

            # Generate new tokens
            return AuthService.create_tokens(user)
        except (TokenExpiredError, TokenInvalidError) as e:
            logger.warning("Token refresh failed: %s", e)
            raise

    @staticmethod
    def change_password(db: Session, user_id: UUID, current_password: str, new_password: str) -> bool:
        """
        Change a user's password.

        Args:
            db: Database session
            user_id: User ID
            current_password: Current password
            new_password: New password

        Returns:
            True if password was changed successfully

        Raises:
            AuthenticationError: If the user does not exist or the current
                password is incorrect
            Exception: Any error raised by ``db.commit()`` is re-raised after
                the session has been rolled back
        """
        user = crud_user.get(db, id=user_id)
        if not user:
            raise AuthenticationError("User not found")

        # Verify current password
        if not verify_password(current_password, user.password_hash):
            raise AuthenticationError("Current password is incorrect")

        # Update password
        user.password_hash = get_password_hash(new_password)
        try:
            db.commit()
        except Exception:
            # Roll back so the caller's session is usable again after a failed commit.
            db.rollback()
            logger.exception("Password change failed for user %s", user_id)
            raise

        return True