Add utility functions for creating and verifying upload tokens

Introduced `create_upload_token` and `verify_upload_token` functions to handle secure file uploads. These utilities generate signed tokens with expiration and content validation, ensuring upload security. Verification includes signature integrity and expiration checks.
This commit is contained in:
2025-03-12 18:35:47 +01:00
parent e27e2b75ee
commit 07f8435fcd

View File

@@ -0,0 +1,82 @@
import base64
import hashlib
import json
import secrets
import time
from typing import Dict, Any, Optional
from app.core.config import settings
def create_upload_token(file_path: str, content_type: str, expires_in: int = 300) -> str:
"""
Create a signed token for secure file uploads.
Args:
file_path: The destination path for the file
content_type: The expected content type
expires_in: Expiration time in seconds
Returns:
A base64 encoded token string
"""
# Create the payload
payload = {
"path": file_path,
"content_type": content_type,
"exp": int(time.time()) + expires_in,
"nonce": secrets.token_hex(8) # Add randomness to prevent token reuse
}
# Convert to JSON and encode
payload_bytes = json.dumps(payload).encode('utf-8')
# Create a signature using a secret key
signature = hashlib.sha256(payload_bytes + settings.SECRET_KEY.encode('utf-8')).hexdigest()
# Combine payload and signature
token_data = {
"payload": payload,
"signature": signature
}
# Encode the final token
token_json = json.dumps(token_data)
token = base64.urlsafe_b64encode(token_json.encode('utf-8')).decode('utf-8')
return token
def verify_upload_token(token: str) -> Optional[Dict[str, Any]]:
"""
Verify an upload token and return the payload if valid.
Args:
token: The token string to verify
Returns:
The payload dictionary if valid, None if invalid
"""
try:
# Decode the token
token_json = base64.urlsafe_b64decode(token.encode('utf-8')).decode('utf-8')
token_data = json.loads(token_json)
# Extract payload and signature
payload = token_data["payload"]
signature = token_data["signature"]
# Verify signature
payload_bytes = json.dumps(payload).encode('utf-8')
expected_signature = hashlib.sha256(payload_bytes + settings.SECRET_KEY.encode('utf-8')).hexdigest()
if signature != expected_signature:
return None
# Check expiration
if payload["exp"] < int(time.time()):
return None
return payload
except (ValueError, KeyError, json.JSONDecodeError):
return None