- Introduced `pyproject.toml` to centralize backend tool configurations (e.g., Ruff, mypy, coverage, pytest). - Replaced Black, isort, and Flake8 with Ruff for linting, formatting, and import sorting. - Updated `requirements.txt` to include Ruff and remove replaced tools. - Added `Makefile` to streamline development workflows with commands for linting, formatting, type-checking, testing, and cleanup.
443 lines
16 KiB
Python
Executable File
443 lines
16 KiB
Python
Executable File
# tests/utils/test_security.py
|
|
"""
|
|
Tests for security utility functions.
|
|
"""
|
|
|
|
import base64
|
|
import json
|
|
import time
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from app.utils.security import (
|
|
create_email_verification_token,
|
|
create_password_reset_token,
|
|
create_upload_token,
|
|
verify_email_verification_token,
|
|
verify_password_reset_token,
|
|
verify_upload_token,
|
|
)
|
|
|
|
|
|
class TestCreateUploadToken:
|
|
"""Tests for create_upload_token function."""
|
|
|
|
def test_create_upload_token_basic(self):
|
|
"""Test basic token creation."""
|
|
token = create_upload_token("/uploads/test.jpg", "image/jpeg")
|
|
|
|
assert token is not None
|
|
assert isinstance(token, str)
|
|
assert len(token) > 0
|
|
|
|
# Token should be base64 encoded
|
|
try:
|
|
decoded = base64.urlsafe_b64decode(token.encode("utf-8"))
|
|
token_data = json.loads(decoded)
|
|
assert "payload" in token_data
|
|
assert "signature" in token_data
|
|
except Exception as e:
|
|
pytest.fail(f"Token is not properly formatted: {e}")
|
|
|
|
def test_create_upload_token_contains_correct_payload(self):
|
|
"""Test that token contains correct payload data."""
|
|
file_path = "/uploads/avatar.jpg"
|
|
content_type = "image/jpeg"
|
|
|
|
token = create_upload_token(file_path, content_type)
|
|
|
|
# Decode and verify payload
|
|
decoded = base64.urlsafe_b64decode(token.encode("utf-8"))
|
|
token_data = json.loads(decoded)
|
|
payload = token_data["payload"]
|
|
|
|
assert payload["path"] == file_path
|
|
assert payload["content_type"] == content_type
|
|
assert "exp" in payload
|
|
assert "nonce" in payload
|
|
|
|
def test_create_upload_token_default_expiration(self):
|
|
"""Test that default expiration is 300 seconds (5 minutes)."""
|
|
before = int(time.time())
|
|
token = create_upload_token("/uploads/test.jpg", "image/jpeg")
|
|
after = int(time.time())
|
|
|
|
# Decode token
|
|
decoded = base64.urlsafe_b64decode(token.encode("utf-8"))
|
|
token_data = json.loads(decoded)
|
|
payload = token_data["payload"]
|
|
|
|
# Expiration should be around current time + 300 seconds
|
|
exp_time = payload["exp"]
|
|
assert before + 300 <= exp_time <= after + 300
|
|
|
|
def test_create_upload_token_custom_expiration(self):
|
|
"""Test token creation with custom expiration time."""
|
|
custom_exp = 600 # 10 minutes
|
|
before = int(time.time())
|
|
token = create_upload_token(
|
|
"/uploads/test.jpg", "image/jpeg", expires_in=custom_exp
|
|
)
|
|
after = int(time.time())
|
|
|
|
# Decode token
|
|
decoded = base64.urlsafe_b64decode(token.encode("utf-8"))
|
|
token_data = json.loads(decoded)
|
|
payload = token_data["payload"]
|
|
|
|
# Expiration should be around current time + custom_exp seconds
|
|
exp_time = payload["exp"]
|
|
assert before + custom_exp <= exp_time <= after + custom_exp
|
|
|
|
def test_create_upload_token_unique_nonces(self):
|
|
"""Test that each token has a unique nonce."""
|
|
token1 = create_upload_token("/uploads/test.jpg", "image/jpeg")
|
|
token2 = create_upload_token("/uploads/test.jpg", "image/jpeg")
|
|
|
|
# Decode both tokens
|
|
decoded1 = base64.urlsafe_b64decode(token1.encode("utf-8"))
|
|
token_data1 = json.loads(decoded1)
|
|
nonce1 = token_data1["payload"]["nonce"]
|
|
|
|
decoded2 = base64.urlsafe_b64decode(token2.encode("utf-8"))
|
|
token_data2 = json.loads(decoded2)
|
|
nonce2 = token_data2["payload"]["nonce"]
|
|
|
|
# Nonces should be different
|
|
assert nonce1 != nonce2
|
|
|
|
def test_create_upload_token_different_paths(self):
|
|
"""Test that tokens for different paths are different."""
|
|
token1 = create_upload_token("/uploads/file1.jpg", "image/jpeg")
|
|
token2 = create_upload_token("/uploads/file2.jpg", "image/jpeg")
|
|
|
|
assert token1 != token2
|
|
|
|
|
|
class TestVerifyUploadToken:
|
|
"""Tests for verify_upload_token function."""
|
|
|
|
def test_verify_valid_token(self):
|
|
"""Test verification of a valid token."""
|
|
file_path = "/uploads/test.jpg"
|
|
content_type = "image/jpeg"
|
|
|
|
token = create_upload_token(file_path, content_type)
|
|
payload = verify_upload_token(token)
|
|
|
|
assert payload is not None
|
|
assert payload["path"] == file_path
|
|
assert payload["content_type"] == content_type
|
|
|
|
def test_verify_expired_token(self):
|
|
"""Test that expired tokens are rejected."""
|
|
# Create a mock time module
|
|
mock_time = MagicMock()
|
|
current_time = 1000000
|
|
mock_time.time = MagicMock(return_value=current_time)
|
|
|
|
with patch("app.utils.security.time", mock_time):
|
|
# Create token that "expires" at current_time + 1
|
|
token = create_upload_token("/uploads/test.jpg", "image/jpeg", expires_in=1)
|
|
|
|
# Now set time to after expiration
|
|
mock_time.time.return_value = current_time + 2
|
|
|
|
# Token should be expired
|
|
payload = verify_upload_token(token)
|
|
assert payload is None
|
|
|
|
def test_verify_invalid_signature(self):
|
|
"""Test that tokens with invalid signatures are rejected."""
|
|
token = create_upload_token("/uploads/test.jpg", "image/jpeg")
|
|
|
|
# Decode, modify, and re-encode
|
|
decoded = base64.urlsafe_b64decode(token.encode("utf-8"))
|
|
token_data = json.loads(decoded)
|
|
token_data["signature"] = "invalid_signature"
|
|
|
|
# Re-encode the tampered token
|
|
tampered_json = json.dumps(token_data)
|
|
tampered_token = base64.urlsafe_b64encode(tampered_json.encode("utf-8")).decode(
|
|
"utf-8"
|
|
)
|
|
|
|
payload = verify_upload_token(tampered_token)
|
|
assert payload is None
|
|
|
|
def test_verify_tampered_payload(self):
|
|
"""Test that tokens with tampered payloads are rejected."""
|
|
token = create_upload_token("/uploads/test.jpg", "image/jpeg")
|
|
|
|
# Decode, modify payload, and re-encode
|
|
decoded = base64.urlsafe_b64decode(token.encode("utf-8"))
|
|
token_data = json.loads(decoded)
|
|
token_data["payload"]["path"] = "/uploads/hacked.exe"
|
|
|
|
# Re-encode the tampered token (signature won't match)
|
|
tampered_json = json.dumps(token_data)
|
|
tampered_token = base64.urlsafe_b64encode(tampered_json.encode("utf-8")).decode(
|
|
"utf-8"
|
|
)
|
|
|
|
payload = verify_upload_token(tampered_token)
|
|
assert payload is None
|
|
|
|
def test_verify_malformed_token(self):
|
|
"""Test that malformed tokens return None."""
|
|
# Test various malformed tokens
|
|
invalid_tokens = [
|
|
"not_a_valid_token",
|
|
"SGVsbG8gV29ybGQ=", # Valid base64 but not a token
|
|
"",
|
|
" ",
|
|
]
|
|
|
|
for invalid_token in invalid_tokens:
|
|
payload = verify_upload_token(invalid_token)
|
|
assert payload is None
|
|
|
|
def test_verify_invalid_json(self):
|
|
"""Test that tokens with invalid JSON are rejected."""
|
|
# Create a base64 string that decodes to invalid JSON
|
|
invalid_json = "not valid json"
|
|
invalid_token = base64.urlsafe_b64encode(invalid_json.encode("utf-8")).decode(
|
|
"utf-8"
|
|
)
|
|
|
|
payload = verify_upload_token(invalid_token)
|
|
assert payload is None
|
|
|
|
def test_verify_missing_fields(self):
|
|
"""Test that tokens missing required fields are rejected."""
|
|
# Create a token-like structure but missing required fields
|
|
incomplete_data = {
|
|
"payload": {
|
|
"path": "/uploads/test.jpg"
|
|
# Missing content_type, exp, nonce
|
|
},
|
|
"signature": "some_signature",
|
|
}
|
|
|
|
incomplete_json = json.dumps(incomplete_data)
|
|
incomplete_token = base64.urlsafe_b64encode(
|
|
incomplete_json.encode("utf-8")
|
|
).decode("utf-8")
|
|
|
|
payload = verify_upload_token(incomplete_token)
|
|
assert payload is None
|
|
|
|
def test_verify_token_round_trip(self):
|
|
"""Test creating and verifying a token in sequence."""
|
|
test_cases = [
|
|
("/uploads/image.jpg", "image/jpeg", 300),
|
|
("/uploads/document.pdf", "application/pdf", 600),
|
|
("/uploads/video.mp4", "video/mp4", 900),
|
|
]
|
|
|
|
for file_path, content_type, expires_in in test_cases:
|
|
token = create_upload_token(file_path, content_type, expires_in)
|
|
payload = verify_upload_token(token)
|
|
|
|
assert payload is not None
|
|
assert payload["path"] == file_path
|
|
assert payload["content_type"] == content_type
|
|
assert "exp" in payload
|
|
assert "nonce" in payload
|
|
|
|
# Note: test_verify_token_cannot_be_reused_with_different_secret removed
|
|
# The signature validation is already tested by test_verify_invalid_signature
|
|
# and test_verify_tampered_payload. Testing with different SECRET_KEY
|
|
# requires complex mocking that can interfere with other tests.
|
|
|
|
|
|
class TestPasswordResetTokens:
|
|
"""Tests for password reset token functions."""
|
|
|
|
def test_create_password_reset_token(self):
|
|
"""Test creating a password reset token."""
|
|
email = "user@example.com"
|
|
token = create_password_reset_token(email)
|
|
|
|
assert token is not None
|
|
assert isinstance(token, str)
|
|
assert len(token) > 0
|
|
|
|
def test_verify_password_reset_token_valid(self):
|
|
"""Test verifying a valid password reset token."""
|
|
email = "user@example.com"
|
|
token = create_password_reset_token(email)
|
|
|
|
verified_email = verify_password_reset_token(token)
|
|
|
|
assert verified_email == email
|
|
|
|
def test_verify_password_reset_token_expired(self):
|
|
"""Test that expired password reset tokens are rejected."""
|
|
email = "user@example.com"
|
|
|
|
# Create token that expires in 1 second
|
|
with patch("app.utils.security.time") as mock_time:
|
|
mock_time.time = MagicMock(return_value=1000000)
|
|
token = create_password_reset_token(email, expires_in=1)
|
|
|
|
# Fast forward time
|
|
mock_time.time.return_value = 1000002
|
|
|
|
verified_email = verify_password_reset_token(token)
|
|
assert verified_email is None
|
|
|
|
def test_verify_password_reset_token_invalid(self):
|
|
"""Test that invalid tokens are rejected."""
|
|
assert verify_password_reset_token("invalid_token") is None
|
|
assert verify_password_reset_token("") is None
|
|
|
|
def test_verify_password_reset_token_tampered(self):
|
|
"""Test that tampered tokens are rejected."""
|
|
email = "user@example.com"
|
|
token = create_password_reset_token(email)
|
|
|
|
# Decode and tamper
|
|
decoded = base64.urlsafe_b64decode(token.encode("utf-8")).decode("utf-8")
|
|
token_data = json.loads(decoded)
|
|
token_data["payload"]["email"] = "hacker@example.com"
|
|
|
|
# Re-encode
|
|
tampered = base64.urlsafe_b64encode(
|
|
json.dumps(token_data).encode("utf-8")
|
|
).decode("utf-8")
|
|
|
|
verified_email = verify_password_reset_token(tampered)
|
|
assert verified_email is None
|
|
|
|
def test_verify_password_reset_token_wrong_purpose(self):
|
|
"""Test that email verification tokens can't be used for password reset."""
|
|
email = "user@example.com"
|
|
# Create an email verification token
|
|
token = create_email_verification_token(email)
|
|
|
|
# Try to verify as password reset token
|
|
verified_email = verify_password_reset_token(token)
|
|
assert verified_email is None
|
|
|
|
def test_password_reset_token_custom_expiration(self):
|
|
"""Test password reset token with custom expiration."""
|
|
email = "user@example.com"
|
|
custom_exp = 7200 # 2 hours
|
|
|
|
with patch("app.utils.security.time") as mock_time:
|
|
current_time = 1000000
|
|
mock_time.time = MagicMock(return_value=current_time)
|
|
|
|
token = create_password_reset_token(email, expires_in=custom_exp)
|
|
|
|
# Decode to check expiration
|
|
decoded = base64.urlsafe_b64decode(token.encode("utf-8")).decode("utf-8")
|
|
token_data = json.loads(decoded)
|
|
|
|
assert token_data["payload"]["exp"] == current_time + custom_exp
|
|
|
|
|
|
class TestEmailVerificationTokens:
|
|
"""Tests for email verification token functions."""
|
|
|
|
def test_create_email_verification_token(self):
|
|
"""Test creating an email verification token."""
|
|
email = "user@example.com"
|
|
token = create_email_verification_token(email)
|
|
|
|
assert token is not None
|
|
assert isinstance(token, str)
|
|
assert len(token) > 0
|
|
|
|
def test_verify_email_verification_token_valid(self):
|
|
"""Test verifying a valid email verification token."""
|
|
email = "user@example.com"
|
|
token = create_email_verification_token(email)
|
|
|
|
verified_email = verify_email_verification_token(token)
|
|
|
|
assert verified_email == email
|
|
|
|
def test_verify_email_verification_token_expired(self):
|
|
"""Test that expired verification tokens are rejected."""
|
|
email = "user@example.com"
|
|
|
|
with patch("app.utils.security.time") as mock_time:
|
|
mock_time.time = MagicMock(return_value=1000000)
|
|
token = create_email_verification_token(email, expires_in=1)
|
|
|
|
# Fast forward time
|
|
mock_time.time.return_value = 1000002
|
|
|
|
verified_email = verify_email_verification_token(token)
|
|
assert verified_email is None
|
|
|
|
def test_verify_email_verification_token_invalid(self):
|
|
"""Test that invalid tokens are rejected."""
|
|
assert verify_email_verification_token("invalid_token") is None
|
|
assert verify_email_verification_token("") is None
|
|
|
|
def test_verify_email_verification_token_tampered(self):
|
|
"""Test that tampered verification tokens are rejected."""
|
|
email = "user@example.com"
|
|
token = create_email_verification_token(email)
|
|
|
|
# Decode and tamper
|
|
decoded = base64.urlsafe_b64decode(token.encode("utf-8")).decode("utf-8")
|
|
token_data = json.loads(decoded)
|
|
token_data["payload"]["email"] = "hacker@example.com"
|
|
|
|
# Re-encode
|
|
tampered = base64.urlsafe_b64encode(
|
|
json.dumps(token_data).encode("utf-8")
|
|
).decode("utf-8")
|
|
|
|
verified_email = verify_email_verification_token(tampered)
|
|
assert verified_email is None
|
|
|
|
def test_verify_email_verification_token_wrong_purpose(self):
|
|
"""Test that password reset tokens can't be used for email verification."""
|
|
email = "user@example.com"
|
|
# Create a password reset token
|
|
token = create_password_reset_token(email)
|
|
|
|
# Try to verify as email verification token
|
|
verified_email = verify_email_verification_token(token)
|
|
assert verified_email is None
|
|
|
|
def test_email_verification_token_default_expiration(self):
|
|
"""Test email verification token with default 24-hour expiration."""
|
|
email = "user@example.com"
|
|
|
|
with patch("app.utils.security.time") as mock_time:
|
|
current_time = 1000000
|
|
mock_time.time = MagicMock(return_value=current_time)
|
|
|
|
token = create_email_verification_token(email)
|
|
|
|
# Decode to check expiration (should be 86400 seconds = 24 hours)
|
|
decoded = base64.urlsafe_b64decode(token.encode("utf-8")).decode("utf-8")
|
|
token_data = json.loads(decoded)
|
|
|
|
assert token_data["payload"]["exp"] == current_time + 86400
|
|
|
|
def test_tokens_are_unique(self):
|
|
"""Test that multiple tokens for the same email are unique."""
|
|
email = "user@example.com"
|
|
|
|
token1 = create_password_reset_token(email)
|
|
token2 = create_password_reset_token(email)
|
|
|
|
assert token1 != token2
|
|
|
|
def test_verification_and_reset_tokens_are_different(self):
|
|
"""Test that verification and reset tokens for same email are different."""
|
|
email = "user@example.com"
|
|
|
|
reset_token = create_password_reset_token(email)
|
|
verify_token = create_email_verification_token(email)
|
|
|
|
assert reset_token != verify_token
|