forked from cardosofelipe/fast-next-template
Implements core control subsystems for the safety framework: **Action Validation (validation/validator.py):** - Rule-based validation engine with priority ordering - Allow/deny/require-approval rule types - Pattern matching for tools and resources - Validation result caching with LRU eviction - Emergency bypass capability with audit **Permission System (permissions/manager.py):** - Per-agent permission grants on resources - Resource pattern matching (wildcards) - Temporary permissions with expiration - Permission inheritance hierarchy - Default deny with configurable defaults **Cost Control (costs/controller.py):** - Per-session and per-day budget tracking - Token and USD cost limits - Warning alerts at configurable thresholds - Budget rollover and reset policies - Real-time usage tracking **Rate Limiting (limits/limiter.py):** - Sliding window rate limiter - Per-action, per-LLM-call, per-file-op limits - Burst allowance with recovery - Configurable limits per operation type **Loop Detection (loops/detector.py):** - Exact repetition detection (same action+args) - Semantic repetition (similar actions) - Oscillation pattern detection (A→B→A→B) - Per-agent action history tracking - Loop breaking suggestions 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
385 lines
12 KiB
Python
385 lines
12 KiB
Python
"""
|
|
Permission Manager
|
|
|
|
Manages permissions for agent actions on resources.
|
|
"""
|
|
|
|
import asyncio
|
|
import fnmatch
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from uuid import uuid4
|
|
|
|
from ..exceptions import PermissionDeniedError
|
|
from ..models import (
|
|
ActionRequest,
|
|
ActionType,
|
|
PermissionLevel,
|
|
ResourceType,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PermissionGrant:
|
|
"""A permission grant for an agent on a resource."""
|
|
|
|
def __init__(
|
|
self,
|
|
agent_id: str,
|
|
resource_pattern: str,
|
|
resource_type: ResourceType,
|
|
level: PermissionLevel,
|
|
*,
|
|
expires_at: datetime | None = None,
|
|
granted_by: str | None = None,
|
|
reason: str | None = None,
|
|
) -> None:
|
|
self.id = str(uuid4())
|
|
self.agent_id = agent_id
|
|
self.resource_pattern = resource_pattern
|
|
self.resource_type = resource_type
|
|
self.level = level
|
|
self.expires_at = expires_at
|
|
self.granted_by = granted_by
|
|
self.reason = reason
|
|
self.created_at = datetime.utcnow()
|
|
|
|
def is_expired(self) -> bool:
|
|
"""Check if the grant has expired."""
|
|
if self.expires_at is None:
|
|
return False
|
|
return datetime.utcnow() > self.expires_at
|
|
|
|
def matches(self, resource: str, resource_type: ResourceType) -> bool:
|
|
"""Check if this grant applies to a resource."""
|
|
if self.resource_type != resource_type:
|
|
return False
|
|
return fnmatch.fnmatch(resource, self.resource_pattern)
|
|
|
|
def allows(self, required_level: PermissionLevel) -> bool:
|
|
"""Check if this grant allows the required permission level."""
|
|
# Permission level hierarchy
|
|
hierarchy = {
|
|
PermissionLevel.NONE: 0,
|
|
PermissionLevel.READ: 1,
|
|
PermissionLevel.WRITE: 2,
|
|
PermissionLevel.EXECUTE: 3,
|
|
PermissionLevel.DELETE: 4,
|
|
PermissionLevel.ADMIN: 5,
|
|
}
|
|
|
|
return hierarchy[self.level] >= hierarchy[required_level]
|
|
|
|
|
|
class PermissionManager:
|
|
"""
|
|
Manages permissions for agent access to resources.
|
|
|
|
Features:
|
|
- Permission grants by agent/resource pattern
|
|
- Permission inheritance (project → agent → action)
|
|
- Temporary permissions with expiration
|
|
- Least-privilege defaults
|
|
- Permission escalation logging
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
default_deny: bool = True,
|
|
) -> None:
|
|
"""
|
|
Initialize the PermissionManager.
|
|
|
|
Args:
|
|
default_deny: If True, deny access unless explicitly granted
|
|
"""
|
|
self._grants: list[PermissionGrant] = []
|
|
self._default_deny = default_deny
|
|
self._lock = asyncio.Lock()
|
|
|
|
# Default permissions for common resources
|
|
self._default_permissions: dict[ResourceType, PermissionLevel] = {
|
|
ResourceType.FILE: PermissionLevel.READ,
|
|
ResourceType.DATABASE: PermissionLevel.READ,
|
|
ResourceType.API: PermissionLevel.READ,
|
|
ResourceType.GIT: PermissionLevel.READ,
|
|
ResourceType.LLM: PermissionLevel.EXECUTE,
|
|
ResourceType.SHELL: PermissionLevel.NONE,
|
|
ResourceType.NETWORK: PermissionLevel.READ,
|
|
}
|
|
|
|
async def grant(
|
|
self,
|
|
agent_id: str,
|
|
resource_pattern: str,
|
|
resource_type: ResourceType,
|
|
level: PermissionLevel,
|
|
*,
|
|
duration_seconds: int | None = None,
|
|
granted_by: str | None = None,
|
|
reason: str | None = None,
|
|
) -> PermissionGrant:
|
|
"""
|
|
Grant a permission to an agent.
|
|
|
|
Args:
|
|
agent_id: ID of the agent
|
|
resource_pattern: Pattern for matching resources (supports wildcards)
|
|
resource_type: Type of resource
|
|
level: Permission level to grant
|
|
duration_seconds: Optional duration for temporary permission
|
|
granted_by: Who granted the permission
|
|
reason: Reason for granting
|
|
|
|
Returns:
|
|
The created permission grant
|
|
"""
|
|
expires_at = None
|
|
if duration_seconds:
|
|
expires_at = datetime.utcnow() + timedelta(seconds=duration_seconds)
|
|
|
|
grant = PermissionGrant(
|
|
agent_id=agent_id,
|
|
resource_pattern=resource_pattern,
|
|
resource_type=resource_type,
|
|
level=level,
|
|
expires_at=expires_at,
|
|
granted_by=granted_by,
|
|
reason=reason,
|
|
)
|
|
|
|
async with self._lock:
|
|
self._grants.append(grant)
|
|
|
|
logger.info(
|
|
"Permission granted: agent=%s, resource=%s, type=%s, level=%s",
|
|
agent_id,
|
|
resource_pattern,
|
|
resource_type.value,
|
|
level.value,
|
|
)
|
|
|
|
return grant
|
|
|
|
async def revoke(self, grant_id: str) -> bool:
|
|
"""
|
|
Revoke a permission grant.
|
|
|
|
Args:
|
|
grant_id: ID of the grant to revoke
|
|
|
|
Returns:
|
|
True if grant was found and revoked
|
|
"""
|
|
async with self._lock:
|
|
for i, grant in enumerate(self._grants):
|
|
if grant.id == grant_id:
|
|
del self._grants[i]
|
|
logger.info("Permission revoked: %s", grant_id)
|
|
return True
|
|
return False
|
|
|
|
async def revoke_all(self, agent_id: str) -> int:
|
|
"""
|
|
Revoke all permissions for an agent.
|
|
|
|
Args:
|
|
agent_id: ID of the agent
|
|
|
|
Returns:
|
|
Number of grants revoked
|
|
"""
|
|
async with self._lock:
|
|
original_count = len(self._grants)
|
|
self._grants = [g for g in self._grants if g.agent_id != agent_id]
|
|
revoked = original_count - len(self._grants)
|
|
|
|
if revoked:
|
|
logger.info("Revoked %d permissions for agent %s", revoked, agent_id)
|
|
|
|
return revoked
|
|
|
|
async def check(
|
|
self,
|
|
agent_id: str,
|
|
resource: str,
|
|
resource_type: ResourceType,
|
|
required_level: PermissionLevel,
|
|
) -> bool:
|
|
"""
|
|
Check if an agent has permission to access a resource.
|
|
|
|
Args:
|
|
agent_id: ID of the agent
|
|
resource: Resource to access
|
|
resource_type: Type of resource
|
|
required_level: Required permission level
|
|
|
|
Returns:
|
|
True if access is allowed
|
|
"""
|
|
# Clean up expired grants
|
|
await self._cleanup_expired()
|
|
|
|
async with self._lock:
|
|
for grant in self._grants:
|
|
if grant.agent_id != agent_id:
|
|
continue
|
|
|
|
if grant.is_expired():
|
|
continue
|
|
|
|
if grant.matches(resource, resource_type):
|
|
if grant.allows(required_level):
|
|
return True
|
|
|
|
# Check default permissions
|
|
if not self._default_deny:
|
|
default_level = self._default_permissions.get(
|
|
resource_type, PermissionLevel.NONE
|
|
)
|
|
hierarchy = {
|
|
PermissionLevel.NONE: 0,
|
|
PermissionLevel.READ: 1,
|
|
PermissionLevel.WRITE: 2,
|
|
PermissionLevel.EXECUTE: 3,
|
|
PermissionLevel.DELETE: 4,
|
|
PermissionLevel.ADMIN: 5,
|
|
}
|
|
if hierarchy[default_level] >= hierarchy[required_level]:
|
|
return True
|
|
|
|
return False
|
|
|
|
async def check_action(self, action: ActionRequest) -> bool:
|
|
"""
|
|
Check if an action is permitted.
|
|
|
|
Args:
|
|
action: The action to check
|
|
|
|
Returns:
|
|
True if action is allowed
|
|
"""
|
|
# Determine required permission level from action type
|
|
level_map = {
|
|
ActionType.FILE_READ: PermissionLevel.READ,
|
|
ActionType.FILE_WRITE: PermissionLevel.WRITE,
|
|
ActionType.FILE_DELETE: PermissionLevel.DELETE,
|
|
ActionType.DATABASE_QUERY: PermissionLevel.READ,
|
|
ActionType.DATABASE_MUTATE: PermissionLevel.WRITE,
|
|
ActionType.SHELL_COMMAND: PermissionLevel.EXECUTE,
|
|
ActionType.API_CALL: PermissionLevel.EXECUTE,
|
|
ActionType.GIT_OPERATION: PermissionLevel.WRITE,
|
|
ActionType.LLM_CALL: PermissionLevel.EXECUTE,
|
|
ActionType.NETWORK_REQUEST: PermissionLevel.READ,
|
|
ActionType.TOOL_CALL: PermissionLevel.EXECUTE,
|
|
}
|
|
|
|
required_level = level_map.get(action.action_type, PermissionLevel.EXECUTE)
|
|
|
|
# Determine resource type from action
|
|
resource_type_map = {
|
|
ActionType.FILE_READ: ResourceType.FILE,
|
|
ActionType.FILE_WRITE: ResourceType.FILE,
|
|
ActionType.FILE_DELETE: ResourceType.FILE,
|
|
ActionType.DATABASE_QUERY: ResourceType.DATABASE,
|
|
ActionType.DATABASE_MUTATE: ResourceType.DATABASE,
|
|
ActionType.SHELL_COMMAND: ResourceType.SHELL,
|
|
ActionType.API_CALL: ResourceType.API,
|
|
ActionType.GIT_OPERATION: ResourceType.GIT,
|
|
ActionType.LLM_CALL: ResourceType.LLM,
|
|
ActionType.NETWORK_REQUEST: ResourceType.NETWORK,
|
|
}
|
|
|
|
resource_type = resource_type_map.get(action.action_type, ResourceType.CUSTOM)
|
|
resource = action.resource or action.tool_name or "*"
|
|
|
|
return await self.check(
|
|
agent_id=action.metadata.agent_id,
|
|
resource=resource,
|
|
resource_type=resource_type,
|
|
required_level=required_level,
|
|
)
|
|
|
|
async def require_permission(
|
|
self,
|
|
agent_id: str,
|
|
resource: str,
|
|
resource_type: ResourceType,
|
|
required_level: PermissionLevel,
|
|
) -> None:
|
|
"""
|
|
Require permission or raise exception.
|
|
|
|
Args:
|
|
agent_id: ID of the agent
|
|
resource: Resource to access
|
|
resource_type: Type of resource
|
|
required_level: Required permission level
|
|
|
|
Raises:
|
|
PermissionDeniedError: If permission is denied
|
|
"""
|
|
if not await self.check(agent_id, resource, resource_type, required_level):
|
|
raise PermissionDeniedError(
|
|
f"Permission denied: {resource}",
|
|
action_type=None,
|
|
resource=resource,
|
|
required_permission=required_level.value,
|
|
agent_id=agent_id,
|
|
)
|
|
|
|
async def list_grants(
|
|
self,
|
|
agent_id: str | None = None,
|
|
resource_type: ResourceType | None = None,
|
|
) -> list[PermissionGrant]:
|
|
"""
|
|
List permission grants.
|
|
|
|
Args:
|
|
agent_id: Optional filter by agent
|
|
resource_type: Optional filter by resource type
|
|
|
|
Returns:
|
|
List of matching grants
|
|
"""
|
|
await self._cleanup_expired()
|
|
|
|
async with self._lock:
|
|
grants = list(self._grants)
|
|
|
|
if agent_id:
|
|
grants = [g for g in grants if g.agent_id == agent_id]
|
|
|
|
if resource_type:
|
|
grants = [g for g in grants if g.resource_type == resource_type]
|
|
|
|
return grants
|
|
|
|
def set_default_permission(
|
|
self,
|
|
resource_type: ResourceType,
|
|
level: PermissionLevel,
|
|
) -> None:
|
|
"""
|
|
Set the default permission level for a resource type.
|
|
|
|
Args:
|
|
resource_type: Type of resource
|
|
level: Default permission level
|
|
"""
|
|
self._default_permissions[resource_type] = level
|
|
|
|
async def _cleanup_expired(self) -> None:
|
|
"""Remove expired grants."""
|
|
async with self._lock:
|
|
original_count = len(self._grants)
|
|
self._grants = [g for g in self._grants if not g.is_expired()]
|
|
removed = original_count - len(self._grants)
|
|
|
|
if removed:
|
|
logger.debug("Cleaned up %d expired permission grants", removed)
|