feat(safety): add Phase C advanced controls

- Add rollback manager with file checkpointing and transaction context
- Add HITL manager with approval queues and notification handlers
- Add content filter with PII, secrets, and injection detection
- Add emergency controls with stop/pause/resume capabilities
- Update SafetyConfig with checkpoint_dir setting

Issue #63

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-03 11:36:24 +01:00
parent 728edd1453
commit ef659cd72d
9 changed files with 2053 additions and 4 deletions

View File

@@ -1 +1,23 @@
"""${dir} module."""
"""Content filtering for safety."""
from .filter import (
ContentCategory,
ContentFilter,
FilterAction,
FilterMatch,
FilterPattern,
FilterResult,
filter_content,
scan_for_secrets,
)
__all__ = [
"ContentCategory",
"ContentFilter",
"FilterAction",
"FilterMatch",
"FilterPattern",
"FilterResult",
"filter_content",
"scan_for_secrets",
]

View File

@@ -0,0 +1,532 @@
"""
Content Filter
Filters and sanitizes content for safety, including PII detection and secret scanning.
"""
import asyncio
import logging
import re
from dataclasses import dataclass, field, replace
from enum import Enum
from typing import Any, ClassVar

from ..exceptions import ContentFilterError
logger = logging.getLogger(__name__)
class ContentCategory(str, Enum):
    """Kinds of sensitive content a filter pattern can be tagged with.

    Members are also plain strings (the class mixes in ``str``), so a member
    compares equal to its lowercase value.
    """

    # Personal data and regulated information.
    PII = "pii"
    FINANCIAL = "financial"
    HEALTH = "health"

    # Authentication material.
    SECRETS = "secrets"
    CREDENTIALS = "credentials"

    # Unwanted or hostile input.
    PROFANITY = "profanity"
    INJECTION = "injection"

    # Caller-defined patterns.
    CUSTOM = "custom"
class FilterAction(str, Enum):
    """What to do when a pattern matches.

    Members are also plain strings (the class mixes in ``str``), so a member
    compares equal to its lowercase value.
    """

    ALLOW = "allow"    # match is reported only
    REDACT = "redact"  # matched text is replaced
    BLOCK = "block"    # the whole content is rejected
    WARN = "warn"      # a warning is recorded, text is left as-is
@dataclass
class FilterMatch:
    """One occurrence of a pattern inside a piece of content."""

    # Category and name of the pattern that produced this match.
    category: ContentCategory
    pattern_name: str

    # The exact text that matched and its [start_pos, end_pos) offsets.
    matched_text: str
    start_pos: int
    end_pos: int

    # Confidence reported for the match; defaults to 1.0.
    confidence: float = 1.0

    # Text to substitute for the match when redacting, if any.
    redacted_text: str | None = None
@dataclass
class FilterResult:
    """Outcome of running content through the filter."""

    # Input exactly as received, and the content after filtering.
    original_content: str
    filtered_content: str

    # Every match that was found.
    matches: list[FilterMatch] = field(default_factory=list)

    # Whether the content was rejected outright, and why.
    blocked: bool = False
    block_reason: str | None = None

    # Human-readable warning messages.
    warnings: list[str] = field(default_factory=list)

    @property
    def has_sensitive_content(self) -> bool:
        """Return True when at least one match was recorded."""
        return bool(self.matches)
@dataclass
class FilterPattern:
    """A named regular expression plus the action to take when it matches."""

    name: str
    category: ContentCategory
    pattern: str  # Regex source text
    action: FilterAction = FilterAction.REDACT
    replacement: str = "[REDACTED]"
    confidence: float = 1.0
    enabled: bool = True

    def __post_init__(self) -> None:
        """Compile ``pattern`` once, case-insensitively and in multiline mode."""
        flags = re.IGNORECASE | re.MULTILINE
        self._compiled = re.compile(self.pattern, flags)

    def find_matches(self, content: str) -> list[FilterMatch]:
        """Return one FilterMatch per non-overlapping regex hit in ``content``.

        Each match carries this pattern's category, name, confidence and
        replacement text, together with the matched text and its offsets.
        """
        return [
            FilterMatch(
                category=self.category,
                pattern_name=self.name,
                matched_text=hit.group(),
                start_pos=hit.start(),
                end_pos=hit.end(),
                confidence=self.confidence,
                redacted_text=self.replacement,
            )
            for hit in self._compiled.finditer(content)
        ]
class ContentFilter:
    """
    Filters content for sensitive information.

    Features:
    - PII detection (emails, phones, SSN, etc.)
    - Secret scanning (API keys, tokens, passwords)
    - Credential detection
    - Injection attack prevention
    - Custom pattern support
    - Configurable actions (allow, redact, block, warn)

    Each instance works on its own copies of ``DEFAULT_PATTERNS``, so
    enabling or disabling a pattern on one filter never affects another.
    """

    # Default patterns for common sensitive data
    DEFAULT_PATTERNS: ClassVar[list[FilterPattern]] = [
        # PII Patterns
        FilterPattern(
            name="email",
            category=ContentCategory.PII,
            # TLD class is letters only; a "|" inside [...] is a literal pipe.
            pattern=r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
            action=FilterAction.REDACT,
            replacement="[EMAIL]",
        ),
        FilterPattern(
            name="phone_us",
            category=ContentCategory.PII,
            pattern=r"\b(?:\+1[-.\s]?)?(?:\(?\d{3}\)?[-.\s]?)?\d{3}[-.\s]?\d{4}\b",
            action=FilterAction.REDACT,
            replacement="[PHONE]",
        ),
        FilterPattern(
            name="ssn",
            category=ContentCategory.PII,
            pattern=r"\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b",
            action=FilterAction.REDACT,
            replacement="[SSN]",
        ),
        FilterPattern(
            name="credit_card",
            category=ContentCategory.FINANCIAL,
            pattern=r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
            action=FilterAction.REDACT,
            replacement="[CREDIT_CARD]",
        ),
        FilterPattern(
            name="ip_address",
            category=ContentCategory.PII,
            pattern=r"\b(?:\d{1,3}\.){3}\d{1,3}\b",
            action=FilterAction.WARN,
            replacement="[IP]",
            confidence=0.8,
        ),
        # Secret Patterns
        FilterPattern(
            name="api_key_generic",
            category=ContentCategory.SECRETS,
            pattern=r"\b(?:api[_-]?key|apikey)\s*[:=]\s*['\"]?([A-Za-z0-9_-]{20,})['\"]?",
            action=FilterAction.BLOCK,
            replacement="[API_KEY]",
        ),
        FilterPattern(
            name="aws_access_key",
            category=ContentCategory.SECRETS,
            pattern=r"\bAKIA[0-9A-Z]{16}\b",
            action=FilterAction.BLOCK,
            replacement="[AWS_KEY]",
        ),
        FilterPattern(
            name="aws_secret_key",
            category=ContentCategory.SECRETS,
            pattern=r"\b[A-Za-z0-9/+=]{40}\b",
            action=FilterAction.WARN,
            replacement="[AWS_SECRET]",
            confidence=0.6,  # Lower confidence - might be false positive
        ),
        FilterPattern(
            name="github_token",
            category=ContentCategory.SECRETS,
            pattern=r"\b(ghp|gho|ghu|ghs|ghr)_[A-Za-z0-9]{36,}\b",
            action=FilterAction.BLOCK,
            replacement="[GITHUB_TOKEN]",
        ),
        FilterPattern(
            name="jwt_token",
            category=ContentCategory.SECRETS,
            pattern=r"\beyJ[A-Za-z0-9_-]*\.eyJ[A-Za-z0-9_-]*\.[A-Za-z0-9_-]*\b",
            action=FilterAction.BLOCK,
            replacement="[JWT]",
        ),
        # Credential Patterns
        FilterPattern(
            name="password_in_url",
            category=ContentCategory.CREDENTIALS,
            # The user part may not contain whitespace, "/" or "@", and the
            # password may not contain whitespace, so ordinary prose that
            # merely contains a URL, a later colon and a later "@" is not
            # treated as embedded credentials.
            pattern=r"://[^\s:/@]+:([^\s@]+)@",
            action=FilterAction.BLOCK,
            replacement="://[REDACTED]@",
        ),
        FilterPattern(
            name="password_assignment",
            category=ContentCategory.CREDENTIALS,
            pattern=r"\b(?:password|passwd|pwd)\s*[:=]\s*['\"]?([^\s'\"]+)['\"]?",
            action=FilterAction.REDACT,
            replacement="[PASSWORD]",
        ),
        FilterPattern(
            name="private_key",
            category=ContentCategory.SECRETS,
            pattern=r"-----BEGIN (?:RSA |DSA |EC |OPENSSH )?PRIVATE KEY-----",
            action=FilterAction.BLOCK,
            replacement="[PRIVATE_KEY]",
        ),
        # Injection Patterns
        FilterPattern(
            name="sql_injection",
            category=ContentCategory.INJECTION,
            pattern=r"(?:'\s*(?:OR|AND)\s*')|(?:--\s*$)|(?:;\s*(?:DROP|DELETE|UPDATE|INSERT))",
            action=FilterAction.BLOCK,
            replacement="[BLOCKED]",
        ),
        FilterPattern(
            name="command_injection",
            category=ContentCategory.INJECTION,
            pattern=r"[;&|`$]|\$\(|\$\{",
            action=FilterAction.WARN,
            replacement="[CMD]",
            confidence=0.5,  # Low confidence - common in code
        ),
    ]

    def __init__(
        self,
        enable_pii_filter: bool = True,
        enable_secret_filter: bool = True,
        enable_injection_filter: bool = True,
        custom_patterns: list[FilterPattern] | None = None,
        default_action: FilterAction = FilterAction.REDACT,
    ) -> None:
        """
        Initialize the ContentFilter.

        Args:
            enable_pii_filter: Enable PII detection
            enable_secret_filter: Enable secret scanning (also gates the
                credential patterns)
            enable_injection_filter: Enable injection detection
            custom_patterns: Additional custom patterns
            default_action: Default action for matches (stored; not consulted
                by any method of this class)
        """
        self._patterns: list[FilterPattern] = []
        self._default_action = default_action
        # Not acquired by any method of this class.
        self._lock = asyncio.Lock()

        # Categories absent from this map (e.g. FINANCIAL) are always loaded.
        category_enabled = {
            ContentCategory.PII: enable_pii_filter,
            ContentCategory.SECRETS: enable_secret_filter,
            ContentCategory.CREDENTIALS: enable_secret_filter,
            ContentCategory.INJECTION: enable_injection_filter,
        }

        # Load default patterns based on configuration. Each default is
        # copied: the class-level instances are shared by every filter, and
        # enable_pattern() mutates the pattern object it finds.
        for pattern in self.DEFAULT_PATTERNS:
            if category_enabled.get(pattern.category, True):
                self._patterns.append(replace(pattern))

        # Add custom patterns (caller-owned, stored as given)
        if custom_patterns:
            self._patterns.extend(custom_patterns)

        logger.info("ContentFilter initialized with %d patterns", len(self._patterns))

    def add_pattern(self, pattern: FilterPattern) -> None:
        """Add a custom pattern to the end of the pattern list."""
        self._patterns.append(pattern)
        logger.debug("Added pattern: %s", pattern.name)

    def remove_pattern(self, pattern_name: str) -> bool:
        """Remove the first pattern with the given name.

        Returns:
            True if a pattern was removed, False if none had that name.
        """
        for i, pattern in enumerate(self._patterns):
            if pattern.name == pattern_name:
                del self._patterns[i]
                logger.debug("Removed pattern: %s", pattern_name)
                return True
        return False

    def enable_pattern(self, pattern_name: str, enabled: bool = True) -> bool:
        """Enable or disable the first pattern with the given name.

        Returns:
            True if a pattern was found and updated, False otherwise.
        """
        pattern = self._get_pattern(pattern_name)
        if pattern is None:
            return False
        pattern.enabled = enabled
        return True

    async def filter(
        self,
        content: str,
        context: dict[str, Any] | None = None,
        raise_on_block: bool = False,
    ) -> FilterResult:
        """
        Filter content for sensitive information.

        Every enabled pattern is run against ``content``. Matches of REDACT
        and BLOCK patterns are replaced by their replacement text; WARN
        matches add a warning and leave the text alone. If any BLOCK pattern
        matches, the result is marked blocked and its filtered content is the
        empty string.

        Args:
            content: Content to filter
            context: Optional context for filtering decisions (accepted but
                not used by this implementation)
            raise_on_block: Raise exception if content is blocked

        Returns:
            FilterResult with filtered content and match details, matches
            ordered by start position

        Raises:
            ContentFilterError: If content is blocked and raise_on_block=True
        """
        all_matches: list[FilterMatch] = []
        redactions: list[tuple[int, int, str]] = []
        warnings: list[str] = []
        blocking_match: FilterMatch | None = None
        block_reason: str | None = None

        # Find all matches. The action is read from the pattern that produced
        # the match rather than looked up again by name, so two patterns that
        # share a name cannot be confused with each other.
        for pattern in self._patterns:
            if not pattern.enabled:
                continue
            for match in pattern.find_matches(content):
                all_matches.append(match)
                if pattern.action in (FilterAction.REDACT, FilterAction.BLOCK):
                    redactions.append(
                        (
                            match.start_pos,
                            match.end_pos,
                            match.redacted_text or "[REDACTED]",
                        )
                    )
                if pattern.action == FilterAction.BLOCK:
                    # The last blocking pattern in pattern order wins.
                    blocking_match = match
                    block_reason = f"Blocked by pattern: {pattern.name}"
                elif pattern.action == FilterAction.WARN:
                    warnings.append(
                        f"Warning: {pattern.name} detected at position {match.start_pos}"
                    )

        blocked = blocking_match is not None
        all_matches.sort(key=lambda m: m.start_pos)

        result = FilterResult(
            original_content=content,
            # Blocked content is never returned, so skip building it.
            filtered_content="" if blocked else self._apply_redactions(content, redactions),
            matches=all_matches,
            blocked=blocked,
            block_reason=block_reason,
            warnings=warnings,
        )

        if blocking_match is not None:
            logger.warning(
                "Content blocked: %s (%d matches)",
                block_reason,
                len(all_matches),
            )
            if raise_on_block:
                # Report the match that caused the block, so the error
                # details agree with block_reason.
                raise ContentFilterError(
                    block_reason or "Content blocked",
                    detected_category=blocking_match.category.value,
                    pattern_name=blocking_match.pattern_name,
                )
        elif all_matches:
            logger.debug(
                "Content filtered: %d matches, %d warnings",
                len(all_matches),
                len(warnings),
            )

        return result

    @staticmethod
    def _apply_redactions(content: str, redactions: list[tuple[int, int, str]]) -> str:
        """Replace each ``(start, end, replacement)`` span of ``content``.

        Offsets refer to the original ``content``. Overlapping spans are
        merged into one span covering their union and replaced once, using
        the replacement of the span that starts first (the longest one on a
        tie), so overlapping matches cannot corrupt the surrounding text or
        leave part of a match unredacted.
        """
        if not redactions:
            return content

        ordered = sorted(redactions, key=lambda span: (span[0], -span[1]))
        merged: list[tuple[int, int, str]] = []
        for start, end, text in ordered:
            if merged and start < merged[-1][1]:
                prev_start, prev_end, prev_text = merged[-1]
                merged[-1] = (prev_start, max(prev_end, end), prev_text)
            else:
                merged.append((start, end, text))

        pieces: list[str] = []
        cursor = 0
        for start, end, text in merged:
            pieces.append(content[cursor:start])
            pieces.append(text)
            cursor = end
        pieces.append(content[cursor:])
        return "".join(pieces)

    async def filter_dict(
        self,
        data: dict[str, Any],
        keys_to_filter: list[str] | None = None,
        recursive: bool = True,
    ) -> dict[str, Any]:
        """
        Filter string values in a dictionary.

        Strings are replaced by their filtered content (the empty string when
        blocked). Lists are processed element by element under the same key
        selection as their parent key. Other values are copied unchanged.

        Args:
            data: Dictionary to filter
            keys_to_filter: Specific keys to filter (None = all)
            recursive: Filter nested dictionaries, including dictionaries
                found inside lists

        Returns:
            Filtered dictionary (a new dict; ``data`` is not modified)
        """
        result: dict[str, Any] = {}
        for key, value in data.items():
            selected = keys_to_filter is None or key in keys_to_filter
            result[key] = await self._filter_value(
                value, selected, keys_to_filter, recursive
            )
        return result

    async def _filter_value(
        self,
        value: Any,
        selected: bool,
        keys_to_filter: list[str] | None,
        recursive: bool,
    ) -> Any:
        """Filter one dictionary value; ``selected`` says whether its key was chosen."""
        if isinstance(value, str):
            if not selected:
                return value
            return (await self.filter(value)).filtered_content
        if isinstance(value, dict):
            if not recursive:
                return value
            return await self.filter_dict(value, keys_to_filter, recursive)
        if isinstance(value, list):
            return [
                await self._filter_value(item, selected, keys_to_filter, recursive)
                for item in value
            ]
        return value

    async def scan(
        self,
        content: str,
        categories: list[ContentCategory] | None = None,
    ) -> list[FilterMatch]:
        """
        Scan content without filtering (detection only).

        Args:
            content: Content to scan
            categories: Limit to specific categories (None or empty = all)

        Returns:
            List of matches found, ordered by start position
        """
        all_matches: list[FilterMatch] = []
        for pattern in self._patterns:
            if not pattern.enabled:
                continue
            if categories and pattern.category not in categories:
                continue
            all_matches.extend(pattern.find_matches(content))
        all_matches.sort(key=lambda m: m.start_pos)
        return all_matches

    async def validate_safe(
        self,
        content: str,
        categories: list[ContentCategory] | None = None,
        allow_warnings: bool = True,
    ) -> tuple[bool, list[str]]:
        """
        Validate that content is safe (no blocked patterns).

        Args:
            content: Content to validate
            categories: Limit to specific categories (None or empty = all)
            allow_warnings: Allow content with warnings

        Returns:
            Tuple of (is_safe, list of issues)
        """
        issues: list[str] = []
        for pattern in self._patterns:
            if not pattern.enabled:
                continue
            if categories and pattern.category not in categories:
                continue
            for match in pattern.find_matches(content):
                if pattern.action == FilterAction.BLOCK:
                    issues.append(f"Blocked: {pattern.name} at position {match.start_pos}")
                elif pattern.action == FilterAction.WARN and not allow_warnings:
                    issues.append(f"Warning: {pattern.name} at position {match.start_pos}")
        return not issues, issues

    def _get_pattern(self, name: str) -> FilterPattern | None:
        """Return the first pattern with the given name, or None."""
        for pattern in self._patterns:
            if pattern.name == name:
                return pattern
        return None

    def get_pattern_stats(self) -> dict[str, Any]:
        """Get statistics about configured patterns.

        Returns:
            Dict with ``total_patterns``, ``enabled_patterns``, and pattern
            counts keyed by category value (``by_category``) and by action
            value (``by_action``).
        """
        by_category: dict[str, int] = {}
        by_action: dict[str, int] = {}
        for pattern in self._patterns:
            cat = pattern.category.value
            by_category[cat] = by_category.get(cat, 0) + 1
            act = pattern.action.value
            by_action[act] = by_action.get(act, 0) + 1
        return {
            "total_patterns": len(self._patterns),
            "enabled_patterns": sum(1 for p in self._patterns if p.enabled),
            "by_category": by_category,
            "by_action": by_action,
        }
# Convenience function for quick filtering
async def filter_content(content: str) -> str:
    """Run ``content`` through a default-configured ContentFilter.

    Returns only the filtered text; match details, warnings and the blocked
    flag of the underlying result are discarded.
    """
    outcome = await ContentFilter().filter(content)
    return outcome.filtered_content
async def scan_for_secrets(content: str) -> list[FilterMatch]:
    """Detect secrets and credentials in ``content`` without modifying it.

    Builds a filter with the PII and injection patterns switched off and
    returns the matches whose category is SECRETS or CREDENTIALS.
    """
    secret_categories = [ContentCategory.SECRETS, ContentCategory.CREDENTIALS]
    scanner = ContentFilter(enable_pii_filter=False, enable_injection_filter=False)
    return await scanner.scan(content, categories=secret_categories)