forked from cardosofelipe/fast-next-template
feat(tests): add comprehensive integration tests for MCP stack
- Introduced integration tests covering backend, LLM Gateway, Knowledge Base, and Context Engine. - Includes health checks, tool listing, token counting, and end-to-end MCP flows. - Added `RUN_INTEGRATION_TESTS` environment flag to enable selective test execution. - Includes a quick health check script to verify service availability before running tests.
This commit is contained in:
1
backend/tests/integration/__init__.py
Normal file
1
backend/tests/integration/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Integration tests that require the full stack to be running."""
|
||||
322
backend/tests/integration/test_mcp_integration.py
Normal file
322
backend/tests/integration/test_mcp_integration.py
Normal file
@@ -0,0 +1,322 @@
|
||||
"""
|
||||
Integration tests for MCP server connectivity.
|
||||
|
||||
These tests require the full stack to be running:
|
||||
- docker compose -f docker-compose.dev.yml up
|
||||
|
||||
Run with:
|
||||
pytest tests/integration/ -v --integration
|
||||
|
||||
Or skip with:
|
||||
pytest tests/ -v --ignore=tests/integration/
|
||||
"""
|
||||
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
# Skip all tests in this module if not running integration tests
|
||||
pytestmark = pytest.mark.skipif(
|
||||
os.getenv("RUN_INTEGRATION_TESTS", "false").lower() != "true",
|
||||
reason="Integration tests require RUN_INTEGRATION_TESTS=true and running stack",
|
||||
)
|
||||
|
||||
|
||||
# Configuration from environment
|
||||
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000")
|
||||
LLM_GATEWAY_URL = os.getenv("LLM_GATEWAY_URL", "http://localhost:8001")
|
||||
KNOWLEDGE_BASE_URL = os.getenv("KNOWLEDGE_BASE_URL", "http://localhost:8002")
|
||||
|
||||
|
||||
class TestMCPServerHealth:
|
||||
"""Test that MCP servers are healthy and reachable."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_llm_gateway_health(self) -> None:
|
||||
"""Test LLM Gateway health endpoint."""
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(f"{LLM_GATEWAY_URL}/health", timeout=10.0)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data.get("status") == "healthy" or data.get("healthy") is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_knowledge_base_health(self) -> None:
|
||||
"""Test Knowledge Base health endpoint."""
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(f"{KNOWLEDGE_BASE_URL}/health", timeout=10.0)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data.get("status") == "healthy" or data.get("healthy") is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backend_health(self) -> None:
|
||||
"""Test Backend health endpoint."""
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(f"{BACKEND_URL}/health", timeout=10.0)
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
class TestMCPClientManagerIntegration:
|
||||
"""Test MCPClientManager can connect to real MCP servers."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mcp_servers_list(self) -> None:
|
||||
"""Test that backend can list MCP servers via API."""
|
||||
async with httpx.AsyncClient() as client:
|
||||
# This endpoint lists configured MCP servers
|
||||
response = await client.get(
|
||||
f"{BACKEND_URL}/api/v1/mcp/servers",
|
||||
timeout=10.0,
|
||||
)
|
||||
# Should return 200 or 401 (if auth required)
|
||||
assert response.status_code in [200, 401, 403]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mcp_health_check_endpoint(self) -> None:
|
||||
"""Test backend's MCP health check endpoint."""
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
f"{BACKEND_URL}/api/v1/mcp/health",
|
||||
timeout=30.0, # MCP health checks can take time
|
||||
)
|
||||
# Should return 200 or 401 (if auth required)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
# Check structure
|
||||
assert "servers" in data or "healthy" in data
|
||||
|
||||
|
||||
class TestLLMGatewayIntegration:
|
||||
"""Test LLM Gateway MCP server functionality."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_models(self) -> None:
|
||||
"""Test that LLM Gateway can list available models."""
|
||||
async with httpx.AsyncClient() as client:
|
||||
# MCP servers use JSON-RPC 2.0 protocol at /mcp endpoint
|
||||
response = await client.post(
|
||||
f"{LLM_GATEWAY_URL}/mcp",
|
||||
json={
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": "tools/list",
|
||||
"params": {},
|
||||
},
|
||||
timeout=10.0,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
# Should have tools listed
|
||||
assert "result" in data or "error" in data
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_count_tokens(self) -> None:
|
||||
"""Test token counting functionality."""
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
f"{LLM_GATEWAY_URL}/mcp",
|
||||
json={
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": "tools/call",
|
||||
"params": {
|
||||
"name": "count_tokens",
|
||||
"arguments": {
|
||||
"project_id": "test-project",
|
||||
"agent_id": "test-agent",
|
||||
"text": "Hello, world!",
|
||||
},
|
||||
},
|
||||
},
|
||||
timeout=10.0,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
# Check for result or error
|
||||
if "result" in data:
|
||||
assert "content" in data["result"] or "token_count" in str(
|
||||
data["result"]
|
||||
)
|
||||
|
||||
|
||||
class TestKnowledgeBaseIntegration:
|
||||
"""Test Knowledge Base MCP server functionality."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_tools(self) -> None:
|
||||
"""Test that Knowledge Base can list available tools."""
|
||||
async with httpx.AsyncClient() as client:
|
||||
# Knowledge Base uses GET /mcp/tools for listing
|
||||
response = await client.get(
|
||||
f"{KNOWLEDGE_BASE_URL}/mcp/tools",
|
||||
timeout=10.0,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "tools" in data
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_search_knowledge_empty(self) -> None:
|
||||
"""Test search on empty knowledge base."""
|
||||
async with httpx.AsyncClient() as client:
|
||||
# Knowledge Base uses direct tool name as method
|
||||
response = await client.post(
|
||||
f"{KNOWLEDGE_BASE_URL}/mcp",
|
||||
json={
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": "search_knowledge",
|
||||
"params": {
|
||||
"project_id": "test-project",
|
||||
"agent_id": "test-agent",
|
||||
"query": "test query",
|
||||
"limit": 5,
|
||||
},
|
||||
},
|
||||
timeout=10.0,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
# Should return empty results or error for no collection
|
||||
assert "result" in data or "error" in data
|
||||
|
||||
|
||||
class TestEndToEndMCPFlow:
|
||||
"""End-to-end tests for MCP integration flow."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_full_mcp_discovery_flow(self) -> None:
|
||||
"""Test the full flow of discovering and listing MCP tools."""
|
||||
async with httpx.AsyncClient() as client:
|
||||
# 1. Check backend health
|
||||
health = await client.get(f"{BACKEND_URL}/health", timeout=10.0)
|
||||
assert health.status_code == 200
|
||||
|
||||
# 2. Check LLM Gateway health
|
||||
llm_health = await client.get(f"{LLM_GATEWAY_URL}/health", timeout=10.0)
|
||||
assert llm_health.status_code == 200
|
||||
|
||||
# 3. Check Knowledge Base health
|
||||
kb_health = await client.get(f"{KNOWLEDGE_BASE_URL}/health", timeout=10.0)
|
||||
assert kb_health.status_code == 200
|
||||
|
||||
# 4. List tools from LLM Gateway (uses JSON-RPC at /mcp)
|
||||
llm_tools = await client.post(
|
||||
f"{LLM_GATEWAY_URL}/mcp",
|
||||
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
|
||||
timeout=10.0,
|
||||
)
|
||||
assert llm_tools.status_code == 200
|
||||
|
||||
# 5. List tools from Knowledge Base (uses GET /mcp/tools)
|
||||
kb_tools = await client.get(
|
||||
f"{KNOWLEDGE_BASE_URL}/mcp/tools",
|
||||
timeout=10.0,
|
||||
)
|
||||
assert kb_tools.status_code == 200
|
||||
|
||||
|
||||
class TestContextEngineIntegration:
|
||||
"""Test Context Engine integration with MCP servers."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_context_health_endpoint(self) -> None:
|
||||
"""Test context engine health endpoint."""
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
f"{BACKEND_URL}/api/v1/context/health",
|
||||
timeout=10.0,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data.get("status") == "healthy"
|
||||
assert "mcp_connected" in data
|
||||
assert "cache_enabled" in data
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_context_budget_endpoint(self) -> None:
|
||||
"""Test token budget endpoint."""
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
f"{BACKEND_URL}/api/v1/context/budget/claude-3-sonnet",
|
||||
timeout=10.0,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "total_tokens" in data
|
||||
assert "system_tokens" in data
|
||||
assert data.get("model") == "claude-3-sonnet"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_context_assembly_requires_auth(self) -> None:
|
||||
"""Test that context assembly requires authentication."""
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
f"{BACKEND_URL}/api/v1/context/assemble",
|
||||
json={
|
||||
"project_id": "test-project",
|
||||
"agent_id": "test-agent",
|
||||
"query": "test query",
|
||||
"model": "claude-3-sonnet",
|
||||
},
|
||||
timeout=10.0,
|
||||
)
|
||||
# Should require auth
|
||||
assert response.status_code in [401, 403]
|
||||
|
||||
|
||||
def run_quick_health_check() -> dict[str, Any]:
|
||||
"""
|
||||
Quick synchronous health check for all services.
|
||||
Can be run standalone to verify the stack is up.
|
||||
"""
|
||||
import httpx
|
||||
|
||||
results: dict[str, Any] = {
|
||||
"backend": False,
|
||||
"llm_gateway": False,
|
||||
"knowledge_base": False,
|
||||
}
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=5.0) as client:
|
||||
try:
|
||||
r = client.get(f"{BACKEND_URL}/health")
|
||||
results["backend"] = r.status_code == 200
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
r = client.get(f"{LLM_GATEWAY_URL}/health")
|
||||
results["llm_gateway"] = r.status_code == 200
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
r = client.get(f"{KNOWLEDGE_BASE_URL}/health")
|
||||
results["knowledge_base"] = r.status_code == 200
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return results
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Checking service health...")
|
||||
results = run_quick_health_check()
|
||||
for service, healthy in results.items():
|
||||
status = "OK" if healthy else "FAILED"
|
||||
print(f" {service}: {status}")
|
||||
|
||||
all_healthy = all(results.values())
|
||||
if all_healthy:
|
||||
print("\nAll services healthy! Run integration tests with:")
|
||||
print(" RUN_INTEGRATION_TESTS=true pytest tests/integration/ -v")
|
||||
else:
|
||||
print("\nSome services are not healthy. Start the stack with:")
|
||||
print(" make dev")
|
||||
Reference in New Issue
Block a user