Add support for E2E testing infrastructure and OAuth configurations

- Introduced make commands for E2E tests using Testcontainers and Schemathesis.
- Updated `.env.demo` with configurable OAuth settings for Google and GitHub.
- Enhanced `README.md` with updated environment setup instructions.
- Added E2E testing dependencies and markers in `pyproject.toml` for real PostgreSQL and API contract validation.
- Included new libraries (`arrow`, `attrs`, `docker`, etc.) for testing and schema validation workflows.
This commit is contained in:
Felipe Cardoso
2025-11-25 22:24:23 +01:00
parent fcbcff99e9
commit c0b253a010
14 changed files with 1539 additions and 13 deletions

View File

@@ -0,0 +1 @@
"""E2E tests using Testcontainers and Schemathesis."""

View File

@@ -0,0 +1,205 @@
"""
E2E Test Fixtures using Testcontainers.
This module provides fixtures for end-to-end testing with:
- Real PostgreSQL containers (via Testcontainers)
- ASGI test clients connected to real database
Requirements:
- Docker must be running
- Install E2E deps: make install-e2e (or uv sync --extra e2e)
Usage:
make test-e2e # Run all E2E tests
make test-e2e-schema # Run schema tests only
"""
import os
import subprocess
# Disable Testcontainers Ryuk (Reaper) to avoid port mapping issues in some environments.
# Ryuk is used for automatic cleanup of containers, but can cause issues with port 8080.
# Containers will still be cleaned up when the test session ends via explicit stop() calls.
os.environ.setdefault("TESTCONTAINERS_RYUK_DISABLED", "true")
import pytest
import pytest_asyncio
def is_docker_available() -> bool:
"""Check if Docker daemon is accessible."""
try:
result = subprocess.run(
["docker", "info"],
capture_output=True,
timeout=10,
)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError):
return False
DOCKER_AVAILABLE = is_docker_available()
# Conditional import - only import testcontainers if Docker is available
if DOCKER_AVAILABLE:
try:
from testcontainers.postgres import PostgresContainer
TESTCONTAINERS_AVAILABLE = True
except ImportError:
TESTCONTAINERS_AVAILABLE = False
else:
TESTCONTAINERS_AVAILABLE = False
def pytest_collection_modifyitems(config, items):
"""Skip E2E tests if Docker is not available."""
if not DOCKER_AVAILABLE:
skip_marker = pytest.mark.skip(
reason="Docker not available - start Docker to run E2E tests"
)
for item in items:
if (
"e2e" in item.keywords
or "postgres" in item.keywords
or "schemathesis" in item.keywords
):
item.add_marker(skip_marker)
elif not TESTCONTAINERS_AVAILABLE:
skip_marker = pytest.mark.skip(
reason="testcontainers not installed - run: make install-e2e"
)
for item in items:
if "e2e" in item.keywords or "postgres" in item.keywords:
item.add_marker(skip_marker)
# Store container at module level to share across tests
_postgres_container = None
_postgres_url = None
@pytest.fixture(scope="session")
def postgres_container():
"""
Session-scoped PostgreSQL container.
Starts once per test session and is shared across all E2E tests.
This significantly improves test performance compared to per-test containers.
"""
global _postgres_container, _postgres_url
if not DOCKER_AVAILABLE:
pytest.skip("Docker not available")
if not TESTCONTAINERS_AVAILABLE:
pytest.skip("testcontainers not installed - run: make install-e2e")
_postgres_container = PostgresContainer("postgres:17-alpine")
_postgres_container.start()
_postgres_url = _postgres_container.get_connection_url()
yield _postgres_container
_postgres_container.stop()
_postgres_container = None
_postgres_url = None
@pytest.fixture(scope="session")
def postgres_url(postgres_container) -> str:
"""Get sync PostgreSQL URL from container."""
return _postgres_url
@pytest.fixture(scope="session")
def async_postgres_url(postgres_url) -> str:
"""
Get async-compatible PostgreSQL URL from container.
Converts the sync URL to use asyncpg driver.
Testcontainers returns postgresql+psycopg2:// format.
"""
# Testcontainers uses psycopg2 by default, convert to asyncpg
return postgres_url.replace("postgresql+psycopg2://", "postgresql+asyncpg://").replace(
"postgresql://", "postgresql+asyncpg://"
)
@pytest_asyncio.fixture
async def e2e_db_session(async_postgres_url):
"""
Function-scoped async database session with fresh tables.
Each test gets:
- Fresh database schema (tables dropped and recreated)
- Isolated session that doesn't leak state between tests
"""
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from app.models.base import Base
engine = create_async_engine(async_postgres_url, echo=False)
# Drop and recreate all tables for isolation
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
AsyncSessionLocal = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
async with AsyncSessionLocal() as session:
yield session
await engine.dispose()
@pytest_asyncio.fixture
async def e2e_client(async_postgres_url):
"""
ASGI test client with real PostgreSQL backend.
Provides an httpx AsyncClient connected to the FastAPI app,
with database dependency overridden to use real PostgreSQL.
"""
os.environ["IS_TEST"] = "True"
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from app.core.database import get_db
from app.main import app
from app.models.base import Base
engine = create_async_engine(async_postgres_url, echo=False)
# Drop and recreate all tables for isolation
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
AsyncTestingSessionLocal = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def override_get_db():
async with AsyncTestingSessionLocal() as session:
yield session
app.dependency_overrides[get_db] = override_get_db
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://e2e-test") as client:
yield client
app.dependency_overrides.clear()
await engine.dispose()

View File

@@ -0,0 +1,95 @@
"""
API Contract Tests using Schemathesis.
These tests demonstrate Schemathesis contract testing capabilities.
Schemathesis auto-generates test cases from OpenAPI schema and validates
that responses match documented schemas.
Usage:
make test-e2e-schema # Run schema tests only
make test-e2e # Run all E2E tests
Note: Schemathesis v4.x API - filtering is done via include/exclude methods.
"""
import pytest
try:
from schemathesis import openapi
from hypothesis import Phase, settings
SCHEMATHESIS_AVAILABLE = True
except ImportError:
SCHEMATHESIS_AVAILABLE = False
# Skip all tests in this module if schemathesis is not installed
pytestmark = [
pytest.mark.e2e,
pytest.mark.schemathesis,
pytest.mark.skipif(
not SCHEMATHESIS_AVAILABLE,
reason="schemathesis not installed - run: make install-e2e",
),
]
if SCHEMATHESIS_AVAILABLE:
from app.main import app
# Load schema from the FastAPI app using schemathesis.openapi (v4.x API)
schema = openapi.from_asgi("/api/v1/openapi.json", app=app)
# Test root endpoint (simple, always works)
root_schema = schema.include(path="/")
@root_schema.parametrize()
@settings(max_examples=5)
def test_root_endpoint_schema(case):
"""
Root endpoint schema compliance.
Tests that the root endpoint returns responses matching its schema.
"""
response = case.call()
# Just verify we get a response and no 5xx errors
assert response.status_code < 500, f"Server error: {response.text}"
# Test auth registration endpoint
# Note: This tests schema validation, not actual database operations
auth_register_schema = schema.include(path="/api/v1/auth/register")
@auth_register_schema.parametrize()
@settings(max_examples=10)
def test_register_endpoint_validates_input(case):
"""
Registration endpoint input validation.
Schemathesis generates various inputs to test validation.
The endpoint should never return 5xx errors for invalid input.
"""
response = case.call()
# Registration returns 200/201 (success), 400/422 (validation), 409 (conflict)
# Never a 5xx error for validation issues
assert response.status_code < 500, f"Server error: {response.text}"
class TestSchemaValidation:
"""Manual validation tests for schema structure."""
def test_schema_loaded_successfully(self):
"""Verify schema was loaded from the app."""
# Count operations to verify schema loaded
ops = list(schema.get_all_operations())
assert len(ops) > 0, "No operations found in schema"
def test_multiple_endpoints_documented(self):
"""Verify multiple endpoints are documented in schema."""
ops = list(schema.get_all_operations())
# Should have at least 10 operations in a real API
assert len(ops) >= 10, f"Only {len(ops)} operations found"
def test_schema_has_auth_operations(self):
"""Verify auth-related operations exist."""
# Filter for auth endpoints
auth_ops = list(schema.include(path_regex=r".*auth.*").get_all_operations())
assert len(auth_ops) > 0, "No auth operations found"

View File

@@ -0,0 +1,186 @@
"""
PostgreSQL-specific E2E workflow tests.
These tests validate complete user workflows against a real PostgreSQL
database. They catch issues that SQLite-based tests might miss:
- PostgreSQL-specific SQL behavior
- Real constraint violations
- Actual transaction semantics
- JSONB column behavior
Usage:
make test-e2e # Run all E2E tests
"""
from uuid import uuid4
import pytest
pytestmark = [
pytest.mark.e2e,
pytest.mark.postgres,
pytest.mark.asyncio,
]
class TestUserRegistrationWorkflow:
"""Test complete user registration workflows with real PostgreSQL."""
async def test_user_registration_creates_user(self, e2e_client):
"""Test that user registration successfully creates a user in PostgreSQL."""
email = f"e2e-{uuid4().hex[:8]}@example.com"
response = await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "SecurePassword123!",
"first_name": "E2E",
"last_name": "Test",
},
)
assert response.status_code in [200, 201], f"Registration failed: {response.text}"
data = response.json()
assert data["email"] == email
assert "id" in data
async def test_duplicate_email_rejected(self, e2e_client):
"""Test that duplicate email registration is properly rejected."""
email = f"e2e-{uuid4().hex[:8]}@example.com"
# First registration should succeed
response1 = await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "SecurePassword123!",
"first_name": "First",
"last_name": "User",
},
)
assert response1.status_code in [200, 201]
# Second registration with same email should fail
# API returns 400 (Bad Request) for duplicate email
response2 = await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "AnotherPassword123!",
"first_name": "Second",
"last_name": "User",
},
)
assert response2.status_code in [400, 409], "Should reject duplicate email"
class TestAuthenticationWorkflow:
"""Test complete authentication workflows."""
async def test_register_login_access_protected(self, e2e_client):
"""Test complete flow: register -> login -> access protected endpoint."""
email = f"e2e-{uuid4().hex[:8]}@example.com"
password = "SecurePassword123!"
# 1. Register
reg_resp = await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "E2E",
"last_name": "Test",
},
)
assert reg_resp.status_code in [200, 201], f"Registration failed: {reg_resp.text}"
# 2. Login
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
assert login_resp.status_code == 200, f"Login failed: {login_resp.text}"
tokens = login_resp.json()
assert "access_token" in tokens
assert "refresh_token" in tokens
# 3. Access protected endpoint
me_resp = await e2e_client.get(
"/api/v1/users/me",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert me_resp.status_code == 200, f"Protected access failed: {me_resp.text}"
user_data = me_resp.json()
assert user_data["email"] == email
async def test_invalid_credentials_rejected(self, e2e_client):
"""Test that invalid login credentials are rejected."""
email = f"e2e-{uuid4().hex[:8]}@example.com"
# Register user first
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "CorrectPassword123!",
"first_name": "E2E",
"last_name": "Test",
},
)
# Try to login with wrong password
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": "WrongPassword123!"},
)
assert login_resp.status_code == 401, "Should reject invalid credentials"
async def test_token_refresh_workflow(self, e2e_client):
"""Test that refresh tokens can be used to get new access tokens."""
email = f"e2e-{uuid4().hex[:8]}@example.com"
password = "SecurePassword123!"
# Register and login
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "E2E",
"last_name": "Test",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
# Use refresh token
refresh_resp = await e2e_client.post(
"/api/v1/auth/refresh",
json={"refresh_token": tokens["refresh_token"]},
)
assert refresh_resp.status_code == 200, f"Refresh failed: {refresh_resp.text}"
new_tokens = refresh_resp.json()
assert "access_token" in new_tokens
class TestHealthEndpoint:
"""Test health endpoint behavior."""
async def test_health_check_responds(self, e2e_client):
"""
Test that health endpoint responds.
Note: In E2E tests, the health endpoint's database check uses
the production database config (not the test database override),
so it may return 503. This test verifies the endpoint is accessible.
"""
response = await e2e_client.get("/health")
# Health endpoint should respond (may be 200 or 503 depending on DB config)
assert response.status_code in [200, 503]
data = response.json()
assert "status" in data