import time
import logging
from contextlib import contextmanager
from typing import Generator, Any

from sqlalchemy import create_engine, event, text
from sqlalchemy.engine import Engine
# NOTE(review): on SQLAlchemy 1.4+ this location is deprecated in favour of
# ``sqlalchemy.orm.declarative_base`` -- confirm the pinned version before moving it.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import SQLAlchemyError, DBAPIError

from app.core.config import settings

# Configure logging
logger = logging.getLogger(__name__)

# PostgreSQL-specific engine configuration
engine = create_engine(
    settings.database_url,
    # Connection pool settings
    pool_size=settings.db_pool_size,  # Default number of connections to maintain
    max_overflow=settings.db_max_overflow,  # Max extra connections when pool is fully used
    pool_timeout=settings.db_pool_timeout,  # Seconds to wait before giving up on getting a connection
    pool_recycle=settings.db_pool_recycle,  # Seconds after which a connection is recycled
    pool_pre_ping=True,  # Test connections for liveness before using them
    # Query execution settings
    connect_args={
        "application_name": "eventspace",  # Helps identify app in PostgreSQL logs
        "keepalives": 1,  # Enable TCP keepalive
        "keepalives_idle": 60,  # Seconds before sending keepalive probes
        "keepalives_interval": 10,  # Seconds between keepalive probes
        "keepalives_count": 5,  # Number of probes before dropping connection
        "options": "-c timezone=UTC",  # Set timezone to UTC for consistency
    },
    # Performance tuning
    isolation_level="READ COMMITTED",  # Default isolation level for transactions
    echo=settings.sql_echo,  # Log SQL statements for debugging if enabled
    echo_pool=settings.sql_echo_pool,  # Log pool events for debugging if enabled
)

# SQLAlchemy session factory
SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
    expire_on_commit=False,  # Prevents additional DB queries after commit
)

# Declarative base for models
Base = declarative_base()


# Add performance metrics
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Record the start time of a statement on the connection.

    Start times are pushed onto a list stored in ``conn.info`` under
    ``"query_start_time"``; ``after_cursor_execute`` pops the most recent one.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments the way time.time() can.
    # NOTE(review): if a statement raises, the matching "after" hook presumably
    # never pops this entry, leaving a stale timestamp on the list -- confirm.
    conn.info.setdefault("query_start_time", []).append(time.perf_counter())
    if settings.sql_echo_timing:
        logger.debug("Start Query: %s", statement)


@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Compute the statement duration and log it.

    Logs at DEBUG when ``settings.sql_echo_timing`` is set, and at WARNING
    when the duration exceeds ``settings.slow_query_threshold`` (seconds).
    """
    total = time.perf_counter() - conn.info["query_start_time"].pop(-1)
    if settings.sql_echo_timing:
        logger.debug("Query Complete in %.3f seconds: %s", total, statement)
    if total > settings.slow_query_threshold:
        logger.warning("Slow Query (%.3f seconds): %s", total, statement)


# Database health check
def check_database_connection() -> bool:
    """Verify database connection is working properly.

    Returns:
        True if a ``SELECT 1`` round-trip succeeds, False if it raises a
        ``SQLAlchemyError`` (the error is logged, not propagated).
    """
    try:
        # Execute a simple query
        with engine.connect() as connection:
            # Wrap in text(): plain strings are deprecated as executables in
            # SQLAlchemy 1.4 and rejected outright in 2.0.
            connection.execute(text("SELECT 1"))
        return True
    except SQLAlchemyError as e:
        logger.error(f"Database connection check failed: {str(e)}")
        return False


# FastAPI dependency to get DB session
def get_db() -> Generator[Session, None, None]:
    """Dependency that provides a database session.

    The session is always closed when the generator finishes. A
    ``DBAPIError`` raised while the session is in use is logged, the
    session is rolled back, and the error is re-raised.

    Usage:
        @app.get("/items/")
        def read_items(db: Session = Depends(get_db)):
            return db.query(Item).all()
    """
    db = SessionLocal()
    try:
        yield db
    except DBAPIError as e:
        logger.error(f"Database error during request: {str(e)}")
        db.rollback()  # Rollback in case of error
        raise
    finally:
        db.close()


# Context manager for handling transactions
@contextmanager
def get_db_transaction() -> Generator[Session, None, None]:
    """Context manager for database transactions.

    Commits when the ``with`` body completes normally; on any exception
    (including one raised by the commit itself) rolls back, logs, and
    re-raises. The session is closed in all cases.

    Usage:
        with get_db_transaction() as db:
            db.add(obj)
            # Will automatically commit or rollback
    """
    db = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception as e:
        db.rollback()
        logger.error(f"Transaction error: {str(e)}")
        raise
    finally:
        db.close()


# Function to initialize database connections at startup
def init_db() -> None:
    """Initialize database connections pool at application startup.

    Raises:
        SQLAlchemyError: if the ``SELECT 1`` probe fails; the error is
            logged and then re-raised.
    """
    logger.info("Initializing database connection pool")
    try:
        with engine.connect() as conn:
            # text() is required for string SQL on SQLAlchemy 2.0 and is
            # accepted by earlier versions as well.
            conn.execute(text("SELECT 1"))
        logger.info("Database connection successful")
    except SQLAlchemyError as e:
        logger.error(f"Database initialization failed: {str(e)}")
        raise


# Function to dispose of connections at shutdown
def close_db() -> None:
    """Close all database connections at application shutdown."""
    logger.info("Closing database connections")
    engine.dispose()