""" Safety Framework Configuration Pydantic settings for the safety and guardrails framework. """ import logging import os from functools import lru_cache from pathlib import Path from typing import Any import yaml from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict from .models import AutonomyLevel, SafetyPolicy logger = logging.getLogger(__name__) class SafetyConfig(BaseSettings): """Configuration for the safety framework.""" model_config = SettingsConfigDict( env_prefix="SAFETY_", env_file=".env", env_file_encoding="utf-8", extra="ignore", ) # General settings enabled: bool = Field(True, description="Enable safety framework") strict_mode: bool = Field(True, description="Strict mode (fail closed on errors)") log_level: str = Field("INFO", description="Logging level") # Default autonomy level default_autonomy_level: AutonomyLevel = Field( AutonomyLevel.MILESTONE, description="Default autonomy level for new agents", ) # Default budget limits default_session_token_budget: int = Field( 100_000, description="Default tokens per session" ) default_daily_token_budget: int = Field( 1_000_000, description="Default tokens per day" ) default_session_cost_limit: float = Field( 10.0, description="Default USD per session" ) default_daily_cost_limit: float = Field(100.0, description="Default USD per day") # Default rate limits default_actions_per_minute: int = Field(60, description="Default actions per min") default_llm_calls_per_minute: int = Field(20, description="Default LLM calls/min") default_file_ops_per_minute: int = Field(100, description="Default file ops/min") # Loop detection loop_detection_enabled: bool = Field(True, description="Enable loop detection") max_repeated_actions: int = Field(5, description="Max exact repetitions") max_similar_actions: int = Field(10, description="Max similar actions") loop_history_size: int = Field(100, description="Action history size for loops") # HITL settings hitl_enabled: bool = Field(True, description="Enable human-in-the-loop") hitl_default_timeout: int = Field(300, description="Default approval timeout (s)") hitl_notification_channels: list[str] = Field( default_factory=list, description="Notification channels" ) # Rollback settings rollback_enabled: bool = Field(True, description="Enable rollback capability") checkpoint_dir: str = Field( "/tmp/syndarix_checkpoints", # noqa: S108 description="Directory for checkpoint storage", ) checkpoint_retention_hours: int = Field(24, description="Checkpoint retention") auto_checkpoint_destructive: bool = Field( True, description="Auto-checkpoint destructive actions" ) # Sandbox settings sandbox_enabled: bool = Field(False, description="Enable sandbox execution") sandbox_timeout: int = Field(300, description="Sandbox timeout (s)") sandbox_memory_mb: int = Field(1024, description="Sandbox memory limit (MB)") sandbox_cpu_limit: float = Field(1.0, description="Sandbox CPU limit") sandbox_network_enabled: bool = Field(False, description="Allow sandbox network") # Audit settings audit_enabled: bool = Field(True, description="Enable audit logging") audit_retention_days: int = Field(90, description="Audit log retention (days)") audit_include_sensitive: bool = Field( False, description="Include sensitive data in audit" ) # Content filtering content_filter_enabled: bool = Field(True, description="Enable content filtering") filter_pii: bool = Field(True, description="Filter PII") filter_secrets: bool = Field(True, description="Filter secrets") # Emergency controls emergency_stop_enabled: bool = Field(True, description="Enable emergency stop") emergency_webhook_url: str | None = Field(None, description="Emergency webhook") # Policy file path policy_file: str | None = Field(None, description="Path to policy YAML file") # Validation cache validation_cache_ttl: int = Field(60, description="Validation cache TTL (s)") validation_cache_size: int = Field(1000, description="Validation cache size") class AutonomyConfig(BaseSettings): """Configuration for autonomy levels.""" model_config = SettingsConfigDict( env_prefix="AUTONOMY_", env_file=".env", env_file_encoding="utf-8", extra="ignore", ) # FULL_CONTROL settings full_control_cost_limit: float = Field(1.0, description="USD limit per session") full_control_require_all_approval: bool = Field( True, description="Require approval for all" ) full_control_block_destructive: bool = Field( True, description="Block destructive actions" ) # MILESTONE settings milestone_cost_limit: float = Field(10.0, description="USD limit per session") milestone_require_critical_approval: bool = Field( True, description="Require approval for critical" ) milestone_auto_checkpoint: bool = Field( True, description="Auto-checkpoint destructive" ) # AUTONOMOUS settings autonomous_cost_limit: float = Field(100.0, description="USD limit per session") autonomous_auto_approve_normal: bool = Field( True, description="Auto-approve normal actions" ) autonomous_auto_checkpoint: bool = Field(True, description="Auto-checkpoint all") def _expand_env_vars(value: Any) -> Any: """Recursively expand environment variables in values.""" if isinstance(value, str): return os.path.expandvars(value) elif isinstance(value, dict): return {k: _expand_env_vars(v) for k, v in value.items()} elif isinstance(value, list): return [_expand_env_vars(v) for v in value] return value def load_policy_from_file(file_path: str | Path) -> SafetyPolicy | None: """Load a safety policy from a YAML file.""" path = Path(file_path) if not path.exists(): logger.warning("Policy file not found: %s", path) return None try: with open(path) as f: data = yaml.safe_load(f) if data is None: logger.warning("Empty policy file: %s", path) return None # Expand environment variables data = _expand_env_vars(data) return SafetyPolicy(**data) except Exception as e: logger.error("Failed to load policy file %s: %s", path, e) return None def load_policies_from_directory(directory: str | Path) -> dict[str, SafetyPolicy]: """Load all safety policies from a directory.""" policies: dict[str, SafetyPolicy] = {} path = Path(directory) if not path.exists() or not path.is_dir(): logger.warning("Policy directory not found: %s", path) return policies for file_path in path.glob("*.yaml"): policy = load_policy_from_file(file_path) if policy: policies[policy.name] = policy logger.info("Loaded policy: %s from %s", policy.name, file_path.name) return policies @lru_cache(maxsize=1) def get_safety_config() -> SafetyConfig: """Get the safety configuration (cached singleton).""" return SafetyConfig() @lru_cache(maxsize=1) def get_autonomy_config() -> AutonomyConfig: """Get the autonomy configuration (cached singleton).""" return AutonomyConfig() def get_default_policy() -> SafetyPolicy: """Get the default safety policy.""" config = get_safety_config() return SafetyPolicy( name="default", description="Default safety policy", max_tokens_per_session=config.default_session_token_budget, max_tokens_per_day=config.default_daily_token_budget, max_cost_per_session_usd=config.default_session_cost_limit, max_cost_per_day_usd=config.default_daily_cost_limit, max_actions_per_minute=config.default_actions_per_minute, max_llm_calls_per_minute=config.default_llm_calls_per_minute, max_file_operations_per_minute=config.default_file_ops_per_minute, max_repeated_actions=config.max_repeated_actions, max_similar_actions=config.max_similar_actions, require_sandbox=config.sandbox_enabled, sandbox_timeout_seconds=config.sandbox_timeout, sandbox_memory_mb=config.sandbox_memory_mb, ) def get_policy_for_autonomy_level(level: AutonomyLevel) -> SafetyPolicy: """Get the safety policy for a given autonomy level.""" autonomy = get_autonomy_config() base_policy = get_default_policy() if level == AutonomyLevel.FULL_CONTROL: return SafetyPolicy( name="full_control", description="Full control mode - all actions require approval", max_cost_per_session_usd=autonomy.full_control_cost_limit, max_cost_per_day_usd=autonomy.full_control_cost_limit * 10, require_approval_for=["*"], # All actions max_tokens_per_session=base_policy.max_tokens_per_session // 10, max_tokens_per_day=base_policy.max_tokens_per_day // 10, max_actions_per_minute=base_policy.max_actions_per_minute // 2, max_llm_calls_per_minute=base_policy.max_llm_calls_per_minute // 2, max_file_operations_per_minute=base_policy.max_file_operations_per_minute // 2, denied_tools=["delete_*", "destroy_*", "drop_*"], ) elif level == AutonomyLevel.MILESTONE: return SafetyPolicy( name="milestone", description="Milestone mode - approval at milestones only", max_cost_per_session_usd=autonomy.milestone_cost_limit, max_cost_per_day_usd=autonomy.milestone_cost_limit * 10, require_approval_for=[ "delete_file", "push_to_remote", "deploy_*", "modify_critical_*", "create_pull_request", ], max_tokens_per_session=base_policy.max_tokens_per_session, max_tokens_per_day=base_policy.max_tokens_per_day, max_actions_per_minute=base_policy.max_actions_per_minute, max_llm_calls_per_minute=base_policy.max_llm_calls_per_minute, max_file_operations_per_minute=base_policy.max_file_operations_per_minute, ) else: # AUTONOMOUS return SafetyPolicy( name="autonomous", description="Autonomous mode - minimal intervention", max_cost_per_session_usd=autonomy.autonomous_cost_limit, max_cost_per_day_usd=autonomy.autonomous_cost_limit * 10, require_approval_for=[ "deploy_to_production", "delete_repository", "modify_production_config", ], max_tokens_per_session=base_policy.max_tokens_per_session * 5, max_tokens_per_day=base_policy.max_tokens_per_day * 5, max_actions_per_minute=base_policy.max_actions_per_minute * 2, max_llm_calls_per_minute=base_policy.max_llm_calls_per_minute * 2, max_file_operations_per_minute=base_policy.max_file_operations_per_minute * 2, ) def reset_config_cache() -> None: """Reset configuration caches (for testing).""" get_safety_config.cache_clear() get_autonomy_config.cache_clear()