"""
MCP Server Registry

Thread-safe singleton registry for managing MCP server configurations
and their capabilities.
"""

import asyncio
import logging
import time
from threading import Lock
from typing import Any

from .config import MCPConfig, MCPServerConfig, load_mcp_config
from .exceptions import MCPServerNotFoundError

logger = logging.getLogger(__name__)


class ServerCapabilities:
    """Cached capabilities (tools, resources, prompts) for an MCP server.

    A freshly constructed instance is always "not loaded", even when lists
    are passed in; call :meth:`mark_loaded` once the lists are final.
    """

    def __init__(
        self,
        tools: list[dict[str, Any]] | None = None,
        resources: list[dict[str, Any]] | None = None,
        prompts: list[dict[str, Any]] | None = None,
    ) -> None:
        """
        Store the capability lists.

        Args:
            tools: Tool definitions; ``None`` or empty becomes a new ``[]``
            resources: Resource definitions; ``None`` or empty becomes ``[]``
            prompts: Prompt definitions; ``None`` or empty becomes ``[]``
        """
        self.tools = tools or []
        self.resources = resources or []
        self.prompts = prompts or []
        self._loaded = False
        # Wall-clock timestamp (time.time()) of the last mark_loaded() call.
        self._load_time: float | None = None

    @property
    def is_loaded(self) -> bool:
        """Check if capabilities have been loaded."""
        return self._loaded

    @property
    def tool_names(self) -> list[str]:
        """Get list of tool names, skipping tools with a missing/empty name."""
        return [t["name"] for t in self.tools if t.get("name")]

    def mark_loaded(self) -> None:
        """Mark capabilities as loaded and record the current time."""
        self._loaded = True
        self._load_time = time.time()


class MCPServerRegistry:
    """
    Singleton registry for MCP servers.

    Manages server configurations and caches their capabilities.

    Creation and one-time initialization of the singleton are guarded by a
    class-level ``threading.Lock``. ``get_capabilities`` serializes its
    cache update with an ``asyncio.Lock``. The remaining methods read and
    mutate the underlying dicts without taking a lock.
    """

    _instance: "MCPServerRegistry | None" = None
    _lock = Lock()

    def __new__(cls) -> "MCPServerRegistry":
        """Ensure singleton pattern: create the instance at most once."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialized = False
            return cls._instance

    def __init__(self) -> None:
        """Initialize registry state (only runs once due to singleton)."""
        # Cheap early exit: __init__ runs on every ``MCPServerRegistry()``
        # call, and almost all of them hit an already-initialized instance.
        if getattr(self, "_initialized", False):
            return

        # Re-check under the lock so that two threads constructing the
        # singleton concurrently cannot both initialize it (the second
        # would otherwise replace state the first may already be using).
        with type(self)._lock:
            if getattr(self, "_initialized", False):
                return

            self._config: MCPConfig = MCPConfig()
            self._capabilities: dict[str, ServerCapabilities] = {}
            self._capabilities_lock = asyncio.Lock()
            # Set last: the unlocked check above must only pass once every
            # attribute exists.
            self._initialized = True

        logger.info("MCP Server Registry initialized")

    @classmethod
    def get_instance(cls) -> "MCPServerRegistry":
        """Get the singleton registry instance."""
        return cls()

    @classmethod
    def reset_instance(cls) -> None:
        """Reset the singleton (for testing)."""
        with cls._lock:
            cls._instance = None

    def load_config(self, config: MCPConfig | None = None) -> None:
        """
        Load configuration into the registry.

        Replaces the current configuration and drops all cached
        capabilities.

        Args:
            config: Optional config to load. If None, loads from default path.
        """
        if config is None:
            config = load_mcp_config()

        self._config = config
        self._capabilities.clear()

        logger.info(
            "Loaded MCP configuration with %d servers",
            len(config.mcp_servers),
        )

        for name in config.list_server_names():
            logger.debug("Registered MCP server: %s", name)

    def register(self, name: str, config: MCPServerConfig) -> None:
        """
        Register a new MCP server.

        An existing entry with the same name is overwritten.

        Args:
            name: Unique server name
            config: Server configuration
        """
        self._config.mcp_servers[name] = config
        self._capabilities.pop(name, None)  # Clear any cached capabilities
        logger.info("Registered MCP server: %s at %s", name, config.url)

    def unregister(self, name: str) -> bool:
        """
        Unregister an MCP server.

        Args:
            name: Server name to unregister

        Returns:
            True if server was found and removed
        """
        if name not in self._config.mcp_servers:
            return False

        del self._config.mcp_servers[name]
        self._capabilities.pop(name, None)
        logger.info("Unregistered MCP server: %s", name)
        return True

    def get(self, name: str) -> MCPServerConfig:
        """
        Get a server configuration by name.

        Args:
            name: Server name

        Returns:
            Server configuration

        Raises:
            MCPServerNotFoundError: If server is not registered
        """
        config = self._config.get_server(name)
        if config is None:
            raise MCPServerNotFoundError(
                server_name=name,
                available_servers=self.list_servers(),
            )
        return config

    def get_or_none(self, name: str) -> MCPServerConfig | None:
        """
        Get a server configuration by name, or None if not found.

        Args:
            name: Server name

        Returns:
            Server configuration or None
        """
        return self._config.get_server(name)

    def list_servers(self) -> list[str]:
        """Get list of all registered server names."""
        return self._config.list_server_names()

    def list_enabled_servers(self) -> list[str]:
        """Get list of enabled server names."""
        return list(self._config.get_enabled_servers().keys())

    def get_all_configs(self) -> dict[str, MCPServerConfig]:
        """Get all server configurations (as a shallow copy of the mapping)."""
        return dict(self._config.mcp_servers)

    def get_enabled_configs(self) -> dict[str, MCPServerConfig]:
        """Get all enabled server configurations."""
        return self._config.get_enabled_servers()

    async def get_capabilities(
        self,
        name: str,
        force_refresh: bool = False,
    ) -> ServerCapabilities:
        """
        Get capabilities for a server (lazy-loaded and cached).

        If nothing is cached for ``name``, or ``force_refresh`` is true, an
        empty, not-yet-loaded ``ServerCapabilities`` is stored and returned;
        any previously cached entry is discarded.

        Args:
            name: Server name
            force_refresh: If True, refresh cached capabilities

        Returns:
            Server capabilities

        Raises:
            MCPServerNotFoundError: If server is not registered
        """
        # Verify server exists
        self.get(name)

        async with self._capabilities_lock:
            if name not in self._capabilities or force_refresh:
                # Will be populated by connection manager when connecting
                self._capabilities[name] = ServerCapabilities()

            return self._capabilities[name]

    def set_capabilities(
        self,
        name: str,
        tools: list[dict[str, Any]] | None = None,
        resources: list[dict[str, Any]] | None = None,
        prompts: list[dict[str, Any]] | None = None,
    ) -> None:
        """
        Set capabilities for a server (called by connection manager).

        The new entry is marked as loaded and replaces any cached one. The
        name is not checked against the registered servers.

        Args:
            name: Server name
            tools: List of tool definitions
            resources: List of resource definitions
            prompts: List of prompt definitions
        """
        capabilities = ServerCapabilities(
            tools=tools,
            resources=resources,
            prompts=prompts,
        )
        capabilities.mark_loaded()
        self._capabilities[name] = capabilities

        logger.debug(
            "Updated capabilities for %s: %d tools, %d resources, %d prompts",
            name,
            len(capabilities.tools),
            len(capabilities.resources),
            len(capabilities.prompts),
        )

    def get_cached_capabilities(self, name: str) -> ServerCapabilities:
        """
        Get cached capabilities without async loading.

        Use this for synchronous access when you only need cached values
        (e.g., for health check responses). When nothing is cached, the
        returned empty object is not stored in the cache.

        Args:
            name: Server name

        Returns:
            Cached capabilities or empty ServerCapabilities
        """
        return self._capabilities.get(name, ServerCapabilities())

    def find_server_for_tool(self, tool_name: str) -> str | None:
        """
        Find which server provides a specific tool.

        Only cached capabilities are searched; the first match in cache
        order wins.

        Args:
            tool_name: Name of the tool to find

        Returns:
            Server name or None if not found
        """
        for name, caps in self._capabilities.items():
            if tool_name in caps.tool_names:
                return name
        return None

    def get_all_tools(self) -> dict[str, list[dict[str, Any]]]:
        """
        Get all tools from all servers whose capabilities are loaded.

        Returns:
            Dict mapping server name to list of tool definitions
        """
        return {
            name: caps.tools
            for name, caps in self._capabilities.items()
            if caps.is_loaded
        }

    @property
    def global_config(self) -> MCPConfig:
        """Get the global MCP configuration."""
        return self._config


# Module-level convenience function
def get_registry() -> MCPServerRegistry:
    """Get the global MCP server registry instance."""
    return MCPServerRegistry.get_instance()