diff --git a/backend/tests/e2e/test_admin_superuser_workflows.py b/backend/tests/e2e/test_admin_superuser_workflows.py index 5373f4f..2664472 100644 --- a/backend/tests/e2e/test_admin_superuser_workflows.py +++ b/backend/tests/e2e/test_admin_superuser_workflows.py @@ -429,6 +429,7 @@ class TestAdminSessionManagement: data = response.json() assert "data" in data + class TestAdminDeleteOperations: """Test admin delete operations.""" @@ -645,5 +646,3 @@ class TestAdminSearchAndFilter: assert response.status_code == 200 data = response.json() assert len(data["data"]) >= 1 - - diff --git a/backend/tests/e2e/test_api_contracts.py b/backend/tests/e2e/test_api_contracts.py index ca14903..7a4b922 100644 --- a/backend/tests/e2e/test_api_contracts.py +++ b/backend/tests/e2e/test_api_contracts.py @@ -160,7 +160,9 @@ if SCHEMATHESIS_AVAILABLE: def test_schema_has_user_operations(self): """Verify user-related operations exist.""" - user_ops = list(schema.include(path_regex=r".*users.*").get_all_operations()) + user_ops = list( + schema.include(path_regex=r".*users.*").get_all_operations() + ) assert len(user_ops) > 0, "No user operations found" def test_schema_has_organization_operations(self): @@ -172,7 +174,9 @@ if SCHEMATHESIS_AVAILABLE: def test_schema_has_admin_operations(self): """Verify admin-related operations exist.""" - admin_ops = list(schema.include(path_regex=r".*admin.*").get_all_operations()) + admin_ops = list( + schema.include(path_regex=r".*admin.*").get_all_operations() + ) assert len(admin_ops) > 0, "No admin operations found" def test_schema_has_session_operations(self): diff --git a/backend/tests/e2e/test_organization_workflows.py b/backend/tests/e2e/test_organization_workflows.py index 3cd038d..710c87a 100644 --- a/backend/tests/e2e/test_organization_workflows.py +++ b/backend/tests/e2e/test_organization_workflows.py @@ -165,7 +165,9 @@ class TestOrganizationWithMembers: org = e2e_org_with_members response = await e2e_client.get( f"/api/v1/organizations/{org['org_id']}", - headers={"Authorization": f"Bearer {org['owner']['tokens']['access_token']}"}, + headers={ + "Authorization": f"Bearer {org['owner']['tokens']['access_token']}" + }, ) assert response.status_code == 200 @@ -178,7 +180,9 @@ class TestOrganizationWithMembers: org = e2e_org_with_members response = await e2e_client.get( f"/api/v1/organizations/{org['org_id']}", - headers={"Authorization": f"Bearer {org['member']['tokens']['access_token']}"}, + headers={ + "Authorization": f"Bearer {org['member']['tokens']['access_token']}" + }, ) assert response.status_code == 200 @@ -190,7 +194,9 @@ class TestOrganizationWithMembers: org = e2e_org_with_members response = await e2e_client.get( f"/api/v1/organizations/{org['org_id']}/members", - headers={"Authorization": f"Bearer {org['owner']['tokens']['access_token']}"}, + headers={ + "Authorization": f"Bearer {org['owner']['tokens']['access_token']}" + }, ) assert response.status_code == 200 @@ -203,17 +209,23 @@ class TestOrganizationWithMembers: org = e2e_org_with_members response = await e2e_client.get( f"/api/v1/organizations/{org['org_id']}/members", - headers={"Authorization": f"Bearer {org['member']['tokens']['access_token']}"}, + headers={ + "Authorization": f"Bearer {org['member']['tokens']['access_token']}" + }, ) assert response.status_code == 200 - async def test_owner_appears_in_my_organizations(self, e2e_client, e2e_org_with_members): + async def test_owner_appears_in_my_organizations( + self, e2e_client, e2e_org_with_members + ): """Owner sees organization in their organizations list.""" org = e2e_org_with_members response = await e2e_client.get( "/api/v1/organizations/me", - headers={"Authorization": f"Bearer {org['owner']['tokens']['access_token']}"}, + headers={ + "Authorization": f"Bearer {org['owner']['tokens']['access_token']}" + }, ) assert response.status_code == 200 @@ -221,12 +233,16 @@ class TestOrganizationWithMembers: org_ids = [o["id"] for o in data] assert org["org_id"] in org_ids - async def test_member_appears_in_my_organizations(self, e2e_client, e2e_org_with_members): + async def test_member_appears_in_my_organizations( + self, e2e_client, e2e_org_with_members + ): """Member sees organization in their organizations list.""" org = e2e_org_with_members response = await e2e_client.get( "/api/v1/organizations/me", - headers={"Authorization": f"Bearer {org['member']['tokens']['access_token']}"}, + headers={ + "Authorization": f"Bearer {org['member']['tokens']['access_token']}" + }, ) assert response.status_code == 200 @@ -234,14 +250,18 @@ class TestOrganizationWithMembers: org_ids = [o["id"] for o in data] assert org["org_id"] in org_ids - async def test_owner_can_update_organization(self, e2e_client, e2e_org_with_members): + async def test_owner_can_update_organization( + self, e2e_client, e2e_org_with_members + ): """Organization owner can update organization details.""" org = e2e_org_with_members new_description = f"Updated at {uuid4().hex[:8]}" response = await e2e_client.put( f"/api/v1/organizations/{org['org_id']}", - headers={"Authorization": f"Bearer {org['owner']['tokens']['access_token']}"}, + headers={ + "Authorization": f"Bearer {org['owner']['tokens']['access_token']}" + }, json={"description": new_description}, ) @@ -249,13 +269,17 @@ class TestOrganizationWithMembers: data = response.json() assert data["description"] == new_description - async def test_member_cannot_update_organization(self, e2e_client, e2e_org_with_members): + async def test_member_cannot_update_organization( + self, e2e_client, e2e_org_with_members + ): """Regular member cannot update organization details.""" org = e2e_org_with_members response = await e2e_client.put( f"/api/v1/organizations/{org['org_id']}", - headers={"Authorization": f"Bearer {org['member']['tokens']['access_token']}"}, + headers={ + "Authorization": f"Bearer {org['member']['tokens']['access_token']}" + }, json={"description": "Should fail"}, ) diff --git a/backend/tests/e2e/test_session_workflows.py b/backend/tests/e2e/test_session_workflows.py index 34c5895..a085246 100644 --- a/backend/tests/e2e/test_session_workflows.py +++ b/backend/tests/e2e/test_session_workflows.py @@ -23,7 +23,10 @@ pytestmark = [ async def register_and_login( - client, email: str, password: str = "SecurePassword123!", user_agent: str = None + client, + email: str, + password: str = "SecurePassword123!", # noqa: S107 + user_agent: str | None = None, ): """Helper to register a user and get tokens.""" await client.post( @@ -117,13 +120,12 @@ class TestSessionListingWorkflows: ) ).json() - tokens2 = ( - await e2e_client.post( - "/api/v1/auth/login", - json={"email": email, "password": password}, - headers={"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_0)"}, - ) - ).json() + # Second login to create another session + await e2e_client.post( + "/api/v1/auth/login", + json={"email": email, "password": password}, + headers={"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_0)"}, + ) # Check sessions using first token response = await e2e_client.get( @@ -162,12 +164,11 @@ class TestSessionRevocationWorkflows: ) ).json() - tokens2 = ( - await e2e_client.post( - "/api/v1/auth/login", - json={"email": email, "password": password}, - ) - ).json() + # Second login to create another session + await e2e_client.post( + "/api/v1/auth/login", + json={"email": email, "password": password}, + ) # Get sessions sessions_resp = await e2e_client.get( diff --git a/backend/tests/e2e/test_user_workflows.py b/backend/tests/e2e/test_user_workflows.py index 3a26922..6ad3a5f 100644 --- a/backend/tests/e2e/test_user_workflows.py +++ b/backend/tests/e2e/test_user_workflows.py @@ -21,7 +21,7 @@ pytestmark = [ ] -async def register_and_login(client, email: str, password: str = "SecurePassword123!"): +async def register_and_login(client, email: str, password: str = "SecurePassword123!"): # noqa: S107 """Helper to register a user and get tokens.""" await client.post( "/api/v1/auth/register", diff --git a/backend/tests/services/test_oauth_provider_service.py b/backend/tests/services/test_oauth_provider_service.py index 7e41714..0dfdf90 100644 --- a/backend/tests/services/test_oauth_provider_service.py +++ b/backend/tests/services/test_oauth_provider_service.py @@ -73,9 +73,12 @@ async def public_client(db): @pytest_asyncio.fixture async def confidential_client(db): - """Create a test confidential OAuth client.""" + """Create a test confidential OAuth client using bcrypt.""" + from app.core.auth import get_password_hash + secret = "test_client_secret" - secret_hash = hashlib.sha256(secret.encode()).hexdigest() + # Use bcrypt for new client secret hashing (security improvement) + secret_hash = get_password_hash(secret) client = OAuthClient( id=uuid4(), client_id="test_confidential_client", @@ -92,6 +95,28 @@ async def confidential_client(db): return client, secret +@pytest_asyncio.fixture +async def confidential_client_legacy_hash(db): + """Create a test confidential OAuth client with legacy SHA-256 hash.""" + # This tests backward compatibility with old SHA-256 hashed secrets + secret = "test_legacy_secret" + secret_hash = hashlib.sha256(secret.encode()).hexdigest() + client = OAuthClient( + id=uuid4(), + client_id="test_legacy_client", + client_name="Test Legacy Client", + client_type="confidential", + client_secret_hash=secret_hash, + redirect_uris=["http://localhost:3000/callback"], + allowed_scopes=["openid", "profile"], + is_active=True, + ) + db.add(client) + await db.commit() + await db.refresh(client) + return client, secret + + class TestHelperFunctions: """Tests for helper functions.""" @@ -170,11 +195,12 @@ class TestPKCEVerification: assert service.verify_pkce(wrong_verifier, code_challenge, "S256") is False - def test_verify_pkce_plain(self): - """Test PKCE verification with plain method.""" + def test_verify_pkce_plain_rejected(self): + """Test PKCE verification rejects 'plain' method for security.""" + # SECURITY: 'plain' method provides no security benefit and must be rejected + # per RFC 7636 Section 4.3 - only S256 is allowed code_verifier = "test_verifier" - assert service.verify_pkce(code_verifier, code_verifier, "plain") is True - assert service.verify_pkce(code_verifier, "wrong", "plain") is False + assert service.verify_pkce(code_verifier, code_verifier, "plain") is False def test_verify_pkce_unknown_method(self): """Test PKCE verification with unknown method returns False.""" @@ -231,12 +257,32 @@ class TestClientValidation: await service.validate_client(db, client.client_id, "wrong_secret") @pytest.mark.asyncio - async def test_validate_client_confidential_no_secret(self, db, confidential_client): + async def test_validate_client_confidential_no_secret( + self, db, confidential_client + ): """Test validating a confidential client without secret.""" client, _ = confidential_client with pytest.raises(service.InvalidClientError, match="Client secret required"): await service.validate_client(db, client.client_id) + @pytest.mark.asyncio + async def test_validate_client_legacy_sha256_hash( + self, db, confidential_client_legacy_hash + ): + """Test validating a client with legacy SHA-256 hash (backward compatibility).""" + client, secret = confidential_client_legacy_hash + validated = await service.validate_client(db, client.client_id, secret) + assert validated.client_id == client.client_id + + @pytest.mark.asyncio + async def test_validate_client_legacy_sha256_wrong_secret( + self, db, confidential_client_legacy_hash + ): + """Test legacy SHA-256 client rejects wrong secret.""" + client, _ = confidential_client_legacy_hash + with pytest.raises(service.InvalidClientError, match="Invalid client secret"): + await service.validate_client(db, client.client_id, "wrong_secret") + def test_validate_redirect_uri_success(self, public_client): """Test validating a registered redirect URI.""" # Should not raise