diff --git a/backend/tests/e2e/test_oauth_provider_workflows.py b/backend/tests/e2e/test_oauth_provider_workflows.py new file mode 100644 index 0000000..4fb990c --- /dev/null +++ b/backend/tests/e2e/test_oauth_provider_workflows.py @@ -0,0 +1,940 @@ +""" +E2E Tests for OAuth Provider (Authorization Server) Workflows. + +Tests the complete OAuth 2.0 Authorization Server functionality using +real PostgreSQL containers via Testcontainers. + +Requirements: + - Docker must be running + - E2E dependencies: make install-e2e + +Run with: + make test-e2e + # or specific file: + TESTCONTAINERS_RYUK_DISABLED=true IS_TEST=True uv run pytest tests/e2e/test_oauth_provider_workflows.py -v +""" + +import base64 +import hashlib +import secrets +from unittest.mock import patch + +import pytest +import pytest_asyncio + +from app.core.config import settings + + +def generate_pkce_pair(): + """Generate PKCE code_verifier and code_challenge pair.""" + code_verifier = secrets.token_urlsafe(64) + # S256: SHA256(code_verifier) then base64url encode + digest = hashlib.sha256(code_verifier.encode()).digest() + code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode() + return code_verifier, code_challenge + + +def get_error_message(response_json: dict) -> str: + """Extract error message from API response (supports both formats).""" + # New standardized format with errors array + if response_json.get("errors"): + return response_json["errors"][0].get("message", "") + # Legacy format with detail + return response_json.get("detail", "") + + +@pytest.mark.e2e +@pytest.mark.postgres +class TestOAuthProviderServerMetadata: + """Test OAuth Provider server metadata endpoint.""" + + @pytest.mark.asyncio + async def test_server_metadata_endpoint(self, e2e_client): + """Test RFC 8414 well-known endpoint returns correct metadata.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + with patch.object(settings, "OAUTH_ISSUER", "https://api.example.com"): + response = await e2e_client.get( + "/.well-known/oauth-authorization-server" + ) + + assert response.status_code == 200 + data = response.json() + + # Verify required metadata fields + assert data["issuer"] == "https://api.example.com" + assert "authorization_endpoint" in data + assert "token_endpoint" in data + assert "revocation_endpoint" in data + assert "introspection_endpoint" in data + + # Verify supported features + assert "code" in data["response_types_supported"] + assert "authorization_code" in data["grant_types_supported"] + assert "refresh_token" in data["grant_types_supported"] + assert "S256" in data["code_challenge_methods_supported"] + + @pytest.mark.asyncio + async def test_server_metadata_disabled(self, e2e_client): + """Test server metadata returns 404 when provider mode is disabled.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", False): + response = await e2e_client.get( + "/.well-known/oauth-authorization-server" + ) + assert response.status_code == 404 + + +@pytest.mark.e2e +@pytest.mark.postgres +class TestOAuthProviderClientManagement: + """Test OAuth client registration and management.""" + + @pytest.mark.asyncio + async def test_register_public_client(self, e2e_client, e2e_superuser): + """Test registering a public OAuth client.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + response = await e2e_client.post( + "/api/v1/oauth/provider/clients", + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + data={ + "client_name": "Test MCP Client", + "redirect_uris": "http://localhost:3000/callback", + "client_type": "public", + "scopes": "openid profile email", + }, + ) + + assert response.status_code == 200 + data = response.json() + + assert "client_id" in data + assert data["client_name"] == "Test MCP Client" + assert data["client_type"] == "public" + # Public clients don't get a secret + assert "client_secret" not in data + + @pytest.mark.asyncio + async def test_register_confidential_client(self, e2e_client, e2e_superuser): + """Test registering a confidential OAuth client.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + response = await e2e_client.post( + "/api/v1/oauth/provider/clients", + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + data={ + "client_name": "Confidential App", + "redirect_uris": "http://localhost:8080/callback", + "client_type": "confidential", + "scopes": "openid profile email read:users", + }, + ) + + assert response.status_code == 200 + data = response.json() + + assert "client_id" in data + assert "client_secret" in data + assert data["client_type"] == "confidential" + assert "warning" in data # Security warning about storing secret + + @pytest.mark.asyncio + async def test_list_clients(self, e2e_client, e2e_superuser): + """Test listing all OAuth clients.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + # First create a client + await e2e_client.post( + "/api/v1/oauth/provider/clients", + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + data={ + "client_name": "List Test Client", + "redirect_uris": "http://localhost:3000/callback", + "client_type": "public", + "scopes": "openid", + }, + ) + + # Then list all + response = await e2e_client.get( + "/api/v1/oauth/provider/clients", + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + ) + + assert response.status_code == 200 + data = response.json() + assert isinstance(data, list) + assert len(data) >= 1 + + @pytest.mark.asyncio + async def test_delete_client(self, e2e_client, e2e_superuser): + """Test deleting an OAuth client.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + # Create a client + create_response = await e2e_client.post( + "/api/v1/oauth/provider/clients", + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + data={ + "client_name": "Delete Test Client", + "redirect_uris": "http://localhost:3000/callback", + "client_type": "public", + "scopes": "openid", + }, + ) + client_id = create_response.json()["client_id"] + + # Delete it + delete_response = await e2e_client.delete( + f"/api/v1/oauth/provider/clients/{client_id}", + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + ) + assert delete_response.status_code == 204 + + # Verify it's gone + list_response = await e2e_client.get( + "/api/v1/oauth/provider/clients", + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + ) + clients = list_response.json() + assert not any(c["client_id"] == client_id for c in clients) + + +@pytest.mark.e2e +@pytest.mark.postgres +class TestOAuthProviderAuthorizationFlow: + """Test OAuth authorization code flow.""" + + @pytest_asyncio.fixture + async def oauth_client(self, e2e_client, e2e_superuser): + """Create a test OAuth client for authorization tests.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + response = await e2e_client.post( + "/api/v1/oauth/provider/clients", + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + data={ + "client_name": "Auth Flow Test Client", + "redirect_uris": "http://localhost:3000/callback,http://localhost:3000/oauth/callback", + "client_type": "public", + "scopes": "openid profile email read:users", + }, + ) + return response.json() + + @pytest.mark.asyncio + async def test_authorize_requires_pkce_for_public_client( + self, e2e_client, oauth_client + ): + """Test that public clients must use PKCE.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + with patch.object(settings, "FRONTEND_URL", "http://localhost:3000"): + response = await e2e_client.get( + "/api/v1/oauth/provider/authorize", + params={ + "response_type": "code", + "client_id": oauth_client["client_id"], + "redirect_uri": "http://localhost:3000/callback", + "scope": "openid profile", + "state": "test_state_123", + }, + follow_redirects=False, + ) + + # Should redirect with error + assert response.status_code == 302 + location = response.headers.get("location", "") + assert "error=invalid_request" in location + assert "PKCE" in location + + @pytest.mark.asyncio + async def test_authorize_redirects_unauthenticated_to_login( + self, e2e_client, oauth_client + ): + """Test that unauthenticated users are redirected to login.""" + code_verifier, code_challenge = generate_pkce_pair() + + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + with patch.object(settings, "FRONTEND_URL", "http://localhost:3000"): + response = await e2e_client.get( + "/api/v1/oauth/provider/authorize", + params={ + "response_type": "code", + "client_id": oauth_client["client_id"], + "redirect_uri": "http://localhost:3000/callback", + "scope": "openid profile", + "state": "test_state_123", + "code_challenge": code_challenge, + "code_challenge_method": "S256", + }, + follow_redirects=False, + ) + + # Should redirect to login + assert response.status_code == 302 + location = response.headers.get("location", "") + assert "/login" in location + assert "return_to" in location + + @pytest.mark.asyncio + async def test_authorize_redirects_to_consent_for_authenticated_user( + self, e2e_client, oauth_client, e2e_superuser + ): + """Test that authenticated users without consent are redirected to consent page.""" + code_verifier, code_challenge = generate_pkce_pair() + + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + with patch.object(settings, "FRONTEND_URL", "http://localhost:3000"): + response = await e2e_client.get( + "/api/v1/oauth/provider/authorize", + params={ + "response_type": "code", + "client_id": oauth_client["client_id"], + "redirect_uri": "http://localhost:3000/callback", + "scope": "openid profile", + "state": "test_state_123", + "code_challenge": code_challenge, + "code_challenge_method": "S256", + }, + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + follow_redirects=False, + ) + + # Should redirect to consent page + assert response.status_code == 302 + location = response.headers.get("location", "") + assert "/auth/consent" in location + + +@pytest.mark.e2e +@pytest.mark.postgres +class TestOAuthProviderCompleteFlow: + """Test complete OAuth authorization code flow with token exchange.""" + + @pytest_asyncio.fixture + async def oauth_setup(self, e2e_client, e2e_superuser): + """Create OAuth client and prepare for authorization flow.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + # Create a public client + response = await e2e_client.post( + "/api/v1/oauth/provider/clients", + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + data={ + "client_name": "Complete Flow Test Client", + "redirect_uris": "http://localhost:3000/callback", + "client_type": "public", + "scopes": "openid profile email", + }, + ) + client_data = response.json() + + # Generate PKCE pair + code_verifier, code_challenge = generate_pkce_pair() + + return { + "client": client_data, + "code_verifier": code_verifier, + "code_challenge": code_challenge, + "user": e2e_superuser, + } + + @pytest.mark.asyncio + async def test_consent_submission_generates_code(self, e2e_client, oauth_setup): + """Test that submitting consent generates an authorization code.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + response = await e2e_client.post( + "/api/v1/oauth/provider/authorize/consent", + headers={ + "Authorization": f"Bearer {oauth_setup['user']['tokens']['access_token']}" + }, + data={ + "approved": "true", + "client_id": oauth_setup["client"]["client_id"], + "redirect_uri": "http://localhost:3000/callback", + "scope": "openid profile email", + "state": "test_state_456", + "code_challenge": oauth_setup["code_challenge"], + "code_challenge_method": "S256", + }, + follow_redirects=False, + ) + + # Should redirect with code + assert response.status_code == 302 + location = response.headers.get("location", "") + assert "code=" in location + assert "state=test_state_456" in location + assert "error" not in location + + @pytest.mark.asyncio + async def test_consent_denial_returns_error(self, e2e_client, oauth_setup): + """Test that denying consent returns access_denied error.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + response = await e2e_client.post( + "/api/v1/oauth/provider/authorize/consent", + headers={ + "Authorization": f"Bearer {oauth_setup['user']['tokens']['access_token']}" + }, + data={ + "approved": "false", + "client_id": oauth_setup["client"]["client_id"], + "redirect_uri": "http://localhost:3000/callback", + "scope": "openid profile", + "state": "test_state_789", + }, + follow_redirects=False, + ) + + # Should redirect with error + assert response.status_code == 302 + location = response.headers.get("location", "") + assert "error=access_denied" in location + assert "state=test_state_789" in location + + @pytest.mark.asyncio + async def test_complete_authorization_code_flow(self, e2e_client, oauth_setup): + """Test complete flow: consent -> code -> token exchange.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + with patch.object(settings, "OAUTH_ISSUER", "http://e2e-test"): + # Step 1: Submit consent + consent_response = await e2e_client.post( + "/api/v1/oauth/provider/authorize/consent", + headers={ + "Authorization": f"Bearer {oauth_setup['user']['tokens']['access_token']}" + }, + data={ + "approved": "true", + "client_id": oauth_setup["client"]["client_id"], + "redirect_uri": "http://localhost:3000/callback", + "scope": "openid profile email", + "state": "flow_test_state", + "code_challenge": oauth_setup["code_challenge"], + "code_challenge_method": "S256", + }, + follow_redirects=False, + ) + + assert consent_response.status_code == 302 + location = consent_response.headers.get("location", "") + + # Extract code from redirect URL + from urllib.parse import parse_qs, urlparse + + parsed = urlparse(location) + params = parse_qs(parsed.query) + assert "code" in params + auth_code = params["code"][0] + + # Step 2: Exchange code for tokens + token_response = await e2e_client.post( + "/api/v1/oauth/provider/token", + data={ + "grant_type": "authorization_code", + "code": auth_code, + "redirect_uri": "http://localhost:3000/callback", + "client_id": oauth_setup["client"]["client_id"], + "code_verifier": oauth_setup["code_verifier"], + }, + ) + + assert token_response.status_code == 200 + tokens = token_response.json() + + assert "access_token" in tokens + assert "refresh_token" in tokens + assert tokens["token_type"] == "Bearer" + assert "expires_in" in tokens + assert "scope" in tokens + + +@pytest.mark.e2e +@pytest.mark.postgres +class TestOAuthProviderTokenOperations: + """Test token refresh, revocation, and introspection.""" + + @pytest_asyncio.fixture + async def tokens_setup(self, e2e_client, e2e_superuser): + """Get tokens through complete OAuth flow for testing.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + with patch.object(settings, "OAUTH_ISSUER", "http://e2e-test"): + # Create client + client_response = await e2e_client.post( + "/api/v1/oauth/provider/clients", + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + data={ + "client_name": "Token Ops Test Client", + "redirect_uris": "http://localhost:3000/callback", + "client_type": "public", + "scopes": "openid profile email", + }, + ) + client_data = client_response.json() + + # Generate PKCE + code_verifier, code_challenge = generate_pkce_pair() + + # Submit consent + consent_response = await e2e_client.post( + "/api/v1/oauth/provider/authorize/consent", + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + data={ + "approved": "true", + "client_id": client_data["client_id"], + "redirect_uri": "http://localhost:3000/callback", + "scope": "openid profile email", + "code_challenge": code_challenge, + "code_challenge_method": "S256", + }, + follow_redirects=False, + ) + + # Extract code + from urllib.parse import parse_qs, urlparse + + location = consent_response.headers.get("location", "") + parsed = urlparse(location) + params = parse_qs(parsed.query) + auth_code = params["code"][0] + + # Exchange for tokens + token_response = await e2e_client.post( + "/api/v1/oauth/provider/token", + data={ + "grant_type": "authorization_code", + "code": auth_code, + "redirect_uri": "http://localhost:3000/callback", + "client_id": client_data["client_id"], + "code_verifier": code_verifier, + }, + ) + + return { + "tokens": token_response.json(), + "client": client_data, + "user": e2e_superuser, + } + + @pytest.mark.asyncio + async def test_token_introspection_active(self, e2e_client, tokens_setup): + """Test introspecting an active access token.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + with patch.object(settings, "OAUTH_ISSUER", "http://e2e-test"): + response = await e2e_client.post( + "/api/v1/oauth/provider/introspect", + data={ + "token": tokens_setup["tokens"]["access_token"], + "token_type_hint": "access_token", + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["active"] is True + assert "scope" in data + assert data["client_id"] == tokens_setup["client"]["client_id"] + + @pytest.mark.asyncio + async def test_token_introspection_refresh_token(self, e2e_client, tokens_setup): + """Test introspecting a refresh token.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + response = await e2e_client.post( + "/api/v1/oauth/provider/introspect", + data={ + "token": tokens_setup["tokens"]["refresh_token"], + "token_type_hint": "refresh_token", + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["active"] is True + assert data["token_type"] == "refresh_token" + + @pytest.mark.asyncio + async def test_token_refresh(self, e2e_client, tokens_setup): + """Test refreshing an access token.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + with patch.object(settings, "OAUTH_ISSUER", "http://e2e-test"): + response = await e2e_client.post( + "/api/v1/oauth/provider/token", + data={ + "grant_type": "refresh_token", + "refresh_token": tokens_setup["tokens"]["refresh_token"], + "client_id": tokens_setup["client"]["client_id"], + }, + ) + + assert response.status_code == 200 + data = response.json() + assert "access_token" in data + assert "refresh_token" in data + # New tokens should be different (token rotation) + assert data["access_token"] != tokens_setup["tokens"]["access_token"] + assert data["refresh_token"] != tokens_setup["tokens"]["refresh_token"] + + @pytest.mark.asyncio + async def test_token_revocation(self, e2e_client, tokens_setup): + """Test revoking a refresh token.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + # Revoke refresh token + revoke_response = await e2e_client.post( + "/api/v1/oauth/provider/revoke", + data={ + "token": tokens_setup["tokens"]["refresh_token"], + "token_type_hint": "refresh_token", + }, + ) + + # Per RFC 7009, always returns 200 + assert revoke_response.status_code == 200 + + # Verify token is no longer active + introspect_response = await e2e_client.post( + "/api/v1/oauth/provider/introspect", + data={ + "token": tokens_setup["tokens"]["refresh_token"], + "token_type_hint": "refresh_token", + }, + ) + + assert introspect_response.status_code == 200 + assert introspect_response.json()["active"] is False + + @pytest.mark.asyncio + async def test_revoked_refresh_token_cannot_be_used(self, e2e_client, tokens_setup): + """Test that a revoked refresh token cannot be used for refresh.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + # Revoke first + await e2e_client.post( + "/api/v1/oauth/provider/revoke", + data={ + "token": tokens_setup["tokens"]["refresh_token"], + }, + ) + + # Try to use revoked token + refresh_response = await e2e_client.post( + "/api/v1/oauth/provider/token", + data={ + "grant_type": "refresh_token", + "refresh_token": tokens_setup["tokens"]["refresh_token"], + "client_id": tokens_setup["client"]["client_id"], + }, + ) + + assert refresh_response.status_code == 400 + assert "invalid_grant" in get_error_message(refresh_response.json()) + + +@pytest.mark.e2e +@pytest.mark.postgres +class TestOAuthProviderConsentManagement: + """Test user consent listing and revocation.""" + + @pytest_asyncio.fixture + async def consent_setup(self, e2e_client, e2e_superuser): + """Create OAuth client and grant consent for testing.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + with patch.object(settings, "OAUTH_ISSUER", "http://e2e-test"): + # Create client + client_response = await e2e_client.post( + "/api/v1/oauth/provider/clients", + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + data={ + "client_name": "Consent Test Client", + "redirect_uris": "http://localhost:3000/callback", + "client_type": "public", + "scopes": "openid profile email", + }, + ) + client_data = client_response.json() + + # Generate PKCE and grant consent + code_verifier, code_challenge = generate_pkce_pair() + + await e2e_client.post( + "/api/v1/oauth/provider/authorize/consent", + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + data={ + "approved": "true", + "client_id": client_data["client_id"], + "redirect_uri": "http://localhost:3000/callback", + "scope": "openid profile email", + "code_challenge": code_challenge, + "code_challenge_method": "S256", + }, + follow_redirects=False, + ) + + return { + "client": client_data, + "user": e2e_superuser, + } + + @pytest.mark.asyncio + async def test_list_my_consents(self, e2e_client, consent_setup): + """Test listing user's OAuth consents.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + response = await e2e_client.get( + "/api/v1/oauth/provider/consents", + headers={ + "Authorization": f"Bearer {consent_setup['user']['tokens']['access_token']}" + }, + ) + + assert response.status_code == 200 + data = response.json() + assert isinstance(data, list) + assert len(data) >= 1 + + # Find our consent + consent = next( + (c for c in data if c["client_id"] == consent_setup["client"]["client_id"]), + None, + ) + assert consent is not None + assert consent["client_name"] == "Consent Test Client" + assert "granted_scopes" in consent + assert "granted_at" in consent + + @pytest.mark.asyncio + async def test_revoke_my_consent(self, e2e_client, consent_setup): + """Test revoking user's OAuth consent.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + # Revoke consent + revoke_response = await e2e_client.delete( + f"/api/v1/oauth/provider/consents/{consent_setup['client']['client_id']}", + headers={ + "Authorization": f"Bearer {consent_setup['user']['tokens']['access_token']}" + }, + ) + + assert revoke_response.status_code == 204 + + # Verify consent is gone + list_response = await e2e_client.get( + "/api/v1/oauth/provider/consents", + headers={ + "Authorization": f"Bearer {consent_setup['user']['tokens']['access_token']}" + }, + ) + + consents = list_response.json() + assert not any( + c["client_id"] == consent_setup["client"]["client_id"] for c in consents + ) + + +@pytest.mark.e2e +@pytest.mark.postgres +class TestOAuthProviderSecurityChecks: + """Test OAuth Provider security features.""" + + @pytest_asyncio.fixture + async def security_setup(self, e2e_client, e2e_superuser): + """Create OAuth client for security tests.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + response = await e2e_client.post( + "/api/v1/oauth/provider/clients", + headers={ + "Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}" + }, + data={ + "client_name": "Security Test Client", + "redirect_uris": "http://localhost:3000/callback", + "client_type": "public", + "scopes": "openid profile email", + }, + ) + return { + "client": response.json(), + "user": e2e_superuser, + } + + @pytest.mark.asyncio + async def test_invalid_redirect_uri_rejected(self, e2e_client, security_setup): + """Test that unregistered redirect_uri is rejected.""" + code_verifier, code_challenge = generate_pkce_pair() + + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + response = await e2e_client.get( + "/api/v1/oauth/provider/authorize", + params={ + "response_type": "code", + "client_id": security_setup["client"]["client_id"], + "redirect_uri": "http://evil.com/callback", # Not registered + "scope": "openid", + "code_challenge": code_challenge, + "code_challenge_method": "S256", + }, + headers={ + "Authorization": f"Bearer {security_setup['user']['tokens']['access_token']}" + }, + ) + + # Should return 400, not redirect + assert response.status_code == 400 + assert "Invalid redirect_uri" in get_error_message(response.json()) + + @pytest.mark.asyncio + async def test_plain_pkce_method_rejected(self, e2e_client, security_setup): + """Test that 'plain' PKCE method is rejected (security requirement).""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + with patch.object(settings, "FRONTEND_URL", "http://localhost:3000"): + response = await e2e_client.get( + "/api/v1/oauth/provider/authorize", + params={ + "response_type": "code", + "client_id": security_setup["client"]["client_id"], + "redirect_uri": "http://localhost:3000/callback", + "scope": "openid", + "code_challenge": "some_challenge", + "code_challenge_method": "plain", # Should be rejected + }, + headers={ + "Authorization": f"Bearer {security_setup['user']['tokens']['access_token']}" + }, + ) + + assert response.status_code == 400 + assert "S256" in get_error_message(response.json()) + + @pytest.mark.asyncio + async def test_authorization_code_single_use(self, e2e_client, security_setup): + """Test that authorization codes can only be used once.""" + code_verifier, code_challenge = generate_pkce_pair() + + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + with patch.object(settings, "OAUTH_ISSUER", "http://e2e-test"): + # Get an authorization code + consent_response = await e2e_client.post( + "/api/v1/oauth/provider/authorize/consent", + headers={ + "Authorization": f"Bearer {security_setup['user']['tokens']['access_token']}" + }, + data={ + "approved": "true", + "client_id": security_setup["client"]["client_id"], + "redirect_uri": "http://localhost:3000/callback", + "scope": "openid profile", + "code_challenge": code_challenge, + "code_challenge_method": "S256", + }, + follow_redirects=False, + ) + + from urllib.parse import parse_qs, urlparse + + location = consent_response.headers.get("location", "") + parsed = urlparse(location) + params = parse_qs(parsed.query) + auth_code = params["code"][0] + + # First use - should succeed + first_response = await e2e_client.post( + "/api/v1/oauth/provider/token", + data={ + "grant_type": "authorization_code", + "code": auth_code, + "redirect_uri": "http://localhost:3000/callback", + "client_id": security_setup["client"]["client_id"], + "code_verifier": code_verifier, + }, + ) + assert first_response.status_code == 200 + + # Second use - should fail + second_response = await e2e_client.post( + "/api/v1/oauth/provider/token", + data={ + "grant_type": "authorization_code", + "code": auth_code, + "redirect_uri": "http://localhost:3000/callback", + "client_id": security_setup["client"]["client_id"], + "code_verifier": code_verifier, + }, + ) + assert second_response.status_code == 400 + assert "already been used" in get_error_message(second_response.json()) + + @pytest.mark.asyncio + async def test_invalid_pkce_verifier_rejected(self, e2e_client, security_setup): + """Test that wrong code_verifier is rejected.""" + code_verifier, code_challenge = generate_pkce_pair() + + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + with patch.object(settings, "OAUTH_ISSUER", "http://e2e-test"): + # Get an authorization code + consent_response = await e2e_client.post( + "/api/v1/oauth/provider/authorize/consent", + headers={ + "Authorization": f"Bearer {security_setup['user']['tokens']['access_token']}" + }, + data={ + "approved": "true", + "client_id": security_setup["client"]["client_id"], + "redirect_uri": "http://localhost:3000/callback", + "scope": "openid profile", + "code_challenge": code_challenge, + "code_challenge_method": "S256", + }, + follow_redirects=False, + ) + + from urllib.parse import parse_qs, urlparse + + location = consent_response.headers.get("location", "") + parsed = urlparse(location) + params = parse_qs(parsed.query) + auth_code = params["code"][0] + + # Use wrong verifier + response = await e2e_client.post( + "/api/v1/oauth/provider/token", + data={ + "grant_type": "authorization_code", + "code": auth_code, + "redirect_uri": "http://localhost:3000/callback", + "client_id": security_setup["client"]["client_id"], + "code_verifier": "wrong_verifier_value", + }, + ) + + assert response.status_code == 400 + assert "code_verifier" in get_error_message(response.json()).lower() + + @pytest.mark.asyncio + async def test_introspect_invalid_token(self, e2e_client): + """Test that introspecting an invalid token returns active=false.""" + with patch.object(settings, "OAUTH_PROVIDER_ENABLED", True): + response = await e2e_client.post( + "/api/v1/oauth/provider/introspect", + data={ + "token": "invalid_token_that_does_not_exist", + }, + ) + + assert response.status_code == 200 + assert response.json()["active"] is False