Files
fast-next-template/backend/tests/api/test_user_routes.py
Felipe Cardoso d75a8de91b Add validation to prevent privilege escalation via is_superuser field and enhance related tests
- Added explicit Pydantic validation to reject modifications to `is_superuser` in `UserUpdate` schema.
- Updated backend logic in `users.py` to support defense-in-depth against privilege escalation.
- Introduced comprehensive tests for `/users` and `/users/me` endpoints to ensure `is_superuser` validation works correctly.
- Enhanced error handling and validation messages for better clarity and robustness.
2025-11-01 16:15:03 +01:00

594 lines
22 KiB
Python
Executable File

# tests/api/test_user_routes.py
"""
Comprehensive tests for user management endpoints.
These tests focus on finding potential bugs, not just coverage.
"""
import pytest
import pytest_asyncio
from unittest.mock import patch
from fastapi import status
import uuid
from sqlalchemy import select
from app.models.user import User
from app.models.user import User
from app.schemas.users import UserUpdate
# Disable rate limiting for tests
@pytest.fixture(autouse=True)
def disable_rate_limit():
"""Disable rate limiting for all tests in this module."""
with patch('app.api.routes.users.limiter.enabled', False):
with patch('app.api.routes.auth.limiter.enabled', False):
yield
async def get_auth_headers(client, email, password):
"""Helper to get authentication headers."""
response = await client.post(
"/api/v1/auth/login",
json={"email": email, "password": password}
)
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
class TestListUsers:
"""Tests for GET /users endpoint."""
@pytest.mark.asyncio
async def test_list_users_as_superuser(self, client, async_test_superuser):
"""Test listing users as superuser."""
headers = await get_auth_headers(client, async_test_superuser.email, "SuperPassword123!")
response = await client.get("/api/v1/users", headers=headers)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "data" in data
assert "pagination" in data
assert isinstance(data["data"], list)
@pytest.mark.asyncio
async def test_list_users_as_regular_user(self, client, async_test_user):
"""Test that regular users cannot list users."""
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
response = await client.get("/api/v1/users", headers=headers)
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.asyncio
async def test_list_users_pagination(self, client, async_test_superuser, async_test_db):
"""Test pagination works correctly."""
test_engine, AsyncTestingSessionLocal = async_test_db
# Create multiple users
async with AsyncTestingSessionLocal() as session:
for i in range(15):
user = User(
email=f"paguser{i}@example.com",
password_hash="hash",
first_name=f"PagUser{i}",
is_active=True,
is_superuser=False
)
session.add(user)
await session.commit()
headers = await get_auth_headers(client, async_test_superuser.email, "SuperPassword123!")
# Get first page
response = await client.get("/api/v1/users?page=1&limit=5", headers=headers)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert len(data["data"]) == 5
assert data["pagination"]["page"] == 1
assert data["pagination"]["total"] >= 15
@pytest.mark.asyncio
async def test_list_users_filter_active(self, client, async_test_superuser, async_test_db):
"""Test filtering by active status."""
test_engine, AsyncTestingSessionLocal = async_test_db
# Create active and inactive users
async with AsyncTestingSessionLocal() as session:
active_user = User(
email="activefilter@example.com",
password_hash="hash",
first_name="Active",
is_active=True,
is_superuser=False
)
inactive_user = User(
email="inactivefilter@example.com",
password_hash="hash",
first_name="Inactive",
is_active=False,
is_superuser=False
)
session.add_all([active_user, inactive_user])
await session.commit()
headers = await get_auth_headers(client, async_test_superuser.email, "SuperPassword123!")
# Filter for active users
response = await client.get("/api/v1/users?is_active=true", headers=headers)
data = response.json()
emails = [u["email"] for u in data["data"]]
assert "activefilter@example.com" in emails
assert "inactivefilter@example.com" not in emails
# Filter for inactive users
response = await client.get("/api/v1/users?is_active=false", headers=headers)
data = response.json()
emails = [u["email"] for u in data["data"]]
assert "inactivefilter@example.com" in emails
assert "activefilter@example.com" not in emails
@pytest.mark.asyncio
async def test_list_users_sort_by_email(self, client, async_test_superuser):
"""Test sorting users by email."""
headers = await get_auth_headers(client, async_test_superuser.email, "SuperPassword123!")
response = await client.get("/api/v1/users?sort_by=email&sort_order=asc", headers=headers)
assert response.status_code == status.HTTP_200_OK
data = response.json()
emails = [u["email"] for u in data["data"]]
assert emails == sorted(emails)
@pytest.mark.asyncio
async def test_list_users_no_auth(self, client):
"""Test that unauthenticated requests are rejected."""
response = await client.get("/api/v1/users")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
# Note: Removed test_list_users_unexpected_error because mocking at CRUD level
# causes the exception to be raised before FastAPI can handle it properly
class TestGetCurrentUserProfile:
"""Tests for GET /users/me endpoint."""
@pytest.mark.asyncio
async def test_get_own_profile(self, client, async_test_user):
"""Test getting own profile."""
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
response = await client.get("/api/v1/users/me", headers=headers)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["email"] == async_test_user.email
assert data["first_name"] == async_test_user.first_name
@pytest.mark.asyncio
async def test_get_profile_no_auth(self, client):
"""Test that unauthenticated requests are rejected."""
response = await client.get("/api/v1/users/me")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
class TestUpdateCurrentUser:
"""Tests for PATCH /users/me endpoint."""
@pytest.mark.asyncio
async def test_update_own_profile(self, client, async_test_user):
"""Test updating own profile."""
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
response = await client.patch(
"/api/v1/users/me",
headers=headers,
json={"first_name": "Updated", "last_name": "Name"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["first_name"] == "Updated"
assert data["last_name"] == "Name"
@pytest.mark.asyncio
async def test_update_profile_phone_number(self, client, async_test_user, test_db):
"""Test updating phone number with validation."""
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
response = await client.patch(
"/api/v1/users/me",
headers=headers,
json={"phone_number": "+19876543210"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["phone_number"] == "+19876543210"
@pytest.mark.asyncio
async def test_update_profile_invalid_phone(self, client, async_test_user):
"""Test that invalid phone numbers are rejected."""
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
response = await client.patch(
"/api/v1/users/me",
headers=headers,
json={"phone_number": "invalid"}
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
@pytest.mark.asyncio
async def test_cannot_elevate_to_superuser(self, client, async_test_user):
"""Test that users cannot make themselves superuser."""
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
# Note: is_superuser is now in UserUpdate schema with explicit validation
# This tests that Pydantic rejects the attempt at the schema level
response = await client.patch(
"/api/v1/users/me",
headers=headers,
json={"first_name": "Test", "is_superuser": True}
)
# Pydantic validation should reject this at the schema level
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
data = response.json()
assert data["success"] is False
assert "errors" in data
# Check that the error mentions is_superuser
error_fields = [err["field"] for err in data["errors"]]
assert "is_superuser" in error_fields
@pytest.mark.asyncio
async def test_update_profile_no_auth(self, client):
"""Test that unauthenticated requests are rejected."""
response = await client.patch(
"/api/v1/users/me",
json={"first_name": "Hacker"}
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
# Note: Removed test_update_profile_unexpected_error - see comment above
class TestGetUserById:
"""Tests for GET /users/{user_id} endpoint."""
@pytest.mark.asyncio
async def test_get_own_profile_by_id(self, client, async_test_user):
"""Test getting own profile by ID."""
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
response = await client.get(f"/api/v1/users/{async_test_user.id}", headers=headers)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["email"] == async_test_user.email
@pytest.mark.asyncio
async def test_get_other_user_as_regular_user(self, client, async_test_user, test_db):
"""Test that regular users cannot view other profiles."""
# Create another user
other_user = User(
email="other@example.com",
password_hash="hash",
first_name="Other",
is_active=True,
is_superuser=False
)
test_db.add(other_user)
test_db.commit()
test_db.refresh(other_user)
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
response = await client.get(f"/api/v1/users/{other_user.id}", headers=headers)
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.asyncio
async def test_get_other_user_as_superuser(self, client, async_test_superuser, async_test_user):
"""Test that superusers can view other profiles."""
headers = await get_auth_headers(client, async_test_superuser.email, "SuperPassword123!")
response = await client.get(f"/api/v1/users/{async_test_user.id}", headers=headers)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["email"] == async_test_user.email
@pytest.mark.asyncio
async def test_get_nonexistent_user(self, client, async_test_superuser):
"""Test getting non-existent user."""
headers = await get_auth_headers(client, async_test_superuser.email, "SuperPassword123!")
fake_id = uuid.uuid4()
response = await client.get(f"/api/v1/users/{fake_id}", headers=headers)
assert response.status_code == status.HTTP_404_NOT_FOUND
@pytest.mark.asyncio
async def test_get_user_invalid_uuid(self, client, async_test_superuser):
"""Test getting user with invalid UUID format."""
headers = await get_auth_headers(client, async_test_superuser.email, "SuperPassword123!")
response = await client.get("/api/v1/users/not-a-uuid", headers=headers)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
class TestUpdateUserById:
"""Tests for PATCH /users/{user_id} endpoint."""
@pytest.mark.asyncio
async def test_update_own_profile_by_id(self, client, async_test_user, test_db):
"""Test updating own profile by ID."""
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
response = await client.patch(
f"/api/v1/users/{async_test_user.id}",
headers=headers,
json={"first_name": "SelfUpdated"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["first_name"] == "SelfUpdated"
@pytest.mark.asyncio
async def test_update_other_user_as_regular_user(self, client, async_test_user, test_db):
"""Test that regular users cannot update other profiles."""
# Create another user
other_user = User(
email="updateother@example.com",
password_hash="hash",
first_name="Other",
is_active=True,
is_superuser=False
)
test_db.add(other_user)
test_db.commit()
test_db.refresh(other_user)
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
response = await client.patch(
f"/api/v1/users/{other_user.id}",
headers=headers,
json={"first_name": "Hacked"}
)
assert response.status_code == status.HTTP_403_FORBIDDEN
# Verify user was not modified
test_db.refresh(other_user)
assert other_user.first_name == "Other"
@pytest.mark.asyncio
async def test_update_other_user_as_superuser(self, client, async_test_superuser, async_test_user, test_db):
"""Test that superusers can update other profiles."""
headers = await get_auth_headers(client, async_test_superuser.email, "SuperPassword123!")
response = await client.patch(
f"/api/v1/users/{async_test_user.id}",
headers=headers,
json={"first_name": "AdminUpdated"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["first_name"] == "AdminUpdated"
@pytest.mark.asyncio
async def test_regular_user_cannot_modify_superuser_status(self, client, async_test_user):
"""Test that regular users cannot change superuser status even if they try."""
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
# is_superuser not in UserUpdate schema, so it gets ignored by Pydantic
# Just verify the user stays the same
response = await client.patch(
f"/api/v1/users/{async_test_user.id}",
headers=headers,
json={"first_name": "Test"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["is_superuser"] is False
@pytest.mark.asyncio
async def test_superuser_can_update_users(self, client, async_test_superuser, async_test_user, test_db):
"""Test that superusers can update other users."""
headers = await get_auth_headers(client, async_test_superuser.email, "SuperPassword123!")
response = await client.patch(
f"/api/v1/users/{async_test_user.id}",
headers=headers,
json={"first_name": "AdminChanged", "is_active": False}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["first_name"] == "AdminChanged"
assert data["is_active"] is False
@pytest.mark.asyncio
async def test_update_nonexistent_user(self, client, async_test_superuser):
"""Test updating non-existent user."""
headers = await get_auth_headers(client, async_test_superuser.email, "SuperPassword123!")
fake_id = uuid.uuid4()
response = await client.patch(
f"/api/v1/users/{fake_id}",
headers=headers,
json={"first_name": "Ghost"}
)
assert response.status_code == status.HTTP_404_NOT_FOUND
# Note: Removed test_update_user_unexpected_error - see comment above
class TestChangePassword:
"""Tests for PATCH /users/me/password endpoint."""
@pytest.mark.asyncio
async def test_change_password_success(self, client, async_test_user, test_db):
"""Test successful password change."""
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
response = await client.patch(
"/api/v1/users/me/password",
headers=headers,
json={
"current_password": "TestPassword123!",
"new_password": "NewPassword123!"
}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["success"] is True
# Verify can login with new password
login_response = await client.post(
"/api/v1/auth/login",
json={
"email": async_test_user.email,
"password": "NewPassword123!"
}
)
assert login_response.status_code == status.HTTP_200_OK
@pytest.mark.asyncio
async def test_change_password_wrong_current(self, client, async_test_user):
"""Test that wrong current password is rejected."""
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
response = await client.patch(
"/api/v1/users/me/password",
headers=headers,
json={
"current_password": "WrongPassword123",
"new_password": "NewPassword123!"
}
)
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.asyncio
async def test_change_password_weak_new_password(self, client, async_test_user):
"""Test that weak new passwords are rejected."""
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
response = await client.patch(
"/api/v1/users/me/password",
headers=headers,
json={
"current_password": "TestPassword123!",
"new_password": "weak"
}
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
@pytest.mark.asyncio
async def test_change_password_no_auth(self, client):
"""Test that unauthenticated requests are rejected."""
response = await client.patch(
"/api/v1/users/me/password",
json={
"current_password": "TestPassword123!",
"new_password": "NewPassword123!"
}
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
# Note: Removed test_change_password_unexpected_error - see comment above
class TestDeleteUser:
"""Tests for DELETE /users/{user_id} endpoint."""
@pytest.mark.asyncio
async def test_delete_user_as_superuser(self, client, async_test_superuser, async_test_db):
"""Test deleting a user as superuser."""
test_engine, AsyncTestingSessionLocal = async_test_db
# Create a user to delete
async with AsyncTestingSessionLocal() as session:
user_to_delete = User(
email="deleteme@example.com",
password_hash="hash",
first_name="Delete",
is_active=True,
is_superuser=False
)
session.add(user_to_delete)
await session.commit()
await session.refresh(user_to_delete)
user_id = user_to_delete.id
headers = await get_auth_headers(client, async_test_superuser.email, "SuperPassword123!")
response = await client.delete(f"/api/v1/users/{user_id}", headers=headers)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["success"] is True
# Verify user is soft-deleted (has deleted_at timestamp)
async with AsyncTestingSessionLocal() as session:
from sqlalchemy import select
result = await session.execute(select(User).where(User.id == user_id))
deleted_user = result.scalar_one_or_none()
assert deleted_user.deleted_at is not None
@pytest.mark.asyncio
async def test_cannot_delete_self(self, client, async_test_superuser):
"""Test that users cannot delete their own account."""
headers = await get_auth_headers(client, async_test_superuser.email, "SuperPassword123!")
response = await client.delete(f"/api/v1/users/{async_test_superuser.id}", headers=headers)
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.asyncio
async def test_delete_user_as_regular_user(self, client, async_test_user, test_db):
"""Test that regular users cannot delete users."""
# Create another user
other_user = User(
email="cantdelete@example.com",
password_hash="hash",
first_name="Protected",
is_active=True,
is_superuser=False
)
test_db.add(other_user)
test_db.commit()
test_db.refresh(other_user)
headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!")
response = await client.delete(f"/api/v1/users/{other_user.id}", headers=headers)
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.asyncio
async def test_delete_nonexistent_user(self, client, async_test_superuser):
"""Test deleting non-existent user."""
headers = await get_auth_headers(client, async_test_superuser.email, "SuperPassword123!")
fake_id = uuid.uuid4()
response = await client.delete(f"/api/v1/users/{fake_id}", headers=headers)
assert response.status_code == status.HTTP_404_NOT_FOUND
@pytest.mark.asyncio
async def test_delete_user_no_auth(self, client, async_test_user):
"""Test that unauthenticated requests are rejected."""
response = await client.delete(f"/api/v1/users/{async_test_user.id}")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
# Note: Removed test_delete_user_unexpected_error - see comment above