Files
eventspace/backend/app/services/auth_service.py
Felipe Cardoso fee7d8b5ec Refactor CRUD module imports and usage for clarity.
Replaces ambiguous shorthand references like `event` and `user` with more descriptive names such as `event_crud` and `user_crud`. Updates imports, function calls, tests, and other references across the codebase to maintain consistency. This improves code readability and reduces potential confusion.
2025-03-15 01:22:04 +01:00

197 lines
5.6 KiB
Python

# app/services/auth_service.py
import logging
from typing import Optional
from uuid import UUID
from sqlalchemy.orm import Session
from app.core.auth import (
verify_password,
get_password_hash,
create_access_token,
create_refresh_token,
TokenExpiredError,
TokenInvalidError
)
from app.models.user import User
from app.schemas.users import Token, UserCreate
from app.crud.user import user_crud as crud_user
from app.core.auth import decode_token, get_token_data
logger = logging.getLogger(__name__)
class AuthenticationError(Exception):
"""Raised when authentication fails"""
def __init__(self, message: str = "Authentication failed"):
self.message = message
super().__init__(self.message)
class AuthService:
"""Service for handling authentication operations"""
@staticmethod
def authenticate_user(db: Session, email: str, password: str) -> Optional[User]:
"""
Authenticate a user with email and password.
Args:
db: Database session
email: User email
password: User password
Returns:
User if authenticated, None otherwise
"""
user = crud_user.get_by_email(db, email=email)
if not user:
logger.warning(f"Login attempt failed: user not found for email {email}")
raise AuthenticationError("Invalid email or password")
if not verify_password(password, user.password_hash):
logger.warning(f"Login attempt failed: invalid password for user {email}")
raise AuthenticationError("Invalid email or password")
if not crud_user.is_active(user):
logger.warning(f"Login attempt failed: inactive user {email}")
raise AuthenticationError("Inactive user")
return user
@staticmethod
def create_user(db: Session, user_data: UserCreate) -> User:
"""
Create a new user.
Args:
db: Database session
user_data: User data
Returns:
Created user
"""
# Check if user already exists
existing_user = crud_user.get_by_email(db, email=user_data.email)
if existing_user:
logger.warning(f"Registration failed: email already registered {user_data.email}")
raise AuthenticationError("Email already registered")
try:
# Create new user using CRUD user
user = crud_user.create(db, obj_in=user_data)
logger.info(f"New user created: {user.email}")
return user
except Exception as e:
logger.error(f"User creation failed: {str(e)}")
raise AuthenticationError("Could not create user")
@staticmethod
def create_tokens(user: User) -> Token:
"""
Create access and refresh tokens for a user.
Args:
user: User to create tokens for
Returns:
Token object with access and refresh tokens
"""
# Generate claims
claims = {
"is_superuser": user.is_superuser,
"email": user.email,
"first_name": user.first_name
}
# Create tokens
access_token = create_access_token(
subject=str(user.id),
claims=claims
)
refresh_token = create_refresh_token(
subject=str(user.id)
)
return Token(
access_token=access_token,
refresh_token=refresh_token
)
@staticmethod
def refresh_tokens(db: Session, refresh_token: str) -> Token:
"""
Generate new tokens using a refresh token.
Args:
db: Database session
refresh_token: Valid refresh token
Returns:
New access and refresh tokens
Raises:
TokenExpiredError: If refresh token has expired
TokenInvalidError: If refresh token is invalid
"""
try:
# Verify token is a refresh token
decode_token(refresh_token, verify_type="refresh")
# Get user ID from token
token_data = get_token_data(refresh_token)
user_id = token_data.user_id
if not user_id:
raise AuthenticationError("Invalid token")
# Get user from database
user: User | None = crud_user.get(db, id=UUID(str(user_id)))
if not user or not user.is_active:
raise TokenInvalidError("Invalid user or inactive account")
user: User
# Generate new tokens
return AuthService.create_tokens(user)
except (TokenExpiredError, TokenInvalidError) as e:
logger.warning(f"Token refresh failed: {str(e)}")
raise
@staticmethod
def change_password(db: Session, user_id: UUID, current_password: str, new_password: str) -> bool:
"""
Change a user's password.
Args:
db: Database session
user_id: User ID
current_password: Current password
new_password: New password
Returns:
True if password was changed successfully
Raises:
AuthenticationError: If current password is incorrect
"""
user = crud_user.get(db, id=user_id)
if not user:
raise AuthenticationError("User not found")
# Verify current password
if not verify_password(current_password, user.password_hash):
raise AuthenticationError("Current password is incorrect")
# Update password
user.password_hash = get_password_hash(new_password)
db.commit()
return True