forked from cardosofelipe/pragma-stack
- Add tests for models: ActionMetadata, ActionRequest, ActionResult, ValidationRule, BudgetStatus, RateLimitConfig, ApprovalRequest/Response, Checkpoint, RollbackResult, AuditEvent, SafetyPolicy, GuardianResult
- Add tests for validation: ActionValidator rules, priorities, patterns, bypass mode, batch validation, rule creation helpers
- Add tests for loops: LoopDetector exact/semantic/oscillation detection, LoopBreaker throttle/backoff, history management
- Add tests for content filter: PII filtering (email, phone, SSN, credit card), secret blocking (API keys, GitHub tokens, private keys), custom patterns, scan without filtering, dict filtering
- Add tests for emergency controls: state management, pause/resume/reset, scoped emergency stops, callbacks, EmergencyTrigger events
- Fix exception kwargs in content filter and emergency controls to match exception class signatures

All 108 tests pass, with lint and type checks clean.
438 lines
13 KiB
Python
438 lines
13 KiB
Python
"""Tests for safety framework models."""
|
|
|
|
from datetime import datetime, timedelta, timezone

from app.services.safety.models import (
    ActionMetadata,
    ActionRequest,
    ActionResult,
    ActionType,
    ApprovalRequest,
    ApprovalResponse,
    ApprovalStatus,
    AuditEvent,
    AuditEventType,
    AutonomyLevel,
    BudgetScope,
    BudgetStatus,
    Checkpoint,
    CheckpointType,
    GuardianResult,
    PermissionLevel,
    RateLimitConfig,
    RollbackResult,
    SafetyDecision,
    SafetyPolicy,
    ValidationResult,
    ValidationRule,
)
|
|
|
|
|
|
class TestActionMetadata:
    """Tests for the ActionMetadata model."""

    def test_create_with_defaults(self) -> None:
        """Build metadata from ``agent_id`` alone and check the unset fields."""
        metadata = ActionMetadata(
            agent_id="agent-1",
        )

        assert metadata.agent_id == "agent-1"
        assert metadata.autonomy_level == AutonomyLevel.MILESTONE
        assert metadata.project_id is None
        assert metadata.session_id is None

    def test_create_with_all_fields(self) -> None:
        """Build metadata with every field set and read each one back."""
        metadata = ActionMetadata(
            agent_id="agent-1",
            session_id="session-1",
            project_id="project-1",
            user_id="user-1",
            autonomy_level=AutonomyLevel.AUTONOMOUS,
        )

        # Verify every argument passed above: a test named "all fields"
        # should not silently skip agent_id and session_id.
        assert metadata.agent_id == "agent-1"
        assert metadata.session_id == "session-1"
        assert metadata.project_id == "project-1"
        assert metadata.user_id == "user-1"
        assert metadata.autonomy_level == AutonomyLevel.AUTONOMOUS
|
|
|
|
|
|
class TestActionRequest:
    """Construction checks for the ActionRequest model."""

    def test_create_basic_action(self) -> None:
        """Build a request from an action type, tool name and metadata."""
        request = ActionRequest(
            action_type=ActionType.FILE_READ,
            tool_name="file_read",
            metadata=ActionMetadata(agent_id="agent-1", session_id="session-1"),
        )

        # No id was passed in, yet the request must expose one.
        assert request.id is not None
        assert request.metadata.agent_id == "agent-1"
        assert request.tool_name == "file_read"
        assert request.action_type == ActionType.FILE_READ

    def test_action_with_arguments(self) -> None:
        """Build a request that also carries arguments and a resource."""
        target = "/tmp/test.txt"  # noqa: S108
        payload = {"path": target, "content": "hello"}
        request = ActionRequest(
            action_type=ActionType.FILE_WRITE,
            tool_name="file_write",
            arguments=payload,
            resource=target,
            metadata=ActionMetadata(agent_id="agent-1", session_id="session-1"),
        )

        assert request.resource == target
        assert request.arguments["path"] == target
|
|
|
|
|
|
class TestActionResult:
    """Construction checks for the ActionResult model."""

    def test_successful_result(self) -> None:
        """A result built with ``success=True`` and data reports no error."""
        outcome = ActionResult(action_id="action-1", success=True, data={"output": "done"})

        assert outcome.error is None
        assert outcome.data["output"] == "done"
        assert outcome.success is True

    def test_failed_result(self) -> None:
        """A result built with ``success=False`` keeps its error message."""
        outcome = ActionResult(action_id="action-1", success=False, error="Permission denied")

        assert outcome.error == "Permission denied"
        assert outcome.success is False
|
|
|
|
|
|
class TestValidationRule:
    """Construction checks for the ValidationRule model."""

    def test_create_rule(self) -> None:
        """Build a fully specified deny rule and read its fields back."""
        deny_shell = ValidationRule(
            name="deny_shell",
            description="Deny shell commands",
            priority=100,
            tool_patterns=["shell_*"],
            decision=SafetyDecision.DENY,
            reason="Shell commands are not allowed",
        )

        # ``enabled`` was not passed, so this checks the value the model supplies.
        assert deny_shell.enabled is True
        assert deny_shell.decision == SafetyDecision.DENY
        assert deny_shell.priority == 100
        assert deny_shell.name == "deny_shell"

    def test_rule_defaults(self) -> None:
        """Build a rule from name and decision only and check the rest."""
        minimal = ValidationRule(decision=SafetyDecision.ALLOW, name="test_rule")

        assert minimal.decision == SafetyDecision.ALLOW
        assert minimal.enabled is True
        assert minimal.priority == 0
        assert minimal.id is not None
|
|
|
|
|
|
class TestValidationResult:
    """Construction checks for the ValidationResult model."""

    def test_allow_result(self) -> None:
        """An ALLOW result keeps its decision and its single applied rule."""
        verdict = ValidationResult(
            action_id="action-1",
            decision=SafetyDecision.ALLOW,
            applied_rules=["rule-1"],
            reasons=["Action is allowed"],
        )

        assert len(verdict.applied_rules) == 1
        assert verdict.decision == SafetyDecision.ALLOW

    def test_deny_result(self) -> None:
        """A DENY result keeps its decision."""
        verdict = ValidationResult(
            action_id="action-1",
            decision=SafetyDecision.DENY,
            applied_rules=["deny_rule"],
            reasons=["Action is not permitted"],
        )

        assert verdict.decision == SafetyDecision.DENY
|
|
|
|
|
|
class TestBudgetStatus:
    """Construction checks for the BudgetStatus model."""

    def test_under_budget(self) -> None:
        """Token counters passed to a session-scoped status are read back."""
        budget = BudgetStatus(
            scope=BudgetScope.SESSION,
            scope_id="session-1",
            tokens_used=5000,
            tokens_limit=10000,
            tokens_remaining=5000,
        )

        assert budget.tokens_used == 5000
        assert budget.tokens_remaining == 5000

    def test_over_budget(self) -> None:
        """A status built with ``is_exceeded=True`` and zero remaining cost."""
        budget = BudgetStatus(
            scope=BudgetScope.SESSION,
            scope_id="session-1",
            cost_used_usd=15.0,
            cost_limit_usd=10.0,
            cost_remaining_usd=0.0,
            is_exceeded=True,
        )

        assert budget.cost_remaining_usd == 0.0
        assert budget.is_exceeded is True
|
|
|
|
|
|
class TestRateLimitConfig:
    """Construction checks for the RateLimitConfig model."""

    def test_create_config(self) -> None:
        """Name, limit and window passed to the config are read back."""
        rate_limit = RateLimitConfig(name="actions", limit=60, window_seconds=60)

        assert rate_limit.window_seconds == 60
        assert rate_limit.limit == 60
        assert rate_limit.name == "actions"
|
|
|
|
|
|
class TestApprovalRequest:
    """Construction checks for the ApprovalRequest model."""

    def test_create_request(self) -> None:
        """Wrap a database-mutating action in an approval request."""
        pending_action = ActionRequest(
            action_type=ActionType.DATABASE_MUTATE,
            tool_name="db_delete",
            metadata=ActionMetadata(agent_id="agent-1", session_id="session-1"),
        )

        approval = ApprovalRequest(
            id="approval-1",
            action=pending_action,
            reason="Database mutation requires approval",
            urgency="high",
            timeout_seconds=300,
        )

        assert approval.timeout_seconds == 300
        assert approval.urgency == "high"
        assert approval.id == "approval-1"
|
|
|
|
|
|
class TestApprovalResponse:
    """Construction checks for the ApprovalResponse model."""

    def test_approved_response(self) -> None:
        """An APPROVED response keeps its status and who decided it."""
        reply = ApprovalResponse(
            request_id="approval-1",
            status=ApprovalStatus.APPROVED,
            decided_by="admin",
            reason="Looks safe",
        )

        assert reply.decided_by == "admin"
        assert reply.status == ApprovalStatus.APPROVED

    def test_denied_response(self) -> None:
        """A DENIED response keeps its status."""
        reply = ApprovalResponse(
            request_id="approval-1",
            status=ApprovalStatus.DENIED,
            decided_by="admin",
            reason="Too risky",
        )

        assert reply.status == ApprovalStatus.DENIED
|
|
|
|
|
|
class TestCheckpoint:
    """Tests for the Checkpoint model."""

    def test_create_checkpoint(self) -> None:
        """Build a file checkpoint and check its id and validity flag."""
        test_path = "/tmp/test.txt"  # noqa: S108
        checkpoint = Checkpoint(
            id="checkpoint-1",
            checkpoint_type=CheckpointType.FILE,
            action_id="action-1",
            # datetime.utcnow() is deprecated since Python 3.12; use an aware
            # UTC timestamp instead.
            # NOTE(review): confirm nothing downstream compares this field
            # against a naive datetime.
            created_at=datetime.now(timezone.utc),
            data={"path": test_path},
            description="File checkpoint",
        )

        assert checkpoint.id == "checkpoint-1"
        assert checkpoint.is_valid is True

    def test_expired_checkpoint(self) -> None:
        """Build a checkpoint whose ``expires_at`` lies in the past."""
        # Read the clock once so both timestamps derive from the same instant
        # and stay exactly one hour apart.
        now = datetime.now(timezone.utc)
        checkpoint = Checkpoint(
            id="checkpoint-1",
            checkpoint_type=CheckpointType.FILE,
            action_id="action-1",
            created_at=now - timedelta(hours=2),
            expires_at=now - timedelta(hours=1),
            data={},
        )

        # is_valid is a simple bool, not computed from expires_at
        # The RollbackManager handles expiration logic
        assert checkpoint.is_valid is True  # Default value
|
|
|
|
|
|
class TestRollbackResult:
    """Construction checks for the RollbackResult model."""

    def test_successful_rollback(self) -> None:
        """A successful rollback lists one rolled-back action."""
        rollback = RollbackResult(
            checkpoint_id="checkpoint-1",
            success=True,
            actions_rolled_back=["file:/tmp/test.txt"],
            failed_actions=[],
        )

        assert len(rollback.actions_rolled_back) == 1
        assert rollback.success is True

    def test_partial_rollback(self) -> None:
        """A failed rollback lists the one action that could not be undone."""
        rollback = RollbackResult(
            checkpoint_id="checkpoint-1",
            success=False,
            actions_rolled_back=["file:/tmp/a.txt"],
            failed_actions=["file:/tmp/b.txt"],
            error="Failed to rollback 1 item",
        )

        assert len(rollback.failed_actions) == 1
        assert rollback.success is False
|
|
|
|
|
|
class TestAuditEvent:
    """Tests for the AuditEvent model."""

    def test_create_event(self) -> None:
        """Build an ACTION_EXECUTED event and check its type and agent."""
        event = AuditEvent(
            id="event-1",
            event_type=AuditEventType.ACTION_EXECUTED,
            # datetime.utcnow() is deprecated since Python 3.12; use an aware
            # UTC timestamp instead.
            # NOTE(review): confirm nothing downstream compares this field
            # against a naive datetime.
            timestamp=datetime.now(timezone.utc),
            agent_id="agent-1",
            action_id="action-1",
            data={"tool": "file_read"},
        )

        assert event.event_type == AuditEventType.ACTION_EXECUTED
        assert event.agent_id == "agent-1"
|
|
|
|
|
|
class TestSafetyPolicy:
    """Construction checks for the SafetyPolicy model."""

    def test_default_policy(self) -> None:
        """A policy built from name and description only is enabled."""
        baseline = SafetyPolicy(name="default", description="Default safety policy")

        assert baseline.enabled is True
        assert baseline.name == "default"

    def test_restrictive_policy(self) -> None:
        """A policy keeps the two deny patterns and two approval patterns given."""
        locked_down = SafetyPolicy(
            name="restrictive",
            description="Restrictive policy",
            denied_tools=["shell_*", "exec_*"],
            require_approval_for=["database_*", "git_push"],
        )

        assert len(locked_down.require_approval_for) == 2
        assert len(locked_down.denied_tools) == 2
|
|
|
|
|
|
class TestGuardianResult:
    """Construction checks for the GuardianResult model."""

    def test_allowed_result(self) -> None:
        """An allowed result carries ALLOW and no approval id."""
        outcome = GuardianResult(
            action_id="action-1",
            allowed=True,
            decision=SafetyDecision.ALLOW,
            reasons=["All checks passed"],
        )

        # ``approval_id`` was not passed; it must read back as None.
        assert outcome.approval_id is None
        assert outcome.allowed is True
        assert outcome.decision == SafetyDecision.ALLOW

    def test_approval_required_result(self) -> None:
        """A REQUIRE_APPROVAL result keeps the approval id it was given."""
        outcome = GuardianResult(
            action_id="action-1",
            allowed=False,
            decision=SafetyDecision.REQUIRE_APPROVAL,
            reasons=["Action requires human approval"],
            approval_id="approval-123",
        )

        assert outcome.approval_id == "approval-123"
        assert outcome.decision == SafetyDecision.REQUIRE_APPROVAL
|
|
|
|
|
|
class TestEnums:
    """Checks on the raw ``.value`` of selected enum members."""

    def test_action_types(self) -> None:
        """Check the values of two ActionType members."""
        expected = (
            (ActionType.FILE_READ, "file_read"),
            (ActionType.SHELL_COMMAND, "shell_command"),
        )
        for member, raw in expected:
            assert member.value == raw

    def test_autonomy_levels(self) -> None:
        """Check the values of the three AutonomyLevel members used here."""
        expected = (
            (AutonomyLevel.FULL_CONTROL, "full_control"),
            (AutonomyLevel.MILESTONE, "milestone"),
            (AutonomyLevel.AUTONOMOUS, "autonomous"),
        )
        for member, raw in expected:
            assert member.value == raw

    def test_permission_levels(self) -> None:
        """Check the values of four PermissionLevel members."""
        expected = (
            (PermissionLevel.NONE, "none"),
            (PermissionLevel.READ, "read"),
            (PermissionLevel.WRITE, "write"),
            (PermissionLevel.ADMIN, "admin"),
        )
        for member, raw in expected:
            assert member.value == raw

    def test_safety_decisions(self) -> None:
        """Check the values of three SafetyDecision members."""
        expected = (
            (SafetyDecision.ALLOW, "allow"),
            (SafetyDecision.DENY, "deny"),
            (SafetyDecision.REQUIRE_APPROVAL, "require_approval"),
        )
        for member, raw in expected:
            assert member.value == raw
|