# app/services/memory/scoping/scope.py """ Scope Management. Provides utilities for managing memory scopes with hierarchical inheritance: Global -> Project -> Agent Type -> Agent Instance -> Session """ import logging from dataclasses import dataclass, field from typing import Any, ClassVar from uuid import UUID from app.services.memory.types import ScopeContext, ScopeLevel logger = logging.getLogger(__name__) @dataclass class ScopePolicy: """Access control policy for a scope.""" scope_type: ScopeLevel scope_id: str can_read: bool = True can_write: bool = True can_inherit: bool = True allowed_memory_types: list[str] = field(default_factory=lambda: ["all"]) metadata: dict[str, Any] = field(default_factory=dict) def allows_read(self) -> bool: """Check if reading is allowed.""" return self.can_read def allows_write(self) -> bool: """Check if writing is allowed.""" return self.can_write def allows_inherit(self) -> bool: """Check if inheritance from parent is allowed.""" return self.can_inherit def allows_memory_type(self, memory_type: str) -> bool: """Check if a specific memory type is allowed.""" return ( "all" in self.allowed_memory_types or memory_type in self.allowed_memory_types ) @dataclass class ScopeInfo: """Information about a scope including its hierarchy.""" context: ScopeContext policy: ScopePolicy parent_info: "ScopeInfo | None" = None child_count: int = 0 memory_count: int = 0 @property def depth(self) -> int: """Get the depth of this scope in the hierarchy.""" count = 0 current = self.parent_info while current is not None: count += 1 current = current.parent_info return count class ScopeManager: """ Manages memory scopes and their hierarchies. Provides: - Scope creation and validation - Hierarchy management - Access control policy management - Scope inheritance rules """ # Order of scope levels from root to leaf SCOPE_ORDER: ClassVar[list[ScopeLevel]] = [ ScopeLevel.GLOBAL, ScopeLevel.PROJECT, ScopeLevel.AGENT_TYPE, ScopeLevel.AGENT_INSTANCE, ScopeLevel.SESSION, ] def __init__(self) -> None: """Initialize the scope manager.""" # In-memory policy cache (would be backed by database in production) self._policies: dict[str, ScopePolicy] = {} self._default_policies = self._create_default_policies() def _create_default_policies(self) -> dict[ScopeLevel, ScopePolicy]: """Create default policies for each scope level.""" return { ScopeLevel.GLOBAL: ScopePolicy( scope_type=ScopeLevel.GLOBAL, scope_id="global", can_read=True, can_write=False, # Global writes require special permission can_inherit=True, ), ScopeLevel.PROJECT: ScopePolicy( scope_type=ScopeLevel.PROJECT, scope_id="default", can_read=True, can_write=True, can_inherit=True, ), ScopeLevel.AGENT_TYPE: ScopePolicy( scope_type=ScopeLevel.AGENT_TYPE, scope_id="default", can_read=True, can_write=True, can_inherit=True, ), ScopeLevel.AGENT_INSTANCE: ScopePolicy( scope_type=ScopeLevel.AGENT_INSTANCE, scope_id="default", can_read=True, can_write=True, can_inherit=True, ), ScopeLevel.SESSION: ScopePolicy( scope_type=ScopeLevel.SESSION, scope_id="default", can_read=True, can_write=True, can_inherit=True, allowed_memory_types=["working"], # Sessions only allow working memory ), } def create_scope( self, scope_type: ScopeLevel, scope_id: str, parent: ScopeContext | None = None, ) -> ScopeContext: """ Create a new scope context. Args: scope_type: Level of the scope scope_id: Unique identifier within the level parent: Optional parent scope Returns: Created scope context Raises: ValueError: If scope hierarchy is invalid """ # Validate hierarchy if parent is not None: self._validate_parent_child(parent.scope_type, scope_type) # For non-global scopes without parent, auto-create parent chain if parent is None and scope_type != ScopeLevel.GLOBAL: parent = self._create_parent_chain(scope_type, scope_id) context = ScopeContext( scope_type=scope_type, scope_id=scope_id, parent=parent, ) logger.debug(f"Created scope: {scope_type.value}:{scope_id}") return context def _validate_parent_child( self, parent_type: ScopeLevel, child_type: ScopeLevel, ) -> None: """Validate that parent-child relationship is valid.""" parent_idx = self.SCOPE_ORDER.index(parent_type) child_idx = self.SCOPE_ORDER.index(child_type) if child_idx <= parent_idx: raise ValueError( f"Invalid scope hierarchy: {child_type.value} cannot be child of {parent_type.value}" ) # Allow skipping levels (e.g., PROJECT -> SESSION is valid) # This enables flexible scope structures def _create_parent_chain( self, target_type: ScopeLevel, scope_id: str, ) -> ScopeContext: """Create parent scope chain up to target type.""" target_idx = self.SCOPE_ORDER.index(target_type) # Start from global and build chain current: ScopeContext | None = None for i in range(target_idx): level = self.SCOPE_ORDER[i] if level == ScopeLevel.GLOBAL: level_id = "global" else: # Use a default ID for intermediate levels level_id = f"default_{level.value}" current = ScopeContext( scope_type=level, scope_id=level_id, parent=current, ) return current # type: ignore[return-value] def create_scope_from_ids( self, project_id: UUID | None = None, agent_type_id: UUID | None = None, agent_instance_id: UUID | None = None, session_id: str | None = None, ) -> ScopeContext: """ Create a scope context from individual IDs. Automatically determines the most specific scope level based on provided IDs. Args: project_id: Project UUID agent_type_id: Agent type UUID agent_instance_id: Agent instance UUID session_id: Session identifier Returns: Scope context for the most specific level """ # Build scope chain from most general to most specific current: ScopeContext = ScopeContext( scope_type=ScopeLevel.GLOBAL, scope_id="global", parent=None, ) if project_id is not None: current = ScopeContext( scope_type=ScopeLevel.PROJECT, scope_id=str(project_id), parent=current, ) if agent_type_id is not None: current = ScopeContext( scope_type=ScopeLevel.AGENT_TYPE, scope_id=str(agent_type_id), parent=current, ) if agent_instance_id is not None: current = ScopeContext( scope_type=ScopeLevel.AGENT_INSTANCE, scope_id=str(agent_instance_id), parent=current, ) if session_id is not None: current = ScopeContext( scope_type=ScopeLevel.SESSION, scope_id=session_id, parent=current, ) return current def get_policy( self, scope: ScopeContext, ) -> ScopePolicy: """ Get the access policy for a scope. Args: scope: Scope to get policy for Returns: Policy for the scope """ key = self._scope_key(scope) if key in self._policies: return self._policies[key] # Return default policy for the scope level return self._default_policies.get( scope.scope_type, ScopePolicy( scope_type=scope.scope_type, scope_id=scope.scope_id, ), ) def set_policy( self, scope: ScopeContext, policy: ScopePolicy, ) -> None: """ Set the access policy for a scope. Args: scope: Scope to set policy for policy: Policy to apply """ key = self._scope_key(scope) self._policies[key] = policy logger.info(f"Set policy for scope {key}") def _scope_key(self, scope: ScopeContext) -> str: """Generate a unique key for a scope.""" return f"{scope.scope_type.value}:{scope.scope_id}" def get_scope_depth(self, scope_type: ScopeLevel) -> int: """Get the depth of a scope level in the hierarchy.""" return self.SCOPE_ORDER.index(scope_type) def get_parent_level(self, scope_type: ScopeLevel) -> ScopeLevel | None: """Get the parent scope level for a given level.""" idx = self.SCOPE_ORDER.index(scope_type) if idx == 0: return None return self.SCOPE_ORDER[idx - 1] def get_child_level(self, scope_type: ScopeLevel) -> ScopeLevel | None: """Get the child scope level for a given level.""" idx = self.SCOPE_ORDER.index(scope_type) if idx >= len(self.SCOPE_ORDER) - 1: return None return self.SCOPE_ORDER[idx + 1] def is_ancestor( self, potential_ancestor: ScopeContext, descendant: ScopeContext, ) -> bool: """ Check if one scope is an ancestor of another. Args: potential_ancestor: Scope to check as ancestor descendant: Scope to check as descendant Returns: True if ancestor relationship exists """ current = descendant.parent while current is not None: if ( current.scope_type == potential_ancestor.scope_type and current.scope_id == potential_ancestor.scope_id ): return True current = current.parent return False def get_common_ancestor( self, scope_a: ScopeContext, scope_b: ScopeContext, ) -> ScopeContext | None: """ Find the nearest common ancestor of two scopes. Args: scope_a: First scope scope_b: Second scope Returns: Common ancestor or None if none exists """ # Get ancestors of scope_a ancestors_a: set[str] = set() current: ScopeContext | None = scope_a while current is not None: ancestors_a.add(self._scope_key(current)) current = current.parent # Find first ancestor of scope_b that's in ancestors_a current = scope_b while current is not None: if self._scope_key(current) in ancestors_a: return current current = current.parent return None def can_access( self, accessor_scope: ScopeContext, target_scope: ScopeContext, operation: str = "read", ) -> bool: """ Check if accessor scope can access target scope. Access rules: - A scope can always access itself - A scope can access ancestors (if inheritance allowed) - A scope CANNOT access descendants (privacy) - Sibling scopes cannot access each other Args: accessor_scope: Scope attempting access target_scope: Scope being accessed operation: Type of operation (read/write) Returns: True if access is allowed """ # Same scope - always allowed if ( accessor_scope.scope_type == target_scope.scope_type and accessor_scope.scope_id == target_scope.scope_id ): policy = self.get_policy(target_scope) if operation == "write": return policy.allows_write() return policy.allows_read() # Check if target is ancestor (inheritance) if self.is_ancestor(target_scope, accessor_scope): policy = self.get_policy(target_scope) if not policy.allows_inherit(): return False if operation == "write": return policy.allows_write() return policy.allows_read() # Check if accessor is ancestor of target (downward access) # This is NOT allowed - parents cannot access children's memories if self.is_ancestor(accessor_scope, target_scope): return False # Sibling scopes cannot access each other return False # Singleton manager instance _manager: ScopeManager | None = None def get_scope_manager() -> ScopeManager: """Get the singleton scope manager instance.""" global _manager if _manager is None: _manager = ScopeManager() return _manager