Add admin session management endpoint and tests

- Introduced `/api/v1/admin/sessions` endpoint to fetch paginated session data for admin monitoring.
- Added `AdminSessionResponse` schema to include user details in session responses.
- Implemented session data retrieval with filtering and pagination in `session_crud`.
- Created comprehensive test suite for session management, covering success, filtering, pagination, and unauthorized access scenarios.
This commit is contained in:
Felipe Cardoso
2025-11-06 20:05:35 +01:00
parent da1f4e365a
commit ff758f5d10
4 changed files with 332 additions and 0 deletions

View File

@@ -19,6 +19,7 @@ from app.core.database import get_db
from app.core.exceptions import NotFoundError, DuplicateError, AuthorizationError, ErrorCode
from app.crud.organization import organization as organization_crud
from app.crud.user import user as user_crud
from app.crud.session import session as session_crud
from app.models.user import User
from app.models.user_organization import OrganizationRole
from app.schemas.common import (
@@ -35,6 +36,7 @@ from app.schemas.organizations import (
OrganizationMemberResponse
)
from app.schemas.users import UserResponse, UserCreate, UserUpdate
from app.schemas.sessions import AdminSessionResponse
logger = logging.getLogger(__name__)
@@ -784,3 +786,82 @@ async def admin_remove_organization_member(
except Exception as e:
logger.error(f"Error removing member from organization (admin): {str(e)}", exc_info=True)
raise
# ============================================================================
# Session Management Endpoints
# ============================================================================
@router.get(
"/sessions",
response_model=PaginatedResponse[AdminSessionResponse],
summary="Admin: List All Sessions",
description="""
List all sessions across all users (admin only).
Returns paginated list of sessions with user information.
Useful for admin dashboard statistics and session monitoring.
""",
operation_id="admin_list_sessions"
)
async def admin_list_sessions(
pagination: PaginationParams = Depends(),
is_active: Optional[bool] = Query(None, description="Filter by active status"),
admin: User = Depends(require_superuser),
db: AsyncSession = Depends(get_db)
) -> Any:
"""List all sessions across all users with filtering and pagination."""
try:
# Get sessions with user info (eager loaded to prevent N+1)
sessions, total = await session_crud.get_all_sessions(
db,
skip=pagination.offset,
limit=pagination.limit,
active_only=is_active if is_active is not None else True,
with_user=True
)
# Build response objects with user information
session_responses = []
for session in sessions:
# Get user full name
user_full_name = None
if session.user.first_name or session.user.last_name:
parts = []
if session.user.first_name:
parts.append(session.user.first_name)
if session.user.last_name:
parts.append(session.user.last_name)
user_full_name = " ".join(parts)
session_response = AdminSessionResponse(
id=session.id,
user_id=session.user_id,
user_email=session.user.email,
user_full_name=user_full_name,
device_name=session.device_name,
device_id=session.device_id,
ip_address=session.ip_address,
location_city=session.location_city,
location_country=session.location_country,
last_used_at=session.last_used_at,
created_at=session.created_at,
expires_at=session.expires_at,
is_active=session.is_active
)
session_responses.append(session_response)
logger.info(f"Admin {admin.email} listed {len(session_responses)} sessions (total: {total})")
pagination_meta = create_pagination_meta(
total=total,
page=pagination.page,
limit=pagination.limit,
items_count=len(session_responses)
)
return PaginatedResponse(data=session_responses, pagination=pagination_meta)
except Exception as e:
logger.error(f"Error listing sessions (admin): {str(e)}", exc_info=True)
raise

View File

@@ -420,6 +420,59 @@ class CRUDSession(CRUDBase[UserSession, SessionCreate, SessionUpdate]):
logger.error(f"Error counting sessions for user {user_id}: {str(e)}")
raise
async def get_all_sessions(
self,
db: AsyncSession,
*,
skip: int = 0,
limit: int = 100,
active_only: bool = True,
with_user: bool = True
) -> tuple[List[UserSession], int]:
"""
Get all sessions across all users with pagination (admin only).
Args:
db: Database session
skip: Number of records to skip
limit: Maximum number of records to return
active_only: If True, return only active sessions
with_user: If True, eager load user relationship to prevent N+1
Returns:
Tuple of (list of UserSession objects, total count)
"""
try:
# Build query
query = select(UserSession)
# Add eager loading if requested to prevent N+1 queries
if with_user:
query = query.options(joinedload(UserSession.user))
if active_only:
query = query.where(UserSession.is_active == True)
# Get total count
count_query = select(func.count(UserSession.id))
if active_only:
count_query = count_query.where(UserSession.is_active == True)
count_result = await db.execute(count_query)
total = count_result.scalar_one()
# Apply pagination and ordering
query = query.order_by(UserSession.last_used_at.desc()).offset(skip).limit(limit)
result = await db.execute(query)
sessions = list(result.scalars().all())
return sessions, total
except Exception as e:
logger.error(f"Error getting all sessions: {str(e)}", exc_info=True)
raise
# Create singleton instance
session = CRUDSession(UserSession)

View File

@@ -110,6 +110,46 @@ class LogoutRequest(BaseModel):
)
class AdminSessionResponse(SessionBase):
"""
Schema for session responses in admin panel.
Includes user information for admin to see who owns each session.
"""
id: UUID
user_id: UUID
user_email: str = Field(..., description="Email of the user who owns this session")
user_full_name: Optional[str] = Field(None, description="Full name of the user")
ip_address: Optional[str] = None
location_city: Optional[str] = None
location_country: Optional[str] = None
last_used_at: datetime
created_at: datetime
expires_at: datetime
is_active: bool
model_config = ConfigDict(
from_attributes=True,
json_schema_extra={
"example": {
"id": "123e4567-e89b-12d3-a456-426614174000",
"user_id": "456e7890-e89b-12d3-a456-426614174001",
"user_email": "user@example.com",
"user_full_name": "John Doe",
"device_name": "iPhone 14",
"device_id": "device-abc-123",
"ip_address": "192.168.1.100",
"location_city": "San Francisco",
"location_country": "United States",
"last_used_at": "2025-10-31T12:00:00Z",
"created_at": "2025-10-30T09:00:00Z",
"expires_at": "2025-11-06T09:00:00Z",
"is_active": True
}
}
)
class DeviceInfo(BaseModel):
"""Device information extracted from request."""
device_name: Optional[str] = None

View File

@@ -9,6 +9,8 @@ from fastapi import status
from app.models.organization import Organization
from app.models.user_organization import UserOrganization, OrganizationRole
from app.models.user_session import UserSession
from datetime import datetime, timezone, timedelta
@pytest_asyncio.fixture
@@ -837,3 +839,159 @@ class TestAdminRemoveOrganizationMember:
)
assert response.status_code == status.HTTP_404_NOT_FOUND
# ===== SESSION MANAGEMENT TESTS =====
class TestAdminListSessions:
"""Tests for admin sessions list endpoint."""
@pytest.mark.asyncio
async def test_admin_list_sessions_success(self, client, async_test_superuser, async_test_user, async_test_db, superuser_token):
"""Test listing all sessions as admin."""
test_engine, AsyncTestingSessionLocal = async_test_db
# Create some test sessions
async with AsyncTestingSessionLocal() as session:
now = datetime.now(timezone.utc)
expires_at = now + timedelta(days=7)
session1 = UserSession(
user_id=async_test_user.id,
refresh_token_jti="jti-test-1",
device_name="iPhone 14",
device_id="device-1",
ip_address="192.168.1.100",
user_agent="Mozilla/5.0",
last_used_at=now,
expires_at=expires_at,
is_active=True,
location_city="San Francisco",
location_country="United States"
)
session2 = UserSession(
user_id=async_test_superuser.id,
refresh_token_jti="jti-test-2",
device_name="MacBook Pro",
device_id="device-2",
ip_address="192.168.1.101",
user_agent="Mozilla/5.0",
last_used_at=now,
expires_at=expires_at,
is_active=True
)
session.add_all([session1, session2])
await session.commit()
response = await client.get(
"/api/v1/admin/sessions?page=1&limit=10",
headers={"Authorization": f"Bearer {superuser_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "data" in data
assert "pagination" in data
assert len(data["data"]) >= 2 # At least our 2 test sessions
assert data["pagination"]["total"] >= 2
# Verify session structure includes user info
first_session = data["data"][0]
assert "id" in first_session
assert "user_id" in first_session
assert "user_email" in first_session
assert "device_name" in first_session
assert "ip_address" in first_session
assert "is_active" in first_session
@pytest.mark.asyncio
async def test_admin_list_sessions_filter_active(self, client, async_test_superuser, async_test_user, async_test_db, superuser_token):
"""Test filtering sessions by active status."""
test_engine, AsyncTestingSessionLocal = async_test_db
# Create active and inactive sessions
async with AsyncTestingSessionLocal() as session:
now = datetime.now(timezone.utc)
expires_at = now + timedelta(days=7)
active_session = UserSession(
user_id=async_test_user.id,
refresh_token_jti="jti-active",
device_name="Active Device",
ip_address="192.168.1.100",
last_used_at=now,
expires_at=expires_at,
is_active=True
)
inactive_session = UserSession(
user_id=async_test_user.id,
refresh_token_jti="jti-inactive",
device_name="Inactive Device",
ip_address="192.168.1.101",
last_used_at=now,
expires_at=expires_at,
is_active=False
)
session.add_all([active_session, inactive_session])
await session.commit()
# Get only active sessions (default)
response = await client.get(
"/api/v1/admin/sessions?page=1&limit=100",
headers={"Authorization": f"Bearer {superuser_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
# All returned sessions should be active
for sess in data["data"]:
assert sess["is_active"] is True
@pytest.mark.asyncio
async def test_admin_list_sessions_pagination(self, client, async_test_superuser, async_test_db, superuser_token):
"""Test pagination of sessions list."""
test_engine, AsyncTestingSessionLocal = async_test_db
# Create multiple sessions
async with AsyncTestingSessionLocal() as session:
now = datetime.now(timezone.utc)
expires_at = now + timedelta(days=7)
sessions = []
for i in range(5):
sess = UserSession(
user_id=async_test_superuser.id,
refresh_token_jti=f"jti-pagination-{i}",
device_name=f"Device {i}",
ip_address=f"192.168.1.{100+i}",
last_used_at=now,
expires_at=expires_at,
is_active=True
)
sessions.append(sess)
session.add_all(sessions)
await session.commit()
# Get first page with limit 2
response = await client.get(
"/api/v1/admin/sessions?page=1&limit=2",
headers={"Authorization": f"Bearer {superuser_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert len(data["data"]) == 2
assert data["pagination"]["page"] == 1
assert data["pagination"]["page_size"] == 2
assert data["pagination"]["total"] >= 5
@pytest.mark.asyncio
async def test_admin_list_sessions_unauthorized(self, client, async_test_user, user_token):
"""Test that non-admin users cannot access admin sessions endpoint."""
response = await client.get(
"/api/v1/admin/sessions?page=1&limit=10",
headers={"Authorization": f"Bearer {user_token}"}
)
assert response.status_code == status.HTTP_403_FORBIDDEN