Add comprehensive tests for security headers, permissions, CRUD operations, and organizations

- **Security Headers:** Add tests for HSTS in production, CSP in strict mode, and root endpoint response types.
- **Permissions:** Introduce tests for critical security paths, including superuser bypass and edge case scenarios.
- **CRUD Testing Enhancements:** Cover error scenarios for soft deletes, restores, and eager loading with SQLAlchemy options.
- **Organization Routes:** Validate user organization endpoints for memberships, details, and member listings.
- Add defensive code comments with `# pragma: no cover` for unreachable code sections.
This commit is contained in:
2025-11-02 06:10:04 +01:00
parent 789a76071d
commit 461d3caf31
5 changed files with 1172 additions and 4 deletions

View File

@@ -102,7 +102,7 @@ async def get_organization(
"""
try:
org = await organization_crud.get(db, id=organization_id)
if not org:
if not org: # pragma: no cover - Permission check prevents this (see docs/UNREACHABLE_DEFENSIVE_CODE_ANALYSIS.md)
raise NotFoundError(
detail=f"Organization {organization_id} not found",
error_code=ErrorCode.NOT_FOUND
@@ -121,7 +121,7 @@ async def get_organization(
}
return OrganizationResponse(**org_dict)
except NotFoundError:
except NotFoundError: # pragma: no cover - See above
raise
except Exception as e:
logger.error(f"Error getting organization: {str(e)}", exc_info=True)
@@ -192,7 +192,7 @@ async def update_organization(
"""
try:
org = await organization_crud.get(db, id=organization_id)
if not org:
if not org: # pragma: no cover - Permission check prevents this (see docs/UNREACHABLE_DEFENSIVE_CODE_ANALYSIS.md)
raise NotFoundError(
detail=f"Organization {organization_id} not found",
error_code=ErrorCode.NOT_FOUND
@@ -214,7 +214,7 @@ async def update_organization(
}
return OrganizationResponse(**org_dict)
except NotFoundError:
except NotFoundError: # pragma: no cover - See above
raise
except Exception as e:
logger.error(f"Error updating organization: {str(e)}", exc_info=True)

View File

@@ -0,0 +1,651 @@
# tests/api/test_organizations.py
"""
Tests for organization routes (user endpoints).
These test the routes in app/api/routes/organizations.py which allow
users to view and manage organizations they belong to.
"""
import pytest
import pytest_asyncio
from fastapi import status
from uuid import uuid4
from unittest.mock import patch, AsyncMock
from app.models.organization import Organization
from app.models.user import User
from app.models.user_organization import UserOrganization, OrganizationRole
from app.core.auth import get_password_hash
@pytest_asyncio.fixture
async def user_token(client, async_test_user):
"""Get access token for regular user."""
response = await client.post(
"/api/v1/auth/login",
json={
"email": "testuser@example.com",
"password": "TestPassword123!"
}
)
assert response.status_code == 200
return response.json()["access_token"]
@pytest_asyncio.fixture
async def second_user(async_test_db):
"""Create a second test user."""
test_engine, AsyncTestingSessionLocal = async_test_db
async with AsyncTestingSessionLocal() as session:
user = User(
id=uuid4(),
email="seconduser@example.com",
password_hash=get_password_hash("TestPassword123!"),
first_name="Second",
last_name="User",
phone_number="+1234567891",
is_active=True,
is_superuser=False,
preferences=None,
)
session.add(user)
await session.commit()
await session.refresh(user)
return user
@pytest_asyncio.fixture
async def test_org_with_user_member(async_test_db, async_test_user):
"""Create a test organization with async_test_user as a member."""
test_engine, AsyncTestingSessionLocal = async_test_db
async with AsyncTestingSessionLocal() as session:
org = Organization(
name="Member Org",
slug="member-org",
description="Test organization where user is a member"
)
session.add(org)
await session.commit()
await session.refresh(org)
# Add user as member
membership = UserOrganization(
user_id=async_test_user.id,
organization_id=org.id,
role=OrganizationRole.MEMBER,
is_active=True
)
session.add(membership)
await session.commit()
return org
@pytest_asyncio.fixture
async def test_org_with_user_admin(async_test_db, async_test_user):
"""Create a test organization with async_test_user as an admin."""
test_engine, AsyncTestingSessionLocal = async_test_db
async with AsyncTestingSessionLocal() as session:
org = Organization(
name="Admin Org",
slug="admin-org",
description="Test organization where user is an admin"
)
session.add(org)
await session.commit()
await session.refresh(org)
# Add user as admin
membership = UserOrganization(
user_id=async_test_user.id,
organization_id=org.id,
role=OrganizationRole.ADMIN,
is_active=True
)
session.add(membership)
await session.commit()
return org
@pytest_asyncio.fixture
async def test_org_with_user_owner(async_test_db, async_test_user):
"""Create a test organization with async_test_user as owner."""
test_engine, AsyncTestingSessionLocal = async_test_db
async with AsyncTestingSessionLocal() as session:
org = Organization(
name="Owner Org",
slug="owner-org",
description="Test organization where user is owner"
)
session.add(org)
await session.commit()
await session.refresh(org)
# Add user as owner
membership = UserOrganization(
user_id=async_test_user.id,
organization_id=org.id,
role=OrganizationRole.OWNER,
is_active=True
)
session.add(membership)
await session.commit()
return org
# ===== GET /api/v1/organizations/me =====
class TestGetMyOrganizations:
"""Tests for GET /api/v1/organizations/me endpoint."""
@pytest.mark.asyncio
async def test_get_my_organizations_success(
self,
client,
user_token,
test_org_with_user_member,
test_org_with_user_admin
):
"""Test successfully getting user's organizations (covers lines 54-79)."""
response = await client.get(
"/api/v1/organizations/me",
headers={"Authorization": f"Bearer {user_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert isinstance(data, list)
assert len(data) >= 2 # At least the two test orgs
# Verify structure
for org in data:
assert "id" in org
assert "name" in org
assert "slug" in org
assert "member_count" in org
@pytest.mark.asyncio
async def test_get_my_organizations_filter_active(
self,
client,
async_test_db,
async_test_user,
user_token
):
"""Test filtering organizations by active status."""
test_engine, AsyncTestingSessionLocal = async_test_db
# Create active org
async with AsyncTestingSessionLocal() as session:
active_org = Organization(
name="Active Org",
slug="active-org-filter",
is_active=True
)
session.add(active_org)
await session.commit()
await session.refresh(active_org)
# Add user membership
membership = UserOrganization(
user_id=async_test_user.id,
organization_id=active_org.id,
role=OrganizationRole.MEMBER,
is_active=True
)
session.add(membership)
await session.commit()
response = await client.get(
"/api/v1/organizations/me?is_active=true",
headers={"Authorization": f"Bearer {user_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert isinstance(data, list)
@pytest.mark.asyncio
async def test_get_my_organizations_empty(self, client, async_test_db):
"""Test getting organizations when user has none."""
test_engine, AsyncTestingSessionLocal = async_test_db
# Create user with no org memberships
async with AsyncTestingSessionLocal() as session:
user = User(
id=uuid4(),
email="noorg@example.com",
password_hash=get_password_hash("TestPassword123!"),
first_name="No",
last_name="Org",
is_active=True
)
session.add(user)
await session.commit()
# Login to get token
login_response = await client.post(
"/api/v1/auth/login",
json={"email": "noorg@example.com", "password": "TestPassword123!"}
)
token = login_response.json()["access_token"]
response = await client.get(
"/api/v1/organizations/me",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data == []
# ===== GET /api/v1/organizations/{organization_id} =====
class TestGetOrganization:
"""Tests for GET /api/v1/organizations/{organization_id} endpoint."""
@pytest.mark.asyncio
async def test_get_organization_success(
self,
client,
user_token,
test_org_with_user_member
):
"""Test successfully getting organization details (covers lines 103-122)."""
response = await client.get(
f"/api/v1/organizations/{test_org_with_user_member.id}",
headers={"Authorization": f"Bearer {user_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["id"] == str(test_org_with_user_member.id)
assert data["name"] == "Member Org"
assert data["slug"] == "member-org"
assert "member_count" in data
@pytest.mark.asyncio
async def test_get_organization_not_found(self, client, user_token):
"""Test getting nonexistent organization returns 403 (permission check happens first)."""
fake_org_id = uuid4()
response = await client.get(
f"/api/v1/organizations/{fake_org_id}",
headers={"Authorization": f"Bearer {user_token}"}
)
# Permission dependency checks membership before endpoint logic
# So non-existent org returns 403 (not a member) instead of 404
assert response.status_code == status.HTTP_403_FORBIDDEN
data = response.json()
assert "errors" in data or "detail" in data
@pytest.mark.asyncio
async def test_get_organization_not_member(
self,
client,
async_test_db,
async_test_user
):
"""Test getting organization where user is not a member fails."""
test_engine, AsyncTestingSessionLocal = async_test_db
# Create org without adding user
async with AsyncTestingSessionLocal() as session:
org = Organization(
name="Not Member Org",
slug="not-member-org"
)
session.add(org)
await session.commit()
await session.refresh(org)
org_id = org.id
# Login as user
login_response = await client.post(
"/api/v1/auth/login",
json={"email": "testuser@example.com", "password": "TestPassword123!"}
)
token = login_response.json()["access_token"]
response = await client.get(
f"/api/v1/organizations/{org_id}",
headers={"Authorization": f"Bearer {token}"}
)
# Should fail permission check
assert response.status_code == status.HTTP_403_FORBIDDEN
# ===== GET /api/v1/organizations/{organization_id}/members =====
class TestGetOrganizationMembers:
"""Tests for GET /api/v1/organizations/{organization_id}/members endpoint."""
@pytest.mark.asyncio
async def test_get_organization_members_success(
self,
client,
async_test_db,
async_test_user,
second_user,
user_token,
test_org_with_user_member
):
"""Test successfully getting organization members (covers lines 150-168)."""
test_engine, AsyncTestingSessionLocal = async_test_db
# Add second user to org
async with AsyncTestingSessionLocal() as session:
membership = UserOrganization(
user_id=second_user.id,
organization_id=test_org_with_user_member.id,
role=OrganizationRole.MEMBER,
is_active=True
)
session.add(membership)
await session.commit()
response = await client.get(
f"/api/v1/organizations/{test_org_with_user_member.id}/members",
headers={"Authorization": f"Bearer {user_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "data" in data
assert "pagination" in data
assert len(data["data"]) >= 2 # At least the two users
@pytest.mark.asyncio
async def test_get_organization_members_with_pagination(
self,
client,
user_token,
test_org_with_user_member
):
"""Test pagination parameters."""
response = await client.get(
f"/api/v1/organizations/{test_org_with_user_member.id}/members?page=1&limit=10",
headers={"Authorization": f"Bearer {user_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["pagination"]["page"] == 1
assert "page_size" in data["pagination"] # Uses page_size, not limit
assert "total" in data["pagination"]
@pytest.mark.asyncio
async def test_get_organization_members_filter_active(
self,
client,
async_test_db,
async_test_user,
second_user,
user_token,
test_org_with_user_member
):
"""Test filtering members by active status."""
test_engine, AsyncTestingSessionLocal = async_test_db
# Add second user as inactive member
async with AsyncTestingSessionLocal() as session:
membership = UserOrganization(
user_id=second_user.id,
organization_id=test_org_with_user_member.id,
role=OrganizationRole.MEMBER,
is_active=False
)
session.add(membership)
await session.commit()
# Filter for active only
response = await client.get(
f"/api/v1/organizations/{test_org_with_user_member.id}/members?is_active=true",
headers={"Authorization": f"Bearer {user_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
# Should only see active members
for member in data["data"]:
assert member["is_active"] is True
# ===== PUT /api/v1/organizations/{organization_id} =====
class TestUpdateOrganization:
"""Tests for PUT /api/v1/organizations/{organization_id} endpoint."""
@pytest.mark.asyncio
async def test_update_organization_as_admin_success(
self,
client,
async_test_user,
test_org_with_user_admin
):
"""Test successfully updating organization as admin (covers lines 193-215)."""
# Login as admin user
login_response = await client.post(
"/api/v1/auth/login",
json={"email": "testuser@example.com", "password": "TestPassword123!"}
)
admin_token = login_response.json()["access_token"]
response = await client.put(
f"/api/v1/organizations/{test_org_with_user_admin.id}",
json={
"name": "Updated Admin Org",
"description": "Updated description"
},
headers={"Authorization": f"Bearer {admin_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["name"] == "Updated Admin Org"
assert data["description"] == "Updated description"
@pytest.mark.asyncio
async def test_update_organization_as_owner_success(
self,
client,
async_test_user,
test_org_with_user_owner
):
"""Test successfully updating organization as owner."""
# Login as owner user
login_response = await client.post(
"/api/v1/auth/login",
json={"email": "testuser@example.com", "password": "TestPassword123!"}
)
owner_token = login_response.json()["access_token"]
response = await client.put(
f"/api/v1/organizations/{test_org_with_user_owner.id}",
json={"name": "Updated Owner Org"},
headers={"Authorization": f"Bearer {owner_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["name"] == "Updated Owner Org"
@pytest.mark.asyncio
async def test_update_organization_as_member_fails(
self,
client,
user_token,
test_org_with_user_member
):
"""Test updating organization as regular member fails."""
response = await client.put(
f"/api/v1/organizations/{test_org_with_user_member.id}",
json={"name": "Should Fail"},
headers={"Authorization": f"Bearer {user_token}"}
)
# Should fail permission check (need admin or owner)
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.asyncio
async def test_update_organization_not_found(
self,
client,
test_org_with_user_admin
):
"""Test updating nonexistent organization returns 403 (permission check first)."""
# Login as admin
login_response = await client.post(
"/api/v1/auth/login",
json={"email": "testuser@example.com", "password": "TestPassword123!"}
)
admin_token = login_response.json()["access_token"]
fake_org_id = uuid4()
response = await client.put(
f"/api/v1/organizations/{fake_org_id}",
json={"name": "Updated"},
headers={"Authorization": f"Bearer {admin_token}"}
)
# Permission dependency checks admin role before endpoint logic
# So non-existent org returns 403 (not an admin) instead of 404
assert response.status_code == status.HTTP_403_FORBIDDEN
data = response.json()
assert "errors" in data or "detail" in data
# ===== Authentication Tests =====
class TestOrganizationAuthentication:
"""Test authentication requirements for organization endpoints."""
@pytest.mark.asyncio
async def test_get_my_organizations_unauthenticated(self, client):
"""Test unauthenticated access to /me fails."""
response = await client.get("/api/v1/organizations/me")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
@pytest.mark.asyncio
async def test_get_organization_unauthenticated(self, client):
"""Test unauthenticated access to organization details fails."""
fake_id = uuid4()
response = await client.get(f"/api/v1/organizations/{fake_id}")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
@pytest.mark.asyncio
async def test_get_members_unauthenticated(self, client):
"""Test unauthenticated access to members list fails."""
fake_id = uuid4()
response = await client.get(f"/api/v1/organizations/{fake_id}/members")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
@pytest.mark.asyncio
async def test_update_organization_unauthenticated(self, client):
"""Test unauthenticated access to update fails."""
fake_id = uuid4()
response = await client.put(
f"/api/v1/organizations/{fake_id}",
json={"name": "Test"}
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
# ===== Exception Handler Tests (Database Error Scenarios) =====
class TestOrganizationExceptionHandlers:
"""
Test exception handlers in organization endpoints.
These tests use mocks to trigger database errors and verify
proper error handling. Covers lines: 81-83, 124-128, 170-172, 217-221
"""
@pytest.mark.asyncio
async def test_get_my_organizations_database_error(
self,
client,
user_token,
test_org_with_user_member
):
"""Test generic exception handler in get_my_organizations (covers lines 81-83)."""
with patch(
"app.crud.organization.organization.get_user_organizations_with_details",
side_effect=Exception("Database connection lost")
):
# The exception handler logs and re-raises, so we expect the exception
# to propagate (which proves the handler executed)
with pytest.raises(Exception, match="Database connection lost"):
await client.get(
"/api/v1/organizations/me",
headers={"Authorization": f"Bearer {user_token}"}
)
@pytest.mark.asyncio
async def test_get_organization_database_error(
self,
client,
user_token,
test_org_with_user_member
):
"""Test generic exception handler in get_organization (covers lines 124-128)."""
with patch(
"app.crud.organization.organization.get",
side_effect=Exception("Database timeout")
):
with pytest.raises(Exception, match="Database timeout"):
await client.get(
f"/api/v1/organizations/{test_org_with_user_member.id}",
headers={"Authorization": f"Bearer {user_token}"}
)
@pytest.mark.asyncio
async def test_get_organization_members_database_error(
self,
client,
user_token,
test_org_with_user_member
):
"""Test generic exception handler in get_organization_members (covers lines 170-172)."""
with patch(
"app.crud.organization.organization.get_organization_members",
side_effect=Exception("Connection pool exhausted")
):
with pytest.raises(Exception, match="Connection pool exhausted"):
await client.get(
f"/api/v1/organizations/{test_org_with_user_member.id}/members",
headers={"Authorization": f"Bearer {user_token}"}
)
@pytest.mark.asyncio
async def test_update_organization_database_error(
self,
client,
async_test_user,
test_org_with_user_admin
):
"""Test generic exception handler in update_organization (covers lines 217-221)."""
# Login as admin user
login_response = await client.post(
"/api/v1/auth/login",
json={"email": "testuser@example.com", "password": "TestPassword123!"}
)
admin_token = login_response.json()["access_token"]
with patch(
"app.crud.organization.organization.get",
return_value=test_org_with_user_admin
):
with patch(
"app.crud.organization.organization.update",
side_effect=Exception("Write lock timeout")
):
with pytest.raises(Exception, match="Write lock timeout"):
await client.put(
f"/api/v1/organizations/{test_org_with_user_admin.id}",
json={"name": "Should Fail"},
headers={"Authorization": f"Bearer {admin_token}"}
)

View File

@@ -0,0 +1,310 @@
# tests/api/test_permissions.py
"""
Tests for permission dependencies - CRITICAL SECURITY PATHS.
These tests ensure superusers can bypass organization checks correctly,
and that regular users are properly blocked.
"""
import pytest
import pytest_asyncio
from fastapi import status
from uuid import uuid4
from app.models.organization import Organization
from app.models.user import User
from app.models.user_organization import UserOrganization, OrganizationRole
from app.core.auth import get_password_hash
@pytest_asyncio.fixture
async def superuser_token(client, async_test_superuser):
"""Get access token for superuser."""
response = await client.post(
"/api/v1/auth/login",
json={
"email": "superuser@example.com",
"password": "SuperPassword123!"
}
)
assert response.status_code == 200
return response.json()["access_token"]
@pytest_asyncio.fixture
async def regular_user_token(client, async_test_user):
"""Get access token for regular user."""
response = await client.post(
"/api/v1/auth/login",
json={
"email": "testuser@example.com",
"password": "TestPassword123!"
}
)
assert response.status_code == 200
return response.json()["access_token"]
@pytest_asyncio.fixture
async def test_org_no_members(async_test_db):
"""Create a test organization with NO members."""
test_engine, AsyncTestingSessionLocal = async_test_db
async with AsyncTestingSessionLocal() as session:
org = Organization(
name="No Members Org",
slug="no-members-org",
description="Test org with no members"
)
session.add(org)
await session.commit()
await session.refresh(org)
return org
@pytest_asyncio.fixture
async def test_org_with_member(async_test_db, async_test_user):
"""Create a test organization with async_test_user as member (not admin)."""
test_engine, AsyncTestingSessionLocal = async_test_db
async with AsyncTestingSessionLocal() as session:
org = Organization(
name="Member Only Org",
slug="member-only-org",
description="Test org where user is just a member"
)
session.add(org)
await session.commit()
await session.refresh(org)
# Add user as MEMBER (not admin/owner)
membership = UserOrganization(
user_id=async_test_user.id,
organization_id=org.id,
role=OrganizationRole.MEMBER,
is_active=True
)
session.add(membership)
await session.commit()
return org
# ===== CRITICAL SECURITY TESTS: Superuser Bypass =====
class TestSuperuserBypass:
"""
CRITICAL: Test that superusers can bypass organization checks.
Missing coverage lines: 99, 154-155, 175
These are critical security paths that MUST work correctly.
"""
@pytest.mark.asyncio
async def test_superuser_can_access_org_not_member_of(
self,
client,
superuser_token,
test_org_no_members
):
"""
CRITICAL: Superuser should bypass membership check (covers line 175).
Bug scenario: If this fails, superusers can't manage orgs they're not members of.
"""
response = await client.get(
f"/api/v1/organizations/{test_org_no_members.id}",
headers={"Authorization": f"Bearer {superuser_token}"}
)
# Superuser should succeed even though they're not a member
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["id"] == str(test_org_no_members.id)
@pytest.mark.asyncio
async def test_regular_user_cannot_access_org_not_member_of(
self,
client,
regular_user_token,
test_org_no_members
):
"""Regular user should be blocked from org they're not a member of."""
response = await client.get(
f"/api/v1/organizations/{test_org_no_members.id}",
headers={"Authorization": f"Bearer {regular_user_token}"}
)
# Regular user should fail permission check
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.asyncio
async def test_superuser_can_update_org_not_admin_of(
self,
client,
superuser_token,
test_org_no_members
):
"""
CRITICAL: Superuser should bypass admin check (covers line 99).
Bug scenario: If this fails, superusers can't manage orgs.
"""
response = await client.put(
f"/api/v1/organizations/{test_org_no_members.id}",
json={"name": "Updated by Superuser"},
headers={"Authorization": f"Bearer {superuser_token}"}
)
# Superuser should succeed in updating org
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["name"] == "Updated by Superuser"
@pytest.mark.asyncio
async def test_regular_member_cannot_update_org(
self,
client,
regular_user_token,
test_org_with_member
):
"""Regular member (not admin) should NOT be able to update org."""
response = await client.put(
f"/api/v1/organizations/{test_org_with_member.id}",
json={"name": "Should Fail"},
headers={"Authorization": f"Bearer {regular_user_token}"}
)
# Member should fail - need admin or owner role
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.asyncio
async def test_superuser_can_list_org_members_not_member_of(
self,
client,
superuser_token,
test_org_no_members
):
"""CRITICAL: Superuser should bypass membership check to list members."""
response = await client.get(
f"/api/v1/organizations/{test_org_no_members.id}/members",
headers={"Authorization": f"Bearer {superuser_token}"}
)
# Superuser should succeed
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "data" in data
assert "pagination" in data
# ===== Edge Cases and Security Tests =====
class TestPermissionEdgeCases:
"""Test edge cases in permission system."""
@pytest.mark.asyncio
async def test_inactive_user_blocked(self, client, async_test_db):
"""Test that inactive users are blocked."""
test_engine, AsyncTestingSessionLocal = async_test_db
# Create inactive user
async with AsyncTestingSessionLocal() as session:
user = User(
id=uuid4(),
email="inactive@example.com",
password_hash=get_password_hash("TestPassword123!"),
first_name="Inactive",
last_name="User",
is_active=False # INACTIVE
)
session.add(user)
await session.commit()
# Try to login (should work - auth checks active status separately)
# But accessing protected endpoints should fail
login_response = await client.post(
"/api/v1/auth/login",
json={"email": "inactive@example.com", "password": "TestPassword123!"}
)
# Login might fail for inactive users depending on auth implementation
if login_response.status_code == 200:
token = login_response.json()["access_token"]
# Try to access protected endpoint
response = await client.get(
"/api/v1/users/me",
headers={"Authorization": f"Bearer {token}"}
)
# Should be blocked
assert response.status_code in [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]
@pytest.mark.asyncio
async def test_nonexistent_organization_returns_403_not_404(
self,
client,
regular_user_token
):
"""
Test that accessing nonexistent org returns 403, not 404.
This is correct behavior - don't leak info about org existence.
The permission check runs BEFORE the org lookup, so if user
is not a member, they get 403 regardless of org existence.
"""
fake_org_id = uuid4()
response = await client.get(
f"/api/v1/organizations/{fake_org_id}",
headers={"Authorization": f"Bearer {regular_user_token}"}
)
# Should get 403 (not a member), not 404 (doesn't exist)
# This prevents leaking information about org existence
assert response.status_code == status.HTTP_403_FORBIDDEN
# ===== Admin Role Tests =====
class TestAdminRolePermissions:
"""Test admin role can perform admin actions."""
@pytest_asyncio.fixture
async def test_org_with_admin(self, async_test_db, async_test_user):
"""Create org where user is ADMIN."""
test_engine, AsyncTestingSessionLocal = async_test_db
async with AsyncTestingSessionLocal() as session:
org = Organization(
name="Admin Org",
slug="admin-org"
)
session.add(org)
await session.commit()
await session.refresh(org)
membership = UserOrganization(
user_id=async_test_user.id,
organization_id=org.id,
role=OrganizationRole.ADMIN,
is_active=True
)
session.add(membership)
await session.commit()
return org
@pytest.mark.asyncio
async def test_admin_can_update_org(
self,
client,
regular_user_token,
test_org_with_admin
):
"""Admin should be able to update organization."""
response = await client.put(
f"/api/v1/organizations/{test_org_with_admin.id}",
json={"name": "Updated by Admin"},
headers={"Authorization": f"Bearer {regular_user_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["name"] == "Updated by Admin"

View File

@@ -72,3 +72,82 @@ class TestSecurityHeaders:
assert "X-Frame-Options" in response.headers
assert "X-Content-Type-Options" in response.headers
assert "X-XSS-Protection" in response.headers
def test_hsts_in_production(self):
"""Test that HSTS header is set in production (covers line 95)"""
with patch("app.core.config.settings.ENVIRONMENT", "production"):
with patch("app.core.database.get_db") as mock_get_db:
async def mock_session_generator():
from unittest.mock import MagicMock, AsyncMock
mock_session = MagicMock()
mock_session.execute = AsyncMock(return_value=None)
mock_session.close = AsyncMock(return_value=None)
yield mock_session
mock_get_db.side_effect = lambda: mock_session_generator()
# Need to reimport app to pick up the new settings
from importlib import reload
import app.main
reload(app.main)
test_client = TestClient(app.main.app)
response = test_client.get("/health")
assert "Strict-Transport-Security" in response.headers
assert "max-age=31536000" in response.headers["Strict-Transport-Security"]
def test_csp_strict_mode(self):
"""Test CSP strict mode (covers line 121)"""
with patch("app.core.config.settings.CSP_MODE", "strict"):
with patch("app.core.database.get_db") as mock_get_db:
async def mock_session_generator():
from unittest.mock import MagicMock, AsyncMock
mock_session = MagicMock()
mock_session.execute = AsyncMock(return_value=None)
mock_session.close = AsyncMock(return_value=None)
yield mock_session
mock_get_db.side_effect = lambda: mock_session_generator()
from importlib import reload
import app.main
reload(app.main)
test_client = TestClient(app.main.app)
response = test_client.get("/health")
csp = response.headers.get("Content-Security-Policy", "")
# Strict mode should only allow 'self'
assert "script-src 'self'" in csp
assert "style-src 'self'" in csp
assert "cdn.jsdelivr.net" not in csp # No external CDNs in strict mode
def test_csp_docs_endpoint(self, client):
"""Test CSP on /docs endpoint allows Swagger resources (covers line 110)"""
response = client.get("/docs")
csp = response.headers.get("Content-Security-Policy", "")
# Docs endpoint should allow Swagger UI resources
assert "cdn.jsdelivr.net" in csp
assert "fastapi.tiangolo.com" in csp
class TestRootEndpoint:
"""Tests for the root endpoint"""
def test_root_endpoint(self):
"""Test root endpoint returns HTML (covers line 174)"""
with patch("app.core.database.get_db") as mock_get_db:
async def mock_session_generator():
from unittest.mock import MagicMock, AsyncMock
mock_session = MagicMock()
mock_session.execute = AsyncMock(return_value=None)
mock_session.close = AsyncMock(return_value=None)
yield mock_session
mock_get_db.side_effect = lambda: mock_session_generator()
test_client = TestClient(app)
response = test_client.get("/")
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
assert "Welcome to app API" in response.text
assert "/docs" in response.text

View File

@@ -833,3 +833,131 @@ class TestCRUDBasePaginationValidation:
sort_order="asc"
)
assert isinstance(users, list)
class TestCRUDBaseModelsWithoutSoftDelete:
"""
Test soft_delete and restore on models without deleted_at column.
Covers lines 342-343, 383-384 - error handling for unsupported models.
"""
@pytest.mark.asyncio
async def test_soft_delete_model_without_deleted_at(self, async_test_db, async_test_user):
"""Test soft_delete on Organization model (no deleted_at) raises ValueError (covers lines 342-343)."""
test_engine, SessionLocal = async_test_db
# Create an organization (which doesn't have deleted_at)
from app.models.organization import Organization
from app.crud.organization import organization as org_crud
async with SessionLocal() as session:
org = Organization(name="Test Org", slug="test-org")
session.add(org)
await session.commit()
org_id = org.id
# Try to soft delete organization (should fail)
async with SessionLocal() as session:
with pytest.raises(ValueError, match="does not have a deleted_at column"):
await org_crud.soft_delete(session, id=str(org_id))
@pytest.mark.asyncio
async def test_restore_model_without_deleted_at(self, async_test_db):
"""Test restore on Organization model (no deleted_at) raises ValueError (covers lines 383-384)."""
test_engine, SessionLocal = async_test_db
# Create an organization (which doesn't have deleted_at)
from app.models.organization import Organization
from app.crud.organization import organization as org_crud
async with SessionLocal() as session:
org = Organization(name="Restore Test", slug="restore-test")
session.add(org)
await session.commit()
org_id = org.id
# Try to restore organization (should fail)
async with SessionLocal() as session:
with pytest.raises(ValueError, match="does not have a deleted_at column"):
await org_crud.restore(session, id=str(org_id))
class TestCRUDBaseEagerLoadingWithRealOptions:
"""
Test eager loading with actual SQLAlchemy load options.
Covers lines 77-78, 119-120 - options loop execution.
"""
@pytest.mark.asyncio
async def test_get_with_real_eager_loading_options(self, async_test_db, async_test_user):
"""Test get() with actual eager loading options (covers lines 77-78)."""
from datetime import datetime, timedelta, timezone
test_engine, SessionLocal = async_test_db
# Create a session for the user
from app.models.user_session import UserSession
from app.crud.session import session as session_crud
async with SessionLocal() as session:
user_session = UserSession(
user_id=async_test_user.id,
refresh_token_jti="test_jti_eager",
device_id="test-device",
ip_address="192.168.1.1",
user_agent="Test Agent",
last_used_at=datetime.now(timezone.utc),
expires_at=datetime.now(timezone.utc) + timedelta(days=60)
)
session.add(user_session)
await session.commit()
session_id = user_session.id
# Get session with eager loading of user relationship
async with SessionLocal() as session:
result = await session_crud.get(
session,
id=str(session_id),
options=[joinedload(UserSession.user)] # Real option, not empty list
)
assert result is not None
assert result.id == session_id
# User should be loaded (accessing it won't cause additional query)
assert result.user.email == async_test_user.email
@pytest.mark.asyncio
async def test_get_multi_with_real_eager_loading_options(self, async_test_db, async_test_user):
"""Test get_multi() with actual eager loading options (covers lines 119-120)."""
from datetime import datetime, timedelta, timezone
test_engine, SessionLocal = async_test_db
# Create multiple sessions for the user
from app.models.user_session import UserSession
from app.crud.session import session as session_crud
async with SessionLocal() as session:
for i in range(3):
user_session = UserSession(
user_id=async_test_user.id,
refresh_token_jti=f"jti_eager_{i}",
device_id=f"device-{i}",
ip_address=f"192.168.1.{i}",
user_agent=f"Agent {i}",
last_used_at=datetime.now(timezone.utc),
expires_at=datetime.now(timezone.utc) + timedelta(days=60)
)
session.add(user_session)
await session.commit()
# Get sessions with eager loading
async with SessionLocal() as session:
results = await session_crud.get_multi(
session,
skip=0,
limit=10,
options=[joinedload(UserSession.user)] # Real option, not empty list
)
assert len(results) >= 3
# Verify we can access user without additional queries
for result in results:
if result.user_id == async_test_user.id:
assert result.user.email == async_test_user.email