- Add tests for models: ActionMetadata, ActionRequest, ActionResult, ValidationRule, BudgetStatus, RateLimitConfig, ApprovalRequest/Response, Checkpoint, RollbackResult, AuditEvent, SafetyPolicy, GuardianResult
- Add tests for validation: ActionValidator rules, priorities, patterns, bypass mode, batch validation, rule creation helpers
- Add tests for loops: LoopDetector exact/semantic/oscillation detection, LoopBreaker throttle/backoff, history management
- Add tests for content filter: PII filtering (email, phone, SSN, credit card), secret blocking (API keys, GitHub tokens, private keys), custom patterns, scan without filtering, dict filtering
- Add tests for emergency controls: state management, pause/resume/reset, scoped emergency stops, callbacks, EmergencyTrigger events
- Fix exception kwargs in content filter and emergency controls to match exception class signatures

All 108 tests passing with lint and type checks clean.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
317 lines
9.7 KiB
Python
317 lines
9.7 KiB
Python
"""Tests for loop detection module."""
|
|
|
|
import pytest
|
|
|
|
from app.services.safety.exceptions import LoopDetectedError
|
|
from app.services.safety.loops.detector import (
|
|
ActionSignature,
|
|
LoopBreaker,
|
|
LoopDetector,
|
|
)
|
|
from app.services.safety.models import (
|
|
ActionMetadata,
|
|
ActionRequest,
|
|
ActionType,
|
|
AutonomyLevel,
|
|
)
|
|
|
|
|
|
@pytest.fixture
def detector() -> LoopDetector:
    """Build a LoopDetector configured with small limits.

    The detector is created with ``history_size=20``,
    ``max_exact_repetitions=3`` and ``max_semantic_repetitions=5`` so the
    tests can reach the limits with only a handful of recorded actions.
    """
    settings = {
        "history_size": 20,
        "max_exact_repetitions": 3,
        "max_semantic_repetitions": 5,
    }
    return LoopDetector(**settings)
|
|
|
|
|
|
@pytest.fixture
def sample_metadata() -> ActionMetadata:
    """Build ActionMetadata for agent ``test-agent`` in session ``test-session``.

    The autonomy level is set to ``AutonomyLevel.MILESTONE``.
    """
    metadata = ActionMetadata(
        agent_id="test-agent",
        session_id="test-session",
        autonomy_level=AutonomyLevel.MILESTONE,
    )
    return metadata
|
|
|
|
|
|
def create_action(
    metadata: ActionMetadata,
    tool_name: str,
    resource: str = "/tmp/test.txt",  # noqa: S108
    arguments: dict | None = None,
) -> ActionRequest:
    """Build an ``ActionType.FILE_READ`` ActionRequest for the tests.

    Args:
        metadata: Metadata attached to the request.
        tool_name: Value passed as the request's ``tool_name``.
        resource: Value passed as the request's ``resource``.
        arguments: Arguments for the request; ``None`` or an empty dict
            results in a new empty dict being passed.

    Returns:
        The constructed ActionRequest.
    """
    if arguments:
        call_arguments = arguments
    else:
        # Falsy input (None or an empty dict) is replaced by a new dict.
        call_arguments = {}

    request = ActionRequest(
        action_type=ActionType.FILE_READ,
        tool_name=tool_name,
        resource=resource,
        arguments=call_arguments,
        metadata=metadata,
    )
    return request
|
|
|
|
|
|
class TestActionSignature:
    """Tests for the keys exposed by ActionSignature."""

    def test_exact_key_includes_args(self, sample_metadata: ActionMetadata) -> None:
        """Two actions that differ only in arguments get different exact keys."""
        first, second = (
            create_action(sample_metadata, "file_read", arguments={"path": value})
            for value in ("a", "b")
        )

        first_signature = ActionSignature(first)
        second_signature = ActionSignature(second)

        assert first_signature.exact_key() != second_signature.exact_key()

    def test_semantic_key_ignores_args(self, sample_metadata: ActionMetadata) -> None:
        """Two actions that differ only in arguments share one semantic key."""
        first, second = (
            create_action(sample_metadata, "file_read", arguments={"path": value})
            for value in ("a", "b")
        )

        first_signature = ActionSignature(first)
        second_signature = ActionSignature(second)

        assert first_signature.semantic_key() == second_signature.semantic_key()

    def test_type_key(self, sample_metadata: ActionMetadata) -> None:
        """The type key of a helper-built action is ``"file_read"``."""
        signature = ActionSignature(create_action(sample_metadata, "file_read"))

        observed = signature.type_key()

        assert observed == "file_read"
|
|
|
|
|
|
class TestLoopDetector:
    """Tests for LoopDetector's check/record behaviour."""

    @staticmethod
    async def _record_repeatedly(
        detector: LoopDetector,
        action: ActionRequest,
        times: int,
    ) -> None:
        """Record ``action`` on ``detector`` ``times`` times in a row."""
        for _ in range(times):
            await detector.record(action)

    @pytest.mark.asyncio
    async def test_no_loop_on_first_action(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """An action checked with nothing recorded beforehand is not a loop."""
        first_action = create_action(sample_metadata, "file_read")

        detected, kind = await detector.check(first_action)

        assert detected is False
        assert kind is None

    @pytest.mark.asyncio
    async def test_exact_loop_detection(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """Recording one action three times makes the next check an exact loop."""
        same_path = "/tmp/same.txt"  # noqa: S108
        repeated = create_action(
            sample_metadata,
            "file_read",
            resource=same_path,
            arguments={"path": same_path},
        )

        # Three recordings match the fixture's max_exact_repetitions.
        await self._record_repeatedly(detector, repeated, 3)

        # One more identical action must now be reported.
        detected, kind = await detector.check(repeated)

        assert detected is True
        assert kind == "exact"

    @pytest.mark.asyncio
    async def test_semantic_loop_detection(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """Same tool and resource with varying arguments is a semantic loop."""
        test_resource = "/tmp/test.txt"  # noqa: S108

        def read_at(offset: int) -> ActionRequest:
            # Same tool and resource every time; only the offset argument varies.
            return create_action(
                sample_metadata,
                "file_read",
                resource=test_resource,
                arguments={"offset": offset},
            )

        # Five recordings match the fixture's max_semantic_repetitions.
        for offset in range(5):
            await detector.record(read_at(offset))

        # A further similar action should be flagged as a semantic loop.
        detected, kind = await detector.check(read_at(100))

        assert detected is True
        assert kind == "semantic"

    @pytest.mark.asyncio
    async def test_oscillation_detection(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """An A→B→A history followed by B is reported as an oscillation."""
        action_a = create_action(sample_metadata, "tool_a", resource="/a")
        action_b = create_action(sample_metadata, "tool_b", resource="/b")

        # Lay down the A→B→A prefix.
        for step in (action_a, action_b, action_a):
            await detector.record(step)

        # B would complete A→B→A→B.
        detected, kind = await detector.check(action_b)

        assert detected is True
        assert kind == "oscillation"

    @pytest.mark.asyncio
    async def test_different_actions_no_loop(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """Ten actions with distinct tools and resources never report a loop."""
        for index in range(10):
            unique_action = create_action(
                sample_metadata,
                f"tool_{index}",
                resource=f"/resource_{index}",
            )
            detected, _ = await detector.check(unique_action)
            assert detected is False
            await detector.record(unique_action)

    @pytest.mark.asyncio
    async def test_check_and_raise(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """check_and_raise raises LoopDetectedError once the limit is reached."""
        repeated = create_action(sample_metadata, "file_read")

        # Fill the history up to the exact-repetition limit.
        await self._record_repeatedly(detector, repeated, 3)

        # The next identical action has to raise.
        with pytest.raises(LoopDetectedError) as raised:
            await detector.check_and_raise(repeated)

        reported_type = raised.value.loop_type.lower()
        assert "exact" in reported_type

    @pytest.mark.asyncio
    async def test_clear_history(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """After clear_history the previously repeated action is not a loop."""
        repeated = create_action(sample_metadata, "file_read")

        # Build up enough repetitions to trip the detector.
        await self._record_repeatedly(detector, repeated, 3)

        # Drop everything recorded for this agent.
        await detector.clear_history(sample_metadata.agent_id)

        # The same action must now pass the check.
        detected, _ = await detector.check(repeated)
        assert detected is False

    @pytest.mark.asyncio
    async def test_per_agent_history(
        self,
        detector: LoopDetector,
    ) -> None:
        """Repetitions recorded for one agent do not affect another agent."""
        first_agent = ActionMetadata(agent_id="agent-1", session_id="s1")
        second_agent = ActionMetadata(agent_id="agent-2", session_id="s2")

        first_action = create_action(first_agent, "file_read")
        second_action = create_action(second_agent, "file_read")

        # Only agent-1 accumulates repetitions.
        await self._record_repeatedly(detector, first_action, 3)

        # agent-1 is flagged...
        first_detected, _ = await detector.check(first_action)
        assert first_detected is True

        # ...while agent-2, with nothing recorded, is not.
        second_detected, _ = await detector.check(second_action)
        assert second_detected is False

    @pytest.mark.asyncio
    async def test_get_stats(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """get_stats reports the history size and non-empty type counts."""
        for index in range(5):
            tool_index = index % 2  # Alternate between 2 tools
            recorded = create_action(
                sample_metadata,
                f"tool_{tool_index}",
                resource=f"/resource_{index}",
            )
            await detector.record(recorded)

        stats = await detector.get_stats(sample_metadata.agent_id)

        assert stats["history_size"] == 5
        type_counts = stats["action_type_counts"]
        assert len(type_counts) > 0
|
|
|
|
|
|
class TestLoopBreaker:
    """Tests for the suggestions returned by LoopBreaker.suggest_alternatives."""

    @pytest.mark.asyncio
    async def test_suggest_alternatives_exact(
        self,
        sample_metadata: ActionMetadata,
    ) -> None:
        """The first suggestion for an exact loop mentions "same action"."""
        hints = await LoopBreaker.suggest_alternatives(
            create_action(sample_metadata, "file_read"),
            "exact",
        )

        assert len(hints) > 0
        top_hint = hints[0].lower()
        assert "same action" in top_hint

    @pytest.mark.asyncio
    async def test_suggest_alternatives_semantic(
        self,
        sample_metadata: ActionMetadata,
    ) -> None:
        """The first suggestion for a semantic loop mentions "similar"."""
        hints = await LoopBreaker.suggest_alternatives(
            create_action(sample_metadata, "file_read"),
            "semantic",
        )

        assert len(hints) > 0
        top_hint = hints[0].lower()
        assert "similar" in top_hint

    @pytest.mark.asyncio
    async def test_suggest_alternatives_oscillation(
        self,
        sample_metadata: ActionMetadata,
    ) -> None:
        """The first suggestion for an oscillation loop mentions "oscillat"."""
        hints = await LoopBreaker.suggest_alternatives(
            create_action(sample_metadata, "file_read"),
            "oscillation",
        )

        assert len(hints) > 0
        top_hint = hints[0].lower()
        assert "oscillat" in top_hint
|