Files
fast-next-template/backend/tests/e2e/test_database_workflows.py
Felipe Cardoso 7716468d72 Add E2E tests for admin and organization workflows
- Introduced E2E tests for admin user and organization management workflows: user listing, creation, updates, bulk actions, and organization membership management.
- Added comprehensive tests for organization CRUD operations, membership visibility, roles, and permission validation.
- Expanded fixtures for superuser and member setup to streamline testing of admin-specific operations.
- Verified pagination, filtering, and action consistency across admin endpoints.
2025-11-25 23:50:02 +01:00

322 lines
11 KiB
Python

"""
PostgreSQL-specific E2E workflow tests.
These tests validate complete user workflows against a real PostgreSQL
database. They catch issues that SQLite-based tests might miss:
- PostgreSQL-specific SQL behavior
- Real constraint violations
- Actual transaction semantics
- JSONB column behavior
Usage:
make test-e2e # Run all E2E tests
"""
from uuid import uuid4
import pytest
pytestmark = [
pytest.mark.e2e,
pytest.mark.postgres,
pytest.mark.asyncio,
]
class TestUserRegistrationWorkflow:
"""Test complete user registration workflows with real PostgreSQL."""
async def test_user_registration_creates_user(self, e2e_client):
"""Test that user registration successfully creates a user in PostgreSQL."""
email = f"e2e-{uuid4().hex[:8]}@example.com"
response = await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "SecurePassword123!",
"first_name": "E2E",
"last_name": "Test",
},
)
assert response.status_code in [200, 201], (
f"Registration failed: {response.text}"
)
data = response.json()
assert data["email"] == email
assert "id" in data
async def test_duplicate_email_rejected(self, e2e_client):
"""Test that duplicate email registration is properly rejected."""
email = f"e2e-{uuid4().hex[:8]}@example.com"
# First registration should succeed
response1 = await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "SecurePassword123!",
"first_name": "First",
"last_name": "User",
},
)
assert response1.status_code in [200, 201]
# Second registration with same email should fail
# API returns 400 (Bad Request) for duplicate email
response2 = await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "AnotherPassword123!",
"first_name": "Second",
"last_name": "User",
},
)
assert response2.status_code in [400, 409], "Should reject duplicate email"
class TestAuthenticationWorkflow:
"""Test complete authentication workflows."""
async def test_register_login_access_protected(self, e2e_client):
"""Test complete flow: register -> login -> access protected endpoint."""
email = f"e2e-{uuid4().hex[:8]}@example.com"
password = "SecurePassword123!"
# 1. Register
reg_resp = await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "E2E",
"last_name": "Test",
},
)
assert reg_resp.status_code in [200, 201], (
f"Registration failed: {reg_resp.text}"
)
# 2. Login
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
assert login_resp.status_code == 200, f"Login failed: {login_resp.text}"
tokens = login_resp.json()
assert "access_token" in tokens
assert "refresh_token" in tokens
# 3. Access protected endpoint
me_resp = await e2e_client.get(
"/api/v1/users/me",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert me_resp.status_code == 200, f"Protected access failed: {me_resp.text}"
user_data = me_resp.json()
assert user_data["email"] == email
async def test_invalid_credentials_rejected(self, e2e_client):
"""Test that invalid login credentials are rejected."""
email = f"e2e-{uuid4().hex[:8]}@example.com"
# Register user first
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "CorrectPassword123!",
"first_name": "E2E",
"last_name": "Test",
},
)
# Try to login with wrong password
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": "WrongPassword123!"},
)
assert login_resp.status_code == 401, "Should reject invalid credentials"
async def test_token_refresh_workflow(self, e2e_client):
"""Test that refresh tokens can be used to get new access tokens."""
email = f"e2e-{uuid4().hex[:8]}@example.com"
password = "SecurePassword123!"
# Register and login
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "E2E",
"last_name": "Test",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
# Use refresh token
refresh_resp = await e2e_client.post(
"/api/v1/auth/refresh",
json={"refresh_token": tokens["refresh_token"]},
)
assert refresh_resp.status_code == 200, f"Refresh failed: {refresh_resp.text}"
new_tokens = refresh_resp.json()
assert "access_token" in new_tokens
class TestHealthEndpoint:
"""Test health endpoint behavior."""
async def test_health_check_responds(self, e2e_client):
"""
Test that health endpoint responds.
Note: In E2E tests, the health endpoint's database check uses
the production database config (not the test database override),
so it may return 503. This test verifies the endpoint is accessible.
"""
response = await e2e_client.get("/health")
# Health endpoint should respond (may be 200 or 503 depending on DB config)
assert response.status_code in [200, 503]
data = response.json()
assert "status" in data
class TestLogoutWorkflows:
"""Test logout workflows."""
async def test_logout_invalidates_session(self, e2e_client):
"""Test that logout invalidates the session."""
email = f"e2e-logout-{uuid4().hex[:8]}@example.com"
password = "SecurePassword123!"
# Register and login
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Logout",
"last_name": "Test",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
# Logout requires both access token (auth) and refresh token (body)
logout_resp = await e2e_client.post(
"/api/v1/auth/logout",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={"refresh_token": tokens["refresh_token"]},
)
assert logout_resp.status_code == 200
async def test_invalid_refresh_token_rejected(self, e2e_client):
"""Test that invalid refresh tokens are rejected."""
response = await e2e_client.post(
"/api/v1/auth/refresh",
json={"refresh_token": "invalid_refresh_token"},
)
assert response.status_code in [401, 422]
class TestValidationWorkflows:
"""Test input validation workflows."""
async def test_register_invalid_email(self, e2e_client):
"""Test that invalid email format is rejected."""
response = await e2e_client.post(
"/api/v1/auth/register",
json={
"email": "not_an_email",
"password": "ValidPassword123!",
"first_name": "Test",
"last_name": "User",
},
)
assert response.status_code == 422
async def test_register_weak_password(self, e2e_client):
"""Test that weak passwords are rejected."""
email = f"e2e-weak-{uuid4().hex[:8]}@example.com"
response = await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "weak", # Too weak
"first_name": "Test",
"last_name": "User",
},
)
assert response.status_code == 422
async def test_login_missing_fields(self, e2e_client):
"""Test that login requires all fields."""
response = await e2e_client.post(
"/api/v1/auth/login",
json={"email": "test@example.com"}, # Missing password
)
assert response.status_code == 422
class TestRootEndpoint:
"""Test root endpoint."""
async def test_root_responds(self, e2e_client):
"""Root endpoint should respond with HTML."""
response = await e2e_client.get("/")
assert response.status_code == 200
# Root returns HTML
assert "html" in response.text.lower() or "Welcome" in response.text
async def test_openapi_available(self, e2e_client):
"""OpenAPI schema should be available."""
response = await e2e_client.get("/api/v1/openapi.json")
assert response.status_code == 200
data = response.json()
assert "openapi" in data
assert "paths" in data
class TestAuthTokenWorkflows:
"""Test authentication token workflows."""
async def test_access_token_expires(self, e2e_client):
"""Test using expired access token."""
# Use a fake/expired token
fake_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwiZXhwIjoxNjAwMDAwMDAwfQ.invalid"
response = await e2e_client.get(
"/api/v1/users/me",
headers={"Authorization": f"Bearer {fake_token}"},
)
assert response.status_code == 401
async def test_malformed_token_rejected(self, e2e_client):
"""Test that malformed tokens are rejected."""
response = await e2e_client.get(
"/api/v1/users/me",
headers={"Authorization": "Bearer not-a-valid-token"},
)
assert response.status_code == 401
async def test_missing_bearer_prefix(self, e2e_client):
"""Test that tokens without Bearer prefix are rejected."""
response = await e2e_client.get(
"/api/v1/users/me",
headers={"Authorization": "some-token"},
)
assert response.status_code == 401