diff --git a/backend/app/api/routes/users.py b/backend/app/api/routes/users.py index ea47681..14b6d8d 100755 --- a/backend/app/api/routes/users.py +++ b/backend/app/api/routes/users.py @@ -146,10 +146,10 @@ async def update_current_user( Users cannot elevate their own permissions (is_superuser). """ # Prevent users from making themselves superuser - # NOTE: UserUpdate schema doesn't include is_superuser, so this is dead code - if getattr(user_update, 'is_superuser', None) is not None: # pragma: no cover - logger.warning(f"User {current_user.id} attempted to modify is_superuser field") # pragma: no cover - raise AuthorizationError( # pragma: no cover + # NOTE: Pydantic validator will reject is_superuser != None, but this provides defense in depth + if getattr(user_update, 'is_superuser', None) is not None: + logger.warning(f"User {current_user.id} attempted to modify is_superuser field") + raise AuthorizationError( message="Cannot modify superuser status", error_code=ErrorCode.INSUFFICIENT_PERMISSIONS ) @@ -266,10 +266,10 @@ async def update_user( ) # Prevent non-superusers from modifying superuser status - # NOTE: UserUpdate schema doesn't include is_superuser, so this is dead code - if getattr(user_update, 'is_superuser', None) is not None and not current_user.is_superuser: # pragma: no cover - logger.warning(f"User {current_user.id} attempted to modify is_superuser field") # pragma: no cover - raise AuthorizationError( # pragma: no cover + # NOTE: Pydantic validator will reject is_superuser != None, but this provides defense in depth + if getattr(user_update, 'is_superuser', None) is not None and not current_user.is_superuser: + logger.warning(f"User {current_user.id} attempted to modify is_superuser field") + raise AuthorizationError( message="Cannot modify superuser status", error_code=ErrorCode.INSUFFICIENT_PERMISSIONS ) diff --git a/backend/app/schemas/users.py b/backend/app/schemas/users.py index fb98918..d4b4b92 100644 --- a/backend/app/schemas/users.py +++ b/backend/app/schemas/users.py @@ -38,6 +38,7 @@ class UserUpdate(BaseModel): password: Optional[str] = None preferences: Optional[Dict[str, Any]] = None is_active: Optional[bool] = None # Changed default from True to None to avoid unintended updates + is_superuser: Optional[bool] = None # Explicitly reject privilege escalation attempts @field_validator('phone_number') @classmethod @@ -52,6 +53,14 @@ class UserUpdate(BaseModel): return v return validate_password_strength(v) + @field_validator('is_superuser') + @classmethod + def prevent_superuser_modification(cls, v: Optional[bool]) -> Optional[bool]: + """Prevent users from modifying their superuser status via this schema.""" + if v is not None: + raise ValueError("Cannot modify superuser status through user update") + return v + class UserInDB(UserBase): id: UUID diff --git a/backend/tests/api/test_user_routes.py b/backend/tests/api/test_user_routes.py index 3597d97..cb22c12 100755 --- a/backend/tests/api/test_user_routes.py +++ b/backend/tests/api/test_user_routes.py @@ -222,20 +222,22 @@ class TestUpdateCurrentUser: """Test that users cannot make themselves superuser.""" headers = await get_auth_headers(client, async_test_user.email, "TestPassword123!") - # Note: is_superuser is not in UserUpdate schema, but the endpoint checks for it - # This tests that even if someone tries to send it, it's rejected + # Note: is_superuser is now in UserUpdate schema with explicit validation + # This tests that Pydantic rejects the attempt at the schema level response = await client.patch( "/api/v1/users/me", headers=headers, json={"first_name": "Test", "is_superuser": True} ) - # Should succeed since is_superuser is not in schema and gets ignored by Pydantic - # The actual protection is at the database/service layer - assert response.status_code == status.HTTP_200_OK + # Pydantic validation should reject this at the schema level + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY data = response.json() - # Verify user is still not a superuser - assert data["is_superuser"] is False + assert data["success"] is False + assert "errors" in data + # Check that the error mentions is_superuser + error_fields = [err["field"] for err in data["errors"]] + assert "is_superuser" in error_fields @pytest.mark.asyncio async def test_update_profile_no_auth(self, client): diff --git a/backend/tests/api/test_users.py b/backend/tests/api/test_users.py index 0b01b8c..a33ef82 100644 --- a/backend/tests/api/test_users.py +++ b/backend/tests/api/test_users.py @@ -112,6 +112,26 @@ class TestUpdateCurrentUser: json={"first_name": "Updated"} ) + @pytest.mark.asyncio + async def test_update_current_user_cannot_make_superuser(self, client, user_token): + """Test that users cannot make themselves superuser (Pydantic validation).""" + response = await client.patch( + "/api/v1/users/me", + headers={"Authorization": f"Bearer {user_token}"}, + json={"is_superuser": True} + ) + + # Pydantic validation should reject this at the schema level + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + data = response.json() + assert data["success"] is False + assert "errors" in data + # Check that the error mentions is_superuser + error_fields = [err["field"] for err in data["errors"]] + assert "is_superuser" in error_fields + error_messages = [err["message"] for err in data["errors"]] + assert any("superuser" in msg.lower() for msg in error_messages) + @pytest.mark.asyncio async def test_update_current_user_value_error(self, client, user_token): """Test ValueError handling during update (covers lines 165-166).""" @@ -168,6 +188,18 @@ class TestUpdateUserById: assert response.status_code == status.HTTP_404_NOT_FOUND + @pytest.mark.asyncio + async def test_update_user_by_id_non_superuser_cannot_change_superuser_status(self, client, async_test_user, user_token): + """Test non-superuser cannot modify superuser status (Pydantic validation).""" + response = await client.patch( + f"/api/v1/users/{async_test_user.id}", + headers={"Authorization": f"Bearer {user_token}"}, + json={"is_superuser": True} + ) + + # Pydantic validation should reject this at the schema level + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + @pytest.mark.asyncio async def test_update_user_by_id_success(self, client, async_test_user, superuser_token): """Test updating user successfully (covers lines 276-278)."""