2 Commits

Author SHA1 Message Date
Felipe Cardoso
2ab69f8561 docs(mcp): add comprehensive MCP server documentation
- Add docs/architecture/MCP_SERVERS.md with full architecture overview
- Add README.md for LLM Gateway with quick start, tools, and model groups
- Add README.md for Knowledge Base with search types, chunking strategies
- Include API endpoints, security guidelines, and testing instructions

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-04 01:37:04 +01:00
Felipe Cardoso
95342cc94d fix(mcp-gateway): address critical issues from deep review
Frontend:
- Fix debounce race condition in UserListTable search handler
- Use useRef to properly track and cleanup timeout between keystrokes

Backend (LLM Gateway):
- Add thread-safe double-checked locking for global singletons
  (providers, circuit registry, cost tracker)
- Fix Redis URL parsing with proper urlparse validation
- Add explicit error handling for malformed Redis URLs
- Document circuit breaker state transition safety

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-04 01:36:55 +01:00
7 changed files with 621 additions and 35 deletions

View File

@@ -0,0 +1,192 @@
# MCP Servers Architecture
This document describes the Model Context Protocol (MCP) server architecture in Syndarix.
## Overview
Syndarix uses MCP servers to provide specialized capabilities to AI agents. Each MCP server exposes tools via JSON-RPC 2.0 that agents can invoke through the MCPClientManager.
## Architecture Diagram
```
┌─────────────────────────────────────────────────────────────────────┐
│ Backend (FastAPI) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ MCPClientManager │ │
│ │ - Connection pooling - Health checks - Tool routing │ │
│ └──────────────────────────┬──────────────────────────────────┘ │
└─────────────────────────────┼───────────────────────────────────────┘
│ HTTP/JSON-RPC 2.0
┌───────────────────┼───────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ LLM Gateway │ │ Knowledge Base │ │ Future MCP │
│ Port 8001 │ │ Port 8002 │ │ Servers │
│ │ │ │ │ │
│ - chat_complete │ │ - search │ │ - git_ops │
│ - count_tokens │ │ - ingest │ │ - issues │
│ - list_models │ │ - delete │ │ - etc. │
│ - get_usage │ │ - update │ │ │
└────────┬────────┘ └────────┬────────┘ └─────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ LiteLLM │ │ PostgreSQL │
│ (Anthropic, │ │ + pgvector │
│ OpenAI, etc.) │ │ │
└─────────────────┘ └─────────────────┘
```
## MCP Servers
### 1. LLM Gateway (`mcp-servers/llm-gateway/`)
**Purpose**: Unified access to multiple LLM providers with failover, streaming, and cost tracking.
**Port**: 8001
**Tools**:
| Tool | Description |
|------|-------------|
| `chat_completion` | Generate completions with automatic failover |
| `count_tokens` | Count tokens in text using tiktoken |
| `list_models` | List available models by group |
| `get_usage` | Get token/cost usage statistics |
**Model Groups**:
- `reasoning`: Claude Opus 4.5 → GPT-4.1 → Gemini 2.5 Pro
- `code`: Claude Sonnet 4 → Codex → DeepSeek Coder
- `fast`: Claude Haiku → GPT-4.1 Mini → Gemini Flash
- `vision`: Claude Opus 4.5 → GPT-4.1 Vision
- `embedding`: text-embedding-3-large → voyage-3
**Features**:
- Circuit breaker for provider failures (5 failures → 30s cooldown)
- Redis-based cost tracking per project/agent
- Streaming support via SSE
- Automatic failover chain
### 2. Knowledge Base (`mcp-servers/knowledge-base/`)
**Purpose**: RAG capabilities with pgvector for semantic search, intelligent chunking, and collection management.
**Port**: 8002
**Tools**:
| Tool | Description |
|------|-------------|
| `search_knowledge` | Semantic, keyword, or hybrid search |
| `ingest_content` | Add content with automatic chunking |
| `delete_content` | Remove by source, collection, or IDs |
| `list_collections` | List collections in a project |
| `get_collection_stats` | Get collection statistics |
| `update_document` | Atomically replace document content |
**Chunking Strategies**:
- **Code**: AST-aware for Python, tree-sitter for JS/TS/Go/Rust
- **Markdown**: Heading-hierarchy aware, preserves structure
- **Text**: Sentence-based with configurable overlap
**Search Types**:
- **Semantic**: pgvector cosine similarity (HNSW index)
- **Keyword**: PostgreSQL full-text search (ts_rank)
- **Hybrid**: Reciprocal Rank Fusion (RRF) combining both
**Features**:
- Redis caching for embedding deduplication
- 1536-dimension embeddings via LLM Gateway
- Atomic document updates (delete + insert in transaction)
- Per-project collection isolation
## Communication Protocol
All MCP servers use JSON-RPC 2.0 over HTTP:
### Tool Discovery
```
GET /mcp/tools
Response: { "tools": [{ "name": "...", "description": "...", "inputSchema": {...} }] }
```
### Tool Execution
```
POST /mcp
Request: {
"jsonrpc": "2.0",
"method": "tool_name",
"params": { "project_id": "...", "agent_id": "...", ... },
"id": 1
}
Response: {
"jsonrpc": "2.0",
"result": { "success": true, ... },
"id": 1
}
```
### Health Check
```
GET /health
Response: { "status": "healthy", "dependencies": {...} }
```
## Configuration
### Environment Variables
**LLM Gateway**:
```bash
LLM_GATEWAY_HOST=0.0.0.0
LLM_GATEWAY_PORT=8001
LLM_GATEWAY_REDIS_URL=redis://redis:6379/1
ANTHROPIC_API_KEY=...
OPENAI_API_KEY=...
```
**Knowledge Base**:
```bash
KB_HOST=0.0.0.0
KB_PORT=8002
KB_DATABASE_URL=postgresql://...
KB_REDIS_URL=redis://redis:6379/2
KB_LLM_GATEWAY_URL=http://llm-gateway:8001
```
## Security
### Input Validation
- `project_id`, `agent_id`: Alphanumeric + hyphens/underscores (1-128 chars)
- `collection`: Alphanumeric + hyphens/underscores (1-64 chars)
- `source_path`: No path traversal (`..`), no null bytes, max 4096 chars
- `content`: Max size limit (configurable, default 10MB)
### Error Codes
| Code | Meaning |
|------|---------|
| `INVALID_REQUEST` | Input validation failed |
| `NOT_FOUND` | Resource not found |
| `INTERNAL_ERROR` | Unexpected server error |
| `EMBEDDING_ERROR` | Embedding generation failed |
| `SEARCH_ERROR` | Search operation failed |
## Testing
```bash
# Run LLM Gateway tests
cd mcp-servers/llm-gateway
IS_TEST=True uv run pytest -v --cov=.
# Run Knowledge Base tests
cd mcp-servers/knowledge-base
IS_TEST=True uv run pytest -v --cov=.
```
## Adding New MCP Servers
1. Create directory under `mcp-servers/<name>/`
2. Use FastMCP for tool registration
3. Implement `/health`, `/mcp/tools`, `/mcp` endpoints
4. Add Docker configuration
5. Register in MCPClientManager config
6. Add tests (>90% coverage target)

View File

@@ -5,7 +5,7 @@
'use client';
import { useState, useCallback } from 'react';
import { useState, useCallback, useRef, useEffect } from 'react';
import { format } from 'date-fns';
import { Check, X } from 'lucide-react';
import {
@@ -61,15 +61,28 @@ export function UserListTable({
currentUserId,
}: UserListTableProps) {
const [searchValue, setSearchValue] = useState('');
const searchTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
// Debounce search
// Cleanup timeout on unmount
useEffect(() => {
return () => {
if (searchTimeoutRef.current) {
clearTimeout(searchTimeoutRef.current);
}
};
}, []);
// Debounce search with proper cleanup
const handleSearchChange = useCallback(
(value: string) => {
setSearchValue(value);
const timeoutId = setTimeout(() => {
// Clear previous timeout to prevent stale searches
if (searchTimeoutRef.current) {
clearTimeout(searchTimeoutRef.current);
}
searchTimeoutRef.current = setTimeout(() => {
onSearch(value);
}, 300);
return () => clearTimeout(timeoutId);
},
[onSearch]
);

View File

@@ -0,0 +1,178 @@
# Knowledge Base MCP Server
RAG capabilities with pgvector for semantic search, intelligent chunking, and collection management.
## Features
- **Semantic Search**: pgvector cosine similarity with HNSW indexing
- **Keyword Search**: PostgreSQL full-text search
- **Hybrid Search**: Reciprocal Rank Fusion combining both
- **Intelligent Chunking**: Code-aware, markdown-aware, and text chunking
- **Collection Management**: Per-project knowledge organization
- **Embedding Caching**: Redis deduplication for efficiency
## Quick Start
```bash
# Install dependencies
uv sync
# Run tests
IS_TEST=True uv run pytest -v
# Start server
uv run python server.py
```
## Configuration
Environment variables:
```bash
KB_HOST=0.0.0.0
KB_PORT=8002
KB_DEBUG=false
KB_DATABASE_URL=postgresql://user:pass@localhost:5432/syndarix
KB_REDIS_URL=redis://localhost:6379/2
KB_LLM_GATEWAY_URL=http://localhost:8001
```
## MCP Tools
### search_knowledge
Search the knowledge base.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"query": "authentication flow",
"search_type": "hybrid",
"collection": "code",
"limit": 10,
"threshold": 0.7,
"file_types": ["python", "typescript"]
}
```
### ingest_content
Add content to the knowledge base.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"content": "def authenticate(user): ...",
"source_path": "/src/auth.py",
"collection": "code",
"chunk_type": "code",
"file_type": "python"
}
```
### delete_content
Remove content from the knowledge base.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"source_path": "/src/old_file.py"
}
```
### list_collections
List all collections in a project.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456"
}
```
### get_collection_stats
Get detailed collection statistics.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"collection": "code"
}
```
### update_document
Atomically replace document content.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"source_path": "/src/auth.py",
"content": "def authenticate_v2(user): ...",
"collection": "code",
"chunk_type": "code",
"file_type": "python"
}
```
## Chunking Strategies
### Code Chunking
- **Python**: AST-based (functions, classes, methods)
- **JavaScript/TypeScript**: Tree-sitter based
- **Go/Rust**: Tree-sitter based
- Target: ~500 tokens, 50 token overlap
### Markdown Chunking
- Heading-hierarchy aware
- Preserves code blocks
- Target: ~800 tokens, 100 token overlap
### Text Chunking
- Sentence-based splitting
- Target: ~400 tokens, 50 token overlap
## Search Types
### Semantic Search
Uses pgvector cosine similarity with HNSW indexing for fast approximate nearest neighbor search.
### Keyword Search
Uses PostgreSQL full-text search with ts_rank scoring.
### Hybrid Search
Combines semantic and keyword results using Reciprocal Rank Fusion (RRF):
- Default weights: 70% semantic, 30% keyword
- Configurable via settings
## Security
- Input validation for all IDs and paths
- Path traversal prevention
- Content size limits (default 10MB)
- Per-project data isolation
## Testing
```bash
# Full test suite with coverage
IS_TEST=True uv run pytest -v --cov=. --cov-report=term-missing
# Specific test file
IS_TEST=True uv run pytest tests/test_server.py -v
```
## API Endpoints
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/health` | GET | Health check with dependency status |
| `/mcp/tools` | GET | List available tools |
| `/mcp` | POST | JSON-RPC 2.0 tool execution |

View File

@@ -0,0 +1,129 @@
# LLM Gateway MCP Server
Unified LLM access with failover chains, cost tracking, and streaming support.
## Features
- **Multi-Provider Support**: Anthropic, OpenAI, Google, DeepSeek
- **Automatic Failover**: Circuit breaker with configurable thresholds
- **Cost Tracking**: Redis-based per-project/agent usage tracking
- **Streaming**: SSE support for real-time token delivery
- **Model Groups**: Pre-configured chains for different use cases
## Quick Start
```bash
# Install dependencies
uv sync
# Run tests
IS_TEST=True uv run pytest -v
# Start server
uv run python server.py
```
## Configuration
Environment variables:
```bash
LLM_GATEWAY_HOST=0.0.0.0
LLM_GATEWAY_PORT=8001
LLM_GATEWAY_DEBUG=false
LLM_GATEWAY_REDIS_URL=redis://localhost:6379/1
# Provider API keys
ANTHROPIC_API_KEY=sk-ant-...
OPENAI_API_KEY=sk-...
GOOGLE_API_KEY=...
DEEPSEEK_API_KEY=...
```
## MCP Tools
### chat_completion
Generate completions with automatic failover.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"messages": [{"role": "user", "content": "Hello"}],
"model_group": "reasoning",
"max_tokens": 4096,
"temperature": 0.7,
"stream": false
}
```
### count_tokens
Count tokens in text using tiktoken.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"text": "Hello, world!",
"model": "gpt-4"
}
```
### list_models
List available models by group.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"model_group": "code"
}
```
### get_usage
Get usage statistics.
```json
{
"project_id": "proj-123",
"agent_id": "agent-456",
"period": "day"
}
```
## Model Groups
| Group | Primary | Fallback 1 | Fallback 2 |
|-------|---------|------------|------------|
| reasoning | claude-opus-4-5 | gpt-4.1 | gemini-2.5-pro |
| code | claude-sonnet-4 | gpt-4.1 | deepseek-coder |
| fast | claude-haiku | gpt-4.1-mini | gemini-flash |
| vision | claude-sonnet-4 | gpt-4.1 | gemini-2.5-pro |
| embedding | text-embedding-3-large | voyage-3 | - |
## Circuit Breaker
- **Threshold**: 5 consecutive failures
- **Cooldown**: 30 seconds
- **Half-Open**: After cooldown, allows one test request
## Testing
```bash
# Full test suite with coverage
IS_TEST=True uv run pytest -v --cov=. --cov-report=term-missing
# Specific test file
IS_TEST=True uv run pytest tests/test_server.py -v
```
## API Endpoints
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/health` | GET | Health check |
| `/mcp/tools` | GET | List available tools |
| `/mcp` | POST | JSON-RPC 2.0 tool execution |

View File

@@ -6,6 +6,7 @@ Provides aggregation by hour, day, and month with TTL-based expiry.
"""
import logging
import threading
from datetime import UTC, datetime, timedelta
from typing import Any
@@ -441,27 +442,37 @@ def calculate_cost(
return round(input_cost + output_cost, 6)
# Global tracker instance (lazy initialization)
# Global tracker instance with thread-safe lazy initialization
_tracker: CostTracker | None = None
_tracker_lock = threading.Lock()
def get_cost_tracker() -> CostTracker:
"""Get the global cost tracker instance."""
"""
Get the global cost tracker instance.
Thread-safe with double-checked locking pattern.
"""
global _tracker
if _tracker is None:
_tracker = CostTracker()
with _tracker_lock:
# Double-check after acquiring lock
if _tracker is None:
_tracker = CostTracker()
return _tracker
async def close_cost_tracker() -> None:
"""Close the global cost tracker."""
global _tracker
if _tracker:
await _tracker.close()
_tracker = None
with _tracker_lock:
if _tracker:
await _tracker.close()
_tracker = None
def reset_cost_tracker() -> None:
"""Reset the global tracker (for testing)."""
global _tracker
_tracker = None
with _tracker_lock:
_tracker = None

View File

@@ -7,6 +7,7 @@ temporarily disabling providers that are experiencing issues.
import asyncio
import logging
import threading
import time
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
@@ -85,6 +86,9 @@ class CircuitBreaker:
@property
def state(self) -> CircuitState:
"""Get current circuit state (may trigger state transition)."""
# Check transition outside lock since it only reads/writes _state
# which is atomic in Python, and we use the lock inside _check_state_transition
# if a state change is needed
self._check_state_transition()
return self._state
@@ -94,15 +98,26 @@ class CircuitBreaker:
return self._stats
def _check_state_transition(self) -> None:
"""Check if state should transition based on time."""
"""
Check if state should transition based on time.
Note: This method is intentionally not async and doesn't acquire the lock
because it's called frequently from the state property. The transition
is safe because:
1. We check the current state first (atomic read)
2. _transition_to only modifies state if we're still in OPEN state
3. Multiple concurrent transitions to HALF_OPEN are idempotent
"""
if self._state == CircuitState.OPEN:
time_in_open = time.time() - self._stats.state_changed_at
if time_in_open >= self.recovery_timeout:
self._transition_to(CircuitState.HALF_OPEN)
logger.info(
f"Circuit {self.name} transitioned to HALF_OPEN "
f"after {time_in_open:.1f}s"
)
# Only transition if still in OPEN state (double-check)
if self._state == CircuitState.OPEN:
self._transition_to(CircuitState.HALF_OPEN)
logger.info(
f"Circuit {self.name} transitioned to HALF_OPEN "
f"after {time_in_open:.1f}s"
)
def _transition_to(self, new_state: CircuitState) -> None:
"""Transition to a new state."""
@@ -339,19 +354,28 @@ class CircuitBreakerRegistry:
]
# Global registry instance (lazy initialization)
# Global registry instance with thread-safe lazy initialization
_registry: CircuitBreakerRegistry | None = None
_registry_lock = threading.Lock()
def get_circuit_registry() -> CircuitBreakerRegistry:
"""Get the global circuit breaker registry."""
"""
Get the global circuit breaker registry.
Thread-safe with double-checked locking pattern.
"""
global _registry
if _registry is None:
_registry = CircuitBreakerRegistry()
with _registry_lock:
# Double-check after acquiring lock
if _registry is None:
_registry = CircuitBreakerRegistry()
return _registry
def reset_circuit_registry() -> None:
"""Reset the global registry (for testing)."""
global _registry
_registry = None
with _registry_lock:
_registry = None

View File

@@ -6,7 +6,9 @@ Configures the LiteLLM Router with model lists and failover chains.
import logging
import os
import threading
from typing import Any
from urllib.parse import urlparse
import litellm
from litellm import Router
@@ -57,19 +59,47 @@ def configure_litellm(settings: Settings) -> None:
def _parse_redis_host(redis_url: str) -> str:
"""Extract host from Redis URL."""
# redis://host:port/db
url = redis_url.replace("redis://", "")
host_port = url.split("/")[0]
return host_port.split(":")[0]
"""
Extract host from Redis URL.
Args:
redis_url: Redis connection URL (e.g., redis://localhost:6379/0)
Returns:
Hostname extracted from URL
Raises:
ValueError: If URL is malformed or missing host
"""
try:
parsed = urlparse(redis_url)
if not parsed.hostname:
raise ValueError(f"Invalid Redis URL: missing hostname in '{redis_url}'")
return parsed.hostname
except Exception as e:
logger.error(f"Failed to parse Redis URL: {e}")
raise ValueError(f"Invalid Redis URL: {redis_url}") from e
def _parse_redis_port(redis_url: str) -> int:
"""Extract port from Redis URL."""
url = redis_url.replace("redis://", "")
host_port = url.split("/")[0]
parts = host_port.split(":")
return int(parts[1]) if len(parts) > 1 else 6379
"""
Extract port from Redis URL.
Args:
redis_url: Redis connection URL (e.g., redis://localhost:6379/0)
Returns:
Port number (defaults to 6379 if not specified)
Raises:
ValueError: If URL is malformed
"""
try:
parsed = urlparse(redis_url)
return parsed.port or 6379
except Exception as e:
logger.error(f"Failed to parse Redis URL: {e}")
raise ValueError(f"Invalid Redis URL: {redis_url}") from e
def _is_provider_available(provider: Provider, settings: Settings) -> bool:
@@ -310,19 +340,28 @@ class LLMProvider:
return _is_provider_available(model_config.provider, self._settings)
# Global provider instance (lazy initialization)
# Global provider instance with thread-safe lazy initialization
_provider: LLMProvider | None = None
_provider_lock = threading.Lock()
def get_provider() -> LLMProvider:
"""Get the global LLM Provider instance."""
"""
Get the global LLM Provider instance.
Thread-safe with double-checked locking pattern.
"""
global _provider
if _provider is None:
_provider = LLMProvider()
with _provider_lock:
# Double-check after acquiring lock
if _provider is None:
_provider = LLMProvider()
return _provider
def reset_provider() -> None:
"""Reset the global provider (for testing)."""
global _provider
_provider = None
with _provider_lock:
_provider = None