forked from cardosofelipe/pragma-stack
- Add tests for models: ActionMetadata, ActionRequest, ActionResult, ValidationRule, BudgetStatus, RateLimitConfig, ApprovalRequest/Response, Checkpoint, RollbackResult, AuditEvent, SafetyPolicy, GuardianResult - Add tests for validation: ActionValidator rules, priorities, patterns, bypass mode, batch validation, rule creation helpers - Add tests for loops: LoopDetector exact/semantic/oscillation detection, LoopBreaker throttle/backoff, history management - Add tests for content filter: PII filtering (email, phone, SSN, credit card), secret blocking (API keys, GitHub tokens, private keys), custom patterns, scan without filtering, dict filtering - Add tests for emergency controls: state management, pause/resume/reset, scoped emergency stops, callbacks, EmergencyTrigger events - Fix exception kwargs in content filter and emergency controls to match exception class signatures All 108 tests passing with lint and type checks clean.
405 lines
12 KiB
Python
405 lines
12 KiB
Python
"""Tests for safety validation module."""
|
|
|
|
import pytest
|
|
|
|
from app.services.safety.models import (
|
|
ActionMetadata,
|
|
ActionRequest,
|
|
ActionType,
|
|
AutonomyLevel,
|
|
SafetyDecision,
|
|
SafetyPolicy,
|
|
ValidationRule,
|
|
)
|
|
from app.services.safety.validation.validator import (
|
|
ActionValidator,
|
|
create_allow_rule,
|
|
create_approval_rule,
|
|
create_deny_rule,
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def validator() -> ActionValidator:
|
|
"""Create a fresh ActionValidator."""
|
|
return ActionValidator(cache_enabled=False)
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_action() -> ActionRequest:
|
|
"""Create a sample action request."""
|
|
metadata = ActionMetadata(
|
|
agent_id="test-agent",
|
|
session_id="test-session",
|
|
autonomy_level=AutonomyLevel.MILESTONE,
|
|
)
|
|
return ActionRequest(
|
|
action_type=ActionType.FILE_READ,
|
|
tool_name="file_read",
|
|
resource="/tmp/test.txt", # noqa: S108
|
|
metadata=metadata,
|
|
)
|
|
|
|
|
|
class TestActionValidator:
|
|
"""Tests for ActionValidator class."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_rules_allows_by_default(
|
|
self,
|
|
validator: ActionValidator,
|
|
sample_action: ActionRequest,
|
|
) -> None:
|
|
"""Test that actions are allowed by default with no rules."""
|
|
result = await validator.validate(sample_action)
|
|
|
|
assert result.decision == SafetyDecision.ALLOW
|
|
assert "No matching rules" in result.reasons[0]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_deny_rule_blocks_action(
|
|
self,
|
|
validator: ActionValidator,
|
|
) -> None:
|
|
"""Test that a deny rule blocks matching actions."""
|
|
validator.add_rule(
|
|
create_deny_rule(
|
|
name="deny_shell",
|
|
tool_patterns=["shell_*"],
|
|
reason="Shell commands not allowed",
|
|
)
|
|
)
|
|
|
|
metadata = ActionMetadata(agent_id="test-agent", session_id="session-1")
|
|
action = ActionRequest(
|
|
action_type=ActionType.SHELL_COMMAND,
|
|
tool_name="shell_exec",
|
|
metadata=metadata,
|
|
)
|
|
|
|
result = await validator.validate(action)
|
|
|
|
assert result.decision == SafetyDecision.DENY
|
|
assert len(result.applied_rules) == 1 # One rule applied
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_approval_rule_requires_approval(
|
|
self,
|
|
validator: ActionValidator,
|
|
) -> None:
|
|
"""Test that an approval rule requires approval."""
|
|
validator.add_rule(
|
|
create_approval_rule(
|
|
name="approve_db",
|
|
tool_patterns=["database_*"],
|
|
reason="Database operations require approval",
|
|
)
|
|
)
|
|
|
|
metadata = ActionMetadata(agent_id="test-agent", session_id="session-1")
|
|
action = ActionRequest(
|
|
action_type=ActionType.DATABASE_MUTATE,
|
|
tool_name="database_delete",
|
|
metadata=metadata,
|
|
)
|
|
|
|
result = await validator.validate(action)
|
|
|
|
assert result.decision == SafetyDecision.REQUIRE_APPROVAL
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_deny_takes_precedence(
|
|
self,
|
|
validator: ActionValidator,
|
|
) -> None:
|
|
"""Test that deny rules take precedence over allow rules."""
|
|
validator.add_rule(
|
|
create_allow_rule(
|
|
name="allow_files",
|
|
tool_patterns=["file_*"],
|
|
priority=10,
|
|
)
|
|
)
|
|
validator.add_rule(
|
|
create_deny_rule(
|
|
name="deny_delete",
|
|
action_types=[ActionType.FILE_DELETE],
|
|
priority=100,
|
|
)
|
|
)
|
|
|
|
metadata = ActionMetadata(agent_id="test-agent", session_id="session-1")
|
|
action = ActionRequest(
|
|
action_type=ActionType.FILE_DELETE,
|
|
tool_name="file_delete",
|
|
metadata=metadata,
|
|
)
|
|
|
|
result = await validator.validate(action)
|
|
|
|
assert result.decision == SafetyDecision.DENY
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_rule_priority_ordering(
|
|
self,
|
|
validator: ActionValidator,
|
|
) -> None:
|
|
"""Test that rules are evaluated in priority order."""
|
|
validator.add_rule(
|
|
ValidationRule(
|
|
name="low_priority",
|
|
priority=1,
|
|
decision=SafetyDecision.ALLOW,
|
|
)
|
|
)
|
|
validator.add_rule(
|
|
ValidationRule(
|
|
name="high_priority",
|
|
priority=100,
|
|
decision=SafetyDecision.DENY,
|
|
)
|
|
)
|
|
|
|
# High priority should be first in the list
|
|
assert validator._rules[0].name == "high_priority"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_disabled_rule_not_applied(
|
|
self,
|
|
validator: ActionValidator,
|
|
sample_action: ActionRequest,
|
|
) -> None:
|
|
"""Test that disabled rules are not applied."""
|
|
rule = create_deny_rule(
|
|
name="deny_all",
|
|
tool_patterns=["*"],
|
|
)
|
|
rule.enabled = False
|
|
validator.add_rule(rule)
|
|
|
|
result = await validator.validate(sample_action)
|
|
|
|
assert result.decision == SafetyDecision.ALLOW
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resource_pattern_matching(
|
|
self,
|
|
validator: ActionValidator,
|
|
) -> None:
|
|
"""Test resource pattern matching."""
|
|
validator.add_rule(
|
|
create_deny_rule(
|
|
name="deny_secrets",
|
|
resource_patterns=["*/secrets/*", "*.env"],
|
|
)
|
|
)
|
|
|
|
metadata = ActionMetadata(agent_id="test-agent", session_id="session-1")
|
|
action = ActionRequest(
|
|
action_type=ActionType.FILE_READ,
|
|
tool_name="file_read",
|
|
resource="/app/secrets/api_key.txt",
|
|
metadata=metadata,
|
|
)
|
|
|
|
result = await validator.validate(action)
|
|
|
|
assert result.decision == SafetyDecision.DENY
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_agent_id_filter(
|
|
self,
|
|
validator: ActionValidator,
|
|
) -> None:
|
|
"""Test filtering by agent ID."""
|
|
rule = ValidationRule(
|
|
name="restrict_agent",
|
|
agent_ids=["restricted-agent"],
|
|
decision=SafetyDecision.DENY,
|
|
reason="Restricted agent",
|
|
)
|
|
validator.add_rule(rule)
|
|
|
|
# Restricted agent should be denied
|
|
metadata1 = ActionMetadata(agent_id="restricted-agent")
|
|
action1 = ActionRequest(
|
|
action_type=ActionType.FILE_READ,
|
|
tool_name="file_read",
|
|
metadata=metadata1,
|
|
)
|
|
result1 = await validator.validate(action1)
|
|
assert result1.decision == SafetyDecision.DENY
|
|
|
|
# Other agents should be allowed
|
|
metadata2 = ActionMetadata(agent_id="normal-agent")
|
|
action2 = ActionRequest(
|
|
action_type=ActionType.FILE_READ,
|
|
tool_name="file_read",
|
|
metadata=metadata2,
|
|
)
|
|
result2 = await validator.validate(action2)
|
|
assert result2.decision == SafetyDecision.ALLOW
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bypass_mode(
|
|
self,
|
|
validator: ActionValidator,
|
|
sample_action: ActionRequest,
|
|
) -> None:
|
|
"""Test validation bypass mode."""
|
|
validator.add_rule(create_deny_rule(name="deny_all", tool_patterns=["*"]))
|
|
|
|
# Should be denied normally
|
|
result1 = await validator.validate(sample_action)
|
|
assert result1.decision == SafetyDecision.DENY
|
|
|
|
# Enable bypass
|
|
validator.enable_bypass("Emergency situation")
|
|
result2 = await validator.validate(sample_action)
|
|
assert result2.decision == SafetyDecision.ALLOW
|
|
assert "bypassed" in result2.reasons[0].lower()
|
|
|
|
# Disable bypass
|
|
validator.disable_bypass()
|
|
result3 = await validator.validate(sample_action)
|
|
assert result3.decision == SafetyDecision.DENY
|
|
|
|
def test_remove_rule(self, validator: ActionValidator) -> None:
|
|
"""Test removing a rule."""
|
|
rule = create_deny_rule(name="test_rule", tool_patterns=["test"])
|
|
validator.add_rule(rule)
|
|
|
|
assert len(validator._rules) == 1
|
|
assert validator.remove_rule(rule.id) is True
|
|
assert len(validator._rules) == 0
|
|
|
|
def test_remove_nonexistent_rule(self, validator: ActionValidator) -> None:
|
|
"""Test removing a nonexistent rule returns False."""
|
|
assert validator.remove_rule("nonexistent") is False
|
|
|
|
def test_clear_rules(self, validator: ActionValidator) -> None:
|
|
"""Test clearing all rules."""
|
|
validator.add_rule(create_deny_rule(name="rule1", tool_patterns=["a"]))
|
|
validator.add_rule(create_deny_rule(name="rule2", tool_patterns=["b"]))
|
|
|
|
assert len(validator._rules) == 2
|
|
validator.clear_rules()
|
|
assert len(validator._rules) == 0
|
|
|
|
|
|
class TestLoadRulesFromPolicy:
|
|
"""Tests for loading rules from policies."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_load_denied_tools(
|
|
self,
|
|
validator: ActionValidator,
|
|
) -> None:
|
|
"""Test loading denied tools from policy."""
|
|
policy = SafetyPolicy(
|
|
name="test",
|
|
denied_tools=["shell_*", "exec_*"],
|
|
)
|
|
|
|
validator.load_rules_from_policy(policy)
|
|
|
|
# Should have 2 deny rules
|
|
deny_rules = [r for r in validator._rules if r.decision == SafetyDecision.DENY]
|
|
assert len(deny_rules) == 2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_load_approval_patterns(
|
|
self,
|
|
validator: ActionValidator,
|
|
) -> None:
|
|
"""Test loading approval patterns from policy."""
|
|
policy = SafetyPolicy(
|
|
name="test",
|
|
require_approval_for=["database_*"],
|
|
)
|
|
|
|
validator.load_rules_from_policy(policy)
|
|
|
|
approval_rules = [
|
|
r for r in validator._rules
|
|
if r.decision == SafetyDecision.REQUIRE_APPROVAL
|
|
]
|
|
assert len(approval_rules) == 1
|
|
|
|
|
|
class TestValidationBatch:
|
|
"""Tests for batch validation."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_validate_batch(
|
|
self,
|
|
validator: ActionValidator,
|
|
) -> None:
|
|
"""Test validating multiple actions."""
|
|
validator.add_rule(
|
|
create_deny_rule(
|
|
name="deny_shell",
|
|
tool_patterns=["shell_*"],
|
|
)
|
|
)
|
|
|
|
metadata = ActionMetadata(agent_id="test-agent", session_id="session-1")
|
|
actions = [
|
|
ActionRequest(
|
|
action_type=ActionType.FILE_READ,
|
|
tool_name="file_read",
|
|
metadata=metadata,
|
|
),
|
|
ActionRequest(
|
|
action_type=ActionType.SHELL_COMMAND,
|
|
tool_name="shell_exec",
|
|
metadata=metadata,
|
|
),
|
|
]
|
|
|
|
results = await validator.validate_batch(actions)
|
|
|
|
assert len(results) == 2
|
|
assert results[0].decision == SafetyDecision.ALLOW
|
|
assert results[1].decision == SafetyDecision.DENY
|
|
|
|
|
|
class TestHelperFunctions:
|
|
"""Tests for rule creation helper functions."""
|
|
|
|
def test_create_allow_rule(self) -> None:
|
|
"""Test creating an allow rule."""
|
|
rule = create_allow_rule(
|
|
name="allow_test",
|
|
tool_patterns=["test_*"],
|
|
priority=50,
|
|
)
|
|
|
|
assert rule.name == "allow_test"
|
|
assert rule.decision == SafetyDecision.ALLOW
|
|
assert rule.priority == 50
|
|
|
|
def test_create_deny_rule(self) -> None:
|
|
"""Test creating a deny rule."""
|
|
rule = create_deny_rule(
|
|
name="deny_test",
|
|
tool_patterns=["dangerous_*"],
|
|
reason="Too dangerous",
|
|
)
|
|
|
|
assert rule.name == "deny_test"
|
|
assert rule.decision == SafetyDecision.DENY
|
|
assert rule.reason == "Too dangerous"
|
|
assert rule.priority == 100 # Default priority for deny
|
|
|
|
def test_create_approval_rule(self) -> None:
|
|
"""Test creating an approval rule."""
|
|
rule = create_approval_rule(
|
|
name="approve_test",
|
|
action_types=[ActionType.DATABASE_MUTATE],
|
|
)
|
|
|
|
assert rule.name == "approve_test"
|
|
assert rule.decision == SafetyDecision.REQUIRE_APPROVAL
|
|
assert rule.priority == 50 # Default priority for approval
|