2 Commits

Author SHA1 Message Date
Felipe Cardoso
2ab69f8561 docs(mcp): add comprehensive MCP server documentation
- Add docs/architecture/MCP_SERVERS.md with full architecture overview
- Add README.md for LLM Gateway with quick start, tools, and model groups
- Add README.md for Knowledge Base with search types, chunking strategies
- Include API endpoints, security guidelines, and testing instructions

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-04 01:37:04 +01:00
Felipe Cardoso
95342cc94d fix(mcp-gateway): address critical issues from deep review
Frontend:
- Fix debounce race condition in UserListTable search handler
- Use useRef to properly track and cleanup timeout between keystrokes

Backend (LLM Gateway):
- Add thread-safe double-checked locking for global singletons
  (providers, circuit registry, cost tracker)
- Fix Redis URL parsing with proper urlparse validation
- Add explicit error handling for malformed Redis URLs
- Document circuit breaker state transition safety

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-04 01:36:55 +01:00
7 changed files with 621 additions and 35 deletions

View File

@@ -0,0 +1,192 @@
# MCP Servers Architecture
This document describes the Model Context Protocol (MCP) server architecture in Syndarix.
## Overview
Syndarix uses MCP servers to provide specialized capabilities to AI agents. Each MCP server exposes tools over JSON-RPC 2.0, which agents invoke through the MCPClientManager.
## Architecture Diagram
```
┌───────────────────────────────────────────────────────────────────┐
│                         Backend (FastAPI)                         │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │                       MCPClientManager                        │ │
│ │  - Connection pooling   - Health checks   - Tool routing      │ │
│ └──────────────────────────┬────────────────────────────────────┘ │
└────────────────────────────┼───────────────────────────────────────┘
                             │ HTTP/JSON-RPC 2.0
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│   LLM Gateway   │ │ Knowledge Base  │ │   Future MCP    │
│    Port 8001    │ │    Port 8002    │ │     Servers     │
│                 │ │                 │ │                 │
│ - chat_complete │ │ - search        │ │ - git_ops       │
│ - count_tokens  │ │ - ingest        │ │ - issues        │
│ - list_models   │ │ - delete        │ │ - etc.          │
│ - get_usage     │ │ - update        │ │                 │
└────────┬────────┘ └────────┬────────┘ └─────────────────┘
         │                   │
         ▼                   ▼
┌─────────────────┐ ┌─────────────────┐
│     LiteLLM     │ │   PostgreSQL    │
│   (Anthropic,   │ │   + pgvector    │
│  OpenAI, etc.)  │ │                 │
└─────────────────┘ └─────────────────┘
```
## MCP Servers
### 1. LLM Gateway (`mcp-servers/llm-gateway/`)
**Purpose**: Unified access to multiple LLM providers with failover, streaming, and cost tracking.
**Port**: 8001
**Tools**:
| Tool | Description |
|------|-------------|
| `chat_completion` | Generate completions with automatic failover |
| `count_tokens` | Count tokens in text using tiktoken |
| `list_models` | List available models by group |
| `get_usage` | Get token/cost usage statistics |
**Model Groups**:
- `reasoning`: Claude Opus 4.5 → GPT-4.1 → Gemini 2.5 Pro
- `code`: Claude Sonnet 4 → Codex → DeepSeek Coder
- `fast`: Claude Haiku → GPT-4.1 Mini → Gemini Flash
- `vision`: Claude Opus 4.5 → GPT-4.1 Vision
- `embedding`: text-embedding-3-large → voyage-3
**Features**:
- Circuit breaker for provider failures (5 failures → 30s cooldown)
- Redis-based cost tracking per project/agent
- Streaming support via SSE
- Automatic failover chain
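The failover behavior can be pictured with a short sketch. This is illustrative only: the gateway actually routes through the LiteLLM Router, and `call_provider` plus the module-level dictionaries here are hypothetical stand-ins, not the gateway's real classes.
```python
import time
from collections.abc import Callable

FAILURE_THRESHOLD = 5      # failures before a provider's circuit opens
COOLDOWN_SECONDS = 30.0    # how long an open circuit stays open

# Fallback chains per model group (mirrors the groups listed above)
MODEL_GROUPS = {
    "reasoning": ["claude-opus-4-5", "gpt-4.1", "gemini-2.5-pro"],
    "fast": ["claude-haiku", "gpt-4.1-mini", "gemini-flash"],
}

_failures: dict[str, int] = {}     # consecutive failures per model
_opened_at: dict[str, float] = {}  # when each circuit opened


def _circuit_open(model: str) -> bool:
    """Open = too many consecutive failures and the cooldown has not elapsed."""
    if _failures.get(model, 0) < FAILURE_THRESHOLD:
        return False
    return (time.time() - _opened_at.get(model, 0.0)) < COOLDOWN_SECONDS


def complete_with_failover(
    group: str, prompt: str, call_provider: Callable[[str, str], str]
) -> str:
    """Try each model in the group's chain, skipping models with open circuits."""
    last_error: Exception | None = None
    for model in MODEL_GROUPS[group]:
        if _circuit_open(model):
            continue
        try:
            result = call_provider(model, prompt)  # hypothetical provider call
            _failures[model] = 0                   # success resets the circuit
            return result
        except Exception as exc:
            last_error = exc
            _failures[model] = _failures.get(model, 0) + 1
            if _failures[model] >= FAILURE_THRESHOLD:
                _opened_at[model] = time.time()
    raise RuntimeError(f"All providers in group '{group}' failed") from last_error
```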
### 2. Knowledge Base (`mcp-servers/knowledge-base/`)
**Purpose**: RAG capabilities with pgvector for semantic search, intelligent chunking, and collection management.
**Port**: 8002
**Tools**:
| Tool | Description |
|------|-------------|
| `search_knowledge` | Semantic, keyword, or hybrid search |
| `ingest_content` | Add content with automatic chunking |
| `delete_content` | Remove by source, collection, or IDs |
| `list_collections` | List collections in a project |
| `get_collection_stats` | Get collection statistics |
| `update_document` | Atomically replace document content |
**Chunking Strategies**:
- **Code**: AST-aware for Python, tree-sitter for JS/TS/Go/Rust
- **Markdown**: Heading-hierarchy aware, preserves structure
- **Text**: Sentence-based with configurable overlap
**Search Types**:
- **Semantic**: pgvector cosine similarity (HNSW index); see the query sketch after this list
- **Keyword**: PostgreSQL full-text search (ts_rank)
- **Hybrid**: Reciprocal Rank Fusion (RRF) combining both
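For the semantic option above, the core operation is a pgvector cosine-distance lookup. The sketch below assumes a hypothetical `kb_chunks` table and column names; the actual Knowledge Base schema may differ.
```python
# Illustrative only: semantic search via pgvector, assuming a hypothetical
# kb_chunks(project_id, content, embedding vector(1536)) table with an HNSW
# index created with vector_cosine_ops.
import asyncpg


async def semantic_search(
    pool: asyncpg.Pool,
    project_id: str,
    query_embedding: list[float],
    limit: int = 10,
    threshold: float = 0.7,
) -> list[asyncpg.Record]:
    # pgvector's <=> operator returns cosine distance; similarity = 1 - distance.
    vector_literal = "[" + ",".join(str(x) for x in query_embedding) + "]"
    return await pool.fetch(
        """
        SELECT content, 1 - (embedding <=> $1::vector) AS similarity
        FROM kb_chunks
        WHERE project_id = $2
          AND 1 - (embedding <=> $1::vector) >= $3
        ORDER BY embedding <=> $1::vector
        LIMIT $4
        """,
        vector_literal, project_id, threshold, limit,
    )
```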
**Features**:
- Redis caching for embedding deduplication (see the sketch after this list)
- 1536-dimension embeddings via LLM Gateway
- Atomic document updates (delete + insert in transaction)
- Per-project collection isolation
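The embedding deduplication mentioned above amounts to keying a Redis cache by a content hash. A minimal sketch, where `get_embedding` stands in for the caller's request to the LLM Gateway's embedding group:
```python
# Illustrative only: skip the embedding call when the same text was seen before.
import hashlib
import json
from collections.abc import Awaitable, Callable

import redis.asyncio as redis


async def cached_embedding(
    r: redis.Redis,
    text: str,
    get_embedding: Callable[[str], Awaitable[list[float]]],
    ttl_seconds: int = 86400,
) -> list[float]:
    key = "emb:" + hashlib.sha256(text.encode("utf-8")).hexdigest()
    cached = await r.get(key)
    if cached is not None:
        return json.loads(cached)          # cache hit: no gateway call
    embedding = await get_embedding(text)  # cache miss: embed via LLM Gateway
    await r.set(key, json.dumps(embedding), ex=ttl_seconds)
    return embedding
```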
## Communication Protocol
All MCP servers use JSON-RPC 2.0 over HTTP:
### Tool Discovery
```
GET /mcp/tools
Response: { "tools": [{ "name": "...", "description": "...", "inputSchema": {...} }] }
```
### Tool Execution
```
POST /mcp
Request: {
"jsonrpc": "2.0",
"method": "tool_name",
"params": { "project_id": "...", "agent_id": "...", ... },
"id": 1
}
Response: {
"jsonrpc": "2.0",
"result": { "success": true, ... },
"id": 1
}
```
### Health Check
```
GET /health
Response: { "status": "healthy", "dependencies": {...} }
```
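A minimal client for this protocol only needs to POST a JSON-RPC envelope. The sketch below uses `httpx` and placeholder IDs; it is not the project's MCPClientManager.
```python
import httpx


def call_tool(base_url: str, method: str, params: dict) -> dict:
    """Invoke an MCP tool via the JSON-RPC 2.0 endpoint described above."""
    payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
    response = httpx.post(f"{base_url}/mcp", json=payload, timeout=30.0)
    response.raise_for_status()
    body = response.json()
    if "error" in body:
        raise RuntimeError(f"Tool call failed: {body['error']}")
    return body["result"]


# Example: token counting through the LLM Gateway
result = call_tool(
    "http://localhost:8001",
    "count_tokens",
    {"project_id": "proj-123", "agent_id": "agent-456", "text": "Hello", "model": "gpt-4"},
)
```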
## Configuration
### Environment Variables
**LLM Gateway**:
```bash
LLM_GATEWAY_HOST=0.0.0.0
LLM_GATEWAY_PORT=8001
LLM_GATEWAY_REDIS_URL=redis://redis:6379/1
ANTHROPIC_API_KEY=...
OPENAI_API_KEY=...
```
**Knowledge Base**:
```bash
KB_HOST=0.0.0.0
KB_PORT=8002
KB_DATABASE_URL=postgresql://...
KB_REDIS_URL=redis://redis:6379/2
KB_LLM_GATEWAY_URL=http://llm-gateway:8001
```
## Security
### Input Validation
- `project_id`, `agent_id`: Alphanumeric + hyphens/underscores (1-128 chars)
- `collection`: Alphanumeric + hyphens/underscores (1-64 chars)
- `source_path`: No path traversal (`..`), no null bytes, max 4096 chars
- `content`: Max size limit (configurable, default 10MB)
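These rules translate directly into small validators. The sketch below is illustrative; the servers may enforce the same constraints through Pydantic models instead.
```python
import re

_ID_RE = re.compile(r"[A-Za-z0-9_-]{1,128}")
_COLLECTION_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")
MAX_SOURCE_PATH = 4096


def validate_id(value: str) -> str:
    if not _ID_RE.fullmatch(value):
        raise ValueError("INVALID_REQUEST: id must be 1-128 chars of [A-Za-z0-9_-]")
    return value


def validate_collection(value: str) -> str:
    if not _COLLECTION_RE.fullmatch(value):
        raise ValueError("INVALID_REQUEST: collection must be 1-64 chars of [A-Za-z0-9_-]")
    return value


def validate_source_path(path: str) -> str:
    if ".." in path or "\x00" in path or len(path) > MAX_SOURCE_PATH:
        raise ValueError("INVALID_REQUEST: unsafe or oversized source_path")
    return path
```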
### Error Codes
| Code | Meaning |
|------|---------|
| `INVALID_REQUEST` | Input validation failed |
| `NOT_FOUND` | Resource not found |
| `INTERNAL_ERROR` | Unexpected server error |
| `EMBEDDING_ERROR` | Embedding generation failed |
| `SEARCH_ERROR` | Search operation failed |
## Testing
```bash
# Run LLM Gateway tests
cd mcp-servers/llm-gateway
IS_TEST=True uv run pytest -v --cov=.
# Run Knowledge Base tests
cd mcp-servers/knowledge-base
IS_TEST=True uv run pytest -v --cov=.
```
## Adding New MCP Servers
1. Create directory under `mcp-servers/<name>/`
2. Use FastMCP for tool registration
3. Implement `/health`, `/mcp/tools`, `/mcp` endpoints
4. Add Docker configuration
5. Register in MCPClientManager config
6. Add tests (>90% coverage target)
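A new server can start from a skeleton like the following. It uses the FastMCP quickstart shape from the MCP Python SDK; the `git-ops` name and `list_branches` tool are hypothetical, and the `/health`, `/mcp/tools`, and `/mcp` routing described above still has to be added on top.
```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("git-ops")  # hypothetical new server


@mcp.tool()
def list_branches(project_id: str, agent_id: str) -> list[str]:
    """Example tool: return branch names for a project (stubbed)."""
    return ["main", "develop"]


if __name__ == "__main__":
    mcp.run()
```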

View File

@@ -5,7 +5,7 @@
 'use client';
 
-import { useState, useCallback } from 'react';
+import { useState, useCallback, useRef, useEffect } from 'react';
 import { format } from 'date-fns';
 import { Check, X } from 'lucide-react';
 import {
@@ -61,15 +61,28 @@ export function UserListTable({
   currentUserId,
 }: UserListTableProps) {
   const [searchValue, setSearchValue] = useState('');
+  const searchTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
 
-  // Debounce search
+  // Cleanup timeout on unmount
+  useEffect(() => {
+    return () => {
+      if (searchTimeoutRef.current) {
+        clearTimeout(searchTimeoutRef.current);
+      }
+    };
+  }, []);
+
+  // Debounce search with proper cleanup
   const handleSearchChange = useCallback(
     (value: string) => {
       setSearchValue(value);
-      const timeoutId = setTimeout(() => {
+      // Clear previous timeout to prevent stale searches
+      if (searchTimeoutRef.current) {
+        clearTimeout(searchTimeoutRef.current);
+      }
+      searchTimeoutRef.current = setTimeout(() => {
         onSearch(value);
       }, 300);
-      return () => clearTimeout(timeoutId);
     },
     [onSearch]
   );

View File

@@ -0,0 +1,178 @@
# Knowledge Base MCP Server
RAG capabilities with pgvector for semantic search, intelligent chunking, and collection management.
## Features
- **Semantic Search**: pgvector cosine similarity with HNSW indexing
- **Keyword Search**: PostgreSQL full-text search
- **Hybrid Search**: Reciprocal Rank Fusion combining both
- **Intelligent Chunking**: Code-aware, markdown-aware, and text chunking
- **Collection Management**: Per-project knowledge organization
- **Embedding Caching**: Redis deduplication for efficiency
## Quick Start
```bash
# Install dependencies
uv sync
# Run tests
IS_TEST=True uv run pytest -v
# Start server
uv run python server.py
```
## Configuration
Environment variables:
```bash
KB_HOST=0.0.0.0
KB_PORT=8002
KB_DEBUG=false
KB_DATABASE_URL=postgresql://user:pass@localhost:5432/syndarix
KB_REDIS_URL=redis://localhost:6379/2
KB_LLM_GATEWAY_URL=http://localhost:8001
```
## MCP Tools
### search_knowledge
Search the knowledge base.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"query": "authentication flow",
"search_type": "hybrid",
"collection": "code",
"limit": 10,
"threshold": 0.7,
"file_types": ["python", "typescript"]
}
```
### ingest_content
Add content to the knowledge base.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"content": "def authenticate(user): ...",
"source_path": "/src/auth.py",
"collection": "code",
"chunk_type": "code",
"file_type": "python"
}
```
### delete_content
Remove content from the knowledge base.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"source_path": "/src/old_file.py"
}
```
### list_collections
List all collections in a project.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456"
}
```
### get_collection_stats
Get detailed collection statistics.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"collection": "code"
}
```
### update_document
Atomically replace document content.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"source_path": "/src/auth.py",
"content": "def authenticate_v2(user): ...",
"collection": "code",
"chunk_type": "code",
"file_type": "python"
}
```
## Chunking Strategies
### Code Chunking
- **Python**: AST-based (functions, classes, methods)
- **JavaScript/TypeScript**: Tree-sitter based
- **Go/Rust**: Tree-sitter based
- Target: ~500 tokens, 50 token overlap
### Markdown Chunking
- Heading-hierarchy aware
- Preserves code blocks
- Target: ~800 tokens, 100 token overlap
### Text Chunking
- Sentence-based splitting
- Target: ~400 tokens, 50 token overlap
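The text strategy can be approximated with a small function. This sketch counts whitespace-separated words rather than real tokens, so the targets above are only roughly honored; the actual chunker is token-aware.
```python
import re


def chunk_text(text: str, target_tokens: int = 400, overlap_tokens: int = 50) -> list[str]:
    """Split text on sentence boundaries, carrying an overlap into each new chunk."""
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    chunks: list[str] = []
    current: list[str] = []  # words accumulated for the current chunk
    size = 0
    for sentence in sentences:
        words = sentence.split()
        if current and size + len(words) > target_tokens:
            chunks.append(" ".join(current))
            current = current[-overlap_tokens:]  # keep ~overlap_tokens words
            size = len(current)
        current.extend(words)
        size += len(words)
    if current:
        chunks.append(" ".join(current))
    return chunks
```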
## Search Types
### Semantic Search
Uses pgvector cosine similarity with HNSW indexing for fast approximate nearest neighbor search.
### Keyword Search
Uses PostgreSQL full-text search with ts_rank scoring.
### Hybrid Search
Combines semantic and keyword results using Reciprocal Rank Fusion (RRF):
- Default weights: 70% semantic, 30% keyword
- Configurable via settings
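RRF itself is a few lines: each document scores `weight / (k + rank)` per result list, and the lists are merged by total score. A sketch with the default 70/30 weights and the common constant `k = 60` (the server's actual constant may differ):
```python
def rrf_merge(
    semantic_ids: list[str],
    keyword_ids: list[str],
    semantic_weight: float = 0.7,
    keyword_weight: float = 0.3,
    k: int = 60,
) -> list[str]:
    """Weighted Reciprocal Rank Fusion over two ranked ID lists."""
    scores: dict[str, float] = {}
    for weight, ranked in ((semantic_weight, semantic_ids), (keyword_weight, keyword_ids)):
        for rank, doc_id in enumerate(ranked, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (k + rank)
    return sorted(scores, key=lambda doc_id: scores[doc_id], reverse=True)


# A document ranked well in both lists rises to the top:
print(rrf_merge(["a", "b", "c"], ["b", "d", "a"]))  # ['a', 'b', 'c', 'd']
```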
## Security
- Input validation for all IDs and paths
- Path traversal prevention
- Content size limits (default 10MB)
- Per-project data isolation
## Testing
```bash
# Full test suite with coverage
IS_TEST=True uv run pytest -v --cov=. --cov-report=term-missing
# Specific test file
IS_TEST=True uv run pytest tests/test_server.py -v
```
## API Endpoints
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/health` | GET | Health check with dependency status |
| `/mcp/tools` | GET | List available tools |
| `/mcp` | POST | JSON-RPC 2.0 tool execution |

View File

@@ -0,0 +1,129 @@
# LLM Gateway MCP Server
Unified LLM access with failover chains, cost tracking, and streaming support.
## Features
- **Multi-Provider Support**: Anthropic, OpenAI, Google, DeepSeek
- **Automatic Failover**: Circuit breaker with configurable thresholds
- **Cost Tracking**: Redis-based per-project/agent usage tracking
- **Streaming**: SSE support for real-time token delivery
- **Model Groups**: Pre-configured chains for different use cases
## Quick Start
```bash
# Install dependencies
uv sync
# Run tests
IS_TEST=True uv run pytest -v
# Start server
uv run python server.py
```
## Configuration
Environment variables:
```bash
LLM_GATEWAY_HOST=0.0.0.0
LLM_GATEWAY_PORT=8001
LLM_GATEWAY_DEBUG=false
LLM_GATEWAY_REDIS_URL=redis://localhost:6379/1
# Provider API keys
ANTHROPIC_API_KEY=sk-ant-...
OPENAI_API_KEY=sk-...
GOOGLE_API_KEY=...
DEEPSEEK_API_KEY=...
```
## MCP Tools
### chat_completion
Generate completions with automatic failover.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"messages": [{"role": "user", "content": "Hello"}],
"model_group": "reasoning",
"max_tokens": 4096,
"temperature": 0.7,
"stream": false
}
```
### count_tokens
Count tokens in text using tiktoken.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"text": "Hello, world!",
"model": "gpt-4"
}
```
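Under the hood this is a thin wrapper around tiktoken; the equivalent standalone snippet:
```python
import tiktoken

encoding = tiktoken.encoding_for_model("gpt-4")
print(len(encoding.encode("Hello, world!")))  # token count for this model
```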
### list_models
List available models by group.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"model_group": "code"
}
```
### get_usage
Get usage statistics.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"period": "day"
}
```
## Model Groups
| Group | Primary | Fallback 1 | Fallback 2 |
|-------|---------|------------|------------|
| reasoning | claude-opus-4-5 | gpt-4.1 | gemini-2.5-pro |
| code | claude-sonnet-4 | gpt-4.1 | deepseek-coder |
| fast | claude-haiku | gpt-4.1-mini | gemini-flash |
| vision | claude-sonnet-4 | gpt-4.1 | gemini-2.5-pro |
| embedding | text-embedding-3-large | voyage-3 | - |
## Circuit Breaker
- **Threshold**: 5 consecutive failures
- **Cooldown**: 30 seconds
- **Half-Open**: After cooldown, allows one test request
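The cycle can be summarized as a small state machine. This is a simplified sketch with the documented threshold and cooldown, not the gateway's actual `CircuitBreaker` class.
```python
import enum
import time


class State(enum.Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SimpleBreaker:
    def __init__(self, threshold: int = 5, cooldown: float = 30.0) -> None:
        self.threshold = threshold
        self.cooldown = cooldown
        self.state = State.CLOSED
        self.failures = 0
        self.opened_at = 0.0

    def allow_request(self) -> bool:
        if self.state == State.OPEN and time.time() - self.opened_at >= self.cooldown:
            self.state = State.HALF_OPEN     # cooldown elapsed: allow one test request
        return self.state != State.OPEN

    def record_success(self) -> None:
        self.state = State.CLOSED            # test request succeeded: close circuit
        self.failures = 0

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == State.HALF_OPEN or self.failures >= self.threshold:
            self.state = State.OPEN          # trip (or re-trip) the circuit
            self.opened_at = time.time()
```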
## Testing
```bash
# Full test suite with coverage
IS_TEST=True uv run pytest -v --cov=. --cov-report=term-missing
# Specific test file
IS_TEST=True uv run pytest tests/test_server.py -v
```
## API Endpoints
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/health` | GET | Health check |
| `/mcp/tools` | GET | List available tools |
| `/mcp` | POST | JSON-RPC 2.0 tool execution |

View File

@@ -6,6 +6,7 @@ Provides aggregation by hour, day, and month with TTL-based expiry.
""" """
import logging import logging
import threading
from datetime import UTC, datetime, timedelta from datetime import UTC, datetime, timedelta
from typing import Any from typing import Any
@@ -441,27 +442,37 @@ def calculate_cost(
return round(input_cost + output_cost, 6) return round(input_cost + output_cost, 6)
# Global tracker instance (lazy initialization) # Global tracker instance with thread-safe lazy initialization
_tracker: CostTracker | None = None _tracker: CostTracker | None = None
_tracker_lock = threading.Lock()
def get_cost_tracker() -> CostTracker: def get_cost_tracker() -> CostTracker:
"""Get the global cost tracker instance.""" """
Get the global cost tracker instance.
Thread-safe with double-checked locking pattern.
"""
global _tracker global _tracker
if _tracker is None: if _tracker is None:
_tracker = CostTracker() with _tracker_lock:
# Double-check after acquiring lock
if _tracker is None:
_tracker = CostTracker()
return _tracker return _tracker
async def close_cost_tracker() -> None: async def close_cost_tracker() -> None:
"""Close the global cost tracker.""" """Close the global cost tracker."""
global _tracker global _tracker
if _tracker: with _tracker_lock:
await _tracker.close() if _tracker:
_tracker = None await _tracker.close()
_tracker = None
def reset_cost_tracker() -> None: def reset_cost_tracker() -> None:
"""Reset the global tracker (for testing).""" """Reset the global tracker (for testing)."""
global _tracker global _tracker
_tracker = None with _tracker_lock:
_tracker = None

View File

@@ -7,6 +7,7 @@ temporarily disabling providers that are experiencing issues.
 import asyncio
 import logging
+import threading
 import time
 from collections.abc import Awaitable, Callable
 from dataclasses import dataclass, field
 
@@ -85,6 +86,9 @@ class CircuitBreaker:
     @property
     def state(self) -> CircuitState:
         """Get current circuit state (may trigger state transition)."""
+        # Check transition outside lock since it only reads/writes _state
+        # which is atomic in Python, and we use the lock inside _check_state_transition
+        # if a state change is needed
         self._check_state_transition()
         return self._state
 
@@ -94,15 +98,26 @@
         return self._stats
 
     def _check_state_transition(self) -> None:
-        """Check if state should transition based on time."""
+        """
+        Check if state should transition based on time.
+
+        Note: This method is intentionally not async and doesn't acquire the lock
+        because it's called frequently from the state property. The transition
+        is safe because:
+        1. We check the current state first (atomic read)
+        2. _transition_to only modifies state if we're still in OPEN state
+        3. Multiple concurrent transitions to HALF_OPEN are idempotent
+        """
         if self._state == CircuitState.OPEN:
             time_in_open = time.time() - self._stats.state_changed_at
             if time_in_open >= self.recovery_timeout:
-                self._transition_to(CircuitState.HALF_OPEN)
-                logger.info(
-                    f"Circuit {self.name} transitioned to HALF_OPEN "
-                    f"after {time_in_open:.1f}s"
-                )
+                # Only transition if still in OPEN state (double-check)
+                if self._state == CircuitState.OPEN:
+                    self._transition_to(CircuitState.HALF_OPEN)
+                    logger.info(
+                        f"Circuit {self.name} transitioned to HALF_OPEN "
+                        f"after {time_in_open:.1f}s"
+                    )
 
     def _transition_to(self, new_state: CircuitState) -> None:
         """Transition to a new state."""
@@ -339,19 +354,28 @@
         ]
 
 
-# Global registry instance (lazy initialization)
+# Global registry instance with thread-safe lazy initialization
 _registry: CircuitBreakerRegistry | None = None
+_registry_lock = threading.Lock()
 
 
 def get_circuit_registry() -> CircuitBreakerRegistry:
-    """Get the global circuit breaker registry."""
+    """
+    Get the global circuit breaker registry.
+
+    Thread-safe with double-checked locking pattern.
+    """
    global _registry
    if _registry is None:
-        _registry = CircuitBreakerRegistry()
+        with _registry_lock:
+            # Double-check after acquiring lock
+            if _registry is None:
+                _registry = CircuitBreakerRegistry()
    return _registry
 
 
 def reset_circuit_registry() -> None:
     """Reset the global registry (for testing)."""
     global _registry
-    _registry = None
+    with _registry_lock:
+        _registry = None

View File

@@ -6,7 +6,9 @@ Configures the LiteLLM Router with model lists and failover chains.
 import logging
 import os
+import threading
 from typing import Any
+from urllib.parse import urlparse
 
 import litellm
 from litellm import Router
 
@@ -57,19 +59,47 @@
 def _parse_redis_host(redis_url: str) -> str:
-    """Extract host from Redis URL."""
-    # redis://host:port/db
-    url = redis_url.replace("redis://", "")
-    host_port = url.split("/")[0]
-    return host_port.split(":")[0]
+    """
+    Extract host from Redis URL.
+
+    Args:
+        redis_url: Redis connection URL (e.g., redis://localhost:6379/0)
+
+    Returns:
+        Hostname extracted from URL
+
+    Raises:
+        ValueError: If URL is malformed or missing host
+    """
+    try:
+        parsed = urlparse(redis_url)
+        if not parsed.hostname:
+            raise ValueError(f"Invalid Redis URL: missing hostname in '{redis_url}'")
+        return parsed.hostname
+    except Exception as e:
+        logger.error(f"Failed to parse Redis URL: {e}")
+        raise ValueError(f"Invalid Redis URL: {redis_url}") from e
 
 
 def _parse_redis_port(redis_url: str) -> int:
-    """Extract port from Redis URL."""
-    url = redis_url.replace("redis://", "")
-    host_port = url.split("/")[0]
-    parts = host_port.split(":")
-    return int(parts[1]) if len(parts) > 1 else 6379
+    """
+    Extract port from Redis URL.
+
+    Args:
+        redis_url: Redis connection URL (e.g., redis://localhost:6379/0)
+
+    Returns:
+        Port number (defaults to 6379 if not specified)
+
+    Raises:
+        ValueError: If URL is malformed
+    """
+    try:
+        parsed = urlparse(redis_url)
+        return parsed.port or 6379
+    except Exception as e:
+        logger.error(f"Failed to parse Redis URL: {e}")
+        raise ValueError(f"Invalid Redis URL: {redis_url}") from e
 
 
 def _is_provider_available(provider: Provider, settings: Settings) -> bool:
@@ -310,19 +340,28 @@ class LLMProvider:
         return _is_provider_available(model_config.provider, self._settings)
 
 
-# Global provider instance (lazy initialization)
+# Global provider instance with thread-safe lazy initialization
 _provider: LLMProvider | None = None
+_provider_lock = threading.Lock()
 
 
 def get_provider() -> LLMProvider:
-    """Get the global LLM Provider instance."""
+    """
+    Get the global LLM Provider instance.
+
+    Thread-safe with double-checked locking pattern.
+    """
     global _provider
     if _provider is None:
-        _provider = LLMProvider()
+        with _provider_lock:
+            # Double-check after acquiring lock
+            if _provider is None:
+                _provider = LLMProvider()
    return _provider
 
 
 def reset_provider() -> None:
     """Reset the global provider (for testing)."""
     global _provider
-    _provider = None
+    with _provider_lock:
+        _provider = None