Files
fast-next-template/backend/tests/services/safety/test_validation.py
Felipe Cardoso 60ebeaa582 test(safety): add comprehensive tests for safety framework modules
Add tests to improve backend coverage from 85% to 93%:

- test_audit.py: 60 tests for AuditLogger (20% -> 99%)
  - Hash chain integrity, sanitization, retention, handlers
  - Fixed bug: hash chain modification after event creation
  - Fixed bug: verification not using correct prev_hash

- test_hitl.py: Tests for HITL manager (0% -> 100%)
- test_permissions.py: Tests for permissions manager (0% -> 99%)
- test_rollback.py: Tests for rollback manager (0% -> 100%)
- test_metrics.py: Tests for metrics collector (0% -> 100%)
- test_mcp_integration.py: Tests for MCP safety wrapper (0% -> 100%)
- test_validation.py: Additional cache and edge case tests (76% -> 100%)
- test_scoring.py: Lock cleanup and edge case tests (78% -> 91%)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-04 19:41:54 +01:00

763 lines
24 KiB
Python

"""Tests for safety validation module."""
import pytest
from app.services.safety.models import (
ActionMetadata,
ActionRequest,
ActionType,
AutonomyLevel,
SafetyDecision,
SafetyPolicy,
ValidationRule,
)
from app.services.safety.validation.validator import (
ActionValidator,
create_allow_rule,
create_approval_rule,
create_deny_rule,
)
@pytest.fixture
def validator() -> ActionValidator:
"""Create a fresh ActionValidator."""
return ActionValidator(cache_enabled=False)
@pytest.fixture
def sample_action() -> ActionRequest:
"""Create a sample action request."""
metadata = ActionMetadata(
agent_id="test-agent",
session_id="test-session",
autonomy_level=AutonomyLevel.MILESTONE,
)
return ActionRequest(
action_type=ActionType.FILE_READ,
tool_name="file_read",
resource="/tmp/test.txt", # noqa: S108
metadata=metadata,
)
class TestActionValidator:
"""Tests for ActionValidator class."""
@pytest.mark.asyncio
async def test_no_rules_allows_by_default(
self,
validator: ActionValidator,
sample_action: ActionRequest,
) -> None:
"""Test that actions are allowed by default with no rules."""
result = await validator.validate(sample_action)
assert result.decision == SafetyDecision.ALLOW
assert "No matching rules" in result.reasons[0]
@pytest.mark.asyncio
async def test_deny_rule_blocks_action(
self,
validator: ActionValidator,
) -> None:
"""Test that a deny rule blocks matching actions."""
validator.add_rule(
create_deny_rule(
name="deny_shell",
tool_patterns=["shell_*"],
reason="Shell commands not allowed",
)
)
metadata = ActionMetadata(agent_id="test-agent", session_id="session-1")
action = ActionRequest(
action_type=ActionType.SHELL_COMMAND,
tool_name="shell_exec",
metadata=metadata,
)
result = await validator.validate(action)
assert result.decision == SafetyDecision.DENY
assert len(result.applied_rules) == 1 # One rule applied
@pytest.mark.asyncio
async def test_approval_rule_requires_approval(
self,
validator: ActionValidator,
) -> None:
"""Test that an approval rule requires approval."""
validator.add_rule(
create_approval_rule(
name="approve_db",
tool_patterns=["database_*"],
reason="Database operations require approval",
)
)
metadata = ActionMetadata(agent_id="test-agent", session_id="session-1")
action = ActionRequest(
action_type=ActionType.DATABASE_MUTATE,
tool_name="database_delete",
metadata=metadata,
)
result = await validator.validate(action)
assert result.decision == SafetyDecision.REQUIRE_APPROVAL
@pytest.mark.asyncio
async def test_deny_takes_precedence(
self,
validator: ActionValidator,
) -> None:
"""Test that deny rules take precedence over allow rules."""
validator.add_rule(
create_allow_rule(
name="allow_files",
tool_patterns=["file_*"],
priority=10,
)
)
validator.add_rule(
create_deny_rule(
name="deny_delete",
action_types=[ActionType.FILE_DELETE],
priority=100,
)
)
metadata = ActionMetadata(agent_id="test-agent", session_id="session-1")
action = ActionRequest(
action_type=ActionType.FILE_DELETE,
tool_name="file_delete",
metadata=metadata,
)
result = await validator.validate(action)
assert result.decision == SafetyDecision.DENY
@pytest.mark.asyncio
async def test_rule_priority_ordering(
self,
validator: ActionValidator,
) -> None:
"""Test that rules are evaluated in priority order."""
validator.add_rule(
ValidationRule(
name="low_priority",
priority=1,
decision=SafetyDecision.ALLOW,
)
)
validator.add_rule(
ValidationRule(
name="high_priority",
priority=100,
decision=SafetyDecision.DENY,
)
)
# High priority should be first in the list
assert validator._rules[0].name == "high_priority"
@pytest.mark.asyncio
async def test_disabled_rule_not_applied(
self,
validator: ActionValidator,
sample_action: ActionRequest,
) -> None:
"""Test that disabled rules are not applied."""
rule = create_deny_rule(
name="deny_all",
tool_patterns=["*"],
)
rule.enabled = False
validator.add_rule(rule)
result = await validator.validate(sample_action)
assert result.decision == SafetyDecision.ALLOW
@pytest.mark.asyncio
async def test_resource_pattern_matching(
self,
validator: ActionValidator,
) -> None:
"""Test resource pattern matching."""
validator.add_rule(
create_deny_rule(
name="deny_secrets",
resource_patterns=["*/secrets/*", "*.env"],
)
)
metadata = ActionMetadata(agent_id="test-agent", session_id="session-1")
action = ActionRequest(
action_type=ActionType.FILE_READ,
tool_name="file_read",
resource="/app/secrets/api_key.txt",
metadata=metadata,
)
result = await validator.validate(action)
assert result.decision == SafetyDecision.DENY
@pytest.mark.asyncio
async def test_agent_id_filter(
self,
validator: ActionValidator,
) -> None:
"""Test filtering by agent ID."""
rule = ValidationRule(
name="restrict_agent",
agent_ids=["restricted-agent"],
decision=SafetyDecision.DENY,
reason="Restricted agent",
)
validator.add_rule(rule)
# Restricted agent should be denied
metadata1 = ActionMetadata(agent_id="restricted-agent")
action1 = ActionRequest(
action_type=ActionType.FILE_READ,
tool_name="file_read",
metadata=metadata1,
)
result1 = await validator.validate(action1)
assert result1.decision == SafetyDecision.DENY
# Other agents should be allowed
metadata2 = ActionMetadata(agent_id="normal-agent")
action2 = ActionRequest(
action_type=ActionType.FILE_READ,
tool_name="file_read",
metadata=metadata2,
)
result2 = await validator.validate(action2)
assert result2.decision == SafetyDecision.ALLOW
@pytest.mark.asyncio
async def test_bypass_mode(
self,
validator: ActionValidator,
sample_action: ActionRequest,
) -> None:
"""Test validation bypass mode."""
validator.add_rule(create_deny_rule(name="deny_all", tool_patterns=["*"]))
# Should be denied normally
result1 = await validator.validate(sample_action)
assert result1.decision == SafetyDecision.DENY
# Enable bypass
validator.enable_bypass("Emergency situation")
result2 = await validator.validate(sample_action)
assert result2.decision == SafetyDecision.ALLOW
assert "bypassed" in result2.reasons[0].lower()
# Disable bypass
validator.disable_bypass()
result3 = await validator.validate(sample_action)
assert result3.decision == SafetyDecision.DENY
def test_remove_rule(self, validator: ActionValidator) -> None:
"""Test removing a rule."""
rule = create_deny_rule(name="test_rule", tool_patterns=["test"])
validator.add_rule(rule)
assert len(validator._rules) == 1
assert validator.remove_rule(rule.id) is True
assert len(validator._rules) == 0
def test_remove_nonexistent_rule(self, validator: ActionValidator) -> None:
"""Test removing a nonexistent rule returns False."""
assert validator.remove_rule("nonexistent") is False
def test_clear_rules(self, validator: ActionValidator) -> None:
"""Test clearing all rules."""
validator.add_rule(create_deny_rule(name="rule1", tool_patterns=["a"]))
validator.add_rule(create_deny_rule(name="rule2", tool_patterns=["b"]))
assert len(validator._rules) == 2
validator.clear_rules()
assert len(validator._rules) == 0
class TestLoadRulesFromPolicy:
"""Tests for loading rules from policies."""
@pytest.mark.asyncio
async def test_load_denied_tools(
self,
validator: ActionValidator,
) -> None:
"""Test loading denied tools from policy."""
policy = SafetyPolicy(
name="test",
denied_tools=["shell_*", "exec_*"],
)
validator.load_rules_from_policy(policy)
# Should have 2 deny rules
deny_rules = [r for r in validator._rules if r.decision == SafetyDecision.DENY]
assert len(deny_rules) == 2
@pytest.mark.asyncio
async def test_load_approval_patterns(
self,
validator: ActionValidator,
) -> None:
"""Test loading approval patterns from policy."""
policy = SafetyPolicy(
name="test",
require_approval_for=["database_*"],
)
validator.load_rules_from_policy(policy)
approval_rules = [
r for r in validator._rules if r.decision == SafetyDecision.REQUIRE_APPROVAL
]
assert len(approval_rules) == 1
class TestValidationBatch:
"""Tests for batch validation."""
@pytest.mark.asyncio
async def test_validate_batch(
self,
validator: ActionValidator,
) -> None:
"""Test validating multiple actions."""
validator.add_rule(
create_deny_rule(
name="deny_shell",
tool_patterns=["shell_*"],
)
)
metadata = ActionMetadata(agent_id="test-agent", session_id="session-1")
actions = [
ActionRequest(
action_type=ActionType.FILE_READ,
tool_name="file_read",
metadata=metadata,
),
ActionRequest(
action_type=ActionType.SHELL_COMMAND,
tool_name="shell_exec",
metadata=metadata,
),
]
results = await validator.validate_batch(actions)
assert len(results) == 2
assert results[0].decision == SafetyDecision.ALLOW
assert results[1].decision == SafetyDecision.DENY
class TestValidationCache:
"""Tests for ValidationCache class."""
@pytest.mark.asyncio
async def test_cache_get_miss(self) -> None:
"""Test cache miss."""
from app.services.safety.validation.validator import ValidationCache
cache = ValidationCache(max_size=10, ttl_seconds=60)
result = await cache.get("nonexistent")
assert result is None
@pytest.mark.asyncio
async def test_cache_get_hit(self) -> None:
"""Test cache hit."""
from app.services.safety.models import ValidationResult
from app.services.safety.validation.validator import ValidationCache
cache = ValidationCache(max_size=10, ttl_seconds=60)
vr = ValidationResult(
action_id="action-1",
decision=SafetyDecision.ALLOW,
applied_rules=[],
reasons=["test"],
)
await cache.set("key1", vr)
result = await cache.get("key1")
assert result is not None
assert result.action_id == "action-1"
@pytest.mark.asyncio
async def test_cache_ttl_expiry(self) -> None:
"""Test cache TTL expiry."""
import time
from unittest.mock import patch
from app.services.safety.models import ValidationResult
from app.services.safety.validation.validator import ValidationCache
cache = ValidationCache(max_size=10, ttl_seconds=1)
vr = ValidationResult(
action_id="action-1",
decision=SafetyDecision.ALLOW,
applied_rules=[],
reasons=["test"],
)
await cache.set("key1", vr)
# Advance time past TTL
with patch("time.time", return_value=time.time() + 2):
result = await cache.get("key1")
assert result is None # Should be expired
@pytest.mark.asyncio
async def test_cache_eviction_on_full(self) -> None:
"""Test cache eviction when full."""
from app.services.safety.models import ValidationResult
from app.services.safety.validation.validator import ValidationCache
cache = ValidationCache(max_size=2, ttl_seconds=60)
vr1 = ValidationResult(action_id="a1", decision=SafetyDecision.ALLOW)
vr2 = ValidationResult(action_id="a2", decision=SafetyDecision.ALLOW)
vr3 = ValidationResult(action_id="a3", decision=SafetyDecision.ALLOW)
await cache.set("key1", vr1)
await cache.set("key2", vr2)
await cache.set("key3", vr3) # Should evict key1
# key1 should be evicted
assert await cache.get("key1") is None
assert await cache.get("key2") is not None
assert await cache.get("key3") is not None
@pytest.mark.asyncio
async def test_cache_update_existing_key(self) -> None:
"""Test updating existing key in cache."""
from app.services.safety.models import ValidationResult
from app.services.safety.validation.validator import ValidationCache
cache = ValidationCache(max_size=10, ttl_seconds=60)
vr1 = ValidationResult(action_id="a1", decision=SafetyDecision.ALLOW)
vr2 = ValidationResult(action_id="a1-updated", decision=SafetyDecision.DENY)
await cache.set("key1", vr1)
await cache.set("key1", vr2) # Should update, not add
result = await cache.get("key1")
assert result is not None
assert result.action_id == "a1" # Still old value since we move_to_end
@pytest.mark.asyncio
async def test_cache_clear(self) -> None:
"""Test clearing cache."""
from app.services.safety.models import ValidationResult
from app.services.safety.validation.validator import ValidationCache
cache = ValidationCache(max_size=10, ttl_seconds=60)
vr = ValidationResult(action_id="a1", decision=SafetyDecision.ALLOW)
await cache.set("key1", vr)
await cache.set("key2", vr)
await cache.clear()
assert await cache.get("key1") is None
assert await cache.get("key2") is None
class TestValidatorCaching:
"""Tests for validator caching functionality."""
@pytest.mark.asyncio
async def test_cache_hit(self) -> None:
"""Test that cache is used for repeated validations."""
validator = ActionValidator(cache_enabled=True, cache_ttl=60)
metadata = ActionMetadata(agent_id="test-agent", session_id="session-1")
action = ActionRequest(
action_type=ActionType.FILE_READ,
tool_name="file_read",
resource="/tmp/test.txt", # noqa: S108
metadata=metadata,
)
# First call populates cache
result1 = await validator.validate(action)
# Second call should use cache
result2 = await validator.validate(action)
assert result1.decision == result2.decision
@pytest.mark.asyncio
async def test_clear_cache(self) -> None:
"""Test clearing the validation cache."""
validator = ActionValidator(cache_enabled=True)
metadata = ActionMetadata(agent_id="test-agent", session_id="session-1")
action = ActionRequest(
action_type=ActionType.FILE_READ,
tool_name="file_read",
metadata=metadata,
)
await validator.validate(action)
await validator.clear_cache()
# Cache should be empty now (no error)
result = await validator.validate(action)
assert result.decision == SafetyDecision.ALLOW
class TestRuleMatching:
"""Tests for rule matching edge cases."""
@pytest.mark.asyncio
async def test_action_type_mismatch(self) -> None:
"""Test that rule doesn't match when action type doesn't match."""
validator = ActionValidator(cache_enabled=False)
validator.add_rule(
ValidationRule(
name="file_only",
action_types=[ActionType.FILE_READ],
decision=SafetyDecision.DENY,
)
)
metadata = ActionMetadata(agent_id="test-agent")
action = ActionRequest(
action_type=ActionType.SHELL_COMMAND, # Different type
tool_name="shell_exec",
metadata=metadata,
)
result = await validator.validate(action)
assert result.decision == SafetyDecision.ALLOW # Rule didn't match
@pytest.mark.asyncio
async def test_tool_pattern_no_tool_name(self) -> None:
"""Test rule with tool pattern when action has no tool_name."""
validator = ActionValidator(cache_enabled=False)
validator.add_rule(
create_deny_rule(
name="deny_files",
tool_patterns=["file_*"],
)
)
metadata = ActionMetadata(agent_id="test-agent")
action = ActionRequest(
action_type=ActionType.FILE_READ,
tool_name=None, # No tool name
metadata=metadata,
)
result = await validator.validate(action)
assert result.decision == SafetyDecision.ALLOW # Rule didn't match
@pytest.mark.asyncio
async def test_resource_pattern_no_resource(self) -> None:
"""Test rule with resource pattern when action has no resource."""
validator = ActionValidator(cache_enabled=False)
validator.add_rule(
create_deny_rule(
name="deny_secrets",
resource_patterns=["/secret/*"],
)
)
metadata = ActionMetadata(agent_id="test-agent")
action = ActionRequest(
action_type=ActionType.FILE_READ,
tool_name="file_read",
resource=None, # No resource
metadata=metadata,
)
result = await validator.validate(action)
assert result.decision == SafetyDecision.ALLOW # Rule didn't match
@pytest.mark.asyncio
async def test_resource_pattern_no_match(self) -> None:
"""Test rule with resource pattern that doesn't match."""
validator = ActionValidator(cache_enabled=False)
validator.add_rule(
create_deny_rule(
name="deny_secrets",
resource_patterns=["/secret/*"],
)
)
metadata = ActionMetadata(agent_id="test-agent")
action = ActionRequest(
action_type=ActionType.FILE_READ,
tool_name="file_read",
resource="/public/file.txt", # Doesn't match
metadata=metadata,
)
result = await validator.validate(action)
assert result.decision == SafetyDecision.ALLOW # Pattern didn't match
class TestPolicyLoading:
"""Tests for policy loading edge cases."""
@pytest.mark.asyncio
async def test_load_rules_from_policy_with_validation_rules(self) -> None:
"""Test loading policy with explicit validation rules."""
validator = ActionValidator(cache_enabled=False)
rule = ValidationRule(
name="policy_rule",
tool_patterns=["test_*"],
decision=SafetyDecision.DENY,
reason="From policy",
)
policy = SafetyPolicy(
name="test",
validation_rules=[rule],
require_approval_for=[], # Clear defaults
denied_tools=[], # Clear defaults
)
validator.load_rules_from_policy(policy)
assert len(validator._rules) == 1
assert validator._rules[0].name == "policy_rule"
@pytest.mark.asyncio
async def test_load_approval_all_pattern(self) -> None:
"""Test loading policy with * approval pattern (all actions)."""
validator = ActionValidator(cache_enabled=False)
policy = SafetyPolicy(
name="test",
require_approval_for=["*"], # All actions require approval
denied_tools=[], # Clear defaults
)
validator.load_rules_from_policy(policy)
approval_rules = [
r for r in validator._rules if r.decision == SafetyDecision.REQUIRE_APPROVAL
]
assert len(approval_rules) == 1
assert approval_rules[0].name == "require_approval_all"
assert approval_rules[0].action_types == list(ActionType)
@pytest.mark.asyncio
async def test_validate_with_policy_loads_rules(self) -> None:
"""Test that validate() loads rules from policy if none exist."""
validator = ActionValidator(cache_enabled=False)
policy = SafetyPolicy(
name="test",
denied_tools=["dangerous_*"],
)
metadata = ActionMetadata(agent_id="test-agent")
action = ActionRequest(
action_type=ActionType.SHELL_COMMAND,
tool_name="dangerous_exec",
metadata=metadata,
)
# Validate with policy - should load rules
result = await validator.validate(action, policy=policy)
assert result.decision == SafetyDecision.DENY
class TestCacheKeyGeneration:
"""Tests for cache key generation."""
def test_get_cache_key(self) -> None:
"""Test cache key generation."""
validator = ActionValidator(cache_enabled=True)
metadata = ActionMetadata(
agent_id="test-agent",
autonomy_level=AutonomyLevel.MILESTONE,
)
action = ActionRequest(
action_type=ActionType.FILE_READ,
tool_name="file_read",
resource="/tmp/test.txt", # noqa: S108
metadata=metadata,
)
key = validator._get_cache_key(action)
assert "file_read" in key
assert "file_read" in key
assert "/tmp/test.txt" in key # noqa: S108
assert "test-agent" in key
assert "milestone" in key
def test_get_cache_key_no_resource(self) -> None:
"""Test cache key generation without resource."""
validator = ActionValidator(cache_enabled=True)
metadata = ActionMetadata(agent_id="agent-1")
action = ActionRequest(
action_type=ActionType.SHELL_COMMAND,
tool_name="shell_exec",
resource=None,
metadata=metadata,
)
key = validator._get_cache_key(action)
# Should not error with None resource
assert "shell" in key
assert "agent-1" in key
class TestHelperFunctions:
"""Tests for rule creation helper functions."""
def test_create_allow_rule(self) -> None:
"""Test creating an allow rule."""
rule = create_allow_rule(
name="allow_test",
tool_patterns=["test_*"],
priority=50,
)
assert rule.name == "allow_test"
assert rule.decision == SafetyDecision.ALLOW
assert rule.priority == 50
def test_create_deny_rule(self) -> None:
"""Test creating a deny rule."""
rule = create_deny_rule(
name="deny_test",
tool_patterns=["dangerous_*"],
reason="Too dangerous",
)
assert rule.name == "deny_test"
assert rule.decision == SafetyDecision.DENY
assert rule.reason == "Too dangerous"
assert rule.priority == 100 # Default priority for deny
def test_create_approval_rule(self) -> None:
"""Test creating an approval rule."""
rule = create_approval_rule(
name="approve_test",
action_types=[ActionType.DATABASE_MUTATE],
)
assert rule.name == "approve_test"
assert rule.decision == SafetyDecision.REQUIRE_APPROVAL
assert rule.priority == 50 # Default priority for approval