"""Tests for safety validation module.""" import pytest from app.services.safety.models import ( ActionMetadata, ActionRequest, ActionType, AutonomyLevel, SafetyDecision, SafetyPolicy, ValidationRule, ) from app.services.safety.validation.validator import ( ActionValidator, create_allow_rule, create_approval_rule, create_deny_rule, ) @pytest.fixture def validator() -> ActionValidator: """Create a fresh ActionValidator.""" return ActionValidator(cache_enabled=False) @pytest.fixture def sample_action() -> ActionRequest: """Create a sample action request.""" metadata = ActionMetadata( agent_id="test-agent", session_id="test-session", autonomy_level=AutonomyLevel.MILESTONE, ) return ActionRequest( action_type=ActionType.FILE_READ, tool_name="file_read", resource="/tmp/test.txt", # noqa: S108 metadata=metadata, ) class TestActionValidator: """Tests for ActionValidator class.""" @pytest.mark.asyncio async def test_no_rules_allows_by_default( self, validator: ActionValidator, sample_action: ActionRequest, ) -> None: """Test that actions are allowed by default with no rules.""" result = await validator.validate(sample_action) assert result.decision == SafetyDecision.ALLOW assert "No matching rules" in result.reasons[0] @pytest.mark.asyncio async def test_deny_rule_blocks_action( self, validator: ActionValidator, ) -> None: """Test that a deny rule blocks matching actions.""" validator.add_rule( create_deny_rule( name="deny_shell", tool_patterns=["shell_*"], reason="Shell commands not allowed", ) ) metadata = ActionMetadata(agent_id="test-agent", session_id="session-1") action = ActionRequest( action_type=ActionType.SHELL_COMMAND, tool_name="shell_exec", metadata=metadata, ) result = await validator.validate(action) assert result.decision == SafetyDecision.DENY assert len(result.applied_rules) == 1 # One rule applied @pytest.mark.asyncio async def test_approval_rule_requires_approval( self, validator: ActionValidator, ) -> None: """Test that an approval rule requires approval.""" validator.add_rule( create_approval_rule( name="approve_db", tool_patterns=["database_*"], reason="Database operations require approval", ) ) metadata = ActionMetadata(agent_id="test-agent", session_id="session-1") action = ActionRequest( action_type=ActionType.DATABASE_MUTATE, tool_name="database_delete", metadata=metadata, ) result = await validator.validate(action) assert result.decision == SafetyDecision.REQUIRE_APPROVAL @pytest.mark.asyncio async def test_deny_takes_precedence( self, validator: ActionValidator, ) -> None: """Test that deny rules take precedence over allow rules.""" validator.add_rule( create_allow_rule( name="allow_files", tool_patterns=["file_*"], priority=10, ) ) validator.add_rule( create_deny_rule( name="deny_delete", action_types=[ActionType.FILE_DELETE], priority=100, ) ) metadata = ActionMetadata(agent_id="test-agent", session_id="session-1") action = ActionRequest( action_type=ActionType.FILE_DELETE, tool_name="file_delete", metadata=metadata, ) result = await validator.validate(action) assert result.decision == SafetyDecision.DENY @pytest.mark.asyncio async def test_rule_priority_ordering( self, validator: ActionValidator, ) -> None: """Test that rules are evaluated in priority order.""" validator.add_rule( ValidationRule( name="low_priority", priority=1, decision=SafetyDecision.ALLOW, ) ) validator.add_rule( ValidationRule( name="high_priority", priority=100, decision=SafetyDecision.DENY, ) ) # High priority should be first in the list assert validator._rules[0].name == "high_priority" @pytest.mark.asyncio async def test_disabled_rule_not_applied( self, validator: ActionValidator, sample_action: ActionRequest, ) -> None: """Test that disabled rules are not applied.""" rule = create_deny_rule( name="deny_all", tool_patterns=["*"], ) rule.enabled = False validator.add_rule(rule) result = await validator.validate(sample_action) assert result.decision == SafetyDecision.ALLOW @pytest.mark.asyncio async def test_resource_pattern_matching( self, validator: ActionValidator, ) -> None: """Test resource pattern matching.""" validator.add_rule( create_deny_rule( name="deny_secrets", resource_patterns=["*/secrets/*", "*.env"], ) ) metadata = ActionMetadata(agent_id="test-agent", session_id="session-1") action = ActionRequest( action_type=ActionType.FILE_READ, tool_name="file_read", resource="/app/secrets/api_key.txt", metadata=metadata, ) result = await validator.validate(action) assert result.decision == SafetyDecision.DENY @pytest.mark.asyncio async def test_agent_id_filter( self, validator: ActionValidator, ) -> None: """Test filtering by agent ID.""" rule = ValidationRule( name="restrict_agent", agent_ids=["restricted-agent"], decision=SafetyDecision.DENY, reason="Restricted agent", ) validator.add_rule(rule) # Restricted agent should be denied metadata1 = ActionMetadata(agent_id="restricted-agent") action1 = ActionRequest( action_type=ActionType.FILE_READ, tool_name="file_read", metadata=metadata1, ) result1 = await validator.validate(action1) assert result1.decision == SafetyDecision.DENY # Other agents should be allowed metadata2 = ActionMetadata(agent_id="normal-agent") action2 = ActionRequest( action_type=ActionType.FILE_READ, tool_name="file_read", metadata=metadata2, ) result2 = await validator.validate(action2) assert result2.decision == SafetyDecision.ALLOW @pytest.mark.asyncio async def test_bypass_mode( self, validator: ActionValidator, sample_action: ActionRequest, ) -> None: """Test validation bypass mode.""" validator.add_rule(create_deny_rule(name="deny_all", tool_patterns=["*"])) # Should be denied normally result1 = await validator.validate(sample_action) assert result1.decision == SafetyDecision.DENY # Enable bypass validator.enable_bypass("Emergency situation") result2 = await validator.validate(sample_action) assert result2.decision == SafetyDecision.ALLOW assert "bypassed" in result2.reasons[0].lower() # Disable bypass validator.disable_bypass() result3 = await validator.validate(sample_action) assert result3.decision == SafetyDecision.DENY def test_remove_rule(self, validator: ActionValidator) -> None: """Test removing a rule.""" rule = create_deny_rule(name="test_rule", tool_patterns=["test"]) validator.add_rule(rule) assert len(validator._rules) == 1 assert validator.remove_rule(rule.id) is True assert len(validator._rules) == 0 def test_remove_nonexistent_rule(self, validator: ActionValidator) -> None: """Test removing a nonexistent rule returns False.""" assert validator.remove_rule("nonexistent") is False def test_clear_rules(self, validator: ActionValidator) -> None: """Test clearing all rules.""" validator.add_rule(create_deny_rule(name="rule1", tool_patterns=["a"])) validator.add_rule(create_deny_rule(name="rule2", tool_patterns=["b"])) assert len(validator._rules) == 2 validator.clear_rules() assert len(validator._rules) == 0 class TestLoadRulesFromPolicy: """Tests for loading rules from policies.""" @pytest.mark.asyncio async def test_load_denied_tools( self, validator: ActionValidator, ) -> None: """Test loading denied tools from policy.""" policy = SafetyPolicy( name="test", denied_tools=["shell_*", "exec_*"], ) validator.load_rules_from_policy(policy) # Should have 2 deny rules deny_rules = [r for r in validator._rules if r.decision == SafetyDecision.DENY] assert len(deny_rules) == 2 @pytest.mark.asyncio async def test_load_approval_patterns( self, validator: ActionValidator, ) -> None: """Test loading approval patterns from policy.""" policy = SafetyPolicy( name="test", require_approval_for=["database_*"], ) validator.load_rules_from_policy(policy) approval_rules = [ r for r in validator._rules if r.decision == SafetyDecision.REQUIRE_APPROVAL ] assert len(approval_rules) == 1 class TestValidationBatch: """Tests for batch validation.""" @pytest.mark.asyncio async def test_validate_batch( self, validator: ActionValidator, ) -> None: """Test validating multiple actions.""" validator.add_rule( create_deny_rule( name="deny_shell", tool_patterns=["shell_*"], ) ) metadata = ActionMetadata(agent_id="test-agent", session_id="session-1") actions = [ ActionRequest( action_type=ActionType.FILE_READ, tool_name="file_read", metadata=metadata, ), ActionRequest( action_type=ActionType.SHELL_COMMAND, tool_name="shell_exec", metadata=metadata, ), ] results = await validator.validate_batch(actions) assert len(results) == 2 assert results[0].decision == SafetyDecision.ALLOW assert results[1].decision == SafetyDecision.DENY class TestHelperFunctions: """Tests for rule creation helper functions.""" def test_create_allow_rule(self) -> None: """Test creating an allow rule.""" rule = create_allow_rule( name="allow_test", tool_patterns=["test_*"], priority=50, ) assert rule.name == "allow_test" assert rule.decision == SafetyDecision.ALLOW assert rule.priority == 50 def test_create_deny_rule(self) -> None: """Test creating a deny rule.""" rule = create_deny_rule( name="deny_test", tool_patterns=["dangerous_*"], reason="Too dangerous", ) assert rule.name == "deny_test" assert rule.decision == SafetyDecision.DENY assert rule.reason == "Too dangerous" assert rule.priority == 100 # Default priority for deny def test_create_approval_rule(self) -> None: """Test creating an approval rule.""" rule = create_approval_rule( name="approve_test", action_types=[ActionType.DATABASE_MUTATE], ) assert rule.name == "approve_test" assert rule.decision == SafetyDecision.REQUIRE_APPROVAL assert rule.priority == 50 # Default priority for approval