""" Permission Manager Manages permissions for agent actions on resources. """ import asyncio import fnmatch import logging from datetime import datetime, timedelta from uuid import uuid4 from ..exceptions import PermissionDeniedError from ..models import ( ActionRequest, ActionType, PermissionLevel, ResourceType, ) logger = logging.getLogger(__name__) class PermissionGrant: """A permission grant for an agent on a resource.""" def __init__( self, agent_id: str, resource_pattern: str, resource_type: ResourceType, level: PermissionLevel, *, expires_at: datetime | None = None, granted_by: str | None = None, reason: str | None = None, ) -> None: self.id = str(uuid4()) self.agent_id = agent_id self.resource_pattern = resource_pattern self.resource_type = resource_type self.level = level self.expires_at = expires_at self.granted_by = granted_by self.reason = reason self.created_at = datetime.utcnow() def is_expired(self) -> bool: """Check if the grant has expired.""" if self.expires_at is None: return False return datetime.utcnow() > self.expires_at def matches(self, resource: str, resource_type: ResourceType) -> bool: """Check if this grant applies to a resource.""" if self.resource_type != resource_type: return False return fnmatch.fnmatch(resource, self.resource_pattern) def allows(self, required_level: PermissionLevel) -> bool: """Check if this grant allows the required permission level.""" # Permission level hierarchy hierarchy = { PermissionLevel.NONE: 0, PermissionLevel.READ: 1, PermissionLevel.WRITE: 2, PermissionLevel.EXECUTE: 3, PermissionLevel.DELETE: 4, PermissionLevel.ADMIN: 5, } return hierarchy[self.level] >= hierarchy[required_level] class PermissionManager: """ Manages permissions for agent access to resources. Features: - Permission grants by agent/resource pattern - Permission inheritance (project → agent → action) - Temporary permissions with expiration - Least-privilege defaults - Permission escalation logging """ def __init__( self, default_deny: bool = True, ) -> None: """ Initialize the PermissionManager. Args: default_deny: If True, deny access unless explicitly granted """ self._grants: list[PermissionGrant] = [] self._default_deny = default_deny self._lock = asyncio.Lock() # Default permissions for common resources self._default_permissions: dict[ResourceType, PermissionLevel] = { ResourceType.FILE: PermissionLevel.READ, ResourceType.DATABASE: PermissionLevel.READ, ResourceType.API: PermissionLevel.READ, ResourceType.GIT: PermissionLevel.READ, ResourceType.LLM: PermissionLevel.EXECUTE, ResourceType.SHELL: PermissionLevel.NONE, ResourceType.NETWORK: PermissionLevel.READ, } async def grant( self, agent_id: str, resource_pattern: str, resource_type: ResourceType, level: PermissionLevel, *, duration_seconds: int | None = None, granted_by: str | None = None, reason: str | None = None, ) -> PermissionGrant: """ Grant a permission to an agent. Args: agent_id: ID of the agent resource_pattern: Pattern for matching resources (supports wildcards) resource_type: Type of resource level: Permission level to grant duration_seconds: Optional duration for temporary permission granted_by: Who granted the permission reason: Reason for granting Returns: The created permission grant """ expires_at = None if duration_seconds: expires_at = datetime.utcnow() + timedelta(seconds=duration_seconds) grant = PermissionGrant( agent_id=agent_id, resource_pattern=resource_pattern, resource_type=resource_type, level=level, expires_at=expires_at, granted_by=granted_by, reason=reason, ) async with self._lock: self._grants.append(grant) logger.info( "Permission granted: agent=%s, resource=%s, type=%s, level=%s", agent_id, resource_pattern, resource_type.value, level.value, ) return grant async def revoke(self, grant_id: str) -> bool: """ Revoke a permission grant. Args: grant_id: ID of the grant to revoke Returns: True if grant was found and revoked """ async with self._lock: for i, grant in enumerate(self._grants): if grant.id == grant_id: del self._grants[i] logger.info("Permission revoked: %s", grant_id) return True return False async def revoke_all(self, agent_id: str) -> int: """ Revoke all permissions for an agent. Args: agent_id: ID of the agent Returns: Number of grants revoked """ async with self._lock: original_count = len(self._grants) self._grants = [g for g in self._grants if g.agent_id != agent_id] revoked = original_count - len(self._grants) if revoked: logger.info("Revoked %d permissions for agent %s", revoked, agent_id) return revoked async def check( self, agent_id: str, resource: str, resource_type: ResourceType, required_level: PermissionLevel, ) -> bool: """ Check if an agent has permission to access a resource. Args: agent_id: ID of the agent resource: Resource to access resource_type: Type of resource required_level: Required permission level Returns: True if access is allowed """ # Clean up expired grants await self._cleanup_expired() async with self._lock: for grant in self._grants: if grant.agent_id != agent_id: continue if grant.is_expired(): continue if grant.matches(resource, resource_type): if grant.allows(required_level): return True # Check default permissions if not self._default_deny: default_level = self._default_permissions.get( resource_type, PermissionLevel.NONE ) hierarchy = { PermissionLevel.NONE: 0, PermissionLevel.READ: 1, PermissionLevel.WRITE: 2, PermissionLevel.EXECUTE: 3, PermissionLevel.DELETE: 4, PermissionLevel.ADMIN: 5, } if hierarchy[default_level] >= hierarchy[required_level]: return True return False async def check_action(self, action: ActionRequest) -> bool: """ Check if an action is permitted. Args: action: The action to check Returns: True if action is allowed """ # Determine required permission level from action type level_map = { ActionType.FILE_READ: PermissionLevel.READ, ActionType.FILE_WRITE: PermissionLevel.WRITE, ActionType.FILE_DELETE: PermissionLevel.DELETE, ActionType.DATABASE_QUERY: PermissionLevel.READ, ActionType.DATABASE_MUTATE: PermissionLevel.WRITE, ActionType.SHELL_COMMAND: PermissionLevel.EXECUTE, ActionType.API_CALL: PermissionLevel.EXECUTE, ActionType.GIT_OPERATION: PermissionLevel.WRITE, ActionType.LLM_CALL: PermissionLevel.EXECUTE, ActionType.NETWORK_REQUEST: PermissionLevel.READ, ActionType.TOOL_CALL: PermissionLevel.EXECUTE, } required_level = level_map.get(action.action_type, PermissionLevel.EXECUTE) # Determine resource type from action resource_type_map = { ActionType.FILE_READ: ResourceType.FILE, ActionType.FILE_WRITE: ResourceType.FILE, ActionType.FILE_DELETE: ResourceType.FILE, ActionType.DATABASE_QUERY: ResourceType.DATABASE, ActionType.DATABASE_MUTATE: ResourceType.DATABASE, ActionType.SHELL_COMMAND: ResourceType.SHELL, ActionType.API_CALL: ResourceType.API, ActionType.GIT_OPERATION: ResourceType.GIT, ActionType.LLM_CALL: ResourceType.LLM, ActionType.NETWORK_REQUEST: ResourceType.NETWORK, } resource_type = resource_type_map.get(action.action_type, ResourceType.CUSTOM) resource = action.resource or action.tool_name or "*" return await self.check( agent_id=action.metadata.agent_id, resource=resource, resource_type=resource_type, required_level=required_level, ) async def require_permission( self, agent_id: str, resource: str, resource_type: ResourceType, required_level: PermissionLevel, ) -> None: """ Require permission or raise exception. Args: agent_id: ID of the agent resource: Resource to access resource_type: Type of resource required_level: Required permission level Raises: PermissionDeniedError: If permission is denied """ if not await self.check(agent_id, resource, resource_type, required_level): raise PermissionDeniedError( f"Permission denied: {resource}", action_type=None, resource=resource, required_permission=required_level.value, agent_id=agent_id, ) async def list_grants( self, agent_id: str | None = None, resource_type: ResourceType | None = None, ) -> list[PermissionGrant]: """ List permission grants. Args: agent_id: Optional filter by agent resource_type: Optional filter by resource type Returns: List of matching grants """ await self._cleanup_expired() async with self._lock: grants = list(self._grants) if agent_id: grants = [g for g in grants if g.agent_id == agent_id] if resource_type: grants = [g for g in grants if g.resource_type == resource_type] return grants def set_default_permission( self, resource_type: ResourceType, level: PermissionLevel, ) -> None: """ Set the default permission level for a resource type. Args: resource_type: Type of resource level: Default permission level """ self._default_permissions[resource_type] = level async def _cleanup_expired(self) -> None: """Remove expired grants.""" async with self._lock: original_count = len(self._grants) self._grants = [g for g in self._grants if not g.is_expired()] removed = original_count - len(self._grants) if removed: logger.debug("Cleaned up %d expired permission grants", removed)