import base64 import hashlib import json import secrets import time from typing import Dict, Any, Optional from app.core.config import settings def create_upload_token(file_path: str, content_type: str, expires_in: int = 300) -> str: """ Create a signed token for secure file uploads. Args: file_path: The destination path for the file content_type: The expected content type expires_in: Expiration time in seconds Returns: A base64 encoded token string """ # Create the payload payload = { "path": file_path, "content_type": content_type, "exp": int(time.time()) + expires_in, "nonce": secrets.token_hex(8) # Add randomness to prevent token reuse } # Convert to JSON and encode payload_bytes = json.dumps(payload).encode('utf-8') # Create a signature using a secret key signature = hashlib.sha256(payload_bytes + settings.SECRET_KEY.encode('utf-8')).hexdigest() # Combine payload and signature token_data = { "payload": payload, "signature": signature } # Encode the final token token_json = json.dumps(token_data) token = base64.urlsafe_b64encode(token_json.encode('utf-8')).decode('utf-8') return token def verify_upload_token(token: str) -> Optional[Dict[str, Any]]: """ Verify an upload token and return the payload if valid. Args: token: The token string to verify Returns: The payload dictionary if valid, None if invalid """ try: # Decode the token token_json = base64.urlsafe_b64decode(token.encode('utf-8')).decode('utf-8') token_data = json.loads(token_json) # Extract payload and signature payload = token_data["payload"] signature = token_data["signature"] # Verify signature payload_bytes = json.dumps(payload).encode('utf-8') expected_signature = hashlib.sha256(payload_bytes + settings.SECRET_KEY.encode('utf-8')).hexdigest() if signature != expected_signature: return None # Check expiration if payload["exp"] < int(time.time()): return None return payload except (ValueError, KeyError, json.JSONDecodeError): return None