forked from cardosofelipe/fast-next-template
Implements core control subsystems for the safety framework:

**Action Validation (validation/validator.py):**
- Rule-based validation engine with priority ordering
- Allow/deny/require-approval rule types
- Pattern matching for tools and resources
- Validation result caching with LRU eviction
- Emergency bypass capability with audit

**Permission System (permissions/manager.py):**
- Per-agent permission grants on resources
- Resource pattern matching (wildcards)
- Temporary permissions with expiration
- Permission inheritance hierarchy
- Default deny with configurable defaults

**Cost Control (costs/controller.py):**
- Per-session and per-day budget tracking
- Token and USD cost limits
- Warning alerts at configurable thresholds
- Budget rollover and reset policies
- Real-time usage tracking

**Rate Limiting (limits/limiter.py):**
- Sliding window rate limiter
- Per-action, per-LLM-call, per-file-op limits
- Burst allowance with recovery
- Configurable limits per operation type

**Loop Detection (loops/detector.py):**
- Exact repetition detection (same action+args)
- Semantic repetition (similar actions)
- Oscillation pattern detection (A→B→A→B)
- Per-agent action history tracking
- Loop breaking suggestions

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
440 lines
13 KiB
Python
440 lines
13 KiB
Python
"""
|
|
Action Validator
|
|
|
|
Pre-execution validation with rule engine for action requests.
|
|
"""
|
|
|
|
import asyncio
|
|
import fnmatch
|
|
import logging
|
|
from collections import OrderedDict
|
|
|
|
from ..config import get_safety_config
|
|
from ..models import (
|
|
ActionRequest,
|
|
ActionType,
|
|
SafetyDecision,
|
|
SafetyPolicy,
|
|
ValidationResult,
|
|
ValidationRule,
|
|
)
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ValidationCache:
    """
    LRU cache for validation results with a per-entry time-to-live.

    Entries live in an ``OrderedDict`` whose order tracks recency: the
    least recently used entry sits at the front and is the one evicted
    when the cache is full. All operations are serialized by an
    ``asyncio.Lock``.
    """

    def __init__(self, max_size: int = 1000, ttl_seconds: int = 60) -> None:
        """
        Initialize an empty cache.

        Args:
            max_size: Maximum number of entries kept. A value of zero or
                less disables storage entirely (``set`` becomes a no-op).
            ttl_seconds: Age in seconds after which an entry is treated
                as expired by ``get``.
        """
        # key -> (result, monotonic timestamp of when it was stored)
        self._cache: OrderedDict[str, tuple[ValidationResult, float]] = OrderedDict()
        self._max_size = max_size
        self._ttl = ttl_seconds
        self._lock = asyncio.Lock()

    async def get(self, key: str) -> ValidationResult | None:
        """
        Get cached validation result.

        Args:
            key: Cache key to look up

        Returns:
            The cached result, or None if the key is absent or its entry
            is older than the TTL (expired entries are removed).
        """
        import time

        async with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                return None

            result, stored_at = entry
            # Monotonic clock: entry age is unaffected by wall-clock changes.
            if time.monotonic() - stored_at > self._ttl:
                del self._cache[key]
                return None

            # Move to end (LRU): a hit makes this the most recent entry.
            self._cache.move_to_end(key)
            return result

    async def set(self, key: str, result: ValidationResult) -> None:
        """
        Cache a validation result.

        Storing under an existing key replaces the value and refreshes
        both its recency and its timestamp. Storing a new key in a full
        cache first evicts the least recently used entry.

        Args:
            key: Cache key
            result: Result to store
        """
        import time

        if self._max_size <= 0:
            # Zero-capacity cache: nothing can be stored, and evicting
            # from an empty OrderedDict would raise KeyError.
            return

        async with self._lock:
            if key in self._cache:
                self._cache.move_to_end(key)
            elif len(self._cache) >= self._max_size:
                # Drop the least recently used entry (front of the dict).
                self._cache.popitem(last=False)
            self._cache[key] = (result, time.monotonic())

    async def clear(self) -> None:
        """Clear the cache."""
        async with self._lock:
            self._cache.clear()
|
|
|
|
|
|
class ActionValidator:
    """
    Validates actions against safety rules before execution.

    Features:
    - Rule-based validation engine
    - Allow/deny/require-approval rules
    - Pattern matching for tools and resources
    - Validation result caching
    - Bypass capability for emergencies

    Rules are kept sorted by descending priority. Evaluation stops at the
    first matching DENY rule; a matching REQUIRE_APPROVAL rule upgrades an
    otherwise allowed action. When no rule matches, the action is allowed.
    """

    def __init__(
        self,
        cache_enabled: bool = True,
        cache_size: int = 1000,
        cache_ttl: int = 60,
    ) -> None:
        """
        Initialize the ActionValidator.

        Args:
            cache_enabled: Whether to cache validation results
            cache_size: Maximum cache entries
            cache_ttl: Cache TTL in seconds
        """
        self._rules: list[ValidationRule] = []
        # Incremented on every rule mutation and folded into cache keys, so
        # a decision computed under an older rule set is never served.
        self._rules_version = 0
        self._cache_enabled = cache_enabled
        self._cache = ValidationCache(max_size=cache_size, ttl_seconds=cache_ttl)
        self._bypass_enabled = False
        self._bypass_reason: str | None = None

        # The configured cache settings are recorded on the instance; the
        # cache itself is sized from the constructor arguments above.
        config = get_safety_config()
        self._cache_ttl = config.validation_cache_ttl
        self._cache_size = config.validation_cache_size

    def add_rule(self, rule: ValidationRule) -> None:
        """
        Add a validation rule.

        Args:
            rule: The rule to add
        """
        self._rules.append(rule)
        # Re-sort by priority (higher first). The sort is stable, so rules
        # of equal priority stay in insertion order.
        self._rules.sort(key=lambda r: r.priority, reverse=True)
        self._rules_version += 1
        logger.debug("Added validation rule: %s (priority %d)", rule.name, rule.priority)

    def remove_rule(self, rule_id: str) -> bool:
        """
        Remove a validation rule by ID.

        Only the first rule with a matching ID is removed.

        Args:
            rule_id: ID of the rule to remove

        Returns:
            True if rule was found and removed
        """
        for index, rule in enumerate(self._rules):
            if rule.id == rule_id:
                del self._rules[index]
                self._rules_version += 1
                logger.debug("Removed validation rule: %s", rule_id)
                return True
        return False

    def clear_rules(self) -> None:
        """Remove all validation rules."""
        self._rules.clear()
        self._rules_version += 1

    def load_rules_from_policy(self, policy: SafetyPolicy) -> None:
        """
        Load validation rules from a safety policy.

        Replaces all existing rules with the policy's explicit rules plus
        implicit rules derived from its denied-tool and approval patterns.

        Args:
            policy: The policy to load rules from
        """
        # Clear existing rules
        self.clear_rules()

        # Add rules from policy
        for rule in policy.validation_rules:
            self.add_rule(rule)

        # Create implicit rules from policy settings

        # Denied tools
        for i, pattern in enumerate(policy.denied_tools):
            self.add_rule(
                ValidationRule(
                    name=f"deny_tool_{i}",
                    description=f"Deny tool pattern: {pattern}",
                    priority=100,  # High priority for denials
                    tool_patterns=[pattern],
                    decision=SafetyDecision.DENY,
                    reason=f"Tool matches denied pattern: {pattern}",
                )
            )

        # Require approval patterns
        for i, pattern in enumerate(policy.require_approval_for):
            if pattern == "*":
                # All actions require approval: match on every action type
                # rather than on a tool pattern, so actions without a tool
                # name are covered too.
                self.add_rule(
                    ValidationRule(
                        name="require_approval_all",
                        description="All actions require approval",
                        priority=50,
                        action_types=list(ActionType),
                        decision=SafetyDecision.REQUIRE_APPROVAL,
                        reason="All actions require human approval",
                    )
                )
            else:
                self.add_rule(
                    ValidationRule(
                        name=f"require_approval_{i}",
                        description=f"Require approval for: {pattern}",
                        priority=50,
                        tool_patterns=[pattern],
                        decision=SafetyDecision.REQUIRE_APPROVAL,
                        reason=f"Action matches approval-required pattern: {pattern}",
                    )
                )

        logger.info("Loaded %d rules from policy: %s", len(self._rules), policy.name)

    async def validate(
        self,
        action: ActionRequest,
        policy: SafetyPolicy | None = None,
    ) -> ValidationResult:
        """
        Validate an action against all rules.

        Args:
            action: The action to validate
            policy: Optional policy; its rules are loaded only when the
                validator currently has no rules

        Returns:
            ValidationResult with decision and details
        """
        # Check bypass
        if self._bypass_enabled:
            logger.warning(
                "Validation bypass active: %s - allowing action %s",
                self._bypass_reason,
                action.id,
            )
            return ValidationResult(
                action_id=action.id,
                decision=SafetyDecision.ALLOW,
                applied_rules=[],
                reasons=[f"Validation bypassed: {self._bypass_reason}"],
            )

        # Load rules from policy if provided. Done before the cache lookup
        # so a result cached while no rules existed cannot mask the policy.
        if policy is not None and not self._rules:
            self.load_rules_from_policy(policy)

        # Check cache
        if self._cache_enabled:
            cached = await self._cache.get(self._get_cache_key(action))
            if cached is not None:
                logger.debug("Using cached validation for action %s", action.id)
                # The cached entry was built for an earlier action; rebuild
                # it so the result carries this action's ID and callers
                # cannot mutate the cached lists.
                return ValidationResult(
                    action_id=action.id,
                    decision=cached.decision,
                    applied_rules=list(cached.applied_rules),
                    reasons=list(cached.reasons),
                    approval_id=cached.approval_id,
                )

        # Validate against rules (already sorted by descending priority)
        applied_rules: list[str] = []
        reasons: list[str] = []
        final_decision = SafetyDecision.ALLOW

        for rule in self._rules:
            if not rule.enabled or not self._rule_matches(rule, action):
                continue

            applied_rules.append(rule.id)
            if rule.reason:
                reasons.append(rule.reason)

            if rule.decision == SafetyDecision.DENY:
                # Deny takes precedence; no later rule can override it.
                final_decision = SafetyDecision.DENY
                break

            if rule.decision == SafetyDecision.REQUIRE_APPROVAL:
                # Upgrade to require approval (a DENY would have exited).
                final_decision = SafetyDecision.REQUIRE_APPROVAL

        # If no rules matched and no explicit allow, default to allow
        if not applied_rules:
            reasons.append("No matching rules - default allow")

        result = ValidationResult(
            action_id=action.id,
            decision=final_decision,
            applied_rules=applied_rules,
            reasons=reasons,
            approval_id=None,
        )

        # Cache result under a key that reflects the current rule set
        if self._cache_enabled:
            await self._cache.set(self._get_cache_key(action), result)

        return result

    async def validate_batch(
        self,
        actions: list[ActionRequest],
        policy: SafetyPolicy | None = None,
    ) -> list[ValidationResult]:
        """
        Validate multiple actions.

        Args:
            actions: Actions to validate
            policy: Optional policy, passed through to each validation

        Returns:
            List of validation results, in the same order as ``actions``
        """
        tasks = [self.validate(action, policy) for action in actions]
        return await asyncio.gather(*tasks)

    def enable_bypass(self, reason: str) -> None:
        """
        Enable validation bypass (emergency use only).

        While enabled, ``validate`` allows every action without consulting
        rules or the cache.

        Args:
            reason: Reason for enabling bypass
        """
        logger.critical("Validation bypass enabled: %s", reason)
        self._bypass_enabled = True
        self._bypass_reason = reason

    def disable_bypass(self) -> None:
        """Disable validation bypass."""
        logger.info("Validation bypass disabled")
        self._bypass_enabled = False
        self._bypass_reason = None

    async def clear_cache(self) -> None:
        """Clear the validation cache."""
        await self._cache.clear()

    def _rule_matches(self, rule: ValidationRule, action: ActionRequest) -> bool:
        """
        Check if a rule matches an action.

        Every criterion the rule defines must match; criteria the rule
        leaves empty are ignored. A rule with tool (or resource) patterns
        never matches an action that has no tool name (or resource).
        """
        # Check action types
        if rule.action_types and action.action_type not in rule.action_types:
            return False

        # Check tool patterns
        if rule.tool_patterns:
            if not action.tool_name:
                return False
            if not any(
                self._matches_pattern(action.tool_name, pattern)
                for pattern in rule.tool_patterns
            ):
                return False

        # Check resource patterns
        if rule.resource_patterns:
            if not action.resource:
                return False
            if not any(
                self._matches_pattern(action.resource, pattern)
                for pattern in rule.resource_patterns
            ):
                return False

        # Check agent IDs
        if rule.agent_ids and action.metadata.agent_id not in rule.agent_ids:
            return False

        return True

    def _matches_pattern(self, value: str, pattern: str) -> bool:
        """Check if value matches a pattern (supports wildcards)."""
        if pattern == "*":
            return True

        # Use fnmatch for glob-style matching.
        # NOTE(review): fnmatch.fnmatch normalizes case with
        # os.path.normcase, so matching is case-insensitive on Windows;
        # confirm that is acceptable for tool and resource names.
        return fnmatch.fnmatch(value, pattern)

    def _get_cache_key(self, action: ActionRequest) -> str:
        """
        Generate a cache key for an action.

        The key covers the rule-set version plus the action attributes
        that affect validation. Each part is length-prefixed so values
        containing the separator cannot collide with a different split of
        the same characters.
        """
        key_parts = [
            self._rules_version,
            action.action_type.value,
            action.tool_name or "",
            action.resource or "",
            action.metadata.agent_id,
            action.metadata.autonomy_level.value,
        ]
        encoded = []
        for part in key_parts:
            text = str(part)
            encoded.append(f"{len(text)}:{text}")
        return "|".join(encoded)
|
|
|
|
|
|
# Module-level convenience functions
|
|
|
|
|
|
def create_allow_rule(
    name: str,
    tool_patterns: list[str] | None = None,
    resource_patterns: list[str] | None = None,
    action_types: list[ActionType] | None = None,
    priority: int = 0,
    reason: str | None = None,
) -> ValidationRule:
    """
    Create an allow rule.

    Args:
        name: Rule name
        tool_patterns: Tool name patterns the rule applies to
        resource_patterns: Resource patterns the rule applies to
        action_types: Action types the rule applies to
        priority: Rule priority (defaults to 0)
        reason: Optional explanation attached to the rule, mirroring the
            deny/approval factories. Forwarded only when provided, so
            existing callers get exactly the rule they got before.

    Returns:
        A ValidationRule whose decision is ALLOW
    """
    rule_fields: dict[str, object] = {
        "name": name,
        "tool_patterns": tool_patterns,
        "resource_patterns": resource_patterns,
        "action_types": action_types,
        "decision": SafetyDecision.ALLOW,
        "priority": priority,
    }
    if reason is not None:
        rule_fields["reason"] = reason
    return ValidationRule(**rule_fields)
|
|
|
|
|
|
def create_deny_rule(
    name: str,
    tool_patterns: list[str] | None = None,
    resource_patterns: list[str] | None = None,
    action_types: list[ActionType] | None = None,
    reason: str | None = None,
    priority: int = 100,
) -> ValidationRule:
    """
    Build a rule whose decision is DENY.

    Args:
        name: Rule name
        tool_patterns: Tool name patterns the rule applies to
        resource_patterns: Resource patterns the rule applies to
        action_types: Action types the rule applies to
        reason: Explanation attached to the rule
        priority: Rule priority (defaults to 100)

    Returns:
        The constructed ValidationRule
    """
    # Matching criteria are grouped apart from the outcome fields.
    criteria = {
        "tool_patterns": tool_patterns,
        "resource_patterns": resource_patterns,
        "action_types": action_types,
    }
    deny_rule = ValidationRule(
        name=name,
        decision=SafetyDecision.DENY,
        reason=reason,
        priority=priority,
        **criteria,
    )
    return deny_rule
|
|
|
|
|
|
def create_approval_rule(
    name: str,
    tool_patterns: list[str] | None = None,
    resource_patterns: list[str] | None = None,
    action_types: list[ActionType] | None = None,
    reason: str | None = None,
    priority: int = 50,
) -> ValidationRule:
    """
    Build a rule whose decision is REQUIRE_APPROVAL.

    Args:
        name: Rule name
        tool_patterns: Tool name patterns the rule applies to
        resource_patterns: Resource patterns the rule applies to
        action_types: Action types the rule applies to
        reason: Explanation attached to the rule
        priority: Rule priority (defaults to 50)

    Returns:
        The constructed ValidationRule
    """
    # Matching criteria are grouped apart from the outcome fields.
    criteria = {
        "tool_patterns": tool_patterns,
        "resource_patterns": resource_patterns,
        "action_types": action_types,
    }
    approval_rule = ValidationRule(
        name=name,
        decision=SafetyDecision.REQUIRE_APPROVAL,
        reason=reason,
        priority=priority,
        **criteria,
    )
    return approval_rule
|