Refactor database handling and enhance connection management.
Introduced robust database connection configuration with PostgreSQL-specific tuning, connection pooling, and query-timing metrics. Added utility functions for database health checks and for initializing and disposing of the connection pool at application startup and shutdown. Improved error handling and logging for request sessions, transactions, and slow queries.
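The new engine configuration reads several pool- and logging-related fields from app.core.config.settings. Those fields are not part of this diff; the sketch below only illustrates what they might look like, assuming a pydantic v1-style BaseSettings class, with field names taken from the code and default values invented for the example.

from pydantic import BaseSettings  # pydantic v1; in pydantic v2 BaseSettings moved to the pydantic-settings package


class Settings(BaseSettings):
    database_url: str = "postgresql://user:password@localhost:5432/eventspace"  # placeholder URL
    db_pool_size: int = 5
    db_max_overflow: int = 10
    db_pool_timeout: int = 30          # seconds
    db_pool_recycle: int = 1800        # seconds
    sql_echo: bool = False
    sql_echo_pool: bool = False
    sql_echo_timing: bool = False
    slow_query_threshold: float = 0.5  # seconds


settings = Settings()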
@@ -1,20 +1,142 @@
import time
import logging
from contextlib import contextmanager
from typing import Generator

from sqlalchemy import create_engine, event, text
from sqlalchemy.engine import Engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import SQLAlchemyError, DBAPIError

from app.core.config import settings

# Configure logging
logger = logging.getLogger(__name__)

# PostgreSQL-specific engine configuration
engine = create_engine(
    settings.database_url,
    # Connection pool settings
    pool_size=settings.db_pool_size,        # Number of connections kept open in the pool
    max_overflow=settings.db_max_overflow,  # Extra connections when the pool is exhausted (total cap: pool_size + max_overflow)
    pool_timeout=settings.db_pool_timeout,  # Seconds to wait for a connection before giving up
    pool_recycle=settings.db_pool_recycle,  # Seconds after which a connection is recycled
    pool_pre_ping=True,                     # Test connections for liveness before using them
    # Low-level connection arguments passed to the database driver
    connect_args={
        "application_name": "eventspace",   # Helps identify the app in PostgreSQL logs
        "keepalives": 1,                    # Enable TCP keepalive
        "keepalives_idle": 60,              # Seconds of idle time before sending keepalive probes
        "keepalives_interval": 10,          # Seconds between keepalive probes
        "keepalives_count": 5,              # Failed probes allowed before dropping the connection
        "options": "-c timezone=UTC",       # Use UTC on every connection for consistency
    },
    # Performance tuning
    isolation_level="READ COMMITTED",       # Default isolation level for transactions
    echo=settings.sql_echo,                 # Log SQL statements for debugging if enabled
    echo_pool=settings.sql_echo_pool,       # Log pool events for debugging if enabled
)

# SQLAlchemy session factory
SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
    expire_on_commit=False,  # Prevents additional DB queries after commit
)

# Declarative base for models
Base = declarative_base()

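For illustration only, a minimal model declared against this Base; the Item table is hypothetical and not part of this commit (it matches the example in the get_db docstring below).

from sqlalchemy import Column, Integer, String


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)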
# Performance metrics: these listeners are registered on the Engine class, so they
# time every query on every engine and flag slow ones
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info.setdefault("query_start_time", []).append(time.time())
    if settings.sql_echo_timing:
        logger.debug("Start Query: %s", statement)


@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total = time.time() - conn.info["query_start_time"].pop(-1)
    if settings.sql_echo_timing:
        logger.debug("Query Complete in %.3f seconds: %s", total, statement)
    if total > settings.slow_query_threshold:
        logger.warning("Slow Query (%.3f seconds): %s", total, statement)

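The timing logs above are emitted only when settings.sql_echo_timing is enabled and the module's logger actually handles DEBUG records; a minimal, purely illustrative logging setup for local debugging would be:

import logging

# Assumed setup for local debugging; production deployments usually configure logging centrally.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)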
# Database health check
def check_database_connection() -> bool:
    """Verify the database connection is working properly."""
    try:
        # Execute a simple query
        with engine.connect() as connection:
            connection.execute(text("SELECT 1"))
        return True
    except SQLAlchemyError as e:
        logger.error(f"Database connection check failed: {str(e)}")
        return False

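As a usage sketch (not part of this commit), the health check could back a monitoring endpoint; the /health route and response shape below are assumptions.

from fastapi import FastAPI

app = FastAPI()


@app.get("/health")
def health() -> dict:
    db_ok = check_database_connection()
    return {"status": "ok" if db_ok else "degraded", "database": db_ok}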
# FastAPI dependency to get DB session
def get_db() -> Generator[Session, None, None]:
    """Dependency that provides a database session.

    Usage:
        @app.get("/items/")
        def read_items(db: Session = Depends(get_db)):
            return db.query(Item).all()
    """
    db = SessionLocal()
    try:
        yield db
    except DBAPIError as e:
        logger.error(f"Database error during request: {str(e)}")
        db.rollback()  # Roll back in case of error
        raise
    finally:
        db.close()

# Context manager for handling transactions
@contextmanager
def get_db_transaction() -> Generator[Session, None, None]:
    """Context manager for database transactions.

    Usage:
        with get_db_transaction() as db:
            db.add(obj)
            # Will automatically commit or rollback
    """
    db = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception as e:
        db.rollback()
        logger.error(f"Transaction error: {str(e)}")
        raise
    finally:
        db.close()

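A slightly fuller usage sketch of get_db_transaction, using the hypothetical Item model from the earlier example; commit and rollback are handled entirely by the context manager.

def rename_item(item_id: int, new_name: str) -> None:
    # Commits on success; any exception triggers a rollback and is re-raised.
    with get_db_transaction() as db:
        item = db.query(Item).filter(Item.id == item_id).one()
        item.name = new_name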
# Function to initialize database connections at startup
def init_db() -> None:
    """Initialize the database connection pool at application startup."""
    logger.info("Initializing database connection pool")
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        logger.info("Database connection successful")
    except SQLAlchemyError as e:
        logger.error(f"Database initialization failed: {str(e)}")
        raise

# Function to dispose of connections at shutdown
def close_db() -> None:
    """Close all database connections at application shutdown."""
    logger.info("Closing database connections")
    engine.dispose()
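Finally, a sketch of how init_db and close_db might be wired into the application lifecycle; the FastAPI app object, the on_event-style handlers, and the module path in the import are assumptions, not part of this commit.

from fastapi import FastAPI

from app.db.session import close_db, init_db  # module path assumed

app = FastAPI()


@app.on_event("startup")
def on_startup() -> None:
    init_db()


@app.on_event("shutdown")
def on_shutdown() -> None:
    close_db()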