Core MCP client implementation with comprehensive tooling:
**Services:**
- MCPClientManager: Main facade for all MCP operations
- MCPServerRegistry: Thread-safe singleton for server configs
- ConnectionPool: Connection pooling with auto-reconnection
- ToolRouter: Automatic tool routing with circuit breaker
- AsyncCircuitBreaker: Custom async-compatible circuit breaker
**Configuration:**
- YAML-based config with Pydantic models
- Environment variable expansion support
- Transport types: HTTP, SSE, STDIO
**API Endpoints:**
- GET /mcp/servers - List all MCP servers
- GET /mcp/servers/{name}/tools - List server tools
- GET /mcp/tools - List all tools from all servers
- GET /mcp/health - Health check all servers
- POST /mcp/call - Execute tool (admin only)
- GET /mcp/circuit-breakers - Circuit breaker status
- POST /mcp/circuit-breakers/{name}/reset - Reset circuit breaker
- POST /mcp/servers/{name}/reconnect - Force reconnection
**Testing:**
- 156 unit tests with comprehensive coverage
- Tests for all services, routes, and error handling
- Proper mocking and async test support
**Documentation:**
- MCP_CLIENT.md with usage examples
- Phase 2+ workflow documentation
11 KiB
MCP Client Infrastructure
This document describes the Model Context Protocol (MCP) client infrastructure used by Syndarix to communicate with AI agent tools.
Overview
The MCP client infrastructure provides a robust, fault-tolerant layer for communicating with MCP servers. It enables AI agents to discover and execute tools provided by various services (LLM Gateway, Knowledge Base, Git Operations, Issue Tracker, etc.).
Architecture
┌────────────────────────────────────────────────────────────────────────┐
│ MCPClientManager │
│ (Main Facade Class) │
├────────────────────────────────────────────────────────────────────────┤
│ - initialize() / shutdown() │
│ - call_tool() / route_tool() │
│ - connect() / disconnect() │
│ - health_check() / list_tools() │
└─────────────┬────────────────────┬─────────────────┬───────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────┐ ┌─────────────────┐ ┌──────────────────────────┐
│ MCPServerRegistry │ │ ConnectionPool │ │ ToolRouter │
│ (Singleton) │ │ │ │ │
├─────────────────────┤ ├─────────────────┤ ├──────────────────────────┤
│ - Server configs │ │ - Connection │ │ - Tool → Server mapping │
│ - Capabilities │ │ management │ │ - Circuit breakers │
│ - Tool discovery │ │ - Auto reconnect│ │ - Retry logic │
└─────────────────────┘ └─────────────────┘ └──────────────────────────┘
Components
MCPClientManager
The main entry point for all MCP operations. Provides a clean facade over the underlying infrastructure.
from app.services.mcp import get_mcp_client, MCPClientManager
# In FastAPI dependency injection
async def my_route(mcp: MCPClientManager = Depends(get_mcp_client)):
result = await mcp.call_tool(
server="llm-gateway",
tool="chat",
args={"prompt": "Hello"}
)
return result.data
# Direct usage
manager = MCPClientManager()
await manager.initialize()
# Execute a tool
result = await manager.call_tool(
server="issues",
tool="create_issue",
args={"title": "New Feature", "body": "Description"}
)
await manager.shutdown()
Configuration
Configuration is loaded from YAML files and supports environment variable expansion:
# mcp_servers.yaml
mcp_servers:
llm-gateway:
url: ${LLM_GATEWAY_URL:-http://localhost:8001}
timeout: 60
transport: http
enabled: true
retry_attempts: 3
circuit_breaker_threshold: 5
circuit_breaker_timeout: 30.0
knowledge-base:
url: ${KNOWLEDGE_BASE_URL:-http://localhost:8002}
timeout: 30
enabled: true
default_timeout: 30
connection_pool_size: 10
health_check_interval: 30
Environment Variable Syntax:
${VAR_NAME}- Uses the environment variable value${VAR_NAME:-default}- Uses default if variable is not set
Connection Management
The ConnectionPool manages connections to MCP servers with:
- Connection Reuse: Connections are pooled and reused
- Auto Reconnection: Failed connections are automatically retried
- Health Checks: Periodic health checks detect unhealthy servers
- Exponential Backoff: Retry delays increase exponentially with jitter
from app.services.mcp import ConnectionPool, MCPConnection
pool = ConnectionPool(max_connections_per_server=5)
# Get a connection (creates new or reuses existing)
conn = await pool.get_connection("server-1", config)
# Execute request
result = await conn.execute_request("POST", "/mcp", data={...})
# Health check all connections
health = await pool.health_check_all()
Circuit Breaker Pattern
The AsyncCircuitBreaker prevents cascade failures:
| State | Description |
|---|---|
| CLOSED | Normal operation, calls pass through |
| OPEN | Too many failures, calls are rejected immediately |
| HALF-OPEN | After timeout, allows one call to test if service recovered |
from app.services.mcp import AsyncCircuitBreaker
breaker = AsyncCircuitBreaker(
fail_max=5, # Open after 5 failures
reset_timeout=30, # Try again after 30 seconds
name="my-service"
)
if breaker.is_open():
raise MCPCircuitOpenError(...)
try:
result = await call_external_service()
await breaker.success()
except Exception:
await breaker.failure()
raise
Tool Routing
The ToolRouter handles:
- Tool Discovery: Automatically discovers tools from connected servers
- Routing: Routes tool calls to the appropriate server
- Retry Logic: Retries failed calls with exponential backoff
from app.services.mcp import ToolRouter
router = ToolRouter(registry, pool)
# Discover tools from all servers
await router.discover_tools()
# Route to the right server automatically
result = await router.route_tool(
tool_name="create_issue",
arguments={"title": "Bug fix"}
)
# Or call a specific server
result = await router.call_tool(
server_name="issues",
tool_name="create_issue",
arguments={"title": "Bug fix"}
)
Exception Hierarchy
MCPError
├── MCPConnectionError # Connection failures
├── MCPTimeoutError # Operation timeouts
├── MCPToolError # Tool execution errors
├── MCPServerNotFoundError # Unknown server
├── MCPToolNotFoundError # Unknown tool
├── MCPCircuitOpenError # Circuit breaker open
└── MCPValidationError # Invalid configuration
All exceptions include rich context:
except MCPServerNotFoundError as e:
print(f"Server: {e.server_name}")
print(f"Available: {e.available_servers}")
print(f"Suggestion: {e.suggestion}")
REST API Endpoints
| Method | Endpoint | Description | Auth |
|---|---|---|---|
| GET | /api/v1/mcp/servers |
List all MCP servers | No |
| GET | /api/v1/mcp/servers/{name}/tools |
List server tools | No |
| GET | /api/v1/mcp/tools |
List all tools | No |
| GET | /api/v1/mcp/health |
Health check | No |
| POST | /api/v1/mcp/call |
Execute tool | Superuser |
| GET | /api/v1/mcp/circuit-breakers |
List circuit breakers | No |
| POST | /api/v1/mcp/circuit-breakers/{name}/reset |
Reset breaker | Superuser |
| POST | /api/v1/mcp/servers/{name}/reconnect |
Force reconnect | Superuser |
Example: Execute a Tool
POST /api/v1/mcp/call
Authorization: Bearer <token>
Content-Type: application/json
{
"server": "issues",
"tool": "create_issue",
"arguments": {
"title": "New Feature Request",
"body": "Please add dark mode support"
},
"timeout": 30
}
Response:
{
"success": true,
"data": {
"issue_id": "12345",
"url": "https://gitea.example.com/org/repo/issues/42"
},
"tool_name": "create_issue",
"server_name": "issues",
"execution_time_ms": 234.5,
"request_id": "550e8400-e29b-41d4-a716-446655440000"
}
Usage in Syndarix Agents
AI agents use the MCP client to execute tools:
class IssueCreatorAgent:
def __init__(self, mcp: MCPClientManager):
self.mcp = mcp
async def create_issue(self, title: str, body: str) -> dict:
result = await self.mcp.call_tool(
server="issues",
tool="create_issue",
args={"title": title, "body": body}
)
if not result.success:
raise AgentError(f"Failed to create issue: {result.error}")
return result.data
Testing
The MCP infrastructure is thoroughly tested:
- Unit Tests:
tests/services/mcp/- Service layer tests - API Tests:
tests/api/routes/test_mcp.py- Endpoint tests
Run tests:
# All MCP tests
IS_TEST=True uv run pytest tests/services/mcp/ tests/api/routes/test_mcp.py -v
# With coverage
IS_TEST=True uv run pytest tests/services/mcp/ --cov=app/services/mcp
Configuration Reference
MCPServerConfig
| Field | Type | Default | Description |
|---|---|---|---|
url |
str | Required | Server URL |
transport |
str | "http" | Transport type (http, stdio, sse) |
timeout |
int | 30 | Request timeout (1-600 seconds) |
retry_attempts |
int | 3 | Max retry attempts (0-10) |
retry_delay |
float | 1.0 | Initial retry delay (0.1-300 seconds) |
retry_max_delay |
float | 30.0 | Maximum retry delay |
circuit_breaker_threshold |
int | 5 | Failures before opening circuit |
circuit_breaker_timeout |
float | 30.0 | Seconds before trying again |
enabled |
bool | true | Whether server is enabled |
description |
str | None | Server description |
MCPConfig (Global)
| Field | Type | Default | Description |
|---|---|---|---|
mcp_servers |
dict | {} | Server configurations |
default_timeout |
int | 30 | Default request timeout |
default_retry_attempts |
int | 3 | Default retry attempts |
connection_pool_size |
int | 10 | Max connections per server |
health_check_interval |
int | 30 | Health check interval (seconds) |
Files
| Path | Description |
|---|---|
app/services/mcp/__init__.py |
Package exports |
app/services/mcp/client_manager.py |
Main facade class |
app/services/mcp/config.py |
Configuration models |
app/services/mcp/registry.py |
Server registry singleton |
app/services/mcp/connection.py |
Connection management |
app/services/mcp/routing.py |
Tool routing and circuit breakers |
app/services/mcp/exceptions.py |
Exception classes |
app/api/routes/mcp.py |
REST API endpoints |
mcp_servers.yaml |
Default configuration |