Core MCP client implementation with comprehensive tooling:
**Services:**
- MCPClientManager: Main facade for all MCP operations
- MCPServerRegistry: Thread-safe singleton for server configs
- ConnectionPool: Connection pooling with auto-reconnection
- ToolRouter: Automatic tool routing with circuit breaker
- AsyncCircuitBreaker: Custom async-compatible circuit breaker
**Configuration:**
- YAML-based config with Pydantic models
- Environment variable expansion support
- Transport types: HTTP, SSE, STDIO
**API Endpoints:**
- GET /mcp/servers - List all MCP servers
- GET /mcp/servers/{name}/tools - List server tools
- GET /mcp/tools - List all tools from all servers
- GET /mcp/health - Health check all servers
- POST /mcp/call - Execute tool (admin only)
- GET /mcp/circuit-breakers - Circuit breaker status
- POST /mcp/circuit-breakers/{name}/reset - Reset circuit breaker
- POST /mcp/servers/{name}/reconnect - Force reconnection
**Testing:**
- 156 unit tests with comprehensive coverage
- Tests for all services, routes, and error handling
- Proper mocking and async test support
**Documentation:**
- MCP_CLIENT.md with usage examples
- Phase 2+ workflow documentation
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
MCP Client Infrastructure
This document describes the Model Context Protocol (MCP) client infrastructure used by Syndarix to communicate with AI agent tools.
Overview
The MCP client infrastructure provides a robust, fault-tolerant layer for communicating with MCP servers. It enables AI agents to discover and execute tools provided by various services (LLM Gateway, Knowledge Base, Git Operations, Issue Tracker, etc.).
Architecture
┌────────────────────────────────────────────────────────────────────────┐
│                            MCPClientManager                            │
│                          (Main Facade Class)                           │
├────────────────────────────────────────────────────────────────────────┤
│  - initialize() / shutdown()                                           │
│  - call_tool() / route_tool()                                          │
│  - connect() / disconnect()                                            │
│  - health_check() / list_tools()                                       │
└─────────────┬────────────────────┬─────────────────┬───────────────────┘
              │                    │                 │
              ▼                    ▼                 ▼
┌─────────────────────┐  ┌─────────────────┐  ┌──────────────────────────┐
│  MCPServerRegistry  │  │ ConnectionPool  │  │        ToolRouter        │
│     (Singleton)     │  │                 │  │                          │
├─────────────────────┤  ├─────────────────┤  ├──────────────────────────┤
│ - Server configs    │  │ - Connection    │  │ - Tool → Server mapping  │
│ - Capabilities      │  │   management    │  │ - Circuit breakers       │
│ - Tool discovery    │  │ - Auto reconnect│  │ - Retry logic            │
└─────────────────────┘  └─────────────────┘  └──────────────────────────┘
Components
MCPClientManager
The main entry point for all MCP operations. Provides a clean facade over the underlying infrastructure.
from fastapi import Depends

from app.services.mcp import get_mcp_client, MCPClientManager

# In FastAPI dependency injection
async def my_route(mcp: MCPClientManager = Depends(get_mcp_client)):
    result = await mcp.call_tool(
        server="llm-gateway",
        tool="chat",
        args={"prompt": "Hello"}
    )
    return result.data

# Direct usage (inside an async function)
manager = MCPClientManager()
await manager.initialize()

# Execute a tool
result = await manager.call_tool(
    server="issues",
    tool="create_issue",
    args={"title": "New Feature", "body": "Description"}
)

await manager.shutdown()
Configuration
Configuration is loaded from YAML files and supports environment variable expansion:
# mcp_servers.yaml
mcp_servers:
  llm-gateway:
    url: ${LLM_GATEWAY_URL:-http://localhost:8001}
    timeout: 60
    transport: http
    enabled: true
    retry_attempts: 3
    circuit_breaker_threshold: 5
    circuit_breaker_timeout: 30.0
  knowledge-base:
    url: ${KNOWLEDGE_BASE_URL:-http://localhost:8002}
    timeout: 30
    enabled: true
default_timeout: 30
connection_pool_size: 10
health_check_interval: 30
Environment Variable Syntax:
- ${VAR_NAME} - Uses the environment variable value
- ${VAR_NAME:-default} - Uses default if variable is not set
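The two placeholder forms could be expanded with a single regular-expression pass. The following is a minimal sketch, not the project's actual loader: the helper name `expand_env`, its `env` parameter, and the choice to raise when a variable is unset and has no default are all assumptions made for illustration.

```python
import os
import re

# Matches ${VAR_NAME} and ${VAR_NAME:-default}; the default may be empty.
_ENV_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def expand_env(value: str, env=None) -> str:
    """Expand ${VAR} and ${VAR:-default} placeholders (hypothetical helper)."""
    env = os.environ if env is None else env

    def _replace(match: re.Match) -> str:
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default
        # Assumption: an unset variable without a default is an error.
        raise KeyError(f"Environment variable {name!r} is not set and has no default")

    return _ENV_PATTERN.sub(_replace, value)


if __name__ == "__main__":
    # With an empty environment the default after ':-' is used.
    print(expand_env("${LLM_GATEWAY_URL:-http://localhost:8001}", env={}))
```

Passing `env` explicitly keeps the helper easy to test without touching the real process environment.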
Connection Management
The ConnectionPool manages connections to MCP servers with:
- Connection Reuse: Connections are pooled and reused
- Auto Reconnection: Failed connections are automatically retried
- Health Checks: Periodic health checks detect unhealthy servers
- Exponential Backoff: Retry delays increase exponentially with jitter
from app.services.mcp import ConnectionPool, MCPConnection
pool = ConnectionPool(max_connections_per_server=5)
# Get a connection (creates new or reuses existing)
conn = await pool.get_connection("server-1", config)
# Execute request
result = await conn.execute_request("POST", "/mcp", data={...})
# Health check all connections
health = await pool.health_check_all()
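The "exponential backoff with jitter" behaviour listed above can be sketched as a small delay function. This is an illustration only: the doubling factor and the "full jitter" strategy are assumptions, and the defaults simply mirror the `retry_delay` and `retry_max_delay` values from the configuration reference. The function name `backoff_delay` is hypothetical.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, max_delay: float = 30.0, rng=random) -> float:
    """Delay in seconds before retry number `attempt` (0-based).

    Uses capped exponential growth with full jitter (an assumed strategy).
    """
    # Exponential growth: base, 2*base, 4*base, ... capped at max_delay.
    capped = min(max_delay, base * (2 ** attempt))
    # Full jitter: pick uniformly in [0, capped] so clients do not retry in lockstep.
    return rng.uniform(0.0, capped)


if __name__ == "__main__":
    for attempt in range(6):
        print(attempt, round(backoff_delay(attempt), 2))
```

Randomising the whole interval, rather than adding a small offset, spreads reconnect attempts from many clients more evenly after a shared outage.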
Circuit Breaker Pattern
The AsyncCircuitBreaker prevents cascade failures:
| State | Description |
|---|---|
| CLOSED | Normal operation, calls pass through |
| OPEN | Too many failures, calls are rejected immediately |
| HALF-OPEN | After timeout, allows one call to test if service recovered |
from app.services.mcp import AsyncCircuitBreaker
from app.services.mcp.exceptions import MCPCircuitOpenError

breaker = AsyncCircuitBreaker(
    fail_max=5,        # Open after 5 failures
    reset_timeout=30,  # Try again after 30 seconds
    name="my-service"
)

if breaker.is_open():
    raise MCPCircuitOpenError(...)

try:
    result = await call_external_service()
    await breaker.success()
except Exception:
    await breaker.failure()
    raise
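To make the three states concrete, here is a simplified, synchronous state machine. It is a sketch under stated assumptions, not the project's `AsyncCircuitBreaker`: the class name `SketchCircuitBreaker`, the injectable `clock`, and the `state` property are hypothetical, and unlike the table above it does not limit the half-open state to a single in-flight probe.

```python
import time


class SketchCircuitBreaker:
    """Illustrative three-state breaker (closed, open, half-open)."""

    def __init__(self, fail_max: int = 5, reset_timeout: float = 30.0, clock=time.monotonic):
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None while the circuit is closed

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout:
            return "half-open"
        return "open"

    def is_open(self) -> bool:
        # Only a fully open circuit rejects calls; half-open lets a probe through.
        return self.state == "open"

    def success(self) -> None:
        # Any success, including a half-open probe, closes the circuit.
        self._failures = 0
        self._opened_at = None

    def failure(self) -> None:
        if self.state == "half-open":
            # The probe failed: reopen and restart the timeout.
            self._opened_at = self._clock()
            return
        self._failures += 1
        if self._failures >= self.fail_max:
            self._opened_at = self._clock()
```

Deriving the half-open state from the clock, instead of storing it, avoids a background timer: the transition happens the next time the state is read.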
Tool Routing
The ToolRouter handles:
- Tool Discovery: Automatically discovers tools from connected servers
- Routing: Routes tool calls to the appropriate server
- Retry Logic: Retries failed calls with exponential backoff
from app.services.mcp import ToolRouter

router = ToolRouter(registry, pool)

# Discover tools from all servers
await router.discover_tools()

# Route to the right server automatically
result = await router.route_tool(
    tool_name="create_issue",
    arguments={"title": "Bug fix"}
)

# Or call a specific server
result = await router.call_tool(
    server_name="issues",
    tool_name="create_issue",
    arguments={"title": "Bug fix"}
)
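The tool-to-server mapping behind automatic routing can be pictured as a plain dictionary built during discovery. The sketch below is an assumption about how such an index might look, not the router's real internals: `build_tool_index`, `resolve_server`, the stand-in `ToolNotFound` exception, and the "first server wins" rule for name clashes are all hypothetical.

```python
class ToolNotFound(LookupError):
    """Stand-in for MCPToolNotFoundError in this sketch."""


def build_tool_index(tools_by_server: dict) -> dict:
    """Map each tool name to the first server that advertises it."""
    index = {}
    for server_name, tool_names in tools_by_server.items():
        for tool_name in tool_names:
            # Assumption: on a name clash the first server wins.
            index.setdefault(tool_name, server_name)
    return index


def resolve_server(index: dict, tool_name: str) -> str:
    """Return the server that should handle `tool_name`."""
    try:
        return index[tool_name]
    except KeyError:
        raise ToolNotFound(f"No server provides tool {tool_name!r}") from None


if __name__ == "__main__":
    index = build_tool_index({"issues": ["create_issue"], "llm-gateway": ["chat"]})
    print(resolve_server(index, "create_issue"))
```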
Exception Hierarchy
MCPError
├── MCPConnectionError # Connection failures
├── MCPTimeoutError # Operation timeouts
├── MCPToolError # Tool execution errors
├── MCPServerNotFoundError # Unknown server
├── MCPToolNotFoundError # Unknown tool
├── MCPCircuitOpenError # Circuit breaker open
└── MCPValidationError # Invalid configuration
All exceptions include rich context:
try:
    result = await mcp.call_tool(server="unknown-server", tool="chat", args={})
except MCPServerNotFoundError as e:
    print(f"Server: {e.server_name}")
    print(f"Available: {e.available_servers}")
    print(f"Suggestion: {e.suggestion}")
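One way an exception could carry this kind of context is to store it as attributes at construction time. The attribute names below (`server_name`, `available_servers`, `suggestion`) come from the example above; the constructor signatures and message wording are assumptions, so treat this as a sketch rather than the contents of `exceptions.py`.

```python
class MCPError(Exception):
    """Base class in this sketch; carries an optional human-readable suggestion."""

    def __init__(self, message: str, *, suggestion=None):
        super().__init__(message)
        self.suggestion = suggestion


class MCPServerNotFoundError(MCPError):
    """Raised when a server name is not registered (illustrative version)."""

    def __init__(self, server_name: str, available_servers=()):
        self.server_name = server_name
        self.available_servers = list(available_servers)
        known = ", ".join(self.available_servers) or "none configured"
        super().__init__(
            f"MCP server {server_name!r} is not registered",
            suggestion=f"Choose one of: {known}",
        )


if __name__ == "__main__":
    try:
        raise MCPServerNotFoundError("isues", ["issues", "llm-gateway"])
    except MCPError as e:  # catching the base class also catches subclasses
        print(e, "|", e.suggestion)
```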
REST API Endpoints
| Method | Endpoint | Description | Auth |
|---|---|---|---|
| GET | /api/v1/mcp/servers | List all MCP servers | No |
| GET | /api/v1/mcp/servers/{name}/tools | List server tools | No |
| GET | /api/v1/mcp/tools | List all tools | No |
| GET | /api/v1/mcp/health | Health check | No |
| POST | /api/v1/mcp/call | Execute tool | Superuser |
| GET | /api/v1/mcp/circuit-breakers | List circuit breakers | No |
| POST | /api/v1/mcp/circuit-breakers/{name}/reset | Reset breaker | Superuser |
| POST | /api/v1/mcp/servers/{name}/reconnect | Force reconnect | Superuser |
Example: Execute a Tool
POST /api/v1/mcp/call
Authorization: Bearer <token>
Content-Type: application/json

{
  "server": "issues",
  "tool": "create_issue",
  "arguments": {
    "title": "New Feature Request",
    "body": "Please add dark mode support"
  },
  "timeout": 30
}
Response:
{
  "success": true,
  "data": {
    "issue_id": "12345",
    "url": "https://gitea.example.com/org/repo/issues/42"
  },
  "tool_name": "create_issue",
  "server_name": "issues",
  "execution_time_ms": 234.5,
  "request_id": "550e8400-e29b-41d4-a716-446655440000"
}
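The same request can be assembled from Python using only the standard library. This is a hedged sketch: the helper name `build_call_request` and the base URL in the usage line are assumptions, while the path, headers, and body fields are taken from the example above. The function only builds the request; nothing is sent until it is passed to `urllib.request.urlopen`.

```python
import json
import urllib.request


def build_call_request(base_url: str, token: str, server: str, tool: str,
                       arguments: dict, timeout: int = 30) -> urllib.request.Request:
    """Build (but do not send) a POST /api/v1/mcp/call request."""
    payload = {"server": server, "tool": tool, "arguments": arguments, "timeout": timeout}
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/mcp/call",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


if __name__ == "__main__":
    # The base URL is a placeholder; point it at a real deployment before sending.
    req = build_call_request("http://localhost:8000", "<token>", "issues",
                             "create_issue", {"title": "New Feature Request"})
    print(req.get_method(), req.full_url)
```

Because the endpoint is restricted to superusers, the token must belong to a superuser account.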
Usage in Syndarix Agents
AI agents use the MCP client to execute tools:
class IssueCreatorAgent:
    def __init__(self, mcp: MCPClientManager):
        self.mcp = mcp

    async def create_issue(self, title: str, body: str) -> dict:
        result = await self.mcp.call_tool(
            server="issues",
            tool="create_issue",
            args={"title": title, "body": body}
        )
        if not result.success:
            raise AgentError(f"Failed to create issue: {result.error}")
        return result.data
Testing
The MCP infrastructure is thoroughly tested:
- Unit Tests: tests/services/mcp/ - Service layer tests
- API Tests: tests/api/routes/test_mcp.py - Endpoint tests
Run tests:
# All MCP tests
IS_TEST=True uv run pytest tests/services/mcp/ tests/api/routes/test_mcp.py -v
# With coverage
IS_TEST=True uv run pytest tests/services/mcp/ --cov=app/services/mcp
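Code that depends on the MCP client can be exercised without a running server by substituting a mock. The sketch below uses `unittest.mock.AsyncMock` from the standard library; it is not taken from the project's test suite. The agent is a trimmed copy of the one in the usage section, with `RuntimeError` standing in for `AgentError`, and the result object is approximated with `SimpleNamespace`, both of which are assumptions.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock


class IssueCreatorAgent:
    """Trimmed copy of the agent from the usage section."""

    def __init__(self, mcp):
        self.mcp = mcp

    async def create_issue(self, title: str, body: str) -> dict:
        result = await self.mcp.call_tool(
            server="issues", tool="create_issue", args={"title": title, "body": body}
        )
        if not result.success:
            # RuntimeError stands in for the project's AgentError.
            raise RuntimeError(f"Failed to create issue: {result.error}")
        return result.data


def run_agent_with_mock() -> dict:
    """Drive the agent against a mocked client and verify the call it made."""
    mcp = AsyncMock()
    mcp.call_tool.return_value = SimpleNamespace(
        success=True, data={"issue_id": "12345"}, error=None
    )
    data = asyncio.run(IssueCreatorAgent(mcp).create_issue("New Feature", "Description"))
    mcp.call_tool.assert_awaited_once_with(
        server="issues", tool="create_issue",
        args={"title": "New Feature", "body": "Description"},
    )
    return data


if __name__ == "__main__":
    print(run_agent_with_mock())
```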
Configuration Reference
MCPServerConfig
| Field | Type | Default | Description |
|---|---|---|---|
| url | str | Required | Server URL |
| transport | str | "http" | Transport type (http, stdio, sse) |
| timeout | int | 30 | Request timeout (1-600 seconds) |
| retry_attempts | int | 3 | Max retry attempts (0-10) |
| retry_delay | float | 1.0 | Initial retry delay (0.1-300 seconds) |
| retry_max_delay | float | 30.0 | Maximum retry delay |
| circuit_breaker_threshold | int | 5 | Failures before opening circuit |
| circuit_breaker_timeout | float | 30.0 | Seconds before trying again |
| enabled | bool | true | Whether server is enabled |
| description | str | None | Server description |
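The defaults and ranges in this table can be expressed as a validated data class. The project uses Pydantic models; the sketch below deliberately swaps in a standard-library `dataclass` so it runs without extra dependencies. The class name `ServerConfigSketch` and the error messages are hypothetical, and only the ranges stated in the table are checked.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ServerConfigSketch:
    """Stdlib stand-in for MCPServerConfig, mirroring the documented defaults."""

    url: str
    transport: str = "http"
    timeout: int = 30
    retry_attempts: int = 3
    retry_delay: float = 1.0
    retry_max_delay: float = 30.0
    circuit_breaker_threshold: int = 5
    circuit_breaker_timeout: float = 30.0
    enabled: bool = True
    description: Optional[str] = None

    def __post_init__(self) -> None:
        # Validate only the constraints the reference table states explicitly.
        if self.transport not in {"http", "stdio", "sse"}:
            raise ValueError(f"transport must be http, stdio or sse, got {self.transport!r}")
        if not 1 <= self.timeout <= 600:
            raise ValueError("timeout must be between 1 and 600 seconds")
        if not 0 <= self.retry_attempts <= 10:
            raise ValueError("retry_attempts must be between 0 and 10")
        if not 0.1 <= self.retry_delay <= 300:
            raise ValueError("retry_delay must be between 0.1 and 300 seconds")


if __name__ == "__main__":
    print(ServerConfigSketch(url="http://localhost:8001", timeout=60))
```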
MCPConfig (Global)
| Field | Type | Default | Description |
|---|---|---|---|
| mcp_servers | dict | {} | Server configurations |
| default_timeout | int | 30 | Default request timeout |
| default_retry_attempts | int | 3 | Default retry attempts |
| connection_pool_size | int | 10 | Max connections per server |
| health_check_interval | int | 30 | Health check interval (seconds) |
Files
| Path | Description |
|---|---|
| app/services/mcp/__init__.py | Package exports |
| app/services/mcp/client_manager.py | Main facade class |
| app/services/mcp/config.py | Configuration models |
| app/services/mcp/registry.py | Server registry singleton |
| app/services/mcp/connection.py | Connection management |
| app/services/mcp/routing.py | Tool routing and circuit breakers |
| app/services/mcp/exceptions.py | Exception classes |
| app/api/routes/mcp.py | REST API endpoints |
| mcp_servers.yaml | Default configuration |