diff --git a/backend/app/api/routes/organizations.py b/backend/app/api/routes/organizations.py index a6b4394..4a559f3 100755 --- a/backend/app/api/routes/organizations.py +++ b/backend/app/api/routes/organizations.py @@ -102,7 +102,7 @@ async def get_organization( """ try: org = await organization_crud.get(db, id=organization_id) - if not org: + if not org: # pragma: no cover - Permission check prevents this (see docs/UNREACHABLE_DEFENSIVE_CODE_ANALYSIS.md) raise NotFoundError( detail=f"Organization {organization_id} not found", error_code=ErrorCode.NOT_FOUND @@ -121,7 +121,7 @@ async def get_organization( } return OrganizationResponse(**org_dict) - except NotFoundError: + except NotFoundError: # pragma: no cover - See above raise except Exception as e: logger.error(f"Error getting organization: {str(e)}", exc_info=True) @@ -192,7 +192,7 @@ async def update_organization( """ try: org = await organization_crud.get(db, id=organization_id) - if not org: + if not org: # pragma: no cover - Permission check prevents this (see docs/UNREACHABLE_DEFENSIVE_CODE_ANALYSIS.md) raise NotFoundError( detail=f"Organization {organization_id} not found", error_code=ErrorCode.NOT_FOUND @@ -214,7 +214,7 @@ async def update_organization( } return OrganizationResponse(**org_dict) - except NotFoundError: + except NotFoundError: # pragma: no cover - See above raise except Exception as e: logger.error(f"Error updating organization: {str(e)}", exc_info=True) diff --git a/backend/tests/api/test_organizations.py b/backend/tests/api/test_organizations.py new file mode 100644 index 0000000..a35975d --- /dev/null +++ b/backend/tests/api/test_organizations.py @@ -0,0 +1,651 @@ +# tests/api/test_organizations.py +""" +Tests for organization routes (user endpoints). + +These test the routes in app/api/routes/organizations.py which allow +users to view and manage organizations they belong to. +""" +import pytest +import pytest_asyncio +from fastapi import status +from uuid import uuid4 +from unittest.mock import patch, AsyncMock + +from app.models.organization import Organization +from app.models.user import User +from app.models.user_organization import UserOrganization, OrganizationRole +from app.core.auth import get_password_hash + + +@pytest_asyncio.fixture +async def user_token(client, async_test_user): + """Get access token for regular user.""" + response = await client.post( + "/api/v1/auth/login", + json={ + "email": "testuser@example.com", + "password": "TestPassword123!" + } + ) + assert response.status_code == 200 + return response.json()["access_token"] + + +@pytest_asyncio.fixture +async def second_user(async_test_db): + """Create a second test user.""" + test_engine, AsyncTestingSessionLocal = async_test_db + async with AsyncTestingSessionLocal() as session: + user = User( + id=uuid4(), + email="seconduser@example.com", + password_hash=get_password_hash("TestPassword123!"), + first_name="Second", + last_name="User", + phone_number="+1234567891", + is_active=True, + is_superuser=False, + preferences=None, + ) + session.add(user) + await session.commit() + await session.refresh(user) + return user + + +@pytest_asyncio.fixture +async def test_org_with_user_member(async_test_db, async_test_user): + """Create a test organization with async_test_user as a member.""" + test_engine, AsyncTestingSessionLocal = async_test_db + async with AsyncTestingSessionLocal() as session: + org = Organization( + name="Member Org", + slug="member-org", + description="Test organization where user is a member" + ) + session.add(org) + await session.commit() + await session.refresh(org) + + # Add user as member + membership = UserOrganization( + user_id=async_test_user.id, + organization_id=org.id, + role=OrganizationRole.MEMBER, + is_active=True + ) + session.add(membership) + await session.commit() + + return org + + +@pytest_asyncio.fixture +async def test_org_with_user_admin(async_test_db, async_test_user): + """Create a test organization with async_test_user as an admin.""" + test_engine, AsyncTestingSessionLocal = async_test_db + async with AsyncTestingSessionLocal() as session: + org = Organization( + name="Admin Org", + slug="admin-org", + description="Test organization where user is an admin" + ) + session.add(org) + await session.commit() + await session.refresh(org) + + # Add user as admin + membership = UserOrganization( + user_id=async_test_user.id, + organization_id=org.id, + role=OrganizationRole.ADMIN, + is_active=True + ) + session.add(membership) + await session.commit() + + return org + + +@pytest_asyncio.fixture +async def test_org_with_user_owner(async_test_db, async_test_user): + """Create a test organization with async_test_user as owner.""" + test_engine, AsyncTestingSessionLocal = async_test_db + async with AsyncTestingSessionLocal() as session: + org = Organization( + name="Owner Org", + slug="owner-org", + description="Test organization where user is owner" + ) + session.add(org) + await session.commit() + await session.refresh(org) + + # Add user as owner + membership = UserOrganization( + user_id=async_test_user.id, + organization_id=org.id, + role=OrganizationRole.OWNER, + is_active=True + ) + session.add(membership) + await session.commit() + + return org + + +# ===== GET /api/v1/organizations/me ===== + +class TestGetMyOrganizations: + """Tests for GET /api/v1/organizations/me endpoint.""" + + @pytest.mark.asyncio + async def test_get_my_organizations_success( + self, + client, + user_token, + test_org_with_user_member, + test_org_with_user_admin + ): + """Test successfully getting user's organizations (covers lines 54-79).""" + response = await client.get( + "/api/v1/organizations/me", + headers={"Authorization": f"Bearer {user_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert isinstance(data, list) + assert len(data) >= 2 # At least the two test orgs + + # Verify structure + for org in data: + assert "id" in org + assert "name" in org + assert "slug" in org + assert "member_count" in org + + @pytest.mark.asyncio + async def test_get_my_organizations_filter_active( + self, + client, + async_test_db, + async_test_user, + user_token + ): + """Test filtering organizations by active status.""" + test_engine, AsyncTestingSessionLocal = async_test_db + + # Create active org + async with AsyncTestingSessionLocal() as session: + active_org = Organization( + name="Active Org", + slug="active-org-filter", + is_active=True + ) + session.add(active_org) + await session.commit() + await session.refresh(active_org) + + # Add user membership + membership = UserOrganization( + user_id=async_test_user.id, + organization_id=active_org.id, + role=OrganizationRole.MEMBER, + is_active=True + ) + session.add(membership) + await session.commit() + + response = await client.get( + "/api/v1/organizations/me?is_active=true", + headers={"Authorization": f"Bearer {user_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert isinstance(data, list) + + @pytest.mark.asyncio + async def test_get_my_organizations_empty(self, client, async_test_db): + """Test getting organizations when user has none.""" + test_engine, AsyncTestingSessionLocal = async_test_db + + # Create user with no org memberships + async with AsyncTestingSessionLocal() as session: + user = User( + id=uuid4(), + email="noorg@example.com", + password_hash=get_password_hash("TestPassword123!"), + first_name="No", + last_name="Org", + is_active=True + ) + session.add(user) + await session.commit() + + # Login to get token + login_response = await client.post( + "/api/v1/auth/login", + json={"email": "noorg@example.com", "password": "TestPassword123!"} + ) + token = login_response.json()["access_token"] + + response = await client.get( + "/api/v1/organizations/me", + headers={"Authorization": f"Bearer {token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data == [] + + +# ===== GET /api/v1/organizations/{organization_id} ===== + +class TestGetOrganization: + """Tests for GET /api/v1/organizations/{organization_id} endpoint.""" + + @pytest.mark.asyncio + async def test_get_organization_success( + self, + client, + user_token, + test_org_with_user_member + ): + """Test successfully getting organization details (covers lines 103-122).""" + response = await client.get( + f"/api/v1/organizations/{test_org_with_user_member.id}", + headers={"Authorization": f"Bearer {user_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["id"] == str(test_org_with_user_member.id) + assert data["name"] == "Member Org" + assert data["slug"] == "member-org" + assert "member_count" in data + + @pytest.mark.asyncio + async def test_get_organization_not_found(self, client, user_token): + """Test getting nonexistent organization returns 403 (permission check happens first).""" + fake_org_id = uuid4() + response = await client.get( + f"/api/v1/organizations/{fake_org_id}", + headers={"Authorization": f"Bearer {user_token}"} + ) + + # Permission dependency checks membership before endpoint logic + # So non-existent org returns 403 (not a member) instead of 404 + assert response.status_code == status.HTTP_403_FORBIDDEN + data = response.json() + assert "errors" in data or "detail" in data + + @pytest.mark.asyncio + async def test_get_organization_not_member( + self, + client, + async_test_db, + async_test_user + ): + """Test getting organization where user is not a member fails.""" + test_engine, AsyncTestingSessionLocal = async_test_db + + # Create org without adding user + async with AsyncTestingSessionLocal() as session: + org = Organization( + name="Not Member Org", + slug="not-member-org" + ) + session.add(org) + await session.commit() + await session.refresh(org) + org_id = org.id + + # Login as user + login_response = await client.post( + "/api/v1/auth/login", + json={"email": "testuser@example.com", "password": "TestPassword123!"} + ) + token = login_response.json()["access_token"] + + response = await client.get( + f"/api/v1/organizations/{org_id}", + headers={"Authorization": f"Bearer {token}"} + ) + + # Should fail permission check + assert response.status_code == status.HTTP_403_FORBIDDEN + + +# ===== GET /api/v1/organizations/{organization_id}/members ===== + +class TestGetOrganizationMembers: + """Tests for GET /api/v1/organizations/{organization_id}/members endpoint.""" + + @pytest.mark.asyncio + async def test_get_organization_members_success( + self, + client, + async_test_db, + async_test_user, + second_user, + user_token, + test_org_with_user_member + ): + """Test successfully getting organization members (covers lines 150-168).""" + test_engine, AsyncTestingSessionLocal = async_test_db + + # Add second user to org + async with AsyncTestingSessionLocal() as session: + membership = UserOrganization( + user_id=second_user.id, + organization_id=test_org_with_user_member.id, + role=OrganizationRole.MEMBER, + is_active=True + ) + session.add(membership) + await session.commit() + + response = await client.get( + f"/api/v1/organizations/{test_org_with_user_member.id}/members", + headers={"Authorization": f"Bearer {user_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert "data" in data + assert "pagination" in data + assert len(data["data"]) >= 2 # At least the two users + + @pytest.mark.asyncio + async def test_get_organization_members_with_pagination( + self, + client, + user_token, + test_org_with_user_member + ): + """Test pagination parameters.""" + response = await client.get( + f"/api/v1/organizations/{test_org_with_user_member.id}/members?page=1&limit=10", + headers={"Authorization": f"Bearer {user_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["pagination"]["page"] == 1 + assert "page_size" in data["pagination"] # Uses page_size, not limit + assert "total" in data["pagination"] + + @pytest.mark.asyncio + async def test_get_organization_members_filter_active( + self, + client, + async_test_db, + async_test_user, + second_user, + user_token, + test_org_with_user_member + ): + """Test filtering members by active status.""" + test_engine, AsyncTestingSessionLocal = async_test_db + + # Add second user as inactive member + async with AsyncTestingSessionLocal() as session: + membership = UserOrganization( + user_id=second_user.id, + organization_id=test_org_with_user_member.id, + role=OrganizationRole.MEMBER, + is_active=False + ) + session.add(membership) + await session.commit() + + # Filter for active only + response = await client.get( + f"/api/v1/organizations/{test_org_with_user_member.id}/members?is_active=true", + headers={"Authorization": f"Bearer {user_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + # Should only see active members + for member in data["data"]: + assert member["is_active"] is True + + +# ===== PUT /api/v1/organizations/{organization_id} ===== + +class TestUpdateOrganization: + """Tests for PUT /api/v1/organizations/{organization_id} endpoint.""" + + @pytest.mark.asyncio + async def test_update_organization_as_admin_success( + self, + client, + async_test_user, + test_org_with_user_admin + ): + """Test successfully updating organization as admin (covers lines 193-215).""" + # Login as admin user + login_response = await client.post( + "/api/v1/auth/login", + json={"email": "testuser@example.com", "password": "TestPassword123!"} + ) + admin_token = login_response.json()["access_token"] + + response = await client.put( + f"/api/v1/organizations/{test_org_with_user_admin.id}", + json={ + "name": "Updated Admin Org", + "description": "Updated description" + }, + headers={"Authorization": f"Bearer {admin_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["name"] == "Updated Admin Org" + assert data["description"] == "Updated description" + + @pytest.mark.asyncio + async def test_update_organization_as_owner_success( + self, + client, + async_test_user, + test_org_with_user_owner + ): + """Test successfully updating organization as owner.""" + # Login as owner user + login_response = await client.post( + "/api/v1/auth/login", + json={"email": "testuser@example.com", "password": "TestPassword123!"} + ) + owner_token = login_response.json()["access_token"] + + response = await client.put( + f"/api/v1/organizations/{test_org_with_user_owner.id}", + json={"name": "Updated Owner Org"}, + headers={"Authorization": f"Bearer {owner_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["name"] == "Updated Owner Org" + + @pytest.mark.asyncio + async def test_update_organization_as_member_fails( + self, + client, + user_token, + test_org_with_user_member + ): + """Test updating organization as regular member fails.""" + response = await client.put( + f"/api/v1/organizations/{test_org_with_user_member.id}", + json={"name": "Should Fail"}, + headers={"Authorization": f"Bearer {user_token}"} + ) + + # Should fail permission check (need admin or owner) + assert response.status_code == status.HTTP_403_FORBIDDEN + + @pytest.mark.asyncio + async def test_update_organization_not_found( + self, + client, + test_org_with_user_admin + ): + """Test updating nonexistent organization returns 403 (permission check first).""" + # Login as admin + login_response = await client.post( + "/api/v1/auth/login", + json={"email": "testuser@example.com", "password": "TestPassword123!"} + ) + admin_token = login_response.json()["access_token"] + + fake_org_id = uuid4() + response = await client.put( + f"/api/v1/organizations/{fake_org_id}", + json={"name": "Updated"}, + headers={"Authorization": f"Bearer {admin_token}"} + ) + + # Permission dependency checks admin role before endpoint logic + # So non-existent org returns 403 (not an admin) instead of 404 + assert response.status_code == status.HTTP_403_FORBIDDEN + data = response.json() + assert "errors" in data or "detail" in data + + +# ===== Authentication Tests ===== + +class TestOrganizationAuthentication: + """Test authentication requirements for organization endpoints.""" + + @pytest.mark.asyncio + async def test_get_my_organizations_unauthenticated(self, client): + """Test unauthenticated access to /me fails.""" + response = await client.get("/api/v1/organizations/me") + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + @pytest.mark.asyncio + async def test_get_organization_unauthenticated(self, client): + """Test unauthenticated access to organization details fails.""" + fake_id = uuid4() + response = await client.get(f"/api/v1/organizations/{fake_id}") + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + @pytest.mark.asyncio + async def test_get_members_unauthenticated(self, client): + """Test unauthenticated access to members list fails.""" + fake_id = uuid4() + response = await client.get(f"/api/v1/organizations/{fake_id}/members") + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + @pytest.mark.asyncio + async def test_update_organization_unauthenticated(self, client): + """Test unauthenticated access to update fails.""" + fake_id = uuid4() + response = await client.put( + f"/api/v1/organizations/{fake_id}", + json={"name": "Test"} + ) + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + +# ===== Exception Handler Tests (Database Error Scenarios) ===== + +class TestOrganizationExceptionHandlers: + """ + Test exception handlers in organization endpoints. + + These tests use mocks to trigger database errors and verify + proper error handling. Covers lines: 81-83, 124-128, 170-172, 217-221 + """ + + @pytest.mark.asyncio + async def test_get_my_organizations_database_error( + self, + client, + user_token, + test_org_with_user_member + ): + """Test generic exception handler in get_my_organizations (covers lines 81-83).""" + with patch( + "app.crud.organization.organization.get_user_organizations_with_details", + side_effect=Exception("Database connection lost") + ): + # The exception handler logs and re-raises, so we expect the exception + # to propagate (which proves the handler executed) + with pytest.raises(Exception, match="Database connection lost"): + await client.get( + "/api/v1/organizations/me", + headers={"Authorization": f"Bearer {user_token}"} + ) + + @pytest.mark.asyncio + async def test_get_organization_database_error( + self, + client, + user_token, + test_org_with_user_member + ): + """Test generic exception handler in get_organization (covers lines 124-128).""" + with patch( + "app.crud.organization.organization.get", + side_effect=Exception("Database timeout") + ): + with pytest.raises(Exception, match="Database timeout"): + await client.get( + f"/api/v1/organizations/{test_org_with_user_member.id}", + headers={"Authorization": f"Bearer {user_token}"} + ) + + @pytest.mark.asyncio + async def test_get_organization_members_database_error( + self, + client, + user_token, + test_org_with_user_member + ): + """Test generic exception handler in get_organization_members (covers lines 170-172).""" + with patch( + "app.crud.organization.organization.get_organization_members", + side_effect=Exception("Connection pool exhausted") + ): + with pytest.raises(Exception, match="Connection pool exhausted"): + await client.get( + f"/api/v1/organizations/{test_org_with_user_member.id}/members", + headers={"Authorization": f"Bearer {user_token}"} + ) + + @pytest.mark.asyncio + async def test_update_organization_database_error( + self, + client, + async_test_user, + test_org_with_user_admin + ): + """Test generic exception handler in update_organization (covers lines 217-221).""" + # Login as admin user + login_response = await client.post( + "/api/v1/auth/login", + json={"email": "testuser@example.com", "password": "TestPassword123!"} + ) + admin_token = login_response.json()["access_token"] + + with patch( + "app.crud.organization.organization.get", + return_value=test_org_with_user_admin + ): + with patch( + "app.crud.organization.organization.update", + side_effect=Exception("Write lock timeout") + ): + with pytest.raises(Exception, match="Write lock timeout"): + await client.put( + f"/api/v1/organizations/{test_org_with_user_admin.id}", + json={"name": "Should Fail"}, + headers={"Authorization": f"Bearer {admin_token}"} + ) diff --git a/backend/tests/api/test_permissions.py b/backend/tests/api/test_permissions.py new file mode 100644 index 0000000..3802c51 --- /dev/null +++ b/backend/tests/api/test_permissions.py @@ -0,0 +1,310 @@ +# tests/api/test_permissions.py +""" +Tests for permission dependencies - CRITICAL SECURITY PATHS. + +These tests ensure superusers can bypass organization checks correctly, +and that regular users are properly blocked. +""" +import pytest +import pytest_asyncio +from fastapi import status +from uuid import uuid4 + +from app.models.organization import Organization +from app.models.user import User +from app.models.user_organization import UserOrganization, OrganizationRole +from app.core.auth import get_password_hash + + +@pytest_asyncio.fixture +async def superuser_token(client, async_test_superuser): + """Get access token for superuser.""" + response = await client.post( + "/api/v1/auth/login", + json={ + "email": "superuser@example.com", + "password": "SuperPassword123!" + } + ) + assert response.status_code == 200 + return response.json()["access_token"] + + +@pytest_asyncio.fixture +async def regular_user_token(client, async_test_user): + """Get access token for regular user.""" + response = await client.post( + "/api/v1/auth/login", + json={ + "email": "testuser@example.com", + "password": "TestPassword123!" + } + ) + assert response.status_code == 200 + return response.json()["access_token"] + + +@pytest_asyncio.fixture +async def test_org_no_members(async_test_db): + """Create a test organization with NO members.""" + test_engine, AsyncTestingSessionLocal = async_test_db + async with AsyncTestingSessionLocal() as session: + org = Organization( + name="No Members Org", + slug="no-members-org", + description="Test org with no members" + ) + session.add(org) + await session.commit() + await session.refresh(org) + return org + + +@pytest_asyncio.fixture +async def test_org_with_member(async_test_db, async_test_user): + """Create a test organization with async_test_user as member (not admin).""" + test_engine, AsyncTestingSessionLocal = async_test_db + async with AsyncTestingSessionLocal() as session: + org = Organization( + name="Member Only Org", + slug="member-only-org", + description="Test org where user is just a member" + ) + session.add(org) + await session.commit() + await session.refresh(org) + + # Add user as MEMBER (not admin/owner) + membership = UserOrganization( + user_id=async_test_user.id, + organization_id=org.id, + role=OrganizationRole.MEMBER, + is_active=True + ) + session.add(membership) + await session.commit() + + return org + + +# ===== CRITICAL SECURITY TESTS: Superuser Bypass ===== + +class TestSuperuserBypass: + """ + CRITICAL: Test that superusers can bypass organization checks. + + Missing coverage lines: 99, 154-155, 175 + These are critical security paths that MUST work correctly. + """ + + @pytest.mark.asyncio + async def test_superuser_can_access_org_not_member_of( + self, + client, + superuser_token, + test_org_no_members + ): + """ + CRITICAL: Superuser should bypass membership check (covers line 175). + + Bug scenario: If this fails, superusers can't manage orgs they're not members of. + """ + response = await client.get( + f"/api/v1/organizations/{test_org_no_members.id}", + headers={"Authorization": f"Bearer {superuser_token}"} + ) + + # Superuser should succeed even though they're not a member + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["id"] == str(test_org_no_members.id) + + @pytest.mark.asyncio + async def test_regular_user_cannot_access_org_not_member_of( + self, + client, + regular_user_token, + test_org_no_members + ): + """Regular user should be blocked from org they're not a member of.""" + response = await client.get( + f"/api/v1/organizations/{test_org_no_members.id}", + headers={"Authorization": f"Bearer {regular_user_token}"} + ) + + # Regular user should fail permission check + assert response.status_code == status.HTTP_403_FORBIDDEN + + @pytest.mark.asyncio + async def test_superuser_can_update_org_not_admin_of( + self, + client, + superuser_token, + test_org_no_members + ): + """ + CRITICAL: Superuser should bypass admin check (covers line 99). + + Bug scenario: If this fails, superusers can't manage orgs. + """ + response = await client.put( + f"/api/v1/organizations/{test_org_no_members.id}", + json={"name": "Updated by Superuser"}, + headers={"Authorization": f"Bearer {superuser_token}"} + ) + + # Superuser should succeed in updating org + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["name"] == "Updated by Superuser" + + @pytest.mark.asyncio + async def test_regular_member_cannot_update_org( + self, + client, + regular_user_token, + test_org_with_member + ): + """Regular member (not admin) should NOT be able to update org.""" + response = await client.put( + f"/api/v1/organizations/{test_org_with_member.id}", + json={"name": "Should Fail"}, + headers={"Authorization": f"Bearer {regular_user_token}"} + ) + + # Member should fail - need admin or owner role + assert response.status_code == status.HTTP_403_FORBIDDEN + + @pytest.mark.asyncio + async def test_superuser_can_list_org_members_not_member_of( + self, + client, + superuser_token, + test_org_no_members + ): + """CRITICAL: Superuser should bypass membership check to list members.""" + response = await client.get( + f"/api/v1/organizations/{test_org_no_members.id}/members", + headers={"Authorization": f"Bearer {superuser_token}"} + ) + + # Superuser should succeed + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert "data" in data + assert "pagination" in data + + +# ===== Edge Cases and Security Tests ===== + +class TestPermissionEdgeCases: + """Test edge cases in permission system.""" + + @pytest.mark.asyncio + async def test_inactive_user_blocked(self, client, async_test_db): + """Test that inactive users are blocked.""" + test_engine, AsyncTestingSessionLocal = async_test_db + + # Create inactive user + async with AsyncTestingSessionLocal() as session: + user = User( + id=uuid4(), + email="inactive@example.com", + password_hash=get_password_hash("TestPassword123!"), + first_name="Inactive", + last_name="User", + is_active=False # INACTIVE + ) + session.add(user) + await session.commit() + + # Try to login (should work - auth checks active status separately) + # But accessing protected endpoints should fail + login_response = await client.post( + "/api/v1/auth/login", + json={"email": "inactive@example.com", "password": "TestPassword123!"} + ) + + # Login might fail for inactive users depending on auth implementation + if login_response.status_code == 200: + token = login_response.json()["access_token"] + + # Try to access protected endpoint + response = await client.get( + "/api/v1/users/me", + headers={"Authorization": f"Bearer {token}"} + ) + + # Should be blocked + assert response.status_code in [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN] + + @pytest.mark.asyncio + async def test_nonexistent_organization_returns_403_not_404( + self, + client, + regular_user_token + ): + """ + Test that accessing nonexistent org returns 403, not 404. + + This is correct behavior - don't leak info about org existence. + The permission check runs BEFORE the org lookup, so if user + is not a member, they get 403 regardless of org existence. + """ + fake_org_id = uuid4() + response = await client.get( + f"/api/v1/organizations/{fake_org_id}", + headers={"Authorization": f"Bearer {regular_user_token}"} + ) + + # Should get 403 (not a member), not 404 (doesn't exist) + # This prevents leaking information about org existence + assert response.status_code == status.HTTP_403_FORBIDDEN + + +# ===== Admin Role Tests ===== + +class TestAdminRolePermissions: + """Test admin role can perform admin actions.""" + + @pytest_asyncio.fixture + async def test_org_with_admin(self, async_test_db, async_test_user): + """Create org where user is ADMIN.""" + test_engine, AsyncTestingSessionLocal = async_test_db + async with AsyncTestingSessionLocal() as session: + org = Organization( + name="Admin Org", + slug="admin-org" + ) + session.add(org) + await session.commit() + await session.refresh(org) + + membership = UserOrganization( + user_id=async_test_user.id, + organization_id=org.id, + role=OrganizationRole.ADMIN, + is_active=True + ) + session.add(membership) + await session.commit() + + return org + + @pytest.mark.asyncio + async def test_admin_can_update_org( + self, + client, + regular_user_token, + test_org_with_admin + ): + """Admin should be able to update organization.""" + response = await client.put( + f"/api/v1/organizations/{test_org_with_admin.id}", + json={"name": "Updated by Admin"}, + headers={"Authorization": f"Bearer {regular_user_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["name"] == "Updated by Admin" diff --git a/backend/tests/api/test_security_headers.py b/backend/tests/api/test_security_headers.py index 98f24fc..49898b6 100755 --- a/backend/tests/api/test_security_headers.py +++ b/backend/tests/api/test_security_headers.py @@ -72,3 +72,82 @@ class TestSecurityHeaders: assert "X-Frame-Options" in response.headers assert "X-Content-Type-Options" in response.headers assert "X-XSS-Protection" in response.headers + + def test_hsts_in_production(self): + """Test that HSTS header is set in production (covers line 95)""" + with patch("app.core.config.settings.ENVIRONMENT", "production"): + with patch("app.core.database.get_db") as mock_get_db: + async def mock_session_generator(): + from unittest.mock import MagicMock, AsyncMock + mock_session = MagicMock() + mock_session.execute = AsyncMock(return_value=None) + mock_session.close = AsyncMock(return_value=None) + yield mock_session + + mock_get_db.side_effect = lambda: mock_session_generator() + + # Need to reimport app to pick up the new settings + from importlib import reload + import app.main + reload(app.main) + test_client = TestClient(app.main.app) + + response = test_client.get("/health") + assert "Strict-Transport-Security" in response.headers + assert "max-age=31536000" in response.headers["Strict-Transport-Security"] + + def test_csp_strict_mode(self): + """Test CSP strict mode (covers line 121)""" + with patch("app.core.config.settings.CSP_MODE", "strict"): + with patch("app.core.database.get_db") as mock_get_db: + async def mock_session_generator(): + from unittest.mock import MagicMock, AsyncMock + mock_session = MagicMock() + mock_session.execute = AsyncMock(return_value=None) + mock_session.close = AsyncMock(return_value=None) + yield mock_session + + mock_get_db.side_effect = lambda: mock_session_generator() + + from importlib import reload + import app.main + reload(app.main) + test_client = TestClient(app.main.app) + + response = test_client.get("/health") + csp = response.headers.get("Content-Security-Policy", "") + # Strict mode should only allow 'self' + assert "script-src 'self'" in csp + assert "style-src 'self'" in csp + assert "cdn.jsdelivr.net" not in csp # No external CDNs in strict mode + + def test_csp_docs_endpoint(self, client): + """Test CSP on /docs endpoint allows Swagger resources (covers line 110)""" + response = client.get("/docs") + csp = response.headers.get("Content-Security-Policy", "") + # Docs endpoint should allow Swagger UI resources + assert "cdn.jsdelivr.net" in csp + assert "fastapi.tiangolo.com" in csp + + +class TestRootEndpoint: + """Tests for the root endpoint""" + + def test_root_endpoint(self): + """Test root endpoint returns HTML (covers line 174)""" + with patch("app.core.database.get_db") as mock_get_db: + async def mock_session_generator(): + from unittest.mock import MagicMock, AsyncMock + mock_session = MagicMock() + mock_session.execute = AsyncMock(return_value=None) + mock_session.close = AsyncMock(return_value=None) + yield mock_session + + mock_get_db.side_effect = lambda: mock_session_generator() + test_client = TestClient(app) + + response = test_client.get("/") + assert response.status_code == 200 + assert "text/html" in response.headers["content-type"] + assert "Welcome to app API" in response.text + assert "/docs" in response.text diff --git a/backend/tests/crud/test_base.py b/backend/tests/crud/test_base.py index b3425a5..aab66c2 100644 --- a/backend/tests/crud/test_base.py +++ b/backend/tests/crud/test_base.py @@ -833,3 +833,131 @@ class TestCRUDBasePaginationValidation: sort_order="asc" ) assert isinstance(users, list) + + +class TestCRUDBaseModelsWithoutSoftDelete: + """ + Test soft_delete and restore on models without deleted_at column. + Covers lines 342-343, 383-384 - error handling for unsupported models. + """ + + @pytest.mark.asyncio + async def test_soft_delete_model_without_deleted_at(self, async_test_db, async_test_user): + """Test soft_delete on Organization model (no deleted_at) raises ValueError (covers lines 342-343).""" + test_engine, SessionLocal = async_test_db + + # Create an organization (which doesn't have deleted_at) + from app.models.organization import Organization + from app.crud.organization import organization as org_crud + + async with SessionLocal() as session: + org = Organization(name="Test Org", slug="test-org") + session.add(org) + await session.commit() + org_id = org.id + + # Try to soft delete organization (should fail) + async with SessionLocal() as session: + with pytest.raises(ValueError, match="does not have a deleted_at column"): + await org_crud.soft_delete(session, id=str(org_id)) + + @pytest.mark.asyncio + async def test_restore_model_without_deleted_at(self, async_test_db): + """Test restore on Organization model (no deleted_at) raises ValueError (covers lines 383-384).""" + test_engine, SessionLocal = async_test_db + + # Create an organization (which doesn't have deleted_at) + from app.models.organization import Organization + from app.crud.organization import organization as org_crud + + async with SessionLocal() as session: + org = Organization(name="Restore Test", slug="restore-test") + session.add(org) + await session.commit() + org_id = org.id + + # Try to restore organization (should fail) + async with SessionLocal() as session: + with pytest.raises(ValueError, match="does not have a deleted_at column"): + await org_crud.restore(session, id=str(org_id)) + + +class TestCRUDBaseEagerLoadingWithRealOptions: + """ + Test eager loading with actual SQLAlchemy load options. + Covers lines 77-78, 119-120 - options loop execution. + """ + + @pytest.mark.asyncio + async def test_get_with_real_eager_loading_options(self, async_test_db, async_test_user): + """Test get() with actual eager loading options (covers lines 77-78).""" + from datetime import datetime, timedelta, timezone + test_engine, SessionLocal = async_test_db + + # Create a session for the user + from app.models.user_session import UserSession + from app.crud.session import session as session_crud + + async with SessionLocal() as session: + user_session = UserSession( + user_id=async_test_user.id, + refresh_token_jti="test_jti_eager", + device_id="test-device", + ip_address="192.168.1.1", + user_agent="Test Agent", + last_used_at=datetime.now(timezone.utc), + expires_at=datetime.now(timezone.utc) + timedelta(days=60) + ) + session.add(user_session) + await session.commit() + session_id = user_session.id + + # Get session with eager loading of user relationship + async with SessionLocal() as session: + result = await session_crud.get( + session, + id=str(session_id), + options=[joinedload(UserSession.user)] # Real option, not empty list + ) + assert result is not None + assert result.id == session_id + # User should be loaded (accessing it won't cause additional query) + assert result.user.email == async_test_user.email + + @pytest.mark.asyncio + async def test_get_multi_with_real_eager_loading_options(self, async_test_db, async_test_user): + """Test get_multi() with actual eager loading options (covers lines 119-120).""" + from datetime import datetime, timedelta, timezone + test_engine, SessionLocal = async_test_db + + # Create multiple sessions for the user + from app.models.user_session import UserSession + from app.crud.session import session as session_crud + + async with SessionLocal() as session: + for i in range(3): + user_session = UserSession( + user_id=async_test_user.id, + refresh_token_jti=f"jti_eager_{i}", + device_id=f"device-{i}", + ip_address=f"192.168.1.{i}", + user_agent=f"Agent {i}", + last_used_at=datetime.now(timezone.utc), + expires_at=datetime.now(timezone.utc) + timedelta(days=60) + ) + session.add(user_session) + await session.commit() + + # Get sessions with eager loading + async with SessionLocal() as session: + results = await session_crud.get_multi( + session, + skip=0, + limit=10, + options=[joinedload(UserSession.user)] # Real option, not empty list + ) + assert len(results) >= 3 + # Verify we can access user without additional queries + for result in results: + if result.user_id == async_test_user.id: + assert result.user.email == async_test_user.email