Refactor auth dependencies and add comprehensive tests
Moved `auth` module from `dependencies.py` to `dependencies/auth.py` for better organization. Added extensive unit tests for authentication services and API dependencies to ensure robust verification of users, tokens, and permissions.
This commit is contained in:
0
backend/app/api/dependencies/__init__.py
Normal file
0
backend/app/api/dependencies/__init__.py
Normal file
@@ -1,4 +1,3 @@
|
||||
# app/api/dependencies/auth.py
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import Depends, HTTPException, status
|
||||
@@ -19,14 +18,14 @@ def get_current_user(
|
||||
) -> User:
|
||||
"""
|
||||
Get the current authenticated user.
|
||||
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
token: JWT token from request
|
||||
|
||||
|
||||
Returns:
|
||||
User: The authenticated user
|
||||
|
||||
|
||||
Raises:
|
||||
HTTPException: If authentication fails
|
||||
"""
|
||||
@@ -69,13 +68,13 @@ def get_current_active_user(
|
||||
) -> User:
|
||||
"""
|
||||
Check if the current user is active.
|
||||
|
||||
|
||||
Args:
|
||||
current_user: The current authenticated user
|
||||
|
||||
|
||||
Returns:
|
||||
User: The authenticated and active user
|
||||
|
||||
|
||||
Raises:
|
||||
HTTPException: If user is inactive
|
||||
"""
|
||||
@@ -92,13 +91,13 @@ def get_current_superuser(
|
||||
) -> User:
|
||||
"""
|
||||
Check if the current user is a superuser.
|
||||
|
||||
|
||||
Args:
|
||||
current_user: The current authenticated user
|
||||
|
||||
|
||||
Returns:
|
||||
User: The authenticated superuser
|
||||
|
||||
|
||||
Raises:
|
||||
HTTPException: If user is not a superuser
|
||||
"""
|
||||
@@ -117,11 +116,11 @@ def get_optional_current_user(
|
||||
"""
|
||||
Get the current user if authenticated, otherwise return None.
|
||||
Useful for endpoints that work with both authenticated and unauthenticated users.
|
||||
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
token: JWT token from request
|
||||
|
||||
|
||||
Returns:
|
||||
User or None: The authenticated user or None
|
||||
"""
|
||||
211
backend/tests/api/test_auth_dependencies.py
Normal file
211
backend/tests/api/test_auth_dependencies.py
Normal file
@@ -0,0 +1,211 @@
|
||||
# tests/api/dependencies/test_auth_dependencies.py
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
from fastapi import HTTPException
|
||||
|
||||
from app.api.dependencies.auth import (
|
||||
get_current_user,
|
||||
get_current_active_user,
|
||||
get_current_superuser,
|
||||
get_optional_current_user
|
||||
)
|
||||
from app.core.auth import TokenExpiredError, TokenInvalidError
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_token():
|
||||
return "mock.jwt.token"
|
||||
|
||||
|
||||
class TestGetCurrentUser:
|
||||
"""Tests for get_current_user dependency"""
|
||||
|
||||
def test_get_current_user_success(self, db_session, mock_user, mock_token):
|
||||
"""Test successfully getting the current user"""
|
||||
# Mock get_token_data to return user_id that matches our mock_user
|
||||
with patch('app.api.dependencies.auth.get_token_data') as mock_get_data:
|
||||
mock_get_data.return_value.user_id = mock_user.id
|
||||
|
||||
# Call the dependency
|
||||
user = get_current_user(db=db_session, token=mock_token)
|
||||
|
||||
# Verify the correct user was returned
|
||||
assert user.id == mock_user.id
|
||||
assert user.email == mock_user.email
|
||||
|
||||
def test_get_current_user_nonexistent(self, db_session, mock_token):
|
||||
"""Test when the token contains a user ID that doesn't exist"""
|
||||
# Mock get_token_data to return a non-existent user ID
|
||||
# Use a real UUID object instead of a string
|
||||
import uuid
|
||||
nonexistent_id = uuid.UUID("11111111-1111-1111-1111-111111111111")
|
||||
|
||||
with patch('app.api.dependencies.auth.get_token_data') as mock_get_data:
|
||||
mock_get_data.return_value.user_id = nonexistent_id # Using UUID object, not string
|
||||
|
||||
# Should raise HTTPException with 404 status
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
get_current_user(db=db_session, token=mock_token)
|
||||
|
||||
assert exc_info.value.status_code == 404
|
||||
|
||||
def test_get_current_user_inactive(self, db_session, mock_user, mock_token):
|
||||
"""Test when the user is inactive"""
|
||||
# Make the user inactive
|
||||
mock_user.is_active = False
|
||||
db_session.commit()
|
||||
|
||||
# Mock get_token_data
|
||||
with patch('app.api.dependencies.auth.get_token_data') as mock_get_data:
|
||||
mock_get_data.return_value.user_id = mock_user.id
|
||||
|
||||
# Should raise HTTPException with 403 status
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
get_current_user(db=db_session, token=mock_token)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
|
||||
def test_get_current_user_expired_token(self, db_session, mock_token):
|
||||
"""Test with an expired token"""
|
||||
# Mock get_token_data to raise TokenExpiredError
|
||||
with patch('app.api.dependencies.auth.get_token_data') as mock_get_data:
|
||||
mock_get_data.side_effect = TokenExpiredError("Token expired")
|
||||
|
||||
# Should raise HTTPException with 401 status
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
get_current_user(db=db_session, token=mock_token)
|
||||
|
||||
assert exc_info.value.status_code == 401
|
||||
assert "Token expired" in exc_info.value.detail
|
||||
|
||||
def test_get_current_user_invalid_token(self, db_session, mock_token):
|
||||
"""Test with an invalid token"""
|
||||
# Mock get_token_data to raise TokenInvalidError
|
||||
with patch('app.api.dependencies.auth.get_token_data') as mock_get_data:
|
||||
mock_get_data.side_effect = TokenInvalidError("Invalid token")
|
||||
|
||||
# Should raise HTTPException with 401 status
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
get_current_user(db=db_session, token=mock_token)
|
||||
|
||||
assert exc_info.value.status_code == 401
|
||||
assert "Could not validate credentials" in exc_info.value.detail
|
||||
|
||||
|
||||
class TestGetCurrentActiveUser:
|
||||
"""Tests for get_current_active_user dependency"""
|
||||
|
||||
def test_get_current_active_user(self, mock_user):
|
||||
"""Test getting an active user"""
|
||||
# Ensure user is active
|
||||
mock_user.is_active = True
|
||||
|
||||
# Call the dependency with mocked current_user
|
||||
user = get_current_active_user(current_user=mock_user)
|
||||
|
||||
# Should return the same user
|
||||
assert user == mock_user
|
||||
|
||||
def test_get_current_inactive_user(self, mock_user):
|
||||
"""Test getting an inactive user"""
|
||||
# Make user inactive
|
||||
mock_user.is_active = False
|
||||
|
||||
# Should raise HTTPException with 403 status
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
get_current_active_user(current_user=mock_user)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
assert "Inactive user" in exc_info.value.detail
|
||||
|
||||
|
||||
class TestGetCurrentSuperuser:
|
||||
"""Tests for get_current_superuser dependency"""
|
||||
|
||||
def test_get_current_superuser(self, mock_user):
|
||||
"""Test getting a superuser"""
|
||||
# Make user a superuser
|
||||
mock_user.is_superuser = True
|
||||
|
||||
# Call the dependency with mocked current_user
|
||||
user = get_current_superuser(current_user=mock_user)
|
||||
|
||||
# Should return the same user
|
||||
assert user == mock_user
|
||||
|
||||
def test_get_current_non_superuser(self, mock_user):
|
||||
"""Test getting a non-superuser"""
|
||||
# Ensure user is not a superuser
|
||||
mock_user.is_superuser = False
|
||||
|
||||
# Should raise HTTPException with 403 status
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
get_current_superuser(current_user=mock_user)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
assert "Not enough permissions" in exc_info.value.detail
|
||||
|
||||
|
||||
class TestGetOptionalCurrentUser:
|
||||
"""Tests for get_optional_current_user dependency"""
|
||||
|
||||
def test_get_optional_current_user_with_token(self, db_session, mock_user, mock_token):
|
||||
"""Test getting optional user with a valid token"""
|
||||
# Mock get_token_data
|
||||
with patch('app.api.dependencies.auth.get_token_data') as mock_get_data:
|
||||
mock_get_data.return_value.user_id = mock_user.id
|
||||
|
||||
# Call the dependency
|
||||
user = get_optional_current_user(db=db_session, token=mock_token)
|
||||
|
||||
# Should return the correct user
|
||||
assert user is not None
|
||||
assert user.id == mock_user.id
|
||||
|
||||
def test_get_optional_current_user_no_token(self, db_session):
|
||||
"""Test getting optional user with no token"""
|
||||
# Call the dependency with no token
|
||||
user = get_optional_current_user(db=db_session, token=None)
|
||||
|
||||
# Should return None
|
||||
assert user is None
|
||||
|
||||
def test_get_optional_current_user_invalid_token(self, db_session, mock_token):
|
||||
"""Test getting optional user with an invalid token"""
|
||||
# Mock get_token_data to raise TokenInvalidError
|
||||
with patch('app.api.dependencies.auth.get_token_data') as mock_get_data:
|
||||
mock_get_data.side_effect = TokenInvalidError("Invalid token")
|
||||
|
||||
# Call the dependency
|
||||
user = get_optional_current_user(db=db_session, token=mock_token)
|
||||
|
||||
# Should return None, not raise an exception
|
||||
assert user is None
|
||||
|
||||
def test_get_optional_current_user_expired_token(self, db_session, mock_token):
|
||||
"""Test getting optional user with an expired token"""
|
||||
# Mock get_token_data to raise TokenExpiredError
|
||||
with patch('app.api.dependencies.auth.get_token_data') as mock_get_data:
|
||||
mock_get_data.side_effect = TokenExpiredError("Token expired")
|
||||
|
||||
# Call the dependency
|
||||
user = get_optional_current_user(db=db_session, token=mock_token)
|
||||
|
||||
# Should return None, not raise an exception
|
||||
assert user is None
|
||||
|
||||
def test_get_optional_current_user_inactive(self, db_session, mock_user, mock_token):
|
||||
"""Test getting optional user when user is inactive"""
|
||||
# Make the user inactive
|
||||
mock_user.is_active = False
|
||||
db_session.commit()
|
||||
|
||||
# Mock get_token_data
|
||||
with patch('app.api.dependencies.auth.get_token_data') as mock_get_data:
|
||||
mock_get_data.return_value.user_id = mock_user.id
|
||||
|
||||
# Call the dependency
|
||||
user = get_optional_current_user(db=db_session, token=mock_token)
|
||||
|
||||
# Should return None for inactive users
|
||||
assert user is None
|
||||
252
backend/tests/services/test_auth_service.py
Normal file
252
backend/tests/services/test_auth_service.py
Normal file
@@ -0,0 +1,252 @@
|
||||
# tests/services/test_auth_service.py
|
||||
import uuid
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
|
||||
from app.core.auth import get_password_hash, verify_password, TokenExpiredError, TokenInvalidError
|
||||
from app.models.user import User
|
||||
from app.schemas.users import UserCreate, Token
|
||||
from app.services.auth_service import AuthService, AuthenticationError
|
||||
|
||||
|
||||
class TestAuthServiceAuthentication:
|
||||
"""Tests for AuthService.authenticate_user method"""
|
||||
|
||||
def test_authenticate_valid_user(self, db_session, mock_user):
|
||||
"""Test authenticating a user with valid credentials"""
|
||||
# Set a known password for the mock user
|
||||
password = "TestPassword123"
|
||||
mock_user.password_hash = get_password_hash(password)
|
||||
db_session.commit()
|
||||
|
||||
# Authenticate with correct credentials
|
||||
user = AuthService.authenticate_user(
|
||||
db=db_session,
|
||||
email=mock_user.email,
|
||||
password=password
|
||||
)
|
||||
|
||||
assert user is not None
|
||||
assert user.id == mock_user.id
|
||||
assert user.email == mock_user.email
|
||||
|
||||
def test_authenticate_nonexistent_user(self, db_session):
|
||||
"""Test authenticating with an email that doesn't exist"""
|
||||
user = AuthService.authenticate_user(
|
||||
db=db_session,
|
||||
email="nonexistent@example.com",
|
||||
password="password"
|
||||
)
|
||||
|
||||
assert user is None
|
||||
|
||||
def test_authenticate_with_wrong_password(self, db_session, mock_user):
|
||||
"""Test authenticating with the wrong password"""
|
||||
# Set a known password for the mock user
|
||||
password = "TestPassword123"
|
||||
mock_user.password_hash = get_password_hash(password)
|
||||
db_session.commit()
|
||||
|
||||
# Authenticate with wrong password
|
||||
user = AuthService.authenticate_user(
|
||||
db=db_session,
|
||||
email=mock_user.email,
|
||||
password="WrongPassword123"
|
||||
)
|
||||
|
||||
assert user is None
|
||||
|
||||
def test_authenticate_inactive_user(self, db_session, mock_user):
|
||||
"""Test authenticating an inactive user"""
|
||||
# Set a known password and make user inactive
|
||||
password = "TestPassword123"
|
||||
mock_user.password_hash = get_password_hash(password)
|
||||
mock_user.is_active = False
|
||||
db_session.commit()
|
||||
|
||||
# Should raise AuthenticationError
|
||||
with pytest.raises(AuthenticationError):
|
||||
AuthService.authenticate_user(
|
||||
db=db_session,
|
||||
email=mock_user.email,
|
||||
password=password
|
||||
)
|
||||
|
||||
|
||||
class TestAuthServiceUserCreation:
|
||||
"""Tests for AuthService.create_user method"""
|
||||
|
||||
def test_create_new_user(self, db_session):
|
||||
"""Test creating a new user"""
|
||||
user_data = UserCreate(
|
||||
email="newuser@example.com",
|
||||
password="TestPassword123",
|
||||
first_name="New",
|
||||
last_name="User",
|
||||
phone_number="1234567890"
|
||||
)
|
||||
|
||||
user = AuthService.create_user(db=db_session, user_data=user_data)
|
||||
|
||||
# Verify user was created with correct data
|
||||
assert user is not None
|
||||
assert user.email == user_data.email
|
||||
assert user.first_name == user_data.first_name
|
||||
assert user.last_name == user_data.last_name
|
||||
assert user.phone_number == user_data.phone_number
|
||||
|
||||
# Verify password was hashed
|
||||
assert user.password_hash != user_data.password
|
||||
assert verify_password(user_data.password, user.password_hash)
|
||||
|
||||
# Verify default values
|
||||
assert user.is_active is True
|
||||
assert user.is_superuser is False
|
||||
|
||||
def test_create_user_with_existing_email(self, db_session, mock_user):
|
||||
"""Test creating a user with an email that already exists"""
|
||||
user_data = UserCreate(
|
||||
email=mock_user.email, # Use existing email
|
||||
password="TestPassword123",
|
||||
first_name="Duplicate",
|
||||
last_name="User"
|
||||
)
|
||||
|
||||
# Should raise AuthenticationError
|
||||
with pytest.raises(AuthenticationError):
|
||||
AuthService.create_user(db=db_session, user_data=user_data)
|
||||
|
||||
|
||||
class TestAuthServiceTokens:
|
||||
"""Tests for AuthService token-related methods"""
|
||||
|
||||
def test_create_tokens(self, mock_user):
|
||||
"""Test creating access and refresh tokens for a user"""
|
||||
tokens = AuthService.create_tokens(mock_user)
|
||||
|
||||
# Verify token structure
|
||||
assert isinstance(tokens, Token)
|
||||
assert tokens.access_token is not None
|
||||
assert tokens.refresh_token is not None
|
||||
assert tokens.token_type == "bearer"
|
||||
|
||||
# This is a more in-depth test that would decode the tokens to verify claims
|
||||
# but we'll rely on the auth module tests for token verification
|
||||
|
||||
def test_refresh_tokens(self, db_session, mock_user):
|
||||
"""Test refreshing tokens with a valid refresh token"""
|
||||
# Create initial tokens
|
||||
initial_tokens = AuthService.create_tokens(mock_user)
|
||||
|
||||
# Refresh tokens
|
||||
new_tokens = AuthService.refresh_tokens(
|
||||
db=db_session,
|
||||
refresh_token=initial_tokens.refresh_token
|
||||
)
|
||||
|
||||
# Verify new tokens are different from old ones
|
||||
assert new_tokens.access_token != initial_tokens.access_token
|
||||
assert new_tokens.refresh_token != initial_tokens.refresh_token
|
||||
|
||||
def test_refresh_tokens_with_invalid_token(self, db_session):
|
||||
"""Test refreshing tokens with an invalid token"""
|
||||
# Create an invalid token
|
||||
invalid_token = "invalid.token.string"
|
||||
|
||||
# Should raise TokenInvalidError
|
||||
with pytest.raises(TokenInvalidError):
|
||||
AuthService.refresh_tokens(
|
||||
db=db_session,
|
||||
refresh_token=invalid_token
|
||||
)
|
||||
|
||||
def test_refresh_tokens_with_access_token(self, db_session, mock_user):
|
||||
"""Test refreshing tokens with an access token instead of refresh token"""
|
||||
# Create tokens
|
||||
tokens = AuthService.create_tokens(mock_user)
|
||||
|
||||
# Try to refresh with access token
|
||||
with pytest.raises(TokenInvalidError):
|
||||
AuthService.refresh_tokens(
|
||||
db=db_session,
|
||||
refresh_token=tokens.access_token
|
||||
)
|
||||
|
||||
def test_refresh_tokens_with_nonexistent_user(self, db_session):
|
||||
"""Test refreshing tokens for a user that doesn't exist in the database"""
|
||||
# Create a token for a non-existent user
|
||||
non_existent_id = str(uuid.uuid4())
|
||||
with patch('app.core.auth.decode_token'), patch('app.core.auth.get_token_data') as mock_get_data:
|
||||
# Mock the token data to return a non-existent user ID
|
||||
mock_get_data.return_value.user_id = uuid.UUID(non_existent_id)
|
||||
|
||||
# Should raise TokenInvalidError
|
||||
with pytest.raises(TokenInvalidError):
|
||||
AuthService.refresh_tokens(
|
||||
db=db_session,
|
||||
refresh_token="some.refresh.token"
|
||||
)
|
||||
|
||||
|
||||
class TestAuthServicePasswordChange:
|
||||
"""Tests for AuthService.change_password method"""
|
||||
|
||||
def test_change_password(self, db_session, mock_user):
|
||||
"""Test changing a user's password"""
|
||||
# Set a known password for the mock user
|
||||
current_password = "CurrentPassword123"
|
||||
mock_user.password_hash = get_password_hash(current_password)
|
||||
db_session.commit()
|
||||
|
||||
# Change password
|
||||
new_password = "NewPassword456"
|
||||
result = AuthService.change_password(
|
||||
db=db_session,
|
||||
user_id=mock_user.id,
|
||||
current_password=current_password,
|
||||
new_password=new_password
|
||||
)
|
||||
|
||||
# Verify operation was successful
|
||||
assert result is True
|
||||
|
||||
# Refresh user from DB
|
||||
db_session.refresh(mock_user)
|
||||
|
||||
# Verify old password no longer works
|
||||
assert not verify_password(current_password, mock_user.password_hash)
|
||||
|
||||
# Verify new password works
|
||||
assert verify_password(new_password, mock_user.password_hash)
|
||||
|
||||
def test_change_password_wrong_current_password(self, db_session, mock_user):
|
||||
"""Test changing password with incorrect current password"""
|
||||
# Set a known password for the mock user
|
||||
current_password = "CurrentPassword123"
|
||||
mock_user.password_hash = get_password_hash(current_password)
|
||||
db_session.commit()
|
||||
|
||||
# Try to change password with wrong current password
|
||||
wrong_password = "WrongPassword123"
|
||||
with pytest.raises(AuthenticationError):
|
||||
AuthService.change_password(
|
||||
db=db_session,
|
||||
user_id=mock_user.id,
|
||||
current_password=wrong_password,
|
||||
new_password="NewPassword456"
|
||||
)
|
||||
|
||||
# Verify password was not changed
|
||||
assert verify_password(current_password, mock_user.password_hash)
|
||||
|
||||
def test_change_password_nonexistent_user(self, db_session):
|
||||
"""Test changing password for a user that doesn't exist"""
|
||||
non_existent_id = uuid.uuid4()
|
||||
|
||||
with pytest.raises(AuthenticationError):
|
||||
AuthService.change_password(
|
||||
db=db_session,
|
||||
user_id=non_existent_id,
|
||||
current_password="CurrentPassword123",
|
||||
new_password="NewPassword456"
|
||||
)
|
||||
Reference in New Issue
Block a user