Files
fast-next-template/backend/tests/services/safety/test_models.py
Felipe Cardoso 015f2de6c6 test(safety): add Phase E comprehensive safety tests
- Add tests for models: ActionMetadata, ActionRequest, ActionResult,
  ValidationRule, BudgetStatus, RateLimitConfig, ApprovalRequest/Response,
  Checkpoint, RollbackResult, AuditEvent, SafetyPolicy, GuardianResult
- Add tests for validation: ActionValidator rules, priorities, patterns,
  bypass mode, batch validation, rule creation helpers
- Add tests for loops: LoopDetector exact/semantic/oscillation detection,
  LoopBreaker throttle/backoff, history management
- Add tests for content filter: PII filtering (email, phone, SSN, credit card),
  secret blocking (API keys, GitHub tokens, private keys), custom patterns,
  scan without filtering, dict filtering
- Add tests for emergency controls: state management, pause/resume/reset,
  scoped emergency stops, callbacks, EmergencyTrigger events
- Fix exception kwargs in content filter and emergency controls to match
  exception class signatures

All 108 tests passing with lint and type checks clean.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-03 11:52:35 +01:00

438 lines
13 KiB
Python

"""Tests for safety framework models."""
from datetime import datetime, timedelta
from app.services.safety.models import (
ActionMetadata,
ActionRequest,
ActionResult,
ActionType,
ApprovalRequest,
ApprovalResponse,
ApprovalStatus,
AuditEvent,
AuditEventType,
AutonomyLevel,
BudgetScope,
BudgetStatus,
Checkpoint,
CheckpointType,
GuardianResult,
PermissionLevel,
RateLimitConfig,
RollbackResult,
SafetyDecision,
SafetyPolicy,
ValidationResult,
ValidationRule,
)
class TestActionMetadata:
    """Tests for the ActionMetadata model."""

    def test_create_with_defaults(self) -> None:
        """Build metadata from ``agent_id`` alone and check the asserted defaults."""
        metadata = ActionMetadata(
            agent_id="agent-1",
        )

        assert metadata.agent_id == "agent-1"
        assert metadata.autonomy_level == AutonomyLevel.MILESTONE
        assert metadata.project_id is None
        assert metadata.session_id is None

    def test_create_with_all_fields(self) -> None:
        """Build metadata with every field supplied and read each one back."""
        metadata = ActionMetadata(
            agent_id="agent-1",
            session_id="session-1",
            project_id="project-1",
            user_id="user-1",
            autonomy_level=AutonomyLevel.AUTONOMOUS,
        )

        # Verify every value passed to the constructor, not just a subset,
        # so a dropped or mis-mapped field cannot slip through unnoticed.
        assert metadata.agent_id == "agent-1"
        assert metadata.session_id == "session-1"
        assert metadata.project_id == "project-1"
        assert metadata.user_id == "user-1"
        assert metadata.autonomy_level == AutonomyLevel.AUTONOMOUS
class TestActionRequest:
    """Exercises construction of ActionRequest objects."""

    def test_create_basic_action(self) -> None:
        """A request built from type, tool name and metadata exposes them back."""
        request = ActionRequest(
            action_type=ActionType.FILE_READ,
            tool_name="file_read",
            metadata=ActionMetadata(agent_id="agent-1", session_id="session-1"),
        )

        assert request.id is not None
        assert request.action_type == ActionType.FILE_READ
        assert request.tool_name == "file_read"
        assert request.metadata.agent_id == "agent-1"

    def test_action_with_arguments(self) -> None:
        """Arguments and resource passed to the constructor are retained."""
        target = "/tmp/test.txt"  # noqa: S108
        call_args = {"path": target, "content": "hello"}

        request = ActionRequest(
            action_type=ActionType.FILE_WRITE,
            tool_name="file_write",
            arguments=call_args,
            resource=target,
            metadata=ActionMetadata(agent_id="agent-1", session_id="session-1"),
        )

        assert request.arguments["path"] == target
        assert request.resource == target
class TestActionResult:
    """Exercises construction of ActionResult objects."""

    def test_successful_result(self) -> None:
        """A successful result carries its data payload and no error."""
        outcome = ActionResult(
            action_id="action-1",
            success=True,
            data={"output": "done"},
        )

        assert outcome.success is True
        assert outcome.error is None
        assert outcome.data["output"] == "done"

    def test_failed_result(self) -> None:
        """A failed result carries the supplied error message."""
        outcome = ActionResult(
            action_id="action-1",
            success=False,
            error="Permission denied",
        )

        assert outcome.success is False
        assert outcome.error == "Permission denied"
class TestValidationRule:
    """Exercises construction of ValidationRule objects."""

    def test_create_rule(self) -> None:
        """A fully specified deny rule keeps its values and is enabled."""
        deny_shell = ValidationRule(
            name="deny_shell",
            description="Deny shell commands",
            priority=100,
            tool_patterns=["shell_*"],
            decision=SafetyDecision.DENY,
            reason="Shell commands are not allowed",
        )

        assert deny_shell.name == "deny_shell"
        assert deny_shell.priority == 100
        assert deny_shell.decision == SafetyDecision.DENY
        assert deny_shell.enabled is True

    def test_rule_defaults(self) -> None:
        """A rule given only a name and decision gets an id, priority 0, enabled."""
        minimal = ValidationRule(name="test_rule", decision=SafetyDecision.ALLOW)

        assert minimal.id is not None
        assert minimal.priority == 0
        assert minimal.enabled is True
        assert minimal.decision == SafetyDecision.ALLOW
class TestValidationResult:
    """Exercises construction of ValidationResult objects."""

    def test_allow_result(self) -> None:
        """An ALLOW result keeps its decision and its single applied rule."""
        verdict = ValidationResult(
            action_id="action-1",
            decision=SafetyDecision.ALLOW,
            applied_rules=["rule-1"],
            reasons=["Action is allowed"],
        )

        assert verdict.decision == SafetyDecision.ALLOW
        assert len(verdict.applied_rules) == 1

    def test_deny_result(self) -> None:
        """A DENY result keeps its decision."""
        verdict = ValidationResult(
            action_id="action-1",
            decision=SafetyDecision.DENY,
            applied_rules=["deny_rule"],
            reasons=["Action is not permitted"],
        )

        assert verdict.decision == SafetyDecision.DENY
class TestBudgetStatus:
    """Exercises construction of BudgetStatus objects."""

    def test_under_budget(self) -> None:
        """Token counters supplied for a session scope are stored as given."""
        budget = BudgetStatus(
            scope=BudgetScope.SESSION,
            scope_id="session-1",
            tokens_used=5000,
            tokens_limit=10000,
            tokens_remaining=5000,
        )

        assert budget.tokens_used == 5000
        assert budget.tokens_remaining == 5000

    def test_over_budget(self) -> None:
        """An explicitly exceeded cost budget reports the flag and zero remaining."""
        budget = BudgetStatus(
            scope=BudgetScope.SESSION,
            scope_id="session-1",
            cost_used_usd=15.0,
            cost_limit_usd=10.0,
            cost_remaining_usd=0.0,
            is_exceeded=True,
        )

        assert budget.is_exceeded is True
        assert budget.cost_remaining_usd == 0.0
class TestRateLimitConfig:
    """Exercises construction of RateLimitConfig objects."""

    def test_create_config(self) -> None:
        """Name, limit and window passed to the constructor are retained."""
        limiter = RateLimitConfig(
            name="actions",
            limit=60,
            window_seconds=60,
        )

        assert limiter.name == "actions"
        assert limiter.limit == 60
        assert limiter.window_seconds == 60
class TestApprovalRequest:
    """Exercises construction of ApprovalRequest objects."""

    def test_create_request(self) -> None:
        """An approval request wrapping a DB mutation keeps id, urgency, timeout."""
        pending_action = ActionRequest(
            action_type=ActionType.DATABASE_MUTATE,
            tool_name="db_delete",
            metadata=ActionMetadata(agent_id="agent-1", session_id="session-1"),
        )

        approval = ApprovalRequest(
            id="approval-1",
            action=pending_action,
            reason="Database mutation requires approval",
            urgency="high",
            timeout_seconds=300,
        )

        assert approval.id == "approval-1"
        assert approval.urgency == "high"
        assert approval.timeout_seconds == 300
class TestApprovalResponse:
    """Exercises construction of ApprovalResponse objects."""

    def test_approved_response(self) -> None:
        """An APPROVED response keeps its status and the deciding user."""
        reply = ApprovalResponse(
            request_id="approval-1",
            status=ApprovalStatus.APPROVED,
            decided_by="admin",
            reason="Looks safe",
        )

        assert reply.status == ApprovalStatus.APPROVED
        assert reply.decided_by == "admin"

    def test_denied_response(self) -> None:
        """A DENIED response keeps its status."""
        reply = ApprovalResponse(
            request_id="approval-1",
            status=ApprovalStatus.DENIED,
            decided_by="admin",
            reason="Too risky",
        )

        assert reply.status == ApprovalStatus.DENIED
class TestCheckpoint:
    """Exercises construction of Checkpoint objects."""

    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12. Moving to
    # timezone-aware values depends on whether the models expect naive
    # datetimes, which is not visible from this file -- confirm before changing.

    def test_create_checkpoint(self) -> None:
        """A freshly created file checkpoint keeps its id and is valid."""
        target = "/tmp/test.txt"  # noqa: S108
        snapshot = Checkpoint(
            id="checkpoint-1",
            checkpoint_type=CheckpointType.FILE,
            action_id="action-1",
            created_at=datetime.utcnow(),
            data={"path": target},
            description="File checkpoint",
        )

        assert snapshot.id == "checkpoint-1"
        assert snapshot.is_valid is True

    def test_expired_checkpoint(self) -> None:
        """A checkpoint whose expiry lies in the past still reports is_valid."""
        created = datetime.utcnow() - timedelta(hours=2)
        expired = datetime.utcnow() - timedelta(hours=1)

        snapshot = Checkpoint(
            id="checkpoint-1",
            checkpoint_type=CheckpointType.FILE,
            action_id="action-1",
            created_at=created,
            expires_at=expired,
            data={},
        )

        # is_valid is a plain boolean field rather than something derived from
        # expires_at; expiration handling lives in the RollbackManager, so the
        # default value is what we expect to see here.
        assert snapshot.is_valid is True
class TestRollbackResult:
    """Exercises construction of RollbackResult objects."""

    def test_successful_rollback(self) -> None:
        """A successful rollback lists the one action that was rolled back."""
        rollback = RollbackResult(
            checkpoint_id="checkpoint-1",
            success=True,
            actions_rolled_back=["file:/tmp/test.txt"],
            failed_actions=[],
        )

        assert rollback.success is True
        assert len(rollback.actions_rolled_back) == 1

    def test_partial_rollback(self) -> None:
        """A partial rollback is unsuccessful and lists the one failed action."""
        rollback = RollbackResult(
            checkpoint_id="checkpoint-1",
            success=False,
            actions_rolled_back=["file:/tmp/a.txt"],
            failed_actions=["file:/tmp/b.txt"],
            error="Failed to rollback 1 item",
        )

        assert rollback.success is False
        assert len(rollback.failed_actions) == 1
class TestAuditEvent:
    """Exercises construction of AuditEvent objects."""

    def test_create_event(self) -> None:
        """An ACTION_EXECUTED event keeps its type and the originating agent."""
        now = datetime.utcnow()
        record = AuditEvent(
            id="event-1",
            event_type=AuditEventType.ACTION_EXECUTED,
            timestamp=now,
            agent_id="agent-1",
            action_id="action-1",
            data={"tool": "file_read"},
        )

        assert record.event_type == AuditEventType.ACTION_EXECUTED
        assert record.agent_id == "agent-1"
class TestSafetyPolicy:
    """Exercises construction of SafetyPolicy objects."""

    def test_default_policy(self) -> None:
        """A policy given only name and description keeps its name and is enabled."""
        baseline = SafetyPolicy(
            name="default",
            description="Default safety policy",
        )

        assert baseline.name == "default"
        assert baseline.enabled is True

    def test_restrictive_policy(self) -> None:
        """Denied-tool and approval-required pattern lists are kept in full."""
        blocked = ["shell_*", "exec_*"]
        gated = ["database_*", "git_push"]

        strict = SafetyPolicy(
            name="restrictive",
            description="Restrictive policy",
            denied_tools=blocked,
            require_approval_for=gated,
        )

        assert len(strict.denied_tools) == 2
        assert len(strict.require_approval_for) == 2
class TestGuardianResult:
    """Exercises construction of GuardianResult objects."""

    def test_allowed_result(self) -> None:
        """An allowed result reports ALLOW and has no approval id."""
        outcome = GuardianResult(
            action_id="action-1",
            allowed=True,
            decision=SafetyDecision.ALLOW,
            reasons=["All checks passed"],
        )

        assert outcome.allowed is True
        assert outcome.decision == SafetyDecision.ALLOW
        assert outcome.approval_id is None

    def test_approval_required_result(self) -> None:
        """A REQUIRE_APPROVAL result keeps the approval id it was given."""
        outcome = GuardianResult(
            action_id="action-1",
            allowed=False,
            decision=SafetyDecision.REQUIRE_APPROVAL,
            reasons=["Action requires human approval"],
            approval_id="approval-123",
        )

        assert outcome.decision == SafetyDecision.REQUIRE_APPROVAL
        assert outcome.approval_id == "approval-123"
class TestEnums:
    """Pins the string values of selected safety-framework enum members."""

    def test_action_types(self) -> None:
        """ActionType members map to their expected string values."""
        expected = (
            (ActionType.FILE_READ, "file_read"),
            (ActionType.SHELL_COMMAND, "shell_command"),
        )
        for member, raw in expected:
            assert member.value == raw

    def test_autonomy_levels(self) -> None:
        """AutonomyLevel members map to their expected string values."""
        expected = (
            (AutonomyLevel.FULL_CONTROL, "full_control"),
            (AutonomyLevel.MILESTONE, "milestone"),
            (AutonomyLevel.AUTONOMOUS, "autonomous"),
        )
        for member, raw in expected:
            assert member.value == raw

    def test_permission_levels(self) -> None:
        """PermissionLevel members map to their expected string values."""
        expected = (
            (PermissionLevel.NONE, "none"),
            (PermissionLevel.READ, "read"),
            (PermissionLevel.WRITE, "write"),
            (PermissionLevel.ADMIN, "admin"),
        )
        for member, raw in expected:
            assert member.value == raw

    def test_safety_decisions(self) -> None:
        """SafetyDecision members map to their expected string values."""
        expected = (
            (SafetyDecision.ALLOW, "allow"),
            (SafetyDecision.DENY, "deny"),
            (SafetyDecision.REQUIRE_APPROVAL, "require_approval"),
        )
        for member, raw in expected:
            assert member.value == raw