Add E2E tests for admin and organization workflows

- Introduced E2E tests for admin user and organization management workflows: user listing, creation, updates, bulk actions, and organization membership management.
- Added comprehensive tests for organization CRUD operations, membership visibility, roles, and permission validation.
- Expanded fixtures for superuser and member setup to streamline testing of admin-specific operations.
- Verified pagination, filtering, and action consistency across admin endpoints.
This commit is contained in:
Felipe Cardoso
2025-11-25 23:50:02 +01:00
parent 48f052200f
commit 7716468d72
5 changed files with 1195 additions and 0 deletions

View File

@@ -203,3 +203,171 @@ async def e2e_client(async_postgres_url):
app.dependency_overrides.clear()
await engine.dispose()
@pytest_asyncio.fixture
async def e2e_superuser(e2e_client):
"""
Create a superuser and return credentials + tokens.
Returns dict with: email, password, tokens, user_id
"""
from uuid import uuid4
from app.crud.user import user as user_crud
from app.schemas.users import UserCreate
email = f"admin-{uuid4().hex[:8]}@example.com"
password = "SuperAdmin123!"
# Register via API first to get proper password hashing
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Super",
"last_name": "Admin",
},
)
# Login to get tokens
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
# Now we need to make this user a superuser directly via SQL
# Get the db session from the client's override
from sqlalchemy import text
from app.core.database import get_db
from app.main import app
async for db in app.dependency_overrides[get_db]():
# Update user to be superuser
await db.execute(
text("UPDATE users SET is_superuser = true WHERE email = :email"),
{"email": email},
)
await db.commit()
# Get user ID
result = await db.execute(
text("SELECT id FROM users WHERE email = :email"),
{"email": email},
)
user_id = str(result.scalar())
break
return {
"email": email,
"password": password,
"tokens": tokens,
"user_id": user_id,
}
@pytest_asyncio.fixture
async def e2e_org_with_members(e2e_client, e2e_superuser):
"""
Create an organization with owner and member.
Returns dict with: org_id, org_slug, owner (tokens), member (tokens)
"""
from uuid import uuid4
# Create organization via admin API
org_name = f"Test Org {uuid4().hex[:8]}"
org_slug = f"test-org-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/admin/organizations",
headers={"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"},
json={
"name": org_name,
"slug": org_slug,
"description": "Test organization for E2E tests",
},
)
org_data = create_resp.json()
org_id = org_data["id"]
# Create owner user
owner_email = f"owner-{uuid4().hex[:8]}@example.com"
owner_password = "OwnerPass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": owner_email,
"password": owner_password,
"first_name": "Org",
"last_name": "Owner",
},
)
owner_login = await e2e_client.post(
"/api/v1/auth/login",
json={"email": owner_email, "password": owner_password},
)
owner_tokens = owner_login.json()
# Get owner user ID
owner_me = await e2e_client.get(
"/api/v1/users/me",
headers={"Authorization": f"Bearer {owner_tokens['access_token']}"},
)
owner_id = owner_me.json()["id"]
# Add owner to organization as owner role
await e2e_client.post(
f"/api/v1/admin/organizations/{org_id}/members",
headers={"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"},
json={"user_id": owner_id, "role": "owner"},
)
# Create member user
member_email = f"member-{uuid4().hex[:8]}@example.com"
member_password = "MemberPass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": member_email,
"password": member_password,
"first_name": "Org",
"last_name": "Member",
},
)
member_login = await e2e_client.post(
"/api/v1/auth/login",
json={"email": member_email, "password": member_password},
)
member_tokens = member_login.json()
# Get member user ID
member_me = await e2e_client.get(
"/api/v1/users/me",
headers={"Authorization": f"Bearer {member_tokens['access_token']}"},
)
member_id = member_me.json()["id"]
# Add member to organization
await e2e_client.post(
f"/api/v1/admin/organizations/{org_id}/members",
headers={"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"},
json={"user_id": member_id, "role": "member"},
)
return {
"org_id": org_id,
"org_slug": org_slug,
"org_name": org_name,
"owner": {"email": owner_email, "tokens": owner_tokens, "user_id": owner_id},
"member": {
"email": member_email,
"tokens": member_tokens,
"user_id": member_id,
},
}

View File

@@ -0,0 +1,649 @@
"""
Admin superuser E2E workflow tests with real PostgreSQL.
These tests validate admin operations with actual superuser privileges:
- User management (list, create, update, delete, bulk actions)
- Organization management (create, update, delete, members)
- Admin statistics
Usage:
make test-e2e # Run all E2E tests
"""
from uuid import uuid4
import pytest
pytestmark = [
pytest.mark.e2e,
pytest.mark.postgres,
pytest.mark.asyncio,
]
class TestAdminUserManagement:
"""Test admin user management with superuser."""
async def test_admin_list_users(self, e2e_client, e2e_superuser):
"""Superuser can list all users."""
response = await e2e_client.get(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert response.status_code == 200
data = response.json()
assert "data" in data
assert "pagination" in data
assert len(data["data"]) >= 1 # At least the superuser
async def test_admin_list_users_with_pagination(self, e2e_client, e2e_superuser):
"""Superuser can list users with pagination."""
# Create a few more users
for i in range(3):
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": f"user{i}-{uuid4().hex[:8]}@example.com",
"password": "TestPass123!",
"first_name": f"User{i}",
"last_name": "Test",
},
)
response = await e2e_client.get(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
params={"page": 1, "limit": 2},
)
assert response.status_code == 200
data = response.json()
assert len(data["data"]) <= 2
assert data["pagination"]["page_size"] <= 2
async def test_admin_create_user(self, e2e_client, e2e_superuser):
"""Superuser can create new users."""
email = f"newuser-{uuid4().hex[:8]}@example.com"
response = await e2e_client.post(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"email": email,
"password": "NewUserPass123!",
"first_name": "New",
"last_name": "User",
},
)
assert response.status_code in [200, 201]
data = response.json()
assert data["email"] == email
async def test_admin_get_user_by_id(self, e2e_client, e2e_superuser):
"""Superuser can get any user by ID."""
# Create a user
email = f"target-{uuid4().hex[:8]}@example.com"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "TargetPass123!",
"first_name": "Target",
"last_name": "User",
},
)
# Get user list to find the ID
list_resp = await e2e_client.get(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
users = list_resp.json()["data"]
target_user = next(u for u in users if u["email"] == email)
# Get user by ID
response = await e2e_client.get(
f"/api/v1/admin/users/{target_user['id']}",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert response.status_code == 200
assert response.json()["email"] == email
async def test_admin_update_user(self, e2e_client, e2e_superuser):
"""Superuser can update any user."""
# Create a user
email = f"update-{uuid4().hex[:8]}@example.com"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "UpdatePass123!",
"first_name": "Update",
"last_name": "User",
},
)
# Get user ID
list_resp = await e2e_client.get(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
users = list_resp.json()["data"]
target_user = next(u for u in users if u["email"] == email)
# Update user
response = await e2e_client.put(
f"/api/v1/admin/users/{target_user['id']}",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"first_name": "Updated", "last_name": "Name"},
)
assert response.status_code == 200
assert response.json()["first_name"] == "Updated"
async def test_admin_deactivate_user(self, e2e_client, e2e_superuser):
"""Superuser can deactivate users."""
# Create a user
email = f"deactivate-{uuid4().hex[:8]}@example.com"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "DeactivatePass123!",
"first_name": "Deactivate",
"last_name": "User",
},
)
# Get user ID
list_resp = await e2e_client.get(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
users = list_resp.json()["data"]
target_user = next(u for u in users if u["email"] == email)
# Deactivate user
response = await e2e_client.post(
f"/api/v1/admin/users/{target_user['id']}/deactivate",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert response.status_code == 200
async def test_admin_bulk_action(self, e2e_client, e2e_superuser):
"""Superuser can perform bulk actions on users."""
# Create users for bulk action
user_ids = []
for i in range(2):
email = f"bulk-{i}-{uuid4().hex[:8]}@example.com"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "BulkPass123!",
"first_name": f"Bulk{i}",
"last_name": "User",
},
)
# Get user IDs
list_resp = await e2e_client.get(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
users = list_resp.json()["data"]
bulk_users = [u for u in users if u["email"].startswith("bulk-")]
user_ids = [u["id"] for u in bulk_users]
# Bulk deactivate
response = await e2e_client.post(
"/api/v1/admin/users/bulk-action",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"action": "deactivate", "user_ids": user_ids},
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["affected_count"] >= 1
class TestAdminOrganizationManagement:
"""Test admin organization management with superuser."""
async def test_admin_list_organizations(self, e2e_client, e2e_superuser):
"""Superuser can list all organizations."""
response = await e2e_client.get(
"/api/v1/admin/organizations",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert response.status_code == 200
data = response.json()
assert "data" in data
assert "pagination" in data
async def test_admin_create_organization(self, e2e_client, e2e_superuser):
"""Superuser can create organizations."""
org_name = f"Admin Org {uuid4().hex[:8]}"
org_slug = f"admin-org-{uuid4().hex[:8]}"
response = await e2e_client.post(
"/api/v1/admin/organizations",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"name": org_name,
"slug": org_slug,
"description": "Created by admin",
},
)
assert response.status_code in [200, 201]
data = response.json()
assert data["name"] == org_name
assert data["slug"] == org_slug
async def test_admin_get_organization(self, e2e_client, e2e_superuser):
"""Superuser can get organization details."""
# Create org first
org_slug = f"get-org-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/admin/organizations",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"name": "Get Org Test",
"slug": org_slug,
},
)
org_id = create_resp.json()["id"]
# Get org
response = await e2e_client.get(
f"/api/v1/admin/organizations/{org_id}",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert response.status_code == 200
assert response.json()["slug"] == org_slug
async def test_admin_update_organization(self, e2e_client, e2e_superuser):
"""Superuser can update organizations."""
# Create org
org_slug = f"update-org-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/admin/organizations",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"name": "Update Org Test", "slug": org_slug},
)
org_id = create_resp.json()["id"]
# Update org
response = await e2e_client.put(
f"/api/v1/admin/organizations/{org_id}",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"name": "Updated Org Name", "description": "Updated description"},
)
assert response.status_code == 200
assert response.json()["name"] == "Updated Org Name"
async def test_admin_add_member_to_organization(self, e2e_client, e2e_superuser):
"""Superuser can add members to organizations."""
# Create org
org_slug = f"member-org-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/admin/organizations",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"name": "Member Org Test", "slug": org_slug},
)
org_id = create_resp.json()["id"]
# Create user to add
email = f"new-member-{uuid4().hex[:8]}@example.com"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "MemberPass123!",
"first_name": "New",
"last_name": "Member",
},
)
# Get user ID
list_resp = await e2e_client.get(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
users = list_resp.json()["data"]
new_user = next(u for u in users if u["email"] == email)
# Add to org
response = await e2e_client.post(
f"/api/v1/admin/organizations/{org_id}/members",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"user_id": new_user["id"], "role": "member"},
)
assert response.status_code in [200, 201]
async def test_admin_list_organization_members(self, e2e_client, e2e_superuser):
"""Superuser can list organization members."""
# Create org with member
org_slug = f"list-members-org-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/admin/organizations",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"name": "List Members Org", "slug": org_slug},
)
org_id = create_resp.json()["id"]
# List members
response = await e2e_client.get(
f"/api/v1/admin/organizations/{org_id}/members",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert response.status_code == 200
class TestAdminStats:
"""Test admin statistics endpoints."""
async def test_admin_get_stats(self, e2e_client, e2e_superuser):
"""Superuser can get admin statistics."""
response = await e2e_client.get(
"/api/v1/admin/stats",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert response.status_code == 200
data = response.json()
# Stats should have user growth, org distribution, etc.
assert "user_growth" in data or "user_status" in data
class TestAdminSessionManagement:
"""Test admin session management."""
async def test_admin_list_all_sessions(self, e2e_client, e2e_superuser):
"""Superuser can list all sessions."""
response = await e2e_client.get(
"/api/v1/admin/sessions",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert response.status_code == 200
data = response.json()
assert "data" in data
class TestAdminDeleteOperations:
"""Test admin delete operations."""
async def test_admin_delete_user(self, e2e_client, e2e_superuser):
"""Superuser can delete users."""
# Create user
email = f"delete-{uuid4().hex[:8]}@example.com"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "DeletePass123!",
"first_name": "Delete",
"last_name": "User",
},
)
# Get user ID
list_resp = await e2e_client.get(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
users = list_resp.json()["data"]
target_user = next(u for u in users if u["email"] == email)
# Delete user
response = await e2e_client.delete(
f"/api/v1/admin/users/{target_user['id']}",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert response.status_code in [200, 204]
async def test_admin_delete_organization(self, e2e_client, e2e_superuser):
"""Superuser can delete organizations."""
# Create org
org_slug = f"delete-org-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/admin/organizations",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"name": "Delete Org Test", "slug": org_slug},
)
org_id = create_resp.json()["id"]
# Delete org
response = await e2e_client.delete(
f"/api/v1/admin/organizations/{org_id}",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert response.status_code in [200, 204]
async def test_admin_remove_org_member(self, e2e_client, e2e_superuser):
"""Superuser can remove members from organizations."""
# Create org
org_slug = f"remove-member-org-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/admin/organizations",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"name": "Remove Member Org", "slug": org_slug},
)
org_id = create_resp.json()["id"]
# Create user
email = f"remove-member-{uuid4().hex[:8]}@example.com"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "RemovePass123!",
"first_name": "Remove",
"last_name": "Member",
},
)
# Get user ID
list_resp = await e2e_client.get(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
users = list_resp.json()["data"]
target_user = next(u for u in users if u["email"] == email)
# Add to org
await e2e_client.post(
f"/api/v1/admin/organizations/{org_id}/members",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"user_id": target_user["id"], "role": "member"},
)
# Remove from org
response = await e2e_client.delete(
f"/api/v1/admin/organizations/{org_id}/members/{target_user['id']}",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert response.status_code in [200, 204]
class TestAdminSearchAndFilter:
"""Test admin search and filter capabilities."""
async def test_admin_search_users_by_email(self, e2e_client, e2e_superuser):
"""Superuser can search users by email."""
# Create user with unique prefix
prefix = f"searchable-{uuid4().hex[:8]}"
email = f"{prefix}@example.com"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "SearchPass123!",
"first_name": "Search",
"last_name": "User",
},
)
response = await e2e_client.get(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
params={"search": prefix},
)
assert response.status_code == 200
data = response.json()
# Search should find the user
assert len(data["data"]) >= 1
emails = [u["email"] for u in data["data"]]
assert any(prefix in e for e in emails)
async def test_admin_filter_active_users(self, e2e_client, e2e_superuser):
"""Superuser can filter by active status."""
response = await e2e_client.get(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
params={"is_active": True},
)
assert response.status_code == 200
data = response.json()
# All returned users should be active
for user in data["data"]:
assert user["is_active"] is True
async def test_admin_filter_superusers(self, e2e_client, e2e_superuser):
"""Superuser can filter superusers."""
response = await e2e_client.get(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
params={"is_superuser": True},
)
assert response.status_code == 200
data = response.json()
# Should find at least the test superuser
assert len(data["data"]) >= 1
async def test_admin_sort_users(self, e2e_client, e2e_superuser):
"""Superuser can sort users by different fields."""
response = await e2e_client.get(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
params={"sort_by": "created_at", "sort_order": "desc"},
)
assert response.status_code == 200
data = response.json()
assert "data" in data
async def test_admin_search_organizations(self, e2e_client, e2e_superuser):
"""Superuser can search organizations."""
# Create org with unique name
prefix = f"searchorg-{uuid4().hex[:8]}"
await e2e_client.post(
"/api/v1/admin/organizations",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"name": f"{prefix} Test", "slug": f"{prefix}-slug"},
)
response = await e2e_client.get(
"/api/v1/admin/organizations",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
params={"search": prefix},
)
assert response.status_code == 200
data = response.json()
assert len(data["data"]) >= 1

View File

@@ -188,3 +188,134 @@ class TestHealthEndpoint:
assert response.status_code in [200, 503]
data = response.json()
assert "status" in data
class TestLogoutWorkflows:
"""Test logout workflows."""
async def test_logout_invalidates_session(self, e2e_client):
"""Test that logout invalidates the session."""
email = f"e2e-logout-{uuid4().hex[:8]}@example.com"
password = "SecurePassword123!"
# Register and login
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Logout",
"last_name": "Test",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
# Logout requires both access token (auth) and refresh token (body)
logout_resp = await e2e_client.post(
"/api/v1/auth/logout",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={"refresh_token": tokens["refresh_token"]},
)
assert logout_resp.status_code == 200
async def test_invalid_refresh_token_rejected(self, e2e_client):
"""Test that invalid refresh tokens are rejected."""
response = await e2e_client.post(
"/api/v1/auth/refresh",
json={"refresh_token": "invalid_refresh_token"},
)
assert response.status_code in [401, 422]
class TestValidationWorkflows:
"""Test input validation workflows."""
async def test_register_invalid_email(self, e2e_client):
"""Test that invalid email format is rejected."""
response = await e2e_client.post(
"/api/v1/auth/register",
json={
"email": "not_an_email",
"password": "ValidPassword123!",
"first_name": "Test",
"last_name": "User",
},
)
assert response.status_code == 422
async def test_register_weak_password(self, e2e_client):
"""Test that weak passwords are rejected."""
email = f"e2e-weak-{uuid4().hex[:8]}@example.com"
response = await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": "weak", # Too weak
"first_name": "Test",
"last_name": "User",
},
)
assert response.status_code == 422
async def test_login_missing_fields(self, e2e_client):
"""Test that login requires all fields."""
response = await e2e_client.post(
"/api/v1/auth/login",
json={"email": "test@example.com"}, # Missing password
)
assert response.status_code == 422
class TestRootEndpoint:
"""Test root endpoint."""
async def test_root_responds(self, e2e_client):
"""Root endpoint should respond with HTML."""
response = await e2e_client.get("/")
assert response.status_code == 200
# Root returns HTML
assert "html" in response.text.lower() or "Welcome" in response.text
async def test_openapi_available(self, e2e_client):
"""OpenAPI schema should be available."""
response = await e2e_client.get("/api/v1/openapi.json")
assert response.status_code == 200
data = response.json()
assert "openapi" in data
assert "paths" in data
class TestAuthTokenWorkflows:
"""Test authentication token workflows."""
async def test_access_token_expires(self, e2e_client):
"""Test using expired access token."""
# Use a fake/expired token
fake_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwiZXhwIjoxNjAwMDAwMDAwfQ.invalid"
response = await e2e_client.get(
"/api/v1/users/me",
headers={"Authorization": f"Bearer {fake_token}"},
)
assert response.status_code == 401
async def test_malformed_token_rejected(self, e2e_client):
"""Test that malformed tokens are rejected."""
response = await e2e_client.get(
"/api/v1/users/me",
headers={"Authorization": "Bearer not-a-valid-token"},
)
assert response.status_code == 401
async def test_missing_bearer_prefix(self, e2e_client):
"""Test that tokens without Bearer prefix are rejected."""
response = await e2e_client.get(
"/api/v1/users/me",
headers={"Authorization": "some-token"},
)
assert response.status_code == 401

View File

@@ -155,3 +155,175 @@ class TestOrganizationMembershipWorkflows:
)
assert response.status_code in [403, 404]
class TestOrganizationWithMembers:
"""Test organization workflows using e2e_org_with_members fixture."""
async def test_owner_can_view_organization(self, e2e_client, e2e_org_with_members):
"""Organization owner can view organization details."""
org = e2e_org_with_members
response = await e2e_client.get(
f"/api/v1/organizations/{org['org_id']}",
headers={"Authorization": f"Bearer {org['owner']['tokens']['access_token']}"},
)
assert response.status_code == 200
data = response.json()
assert data["id"] == org["org_id"]
assert data["name"] == org["org_name"]
async def test_member_can_view_organization(self, e2e_client, e2e_org_with_members):
"""Organization member can view organization details."""
org = e2e_org_with_members
response = await e2e_client.get(
f"/api/v1/organizations/{org['org_id']}",
headers={"Authorization": f"Bearer {org['member']['tokens']['access_token']}"},
)
assert response.status_code == 200
data = response.json()
assert data["id"] == org["org_id"]
async def test_owner_can_list_members(self, e2e_client, e2e_org_with_members):
"""Organization owner can list members."""
org = e2e_org_with_members
response = await e2e_client.get(
f"/api/v1/organizations/{org['org_id']}/members",
headers={"Authorization": f"Bearer {org['owner']['tokens']['access_token']}"},
)
assert response.status_code == 200
data = response.json()
# Should have owner + member = at least 2 members
assert len(data) >= 2
async def test_member_can_list_members(self, e2e_client, e2e_org_with_members):
"""Organization member can list members."""
org = e2e_org_with_members
response = await e2e_client.get(
f"/api/v1/organizations/{org['org_id']}/members",
headers={"Authorization": f"Bearer {org['member']['tokens']['access_token']}"},
)
assert response.status_code == 200
async def test_owner_appears_in_my_organizations(self, e2e_client, e2e_org_with_members):
"""Owner sees organization in their organizations list."""
org = e2e_org_with_members
response = await e2e_client.get(
"/api/v1/organizations/me",
headers={"Authorization": f"Bearer {org['owner']['tokens']['access_token']}"},
)
assert response.status_code == 200
data = response.json()
org_ids = [o["id"] for o in data]
assert org["org_id"] in org_ids
async def test_member_appears_in_my_organizations(self, e2e_client, e2e_org_with_members):
"""Member sees organization in their organizations list."""
org = e2e_org_with_members
response = await e2e_client.get(
"/api/v1/organizations/me",
headers={"Authorization": f"Bearer {org['member']['tokens']['access_token']}"},
)
assert response.status_code == 200
data = response.json()
org_ids = [o["id"] for o in data]
assert org["org_id"] in org_ids
async def test_owner_can_update_organization(self, e2e_client, e2e_org_with_members):
"""Organization owner can update organization details."""
org = e2e_org_with_members
new_description = f"Updated at {uuid4().hex[:8]}"
response = await e2e_client.put(
f"/api/v1/organizations/{org['org_id']}",
headers={"Authorization": f"Bearer {org['owner']['tokens']['access_token']}"},
json={"description": new_description},
)
assert response.status_code == 200
data = response.json()
assert data["description"] == new_description
async def test_member_cannot_update_organization(self, e2e_client, e2e_org_with_members):
"""Regular member cannot update organization details."""
org = e2e_org_with_members
response = await e2e_client.put(
f"/api/v1/organizations/{org['org_id']}",
headers={"Authorization": f"Bearer {org['member']['tokens']['access_token']}"},
json={"description": "Should fail"},
)
assert response.status_code == 403
async def test_non_member_cannot_view_organization(
self, e2e_client, e2e_org_with_members
):
"""Non-members cannot view organization details."""
org = e2e_org_with_members
# Create a new user who is not a member
non_member_email = f"nonmember-{uuid4().hex[:8]}@example.com"
tokens = await register_and_login(e2e_client, non_member_email)
response = await e2e_client.get(
f"/api/v1/organizations/{org['org_id']}",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert response.status_code == 403
async def test_get_organization_by_slug(self, e2e_client, e2e_org_with_members):
"""Organization can be retrieved by slug."""
org = e2e_org_with_members
response = await e2e_client.get(
f"/api/v1/organizations/slug/{org['org_slug']}",
headers={
"Authorization": f"Bearer {org['owner']['tokens']['access_token']}"
},
)
# May be 200 or 404/403 depending on implementation
assert response.status_code in [200, 403, 404]
class TestOrganizationAdminOperations:
"""Test organization admin operations."""
async def test_admin_list_org_members_with_pagination(
self, e2e_client, e2e_superuser, e2e_org_with_members
):
"""Admin can list org members with pagination."""
org = e2e_org_with_members
response = await e2e_client.get(
f"/api/v1/admin/organizations/{org['org_id']}/members",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
params={"page": 1, "limit": 10},
)
assert response.status_code == 200
data = response.json()
assert "data" in data
assert "pagination" in data
async def test_admin_list_org_members_filter_active(
self, e2e_client, e2e_superuser, e2e_org_with_members
):
"""Admin can filter org members by active status."""
org = e2e_org_with_members
response = await e2e_client.get(
f"/api/v1/admin/organizations/{org['org_id']}/members",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
params={"is_active": True},
)
assert response.status_code == 200

View File

@@ -274,3 +274,78 @@ class TestUserUpdateWorkflows:
)
assert response.status_code == 403
class TestAdminUserListWorkflows:
"""Test admin user list workflows via /users endpoint."""
async def test_superuser_can_list_all_users(self, e2e_client, e2e_superuser):
"""Superuser can list all users via /users endpoint."""
response = await e2e_client.get(
"/api/v1/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert response.status_code == 200
data = response.json()
assert "data" in data
assert "pagination" in data
async def test_regular_user_cannot_list_users(self, e2e_client):
"""Regular users cannot list all users."""
email = f"e2e-{uuid4().hex[:8]}@example.com"
tokens = await register_and_login(e2e_client, email)
response = await e2e_client.get(
"/api/v1/users",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert response.status_code == 403
class TestDeactivatedUserWorkflows:
"""Test workflows involving deactivated users."""
async def test_deactivated_user_cannot_login(self, e2e_client, e2e_superuser):
"""Deactivated users cannot login."""
# Create user
email = f"deactivate-login-{uuid4().hex[:8]}@example.com"
password = "DeactivatePass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Deactivate",
"last_name": "Login",
},
)
# Get user ID
list_resp = await e2e_client.get(
"/api/v1/admin/users",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
users = list_resp.json()["data"]
target_user = next(u for u in users if u["email"] == email)
# Deactivate user
await e2e_client.post(
f"/api/v1/admin/users/{target_user['id']}/deactivate",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
# Try to login - should fail
response = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
assert response.status_code in [401, 403]