- Add tests for models: ActionMetadata, ActionRequest, ActionResult, ValidationRule, BudgetStatus, RateLimitConfig, ApprovalRequest/Response, Checkpoint, RollbackResult, AuditEvent, SafetyPolicy, GuardianResult
- Add tests for validation: ActionValidator rules, priorities, patterns, bypass mode, batch validation, rule creation helpers
- Add tests for loops: LoopDetector exact/semantic/oscillation detection, LoopBreaker throttle/backoff, history management
- Add tests for content filter: PII filtering (email, phone, SSN, credit card), secret blocking (API keys, GitHub tokens, private keys), custom patterns, scan without filtering, dict filtering
- Add tests for emergency controls: state management, pause/resume/reset, scoped emergency stops, callbacks, EmergencyTrigger events
- Fix exception kwargs in content filter and emergency controls to match exception class signatures

All 108 tests passing with lint and type checks clean.
317 lines
9.7 KiB
Python
317 lines
9.7 KiB
Python
"""Tests for loop detection module."""
|
|
|
|
import pytest
|
|
|
|
from app.services.safety.exceptions import LoopDetectedError
|
|
from app.services.safety.loops.detector import (
|
|
ActionSignature,
|
|
LoopBreaker,
|
|
LoopDetector,
|
|
)
|
|
from app.services.safety.models import (
|
|
ActionMetadata,
|
|
ActionRequest,
|
|
ActionType,
|
|
AutonomyLevel,
|
|
)
|
|
|
|
|
|
@pytest.fixture
def detector() -> LoopDetector:
    """Provide a new LoopDetector for each test.

    The detector is configured with ``history_size=20``,
    ``max_exact_repetitions=3`` and ``max_semantic_repetitions=5`` so that
    tests only need to record a handful of actions to reach a threshold.
    """
    thresholds = {
        "history_size": 20,
        "max_exact_repetitions": 3,
        "max_semantic_repetitions": 5,
    }
    return LoopDetector(**thresholds)
|
|
|
|
|
|
@pytest.fixture
def sample_metadata() -> ActionMetadata:
    """Provide ActionMetadata for a fixed test agent and session.

    Uses agent id ``"test-agent"``, session id ``"test-session"`` and the
    ``AutonomyLevel.MILESTONE`` autonomy level.
    """
    metadata = ActionMetadata(
        agent_id="test-agent",
        session_id="test-session",
        autonomy_level=AutonomyLevel.MILESTONE,
    )
    return metadata
|
|
|
|
|
|
def create_action(
    metadata: ActionMetadata,
    tool_name: str,
    resource: str = "/tmp/test.txt",  # noqa: S108
    arguments: dict | None = None,
) -> ActionRequest:
    """Build an ActionRequest of type FILE_READ for use in tests.

    Args:
        metadata: Metadata attached to the request.
        tool_name: Tool name stored on the request.
        resource: Resource the request targets.
        arguments: Tool arguments; ``None`` or an empty dict results in a
            new empty dict being passed to the request.

    Returns:
        The constructed ActionRequest.
    """
    # A falsy value (None or an empty mapping) is replaced by a fresh dict.
    call_arguments = arguments if arguments else {}
    request = ActionRequest(
        action_type=ActionType.FILE_READ,
        tool_name=tool_name,
        resource=resource,
        arguments=call_arguments,
        metadata=metadata,
    )
    return request
|
|
|
|
|
|
class TestActionSignature:
    """Checks on the keys produced by ActionSignature."""

    def test_exact_key_includes_args(self, sample_metadata: ActionMetadata) -> None:
        """Exact keys differ when two actions differ only in their arguments."""
        actions = [
            create_action(sample_metadata, "file_read", arguments={"path": path})
            for path in ("a", "b")
        ]
        first_sig, second_sig = [ActionSignature(action) for action in actions]

        first_key = first_sig.exact_key()
        second_key = second_sig.exact_key()

        assert first_key != second_key

    def test_semantic_key_ignores_args(self, sample_metadata: ActionMetadata) -> None:
        """Semantic keys match when two actions differ only in their arguments."""
        actions = [
            create_action(sample_metadata, "file_read", arguments={"path": path})
            for path in ("a", "b")
        ]
        first_sig, second_sig = [ActionSignature(action) for action in actions]

        first_key = first_sig.semantic_key()
        second_key = second_sig.semantic_key()

        assert first_key == second_key

    def test_type_key(self, sample_metadata: ActionMetadata) -> None:
        """The type key of a helper-built action equals ``"file_read"``."""
        signature = ActionSignature(create_action(sample_metadata, "file_read"))

        type_key = signature.type_key()

        assert type_key == "file_read"
|
|
|
|
|
|
class TestLoopDetector:
    """Behavioural tests for LoopDetector."""

    @staticmethod
    async def _record_repeated(
        detector: LoopDetector,
        action: ActionRequest,
        times: int,
    ) -> None:
        """Record the same action ``times`` times on the detector."""
        for _ in range(times):
            await detector.record(action)

    @pytest.mark.asyncio
    async def test_no_loop_on_first_action(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """With nothing recorded, check() reports no loop and no loop type."""
        first_action = create_action(sample_metadata, "file_read")

        outcome = await detector.check(first_action)
        is_loop, loop_type = outcome

        assert is_loop is False
        assert loop_type is None

    @pytest.mark.asyncio
    async def test_exact_loop_detection(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """An identical action recorded three times is flagged as "exact"."""
        repeated = create_action(
            sample_metadata,
            "file_read",
            resource="/tmp/same.txt",  # noqa: S108
            arguments={"path": "/tmp/same.txt"},  # noqa: S108
        )

        # Three recordings matches the fixture's max_exact_repetitions.
        await self._record_repeated(detector, repeated, 3)

        # The following check of the same action should report a loop.
        is_loop, loop_type = await detector.check(repeated)

        assert is_loop is True
        assert loop_type == "exact"

    @pytest.mark.asyncio
    async def test_semantic_loop_detection(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """Same tool and resource with varying arguments is flagged "semantic"."""
        shared_resource = "/tmp/test.txt"  # noqa: S108

        def read_at(offset: int) -> ActionRequest:
            # Same tool/resource every time; only the offset argument varies.
            return create_action(
                sample_metadata,
                "file_read",
                resource=shared_resource,
                arguments={"offset": offset},
            )

        for offset in range(5):
            await detector.record(read_at(offset))

        # One more similar action, with yet another offset.
        is_loop, loop_type = await detector.check(read_at(100))

        assert is_loop is True
        assert loop_type == "semantic"

    @pytest.mark.asyncio
    async def test_oscillation_detection(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """Checking B after recording A, B, A is flagged as "oscillation"."""
        action_a = create_action(sample_metadata, "tool_a", resource="/a")
        action_b = create_action(sample_metadata, "tool_b", resource="/b")

        # Lay down the A→B→A prefix of the pattern.
        for step in (action_a, action_b, action_a):
            await detector.record(step)

        # B would complete A→B→A→B.
        is_loop, loop_type = await detector.check(action_b)

        assert is_loop is True
        assert loop_type == "oscillation"

    @pytest.mark.asyncio
    async def test_different_actions_no_loop(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """Ten actions with distinct tools and resources never report a loop."""
        for index in range(10):
            unique_action = create_action(
                sample_metadata,
                f"tool_{index}",
                resource=f"/resource_{index}",
            )
            outcome = await detector.check(unique_action)
            is_loop = outcome[0]
            assert is_loop is False
            await detector.record(unique_action)

    @pytest.mark.asyncio
    async def test_check_and_raise(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """check_and_raise() raises LoopDetectedError naming an exact loop."""
        repeated = create_action(sample_metadata, "file_read")

        # Reach the exact-repetition threshold first.
        await self._record_repeated(detector, repeated, 3)

        with pytest.raises(LoopDetectedError) as raised:
            await detector.check_and_raise(repeated)

        reported_type = raised.value.loop_type.lower()
        assert "exact" in reported_type

    @pytest.mark.asyncio
    async def test_clear_history(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """After clear_history() for the agent, the same action is not a loop."""
        repeated = create_action(sample_metadata, "file_read")

        await self._record_repeated(detector, repeated, 3)

        # Drop everything recorded for this agent.
        await detector.clear_history(sample_metadata.agent_id)

        outcome = await detector.check(repeated)
        is_loop = outcome[0]
        assert is_loop is False

    @pytest.mark.asyncio
    async def test_per_agent_history(
        self,
        detector: LoopDetector,
    ) -> None:
        """Recordings for one agent do not cause a loop report for another."""
        first_agent = ActionMetadata(agent_id="agent-1", session_id="s1")
        second_agent = ActionMetadata(agent_id="agent-2", session_id="s2")

        first_action = create_action(first_agent, "file_read")
        second_action = create_action(second_agent, "file_read")

        # Only agent 1 reaches the threshold.
        await self._record_repeated(detector, first_action, 3)

        first_outcome = await detector.check(first_action)
        assert first_outcome[0] is True

        # Agent 2 has recorded nothing.
        second_outcome = await detector.check(second_action)
        assert second_outcome[0] is False

    @pytest.mark.asyncio
    async def test_get_stats(
        self,
        detector: LoopDetector,
        sample_metadata: ActionMetadata,
    ) -> None:
        """get_stats() reports five recorded actions and non-empty type counts."""
        for index in range(5):
            tool = f"tool_{index % 2}"  # alternates between two tool names
            await detector.record(
                create_action(
                    sample_metadata,
                    tool,
                    resource=f"/resource_{index}",
                )
            )

        stats = await detector.get_stats(sample_metadata.agent_id)

        assert stats["history_size"] == 5
        assert len(stats["action_type_counts"]) > 0
|
|
|
|
|
|
class TestLoopBreaker:
    """Checks on the suggestions returned by LoopBreaker.suggest_alternatives."""

    @pytest.mark.asyncio
    async def test_suggest_alternatives_exact(
        self,
        sample_metadata: ActionMetadata,
    ) -> None:
        """For "exact", the first suggestion mentions "same action"."""
        hints = await LoopBreaker.suggest_alternatives(
            create_action(sample_metadata, "file_read"),
            "exact",
        )

        assert len(hints) > 0
        first_hint = hints[0].lower()
        assert "same action" in first_hint

    @pytest.mark.asyncio
    async def test_suggest_alternatives_semantic(
        self,
        sample_metadata: ActionMetadata,
    ) -> None:
        """For "semantic", the first suggestion mentions "similar"."""
        hints = await LoopBreaker.suggest_alternatives(
            create_action(sample_metadata, "file_read"),
            "semantic",
        )

        assert len(hints) > 0
        first_hint = hints[0].lower()
        assert "similar" in first_hint

    @pytest.mark.asyncio
    async def test_suggest_alternatives_oscillation(
        self,
        sample_metadata: ActionMetadata,
    ) -> None:
        """For "oscillation", the first suggestion mentions "oscillat"."""
        hints = await LoopBreaker.suggest_alternatives(
            create_action(sample_metadata, "file_read"),
            "oscillation",
        )

        assert len(hints) > 0
        first_hint = hints[0].lower()
        assert "oscillat" in first_hint
|