Refactor authentication and session management for optimized performance, enhanced security, and improved error handling
- Replaced N+1 deletion pattern with a bulk `DELETE` in session cleanup for better efficiency in `session_async`. - Updated security utilities to use HMAC-SHA256 signatures to mitigate length extension attacks and added constant-time comparisons to prevent timing attacks. - Improved exception hierarchy with custom error types `AuthError` and `DatabaseError` for better granularity in error handling. - Enhanced logging with `exc_info=True` for detailed error contexts across authentication services. - Removed unused imports and reordered imports for cleaner code structure.
This commit is contained in:
@@ -12,7 +12,6 @@ import json
|
||||
import secrets
|
||||
import time
|
||||
from typing import Dict, Any, Optional
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from app.core.config import settings
|
||||
|
||||
@@ -47,9 +46,12 @@ def create_upload_token(file_path: str, content_type: str, expires_in: int = 300
|
||||
# Convert to JSON and encode
|
||||
payload_bytes = json.dumps(payload).encode('utf-8')
|
||||
|
||||
# Create a signature using the secret key
|
||||
signature = hashlib.sha256(
|
||||
payload_bytes + settings.SECRET_KEY.encode('utf-8')
|
||||
# Create a signature using HMAC-SHA256 for security
|
||||
# This prevents length extension attacks that plain SHA-256 is vulnerable to
|
||||
signature = hmac.new(
|
||||
settings.SECRET_KEY.encode('utf-8'),
|
||||
payload_bytes,
|
||||
hashlib.sha256
|
||||
).hexdigest()
|
||||
|
||||
# Combine payload and signature
|
||||
@@ -93,10 +95,12 @@ def verify_upload_token(token: str) -> Optional[Dict[str, Any]]:
|
||||
payload = token_data["payload"]
|
||||
signature = token_data["signature"]
|
||||
|
||||
# Verify signature using constant-time comparison to prevent timing attacks
|
||||
# Verify signature using HMAC and constant-time comparison
|
||||
payload_bytes = json.dumps(payload).encode('utf-8')
|
||||
expected_signature = hashlib.sha256(
|
||||
payload_bytes + settings.SECRET_KEY.encode('utf-8')
|
||||
expected_signature = hmac.new(
|
||||
settings.SECRET_KEY.encode('utf-8'),
|
||||
payload_bytes,
|
||||
hashlib.sha256
|
||||
).hexdigest()
|
||||
|
||||
if not hmac.compare_digest(signature, expected_signature):
|
||||
@@ -138,9 +142,12 @@ def create_password_reset_token(email: str, expires_in: int = 3600) -> str:
|
||||
# Convert to JSON and encode
|
||||
payload_bytes = json.dumps(payload).encode('utf-8')
|
||||
|
||||
# Create a signature using the secret key
|
||||
signature = hashlib.sha256(
|
||||
payload_bytes + settings.SECRET_KEY.encode('utf-8')
|
||||
# Create a signature using HMAC-SHA256 for security
|
||||
# This prevents length extension attacks that plain SHA-256 is vulnerable to
|
||||
signature = hmac.new(
|
||||
settings.SECRET_KEY.encode('utf-8'),
|
||||
payload_bytes,
|
||||
hashlib.sha256
|
||||
).hexdigest()
|
||||
|
||||
# Combine payload and signature
|
||||
@@ -186,10 +193,12 @@ def verify_password_reset_token(token: str) -> Optional[str]:
|
||||
if payload.get("purpose") != "password_reset":
|
||||
return None
|
||||
|
||||
# Verify signature using constant-time comparison to prevent timing attacks
|
||||
# Verify signature using HMAC and constant-time comparison
|
||||
payload_bytes = json.dumps(payload).encode('utf-8')
|
||||
expected_signature = hashlib.sha256(
|
||||
payload_bytes + settings.SECRET_KEY.encode('utf-8')
|
||||
expected_signature = hmac.new(
|
||||
settings.SECRET_KEY.encode('utf-8'),
|
||||
payload_bytes,
|
||||
hashlib.sha256
|
||||
).hexdigest()
|
||||
|
||||
if not hmac.compare_digest(signature, expected_signature):
|
||||
@@ -231,9 +240,12 @@ def create_email_verification_token(email: str, expires_in: int = 86400) -> str:
|
||||
# Convert to JSON and encode
|
||||
payload_bytes = json.dumps(payload).encode('utf-8')
|
||||
|
||||
# Create a signature using the secret key
|
||||
signature = hashlib.sha256(
|
||||
payload_bytes + settings.SECRET_KEY.encode('utf-8')
|
||||
# Create a signature using HMAC-SHA256 for security
|
||||
# This prevents length extension attacks that plain SHA-256 is vulnerable to
|
||||
signature = hmac.new(
|
||||
settings.SECRET_KEY.encode('utf-8'),
|
||||
payload_bytes,
|
||||
hashlib.sha256
|
||||
).hexdigest()
|
||||
|
||||
# Combine payload and signature
|
||||
@@ -279,10 +291,12 @@ def verify_email_verification_token(token: str) -> Optional[str]:
|
||||
if payload.get("purpose") != "email_verification":
|
||||
return None
|
||||
|
||||
# Verify signature using constant-time comparison to prevent timing attacks
|
||||
# Verify signature using HMAC and constant-time comparison
|
||||
payload_bytes = json.dumps(payload).encode('utf-8')
|
||||
expected_signature = hashlib.sha256(
|
||||
payload_bytes + settings.SECRET_KEY.encode('utf-8')
|
||||
expected_signature = hmac.new(
|
||||
settings.SECRET_KEY.encode('utf-8'),
|
||||
payload_bytes,
|
||||
hashlib.sha256
|
||||
).hexdigest()
|
||||
|
||||
if not hmac.compare_digest(signature, expected_signature):
|
||||
|
||||
Reference in New Issue
Block a user