From 86f67a925c523d7c5982983a76f8317cb8f3496e Mon Sep 17 00:00:00 2001 From: Felipe Cardoso Date: Fri, 31 Oct 2025 19:02:46 +0100 Subject: [PATCH] Add detailed backend architecture documentation - Created `ARCHITECTURE.md` with an extensive overview of backend design, principles, and project structure. - Documented key architectural layers: API, dependencies, services, CRUD, and data layers. - Included comprehensive guidelines for database architecture, authentication/authorization, error handling, and testing strategy. - Provided examples for each layer, security practices, and performance considerations. - Aimed at improving developer onboarding and ensuring consistent implementation practices. --- backend/docs/ARCHITECTURE.md | 1171 ++++++++++++++++++++ backend/docs/CODING_STANDARDS.md | 877 +++++++++++++++ backend/docs/FEATURE_EXAMPLE.md | 1752 ++++++++++++++++++++++++++++++ 3 files changed, 3800 insertions(+) create mode 100644 backend/docs/ARCHITECTURE.md create mode 100644 backend/docs/CODING_STANDARDS.md create mode 100644 backend/docs/FEATURE_EXAMPLE.md diff --git a/backend/docs/ARCHITECTURE.md b/backend/docs/ARCHITECTURE.md new file mode 100644 index 0000000..a491b7c --- /dev/null +++ b/backend/docs/ARCHITECTURE.md @@ -0,0 +1,1171 @@ +# Architecture Guide + +This document provides a comprehensive overview of the backend architecture, design patterns, and structural organization. + +## Table of Contents + +- [Overview](#overview) +- [Technology Stack](#technology-stack) +- [Project Structure](#project-structure) +- [Layered Architecture](#layered-architecture) +- [Database Architecture](#database-architecture) +- [Authentication & Authorization](#authentication--authorization) +- [Error Handling](#error-handling) +- [API Design](#api-design) +- [Background Jobs](#background-jobs) +- [Testing Strategy](#testing-strategy) +- [Security Architecture](#security-architecture) +- [Performance Considerations](#performance-considerations) + +## Overview + +This FastAPI backend application follows a **clean layered architecture** pattern with clear separation of concerns. The architecture is designed to be: + +- **Scalable**: Can handle growing data and user load +- **Maintainable**: Easy to understand, modify, and extend +- **Testable**: Each layer can be tested independently +- **Secure**: Security built into every layer +- **Type-safe**: Comprehensive type hints throughout + +### Key Architectural Principles + +1. **Separation of Concerns**: Each layer has a single, well-defined responsibility +2. **Dependency Injection**: Dependencies are injected rather than hard-coded +3. **Single Responsibility**: Each module, class, and function does one thing well +4. **Open/Closed Principle**: Open for extension, closed for modification +5. **Interface Segregation**: Clients depend only on interfaces they use +6. **Dependency Inversion**: Depend on abstractions, not concretions + +## Technology Stack + +### Core Framework + +- **FastAPI 0.115.8+**: Modern async web framework + - Automatic OpenAPI documentation + - Built-in data validation + - High performance (based on Starlette and Pydantic) + - Type safety with Python 3.10+ + +- **Uvicorn**: ASGI server for production deployment + - Async request handling + - WebSocket support + - HTTP/2 support + +### Database Layer + +- **SQLAlchemy 2.0+**: ORM and database toolkit + - Supports async operations + - Type-safe query building + - Migration support via Alembic + +- **PostgreSQL**: Primary production database + - ACID compliance + - Advanced indexing + - JSON support + - Full-text search capabilities + +- **Alembic**: Database migration tool + - Version-controlled schema changes + - Automatic migration generation + - Rollback support + +### Data Validation + +- **Pydantic 2.10+**: Data validation using Python type hints + - Fast validation (Rust core) + - Automatic JSON schema generation + - Custom validators + - Type coercion + +### Authentication & Security + +- **python-jose**: JWT token generation and validation + - Cryptographic signing + - Token expiration handling + - Claims validation + +- **passlib + bcrypt**: Password hashing + - Industry-standard bcrypt algorithm + - Configurable cost factor + - Salt generation + +### Additional Features + +- **SlowAPI**: Rate limiting + - Per-IP rate limiting + - Per-route configuration + - Redis backend support (optional) + +- **APScheduler**: Background job scheduling + - Cron-style scheduling + - Interval-based jobs + - Async job support + +- **starlette-csrf**: CSRF protection + - Token-based CSRF prevention + - Cookie-based tokens + +## Project Structure + +``` +backend/ +├── app/ +│ ├── alembic/ # Database migrations +│ │ ├── versions/ # Migration files +│ │ └── env.py # Migration environment +│ │ +│ ├── api/ # API layer +│ │ ├── dependencies/ # Dependency injection +│ │ │ ├── auth.py # Authentication dependencies +│ │ │ └── permissions.py # Authorization dependencies +│ │ ├── routes/ # API endpoints +│ │ │ ├── auth.py # Authentication routes +│ │ │ ├── users.py # User management routes +│ │ │ ├── sessions.py # Session management routes +│ │ │ ├── organizations.py # Organization routes +│ │ │ └── admin.py # Admin routes +│ │ └── main.py # API router aggregation +│ │ +│ ├── core/ # Core functionality +│ │ ├── auth.py # JWT and password utilities +│ │ ├── config.py # Application configuration +│ │ ├── database.py # Database connection +│ │ ├── exceptions.py # Custom exception classes +│ │ └── middleware.py # Custom middleware +│ │ +│ ├── crud/ # Database operations +│ │ ├── base.py # Generic CRUD base class +│ │ ├── user.py # User CRUD operations +│ │ ├── session.py # Session CRUD operations +│ │ └── organization.py # Organization CRUD +│ │ +│ ├── models/ # SQLAlchemy models +│ │ ├── base.py # Base model with mixins +│ │ ├── user.py # User model +│ │ ├── user_session.py # Session tracking model +│ │ ├── organization.py # Organization model +│ │ └── user_organization.py # Many-to-many relationship +│ │ +│ ├── schemas/ # Pydantic schemas +│ │ ├── common.py # Common schemas (pagination, etc.) +│ │ ├── errors.py # Error response schemas +│ │ ├── users.py # User schemas +│ │ ├── sessions.py # Session schemas +│ │ └── organizations.py # Organization schemas +│ │ +│ ├── services/ # Business logic +│ │ ├── auth_service.py # Authentication service +│ │ ├── email_service.py # Email service +│ │ └── session_cleanup.py # Background cleanup +│ │ +│ ├── utils/ # Utility functions +│ │ ├── security.py # Security utilities +│ │ ├── device.py # Device detection +│ │ └── test_utils.py # Testing utilities +│ │ +│ ├── init_db.py # Database initialization +│ └── main.py # Application entry point +│ +├── tests/ # Test suite +│ ├── api/ # Integration tests +│ ├── crud/ # CRUD tests +│ ├── models/ # Model tests +│ ├── services/ # Service tests +│ └── conftest.py # Test configuration +│ +├── docs/ # Documentation +│ ├── ARCHITECTURE.md # This file +│ ├── CODING_STANDARDS.md # Coding standards +│ └── FEATURE_EXAMPLE.md # Feature implementation guide +│ +├── requirements.txt # Python dependencies +├── pytest.ini # Pytest configuration +├── .coveragerc # Coverage configuration +└── alembic.ini # Alembic configuration +``` + +## Layered Architecture + +The application follows a strict 5-layer architecture: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ API Layer (routes/) │ +│ - HTTP endpoints │ +│ - Request/response handling │ +│ - OpenAPI documentation │ +│ - Rate limiting │ +└──────────────────────────┬──────────────────────────────────┘ + │ calls +┌──────────────────────────▼──────────────────────────────────┐ +│ Dependencies (dependencies/) │ +│ - Authentication (get_current_user) │ +│ - Authorization (permission checks) │ +│ - Database session injection │ +│ - Request context │ +└──────────────────────────┬──────────────────────────────────┘ + │ injects +┌──────────────────────────▼──────────────────────────────────┐ +│ Service Layer (services/) │ +│ - Business logic │ +│ - Multi-step operations │ +│ - Cross-cutting concerns │ +│ - External service integration │ +└──────────────────────────┬──────────────────────────────────┘ + │ calls +┌──────────────────────────▼──────────────────────────────────┐ +│ CRUD Layer (crud/) │ +│ - Database operations │ +│ - Query building │ +│ - Transaction management │ +│ - Error handling │ +└──────────────────────────┬──────────────────────────────────┘ + │ uses +┌──────────────────────────▼──────────────────────────────────┐ +│ Data Layer (models/ + schemas/) │ +│ - SQLAlchemy models (database structure) │ +│ - Pydantic schemas (validation) │ +│ - Type definitions │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Layer Details + +#### 1. API Layer (`app/api/routes/`) + +**Responsibility**: Handle HTTP requests and responses + +**Key Functions**: +- Define API endpoints and routes +- Handle request validation via Pydantic schemas +- Return structured responses +- Apply rate limiting +- Generate OpenAPI documentation +- Handle file uploads/downloads + +**Example**: +```python +@router.get( + "/me", + response_model=UserResponse, + summary="Get current user" +) +@limiter.limit("30/minute") +async def get_current_user_info( + request: Request, + current_user: User = Depends(get_current_active_user), + db: Session = Depends(get_db) +) -> UserResponse: + """Get the currently authenticated user's information.""" + return current_user +``` + +**Rules**: +- Should NOT contain business logic +- Should NOT directly perform database operations (use CRUD or services) +- Must validate all input via Pydantic schemas +- Must specify response models +- Should apply appropriate rate limits + +#### 2. Dependencies Layer (`app/api/dependencies/`) + +**Responsibility**: Provide reusable dependency injection functions + +**Key Functions**: +- Authenticate users from JWT tokens +- Check user permissions and roles +- Inject database sessions +- Provide request context + +**Example**: +```python +def get_current_user( + token: str = Depends(oauth2_scheme), + db: Session = Depends(get_db) +) -> User: + """ + Extract and validate user from JWT token. + + Raises: + AuthenticationError: If token is invalid or user not found + """ + try: + payload = decode_access_token(token) + user_id = UUID(payload.get("sub")) + except Exception: + raise AuthenticationError("Invalid authentication credentials") + + user = user_crud.get(db, id=user_id) + if not user: + raise AuthenticationError("User not found") + + return user +``` + +**Rules**: +- Should be pure functions +- Should raise appropriate exceptions +- Should be reusable across multiple routes +- Must handle errors gracefully + +#### 3. Service Layer (`app/services/`) + +**Responsibility**: Implement complex business logic + +**Key Functions**: +- Orchestrate multiple CRUD operations +- Implement business rules +- Handle external service integration +- Coordinate transactions + +**Example**: +```python +class AuthService: + """Authentication service with business logic.""" + + def login( + self, + db: Session, + email: str, + password: str, + request: Request + ) -> dict: + """ + Authenticate user and create session. + + Business logic: + 1. Validate credentials + 2. Create session with device info + 3. Generate tokens + 4. Return tokens and user info + """ + # Validate credentials + user = user_crud.get_by_email(db, email=email) + if not user or not verify_password(password, user.hashed_password): + raise AuthenticationError("Invalid credentials") + + if not user.is_active: + raise AuthenticationError("Account is inactive") + + # Extract device info + device_info = extract_device_info(request) + + # Create session + session = session_crud.create_session( + db, + user_id=user.id, + device_info=device_info + ) + + # Generate tokens + access_token = create_access_token(subject=str(user.id)) + refresh_token = create_refresh_token( + subject=str(user.id), + jti=str(session.refresh_token_jti) + ) + + return { + "access_token": access_token, + "refresh_token": refresh_token, + "user": user + } +``` + +**Rules**: +- Contains business logic, not just data operations +- Can call multiple CRUD operations +- Should handle complex workflows +- Must maintain data consistency +- Should use transactions when needed + +#### 4. CRUD Layer (`app/crud/`) + +**Responsibility**: Database operations and queries + +**Key Functions**: +- Create, read, update, delete operations +- Build database queries +- Handle database errors +- Manage soft deletes +- Implement pagination and filtering + +**Example**: +```python +class CRUDSession(CRUDBase[UserSession, SessionCreate, SessionUpdate]): + """CRUD operations for user sessions.""" + + def get_by_jti(self, db: Session, jti: UUID) -> Optional[UserSession]: + """Get session by refresh token JTI.""" + try: + return ( + db.query(UserSession) + .filter(UserSession.refresh_token_jti == jti) + .first() + ) + except Exception as e: + logger.error(f"Error getting session by JTI: {str(e)}") + return None + + def get_active_by_jti( + self, + db: Session, + jti: UUID + ) -> Optional[UserSession]: + """Get active session by refresh token JTI.""" + session = self.get_by_jti(db, jti=jti) + if session and session.is_active and not session.is_expired: + return session + return None + + def deactivate(self, db: Session, session_id: UUID) -> bool: + """Deactivate a session (logout).""" + try: + session = self.get(db, id=session_id) + if not session: + return False + + session.is_active = False + db.commit() + logger.info(f"Session {session_id} deactivated") + return True + + except Exception as e: + db.rollback() + logger.error(f"Error deactivating session: {str(e)}") + return False +``` + +**Rules**: +- Should NOT contain business logic +- Must handle database exceptions +- Must use parameterized queries (SQLAlchemy does this) +- Should log all database errors +- Must rollback on errors +- Should use soft deletes when possible + +#### 5. Data Layer (`app/models/` + `app/schemas/`) + +**Responsibility**: Define data structures + +##### Models (`app/models/`) + +Database schema definition using SQLAlchemy: + +```python +from app.models.base import Base, UUIDMixin, TimestampMixin + +class User(Base, UUIDMixin, TimestampMixin): + """User model.""" + + __tablename__ = "users" + + email = Column(String(255), unique=True, nullable=False, index=True) + hashed_password = Column(String(255), nullable=False) + is_active = Column(Boolean, default=True, nullable=False) + is_superuser = Column(Boolean, default=False, nullable=False) + deleted_at = Column(DateTime(timezone=True), nullable=True) + + # Relationships + sessions = relationship("UserSession", back_populates="user", cascade="all, delete-orphan") + + # Indexes + __table_args__ = ( + Index("idx_user_email_active", "email", "is_active"), + ) +``` + +##### Schemas (`app/schemas/`) + +Data validation and serialization using Pydantic: + +```python +from pydantic import BaseModel, Field, ConfigDict + +class UserBase(BaseModel): + """Base user schema with common fields.""" + + email: str = Field(..., description="User's email address") + +class UserCreate(UserBase): + """Schema for creating a user.""" + + password: str = Field(..., min_length=8) + +class UserUpdate(UserBase): + """Schema for updating a user.""" + + email: Optional[str] = None + password: Optional[str] = None + +class UserResponse(UserBase): + """Schema for user API responses.""" + + model_config = ConfigDict(from_attributes=True) + + id: UUID + is_active: bool + created_at: datetime +``` + +**Rules**: +- Models define database structure +- Schemas define API contracts +- Never expose sensitive fields (passwords, tokens) +- Use mixins for common fields +- Define appropriate indexes + +## Database Architecture + +### Connection Management + +```python +# app/core/database.py + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +# Connection pooling configuration +engine = create_engine( + DATABASE_URL, + pool_size=20, # Number of persistent connections + max_overflow=50, # Additional connections when pool exhausted + pool_timeout=30, # Seconds to wait for connection + pool_recycle=3600, # Recycle connections after 1 hour + pool_pre_ping=True, # Verify connections before use +) + +SessionLocal = sessionmaker( + autocommit=False, + autoflush=False, + bind=engine +) +``` + +### Session Management + +#### Dependency Injection Pattern + +```python +def get_db() -> Generator[Session, None, None]: + """ + Database session dependency for FastAPI routes. + + Automatically commits on success, rolls back on error. + """ + db = SessionLocal() + try: + yield db + finally: + db.close() + +# Usage in routes +@router.get("/users") +def list_users(db: Session = Depends(get_db)): + return user_crud.get_multi(db) +``` + +#### Context Manager Pattern + +```python +@contextmanager +def transaction_scope() -> Generator[Session, None, None]: + """ + Context manager for database transactions. + + Use for complex operations requiring multiple steps. + Automatically commits on success, rolls back on error. + """ + db = SessionLocal() + try: + yield db + db.commit() + except Exception: + db.rollback() + raise + finally: + db.close() + +# Usage in services +def complex_operation(): + with transaction_scope() as db: + user = user_crud.create(db, obj_in=user_data) + session = session_crud.create(db, session_data) + return user, session +``` + +### Model Mixins + +Common functionality shared across models: + +```python +# app/models/base.py + +class UUIDMixin: + """Add UUID primary key to model.""" + + id = Column( + UUID(as_uuid=True), + primary_key=True, + default=uuid.uuid4, + unique=True, + nullable=False + ) + +class TimestampMixin: + """Add created_at and updated_at timestamps.""" + + created_at = Column( + DateTime(timezone=True), + default=lambda: datetime.now(timezone.utc), + nullable=False + ) + + updated_at = Column( + DateTime(timezone=True), + default=lambda: datetime.now(timezone.utc), + onupdate=lambda: datetime.now(timezone.utc), + nullable=True + ) + +# All models inherit both mixins +class User(Base, UUIDMixin, TimestampMixin): + __tablename__ = "users" + # ... +``` + +### Migration System + +Database migrations managed by Alembic: + +```bash +# Create a new migration +alembic revision --autogenerate -m "Add user_sessions table" + +# Apply migrations +alembic upgrade head + +# Rollback one migration +alembic downgrade -1 + +# Show migration history +alembic history +``` + +### Indexing Strategy + +```python +class UserSession(Base): + __tablename__ = "user_sessions" + + # Single-column indexes + user_id = Column(UUID, ForeignKey("users.id"), index=True) + refresh_token_jti = Column(UUID, unique=True, index=True) + + # Composite indexes for common queries + __table_args__ = ( + Index("idx_user_session_active", "user_id", "is_active"), + Index("idx_session_expiry", "expires_at", "is_active"), + ) +``` + +**Indexing Guidelines**: +- Index foreign keys +- Index columns used in WHERE clauses +- Index columns used in JOIN conditions +- Use composite indexes for multi-column queries +- Monitor query performance with EXPLAIN + +## Authentication & Authorization + +### JWT Token System + +Two-token strategy for security: + +```python +# Access Token (short-lived) +access_token = create_access_token( + subject=str(user.id), + additional_claims={"is_superuser": user.is_superuser} +) +# Expiry: 15 minutes +# Used for: API authentication +# Stored: Client-side (memory or secure storage) + +# Refresh Token (long-lived) +refresh_token = create_refresh_token( + subject=str(user.id), + jti=str(session.refresh_token_jti) # Session tracking +) +# Expiry: 7 days +# Used for: Getting new access tokens +# Stored: HttpOnly cookie or secure storage +``` + +### Token Claims + +```python +{ + "sub": "user-uuid-here", # Subject (user ID) + "type": "access", # Token type + "exp": 1234567890, # Expiration timestamp + "iat": 1234567800, # Issued at timestamp + "is_superuser": false, # User role + "jti": "session-uuid-here" # JWT ID (for refresh tokens) +} +``` + +### Authentication Flow + +``` +┌─────────┐ ┌─────────┐ +│ Client │ │ Backend │ +└────┬────┘ └────┬────┘ + │ │ + │ POST /auth/login │ + │ {email, password} │ + │───────────────────────────────────────────────>│ + │ │ + │ Verify credentials + │ Create session + │ Generate tokens + │ │ + │ {access_token, refresh_token, user} │ + │<───────────────────────────────────────────────│ + │ │ + │ GET /api/v1/users/me │ + │ Authorization: Bearer {access_token} │ + │───────────────────────────────────────────────>│ + │ │ + │ Validate token + │ Get user + │ │ + │ {user data} │ + │<───────────────────────────────────────────────│ + │ │ + │ (after 15 minutes) │ + │ POST /auth/refresh │ + │ {refresh_token} │ + │───────────────────────────────────────────────>│ + │ │ + │ Validate refresh token + │ Check session active + │ Generate new tokens + │ │ + │ {access_token, refresh_token} │ + │<───────────────────────────────────────────────│ + │ │ +``` + +### Authorization Patterns + +#### Role-Based Access Control (RBAC) + +```python +# Superuser check +@router.post("/admin/users") +def admin_endpoint( + current_user: User = Depends(get_current_superuser) +): + """Only superusers can access this endpoint.""" + pass + +# Active user check +@router.get("/users/me") +def get_profile( + current_user: User = Depends(get_current_active_user) +): + """Only active users can access this endpoint.""" + pass +``` + +#### Resource Ownership + +```python +@router.delete("/sessions/{session_id}") +def revoke_session( + session_id: UUID, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """Users can only revoke their own sessions.""" + session = session_crud.get(db, id=session_id) + + if not session: + raise NotFoundError("Session not found") + + # Check ownership + if session.user_id != current_user.id: + raise AuthorizationError("You can only revoke your own sessions") + + session_crud.deactivate(db, session_id=session_id) + return MessageResponse(success=True, message="Session revoked") +``` + +#### Organization-Based Permissions + +```python +from app.api.dependencies.permissions import require_org_admin + +@router.post("/organizations/{org_id}/members") +def add_member( + org_id: UUID, + member_data: MemberCreate, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), + _: None = Depends(require_org_admin(org_id)) # Permission check +): + """Only organization admins can add members.""" + pass +``` + +## Error Handling + +### Exception Hierarchy + +```python +class APIException(Exception): + """Base exception for all API errors.""" + + def __init__( + self, + message: str, + status_code: int, + error_code: str, + field: Optional[str] = None + ): + self.message = message + self.status_code = status_code + self.error_code = error_code + self.field = field + +# Specific exceptions +class AuthenticationError(APIException): + """401 Unauthorized""" + def __init__(self, message: str, error_code: str = "AUTH_001", field: Optional[str] = None): + super().__init__(message, 401, error_code, field) + +class AuthorizationError(APIException): + """403 Forbidden""" + def __init__(self, message: str, error_code: str = "AUTH_002", field: Optional[str] = None): + super().__init__(message, 403, error_code, field) + +class NotFoundError(APIException): + """404 Not Found""" + def __init__(self, message: str, error_code: str = "NOT_001", field: Optional[str] = None): + super().__init__(message, 404, error_code, field) + +class DuplicateError(APIException): + """409 Conflict""" + def __init__(self, message: str, error_code: str = "DUP_001", field: Optional[str] = None): + super().__init__(message, 409, error_code, field) +``` + +### Global Exception Handlers + +Registered in `app/main.py`: + +```python +@app.exception_handler(APIException) +async def api_exception_handler(request: Request, exc: APIException): + """Handle custom API exceptions.""" + return JSONResponse( + status_code=exc.status_code, + content={ + "success": False, + "errors": [ + { + "code": exc.error_code, + "message": exc.message, + "field": exc.field + } + ] + } + ) + +@app.exception_handler(RequestValidationError) +async def validation_exception_handler(request: Request, exc: RequestValidationError): + """Handle Pydantic validation errors.""" + errors = [] + for error in exc.errors(): + errors.append({ + "code": "VAL_001", + "message": error["msg"], + "field": ".".join(str(x) for x in error["loc"]) + }) + return JSONResponse( + status_code=422, + content={"success": False, "errors": errors} + ) +``` + +### Error Response Format + +All errors follow this structure: + +```json +{ + "success": false, + "errors": [ + { + "code": "AUTH_001", + "message": "Invalid credentials", + "field": "email" + } + ] +} +``` + +## API Design + +### Versioning + +API versioned via URL path: + +```python +# app/api/main.py + +api_router = APIRouter(prefix="/api/v1") + +api_router.include_router(auth_router, tags=["auth"]) +api_router.include_router(users_router, tags=["users"]) +api_router.include_router(sessions_router, tags=["sessions"]) +``` + +### Pagination + +Consistent pagination across all list endpoints: + +```python +# Request +GET /api/v1/users?page=1&limit=20 + +# Response +{ + "data": [...], + "pagination": { + "total": 100, + "page": 1, + "page_size": 20, + "total_pages": 5, + "has_next": true, + "has_prev": false + } +} +``` + +### Rate Limiting + +Applied per-endpoint based on sensitivity: + +```python +# Read operations - 60/minute +@limiter.limit("60/minute") +@router.get("/users") + +# Write operations - 10/minute +@limiter.limit("10/minute") +@router.post("/users") + +# Authentication - 5/minute +@limiter.limit("5/minute") +@router.post("/auth/login") +``` + +## Background Jobs + +### APScheduler Integration + +```python +# app/main.py + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from app.services.session_cleanup import cleanup_expired_sessions + +scheduler = AsyncIOScheduler() + +@app.on_event("startup") +async def startup_event(): + """Start background jobs on application startup.""" + if not settings.IS_TEST: # Don't run in tests + scheduler.add_job( + cleanup_expired_sessions, + "cron", + hour=2, # Run at 2 AM daily + id="cleanup_expired_sessions" + ) + scheduler.start() + logger.info("Background jobs started") + +@app.on_event("shutdown") +async def shutdown_event(): + """Stop background jobs on application shutdown.""" + scheduler.shutdown() +``` + +### Job Implementation + +```python +# app/services/session_cleanup.py + +async def cleanup_expired_sessions(): + """ + Clean up expired sessions. + + Runs daily at 2 AM. Removes sessions expired for more than 30 days. + """ + try: + with transaction_scope() as db: + count = session_crud.cleanup_expired(db, keep_days=30) + logger.info(f"Cleaned up {count} expired sessions") + except Exception as e: + logger.error(f"Error cleaning up sessions: {str(e)}", exc_info=True) +``` + +## Testing Strategy + +### Test Pyramid + +``` + ┌─────────────┐ + │ E2E Tests │ ← Few, high-level + ├─────────────┤ + │Integration │ ← API endpoint tests + │ Tests │ + ├─────────────┤ + │ Unit │ ← CRUD, services, utilities + │ Tests │ + └─────────────┘ +``` + +### Test Database + +Use SQLite in-memory for fast tests: + +```python +# tests/conftest.py + +@pytest.fixture(scope="session") +def test_engine(): + """Create test database engine.""" + return create_engine( + "sqlite:///:memory:", + connect_args={"check_same_thread": False} + ) + +@pytest.fixture +def db_session(test_engine): + """Create a fresh database session for each test.""" + Base.metadata.create_all(bind=test_engine) + Session = sessionmaker(bind=test_engine) + session = Session() + yield session + session.close() + Base.metadata.drop_all(bind=test_engine) +``` + +### Test Coverage + +Aim for 80%+ coverage: + +```bash +# Run tests with coverage +pytest --cov=app --cov-report=html --cov-report=term + +# View coverage report +open htmlcov/index.html +``` + +## Security Architecture + +### Security Headers + +```python +# Content Security Policy +@app.middleware("http") +async def add_security_headers(request: Request, call_next): + response = await call_next(request) + + # CSP + response.headers["Content-Security-Policy"] = "default-src 'self'" + + # Other security headers + response.headers["X-Content-Type-Options"] = "nosniff" + response.headers["X-Frame-Options"] = "DENY" + response.headers["X-XSS-Protection"] = "1; mode=block" + + return response +``` + +### CORS Configuration + +```python +app.add_middleware( + CORSMiddleware, + allow_origins=[ + "http://localhost:3000", + "https://yourdomain.com" + ], + allow_credentials=True, + allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"], + allow_headers=["*"], + expose_headers=["X-Total-Count"] +) +``` + +### Password Requirements + +- Minimum 8 characters +- Bcrypt hashing with cost factor 12 +- No password history (can be added if needed) + +### Session Security + +- Per-device session tracking +- Automatic session expiration +- Manual session revocation +- Token rotation on refresh + +## Performance Considerations + +### Database Connection Pooling + +- Pool size: 20 connections +- Max overflow: 50 connections +- Connection recycling every hour +- Pre-ping for connection health + +### Query Optimization + +- Eager loading for relationships +- Appropriate indexes on frequently queried columns +- Query result pagination +- Avoid N+1 queries + +### Caching Strategy + +Currently no caching implemented. Consider adding: +- Redis for session storage +- Response caching for read-heavy endpoints +- Query result caching + +### Rate Limiting + +Protects against abuse and DoS attacks: +- IP-based rate limiting +- Per-endpoint limits +- Can be extended with Redis for distributed systems + +## Conclusion + +This architecture provides a solid foundation for a scalable, maintainable, and secure FastAPI application. Key benefits: + +- **Clear separation of concerns**: Each layer has a specific responsibility +- **Type safety**: Comprehensive type hints throughout +- **Security**: Built-in authentication, authorization, and security best practices +- **Testability**: Each layer can be tested independently +- **Maintainability**: Clean code structure and comprehensive documentation +- **Scalability**: Connection pooling, rate limiting, and efficient queries + +For implementation examples, see: +- **Coding Standards**: `backend/docs/CODING_STANDARDS.md` +- **Feature Example**: `backend/docs/FEATURE_EXAMPLE.md` diff --git a/backend/docs/CODING_STANDARDS.md b/backend/docs/CODING_STANDARDS.md new file mode 100644 index 0000000..329f275 --- /dev/null +++ b/backend/docs/CODING_STANDARDS.md @@ -0,0 +1,877 @@ +# Coding Standards + +This document outlines the coding standards and best practices for the FastAPI backend application. + +## Table of Contents + +- [General Principles](#general-principles) +- [Code Organization](#code-organization) +- [Naming Conventions](#naming-conventions) +- [Error Handling](#error-handling) +- [Database Operations](#database-operations) +- [API Endpoints](#api-endpoints) +- [Authentication & Security](#authentication--security) +- [Testing](#testing) +- [Logging](#logging) +- [Documentation](#documentation) + +## General Principles + +### 1. Follow PEP 8 + +All Python code should follow [PEP 8](https://www.python.org/dev/peps/pep-0008/) style guidelines: + +- Use 4 spaces for indentation (never tabs) +- Maximum line length: 88 characters (Black formatter default) +- Two blank lines between top-level functions and classes +- One blank line between methods in a class + +### 2. Type Hints + +Always use type hints for function parameters and return values: + +```python +from typing import Optional, List +from uuid import UUID + +def get_user(db: Session, user_id: UUID) -> Optional[User]: + """Retrieve a user by ID.""" + return db.query(User).filter(User.id == user_id).first() +``` + +### 3. Docstrings + +Use Google-style docstrings for all public functions, classes, and methods: + +```python +def create_user(db: Session, user_in: UserCreate) -> User: + """ + Create a new user in the database. + + Args: + db: Database session + user_in: User creation schema with validated data + + Returns: + The newly created user object + + Raises: + DuplicateError: If user with email already exists + ValidationException: If validation fails + """ + # Implementation +``` + +### 4. Code Formatting + +Use automated formatters: +- **Black**: Code formatting +- **isort**: Import sorting +- **flake8**: Linting + +Run before committing: +```bash +black app tests +isort app tests +flake8 app tests +``` + +## Code Organization + +### Layer Separation + +Follow the 5-layer architecture strictly: + +``` +API Layer (routes/) + ↓ calls +Dependencies (dependencies/) + ↓ injects +Service Layer (services/) + ↓ calls +CRUD Layer (crud/) + ↓ uses +Models & Schemas (models/, schemas/) +``` + +**Rules:** +- Routes should NOT directly call CRUD operations (use services when business logic is needed) +- CRUD operations should NOT contain business logic +- Models should NOT import from higher layers +- Each layer should only depend on the layer directly below it + +### File Organization + +```python +# Standard import order: +# 1. Standard library +import logging +from datetime import datetime +from typing import Optional, List + +# 2. Third-party packages +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.orm import Session + +# 3. Local application imports +from app.api.dependencies.auth import get_current_user +from app.crud import user_crud +from app.models.user import User +from app.schemas.users import UserResponse, UserCreate +``` + +## Naming Conventions + +### Variables and Functions + +```python +# Use snake_case for variables and functions +user_id = "123" +def get_user_by_email(email: str) -> Optional[User]: + pass + +# Use UPPER_CASE for constants +MAX_LOGIN_ATTEMPTS = 5 +DEFAULT_PAGE_SIZE = 20 +``` + +### Classes + +```python +# Use PascalCase for class names +class UserSession: + pass + +class AuthenticationError(APIException): + pass +``` + +### Database Tables and Columns + +```python +# Use snake_case for table and column names +class User(Base): + __tablename__ = "users" + + first_name = Column(String(100)) + last_name = Column(String(100)) + created_at = Column(DateTime) +``` + +### API Endpoints + +```python +# Use kebab-case for URL paths +@router.get("/user-sessions") +@router.post("/password-reset") +@router.delete("/user-sessions/expired") +``` + +### Files and Directories + +```python +# Use snake_case for file and directory names +user_session.py +auth_service.py +email_service.py +``` + +## Error Handling + +### Use Custom Exceptions + +Always use custom exceptions from `app.core.exceptions`: + +```python +from app.core.exceptions import ( + NotFoundError, + DuplicateError, + AuthenticationError, + AuthorizationError, + ValidationException, + DatabaseError +) + +# Good +if not user: + raise NotFoundError( + message="User not found", + error_code="USER_001", + field="user_id" + ) + +# Bad - Don't use generic exceptions +if not user: + raise ValueError("User not found") +``` + +### Error Handling Pattern + +Always follow this pattern in CRUD operations: + +```python +from sqlalchemy.exc import IntegrityError, OperationalError, DataError + +def create_user(db: Session, user_in: UserCreate) -> User: + """Create a new user.""" + try: + db_user = User(**user_in.model_dump()) + db.add(db_user) + db.commit() + db.refresh(db_user) + logger.info(f"User created: {db_user.id}") + return db_user + + except IntegrityError as e: + db.rollback() + logger.error(f"Integrity error creating user: {str(e)}") + + # Check for specific constraint violations + if "unique constraint" in str(e).lower(): + if "email" in str(e).lower(): + raise DuplicateError( + message="User with this email already exists", + error_code="USER_002", + field="email" + ) + raise DatabaseError(message="Failed to create user") + + except OperationalError as e: + db.rollback() + logger.error(f"Database operational error: {str(e)}", exc_info=True) + raise DatabaseError(message="Database is currently unavailable") + + except DataError as e: + db.rollback() + logger.error(f"Invalid data error: {str(e)}") + raise ValidationException( + message="Invalid data format", + error_code="VAL_001" + ) + + except Exception as e: + db.rollback() + logger.error(f"Unexpected error creating user: {str(e)}", exc_info=True) + raise +``` + +### Error Response Format + +All error responses follow this structure: + +```python +{ + "success": false, + "errors": [ + { + "code": "AUTH_001", + "message": "Invalid credentials", + "field": "email" # Optional + } + ] +} +``` + +## Database Operations + +### Use the CRUD Base Class + +Always inherit from `CRUDBase` for database operations: + +```python +from app.crud.base import CRUDBase +from app.models.user import User +from app.schemas.users import UserCreate, UserUpdate + +class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]): + """CRUD operations for User model.""" + + def get_by_email(self, db: Session, email: str) -> Optional[User]: + """Get user by email address.""" + return db.query(User).filter(User.email == email).first() + +user_crud = CRUDUser(User) +``` + +### Transaction Management + +#### In Routes (Dependency Injection) + +```python +@router.post("/users", response_model=UserResponse) +def create_user( + user_in: UserCreate, + db: Session = Depends(get_db) +): + """ + Create a new user. + + The database session is automatically managed by FastAPI. + Commit on success, rollback on error. + """ + return user_crud.create(db, obj_in=user_in) +``` + +#### In Services (Context Manager) + +```python +from app.core.database import transaction_scope + +def complex_operation(): + """ + Perform multiple database operations atomically. + + The context manager automatically commits on success + or rolls back on error. + """ + with transaction_scope() as db: + user = user_crud.create(db, obj_in=user_data) + session = session_crud.create(db, obj_in=session_data) + return user, session +``` + +### Use Soft Deletes + +Prefer soft deletes over hard deletes for audit trails: + +```python +# Good - Soft delete (sets deleted_at) +user_crud.soft_delete(db, id=user_id) + +# Acceptable only when required - Hard delete +user_crud.remove(db, id=user_id) +``` + +### Query Patterns + +```python +# Always use parameterized queries (SQLAlchemy does this automatically) +# Good +user = db.query(User).filter(User.email == email).first() + +# Bad - Never construct raw SQL with string interpolation +db.execute(f"SELECT * FROM users WHERE email = '{email}'") # SQL INJECTION! + +# For complex queries, use SQLAlchemy query builder +from sqlalchemy import and_, or_ + +active_users = ( + db.query(User) + .filter( + and_( + User.is_active == True, + User.deleted_at.is_(None), + or_( + User.role == "admin", + User.role == "user" + ) + ) + ) + .all() +) +``` + +## API Endpoints + +### Endpoint Structure + +```python +from fastapi import APIRouter, Depends, Request, status +from slowapi import Limiter + +router = APIRouter(prefix="/api/v1/users", tags=["users"]) +limiter = Limiter(key_func=lambda request: request.client.host) + +@router.get( + "/me", + response_model=UserResponse, + summary="Get current user", + description="Retrieve the currently authenticated user's information.", + status_code=status.HTTP_200_OK +) +@limiter.limit("30/minute") +async def get_current_user_info( + request: Request, + current_user: User = Depends(get_current_active_user), + db: Session = Depends(get_db) +) -> UserResponse: + """ + Get current user information. + + Requires authentication. Rate limited to 30 requests per minute. + """ + return current_user +``` + +### Rate Limiting + +Apply appropriate rate limits to all endpoints: + +```python +# Read operations - More permissive +@limiter.limit("60/minute") + +# Write operations - More restrictive +@limiter.limit("10/minute") + +# Sensitive operations - Very restrictive +@limiter.limit("5/minute") + +# Authentication endpoints - Strict +@limiter.limit("5/minute") +``` + +### Response Models + +Always specify response models: + +```python +# Single object +@router.get("/users/{user_id}", response_model=UserResponse) + +# List with pagination +@router.get("/users", response_model=PaginatedResponse[UserResponse]) + +# Message response +@router.delete("/users/{user_id}", response_model=MessageResponse) + +# Generic success message +return MessageResponse(success=True, message="User deleted successfully") +``` + +### Request Validation + +Use Pydantic schemas for request validation: + +```python +from pydantic import Field, field_validator, ConfigDict + +class UserCreate(BaseModel): + """Schema for creating a new user.""" + + model_config = ConfigDict(from_attributes=True) + + email: str = Field( + ..., + description="User's email address", + examples=["user@example.com"] + ) + password: str = Field( + ..., + min_length=8, + description="Password (minimum 8 characters)", + ) + + @field_validator("email") + @classmethod + def validate_email(cls, v: str) -> str: + """Validate email format.""" + if not "@" in v: + raise ValueError("Invalid email format") + return v.lower() +``` + +### Pagination + +Always implement pagination for list endpoints: + +```python +from app.schemas.common import PaginationParams, PaginatedResponse + +@router.get("/users", response_model=PaginatedResponse[UserResponse]) +def list_users( + pagination: PaginationParams = Depends(), + db: Session = Depends(get_db) +): + """ + List all users with pagination. + + Default page size: 20 + Maximum page size: 100 + """ + users, total = user_crud.get_multi_with_total( + db, + skip=pagination.offset, + limit=pagination.limit + ) + return PaginatedResponse(data=users, pagination=pagination.create_meta(total)) +``` + +## Authentication & Security + +### Password Security + +```python +from app.core.auth import get_password_hash, verify_password + +# Always hash passwords before storing +hashed_password = get_password_hash(plain_password) + +# Never log or return passwords +logger.info(f"User {user.email} logged in") # Good +logger.info(f"Password: {password}") # NEVER DO THIS! + +# Use bcrypt with appropriate cost factor (current: 12) +``` + +### JWT Tokens + +```python +from app.core.auth import create_access_token, create_refresh_token + +# Create tokens with appropriate expiry +access_token = create_access_token( + subject=str(user.id), + additional_claims={"is_superuser": user.is_superuser} +) + +# Always include token type in payload +# Access tokens: 15 minutes +# Refresh tokens: 7 days +``` + +### Authorization Checks + +```python +# Use dependency injection for authorization +from app.api.dependencies.auth import ( + get_current_user, + get_current_active_user, + get_current_superuser +) + +# Require authentication +@router.get("/protected") +def protected_route( + current_user: User = Depends(get_current_active_user) +): + pass + +# Require superuser role +@router.post("/admin/users") +def admin_route( + current_user: User = Depends(get_current_superuser) +): + pass + +# Check ownership +def delete_resource( + resource_id: UUID, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + resource = resource_crud.get(db, id=resource_id) + if not resource: + raise NotFoundError("Resource not found") + + if resource.user_id != current_user.id and not current_user.is_superuser: + raise AuthorizationError("You can only delete your own resources") + + resource_crud.remove(db, id=resource_id) +``` + +### Input Validation + +```python +# Always validate and sanitize user input +from pydantic import field_validator + +class UserUpdate(BaseModel): + email: Optional[str] = None + + @field_validator("email") + @classmethod + def validate_email(cls, v: Optional[str]) -> Optional[str]: + if v is not None: + v = v.strip().lower() + if not "@" in v: + raise ValueError("Invalid email format") + return v +``` + +## Testing + +### Test Structure + +Follow the existing test structure: + +``` +tests/ +├── conftest.py # Shared fixtures +├── api/ # Integration tests +│ ├── test_users.py +│ └── test_auth.py +├── crud/ # Unit tests for CRUD +├── models/ # Model tests +└── services/ # Service tests +``` + +### Test Naming Convention + +```python +# Test function names should be descriptive +def test_create_user_with_valid_data(): + """Test creating a user with valid data succeeds.""" + pass + +def test_create_user_with_duplicate_email_raises_error(): + """Test creating a user with duplicate email raises DuplicateError.""" + pass + +def test_get_user_that_does_not_exist_returns_none(): + """Test getting non-existent user returns None.""" + pass +``` + +### Use Fixtures + +```python +import pytest +from app.models.user import User + +@pytest.fixture +def test_user(db_session: Session) -> User: + """Create a test user.""" + user = User( + email="test@example.com", + hashed_password="hashed", + is_active=True + ) + db_session.add(user) + db_session.commit() + db_session.refresh(user) + return user + +def test_get_user(db_session: Session, test_user: User): + """Test retrieving a user by ID.""" + user = user_crud.get(db_session, id=test_user.id) + assert user is not None + assert user.email == test_user.email +``` + +### Test Coverage + +Aim for high test coverage: + +```python +# Test the happy path +def test_create_user_success(): + pass + +# Test error cases +def test_create_user_with_duplicate_email(): + pass + +def test_create_user_with_invalid_email(): + pass + +# Test edge cases +def test_create_user_with_empty_password(): + pass + +# Test authorization +def test_user_cannot_delete_other_users_resources(): + pass + +def test_superuser_can_delete_any_resource(): + pass +``` + +### API Testing Pattern + +```python +from fastapi.testclient import TestClient + +def test_create_user_endpoint(client: TestClient): + """Test POST /api/v1/users endpoint.""" + response = client.post( + "/api/v1/users", + json={ + "email": "newuser@example.com", + "password": "securepassword123" + } + ) + + assert response.status_code == 201 + data = response.json() + assert data["email"] == "newuser@example.com" + assert "password" not in data # Never return password + assert "id" in data +``` + +## Logging + +### Logging Levels + +Use appropriate logging levels: + +```python +import logging + +logger = logging.getLogger(__name__) + +# DEBUG - Detailed diagnostic information +logger.debug(f"Processing user {user_id} with parameters: {params}") + +# INFO - General informational messages +logger.info(f"User {user_id} logged in successfully") + +# WARNING - Warning messages for unexpected but handled situations +logger.warning(f"User {user_id} attempted to access restricted resource") + +# ERROR - Error messages for failures +logger.error(f"Failed to create user: {str(e)}", exc_info=True) + +# CRITICAL - Critical messages for severe failures +logger.critical(f"Database connection lost: {str(e)}") +``` + +### What to Log + +```python +# DO log: +# - User actions (login, logout, resource access) +# - System events (startup, shutdown, scheduled jobs) +# - Errors and exceptions +# - Performance metrics +# - Security events + +logger.info(f"User {user.email} logged in from {ip_address}") +logger.error(f"Failed to send email to {user.email}: {str(e)}", exc_info=True) +logger.warning(f"Rate limit exceeded for IP {ip_address}") + +# DON'T log: +# - Passwords or tokens +# - Sensitive personal information +# - Credit card numbers or payment details +# - Full request/response bodies (may contain sensitive data) + +# Bad examples: +logger.info(f"User password: {password}") # NEVER! +logger.debug(f"Token: {access_token}") # NEVER! +``` + +### Structured Logging + +Use structured logging for better parsing: + +```python +logger.info( + "User action", + extra={ + "user_id": str(user.id), + "action": "login", + "ip_address": request.client.host, + "user_agent": request.headers.get("user-agent") + } +) +``` + +## Documentation + +### Code Comments + +```python +# Use comments to explain WHY, not WHAT +# Good +# Hash password using bcrypt to protect against rainbow table attacks +hashed = get_password_hash(password) + +# Bad - The code already shows what it does +# Get password hash +hashed = get_password_hash(password) + +# Use comments for complex logic +# Calculate the number of days until token expiration, accounting for +# timezone differences and daylight saving time changes +days_until_expiry = (expires_at - now).total_seconds() / 86400 +``` + +### API Documentation + +```python +# Use comprehensive docstrings and FastAPI's automatic documentation + +@router.post( + "/users", + response_model=UserResponse, + status_code=status.HTTP_201_CREATED, + summary="Create a new user", + description=""" + Create a new user account. + + Requirements: + - Email must be unique + - Password must be at least 8 characters + - No authentication required + + Returns the created user object with a generated UUID. + """, + responses={ + 201: {"description": "User created successfully"}, + 400: {"description": "Invalid input data"}, + 409: {"description": "User with email already exists"} + } +) +def create_user(user_in: UserCreate, db: Session = Depends(get_db)): + """Create a new user account.""" + pass +``` + +### Schema Documentation + +```python +from pydantic import BaseModel, Field + +class UserCreate(BaseModel): + """ + Schema for creating a new user. + + This schema is used for user registration and admin user creation. + Passwords are automatically hashed before storage. + """ + + email: str = Field( + ..., + description="User's email address (must be unique)", + examples=["user@example.com"], + json_schema_extra={"format": "email"} + ) + + password: str = Field( + ..., + min_length=8, + description="User's password (minimum 8 characters)", + examples=["SecurePass123!"] + ) + + is_active: bool = Field( + default=True, + description="Whether the user account is active" + ) +``` + +### README Files + +Each major feature or module should have a README explaining: +- Purpose and overview +- Architecture and design decisions +- Setup and configuration +- Usage examples +- API endpoints (if applicable) +- Testing instructions + +Example: `backend/SESSION_IMPLEMENTATION_CONTEXT.md` + +## Summary + +Following these coding standards ensures: +- **Consistency**: Code is uniform across the project +- **Maintainability**: Easy to understand and modify +- **Security**: Best practices prevent vulnerabilities +- **Quality**: High test coverage and error handling +- **Documentation**: Clear and comprehensive docs + +For feature implementation examples, see: +- **Architecture Guide**: `backend/docs/ARCHITECTURE.md` +- **Feature Example**: `backend/docs/FEATURE_EXAMPLE.md` diff --git a/backend/docs/FEATURE_EXAMPLE.md b/backend/docs/FEATURE_EXAMPLE.md new file mode 100644 index 0000000..263bba0 --- /dev/null +++ b/backend/docs/FEATURE_EXAMPLE.md @@ -0,0 +1,1752 @@ +# Feature Implementation Guide + +This guide walks through implementing a complete feature using the **User Session Management** feature as a real-world example. This feature allows users to track their login sessions across multiple devices and manage them individually. + +## Table of Contents + +- [Feature Overview](#feature-overview) +- [Implementation Steps](#implementation-steps) + - [Step 1: Design the Database Model](#step-1-design-the-database-model) + - [Step 2: Create Pydantic Schemas](#step-2-create-pydantic-schemas) + - [Step 3: Implement CRUD Operations](#step-3-implement-crud-operations) + - [Step 4: Create API Endpoints](#step-4-create-api-endpoints) + - [Step 5: Integrate with Existing Features](#step-5-integrate-with-existing-features) + - [Step 6: Add Background Jobs](#step-6-add-background-jobs) + - [Step 7: Write Tests](#step-7-write-tests) + - [Step 8: Create Database Migration](#step-8-create-database-migration) + - [Step 9: Update Documentation](#step-9-update-documentation) +- [Summary](#summary) +- [Best Practices](#best-practices) + +## Feature Overview + +**Feature**: User Session Management + +**Purpose**: Allow users to see where they're logged in and manage sessions across multiple devices. + +**Key Requirements**: +- Track each login session with device information +- Allow users to view all their active sessions +- Enable users to logout from specific devices +- Automatically cleanup expired sessions +- Provide session security and audit trail + +**User Stories**: +1. As a user, I want to see all devices where I'm logged in +2. As a user, I want to logout from a specific device remotely +3. As a user, I want to see when and where my account was accessed +4. As a security-conscious user, I want expired sessions automatically cleaned up + +## Implementation Steps + +### Step 1: Design the Database Model + +**File**: `app/models/user_session.py` + +First, design the database schema. Consider: +- What data needs to be stored? +- What relationships exist? +- What indexes are needed for performance? +- What constraints ensure data integrity? + +#### 1.1 Identify Required Fields + +For session tracking, we need: +- **Identity**: Primary key (UUID) +- **Relationship**: Foreign key to user +- **Session Tracking**: Refresh token identifier (JTI) +- **Device Info**: Device name, ID, IP, user agent +- **Timing**: Created, last used, expiration +- **State**: Active/inactive flag +- **Optional**: Geographic information + +#### 1.2 Create the Model + +```python +""" +User session model for tracking per-device authentication sessions. + +This allows users to: +- See where they're logged in +- Logout from specific devices +- Manage their active sessions +""" +from sqlalchemy import Column, String, Boolean, DateTime, ForeignKey, Index +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import relationship + +from .base import Base, TimestampMixin, UUIDMixin + + +class UserSession(Base, UUIDMixin, TimestampMixin): + """ + Tracks individual user sessions (per-device). + + Each time a user logs in from a device, a new session is created. + Sessions are identified by the refresh token JTI (JWT ID). + """ + __tablename__ = 'user_sessions' + + # Foreign key to user with CASCADE delete + # When user is deleted, all their sessions are deleted + user_id = Column( + UUID(as_uuid=True), + ForeignKey('users.id', ondelete='CASCADE'), + nullable=False, + index=True # Index for fast lookups + ) + + # Refresh token identifier (JWT ID from the refresh token) + # Unique because each session has one refresh token + refresh_token_jti = Column( + String(255), + unique=True, + nullable=False, + index=True # Index for fast lookups during token validation + ) + + # Device information + device_name = Column(String(255), nullable=True) # "iPhone 14", "Chrome on MacBook" + device_id = Column(String(255), nullable=True) # Persistent device identifier + ip_address = Column(String(45), nullable=True) # IPv4 (15) or IPv6 (45 chars) + user_agent = Column(String(500), nullable=True) # Browser/app user agent + + # Session timing + last_used_at = Column(DateTime(timezone=True), nullable=False) + expires_at = Column(DateTime(timezone=True), nullable=False) + + # Session state + is_active = Column(Boolean, default=True, nullable=False, index=True) + + # Geographic information (optional, can be populated from IP) + location_city = Column(String(100), nullable=True) + location_country = Column(String(100), nullable=True) + + # Relationship to user + # back_populates creates bidirectional relationship + user = relationship("User", back_populates="sessions") + + # Composite indexes for performance + # These speed up common queries + __table_args__ = ( + # Index for "get all active sessions for user" + Index('ix_user_sessions_user_active', 'user_id', 'is_active'), + # Index for "find active session by JTI" + Index('ix_user_sessions_jti_active', 'refresh_token_jti', 'is_active'), + ) + + def __repr__(self): + return f"" + + @property + def is_expired(self) -> bool: + """Check if session has expired.""" + from datetime import datetime, timezone + return self.expires_at < datetime.now(timezone.utc) + + def to_dict(self): + """Convert session to dictionary for serialization.""" + return { + 'id': str(self.id), + 'user_id': str(self.user_id), + 'device_name': self.device_name, + 'device_id': self.device_id, + 'ip_address': self.ip_address, + 'last_used_at': self.last_used_at.isoformat() if self.last_used_at else None, + 'expires_at': self.expires_at.isoformat() if self.expires_at else None, + 'is_active': self.is_active, + 'location_city': self.location_city, + 'location_country': self.location_country, + 'created_at': self.created_at.isoformat() if self.created_at else None, + } +``` + +**Key Design Decisions**: + +1. **UUID Primary Key**: Using `UUIDMixin` for globally unique, non-sequential IDs +2. **Timestamps**: Using `TimestampMixin` for automatic created_at/updated_at tracking +3. **Cascade Delete**: `ondelete='CASCADE'` ensures sessions are deleted when user is deleted +4. **Indexes**: + - Single-column indexes on foreign key and unique fields + - Composite indexes for common query patterns +5. **Nullable Fields**: Device info is optional (might not always be available) +6. **String Lengths**: Appropriate sizes based on data type (IPv6 = 45 chars, etc.) +7. **Computed Property**: `is_expired` property for convenient expiration checking +8. **Helper Method**: `to_dict()` for easy serialization + +#### 1.3 Update Related Models + +Don't forget to update the `User` model to include the relationship: + +```python +# In app/models/user.py + +class User(Base, UUIDMixin, TimestampMixin): + # ... existing fields ... + + # Add relationship to sessions + sessions = relationship( + "UserSession", + back_populates="user", + cascade="all, delete-orphan" # Delete sessions when user is deleted + ) +``` + +### Step 2: Create Pydantic Schemas + +**File**: `app/schemas/sessions.py` + +Schemas define the API contract: what data comes in and what goes out. + +#### 2.1 Design Schema Hierarchy + +Follow the standard pattern: + +``` +SessionBase (common fields) + ├── SessionCreate (internal: CRUD operations) + ├── SessionUpdate (internal: CRUD operations) + └── SessionResponse (external: API responses) +``` + +#### 2.2 Implement Schemas + +```python +""" +Pydantic schemas for user session management. +""" +from datetime import datetime +from typing import Optional +from uuid import UUID + +from pydantic import BaseModel, Field, ConfigDict + + +class SessionBase(BaseModel): + """Base schema for user sessions with common fields.""" + device_name: Optional[str] = Field( + None, + max_length=255, + description="Friendly device name" + ) + device_id: Optional[str] = Field( + None, + max_length=255, + description="Persistent device identifier" + ) + + +class SessionCreate(SessionBase): + """ + Schema for creating a new session (internal use). + + Used by CRUD operations, not exposed to API. + Contains all fields needed to create a session. + """ + user_id: UUID + refresh_token_jti: str = Field(..., max_length=255) + ip_address: Optional[str] = Field(None, max_length=45) + user_agent: Optional[str] = Field(None, max_length=500) + last_used_at: datetime + expires_at: datetime + location_city: Optional[str] = Field(None, max_length=100) + location_country: Optional[str] = Field(None, max_length=100) + + +class SessionUpdate(BaseModel): + """ + Schema for updating a session (internal use). + + All fields are optional - only update what's provided. + """ + last_used_at: Optional[datetime] = None + is_active: Optional[bool] = None + refresh_token_jti: Optional[str] = None + expires_at: Optional[datetime] = None + + +class SessionResponse(SessionBase): + """ + Schema for session responses to clients. + + This is what users see when they list their active sessions. + Note: We don't expose sensitive fields like refresh_token_jti. + """ + id: UUID + ip_address: Optional[str] = None + location_city: Optional[str] = None + location_country: Optional[str] = None + last_used_at: datetime + created_at: datetime + expires_at: datetime + is_current: bool = Field( + default=False, + description="Whether this is the current session" + ) + + # Configuration for ORM integration and OpenAPI docs + model_config = ConfigDict( + from_attributes=True, # Enable ORM mode (was orm_mode in Pydantic v1) + json_schema_extra={ + "example": { + "id": "123e4567-e89b-12d3-a456-426614174000", + "device_name": "iPhone 14", + "device_id": "device-abc-123", + "ip_address": "192.168.1.100", + "location_city": "San Francisco", + "location_country": "United States", + "last_used_at": "2025-10-31T12:00:00Z", + "created_at": "2025-10-30T09:00:00Z", + "expires_at": "2025-11-06T09:00:00Z", + "is_current": True + } + } + ) + + +class SessionListResponse(BaseModel): + """Response containing list of sessions with metadata.""" + sessions: list[SessionResponse] + total: int = Field(..., description="Total number of active sessions") + + model_config = ConfigDict( + json_schema_extra={ + "example": { + "sessions": [...], + "total": 3 + } + } + ) + + +class DeviceInfo(BaseModel): + """ + Device information extracted from request. + + Helper schema used internally to pass device info around. + """ + device_name: Optional[str] = None + device_id: Optional[str] = None + ip_address: Optional[str] = None + user_agent: Optional[str] = None + location_city: Optional[str] = None + location_country: Optional[str] = None +``` + +**Key Design Decisions**: + +1. **Separation of Concerns**: + - `SessionCreate` for internal operations (includes all fields) + - `SessionResponse` for external API (only safe fields) +2. **Security**: Never expose sensitive fields like `refresh_token_jti` in API responses +3. **Computed Fields**: `is_current` is computed at runtime, not stored in DB +4. **Field Validation**: Use `Field()` for constraints, descriptions, and examples +5. **OpenAPI Documentation**: `json_schema_extra` provides examples in API docs +6. **Type Safety**: Comprehensive type hints for all fields + +### Step 3: Implement CRUD Operations + +**File**: `app/crud/session.py` + +CRUD layer handles all database operations. No business logic here! + +#### 3.1 Extend the Base CRUD Class + +```python +""" +CRUD operations for user sessions. +""" +from datetime import datetime, timezone, timedelta +from typing import List, Optional +from uuid import UUID +from sqlalchemy.orm import Session +from sqlalchemy import and_ +import logging + +from app.crud.base import CRUDBase +from app.models.user_session import UserSession +from app.schemas.sessions import SessionCreate, SessionUpdate + +logger = logging.getLogger(__name__) + + +class CRUDSession(CRUDBase[UserSession, SessionCreate, SessionUpdate]): + """ + CRUD operations for user sessions. + + Inherits standard operations from CRUDBase: + - get(db, id) - Get by ID + - get_multi(db, skip, limit) - List with pagination + - create(db, obj_in) - Create new session + - update(db, db_obj, obj_in) - Update session + - remove(db, id) - Delete session + """ + + # Custom query methods + # -------------------- + + def get_by_jti(self, db: Session, *, jti: str) -> Optional[UserSession]: + """ + Get session by refresh token JTI. + + Used during token refresh to find the corresponding session. + + Args: + db: Database session + jti: Refresh token JWT ID + + Returns: + UserSession if found, None otherwise + """ + try: + return db.query(UserSession).filter( + UserSession.refresh_token_jti == jti + ).first() + except Exception as e: + logger.error(f"Error getting session by JTI {jti}: {str(e)}") + raise + + def get_active_by_jti(self, db: Session, *, jti: str) -> Optional[UserSession]: + """ + Get active session by refresh token JTI. + + Only returns the session if it's currently active. + + Args: + db: Database session + jti: Refresh token JWT ID + + Returns: + Active UserSession if found, None otherwise + """ + try: + return db.query(UserSession).filter( + and_( + UserSession.refresh_token_jti == jti, + UserSession.is_active == True + ) + ).first() + except Exception as e: + logger.error(f"Error getting active session by JTI {jti}: {str(e)}") + raise + + def get_user_sessions( + self, + db: Session, + *, + user_id: str, + active_only: bool = True + ) -> List[UserSession]: + """ + Get all sessions for a user. + + Args: + db: Database session + user_id: User ID + active_only: If True, return only active sessions + + Returns: + List of UserSession objects, ordered by most recently used + """ + try: + # Convert user_id string to UUID if needed + user_uuid = UUID(user_id) if isinstance(user_id, str) else user_id + + query = db.query(UserSession).filter(UserSession.user_id == user_uuid) + + if active_only: + query = query.filter(UserSession.is_active == True) + + # Order by most recently used first + return query.order_by(UserSession.last_used_at.desc()).all() + except Exception as e: + logger.error(f"Error getting sessions for user {user_id}: {str(e)}") + raise + + # Creation methods + # ---------------- + + def create_session( + self, + db: Session, + *, + obj_in: SessionCreate + ) -> UserSession: + """ + Create a new user session. + + Args: + db: Database session + obj_in: SessionCreate schema with session data + + Returns: + Created UserSession + + Raises: + ValueError: If session creation fails + """ + try: + # Create model instance from schema + db_obj = UserSession( + user_id=obj_in.user_id, + refresh_token_jti=obj_in.refresh_token_jti, + device_name=obj_in.device_name, + device_id=obj_in.device_id, + ip_address=obj_in.ip_address, + user_agent=obj_in.user_agent, + last_used_at=obj_in.last_used_at, + expires_at=obj_in.expires_at, + is_active=True, + location_city=obj_in.location_city, + location_country=obj_in.location_country, + ) + + db.add(db_obj) + db.commit() + db.refresh(db_obj) + + logger.info( + f"Session created for user {obj_in.user_id} from {obj_in.device_name} " + f"(IP: {obj_in.ip_address})" + ) + + return db_obj + + except Exception as e: + db.rollback() + logger.error(f"Error creating session: {str(e)}", exc_info=True) + raise ValueError(f"Failed to create session: {str(e)}") + + # Update methods + # -------------- + + def deactivate(self, db: Session, *, session_id: str) -> Optional[UserSession]: + """ + Deactivate a session (logout from device). + + Args: + db: Database session + session_id: Session UUID + + Returns: + Deactivated UserSession if found, None otherwise + """ + try: + session = self.get(db, id=session_id) + if not session: + logger.warning(f"Session {session_id} not found for deactivation") + return None + + session.is_active = False + db.add(session) + db.commit() + db.refresh(session) + + logger.info( + f"Session {session_id} deactivated for user {session.user_id} " + f"({session.device_name})" + ) + + return session + + except Exception as e: + db.rollback() + logger.error(f"Error deactivating session {session_id}: {str(e)}") + raise + + def deactivate_all_user_sessions( + self, + db: Session, + *, + user_id: str + ) -> int: + """ + Deactivate all active sessions for a user (logout from all devices). + + Uses bulk update for efficiency. + + Args: + db: Database session + user_id: User ID + + Returns: + Number of sessions deactivated + """ + try: + # Convert user_id string to UUID if needed + user_uuid = UUID(user_id) if isinstance(user_id, str) else user_id + + # Bulk update query + count = db.query(UserSession).filter( + and_( + UserSession.user_id == user_uuid, + UserSession.is_active == True + ) + ).update({"is_active": False}) + + db.commit() + + logger.info(f"Deactivated {count} sessions for user {user_id}") + + return count + + except Exception as e: + db.rollback() + logger.error(f"Error deactivating all sessions for user {user_id}: {str(e)}") + raise + + def update_last_used( + self, + db: Session, + *, + session: UserSession + ) -> UserSession: + """ + Update the last_used_at timestamp for a session. + + Called when a refresh token is used. + + Args: + db: Database session + session: UserSession object + + Returns: + Updated UserSession + """ + try: + session.last_used_at = datetime.now(timezone.utc) + db.add(session) + db.commit() + db.refresh(session) + return session + except Exception as e: + db.rollback() + logger.error(f"Error updating last_used for session {session.id}: {str(e)}") + raise + + def update_refresh_token( + self, + db: Session, + *, + session: UserSession, + new_jti: str, + new_expires_at: datetime + ) -> UserSession: + """ + Update session with new refresh token JTI and expiration. + + Called during token refresh (token rotation). + + Args: + db: Database session + session: UserSession object + new_jti: New refresh token JTI + new_expires_at: New expiration datetime + + Returns: + Updated UserSession + """ + try: + session.refresh_token_jti = new_jti + session.expires_at = new_expires_at + session.last_used_at = datetime.now(timezone.utc) + db.add(session) + db.commit() + db.refresh(session) + return session + except Exception as e: + db.rollback() + logger.error(f"Error updating refresh token for session {session.id}: {str(e)}") + raise + + # Cleanup methods + # --------------- + + def cleanup_expired(self, db: Session, *, keep_days: int = 30) -> int: + """ + Clean up expired sessions. + + Deletes sessions that are: + - Expired (expires_at < now) AND inactive + - Older than keep_days (for audit trail) + + Args: + db: Database session + keep_days: Keep inactive sessions for this many days + + Returns: + Number of sessions deleted + """ + try: + cutoff_date = datetime.now(timezone.utc) - timedelta(days=keep_days) + + count = db.query(UserSession).filter( + and_( + UserSession.is_active == False, + UserSession.expires_at < datetime.now(timezone.utc), + UserSession.created_at < cutoff_date + ) + ).delete() + + db.commit() + + if count > 0: + logger.info(f"Cleaned up {count} expired sessions") + + return count + + except Exception as e: + db.rollback() + logger.error(f"Error cleaning up expired sessions: {str(e)}") + raise + + # Utility methods + # --------------- + + def get_user_session_count(self, db: Session, *, user_id: str) -> int: + """ + Get count of active sessions for a user. + + Useful for session limits or security monitoring. + + Args: + db: Database session + user_id: User ID + + Returns: + Number of active sessions + """ + try: + return db.query(UserSession).filter( + and_( + UserSession.user_id == user_id, + UserSession.is_active == True + ) + ).count() + except Exception as e: + logger.error(f"Error counting sessions for user {user_id}: {str(e)}") + raise + + +# Create singleton instance +# This is the instance that will be imported and used throughout the app +session = CRUDSession(UserSession) +``` + +**Key Patterns**: + +1. **Error Handling**: Every method has try/except with rollback +2. **Logging**: Log all significant actions (create, delete, errors) +3. **Type Safety**: Full type hints for parameters and returns +4. **Docstrings**: Document what each method does, args, returns, raises +5. **Bulk Operations**: Use `query().update()` for efficiency when updating many rows +6. **UUID Handling**: Convert string UUIDs to UUID objects when needed +7. **Ordering**: Return results in a logical order (most recent first) +8. **Singleton Pattern**: Create one instance to be imported elsewhere + +### Step 4: Create API Endpoints + +**File**: `app/api/routes/sessions.py` + +API layer handles HTTP requests and responses. + +#### 4.1 Design Endpoints + +For session management, we need: +- `GET /api/v1/sessions/me` - List my sessions +- `DELETE /api/v1/sessions/{session_id}` - Revoke specific session +- `DELETE /api/v1/sessions/me/expired` - Cleanup expired sessions + +#### 4.2 Implement Endpoints + +```python +""" +Session management endpoints. + +Allows users to view and manage their active sessions across devices. +""" +import logging +from typing import Any +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, status, Request +from slowapi import Limiter +from slowapi.util import get_remote_address +from sqlalchemy.orm import Session + +from app.api.dependencies.auth import get_current_user +from app.core.database import get_db +from app.models.user import User +from app.schemas.sessions import SessionResponse, SessionListResponse +from app.schemas.common import MessageResponse +from app.crud.session import session as session_crud +from app.core.exceptions import NotFoundError, AuthorizationError, ErrorCode + +router = APIRouter() +logger = logging.getLogger(__name__) + +# Initialize rate limiter +limiter = Limiter(key_func=get_remote_address) + + +@router.get( + "/me", + response_model=SessionListResponse, + summary="List My Active Sessions", + description=""" + Get a list of all active sessions for the current user. + + This shows where you're currently logged in. + + **Rate Limit**: 30 requests/minute + """, + operation_id="list_my_sessions" +) +@limiter.limit("30/minute") +def list_my_sessions( + request: Request, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +) -> Any: + """ + List all active sessions for the current user. + + Args: + request: FastAPI request object (for rate limiting) + current_user: Current authenticated user (injected) + db: Database session (injected) + + Returns: + SessionListResponse with list of active sessions + """ + try: + # Get all active sessions for user + sessions = session_crud.get_user_sessions( + db, + user_id=str(current_user.id), + active_only=True + ) + + # Convert to response format + session_responses = [] + for idx, s in enumerate(sessions): + session_response = SessionResponse( + id=s.id, + device_name=s.device_name, + device_id=s.device_id, + ip_address=s.ip_address, + location_city=s.location_city, + location_country=s.location_country, + last_used_at=s.last_used_at, + created_at=s.created_at, + expires_at=s.expires_at, + # Mark the most recently used session as current + is_current=(idx == 0) + ) + session_responses.append(session_response) + + logger.info(f"User {current_user.id} listed {len(session_responses)} active sessions") + + return SessionListResponse( + sessions=session_responses, + total=len(session_responses) + ) + + except Exception as e: + logger.error(f"Error listing sessions for user {current_user.id}: {str(e)}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to retrieve sessions" + ) + + +@router.delete( + "/{session_id}", + response_model=MessageResponse, + status_code=status.HTTP_200_OK, + summary="Revoke Specific Session", + description=""" + Revoke a specific session by ID. + + This logs you out from that particular device. + You can only revoke your own sessions. + + **Rate Limit**: 10 requests/minute + """, + operation_id="revoke_session" +) +@limiter.limit("10/minute") +def revoke_session( + request: Request, + session_id: UUID, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +) -> Any: + """ + Revoke a specific session by ID. + + Args: + request: FastAPI request object (for rate limiting) + session_id: UUID of the session to revoke + current_user: Current authenticated user (injected) + db: Database session (injected) + + Returns: + MessageResponse with success message + + Raises: + NotFoundError: If session doesn't exist + AuthorizationError: If session belongs to another user + """ + try: + # Get the session + session = session_crud.get(db, id=str(session_id)) + + if not session: + raise NotFoundError( + message=f"Session {session_id} not found", + error_code=ErrorCode.NOT_FOUND + ) + + # Verify session belongs to current user (authorization check) + if str(session.user_id) != str(current_user.id): + logger.warning( + f"User {current_user.id} attempted to revoke session {session_id} " + f"belonging to user {session.user_id}" + ) + raise AuthorizationError( + message="You can only revoke your own sessions", + error_code=ErrorCode.INSUFFICIENT_PERMISSIONS + ) + + # Deactivate the session + session_crud.deactivate(db, session_id=str(session_id)) + + logger.info( + f"User {current_user.id} revoked session {session_id} " + f"({session.device_name})" + ) + + return MessageResponse( + success=True, + message=f"Session revoked: {session.device_name or 'Unknown device'}" + ) + + except (NotFoundError, AuthorizationError): + # Re-raise custom exceptions (they'll be handled by global handlers) + raise + except Exception as e: + logger.error(f"Error revoking session {session_id}: {str(e)}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to revoke session" + ) + + +@router.delete( + "/me/expired", + response_model=MessageResponse, + status_code=status.HTTP_200_OK, + summary="Cleanup Expired Sessions", + description=""" + Remove expired sessions for the current user. + + This is a cleanup operation to remove old session records. + + **Rate Limit**: 5 requests/minute + """, + operation_id="cleanup_expired_sessions" +) +@limiter.limit("5/minute") +def cleanup_expired_sessions( + request: Request, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +) -> Any: + """ + Cleanup expired sessions for the current user. + + Args: + request: FastAPI request object (for rate limiting) + current_user: Current authenticated user (injected) + db: Database session (injected) + + Returns: + MessageResponse with count of sessions cleaned + """ + try: + from datetime import datetime, timezone + + # Get all sessions for user (including inactive) + all_sessions = session_crud.get_user_sessions( + db, + user_id=str(current_user.id), + active_only=False + ) + + # Delete expired and inactive sessions + deleted_count = 0 + for s in all_sessions: + if not s.is_active and s.expires_at < datetime.now(timezone.utc): + db.delete(s) + deleted_count += 1 + + db.commit() + + logger.info(f"User {current_user.id} cleaned up {deleted_count} expired sessions") + + return MessageResponse( + success=True, + message=f"Cleaned up {deleted_count} expired sessions" + ) + + except Exception as e: + logger.error(f"Error cleaning up sessions for user {current_user.id}: {str(e)}", exc_info=True) + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to cleanup sessions" + ) +``` + +**Key Patterns**: + +1. **Dependency Injection**: + - `current_user = Depends(get_current_user)` - Automatic authentication + - `db = Depends(get_db)` - Database session management + +2. **Rate Limiting**: + - Read operations: Higher limits (30/min) + - Write operations: Lower limits (10/min) + - Cleanup operations: Very restrictive (5/min) + +3. **Authorization**: + - Always check resource ownership + - Log security violations + - Return 403 Forbidden, not 404 Not Found + +4. **Error Handling**: + - Catch and log all errors + - Return user-friendly messages + - Don't expose internal details + +5. **Documentation**: + - OpenAPI summary and description + - Docstrings for code documentation + - Operation IDs for client generation + +6. **Response Models**: + - Always specify `response_model` + - Ensures response validation + - Generates accurate API docs + +#### 4.3 Register Routes + +In `app/api/main.py`: + +```python +from fastapi import APIRouter +from app.api.routes import auth, users, sessions, admin, organizations + +api_router = APIRouter() + +# Include all route modules +api_router.include_router(auth.router, prefix="/auth", tags=["auth"]) +api_router.include_router(users.router, prefix="/users", tags=["users"]) +api_router.include_router(sessions.router, prefix="/sessions", tags=["sessions"]) +api_router.include_router(admin.router, prefix="/admin", tags=["admin"]) +api_router.include_router(organizations.router, prefix="/organizations", tags=["organizations"]) +``` + +In `app/main.py`: + +```python +from app.api.main import api_router + +app = FastAPI(title="My API") + +# Include API router with /api/v1 prefix +app.include_router(api_router, prefix="/api/v1") +``` + +### Step 5: Integrate with Existing Features + +Session management needs to be integrated into the authentication flow. + +#### 5.1 Update Login Endpoint + +**File**: `app/api/routes/auth.py` + +```python +from app.utils.device import extract_device_info +from app.crud.session import session as session_crud +from app.schemas.sessions import SessionCreate + +@router.post("/login") +async def login( + request: Request, + credentials: OAuth2PasswordRequestForm = Depends(), + db: Session = Depends(get_db) +): + """Authenticate user and create session.""" + + # 1. Validate credentials + user = user_crud.get_by_email(db, email=credentials.username) + if not user or not verify_password(credentials.password, user.hashed_password): + raise AuthenticationError("Invalid credentials") + + if not user.is_active: + raise AuthenticationError("Account is inactive") + + # 2. Extract device information from request + device_info = extract_device_info(request) + + # 3. Generate tokens + jti = str(uuid.uuid4()) # Generate JTI for refresh token + access_token = create_access_token(subject=str(user.id)) + refresh_token = create_refresh_token(subject=str(user.id), jti=jti) + + # 4. Create session record + from datetime import datetime, timezone, timedelta + + session_data = SessionCreate( + user_id=user.id, + refresh_token_jti=jti, + device_name=device_info.device_name, + device_id=device_info.device_id, + ip_address=device_info.ip_address, + user_agent=device_info.user_agent, + last_used_at=datetime.now(timezone.utc), + expires_at=datetime.now(timezone.utc) + timedelta(days=7), + location_city=device_info.location_city, + location_country=device_info.location_country, + ) + + session_crud.create_session(db, obj_in=session_data) + + logger.info(f"User {user.email} logged in from {device_info.device_name}") + + return { + "access_token": access_token, + "refresh_token": refresh_token, + "token_type": "bearer", + "user": UserResponse.model_validate(user) + } +``` + +#### 5.2 Create Device Info Utility + +**File**: `app/utils/device.py` + +```python +""" +Device detection utilities. +""" +from typing import Optional +from fastapi import Request +from user_agents import parse + +from app.schemas.sessions import DeviceInfo + + +def extract_device_info(request: Request) -> DeviceInfo: + """ + Extract device information from HTTP request. + + Args: + request: FastAPI Request object + + Returns: + DeviceInfo with extracted information + """ + # Get user agent + user_agent_string = request.headers.get("user-agent", "") + + # Parse user agent + user_agent = parse(user_agent_string) + + # Determine device name + if user_agent.is_mobile: + device_name = f"{user_agent.device.brand} {user_agent.device.model}".strip() + elif user_agent.is_tablet: + device_name = f"{user_agent.device.brand} {user_agent.device.model} Tablet".strip() + else: + device_name = f"{user_agent.browser.family} on {user_agent.os.family}".strip() + + # Get IP address + ip_address = request.client.host if request.client else None + + # Get device ID from custom header (if client provides one) + device_id = request.headers.get("x-device-id") + + return DeviceInfo( + device_name=device_name or "Unknown Device", + device_id=device_id, + ip_address=ip_address, + user_agent=user_agent_string, + location_city=None, # Can be populated with IP geolocation service + location_country=None + ) +``` + +#### 5.3 Update Token Refresh Endpoint + +```python +@router.post("/refresh") +def refresh_token( + refresh_request: RefreshRequest, + db: Session = Depends(get_db) +): + """Refresh access token using refresh token.""" + + try: + # 1. Decode and validate refresh token + payload = decode_token(refresh_request.refresh_token) + + if payload.get("type") != "refresh": + raise AuthenticationError("Invalid token type") + + user_id = UUID(payload.get("sub")) + jti = payload.get("jti") + + # 2. Find and validate session + session = session_crud.get_active_by_jti(db, jti=jti) + + if not session: + raise AuthenticationError("Session not found or expired") + + if session.user_id != user_id: + raise AuthenticationError("Token mismatch") + + # 3. Generate new tokens (token rotation) + new_jti = str(uuid.uuid4()) + new_access_token = create_access_token(subject=str(user_id)) + new_refresh_token = create_refresh_token(subject=str(user_id), jti=new_jti) + + # 4. Update session with new JTI + session_crud.update_refresh_token( + db, + session=session, + new_jti=new_jti, + new_expires_at=datetime.now(timezone.utc) + timedelta(days=7) + ) + + logger.info(f"Tokens refreshed for user {user_id}") + + return { + "access_token": new_access_token, + "refresh_token": new_refresh_token, + "token_type": "bearer" + } + + except Exception as e: + logger.error(f"Token refresh failed: {str(e)}") + raise AuthenticationError("Failed to refresh token") +``` + +#### 5.4 Update Logout Endpoint + +```python +@router.post("/logout") +def logout( + logout_request: LogoutRequest, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """Logout from current device.""" + + try: + # Decode refresh token to get JTI + payload = decode_token(logout_request.refresh_token) + jti = payload.get("jti") + + # Find and deactivate session + session = session_crud.get_by_jti(db, jti=jti) + + if session and session.user_id == current_user.id: + session_crud.deactivate(db, session_id=str(session.id)) + logger.info(f"User {current_user.id} logged out from {session.device_name}") + + return MessageResponse( + success=True, + message="Logged out successfully" + ) + + except Exception as e: + logger.error(f"Logout failed: {str(e)}") + # Even if cleanup fails, return success (user intended to logout) + return MessageResponse(success=True, message="Logged out") +``` + +### Step 6: Add Background Jobs + +**File**: `app/services/session_cleanup.py` + +```python +""" +Background job for cleaning up expired sessions. +""" +import logging +from app.core.database import SessionLocal +from app.crud.session import session as session_crud + +logger = logging.getLogger(__name__) + + +async def cleanup_expired_sessions(): + """ + Clean up expired sessions. + + Runs daily at 2 AM. Removes sessions that are: + - Expired (expires_at < now) + - Inactive (is_active = False) + - Older than 30 days (for audit trail) + """ + db = SessionLocal() + try: + count = session_crud.cleanup_expired(db, keep_days=30) + logger.info(f"Background cleanup: Removed {count} expired sessions") + except Exception as e: + logger.error(f"Error in session cleanup job: {str(e)}", exc_info=True) + finally: + db.close() +``` + +**Register in** `app/main.py`: + +```python +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from app.services.session_cleanup import cleanup_expired_sessions +from app.core.config import settings + +scheduler = AsyncIOScheduler() + +@app.on_event("startup") +async def startup_event(): + """Start background jobs on application startup.""" + if not settings.IS_TEST: # Don't run scheduler in tests + # Schedule cleanup job to run daily at 2 AM + scheduler.add_job( + cleanup_expired_sessions, + "cron", + hour=2, + id="cleanup_expired_sessions" + ) + scheduler.start() + logger.info("Background scheduler started") + +@app.on_event("shutdown") +async def shutdown_event(): + """Stop background jobs on application shutdown.""" + scheduler.shutdown() + logger.info("Background scheduler stopped") +``` + +### Step 7: Write Tests + +**File**: `tests/api/test_session_management.py` + +Write comprehensive tests covering all scenarios. + +```python +""" +Tests for session management endpoints. +""" +import pytest +from datetime import datetime, timezone, timedelta +from uuid import uuid4 + +from fastapi.testclient import TestClient +from sqlalchemy.orm import Session + +from app.models.user import User +from app.models.user_session import UserSession +from app.core.auth import create_access_token, create_refresh_token + + +def test_list_active_sessions( + client: TestClient, + test_user: User, + test_user_token: str, + db_session: Session +): + """Test listing active sessions for current user.""" + + # Create multiple sessions for the user + for i in range(3): + session = UserSession( + user_id=test_user.id, + refresh_token_jti=str(uuid4()), + device_name=f"Device {i}", + ip_address=f"192.168.1.{i}", + last_used_at=datetime.now(timezone.utc), + expires_at=datetime.now(timezone.utc) + timedelta(days=7), + is_active=True + ) + db_session.add(session) + db_session.commit() + + # Make request + response = client.get( + "/api/v1/sessions/me", + headers={"Authorization": f"Bearer {test_user_token}"} + ) + + assert response.status_code == 200 + data = response.json() + assert data["total"] == 3 + assert len(data["sessions"]) == 3 + assert data["sessions"][0]["is_current"] == True # Most recent marked as current + + +def test_revoke_specific_session( + client: TestClient, + test_user: User, + test_user_token: str, + db_session: Session +): + """Test revoking a specific session.""" + + # Create a session + session = UserSession( + user_id=test_user.id, + refresh_token_jti=str(uuid4()), + device_name="Test Device", + ip_address="192.168.1.1", + last_used_at=datetime.now(timezone.utc), + expires_at=datetime.now(timezone.utc) + timedelta(days=7), + is_active=True + ) + db_session.add(session) + db_session.commit() + + # Revoke the session + response = client.delete( + f"/api/v1/sessions/{session.id}", + headers={"Authorization": f"Bearer {test_user_token}"} + ) + + assert response.status_code == 200 + data = response.json() + assert data["success"] == True + + # Verify session is inactive + db_session.refresh(session) + assert session.is_active == False + + +def test_cannot_revoke_other_users_session( + client: TestClient, + test_user: User, + test_user_token: str, + db_session: Session +): + """Test that users cannot revoke other users' sessions.""" + + # Create another user + other_user = User( + email="other@example.com", + hashed_password="hashed", + is_active=True + ) + db_session.add(other_user) + db_session.commit() + + # Create a session for other user + session = UserSession( + user_id=other_user.id, + refresh_token_jti=str(uuid4()), + device_name="Other Device", + ip_address="192.168.1.2", + last_used_at=datetime.now(timezone.utc), + expires_at=datetime.now(timezone.utc) + timedelta(days=7), + is_active=True + ) + db_session.add(session) + db_session.commit() + + # Try to revoke other user's session + response = client.delete( + f"/api/v1/sessions/{session.id}", + headers={"Authorization": f"Bearer {test_user_token}"} + ) + + # Should get 403 Forbidden + assert response.status_code == 403 + data = response.json() + assert data["success"] == False + + +def test_logout_from_one_device_does_not_affect_other( + client: TestClient, + test_user: User, + db_session: Session +): + """ + CRITICAL TEST: Verify multi-device support. + + Logging out from one device should not affect other devices. + """ + + # Create two sessions + jti1 = str(uuid4()) + jti2 = str(uuid4()) + + session1 = UserSession( + user_id=test_user.id, + refresh_token_jti=jti1, + device_name="Device 1", + ip_address="192.168.1.1", + last_used_at=datetime.now(timezone.utc), + expires_at=datetime.now(timezone.utc) + timedelta(days=7), + is_active=True + ) + + session2 = UserSession( + user_id=test_user.id, + refresh_token_jti=jti2, + device_name="Device 2", + ip_address="192.168.1.2", + last_used_at=datetime.now(timezone.utc), + expires_at=datetime.now(timezone.utc) + timedelta(days=7), + is_active=True + ) + + db_session.add_all([session1, session2]) + db_session.commit() + + # Create tokens for device 1 + token1 = create_access_token(subject=str(test_user.id)) + + # Logout from device 1 + response = client.delete( + f"/api/v1/sessions/{session1.id}", + headers={"Authorization": f"Bearer {token1}"} + ) + + assert response.status_code == 200 + + # Verify session 1 is inactive + db_session.refresh(session1) + assert session1.is_active == False + + # Verify session 2 is still active + db_session.refresh(session2) + assert session2.is_active == True + + +def test_cleanup_expired_sessions( + client: TestClient, + test_user: User, + test_user_token: str, + db_session: Session +): + """Test cleanup of expired sessions.""" + + # Create an expired, inactive session + expired_session = UserSession( + user_id=test_user.id, + refresh_token_jti=str(uuid4()), + device_name="Expired Device", + ip_address="192.168.1.1", + last_used_at=datetime.now(timezone.utc) - timedelta(days=10), + expires_at=datetime.now(timezone.utc) - timedelta(days=3), + is_active=False + ) + db_session.add(expired_session) + db_session.commit() + + # Cleanup expired sessions + response = client.delete( + "/api/v1/sessions/me/expired", + headers={"Authorization": f"Bearer {test_user_token}"} + ) + + assert response.status_code == 200 + data = response.json() + assert data["success"] == True + assert "1" in data["message"] # Should have cleaned 1 session +``` + +**Run tests**: + +```bash +pytest tests/api/test_session_management.py -v +``` + +### Step 8: Create Database Migration + +**Generate migration**: + +```bash +alembic revision --autogenerate -m "Add user_sessions table" +``` + +**Review and edit** `app/alembic/versions/xxx_add_user_sessions_table.py`: + +```python +"""Add user_sessions table + +Revision ID: abc123 +Revises: previous_revision +Create Date: 2025-10-31 12:00:00 +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers +revision = 'abc123' +down_revision = 'previous_revision' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Create user_sessions table + op.create_table( + 'user_sessions', + sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True), + sa.Column('user_id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('refresh_token_jti', sa.String(255), nullable=False), + sa.Column('device_name', sa.String(255), nullable=True), + sa.Column('device_id', sa.String(255), nullable=True), + sa.Column('ip_address', sa.String(45), nullable=True), + sa.Column('user_agent', sa.String(500), nullable=True), + sa.Column('last_used_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('expires_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('is_active', sa.Boolean(), nullable=False, server_default='true'), + sa.Column('location_city', sa.String(100), nullable=True), + sa.Column('location_country', sa.String(100), nullable=True), + sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()')), + sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True), + sa.ForeignKeyConstraint(['user_id'], ['users.id'], ondelete='CASCADE'), + sa.UniqueConstraint('refresh_token_jti') + ) + + # Create indexes + op.create_index('ix_user_sessions_user_id', 'user_sessions', ['user_id']) + op.create_index('ix_user_sessions_jti', 'user_sessions', ['refresh_token_jti']) + op.create_index('ix_user_sessions_is_active', 'user_sessions', ['is_active']) + op.create_index('ix_user_sessions_user_active', 'user_sessions', ['user_id', 'is_active']) + op.create_index('ix_user_sessions_jti_active', 'user_sessions', ['refresh_token_jti', 'is_active']) + + +def downgrade() -> None: + # Drop indexes + op.drop_index('ix_user_sessions_jti_active', 'user_sessions') + op.drop_index('ix_user_sessions_user_active', 'user_sessions') + op.drop_index('ix_user_sessions_is_active', 'user_sessions') + op.drop_index('ix_user_sessions_jti', 'user_sessions') + op.drop_index('ix_user_sessions_user_id', 'user_sessions') + + # Drop table + op.drop_table('user_sessions') +``` + +**Apply migration**: + +```bash +alembic upgrade head +``` + +### Step 9: Update Documentation + +**Create feature documentation**: `backend/SESSION_IMPLEMENTATION_CONTEXT.md` + +Document: +- Feature overview +- Architecture decisions +- Database schema +- API endpoints +- Usage examples +- Testing strategy + +**Update API documentation**: + +The API documentation is auto-generated by FastAPI at `/docs` (Swagger UI) and `/redoc` (ReDoc). + +Ensure all endpoints have: +- Clear summaries +- Detailed descriptions +- Example responses +- Error cases documented + +## Summary + +You've now implemented a complete feature! Here's what was created: + +**Files Created/Modified**: +1. `app/models/user_session.py` - Database model +2. `app/schemas/sessions.py` - Pydantic schemas +3. `app/crud/session.py` - CRUD operations +4. `app/api/routes/sessions.py` - API endpoints +5. `app/utils/device.py` - Device detection utility +6. `app/services/session_cleanup.py` - Background job +7. `app/api/routes/auth.py` - Integration with auth +8. `tests/api/test_session_management.py` - Tests +9. `app/alembic/versions/xxx_add_user_sessions.py` - Migration + +**API Endpoints**: +- `GET /api/v1/sessions/me` - List active sessions +- `DELETE /api/v1/sessions/{id}` - Revoke specific session +- `DELETE /api/v1/sessions/me/expired` - Cleanup expired sessions + +**Database Tables**: +- `user_sessions` - Session tracking with indexes + +**Background Jobs**: +- Daily cleanup of expired sessions at 2 AM + +## Best Practices + +### Do's + +1. **Plan First**: Design database schema and API before coding +2. **Follow Patterns**: Use existing patterns consistently +3. **Type Everything**: Comprehensive type hints everywhere +4. **Document Everything**: Docstrings, comments, and external docs +5. **Test Thoroughly**: Unit tests, integration tests, edge cases +6. **Handle Errors**: Proper exception handling and logging +7. **Security First**: Authorization checks, input validation, rate limiting +8. **Use Transactions**: Rollback on errors, commit on success +9. **Index Strategically**: Index columns used in WHERE and JOIN clauses +10. **Log Appropriately**: Info for actions, errors for failures + +### Don'ts + +1. **Don't Mix Layers**: Keep business logic out of CRUD, database ops out of routes +2. **Don't Expose Internals**: Never return sensitive data in API responses +3. **Don't Trust Input**: Always validate and sanitize user input +4. **Don't Ignore Errors**: Always handle exceptions properly +5. **Don't Skip Tests**: Tests catch bugs early +6. **Don't Hardcode**: Use configuration for environment-specific values +7. **Don't Over-optimize**: Profile before optimizing +8. **Don't Skip Documentation**: Code without docs is hard to maintain +9. **Don't Forget Migration**: Always create and test database migrations +10. **Don't Rush**: Take time to design properly upfront + +### Checklist + +When implementing a new feature, use this checklist: + +- [ ] Design database schema +- [ ] Create SQLAlchemy model +- [ ] Design Pydantic schemas (Create, Update, Response) +- [ ] Implement CRUD operations +- [ ] Create API endpoints +- [ ] Add authentication/authorization +- [ ] Implement rate limiting +- [ ] Add error handling +- [ ] Write comprehensive tests +- [ ] Create database migration +- [ ] Update documentation +- [ ] Test manually via API docs +- [ ] Review security implications +- [ ] Check performance (indexes, queries) +- [ ] Add logging +- [ ] Handle background jobs (if needed) + +--- + +This guide provides a complete reference for implementing features in the FastAPI backend. Use this as a template for new features, adapting the patterns to your specific requirements.