Files
syndarix/backend/app/utils/security.py
Felipe Cardoso 4de440ed2d Improve error handling, logging, and security in authentication services and utilities
- Refactored `create_user` and `change_password` methods to add transaction rollback on failures and enhanced logging for error contexts.
- Updated security utilities to use constant-time comparison (`hmac.compare_digest`) to mitigate timing attacks.
- Adjusted API responses in registration and password reset flows for better security and user experience.
- Added session invalidation after password resets to enhance account security.
2025-11-01 01:13:02 +01:00

299 lines
8.5 KiB
Python

"""
Security utilities for token-based operations.
This module provides utilities for creating and verifying signed tokens,
useful for operations like file uploads, password resets, or any other
time-limited, single-use operations.
"""
import base64
import hashlib
import hmac
import json
import secrets
import time
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from app.core.config import settings
def create_upload_token(file_path: str, content_type: str, expires_in: int = 300) -> str:
"""
Create a signed token for secure file uploads.
This generates a time-limited, single-use token that can be verified
to ensure the upload is authorized.
Args:
file_path: The destination path for the file
content_type: The expected content type (e.g., "image/jpeg")
expires_in: Expiration time in seconds (default: 300 = 5 minutes)
Returns:
A base64 encoded token string
Example:
>>> token = create_upload_token("/uploads/avatar.jpg", "image/jpeg")
>>> # Send token to client, client includes it in upload request
"""
# Create the payload
payload = {
"path": file_path,
"content_type": content_type,
"exp": int(time.time()) + expires_in,
"nonce": secrets.token_hex(8) # Add randomness to prevent token reuse
}
# Convert to JSON and encode
payload_bytes = json.dumps(payload).encode('utf-8')
# Create a signature using the secret key
signature = hashlib.sha256(
payload_bytes + settings.SECRET_KEY.encode('utf-8')
).hexdigest()
# Combine payload and signature
token_data = {
"payload": payload,
"signature": signature
}
# Encode the final token
token_json = json.dumps(token_data)
token = base64.urlsafe_b64encode(token_json.encode('utf-8')).decode('utf-8')
return token
def verify_upload_token(token: str) -> Optional[Dict[str, Any]]:
"""
Verify an upload token and return the payload if valid.
Args:
token: The token string to verify
Returns:
The payload dictionary if valid, None if invalid or expired
Example:
>>> payload = verify_upload_token(token_from_client)
>>> if payload:
... file_path = payload["path"]
... content_type = payload["content_type"]
... # Proceed with upload
... else:
... # Token invalid or expired
"""
try:
# Decode the token
token_json = base64.urlsafe_b64decode(token.encode('utf-8')).decode('utf-8')
token_data = json.loads(token_json)
# Extract payload and signature
payload = token_data["payload"]
signature = token_data["signature"]
# Verify signature using constant-time comparison to prevent timing attacks
payload_bytes = json.dumps(payload).encode('utf-8')
expected_signature = hashlib.sha256(
payload_bytes + settings.SECRET_KEY.encode('utf-8')
).hexdigest()
if not hmac.compare_digest(signature, expected_signature):
return None
# Check expiration
if payload["exp"] < int(time.time()):
return None
return payload
except (ValueError, KeyError, json.JSONDecodeError):
return None
def create_password_reset_token(email: str, expires_in: int = 3600) -> str:
"""
Create a signed token for password reset.
Args:
email: User's email address
expires_in: Expiration time in seconds (default: 3600 = 1 hour)
Returns:
A base64 encoded token string
Example:
>>> token = create_password_reset_token("user@example.com")
>>> # Send token to user via email
"""
# Create the payload
payload = {
"email": email,
"exp": int(time.time()) + expires_in,
"nonce": secrets.token_hex(16), # Extra randomness
"purpose": "password_reset"
}
# Convert to JSON and encode
payload_bytes = json.dumps(payload).encode('utf-8')
# Create a signature using the secret key
signature = hashlib.sha256(
payload_bytes + settings.SECRET_KEY.encode('utf-8')
).hexdigest()
# Combine payload and signature
token_data = {
"payload": payload,
"signature": signature
}
# Encode the final token
token_json = json.dumps(token_data)
token = base64.urlsafe_b64encode(token_json.encode('utf-8')).decode('utf-8')
return token
def verify_password_reset_token(token: str) -> Optional[str]:
"""
Verify a password reset token and return the email if valid.
Args:
token: The token string to verify
Returns:
The email address if valid, None if invalid or expired
Example:
>>> email = verify_password_reset_token(token_from_user)
>>> if email:
... # Proceed with password reset
... else:
... # Token invalid or expired
"""
try:
# Decode the token
token_json = base64.urlsafe_b64decode(token.encode('utf-8')).decode('utf-8')
token_data = json.loads(token_json)
# Extract payload and signature
payload = token_data["payload"]
signature = token_data["signature"]
# Verify it's a password reset token
if payload.get("purpose") != "password_reset":
return None
# Verify signature using constant-time comparison to prevent timing attacks
payload_bytes = json.dumps(payload).encode('utf-8')
expected_signature = hashlib.sha256(
payload_bytes + settings.SECRET_KEY.encode('utf-8')
).hexdigest()
if not hmac.compare_digest(signature, expected_signature):
return None
# Check expiration
if payload["exp"] < int(time.time()):
return None
return payload["email"]
except (ValueError, KeyError, json.JSONDecodeError):
return None
def create_email_verification_token(email: str, expires_in: int = 86400) -> str:
"""
Create a signed token for email verification.
Args:
email: User's email address
expires_in: Expiration time in seconds (default: 86400 = 24 hours)
Returns:
A base64 encoded token string
Example:
>>> token = create_email_verification_token("user@example.com")
>>> # Send token to user via email
"""
# Create the payload
payload = {
"email": email,
"exp": int(time.time()) + expires_in,
"nonce": secrets.token_hex(16),
"purpose": "email_verification"
}
# Convert to JSON and encode
payload_bytes = json.dumps(payload).encode('utf-8')
# Create a signature using the secret key
signature = hashlib.sha256(
payload_bytes + settings.SECRET_KEY.encode('utf-8')
).hexdigest()
# Combine payload and signature
token_data = {
"payload": payload,
"signature": signature
}
# Encode the final token
token_json = json.dumps(token_data)
token = base64.urlsafe_b64encode(token_json.encode('utf-8')).decode('utf-8')
return token
def verify_email_verification_token(token: str) -> Optional[str]:
"""
Verify an email verification token and return the email if valid.
Args:
token: The token string to verify
Returns:
The email address if valid, None if invalid or expired
Example:
>>> email = verify_email_verification_token(token_from_user)
>>> if email:
... # Mark email as verified
... else:
... # Token invalid or expired
"""
try:
# Decode the token
token_json = base64.urlsafe_b64decode(token.encode('utf-8')).decode('utf-8')
token_data = json.loads(token_json)
# Extract payload and signature
payload = token_data["payload"]
signature = token_data["signature"]
# Verify it's an email verification token
if payload.get("purpose") != "email_verification":
return None
# Verify signature using constant-time comparison to prevent timing attacks
payload_bytes = json.dumps(payload).encode('utf-8')
expected_signature = hashlib.sha256(
payload_bytes + settings.SECRET_KEY.encode('utf-8')
).hexdigest()
if not hmac.compare_digest(signature, expected_signature):
return None
# Check expiration
if payload["exp"] < int(time.time()):
return None
return payload["email"]
except (ValueError, KeyError, json.JSONDecodeError):
return None