Files
syndarix/backend/app/services/safety/validation/validator.py
Felipe Cardoso 520c06175e refactor(safety): apply consistent formatting across services and tests
Improved code readability and uniformity by standardizing line breaks, indentation, and inline conditions across safety-related services, models, and tests, including content filters, validation rules, and emergency controls.
2026-01-03 16:23:39 +01:00

442 lines
13 KiB
Python

"""
Action Validator
Pre-execution validation with rule engine for action requests.
"""
import asyncio
import fnmatch
import logging
import time
from collections import OrderedDict

from ..config import get_safety_config
from ..models import (
    ActionRequest,
    ActionType,
    SafetyDecision,
    SafetyPolicy,
    ValidationResult,
    ValidationRule,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class ValidationCache:
    """
    LRU cache for validation results with a per-entry time-to-live.

    Entries are kept in an ``OrderedDict`` ordered from least to most
    recently used. Every operation runs under a single ``asyncio.Lock``.
    Entry age is measured with ``time.monotonic()`` so that adjustments to
    the system clock can neither expire entries early nor keep them alive
    past their TTL.
    """

    def __init__(self, max_size: int = 1000, ttl_seconds: int = 60) -> None:
        """
        Initialize the cache.

        Args:
            max_size: Maximum number of entries. A value of zero or less
                disables storage entirely (``set`` becomes a no-op).
            ttl_seconds: Age in seconds after which an entry is treated
                as expired and dropped on lookup.
        """
        # Maps key -> (result, monotonic timestamp of when it was stored).
        self._cache: OrderedDict[str, tuple[ValidationResult, float]] = OrderedDict()
        self._max_size = max_size
        self._ttl = ttl_seconds
        self._lock = asyncio.Lock()

    async def get(self, key: str) -> ValidationResult | None:
        """
        Get a cached validation result.

        Args:
            key: Cache key to look up

        Returns:
            The cached result, or None if the key is absent or its entry
            is older than the TTL (expired entries are removed).
        """
        async with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                return None

            result, stored_at = entry
            if time.monotonic() - stored_at > self._ttl:
                del self._cache[key]
                return None

            # Mark as most recently used.
            self._cache.move_to_end(key)
            return result

    async def set(self, key: str, result: ValidationResult) -> None:
        """
        Cache a validation result.

        Storing under an existing key refreshes both its value and its
        timestamp. When the cache is full, the least recently used entry
        is evicted to make room.

        Args:
            key: Cache key
            result: Result to store
        """
        if self._max_size <= 0:
            # Nothing can be stored; also avoids popitem() on an empty dict.
            return

        async with self._lock:
            if key in self._cache:
                self._cache.move_to_end(key)
            elif len(self._cache) >= self._max_size:
                # Evict the least recently used entry.
                self._cache.popitem(last=False)
            self._cache[key] = (result, time.monotonic())

    async def clear(self) -> None:
        """Remove every entry from the cache."""
        async with self._lock:
            self._cache.clear()
class ActionValidator:
    """
    Validates actions against safety rules before execution.

    Features:
    - Rule-based validation engine
    - Allow/deny/require-approval rules
    - Pattern matching for tools and resources
    - Validation result caching
    - Bypass capability for emergencies

    Rules are kept sorted by descending priority. Cached decisions are keyed
    on the action's characteristics plus a rule-set revision counter, so any
    change to the rules makes previously cached decisions unreachable.
    """

    def __init__(
        self,
        cache_enabled: bool = True,
        cache_size: int = 1000,
        cache_ttl: int = 60,
    ) -> None:
        """
        Initialize the ActionValidator.

        Args:
            cache_enabled: Whether to cache validation results
            cache_size: Maximum cache entries
            cache_ttl: Cache TTL in seconds
        """
        self._rules: list[ValidationRule] = []
        # Incremented whenever the rule set changes; folded into cache keys so
        # decisions computed under an older rule set are never served.
        self._rules_revision = 0
        self._cache_enabled = cache_enabled
        self._cache = ValidationCache(max_size=cache_size, ttl_seconds=cache_ttl)
        self._bypass_enabled = False
        self._bypass_reason: str | None = None

        # NOTE(review): the configured TTL/size are recorded here, but the
        # cache above is built from the constructor arguments - confirm which
        # source is meant to win.
        config = get_safety_config()
        self._cache_ttl = config.validation_cache_ttl
        self._cache_size = config.validation_cache_size

    def add_rule(self, rule: ValidationRule) -> None:
        """
        Add a validation rule.

        Args:
            rule: The rule to add
        """
        self._rules.append(rule)
        # Re-sort by priority (higher first); the sort is stable, so rules of
        # equal priority keep their insertion order.
        self._rules.sort(key=lambda r: r.priority, reverse=True)
        self._rules_revision += 1
        logger.debug(
            "Added validation rule: %s (priority %d)", rule.name, rule.priority
        )

    def remove_rule(self, rule_id: str) -> bool:
        """
        Remove a validation rule by ID.

        Only the first rule with a matching ID is removed.

        Args:
            rule_id: ID of the rule to remove

        Returns:
            True if rule was found and removed
        """
        for index, rule in enumerate(self._rules):
            if rule.id == rule_id:
                del self._rules[index]
                self._rules_revision += 1
                logger.debug("Removed validation rule: %s", rule_id)
                return True
        return False

    def clear_rules(self) -> None:
        """Remove all validation rules."""
        if self._rules:
            self._rules.clear()
            # Only an actual change needs to invalidate cached decisions.
            self._rules_revision += 1

    def load_rules_from_policy(self, policy: SafetyPolicy) -> None:
        """
        Load validation rules from a safety policy.

        Existing rules are discarded first. In addition to the policy's own
        validation rules, implicit rules are generated from its denied-tool
        patterns (DENY, priority 100) and its approval patterns
        (REQUIRE_APPROVAL, priority 50).

        Args:
            policy: The policy to load rules from
        """
        # Clear existing rules
        self.clear_rules()

        # Add rules from policy
        for rule in policy.validation_rules:
            self.add_rule(rule)

        # Create implicit rules from policy settings
        # Denied tools
        for i, pattern in enumerate(policy.denied_tools):
            self.add_rule(
                ValidationRule(
                    name=f"deny_tool_{i}",
                    description=f"Deny tool pattern: {pattern}",
                    priority=100,  # High priority for denials
                    tool_patterns=[pattern],
                    decision=SafetyDecision.DENY,
                    reason=f"Tool matches denied pattern: {pattern}",
                )
            )

        # Require approval patterns
        for i, pattern in enumerate(policy.require_approval_for):
            if pattern == "*":
                # All actions require approval
                self.add_rule(
                    ValidationRule(
                        name="require_approval_all",
                        description="All actions require approval",
                        priority=50,
                        action_types=list(ActionType),
                        decision=SafetyDecision.REQUIRE_APPROVAL,
                        reason="All actions require human approval",
                    )
                )
            else:
                self.add_rule(
                    ValidationRule(
                        name=f"require_approval_{i}",
                        description=f"Require approval for: {pattern}",
                        priority=50,
                        tool_patterns=[pattern],
                        decision=SafetyDecision.REQUIRE_APPROVAL,
                        reason=f"Action matches approval-required pattern: {pattern}",
                    )
                )

        logger.info("Loaded %d rules from policy: %s", len(self._rules), policy.name)

    async def validate(
        self,
        action: ActionRequest,
        policy: SafetyPolicy | None = None,
    ) -> ValidationResult:
        """
        Validate an action against all rules.

        Enabled rules are evaluated in priority order. The first matching
        DENY rule ends evaluation with a DENY decision; a matching
        REQUIRE_APPROVAL rule upgrades the decision from ALLOW; if nothing
        matches the action is allowed by default.

        Args:
            action: The action to validate
            policy: Policy whose rules are loaded when the validator has no
                rules yet; it is ignored when rules are already present

        Returns:
            ValidationResult with decision and details
        """
        # Check bypass
        if self._bypass_enabled:
            logger.warning(
                "Validation bypass active: %s - allowing action %s",
                self._bypass_reason,
                action.id,
            )
            return ValidationResult(
                action_id=action.id,
                decision=SafetyDecision.ALLOW,
                applied_rules=[],
                reasons=[f"Validation bypassed: {self._bypass_reason}"],
            )

        # Load rules from the policy *before* consulting the cache, so that a
        # decision cached while no rules were loaded cannot mask the policy.
        if policy and not self._rules:
            self.load_rules_from_policy(policy)

        # Check cache
        if self._cache_enabled:
            cached = await self._cache.get(self._versioned_cache_key(action))
            if cached is not None:
                logger.debug("Using cached validation for action %s", action.id)
                return cached

        # Validate against rules
        applied_rules: list[str] = []
        reasons: list[str] = []
        final_decision = SafetyDecision.ALLOW

        for rule in self._rules:
            if not rule.enabled or not self._rule_matches(rule, action):
                continue

            applied_rules.append(rule.id)
            if rule.reason:
                reasons.append(rule.reason)

            if rule.decision == SafetyDecision.DENY:
                # Deny takes precedence and stops evaluation.
                final_decision = SafetyDecision.DENY
                break
            if rule.decision == SafetyDecision.REQUIRE_APPROVAL:
                # Upgrade to require approval; a later DENY can still win.
                final_decision = SafetyDecision.REQUIRE_APPROVAL

        # If no rules matched and no explicit allow, default to allow
        if not applied_rules:
            reasons.append("No matching rules - default allow")

        result = ValidationResult(
            action_id=action.id,
            decision=final_decision,
            applied_rules=applied_rules,
            reasons=reasons,
            approval_id=None,
        )

        # Cache result. The key is computed here, with no await since the
        # rules were evaluated, so it carries the revision that produced it.
        if self._cache_enabled:
            await self._cache.set(self._versioned_cache_key(action), result)

        return result

    async def validate_batch(
        self,
        actions: list[ActionRequest],
        policy: SafetyPolicy | None = None,
    ) -> list[ValidationResult]:
        """
        Validate multiple actions.

        Args:
            actions: Actions to validate
            policy: Policy passed through to each validate() call

        Returns:
            List of validation results, in the same order as ``actions``
        """
        tasks = [self.validate(action, policy) for action in actions]
        return await asyncio.gather(*tasks)

    def enable_bypass(self, reason: str) -> None:
        """
        Enable validation bypass (emergency use only).

        While the bypass is active, validate() allows every action without
        evaluating rules or touching the cache.

        Args:
            reason: Reason for enabling bypass
        """
        logger.critical("Validation bypass enabled: %s", reason)
        self._bypass_enabled = True
        self._bypass_reason = reason

    def disable_bypass(self) -> None:
        """Disable validation bypass."""
        logger.info("Validation bypass disabled")
        self._bypass_enabled = False
        self._bypass_reason = None

    async def clear_cache(self) -> None:
        """Clear the validation cache."""
        await self._cache.clear()

    def _rule_matches(self, rule: ValidationRule, action: ActionRequest) -> bool:
        """
        Check if a rule matches an action.

        Each criterion set on the rule (action types, tool patterns, resource
        patterns, agent IDs) must match; criteria that are empty or unset are
        ignored. A rule with tool or resource patterns never matches an action
        that lacks the corresponding value.
        """
        # Check action types
        if rule.action_types and action.action_type not in rule.action_types:
            return False

        # Check tool patterns
        if rule.tool_patterns:
            if not action.tool_name:
                return False
            if not any(
                self._matches_pattern(action.tool_name, pattern)
                for pattern in rule.tool_patterns
            ):
                return False

        # Check resource patterns
        if rule.resource_patterns:
            if not action.resource:
                return False
            if not any(
                self._matches_pattern(action.resource, pattern)
                for pattern in rule.resource_patterns
            ):
                return False

        # Check agent IDs
        if rule.agent_ids and action.metadata.agent_id not in rule.agent_ids:
            return False

        return True

    def _matches_pattern(self, value: str, pattern: str) -> bool:
        """Check if value matches a pattern (supports wildcards)."""
        if pattern == "*":
            return True
        # Use fnmatch for glob-style matching.
        # NOTE(review): fnmatch.fnmatch normalizes both arguments with
        # os.path.normcase, so case sensitivity follows the host platform.
        return fnmatch.fnmatch(value, pattern)

    def _get_cache_key(self, action: ActionRequest) -> str:
        """
        Generate a cache key for an action.

        The key is built from the action type, tool name, resource, agent ID
        and autonomy level.
        """
        # NOTE(review): the parts are joined with ":" without escaping, so
        # values that themselves contain ":" can yield the same key for
        # different actions.
        key_parts = [
            action.action_type.value,
            action.tool_name or "",
            action.resource or "",
            action.metadata.agent_id,
            action.metadata.autonomy_level.value,
        ]
        return ":".join(key_parts)

    def _versioned_cache_key(self, action: ActionRequest) -> str:
        """Return the action's cache key prefixed with the rule-set revision."""
        return f"{self._rules_revision}:{self._get_cache_key(action)}"
# Module-level convenience functions
def create_allow_rule(
    name: str,
    tool_patterns: list[str] | None = None,
    resource_patterns: list[str] | None = None,
    action_types: list[ActionType] | None = None,
    priority: int = 0,
) -> ValidationRule:
    """
    Build a rule whose decision is ``SafetyDecision.ALLOW``.

    Args:
        name: Rule name
        tool_patterns: Tool patterns passed through to the rule
        resource_patterns: Resource patterns passed through to the rule
        action_types: Action types passed through to the rule
        priority: Rule priority (defaults to 0)

    Returns:
        The newly constructed ValidationRule
    """
    rule_fields = {
        "name": name,
        "decision": SafetyDecision.ALLOW,
        "priority": priority,
        "tool_patterns": tool_patterns,
        "resource_patterns": resource_patterns,
        "action_types": action_types,
    }
    return ValidationRule(**rule_fields)
def create_deny_rule(
    name: str,
    tool_patterns: list[str] | None = None,
    resource_patterns: list[str] | None = None,
    action_types: list[ActionType] | None = None,
    reason: str | None = None,
    priority: int = 100,
) -> ValidationRule:
    """
    Build a rule whose decision is ``SafetyDecision.DENY``.

    Args:
        name: Rule name
        tool_patterns: Tool patterns passed through to the rule
        resource_patterns: Resource patterns passed through to the rule
        action_types: Action types passed through to the rule
        reason: Optional explanation stored on the rule
        priority: Rule priority (defaults to 100)

    Returns:
        The newly constructed ValidationRule
    """
    deny_rule = ValidationRule(
        name=name,
        decision=SafetyDecision.DENY,
        priority=priority,
        reason=reason,
        action_types=action_types,
        tool_patterns=tool_patterns,
        resource_patterns=resource_patterns,
    )
    return deny_rule
def create_approval_rule(
    name: str,
    tool_patterns: list[str] | None = None,
    resource_patterns: list[str] | None = None,
    action_types: list[ActionType] | None = None,
    reason: str | None = None,
    priority: int = 50,
) -> ValidationRule:
    """
    Build a rule whose decision is ``SafetyDecision.REQUIRE_APPROVAL``.

    Args:
        name: Rule name
        tool_patterns: Tool patterns passed through to the rule
        resource_patterns: Resource patterns passed through to the rule
        action_types: Action types passed through to the rule
        reason: Optional explanation stored on the rule
        priority: Rule priority (defaults to 50)

    Returns:
        The newly constructed ValidationRule
    """
    # Matching criteria are collected first, then combined with the outcome.
    criteria = {
        "tool_patterns": tool_patterns,
        "resource_patterns": resource_patterns,
        "action_types": action_types,
    }
    return ValidationRule(
        name=name,
        decision=SafetyDecision.REQUIRE_APPROVAL,
        reason=reason,
        priority=priority,
        **criteria,
    )