Files
pragma-stack/backend/app/services/session_cleanup.py
Felipe Cardoso 0553a1fc53 refactor(logging): switch to parameterized logging for improved performance and clarity
- Replaced f-strings with parameterized logging calls across routes, services, and repositories to optimize log message evaluation.
- Improved exception handling by using `logger.exception` where appropriate for automatic traceback logging.
2026-03-01 13:38:15 +01:00

89 lines
2.5 KiB
Python
Executable File

"""
Background job for cleaning up expired sessions.
This service runs periodically to remove old session records from the database.
"""
import logging
from datetime import UTC, datetime
from app.core.database import SessionLocal
from app.repositories.session import session_repo as session_crud
logger = logging.getLogger(__name__)
async def cleanup_expired_sessions(keep_days: int = 30) -> int:
    """
    Run one pass of the session cleanup job.

    Opens a database session and delegates the deletion to
    ``session_crud.cleanup_expired``. The selection criteria are implemented
    in that repository method and are not visible from this module; it is
    expected to remove sessions that are inactive, expired, and older than
    ``keep_days`` (verify against the repository implementation).

    Any exception raised while cleaning up is logged with its traceback and
    swallowed, so a failing run does not propagate an error to the caller.

    Args:
        keep_days: Retention window in days, forwarded unchanged to the
            repository cleanup method.

    Returns:
        The value reported by the repository cleanup method (the number of
        deleted sessions), or 0 if the cleanup raised an exception.
    """
    logger.info("Starting session cleanup job...")

    async with SessionLocal() as db:
        try:
            deleted = await session_crud.cleanup_expired(db, keep_days=keep_days)
            logger.info("Session cleanup complete: %s sessions deleted", deleted)
        except Exception as exc:
            # Best-effort job: record the failure and report "nothing deleted".
            logger.exception("Error during session cleanup: %s", exc)
            deleted = 0
        return deleted
async def get_session_statistics() -> dict:
    """
    Collect aggregate counts over the ``UserSession`` table.

    Three separate COUNT queries are issued, in this order: all sessions,
    sessions whose ``is_active`` flag is truthy, and sessions whose
    ``expires_at`` is earlier than the current UTC time. The expired count
    does not take ``is_active`` into account.

    Returns:
        A dictionary with the keys ``"total"``, ``"active"``, ``"inactive"``
        (computed as total minus active) and ``"expired"``. If anything
        fails, including the local imports, the error is logged with its
        traceback and an empty dictionary is returned instead.
    """
    async with SessionLocal() as db:
        try:
            # Imported lazily and inside the try block on purpose: an import
            # failure is handled like any other error below.
            from sqlalchemy import func, select

            from app.models.user_session import UserSession

            async def _count_sessions(*criteria):
                # COUNT(UserSession.id), optionally narrowed by WHERE criteria.
                query = select(func.count(UserSession.id))
                if criteria:
                    query = query.where(*criteria)
                outcome = await db.execute(query)
                return outcome.scalar_one()

            total = await _count_sessions()
            active = await _count_sessions(UserSession.is_active)
            expired = await _count_sessions(
                UserSession.expires_at < datetime.now(UTC)
            )

            stats = {
                "total": total,
                "active": active,
                "inactive": total - active,
                "expired": expired,
            }
            logger.info("Session statistics: %s", stats)
            return stats
        except Exception as exc:
            logger.exception("Error getting session statistics: %s", exc)
            return {}