Compare commits

...

2 Commits

Author SHA1 Message Date
Felipe Cardoso
80c26c3df2 Enhance security headers middleware with configurable CSP modes
- Introduced `CSP_MODE` setting with `strict`, `relaxed`, and `disabled` options for flexible content security policies.
- Updated middleware to apply varying CSP rules based on mode and request context (e.g., API docs).
- Improved security and frontend compatibility with detailed policy definitions for external resources.
- Defaulted `CSP_MODE` to `relaxed`, ensuring balanced security and usability in modern applications.
2025-10-31 12:50:34 +01:00
Felipe Cardoso
2d909774df Add organization management and admin-specific APIs
- Introduced schemas for organizations, including creation, updates, and responses.
- Created models for `Organization` and `UserOrganization` with role-based access control and relationships.
- Implemented admin APIs for managing users, organizations, and bulk actions.
- Added advanced filtering, sorting, and pagination for user and organization queries.
- Updated `CRUD` logic to support organization-specific operations and member management.
- Enhanced database with necessary indexes and validation for improved performance and data integrity.
2025-10-31 12:18:43 +01:00
15 changed files with 2033 additions and 4 deletions

View File

@@ -0,0 +1,106 @@
"""add_organizations_and_user_organizations
Revision ID: fbf6318a8a36
Revises: 549b50ea888d
Create Date: 2025-10-31 12:08:05.141353
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'fbf6318a8a36'
down_revision: Union[str, None] = '549b50ea888d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Create organizations table
op.create_table(
'organizations',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('slug', sa.String(length=255), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('is_active', sa.Boolean(), nullable=False, server_default='true'),
sa.Column('settings', sa.JSON(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
sa.PrimaryKeyConstraint('id')
)
# Create indexes for organizations
op.create_index('ix_organizations_name', 'organizations', ['name'])
op.create_index('ix_organizations_slug', 'organizations', ['slug'], unique=True)
op.create_index('ix_organizations_is_active', 'organizations', ['is_active'])
op.create_index('ix_organizations_name_active', 'organizations', ['name', 'is_active'])
op.create_index('ix_organizations_slug_active', 'organizations', ['slug', 'is_active'])
# Create user_organizations junction table
op.create_table(
'user_organizations',
sa.Column('user_id', sa.UUID(), nullable=False),
sa.Column('organization_id', sa.UUID(), nullable=False),
sa.Column('role', sa.Enum('OWNER', 'ADMIN', 'MEMBER', 'GUEST', name='organizationrole'), nullable=False, server_default='MEMBER'),
sa.Column('is_active', sa.Boolean(), nullable=False, server_default='true'),
sa.Column('custom_permissions', sa.String(length=500), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
sa.PrimaryKeyConstraint('user_id', 'organization_id')
)
# Create foreign keys
op.create_foreign_key(
'fk_user_organizations_user_id',
'user_organizations',
'users',
['user_id'],
['id'],
ondelete='CASCADE'
)
op.create_foreign_key(
'fk_user_organizations_organization_id',
'user_organizations',
'organizations',
['organization_id'],
['id'],
ondelete='CASCADE'
)
# Create indexes for user_organizations
op.create_index('ix_user_organizations_role', 'user_organizations', ['role'])
op.create_index('ix_user_organizations_is_active', 'user_organizations', ['is_active'])
op.create_index('ix_user_org_user_active', 'user_organizations', ['user_id', 'is_active'])
op.create_index('ix_user_org_org_active', 'user_organizations', ['organization_id', 'is_active'])
def downgrade() -> None:
# Drop indexes for user_organizations
op.drop_index('ix_user_org_org_active', table_name='user_organizations')
op.drop_index('ix_user_org_user_active', table_name='user_organizations')
op.drop_index('ix_user_organizations_is_active', table_name='user_organizations')
op.drop_index('ix_user_organizations_role', table_name='user_organizations')
# Drop foreign keys
op.drop_constraint('fk_user_organizations_organization_id', 'user_organizations', type_='foreignkey')
op.drop_constraint('fk_user_organizations_user_id', 'user_organizations', type_='foreignkey')
# Drop user_organizations table
op.drop_table('user_organizations')
# Drop indexes for organizations
op.drop_index('ix_organizations_slug_active', table_name='organizations')
op.drop_index('ix_organizations_name_active', table_name='organizations')
op.drop_index('ix_organizations_is_active', table_name='organizations')
op.drop_index('ix_organizations_slug', table_name='organizations')
op.drop_index('ix_organizations_name', table_name='organizations')
# Drop organizations table
op.drop_table('organizations')
# Drop enum type
op.execute('DROP TYPE IF EXISTS organizationrole')

View File

@@ -0,0 +1,188 @@
# app/api/dependencies/permissions.py
"""
Permission checking dependencies for admin and organization-based access control.
These dependencies are optional and flexible:
- Use require_superuser for global admin access
- Use require_org_role for organization-specific access control
- Projects can choose to use these or implement their own permission system
"""
from typing import Optional
from uuid import UUID
from fastapi import Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.models.user import User
from app.models.user_organization import OrganizationRole
from app.api.dependencies.auth import get_current_user
from app.crud.organization import organization as organization_crud
def require_superuser(
current_user: User = Depends(get_current_user)
) -> User:
"""
Dependency to ensure the current user is a superuser.
Use this for admin-only endpoints that require global access.
Example:
@router.get("/admin/users")
def list_users(admin: User = Depends(require_superuser)):
...
"""
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Superuser privileges required"
)
return current_user
def require_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""
Dependency to ensure the current user is active.
Use this for endpoints that require an active account.
"""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive account"
)
return current_user
class OrganizationPermission:
"""
Factory for organization-based permission checking.
This allows flexible role-based access control within organizations.
Projects can extend this or implement custom permission logic.
"""
def __init__(self, allowed_roles: list[OrganizationRole]):
"""
Initialize with list of allowed roles.
Args:
allowed_roles: List of roles that can access the endpoint
"""
self.allowed_roles = allowed_roles
def __call__(
self,
organization_id: UUID,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
) -> User:
"""
Check if user has required role in the organization.
Args:
organization_id: The organization to check access for
current_user: The authenticated user
db: Database session
Returns:
The current user if they have permission
Raises:
HTTPException: If user lacks permission
"""
# Superusers bypass organization checks
if current_user.is_superuser:
return current_user
# Get user's role in organization
user_role = organization_crud.get_user_role_in_org(
db,
user_id=current_user.id,
organization_id=organization_id
)
if not user_role:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not a member of this organization"
)
if user_role not in self.allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Role {user_role} not authorized. Required: {self.allowed_roles}"
)
return current_user
# Common permission presets for convenience
require_org_owner = OrganizationPermission([OrganizationRole.OWNER])
require_org_admin = OrganizationPermission([OrganizationRole.OWNER, OrganizationRole.ADMIN])
require_org_member = OrganizationPermission([
OrganizationRole.OWNER,
OrganizationRole.ADMIN,
OrganizationRole.MEMBER
])
def get_current_org_role(
organization_id: UUID,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
) -> Optional[OrganizationRole]:
"""
Get the current user's role in an organization.
This is a non-blocking dependency that returns the role or None.
Use this when you want to check permissions conditionally.
Example:
@router.get("/organizations/{org_id}/items")
def list_items(
org_id: UUID,
role: OrganizationRole = Depends(get_current_org_role)
):
if role in [OrganizationRole.OWNER, OrganizationRole.ADMIN]:
# Show admin features
...
"""
if current_user.is_superuser:
return OrganizationRole.OWNER
return organization_crud.get_user_role_in_org(
db,
user_id=current_user.id,
organization_id=organization_id
)
def require_org_membership(
organization_id: UUID,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
) -> User:
"""
Ensure user is a member of the organization (any role).
Use this for endpoints that any organization member can access.
"""
if current_user.is_superuser:
return current_user
user_role = organization_crud.get_user_role_in_org(
db,
user_id=current_user.id,
organization_id=organization_id
)
if not user_role:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not a member of this organization"
)
return current_user

View File

@@ -1,8 +1,10 @@
from fastapi import APIRouter
from app.api.routes import auth, users, sessions
from app.api.routes import auth, users, sessions, admin, organizations
api_router = APIRouter()
api_router.include_router(auth.router, prefix="/auth", tags=["Authentication"])
api_router.include_router(users.router, prefix="/users", tags=["Users"])
api_router.include_router(sessions.router, prefix="/sessions", tags=["Sessions"])
api_router.include_router(admin.router, prefix="/admin", tags=["Admin"])
api_router.include_router(organizations.router, prefix="/organizations", tags=["Organizations"])

View File

@@ -0,0 +1,781 @@
# app/api/routes/admin.py
"""
Admin-specific endpoints for managing users and organizations.
These endpoints require superuser privileges and provide CMS-like functionality
for managing the application.
"""
import logging
from typing import Any, List, Optional
from uuid import UUID
from enum import Enum
from fastapi import APIRouter, Depends, Query, Body, status
from sqlalchemy.orm import Session
from pydantic import BaseModel, Field
from app.api.dependencies.permissions import require_superuser
from app.core.database import get_db
from app.crud.user import user as user_crud
from app.crud.organization import organization as organization_crud
from app.models.user import User
from app.models.user_organization import OrganizationRole
from app.schemas.users import UserResponse, UserCreate, UserUpdate
from app.schemas.organizations import (
OrganizationResponse,
OrganizationCreate,
OrganizationUpdate,
OrganizationMemberResponse
)
from app.schemas.common import (
PaginationParams,
PaginatedResponse,
MessageResponse,
SortParams,
create_pagination_meta
)
from app.core.exceptions import NotFoundError, ErrorCode
logger = logging.getLogger(__name__)
router = APIRouter()
# Schemas for bulk operations
class BulkAction(str, Enum):
"""Supported bulk actions."""
ACTIVATE = "activate"
DEACTIVATE = "deactivate"
DELETE = "delete"
class BulkUserAction(BaseModel):
"""Schema for bulk user actions."""
action: BulkAction = Field(..., description="Action to perform on selected users")
user_ids: List[UUID] = Field(..., min_items=1, max_items=100, description="List of user IDs (max 100)")
class BulkActionResult(BaseModel):
"""Result of a bulk action."""
success: bool
affected_count: int
failed_count: int
message: str
failed_ids: Optional[List[UUID]] = []
# ===== User Management Endpoints =====
@router.get(
"/users",
response_model=PaginatedResponse[UserResponse],
summary="Admin: List All Users",
description="Get paginated list of all users with filtering and search (admin only)",
operation_id="admin_list_users"
)
def admin_list_users(
pagination: PaginationParams = Depends(),
sort: SortParams = Depends(),
is_active: Optional[bool] = Query(None, description="Filter by active status"),
is_superuser: Optional[bool] = Query(None, description="Filter by superuser status"),
search: Optional[str] = Query(None, description="Search by email, name"),
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""
List all users with comprehensive filtering and search.
Requires superuser privileges.
"""
try:
# Build filters
filters = {}
if is_active is not None:
filters["is_active"] = is_active
if is_superuser is not None:
filters["is_superuser"] = is_superuser
# Get users with search
users, total = user_crud.get_multi_with_total(
db,
skip=pagination.offset,
limit=pagination.limit,
sort_by=sort.sort_by or "created_at",
sort_order=sort.sort_order.value if sort.sort_order else "desc",
filters=filters if filters else None,
search=search
)
pagination_meta = create_pagination_meta(
total=total,
page=pagination.page,
limit=pagination.limit,
items_count=len(users)
)
return PaginatedResponse(data=users, pagination=pagination_meta)
except Exception as e:
logger.error(f"Error listing users (admin): {str(e)}", exc_info=True)
raise
@router.post(
"/users",
response_model=UserResponse,
status_code=status.HTTP_201_CREATED,
summary="Admin: Create User",
description="Create a new user (admin only)",
operation_id="admin_create_user"
)
def admin_create_user(
user_in: UserCreate,
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""
Create a new user with admin privileges.
Allows setting is_superuser and other fields.
"""
try:
user = user_crud.create(db, obj_in=user_in)
logger.info(f"Admin {admin.email} created user {user.email}")
return user
except ValueError as e:
logger.warning(f"Failed to create user: {str(e)}")
raise NotFoundError(
detail=str(e),
error_code=ErrorCode.USER_ALREADY_EXISTS
)
except Exception as e:
logger.error(f"Error creating user (admin): {str(e)}", exc_info=True)
raise
@router.get(
"/users/{user_id}",
response_model=UserResponse,
summary="Admin: Get User Details",
description="Get detailed user information (admin only)",
operation_id="admin_get_user"
)
def admin_get_user(
user_id: UUID,
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""Get detailed information about a specific user."""
user = user_crud.get(db, id=user_id)
if not user:
raise NotFoundError(
detail=f"User {user_id} not found",
error_code=ErrorCode.USER_NOT_FOUND
)
return user
@router.put(
"/users/{user_id}",
response_model=UserResponse,
summary="Admin: Update User",
description="Update user information (admin only)",
operation_id="admin_update_user"
)
def admin_update_user(
user_id: UUID,
user_in: UserUpdate,
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""Update user information with admin privileges."""
try:
user = user_crud.get(db, id=user_id)
if not user:
raise NotFoundError(
detail=f"User {user_id} not found",
error_code=ErrorCode.USER_NOT_FOUND
)
updated_user = user_crud.update(db, db_obj=user, obj_in=user_in)
logger.info(f"Admin {admin.email} updated user {updated_user.email}")
return updated_user
except NotFoundError:
raise
except Exception as e:
logger.error(f"Error updating user (admin): {str(e)}", exc_info=True)
raise
@router.delete(
"/users/{user_id}",
response_model=MessageResponse,
summary="Admin: Delete User",
description="Soft delete a user (admin only)",
operation_id="admin_delete_user"
)
def admin_delete_user(
user_id: UUID,
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""Soft delete a user (sets deleted_at timestamp)."""
try:
user = user_crud.get(db, id=user_id)
if not user:
raise NotFoundError(
detail=f"User {user_id} not found",
error_code=ErrorCode.USER_NOT_FOUND
)
# Prevent deleting yourself
if user.id == admin.id:
raise NotFoundError(
detail="Cannot delete your own account",
error_code=ErrorCode.OPERATION_FORBIDDEN
)
user_crud.soft_delete(db, id=user_id)
logger.info(f"Admin {admin.email} deleted user {user.email}")
return MessageResponse(
success=True,
message=f"User {user.email} has been deleted"
)
except NotFoundError:
raise
except Exception as e:
logger.error(f"Error deleting user (admin): {str(e)}", exc_info=True)
raise
@router.post(
"/users/{user_id}/activate",
response_model=MessageResponse,
summary="Admin: Activate User",
description="Activate a user account (admin only)",
operation_id="admin_activate_user"
)
def admin_activate_user(
user_id: UUID,
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""Activate a user account."""
try:
user = user_crud.get(db, id=user_id)
if not user:
raise NotFoundError(
detail=f"User {user_id} not found",
error_code=ErrorCode.USER_NOT_FOUND
)
user_crud.update(db, db_obj=user, obj_in={"is_active": True})
logger.info(f"Admin {admin.email} activated user {user.email}")
return MessageResponse(
success=True,
message=f"User {user.email} has been activated"
)
except NotFoundError:
raise
except Exception as e:
logger.error(f"Error activating user (admin): {str(e)}", exc_info=True)
raise
@router.post(
"/users/{user_id}/deactivate",
response_model=MessageResponse,
summary="Admin: Deactivate User",
description="Deactivate a user account (admin only)",
operation_id="admin_deactivate_user"
)
def admin_deactivate_user(
user_id: UUID,
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""Deactivate a user account."""
try:
user = user_crud.get(db, id=user_id)
if not user:
raise NotFoundError(
detail=f"User {user_id} not found",
error_code=ErrorCode.USER_NOT_FOUND
)
# Prevent deactivating yourself
if user.id == admin.id:
raise NotFoundError(
detail="Cannot deactivate your own account",
error_code=ErrorCode.OPERATION_FORBIDDEN
)
user_crud.update(db, db_obj=user, obj_in={"is_active": False})
logger.info(f"Admin {admin.email} deactivated user {user.email}")
return MessageResponse(
success=True,
message=f"User {user.email} has been deactivated"
)
except NotFoundError:
raise
except Exception as e:
logger.error(f"Error deactivating user (admin): {str(e)}", exc_info=True)
raise
@router.post(
"/users/bulk-action",
response_model=BulkActionResult,
summary="Admin: Bulk User Action",
description="Perform bulk actions on multiple users (admin only)",
operation_id="admin_bulk_user_action"
)
def admin_bulk_user_action(
bulk_action: BulkUserAction,
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""
Perform bulk actions on multiple users.
Supported actions: activate, deactivate, delete
"""
affected_count = 0
failed_count = 0
failed_ids = []
try:
for user_id in bulk_action.user_ids:
try:
user = user_crud.get(db, id=user_id)
if not user:
failed_count += 1
failed_ids.append(user_id)
continue
# Prevent affecting yourself
if user.id == admin.id:
failed_count += 1
failed_ids.append(user_id)
continue
if bulk_action.action == BulkAction.ACTIVATE:
user_crud.update(db, db_obj=user, obj_in={"is_active": True})
elif bulk_action.action == BulkAction.DEACTIVATE:
user_crud.update(db, db_obj=user, obj_in={"is_active": False})
elif bulk_action.action == BulkAction.DELETE:
user_crud.soft_delete(db, id=user_id)
affected_count += 1
except Exception as e:
logger.error(f"Error processing user {user_id} in bulk action: {str(e)}")
failed_count += 1
failed_ids.append(user_id)
logger.info(
f"Admin {admin.email} performed bulk {bulk_action.action.value} "
f"on {affected_count} users ({failed_count} failed)"
)
return BulkActionResult(
success=failed_count == 0,
affected_count=affected_count,
failed_count=failed_count,
message=f"Bulk {bulk_action.action.value}: {affected_count} users affected, {failed_count} failed",
failed_ids=failed_ids if failed_ids else None
)
except Exception as e:
logger.error(f"Error in bulk user action: {str(e)}", exc_info=True)
raise
# ===== Organization Management Endpoints =====
@router.get(
"/organizations",
response_model=PaginatedResponse[OrganizationResponse],
summary="Admin: List Organizations",
description="Get paginated list of all organizations (admin only)",
operation_id="admin_list_organizations"
)
def admin_list_organizations(
pagination: PaginationParams = Depends(),
is_active: Optional[bool] = Query(None, description="Filter by active status"),
search: Optional[str] = Query(None, description="Search by name, slug, description"),
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""List all organizations with filtering and search."""
try:
orgs, total = organization_crud.get_multi_with_filters(
db,
skip=pagination.offset,
limit=pagination.limit,
is_active=is_active,
search=search,
sort_by="created_at",
sort_order="desc"
)
# Add member count to each organization
orgs_with_count = []
for org in orgs:
org_dict = {
"id": org.id,
"name": org.name,
"slug": org.slug,
"description": org.description,
"is_active": org.is_active,
"settings": org.settings,
"created_at": org.created_at,
"updated_at": org.updated_at,
"member_count": organization_crud.get_member_count(db, organization_id=org.id)
}
orgs_with_count.append(OrganizationResponse(**org_dict))
pagination_meta = create_pagination_meta(
total=total,
page=pagination.page,
limit=pagination.limit,
items_count=len(orgs_with_count)
)
return PaginatedResponse(data=orgs_with_count, pagination=pagination_meta)
except Exception as e:
logger.error(f"Error listing organizations (admin): {str(e)}", exc_info=True)
raise
@router.post(
"/organizations",
response_model=OrganizationResponse,
status_code=status.HTTP_201_CREATED,
summary="Admin: Create Organization",
description="Create a new organization (admin only)",
operation_id="admin_create_organization"
)
def admin_create_organization(
org_in: OrganizationCreate,
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""Create a new organization."""
try:
org = organization_crud.create(db, obj_in=org_in)
logger.info(f"Admin {admin.email} created organization {org.name}")
# Add member count
org_dict = {
"id": org.id,
"name": org.name,
"slug": org.slug,
"description": org.description,
"is_active": org.is_active,
"settings": org.settings,
"created_at": org.created_at,
"updated_at": org.updated_at,
"member_count": 0
}
return OrganizationResponse(**org_dict)
except ValueError as e:
logger.warning(f"Failed to create organization: {str(e)}")
raise NotFoundError(
detail=str(e),
error_code=ErrorCode.ALREADY_EXISTS
)
except Exception as e:
logger.error(f"Error creating organization (admin): {str(e)}", exc_info=True)
raise
@router.get(
"/organizations/{org_id}",
response_model=OrganizationResponse,
summary="Admin: Get Organization Details",
description="Get detailed organization information (admin only)",
operation_id="admin_get_organization"
)
def admin_get_organization(
org_id: UUID,
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""Get detailed information about a specific organization."""
org = organization_crud.get(db, id=org_id)
if not org:
raise NotFoundError(
detail=f"Organization {org_id} not found",
error_code=ErrorCode.NOT_FOUND
)
org_dict = {
"id": org.id,
"name": org.name,
"slug": org.slug,
"description": org.description,
"is_active": org.is_active,
"settings": org.settings,
"created_at": org.created_at,
"updated_at": org.updated_at,
"member_count": organization_crud.get_member_count(db, organization_id=org.id)
}
return OrganizationResponse(**org_dict)
@router.put(
"/organizations/{org_id}",
response_model=OrganizationResponse,
summary="Admin: Update Organization",
description="Update organization information (admin only)",
operation_id="admin_update_organization"
)
def admin_update_organization(
org_id: UUID,
org_in: OrganizationUpdate,
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""Update organization information."""
try:
org = organization_crud.get(db, id=org_id)
if not org:
raise NotFoundError(
detail=f"Organization {org_id} not found",
error_code=ErrorCode.NOT_FOUND
)
updated_org = organization_crud.update(db, db_obj=org, obj_in=org_in)
logger.info(f"Admin {admin.email} updated organization {updated_org.name}")
org_dict = {
"id": updated_org.id,
"name": updated_org.name,
"slug": updated_org.slug,
"description": updated_org.description,
"is_active": updated_org.is_active,
"settings": updated_org.settings,
"created_at": updated_org.created_at,
"updated_at": updated_org.updated_at,
"member_count": organization_crud.get_member_count(db, organization_id=updated_org.id)
}
return OrganizationResponse(**org_dict)
except NotFoundError:
raise
except Exception as e:
logger.error(f"Error updating organization (admin): {str(e)}", exc_info=True)
raise
@router.delete(
"/organizations/{org_id}",
response_model=MessageResponse,
summary="Admin: Delete Organization",
description="Delete an organization (admin only)",
operation_id="admin_delete_organization"
)
def admin_delete_organization(
org_id: UUID,
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""Delete an organization and all its relationships."""
try:
org = organization_crud.get(db, id=org_id)
if not org:
raise NotFoundError(
detail=f"Organization {org_id} not found",
error_code=ErrorCode.NOT_FOUND
)
organization_crud.remove(db, id=org_id)
logger.info(f"Admin {admin.email} deleted organization {org.name}")
return MessageResponse(
success=True,
message=f"Organization {org.name} has been deleted"
)
except NotFoundError:
raise
except Exception as e:
logger.error(f"Error deleting organization (admin): {str(e)}", exc_info=True)
raise
@router.get(
"/organizations/{org_id}/members",
response_model=PaginatedResponse[OrganizationMemberResponse],
summary="Admin: List Organization Members",
description="Get all members of an organization (admin only)",
operation_id="admin_list_organization_members"
)
def admin_list_organization_members(
org_id: UUID,
pagination: PaginationParams = Depends(),
is_active: Optional[bool] = Query(True, description="Filter by active status"),
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""List all members of an organization."""
try:
org = organization_crud.get(db, id=org_id)
if not org:
raise NotFoundError(
detail=f"Organization {org_id} not found",
error_code=ErrorCode.NOT_FOUND
)
members, total = organization_crud.get_organization_members(
db,
organization_id=org_id,
skip=pagination.offset,
limit=pagination.limit,
is_active=is_active
)
# Convert to response models
member_responses = [OrganizationMemberResponse(**member) for member in members]
pagination_meta = create_pagination_meta(
total=total,
page=pagination.page,
limit=pagination.limit,
items_count=len(member_responses)
)
return PaginatedResponse(data=member_responses, pagination=pagination_meta)
except NotFoundError:
raise
except Exception as e:
logger.error(f"Error listing organization members (admin): {str(e)}", exc_info=True)
raise
class AddMemberRequest(BaseModel):
"""Request to add a member to an organization."""
user_id: UUID = Field(..., description="User ID to add")
role: OrganizationRole = Field(OrganizationRole.MEMBER, description="Role in organization")
@router.post(
"/organizations/{org_id}/members",
response_model=MessageResponse,
summary="Admin: Add Member to Organization",
description="Add a user to an organization (admin only)",
operation_id="admin_add_organization_member"
)
def admin_add_organization_member(
org_id: UUID,
request: AddMemberRequest,
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""Add a user to an organization."""
try:
org = organization_crud.get(db, id=org_id)
if not org:
raise NotFoundError(
detail=f"Organization {org_id} not found",
error_code=ErrorCode.NOT_FOUND
)
user = user_crud.get(db, id=request.user_id)
if not user:
raise NotFoundError(
detail=f"User {request.user_id} not found",
error_code=ErrorCode.USER_NOT_FOUND
)
organization_crud.add_user(
db,
organization_id=org_id,
user_id=request.user_id,
role=request.role
)
logger.info(
f"Admin {admin.email} added user {user.email} to organization {org.name} "
f"with role {request.role.value}"
)
return MessageResponse(
success=True,
message=f"User {user.email} added to organization {org.name}"
)
except ValueError as e:
logger.warning(f"Failed to add user to organization: {str(e)}")
raise NotFoundError(detail=str(e), error_code=ErrorCode.ALREADY_EXISTS)
except NotFoundError:
raise
except Exception as e:
logger.error(f"Error adding member to organization (admin): {str(e)}", exc_info=True)
raise
@router.delete(
"/organizations/{org_id}/members/{user_id}",
response_model=MessageResponse,
summary="Admin: Remove Member from Organization",
description="Remove a user from an organization (admin only)",
operation_id="admin_remove_organization_member"
)
def admin_remove_organization_member(
org_id: UUID,
user_id: UUID,
admin: User = Depends(require_superuser),
db: Session = Depends(get_db)
) -> Any:
"""Remove a user from an organization."""
try:
org = organization_crud.get(db, id=org_id)
if not org:
raise NotFoundError(
detail=f"Organization {org_id} not found",
error_code=ErrorCode.NOT_FOUND
)
user = user_crud.get(db, id=user_id)
if not user:
raise NotFoundError(
detail=f"User {user_id} not found",
error_code=ErrorCode.USER_NOT_FOUND
)
success = organization_crud.remove_user(
db,
organization_id=org_id,
user_id=user_id
)
if not success:
raise NotFoundError(
detail="User is not a member of this organization",
error_code=ErrorCode.NOT_FOUND
)
logger.info(f"Admin {admin.email} removed user {user.email} from organization {org.name}")
return MessageResponse(
success=True,
message=f"User {user.email} removed from organization {org.name}"
)
except NotFoundError:
raise
except Exception as e:
logger.error(f"Error removing member from organization (admin): {str(e)}", exc_info=True)
raise

View File

@@ -0,0 +1,226 @@
# app/api/routes/organizations.py
"""
Organization endpoints for regular users.
These endpoints allow users to view and manage organizations they belong to.
"""
import logging
from typing import Any, List, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, Query, status
from sqlalchemy.orm import Session
from app.api.dependencies.auth import get_current_user
from app.api.dependencies.permissions import require_org_admin, require_org_membership, get_current_org_role
from app.core.database import get_db
from app.crud.organization import organization as organization_crud
from app.models.user import User
from app.models.user_organization import OrganizationRole
from app.schemas.organizations import (
OrganizationResponse,
OrganizationMemberResponse,
OrganizationUpdate
)
from app.schemas.common import (
PaginationParams,
PaginatedResponse,
MessageResponse,
create_pagination_meta
)
from app.core.exceptions import NotFoundError, AuthorizationError, ErrorCode
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get(
"/me",
response_model=List[OrganizationResponse],
summary="Get My Organizations",
description="Get all organizations the current user belongs to",
operation_id="get_my_organizations"
)
def get_my_organizations(
is_active: bool = Query(True, description="Filter by active membership"),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
) -> Any:
"""
Get all organizations the current user belongs to.
Returns organizations with member count for each.
"""
try:
orgs = organization_crud.get_user_organizations(
db,
user_id=current_user.id,
is_active=is_active
)
# Add member count and role to each organization
orgs_with_data = []
for org in orgs:
role = organization_crud.get_user_role_in_org(
db,
user_id=current_user.id,
organization_id=org.id
)
org_dict = {
"id": org.id,
"name": org.name,
"slug": org.slug,
"description": org.description,
"is_active": org.is_active,
"settings": org.settings,
"created_at": org.created_at,
"updated_at": org.updated_at,
"member_count": organization_crud.get_member_count(db, organization_id=org.id)
}
orgs_with_data.append(OrganizationResponse(**org_dict))
return orgs_with_data
except Exception as e:
logger.error(f"Error getting user organizations: {str(e)}", exc_info=True)
raise
@router.get(
"/{organization_id}",
response_model=OrganizationResponse,
summary="Get Organization Details",
description="Get details of an organization the user belongs to",
operation_id="get_organization"
)
def get_organization(
organization_id: UUID,
current_user: User = Depends(require_org_membership),
db: Session = Depends(get_db)
) -> Any:
"""
Get details of a specific organization.
User must be a member of the organization.
"""
try:
org = organization_crud.get(db, id=organization_id)
if not org:
raise NotFoundError(
detail=f"Organization {organization_id} not found",
error_code=ErrorCode.NOT_FOUND
)
org_dict = {
"id": org.id,
"name": org.name,
"slug": org.slug,
"description": org.description,
"is_active": org.is_active,
"settings": org.settings,
"created_at": org.created_at,
"updated_at": org.updated_at,
"member_count": organization_crud.get_member_count(db, organization_id=org.id)
}
return OrganizationResponse(**org_dict)
except NotFoundError:
raise
except Exception as e:
logger.error(f"Error getting organization: {str(e)}", exc_info=True)
raise
@router.get(
"/{organization_id}/members",
response_model=PaginatedResponse[OrganizationMemberResponse],
summary="Get Organization Members",
description="Get all members of an organization (members can view)",
operation_id="get_organization_members"
)
def get_organization_members(
organization_id: UUID,
pagination: PaginationParams = Depends(),
is_active: bool = Query(True, description="Filter by active status"),
current_user: User = Depends(require_org_membership),
db: Session = Depends(get_db)
) -> Any:
"""
Get all members of an organization.
User must be a member of the organization to view members.
"""
try:
members, total = organization_crud.get_organization_members(
db,
organization_id=organization_id,
skip=pagination.offset,
limit=pagination.limit,
is_active=is_active
)
member_responses = [OrganizationMemberResponse(**member) for member in members]
pagination_meta = create_pagination_meta(
total=total,
page=pagination.page,
limit=pagination.limit,
items_count=len(member_responses)
)
return PaginatedResponse(data=member_responses, pagination=pagination_meta)
except Exception as e:
logger.error(f"Error getting organization members: {str(e)}", exc_info=True)
raise
@router.put(
"/{organization_id}",
response_model=OrganizationResponse,
summary="Update Organization",
description="Update organization details (admin/owner only)",
operation_id="update_organization"
)
def update_organization(
organization_id: UUID,
org_in: OrganizationUpdate,
current_user: User = Depends(require_org_admin),
db: Session = Depends(get_db)
) -> Any:
"""
Update organization details.
Requires owner or admin role in the organization.
"""
try:
org = organization_crud.get(db, id=organization_id)
if not org:
raise NotFoundError(
detail=f"Organization {organization_id} not found",
error_code=ErrorCode.NOT_FOUND
)
updated_org = organization_crud.update(db, db_obj=org, obj_in=org_in)
logger.info(f"User {current_user.email} updated organization {updated_org.name}")
org_dict = {
"id": updated_org.id,
"name": updated_org.name,
"slug": updated_org.slug,
"description": updated_org.description,
"is_active": updated_org.is_active,
"settings": updated_org.settings,
"created_at": updated_org.created_at,
"updated_at": updated_org.updated_at,
"member_count": organization_crud.get_member_count(db, organization_id=updated_org.id)
}
return OrganizationResponse(**org_dict)
except NotFoundError:
raise
except Exception as e:
logger.error(f"Error updating organization: {str(e)}", exc_info=True)
raise

View File

@@ -15,6 +15,15 @@ class Settings(BaseSettings):
description="Environment: development, staging, or production"
)
# Security: Content Security Policy
# Set to False to disable CSP entirely (not recommended)
# Set to True for strict CSP (blocks most external resources)
# Set to "relaxed" for modern frontend development
CSP_MODE: str = Field(
default="relaxed",
description="CSP mode: 'strict', 'relaxed', or 'disabled'"
)
# Database configuration
POSTGRES_USER: str = "postgres"
POSTGRES_PASSWORD: str = "postgres"

View File

@@ -0,0 +1,6 @@
# app/crud/__init__.py
from .user import user
from .session import session as session_crud
from .organization import organization
__all__ = ["user", "session_crud", "organization"]

View File

@@ -0,0 +1,322 @@
# app/crud/organization.py
from typing import Optional, List, Dict, Any, Union
from uuid import UUID
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from sqlalchemy import func, or_, and_
from app.crud.base import CRUDBase
from app.models.organization import Organization
from app.models.user_organization import UserOrganization, OrganizationRole
from app.models.user import User
from app.schemas.organizations import (
OrganizationCreate,
OrganizationUpdate,
UserOrganizationCreate,
UserOrganizationUpdate
)
import logging
logger = logging.getLogger(__name__)
class CRUDOrganization(CRUDBase[Organization, OrganizationCreate, OrganizationUpdate]):
"""CRUD operations for Organization model."""
def get_by_slug(self, db: Session, *, slug: str) -> Optional[Organization]:
"""Get organization by slug."""
return db.query(Organization).filter(Organization.slug == slug).first()
def create(self, db: Session, *, obj_in: OrganizationCreate) -> Organization:
"""Create a new organization with error handling."""
try:
db_obj = Organization(
name=obj_in.name,
slug=obj_in.slug,
description=obj_in.description,
is_active=obj_in.is_active,
settings=obj_in.settings or {}
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
except IntegrityError as e:
db.rollback()
error_msg = str(e.orig) if hasattr(e, 'orig') else str(e)
if "slug" in error_msg.lower():
logger.warning(f"Duplicate slug attempted: {obj_in.slug}")
raise ValueError(f"Organization with slug '{obj_in.slug}' already exists")
logger.error(f"Integrity error creating organization: {error_msg}")
raise ValueError(f"Database integrity error: {error_msg}")
except Exception as e:
db.rollback()
logger.error(f"Unexpected error creating organization: {str(e)}", exc_info=True)
raise
def get_multi_with_filters(
self,
db: Session,
*,
skip: int = 0,
limit: int = 100,
is_active: Optional[bool] = None,
search: Optional[str] = None,
sort_by: str = "created_at",
sort_order: str = "desc"
) -> tuple[List[Organization], int]:
"""
Get multiple organizations with filtering, searching, and sorting.
Returns:
Tuple of (organizations list, total count)
"""
query = db.query(Organization)
# Apply filters
if is_active is not None:
query = query.filter(Organization.is_active == is_active)
if search:
search_filter = or_(
Organization.name.ilike(f"%{search}%"),
Organization.slug.ilike(f"%{search}%"),
Organization.description.ilike(f"%{search}%")
)
query = query.filter(search_filter)
# Get total count before pagination
total = query.count()
# Apply sorting
sort_column = getattr(Organization, sort_by, Organization.created_at)
if sort_order == "desc":
query = query.order_by(sort_column.desc())
else:
query = query.order_by(sort_column.asc())
# Apply pagination
organizations = query.offset(skip).limit(limit).all()
return organizations, total
def get_member_count(self, db: Session, *, organization_id: UUID) -> int:
"""Get the count of active members in an organization."""
return db.query(func.count(UserOrganization.user_id)).filter(
and_(
UserOrganization.organization_id == organization_id,
UserOrganization.is_active == True
)
).scalar() or 0
def add_user(
self,
db: Session,
*,
organization_id: UUID,
user_id: UUID,
role: OrganizationRole = OrganizationRole.MEMBER,
custom_permissions: Optional[str] = None
) -> UserOrganization:
"""Add a user to an organization with a specific role."""
try:
# Check if relationship already exists
existing = db.query(UserOrganization).filter(
and_(
UserOrganization.user_id == user_id,
UserOrganization.organization_id == organization_id
)
).first()
if existing:
# Reactivate if inactive, or raise error if already active
if not existing.is_active:
existing.is_active = True
existing.role = role
existing.custom_permissions = custom_permissions
db.commit()
db.refresh(existing)
return existing
else:
raise ValueError("User is already a member of this organization")
# Create new relationship
user_org = UserOrganization(
user_id=user_id,
organization_id=organization_id,
role=role,
is_active=True,
custom_permissions=custom_permissions
)
db.add(user_org)
db.commit()
db.refresh(user_org)
return user_org
except IntegrityError as e:
db.rollback()
logger.error(f"Integrity error adding user to organization: {str(e)}")
raise ValueError("Failed to add user to organization")
except Exception as e:
db.rollback()
logger.error(f"Error adding user to organization: {str(e)}", exc_info=True)
raise
def remove_user(
self,
db: Session,
*,
organization_id: UUID,
user_id: UUID
) -> bool:
"""Remove a user from an organization (soft delete)."""
try:
user_org = db.query(UserOrganization).filter(
and_(
UserOrganization.user_id == user_id,
UserOrganization.organization_id == organization_id
)
).first()
if not user_org:
return False
user_org.is_active = False
db.commit()
return True
except Exception as e:
db.rollback()
logger.error(f"Error removing user from organization: {str(e)}", exc_info=True)
raise
def update_user_role(
self,
db: Session,
*,
organization_id: UUID,
user_id: UUID,
role: OrganizationRole,
custom_permissions: Optional[str] = None
) -> Optional[UserOrganization]:
"""Update a user's role in an organization."""
try:
user_org = db.query(UserOrganization).filter(
and_(
UserOrganization.user_id == user_id,
UserOrganization.organization_id == organization_id
)
).first()
if not user_org:
return None
user_org.role = role
if custom_permissions is not None:
user_org.custom_permissions = custom_permissions
db.commit()
db.refresh(user_org)
return user_org
except Exception as e:
db.rollback()
logger.error(f"Error updating user role: {str(e)}", exc_info=True)
raise
def get_organization_members(
self,
db: Session,
*,
organization_id: UUID,
skip: int = 0,
limit: int = 100,
is_active: bool = True
) -> tuple[List[Dict[str, Any]], int]:
"""
Get members of an organization with user details.
Returns:
Tuple of (members list with user details, total count)
"""
query = db.query(UserOrganization, User).join(
User, UserOrganization.user_id == User.id
).filter(UserOrganization.organization_id == organization_id)
if is_active is not None:
query = query.filter(UserOrganization.is_active == is_active)
total = query.count()
results = query.order_by(UserOrganization.created_at.desc()).offset(skip).limit(limit).all()
members = []
for user_org, user in results:
members.append({
"user_id": user.id,
"email": user.email,
"first_name": user.first_name,
"last_name": user.last_name,
"role": user_org.role,
"is_active": user_org.is_active,
"joined_at": user_org.created_at
})
return members, total
def get_user_organizations(
self,
db: Session,
*,
user_id: UUID,
is_active: bool = True
) -> List[Organization]:
"""Get all organizations a user belongs to."""
query = db.query(Organization).join(
UserOrganization, Organization.id == UserOrganization.organization_id
).filter(UserOrganization.user_id == user_id)
if is_active is not None:
query = query.filter(UserOrganization.is_active == is_active)
return query.all()
def get_user_role_in_org(
self,
db: Session,
*,
user_id: UUID,
organization_id: UUID
) -> Optional[OrganizationRole]:
"""Get a user's role in a specific organization."""
user_org = db.query(UserOrganization).filter(
and_(
UserOrganization.user_id == user_id,
UserOrganization.organization_id == organization_id,
UserOrganization.is_active == True
)
).first()
return user_org.role if user_org else None
def is_user_org_owner(
self,
db: Session,
*,
user_id: UUID,
organization_id: UUID
) -> bool:
"""Check if a user is an owner of an organization."""
role = self.get_user_role_in_org(db, user_id=user_id, organization_id=organization_id)
return role == OrganizationRole.OWNER
def is_user_org_admin(
self,
db: Session,
*,
user_id: UUID,
organization_id: UUID
) -> bool:
"""Check if a user is an owner or admin of an organization."""
role = self.get_user_role_in_org(db, user_id=user_id, organization_id=organization_id)
return role in [OrganizationRole.OWNER, OrganizationRole.ADMIN]
# Create a singleton instance for use across the application
organization = CRUDOrganization(Organization)

View File

@@ -1,7 +1,8 @@
# app/crud/user.py
from typing import Optional, Union, Dict, Any
from typing import Optional, Union, Dict, Any, List, Tuple
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from sqlalchemy import or_, asc, desc
from app.crud.base import CRUDBase
from app.models.user import User
from app.schemas.users import UserCreate, UserUpdate
@@ -63,6 +64,82 @@ class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
return super().update(db, db_obj=db_obj, obj_in=update_data)
def get_multi_with_total(
self,
db: Session,
*,
skip: int = 0,
limit: int = 100,
sort_by: Optional[str] = None,
sort_order: str = "asc",
filters: Optional[Dict[str, Any]] = None,
search: Optional[str] = None
) -> Tuple[List[User], int]:
"""
Get multiple users with total count, filtering, sorting, and search.
Args:
db: Database session
skip: Number of records to skip
limit: Maximum number of records to return
sort_by: Field name to sort by
sort_order: Sort order ("asc" or "desc")
filters: Dictionary of filters (field_name: value)
search: Search term to match against email, first_name, last_name
Returns:
Tuple of (users list, total count)
"""
# Validate pagination
if skip < 0:
raise ValueError("skip must be non-negative")
if limit < 0:
raise ValueError("limit must be non-negative")
if limit > 1000:
raise ValueError("Maximum limit is 1000")
try:
# Build base query
query = db.query(User)
# Exclude soft-deleted users
query = query.filter(User.deleted_at.is_(None))
# Apply filters
if filters:
for field, value in filters.items():
if hasattr(User, field) and value is not None:
query = query.filter(getattr(User, field) == value)
# Apply search
if search:
search_filter = or_(
User.email.ilike(f"%{search}%"),
User.first_name.ilike(f"%{search}%"),
User.last_name.ilike(f"%{search}%")
)
query = query.filter(search_filter)
# Get total count
total = query.count()
# Apply sorting
if sort_by and hasattr(User, sort_by):
sort_column = getattr(User, sort_by)
if sort_order.lower() == "desc":
query = query.order_by(desc(sort_column))
else:
query = query.order_by(asc(sort_column))
# Apply pagination
users = query.offset(skip).limit(limit).all()
return users, total
except Exception as e:
logger.error(f"Error retrieving paginated users: {str(e)}")
raise
def is_active(self, user: User) -> bool:
return user.is_active

View File

@@ -72,7 +72,14 @@ app.add_middleware(
# Add security headers middleware
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
"""Add security headers to all responses"""
"""
Add security headers to all responses.
CSP modes (set via CSP_MODE env var):
- 'strict': Maximum security, blocks most external resources (default for production)
- 'relaxed': Allows common CDNs and modern frontend features (default for development)
- 'disabled': No CSP (not recommended, use only for debugging)
"""
response = await call_next(request)
# Prevent clickjacking
@@ -89,7 +96,70 @@ async def add_security_headers(request: Request, call_next):
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
# Content Security Policy
response.headers["Content-Security-Policy"] = "default-src 'self'; frame-ancestors 'none'"
csp_mode = settings.CSP_MODE.lower()
# Special handling for API docs
is_docs = request.url.path in ["/docs", "/redoc"] or \
request.url.path.startswith("/docs/") or \
request.url.path.startswith("/redoc/")
if csp_mode == "disabled":
# No CSP (only for local development/debugging)
pass
elif is_docs:
# Always allow Swagger UI/ReDoc resources on docs pages
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
"style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://fonts.googleapis.com; "
"font-src 'self' https://fonts.gstatic.com; "
"img-src 'self' data: https://fastapi.tiangolo.com; "
"frame-ancestors 'none'"
)
elif csp_mode == "strict":
# Maximum security - blocks most external resources
# Use this in production if you don't need external CDNs
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'self'; "
"style-src 'self'; "
"img-src 'self' data:; "
"font-src 'self'; "
"connect-src 'self'; "
"frame-ancestors 'none'; "
"base-uri 'self'; "
"form-action 'self'"
)
else: # 'relaxed' mode (default)
# Modern frontend development - allows common CDNs and features
# Safe for most production apps that use external resources
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
# Allow common script sources
"script-src 'self' 'unsafe-inline' 'unsafe-eval' "
"https://cdn.jsdelivr.net https://unpkg.com https://cdnjs.cloudflare.com "
"https://www.googletagmanager.com https://www.google-analytics.com; "
# Allow common style sources
"style-src 'self' 'unsafe-inline' "
"https://cdn.jsdelivr.net https://unpkg.com https://cdnjs.cloudflare.com "
"https://fonts.googleapis.com; "
# Allow common font sources
"font-src 'self' data: "
"https://fonts.gstatic.com https://cdn.jsdelivr.net; "
# Allow images from self, data URIs, and common CDNs
"img-src 'self' data: blob: https:; "
# Allow API calls to self and common services
"connect-src 'self' "
"https://www.google-analytics.com https://analytics.google.com; "
# Media from self and data URIs
"media-src 'self' data: blob:; "
# Prevent framing
"frame-ancestors 'none'; "
# Restrict base URI
"base-uri 'self'; "
# Restrict form submissions
"form-action 'self'"
)
# Permissions Policy (formerly Feature Policy)
response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"

View File

@@ -9,8 +9,11 @@ from .base import TimestampMixin, UUIDMixin
# Import models
from .user import User
from .user_session import UserSession
from .organization import Organization
from .user_organization import UserOrganization, OrganizationRole
__all__ = [
'Base', 'TimestampMixin', 'UUIDMixin',
'User', 'UserSession',
'Organization', 'UserOrganization', 'OrganizationRole',
]

View File

@@ -0,0 +1,31 @@
# app/models/organization.py
from sqlalchemy import Column, String, Boolean, Text, Index
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import relationship
from .base import Base, TimestampMixin, UUIDMixin
class Organization(Base, UUIDMixin, TimestampMixin):
"""
Organization model for multi-tenant support.
Users can belong to multiple organizations with different roles.
"""
__tablename__ = 'organizations'
name = Column(String(255), nullable=False, index=True)
slug = Column(String(255), unique=True, nullable=False, index=True)
description = Column(Text, nullable=True)
is_active = Column(Boolean, default=True, nullable=False, index=True)
settings = Column(JSONB, default={})
# Relationships
user_organizations = relationship("UserOrganization", back_populates="organization", cascade="all, delete-orphan")
__table_args__ = (
Index('ix_organizations_name_active', 'name', 'is_active'),
Index('ix_organizations_slug_active', 'slug', 'is_active'),
)
def __repr__(self):
return f"<Organization {self.name} ({self.slug})>"

View File

@@ -1,5 +1,6 @@
from sqlalchemy import Column, String, Boolean, DateTime
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import relationship
from .base import Base, TimestampMixin, UUIDMixin
@@ -17,5 +18,8 @@ class User(Base, UUIDMixin, TimestampMixin):
preferences = Column(JSONB)
deleted_at = Column(DateTime(timezone=True), nullable=True, index=True)
# Relationships
user_organizations = relationship("UserOrganization", back_populates="user", cascade="all, delete-orphan")
def __repr__(self):
return f"<User {self.email}>"

View File

@@ -0,0 +1,50 @@
# app/models/user_organization.py
from enum import Enum as PyEnum
from sqlalchemy import Column, ForeignKey, Boolean, String, Index, Enum
from sqlalchemy.dialects.postgresql import UUID as PGUUID
from sqlalchemy.orm import relationship
from .base import Base, TimestampMixin
class OrganizationRole(str, PyEnum):
"""
Built-in organization roles.
These provide a baseline role system that can be optionally used.
Projects can extend this or implement their own permission system.
"""
OWNER = "owner" # Full control over organization
ADMIN = "admin" # Can manage users and settings
MEMBER = "member" # Regular member with standard access
GUEST = "guest" # Limited read-only access
class UserOrganization(Base, TimestampMixin):
"""
Junction table for many-to-many relationship between Users and Organizations.
Includes role information for flexible RBAC.
"""
__tablename__ = 'user_organizations'
user_id = Column(PGUUID(as_uuid=True), ForeignKey('users.id', ondelete='CASCADE'), primary_key=True)
organization_id = Column(PGUUID(as_uuid=True), ForeignKey('organizations.id', ondelete='CASCADE'), primary_key=True)
role = Column(Enum(OrganizationRole), default=OrganizationRole.MEMBER, nullable=False, index=True)
is_active = Column(Boolean, default=True, nullable=False, index=True)
# Optional: Custom permissions override for specific users
custom_permissions = Column(String(500), nullable=True) # JSON array of permission strings
# Relationships
user = relationship("User", back_populates="user_organizations")
organization = relationship("Organization", back_populates="user_organizations")
__table_args__ = (
Index('ix_user_org_user_active', 'user_id', 'is_active'),
Index('ix_user_org_org_active', 'organization_id', 'is_active'),
Index('ix_user_org_role', 'role'),
)
def __repr__(self):
return f"<UserOrganization user={self.user_id} org={self.organization_id} role={self.role}>"

View File

@@ -0,0 +1,154 @@
# app/schemas/organizations.py
import re
from datetime import datetime
from typing import Optional, Dict, Any, List
from uuid import UUID
from pydantic import BaseModel, field_validator, ConfigDict, Field
from app.models.user_organization import OrganizationRole
# Organization Schemas
class OrganizationBase(BaseModel):
"""Base organization schema with common fields."""
name: str = Field(..., min_length=1, max_length=255)
slug: Optional[str] = Field(None, min_length=1, max_length=255)
description: Optional[str] = None
is_active: bool = True
settings: Optional[Dict[str, Any]] = {}
@field_validator('slug')
@classmethod
def validate_slug(cls, v: Optional[str]) -> Optional[str]:
"""Validate slug format: lowercase, alphanumeric, hyphens only."""
if v is None:
return v
if not re.match(r'^[a-z0-9-]+$', v):
raise ValueError('Slug must contain only lowercase letters, numbers, and hyphens')
if v.startswith('-') or v.endswith('-'):
raise ValueError('Slug cannot start or end with a hyphen')
if '--' in v:
raise ValueError('Slug cannot contain consecutive hyphens')
return v
@field_validator('name')
@classmethod
def validate_name(cls, v: str) -> str:
"""Validate organization name."""
if not v or v.strip() == "":
raise ValueError('Organization name cannot be empty')
return v.strip()
class OrganizationCreate(OrganizationBase):
"""Schema for creating a new organization."""
name: str = Field(..., min_length=1, max_length=255)
slug: str = Field(..., min_length=1, max_length=255)
class OrganizationUpdate(BaseModel):
"""Schema for updating an organization."""
name: Optional[str] = Field(None, min_length=1, max_length=255)
slug: Optional[str] = Field(None, min_length=1, max_length=255)
description: Optional[str] = None
is_active: Optional[bool] = None
settings: Optional[Dict[str, Any]] = None
@field_validator('slug')
@classmethod
def validate_slug(cls, v: Optional[str]) -> Optional[str]:
"""Validate slug format."""
if v is None:
return v
if not re.match(r'^[a-z0-9-]+$', v):
raise ValueError('Slug must contain only lowercase letters, numbers, and hyphens')
if v.startswith('-') or v.endswith('-'):
raise ValueError('Slug cannot start or end with a hyphen')
if '--' in v:
raise ValueError('Slug cannot contain consecutive hyphens')
return v
@field_validator('name')
@classmethod
def validate_name(cls, v: Optional[str]) -> Optional[str]:
"""Validate organization name."""
if v is not None and (not v or v.strip() == ""):
raise ValueError('Organization name cannot be empty')
return v.strip() if v else v
class OrganizationResponse(OrganizationBase):
"""Schema for organization API responses."""
id: UUID
created_at: datetime
updated_at: Optional[datetime] = None
member_count: Optional[int] = 0
model_config = ConfigDict(from_attributes=True)
class OrganizationListResponse(BaseModel):
"""Schema for paginated organization list responses."""
organizations: List[OrganizationResponse]
total: int
page: int
page_size: int
pages: int
# User-Organization Relationship Schemas
class UserOrganizationBase(BaseModel):
"""Base schema for user-organization relationship."""
role: OrganizationRole = OrganizationRole.MEMBER
is_active: bool = True
custom_permissions: Optional[str] = None
class UserOrganizationCreate(BaseModel):
"""Schema for adding a user to an organization."""
user_id: UUID
role: OrganizationRole = OrganizationRole.MEMBER
custom_permissions: Optional[str] = None
class UserOrganizationUpdate(BaseModel):
"""Schema for updating user's role in an organization."""
role: Optional[OrganizationRole] = None
is_active: Optional[bool] = None
custom_permissions: Optional[str] = None
class UserOrganizationResponse(BaseModel):
"""Schema for user-organization relationship responses."""
user_id: UUID
organization_id: UUID
role: OrganizationRole
is_active: bool
custom_permissions: Optional[str] = None
created_at: datetime
updated_at: Optional[datetime] = None
model_config = ConfigDict(from_attributes=True)
class OrganizationMemberResponse(BaseModel):
"""Schema for organization member information."""
user_id: UUID
email: str
first_name: str
last_name: Optional[str] = None
role: OrganizationRole
is_active: bool
joined_at: datetime
model_config = ConfigDict(from_attributes=True)
class OrganizationMemberListResponse(BaseModel):
"""Schema for paginated organization member list."""
members: List[OrganizationMemberResponse]
total: int
page: int
page_size: int
pages: int