Add tests for auth dependencies and security utilities
All checks were successful
Build and Push Docker Images / changes (push) Successful in 4s
Build and Push Docker Images / build-backend (push) Successful in 49s
Build and Push Docker Images / build-frontend (push) Has been skipped

Introduced unit tests for `get_current_user`, `get_current_active_user`, and security functions like token creation and decoding. Also refactored imports for consistency and cleaned up unused or misplaced code to improve maintainability.
This commit is contained in:
2025-02-28 16:34:59 +01:00
parent 43df9d73b0
commit c3a55b26c7
4 changed files with 149 additions and 5 deletions

View File

@@ -3,13 +3,13 @@ from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt from jose import JWTError, jwt
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from auth.security import SECRET_KEY, ALGORITHM
from app.core.database import get_db from app.core.database import get_db
from models.user import User from auth.security import SECRET_KEY, ALGORITHM
from app.schemas.token import TokenData from app.models.user import User
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
async def get_current_user( async def get_current_user(
token: str = Depends(oauth2_scheme), token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db) db: AsyncSession = Depends(get_db)
@@ -37,9 +37,10 @@ async def get_current_user(
return user return user
async def get_current_active_user( async def get_current_active_user(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
if not current_user.is_active: if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user") raise HTTPException(status_code=400, detail="Inactive user")
return current_user return current_user

View File

@@ -5,7 +5,7 @@ from uuid import uuid4
from jose import jwt, JWTError from jose import jwt, JWTError
from passlib.context import CryptContext from passlib.context import CryptContext
from app.core.config import settings from app.core.config import settings
from .token import TokenPayload, TokenResponse from app.schemas.token import TokenPayload, TokenResponse
# Configuration # Configuration
SECRET_KEY = settings.SECRET_KEY SECRET_KEY = settings.SECRET_KEY

View File

@@ -0,0 +1,75 @@
from unittest.mock import AsyncMock
import pytest
from fastapi import HTTPException
from jose import jwt
from app.auth.dependencies import get_current_user, get_current_active_user
from app.auth.security import SECRET_KEY, ALGORITHM
from app.models.user import User
@pytest.fixture
def mock_user():
return User(
id="123e4567-e89b-12d3-a456-426614174000",
email="test@example.com",
password_hash="hashedpassword",
is_active=True
)
@pytest.mark.asyncio
async def test_get_current_user_success(mock_user):
valid_token = jwt.encode({"sub": str(mock_user.id), "type": "access"}, SECRET_KEY, algorithm=ALGORITHM)
mock_db = AsyncMock()
mock_db.get.return_value = mock_user
user = await get_current_user(token=valid_token, db=mock_db)
assert user == mock_user
mock_db.get.assert_called_once_with(User, mock_user.id)
@pytest.mark.asyncio
async def test_get_current_user_invalid_token():
invalid_token = "invalid.token.payload"
with pytest.raises(HTTPException) as exc_info:
await get_current_user(token=invalid_token, db=AsyncMock())
assert exc_info.value.status_code == 401
assert exc_info.value.detail == "Could not validate credentials"
@pytest.mark.asyncio
async def test_get_current_user_wrong_token_type():
token = jwt.encode({"sub": "123", "type": "refresh"}, SECRET_KEY, algorithm=ALGORITHM)
with pytest.raises(HTTPException) as exc_info:
await get_current_user(token=token, db=AsyncMock())
assert exc_info.value.status_code == 401
assert exc_info.value.detail == "Could not validate credentials"
@pytest.mark.asyncio
async def test_get_current_active_user_success(mock_user):
result = await get_current_active_user(mock_user)
assert result == mock_user
@pytest.mark.asyncio
async def test_get_current_active_user_inactive():
inactive_user = User(
id="123e4567-e89b-12d3-a456-426614174000",
email="inactive@example.com",
password_hash="hashedpassword",
is_active=False
)
with pytest.raises(HTTPException) as exc_info:
await get_current_active_user(inactive_user)
assert exc_info.value.status_code == 400
assert exc_info.value.detail == "Inactive user"

View File

@@ -0,0 +1,68 @@
import pytest
from datetime import timedelta
from jose import jwt, JWTError
from app.auth.security import (
get_password_hash, verify_password,
create_access_token, create_refresh_token,
decode_token, SECRET_KEY, ALGORITHM
)
from app.schemas.token import TokenPayload
def test_password_hashing():
plain_password = "securepassword123"
hashed_password = get_password_hash(plain_password)
# Ensure hashed passwords are not the same
assert hashed_password != plain_password
# Test password verification
assert verify_password(plain_password, hashed_password)
assert not verify_password("wrongpassword", hashed_password)
def test_access_token_creation():
user_id = "123e4567-e89b-12d3-a456-426614174000"
token = create_access_token({"sub": user_id})
decoded_payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
assert decoded_payload.get("sub") == user_id
assert decoded_payload.get("type") == "access"
def test_refresh_token_creation():
user_id = "123e4567-e89b-12d3-a456-426614174000"
token = create_refresh_token({"sub": user_id})
decoded_payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
assert decoded_payload.get("sub") == user_id
assert decoded_payload.get("type") == "refresh"
def test_decode_token_valid():
user_id = "123e4567-e89b-12d3-a456-426614174000"
access_token = create_access_token({"sub": user_id})
token_payload = decode_token(access_token)
assert isinstance(token_payload, TokenPayload)
assert token_payload.sub == user_id
assert token_payload.type == "access"
def test_decode_token_expired():
user_id = "123e4567-e89b-12d3-a456-426614174000"
token = create_access_token({"sub": user_id}, expires_delta=timedelta(seconds=-1))
with pytest.raises(JWTError):
decode_token(token)
def test_decode_token_invalid_signature():
token = jwt.encode({"some": "data"}, "invalid_key", algorithm=ALGORITHM)
with pytest.raises(JWTError):
decode_token(token)
def test_decode_token_malformed():
malformed_token = "malformed.header.payload"
with pytest.raises(JWTError):
decode_token(malformed_token)