9 Commits

Felipe Cardoso
4b149b8a52 feat(tests): add unit tests for Context Management API routes
- Added detailed unit tests for `/context` endpoints, covering health checks, context assembly, token counting, budget retrieval, and cache invalidation.
- Included edge cases, error handling, and input validation for context-related operations.
- Improved test coverage for the Context Management module with mocked dependencies and integration scenarios.
2026-01-05 01:02:49 +01:00
Felipe Cardoso
ad0c06851d feat(tests): add comprehensive E2E tests for MCP and Agent workflows
- Introduced end-to-end tests for MCP workflows, including server discovery, authentication, context engine operations, error handling, and input validation.
- Added full lifecycle tests for agent workflows, covering type management, instance spawning, status transitions, and admin-only operations.
- Enhanced test coverage for real-world MCP and Agent scenarios across PostgreSQL and async environments.
2026-01-05 01:02:41 +01:00
Felipe Cardoso
49359b1416 feat(api): add Context Management API and routes
- Introduced a new `context` module and its endpoints for Context Management.
- Added `/context` route to the API router for assembling LLM context, token counting, budget management, and cache invalidation.
- Implemented health checks, context assembly, token counting, and caching operations in the Context Management Engine.
- Included schemas for request/response models and tightened error handling for context-related operations.
2026-01-05 01:02:33 +01:00
Felipe Cardoso
911d950c15 feat(tests): add comprehensive integration tests for MCP stack
- Introduced integration tests covering backend, LLM Gateway, Knowledge Base, and Context Engine.
- Included health checks, tool listing, token counting, and end-to-end MCP flows.
- Added a `RUN_INTEGRATION_TESTS` environment flag to enable selective test execution (a gating sketch follows this entry).
- Added a quick health check script to verify service availability before running tests.
2026-01-05 01:02:22 +01:00
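
A flag like `RUN_INTEGRATION_TESTS` is typically enforced as a collection-time gate. A minimal sketch of how such a gate could look in `conftest.py` — the hook body below is an illustrative assumption, not this repository's actual conftest code:

# Hypothetical conftest.py gate for RUN_INTEGRATION_TESTS (assumption).
import os

import pytest


def pytest_collection_modifyitems(config, items):
    if os.getenv("RUN_INTEGRATION_TESTS", "").lower() == "true":
        return  # flag set: collect and run integration tests normally
    skip_marker = pytest.mark.skip(
        reason="integration test: set RUN_INTEGRATION_TESTS=true to run"
    )
    for item in items:
        if "tests/integration" in item.nodeid:
            item.add_marker(skip_marker)
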
Felipe Cardoso
b2a3ac60e0 feat: add integration testing target to Makefile
- Introduced `test-integration` command for MCP integration tests.
- Expanded help section with details about running integration tests.
- Extended the Makefile's testing capabilities to streamline developer workflows.
2026-01-05 01:02:16 +01:00
Felipe Cardoso
dea092e1bb feat: extend Makefile with testing and validation commands, expand help section
- Added new targets for testing (`test`, `test-backend`, `test-mcp`, `test-frontend`, etc.) and validation (`validate`, `validate-all`).
- Enhanced help section to reflect updates, including detailed descriptions for testing, validation, and new MCP-specific commands.
- Improved developer workflow by centralizing testing and linting processes in the Makefile.
2026-01-05 01:02:09 +01:00
Felipe Cardoso
4154dd5268 feat: enhance database transactions, add Makefiles, and improve Docker setup
- Refactored database batch operations to ensure transaction atomicity and simplify nested structure (a minimal sketch follows this entry).
- Added `Makefile` for `knowledge-base` and `llm-gateway` modules to streamline development workflows.
- Simplified `Dockerfile` for `llm-gateway` by removing multi-stage builds and optimizing dependencies.
- Improved code readability in `collection_manager` and `failover` modules with refined logic.
- Minor fixes in `test_server` and Redis health check handling for better diagnostics.
2026-01-05 00:49:19 +01:00
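
A minimal sketch of the batch-atomicity pattern described above, using SQLAlchemy's async API; the table definition and function name are illustrative assumptions rather than this repo's actual code:

# Illustrative sketch only: one transaction wraps the whole batch, so all
# rows commit together or the batch rolls back on the first error.
from sqlalchemy import Column, Integer, MetaData, String, Table, insert
from sqlalchemy.ext.asyncio import AsyncSession

metadata = MetaData()
items_table = Table(
    "items", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
)

async def batch_insert(session: AsyncSession, rows: list[dict]) -> None:
    async with session.begin():  # single transaction for the whole batch
        await session.execute(insert(items_table), rows)
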
Felipe Cardoso
db12937495 feat: integrate MCP servers into Docker Compose files for development and deployment
- Added `mcp-llm-gateway` and `mcp-knowledge-base` services to `docker-compose.dev.yml`, `docker-compose.deploy.yml`, and `docker-compose.yml` for AI agent capabilities.
- Configured health checks, environment variables, and dependencies for MCP services.
- Included updated resource limits and deployment settings for production environments.
- Connected backend and agent services to the MCP servers.
2026-01-05 00:49:10 +01:00
Felipe Cardoso
81e1456631 test(activity): fix flaky test by generating fresh events for today group
- Resolves timezone and day boundary issues by creating fresh "today" events in the test case.
2026-01-05 00:30:36 +01:00
22 changed files with 3624 additions and 126 deletions

View File

@@ -1,18 +1,31 @@
.PHONY: help dev dev-full prod down logs logs-dev clean clean-slate drop-db reset-db push-images deploy
.PHONY: test test-backend test-mcp test-frontend test-all test-cov test-integration validate validate-all
VERSION ?= latest
REGISTRY ?= ghcr.io/cardosofelipe/pragma-stack
# Default target
help:
@echo "FastAPI + Next.js Full-Stack Template"
@echo "Syndarix - AI-Powered Software Consulting Agency"
@echo ""
@echo "Development:"
@echo " make dev - Start backend + db (frontend runs separately)"
@echo " make dev - Start backend + db + MCP servers (frontend runs separately)"
@echo " make dev-full - Start all services including frontend"
@echo " make down - Stop all services"
@echo " make logs-dev - Follow dev container logs"
@echo ""
@echo "Testing:"
@echo " make test - Run all tests (backend + MCP servers)"
@echo " make test-backend - Run backend tests only"
@echo " make test-mcp - Run MCP server tests only"
@echo " make test-frontend - Run frontend tests only"
@echo " make test-cov - Run all tests with coverage reports"
@echo " make test-integration - Run MCP integration tests (requires running stack)"
@echo ""
@echo "Validation:"
@echo " make validate - Validate backend + MCP servers (lint, type-check, test)"
@echo " make validate-all - Validate everything including frontend"
@echo ""
@echo "Database:"
@echo " make drop-db - Drop and recreate empty database"
@echo " make reset-db - Drop database and apply all migrations"
@@ -28,8 +41,10 @@ help:
@echo " make clean-slate - Stop containers AND delete volumes (DATA LOSS!)"
@echo ""
@echo "Subdirectory commands:"
@echo " cd backend && make help - Backend-specific commands"
@echo " cd frontend && npm run - Frontend-specific commands"
@echo " cd backend && make help - Backend-specific commands"
@echo " cd mcp-servers/llm-gateway && make - LLM Gateway commands"
@echo " cd mcp-servers/knowledge-base && make - Knowledge Base commands"
@echo " cd frontend && npm run - Frontend-specific commands"
# ============================================================================
# Development
@@ -99,3 +114,72 @@ clean:
# WARNING! THIS REMOVES CONTAINERS AND VOLUMES AS WELL - DO NOT USE THIS UNLESS YOU WANT TO START OVER WITH DATA AND ALL
clean-slate:
docker compose -f docker-compose.dev.yml down -v --remove-orphans
# ============================================================================
# Testing
# ============================================================================
test: test-backend test-mcp
@echo ""
@echo "All tests passed!"
test-backend:
@echo "Running backend tests..."
@cd backend && IS_TEST=True uv run pytest tests/ -v
test-mcp:
@echo "Running MCP server tests..."
@echo ""
@echo "=== LLM Gateway ==="
@cd mcp-servers/llm-gateway && uv run pytest tests/ -v
@echo ""
@echo "=== Knowledge Base ==="
@cd mcp-servers/knowledge-base && uv run pytest tests/ -v
test-frontend:
@echo "Running frontend tests..."
@cd frontend && npm test
test-all: test test-frontend
@echo ""
@echo "All tests (backend + MCP + frontend) passed!"
test-cov:
@echo "Running all tests with coverage..."
@echo ""
@echo "=== Backend Coverage ==="
@cd backend && IS_TEST=True uv run pytest tests/ -v --cov=app --cov-report=term-missing
@echo ""
@echo "=== LLM Gateway Coverage ==="
@cd mcp-servers/llm-gateway && uv run pytest tests/ -v --cov=. --cov-report=term-missing
@echo ""
@echo "=== Knowledge Base Coverage ==="
@cd mcp-servers/knowledge-base && uv run pytest tests/ -v --cov=. --cov-report=term-missing
test-integration:
@echo "Running MCP integration tests..."
@echo "Note: Requires running stack (make dev first)"
@cd backend && RUN_INTEGRATION_TESTS=true IS_TEST=True uv run pytest tests/integration/ -v
# ============================================================================
# Validation (lint + type-check + test)
# ============================================================================
validate:
@echo "Validating backend..."
@cd backend && make validate
@echo ""
@echo "Validating LLM Gateway..."
@cd mcp-servers/llm-gateway && make validate
@echo ""
@echo "Validating Knowledge Base..."
@cd mcp-servers/knowledge-base && make validate
@echo ""
@echo "All validations passed!"
validate-all: validate
@echo ""
@echo "Validating frontend..."
@cd frontend && npm run validate
@echo ""
@echo "Full validation passed!"

View File

@@ -1,4 +1,4 @@
.PHONY: help lint lint-fix format format-check type-check test test-cov validate clean install-dev sync check-docker install-e2e test-e2e test-e2e-schema test-all
.PHONY: help lint lint-fix format format-check type-check test test-cov validate clean install-dev sync check-docker install-e2e test-e2e test-e2e-schema test-all test-integration
# Default target
help:
@@ -22,6 +22,7 @@ help:
@echo " make test-cov - Run pytest with coverage report"
@echo " make test-e2e - Run E2E tests (PostgreSQL, requires Docker)"
@echo " make test-e2e-schema - Run Schemathesis API schema tests"
@echo " make test-integration - Run MCP integration tests (requires running stack)"
@echo " make test-all - Run all tests (unit + E2E)"
@echo " make check-docker - Check if Docker is available"
@echo ""
@@ -82,6 +83,15 @@ test-cov:
@IS_TEST=True PYTHONPATH=. uv run pytest --cov=app --cov-report=term-missing --cov-report=html -n 16
@echo "📊 Coverage report generated in htmlcov/index.html"
# ============================================================================
# Integration Testing (requires running stack: make dev)
# ============================================================================
test-integration:
@echo "🧪 Running MCP integration tests..."
@echo "Note: Requires running stack (make dev from project root)"
@RUN_INTEGRATION_TESTS=true IS_TEST=True PYTHONPATH=. uv run pytest tests/integration/ -v
# ============================================================================
# E2E Testing (requires Docker)
# ============================================================================

View File

@@ -5,6 +5,7 @@ from app.api.routes import (
agent_types,
agents,
auth,
context,
events,
issues,
mcp,
@@ -35,6 +36,9 @@ api_router.include_router(events.router, tags=["Events"])
# MCP (Model Context Protocol) router
api_router.include_router(mcp.router, prefix="/mcp", tags=["MCP"])
# Context Management Engine router
api_router.include_router(context.router, prefix="/context", tags=["Context"])
# Syndarix domain routers
api_router.include_router(projects.router, prefix="/projects", tags=["Projects"])
api_router.include_router(
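
With the router mounted under the `/context` prefix (and the `api_router` itself served at `/api/v1`, as the tests further down address it), the new endpoints resolve to paths like `/api/v1/context/health`. A quick smoke-check sketch under that assumption:

# Smoke check of the newly mounted routes; the /api/v1 prefix is assumed
# to match how the tests below address the API.
from fastapi.testclient import TestClient

from app.main import app

client = TestClient(app)
print(client.get("/api/v1/context/health").status_code)
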

View File

@@ -0,0 +1,411 @@
"""
Context Management API Endpoints.
Provides REST endpoints for context assembly and optimization
for LLM requests using the ContextEngine.
"""
import logging
from typing import Annotated, Any
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel, Field
from app.api.dependencies.permissions import require_superuser
from app.models.user import User
from app.services.context import (
AssemblyTimeoutError,
BudgetExceededError,
ContextEngine,
ContextSettings,
create_context_engine,
get_context_settings,
)
from app.services.mcp import MCPClientManager, get_mcp_client
logger = logging.getLogger(__name__)
router = APIRouter()
# ============================================================================
# Singleton Engine Management
# ============================================================================
_context_engine: ContextEngine | None = None
def _get_or_create_engine(
mcp: MCPClientManager,
settings: ContextSettings | None = None,
) -> ContextEngine:
"""Get or create the singleton ContextEngine."""
global _context_engine
if _context_engine is None:
_context_engine = create_context_engine(
mcp_manager=mcp,
redis=None, # Optional: add Redis caching later
settings=settings or get_context_settings(),
)
logger.info("ContextEngine initialized")
else:
# Ensure MCP manager is up to date
_context_engine.set_mcp_manager(mcp)
return _context_engine
async def get_context_engine(
mcp: MCPClientManager = Depends(get_mcp_client),
) -> ContextEngine:
"""FastAPI dependency to get the ContextEngine."""
return _get_or_create_engine(mcp)
# ============================================================================
# Request/Response Schemas
# ============================================================================
class ConversationTurn(BaseModel):
"""A single conversation turn."""
role: str = Field(..., description="Role: 'user' or 'assistant'")
content: str = Field(..., description="Message content")
class ToolResult(BaseModel):
"""A tool execution result."""
tool_name: str = Field(..., description="Name of the tool")
content: str | dict[str, Any] = Field(..., description="Tool result content")
status: str = Field(default="success", description="Execution status")
class AssembleContextRequest(BaseModel):
"""Request to assemble context for an LLM request."""
project_id: str = Field(..., description="Project identifier")
agent_id: str = Field(..., description="Agent identifier")
query: str = Field(..., description="User's query or current request")
model: str = Field(
default="claude-3-sonnet",
description="Target model name",
)
max_tokens: int | None = Field(
None,
description="Maximum context tokens (uses model default if None)",
)
system_prompt: str | None = Field(
None,
description="System prompt/instructions",
)
task_description: str | None = Field(
None,
description="Current task description",
)
knowledge_query: str | None = Field(
None,
description="Query for knowledge base search",
)
knowledge_limit: int = Field(
default=10,
ge=1,
le=50,
description="Max number of knowledge results",
)
conversation_history: list[ConversationTurn] | None = Field(
None,
description="Previous conversation turns",
)
tool_results: list[ToolResult] | None = Field(
None,
description="Tool execution results to include",
)
compress: bool = Field(
default=True,
description="Whether to apply compression",
)
use_cache: bool = Field(
default=True,
description="Whether to use caching",
)
class AssembledContextResponse(BaseModel):
"""Response containing assembled context."""
content: str = Field(..., description="Assembled context content")
total_tokens: int = Field(..., description="Total token count")
context_count: int = Field(..., description="Number of context items included")
compressed: bool = Field(..., description="Whether compression was applied")
budget_used_percent: float = Field(
...,
description="Percentage of token budget used",
)
metadata: dict[str, Any] = Field(
default_factory=dict,
description="Additional metadata",
)
class TokenCountRequest(BaseModel):
"""Request to count tokens in content."""
content: str = Field(..., description="Content to count tokens in")
model: str | None = Field(
None,
description="Model for model-specific tokenization",
)
class TokenCountResponse(BaseModel):
"""Response containing token count."""
token_count: int = Field(..., description="Number of tokens")
model: str | None = Field(None, description="Model used for counting")
class BudgetInfoResponse(BaseModel):
"""Response containing budget information for a model."""
model: str = Field(..., description="Model name")
total_tokens: int = Field(..., description="Total token budget")
system_tokens: int = Field(..., description="Tokens reserved for system")
knowledge_tokens: int = Field(..., description="Tokens for knowledge")
conversation_tokens: int = Field(..., description="Tokens for conversation")
tool_tokens: int = Field(..., description="Tokens for tool results")
response_reserve: int = Field(..., description="Tokens reserved for response")
class ContextEngineStatsResponse(BaseModel):
"""Response containing engine statistics."""
cache: dict[str, Any] = Field(..., description="Cache statistics")
settings: dict[str, Any] = Field(..., description="Current settings")
class HealthResponse(BaseModel):
"""Health check response."""
status: str = Field(..., description="Health status")
mcp_connected: bool = Field(..., description="Whether MCP is connected")
cache_enabled: bool = Field(..., description="Whether caching is enabled")
# ============================================================================
# Endpoints
# ============================================================================
@router.get(
"/health",
response_model=HealthResponse,
summary="Context Engine Health",
description="Check health status of the context engine.",
)
async def health_check(
engine: ContextEngine = Depends(get_context_engine),
) -> HealthResponse:
"""Check context engine health."""
stats = await engine.get_stats()
return HealthResponse(
status="healthy",
mcp_connected=engine._mcp is not None,
cache_enabled=stats.get("settings", {}).get("cache_enabled", False),
)
@router.post(
"/assemble",
response_model=AssembledContextResponse,
summary="Assemble Context",
description="Assemble optimized context for an LLM request.",
)
async def assemble_context(
request: AssembleContextRequest,
current_user: User = Depends(require_superuser),
engine: ContextEngine = Depends(get_context_engine),
) -> AssembledContextResponse:
"""
Assemble optimized context for an LLM request.
This endpoint gathers context from various sources, scores and ranks them,
compresses if needed, and formats for the target model.
"""
logger.info(
"Context assembly for project=%s agent=%s by user=%s",
request.project_id,
request.agent_id,
current_user.id,
)
# Convert conversation history to dict format
conversation_history = None
if request.conversation_history:
conversation_history = [
{"role": turn.role, "content": turn.content}
for turn in request.conversation_history
]
# Convert tool results to dict format
tool_results = None
if request.tool_results:
tool_results = [
{
"tool_name": tr.tool_name,
"content": tr.content,
"status": tr.status,
}
for tr in request.tool_results
]
try:
result = await engine.assemble_context(
project_id=request.project_id,
agent_id=request.agent_id,
query=request.query,
model=request.model,
max_tokens=request.max_tokens,
system_prompt=request.system_prompt,
task_description=request.task_description,
knowledge_query=request.knowledge_query,
knowledge_limit=request.knowledge_limit,
conversation_history=conversation_history,
tool_results=tool_results,
compress=request.compress,
use_cache=request.use_cache,
)
# Calculate budget usage percentage
budget = await engine.get_budget_for_model(request.model, request.max_tokens)
budget_used_percent = (result.total_tokens / budget.total) * 100
# Check if compression was applied (from metadata if available)
was_compressed = result.metadata.get("compressed_contexts", 0) > 0
return AssembledContextResponse(
content=result.content,
total_tokens=result.total_tokens,
context_count=result.context_count,
compressed=was_compressed,
budget_used_percent=round(budget_used_percent, 2),
metadata={
"model": request.model,
"query": request.query,
"knowledge_included": bool(request.knowledge_query),
"conversation_turns": len(request.conversation_history or []),
"excluded_count": result.excluded_count,
"assembly_time_ms": result.assembly_time_ms,
},
)
except AssemblyTimeoutError as e:
raise HTTPException(
status_code=status.HTTP_504_GATEWAY_TIMEOUT,
detail=f"Context assembly timed out: {e}",
) from e
except BudgetExceededError as e:
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f"Token budget exceeded: {e}",
) from e
except Exception as e:
logger.exception("Context assembly failed")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Context assembly failed: {e}",
) from e
@router.post(
"/count-tokens",
response_model=TokenCountResponse,
summary="Count Tokens",
description="Count tokens in content using the LLM Gateway.",
)
async def count_tokens(
request: TokenCountRequest,
engine: ContextEngine = Depends(get_context_engine),
) -> TokenCountResponse:
"""Count tokens in content."""
try:
count = await engine.count_tokens(
content=request.content,
model=request.model,
)
return TokenCountResponse(
token_count=count,
model=request.model,
)
except Exception as e:
logger.warning("Token counting failed: %s", e)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Token counting failed: {e}",
) from e
@router.get(
"/budget/{model}",
response_model=BudgetInfoResponse,
summary="Get Token Budget",
description="Get token budget allocation for a specific model.",
)
async def get_budget(
model: str,
max_tokens: Annotated[int | None, Query(description="Custom max tokens")] = None,
engine: ContextEngine = Depends(get_context_engine),
) -> BudgetInfoResponse:
"""Get token budget information for a model."""
budget = await engine.get_budget_for_model(model, max_tokens)
return BudgetInfoResponse(
model=model,
total_tokens=budget.total,
system_tokens=budget.system,
knowledge_tokens=budget.knowledge,
conversation_tokens=budget.conversation,
tool_tokens=budget.tools,
response_reserve=budget.response_reserve,
)
@router.get(
"/stats",
response_model=ContextEngineStatsResponse,
summary="Engine Statistics",
description="Get context engine statistics and configuration.",
)
async def get_stats(
current_user: User = Depends(require_superuser),
engine: ContextEngine = Depends(get_context_engine),
) -> ContextEngineStatsResponse:
"""Get engine statistics."""
stats = await engine.get_stats()
return ContextEngineStatsResponse(
cache=stats.get("cache", {}),
settings=stats.get("settings", {}),
)
@router.post(
"/cache/invalidate",
status_code=status.HTTP_204_NO_CONTENT,
summary="Invalidate Cache (Admin Only)",
description="Invalidate context cache entries.",
)
async def invalidate_cache(
project_id: Annotated[
str | None, Query(description="Project to invalidate")
] = None,
pattern: Annotated[str | None, Query(description="Pattern to match")] = None,
current_user: User = Depends(require_superuser),
engine: ContextEngine = Depends(get_context_engine),
) -> None:
"""Invalidate cache entries."""
logger.info(
"Cache invalidation by user %s: project=%s pattern=%s",
current_user.id,
project_id,
pattern,
)
await engine.invalidate_cache(project_id=project_id, pattern=pattern)
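
A hedged usage sketch for the `/assemble` endpoint defined above — the base URL and bearer token are placeholders, and superuser credentials are required:

# Illustrative client call; base URL and token are placeholders.
import httpx

resp = httpx.post(
    "http://localhost:8000/api/v1/context/assemble",
    headers={"Authorization": "Bearer <superuser-access-token>"},
    json={
        "project_id": "demo-project",
        "agent_id": "demo-agent",
        "query": "Summarize the current sprint status",
        "model": "claude-3-sonnet",
    },
    timeout=30.0,
)
resp.raise_for_status()
body = resp.json()
print(body["total_tokens"], body["budget_used_percent"])
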

View File

@@ -0,0 +1,466 @@
"""
Tests for Context Management API Routes.
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastapi import status
from fastapi.testclient import TestClient
from app.main import app
from app.models.user import User
from app.services.context import (
AssembledContext,
AssemblyTimeoutError,
BudgetExceededError,
ContextEngine,
TokenBudget,
)
from app.services.mcp import MCPClientManager
@pytest.fixture
def mock_mcp_client():
"""Create a mock MCP client manager."""
client = MagicMock(spec=MCPClientManager)
client.is_initialized = True
return client
@pytest.fixture
def mock_context_engine(mock_mcp_client):
"""Create a mock ContextEngine."""
engine = MagicMock(spec=ContextEngine)
engine._mcp = mock_mcp_client
return engine
@pytest.fixture
def mock_superuser():
"""Create a mock superuser."""
user = MagicMock(spec=User)
user.id = "00000000-0000-0000-0000-000000000001"
user.is_superuser = True
user.email = "admin@example.com"
return user
@pytest.fixture
def client(mock_mcp_client, mock_context_engine, mock_superuser):
"""Create a FastAPI test client with mocked dependencies."""
from app.api.dependencies.permissions import require_superuser
from app.api.routes.context import get_context_engine
from app.services.mcp import get_mcp_client
# Override dependencies
async def override_get_mcp_client():
return mock_mcp_client
async def override_get_context_engine():
return mock_context_engine
async def override_require_superuser():
return mock_superuser
app.dependency_overrides[get_mcp_client] = override_get_mcp_client
app.dependency_overrides[get_context_engine] = override_get_context_engine
app.dependency_overrides[require_superuser] = override_require_superuser
with patch("app.main.check_database_health", return_value=True):
yield TestClient(app)
# Clean up
app.dependency_overrides.clear()
class TestContextHealth:
"""Tests for GET /context/health endpoint."""
def test_health_check_success(self, client, mock_context_engine, mock_mcp_client):
"""Test context engine health check."""
mock_context_engine.get_stats = AsyncMock(
return_value={
"cache": {"hits": 10, "misses": 5},
"settings": {"cache_enabled": True},
}
)
response = client.get("/api/v1/context/health")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["status"] == "healthy"
assert "mcp_connected" in data
assert "cache_enabled" in data
class TestAssembleContext:
"""Tests for POST /context/assemble endpoint."""
def test_assemble_context_success(self, client, mock_context_engine):
"""Test successful context assembly."""
# Create mock assembled context
mock_result = MagicMock(spec=AssembledContext)
mock_result.content = "Assembled context content"
mock_result.total_tokens = 500
mock_result.context_count = 2
mock_result.excluded_count = 0
mock_result.assembly_time_ms = 50.5
mock_result.metadata = {}
mock_context_engine.assemble_context = AsyncMock(return_value=mock_result)
mock_context_engine.get_budget_for_model = AsyncMock(
return_value=TokenBudget(
total=4000,
system=500,
knowledge=1500,
conversation=1000,
tools=500,
response_reserve=500,
)
)
response = client.post(
"/api/v1/context/assemble",
json={
"project_id": "test-project",
"agent_id": "test-agent",
"query": "What is the auth flow?",
"model": "claude-3-sonnet",
"system_prompt": "You are a helpful assistant.",
},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["content"] == "Assembled context content"
assert data["total_tokens"] == 500
assert data["context_count"] == 2
assert data["compressed"] is False
assert "budget_used_percent" in data
def test_assemble_context_with_conversation(self, client, mock_context_engine):
"""Test context assembly with conversation history."""
mock_result = MagicMock(spec=AssembledContext)
mock_result.content = "Context with history"
mock_result.total_tokens = 800
mock_result.context_count = 1
mock_result.excluded_count = 0
mock_result.assembly_time_ms = 30.0
mock_result.metadata = {}
mock_context_engine.assemble_context = AsyncMock(return_value=mock_result)
mock_context_engine.get_budget_for_model = AsyncMock(
return_value=TokenBudget(
total=4000,
system=500,
knowledge=1500,
conversation=1000,
tools=500,
response_reserve=500,
)
)
response = client.post(
"/api/v1/context/assemble",
json={
"project_id": "test-project",
"agent_id": "test-agent",
"query": "Continue the discussion",
"conversation_history": [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"},
],
},
)
assert response.status_code == status.HTTP_200_OK
call_args = mock_context_engine.assemble_context.call_args
assert call_args.kwargs["conversation_history"] == [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"},
]
def test_assemble_context_with_tool_results(self, client, mock_context_engine):
"""Test context assembly with tool results."""
mock_result = MagicMock(spec=AssembledContext)
mock_result.content = "Context with tools"
mock_result.total_tokens = 600
mock_result.context_count = 1
mock_result.excluded_count = 0
mock_result.assembly_time_ms = 25.0
mock_result.metadata = {}
mock_context_engine.assemble_context = AsyncMock(return_value=mock_result)
mock_context_engine.get_budget_for_model = AsyncMock(
return_value=TokenBudget(
total=4000,
system=500,
knowledge=1500,
conversation=1000,
tools=500,
response_reserve=500,
)
)
response = client.post(
"/api/v1/context/assemble",
json={
"project_id": "test-project",
"agent_id": "test-agent",
"query": "What did the search find?",
"tool_results": [
{
"tool_name": "search_knowledge",
"content": {"results": ["item1", "item2"]},
"status": "success",
}
],
},
)
assert response.status_code == status.HTTP_200_OK
call_args = mock_context_engine.assemble_context.call_args
assert len(call_args.kwargs["tool_results"]) == 1
def test_assemble_context_timeout(self, client, mock_context_engine):
"""Test context assembly timeout error."""
mock_context_engine.assemble_context = AsyncMock(
side_effect=AssemblyTimeoutError("Assembly exceeded 5000ms limit")
)
response = client.post(
"/api/v1/context/assemble",
json={
"project_id": "test-project",
"agent_id": "test-agent",
"query": "test",
},
)
assert response.status_code == status.HTTP_504_GATEWAY_TIMEOUT
def test_assemble_context_budget_exceeded(self, client, mock_context_engine):
"""Test context assembly budget exceeded error."""
mock_context_engine.assemble_context = AsyncMock(
side_effect=BudgetExceededError("Token budget exceeded: 5000 > 4000")
)
response = client.post(
"/api/v1/context/assemble",
json={
"project_id": "test-project",
"agent_id": "test-agent",
"query": "test",
},
)
assert response.status_code == status.HTTP_413_REQUEST_ENTITY_TOO_LARGE
def test_assemble_context_validation_error(self, client):
"""Test context assembly with invalid request."""
response = client.post(
"/api/v1/context/assemble",
json={}, # Missing required fields
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
class TestCountTokens:
"""Tests for POST /context/count-tokens endpoint."""
def test_count_tokens_success(self, client, mock_context_engine):
"""Test successful token counting."""
mock_context_engine.count_tokens = AsyncMock(return_value=42)
response = client.post(
"/api/v1/context/count-tokens",
json={
"content": "This is some test content.",
"model": "claude-3-sonnet",
},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["token_count"] == 42
assert data["model"] == "claude-3-sonnet"
def test_count_tokens_without_model(self, client, mock_context_engine):
"""Test token counting without specifying model."""
mock_context_engine.count_tokens = AsyncMock(return_value=100)
response = client.post(
"/api/v1/context/count-tokens",
json={"content": "Some content to count."},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["token_count"] == 100
assert data["model"] is None
class TestGetBudget:
"""Tests for GET /context/budget/{model} endpoint."""
def test_get_budget_success(self, client, mock_context_engine):
"""Test getting token budget for a model."""
mock_context_engine.get_budget_for_model = AsyncMock(
return_value=TokenBudget(
total=100000,
system=10000,
knowledge=40000,
conversation=30000,
tools=10000,
response_reserve=10000,
)
)
response = client.get("/api/v1/context/budget/claude-3-opus")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["model"] == "claude-3-opus"
assert data["total_tokens"] == 100000
assert data["system_tokens"] == 10000
assert data["knowledge_tokens"] == 40000
def test_get_budget_with_max_tokens(self, client, mock_context_engine):
"""Test getting budget with custom max tokens."""
mock_context_engine.get_budget_for_model = AsyncMock(
return_value=TokenBudget(
total=2000,
system=200,
knowledge=800,
conversation=600,
tools=200,
response_reserve=200,
)
)
response = client.get("/api/v1/context/budget/gpt-4?max_tokens=2000")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["total_tokens"] == 2000
class TestGetStats:
"""Tests for GET /context/stats endpoint."""
def test_get_stats_success(self, client, mock_context_engine):
"""Test getting engine statistics."""
mock_context_engine.get_stats = AsyncMock(
return_value={
"cache": {
"hits": 100,
"misses": 25,
"hit_rate": 0.8,
},
"settings": {
"compression_threshold": 0.9,
"max_assembly_time_ms": 5000,
"cache_enabled": True,
},
}
)
response = client.get("/api/v1/context/stats")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["cache"]["hits"] == 100
assert data["settings"]["cache_enabled"] is True
class TestInvalidateCache:
"""Tests for POST /context/cache/invalidate endpoint."""
def test_invalidate_cache_by_project(self, client, mock_context_engine):
"""Test cache invalidation by project ID."""
mock_context_engine.invalidate_cache = AsyncMock(return_value=5)
response = client.post(
"/api/v1/context/cache/invalidate?project_id=test-project"
)
assert response.status_code == status.HTTP_204_NO_CONTENT
mock_context_engine.invalidate_cache.assert_called_once()
call_kwargs = mock_context_engine.invalidate_cache.call_args.kwargs
assert call_kwargs["project_id"] == "test-project"
def test_invalidate_cache_by_pattern(self, client, mock_context_engine):
"""Test cache invalidation by pattern."""
mock_context_engine.invalidate_cache = AsyncMock(return_value=10)
response = client.post("/api/v1/context/cache/invalidate?pattern=*auth*")
assert response.status_code == status.HTTP_204_NO_CONTENT
mock_context_engine.invalidate_cache.assert_called_once()
call_kwargs = mock_context_engine.invalidate_cache.call_args.kwargs
assert call_kwargs["pattern"] == "*auth*"
def test_invalidate_cache_all(self, client, mock_context_engine):
"""Test invalidating all cache entries."""
mock_context_engine.invalidate_cache = AsyncMock(return_value=100)
response = client.post("/api/v1/context/cache/invalidate")
assert response.status_code == status.HTTP_204_NO_CONTENT
class TestContextEndpointsEdgeCases:
"""Edge case tests for Context endpoints."""
def test_context_content_type(self, client, mock_context_engine):
"""Test that endpoints return JSON content type."""
mock_context_engine.get_stats = AsyncMock(
return_value={"cache": {}, "settings": {}}
)
response = client.get("/api/v1/context/health")
assert "application/json" in response.headers["content-type"]
def test_assemble_context_with_knowledge_query(self, client, mock_context_engine):
"""Test context assembly with knowledge base query."""
mock_result = MagicMock(spec=AssembledContext)
mock_result.content = "Context with knowledge"
mock_result.total_tokens = 1000
mock_result.context_count = 3
mock_result.excluded_count = 0
mock_result.assembly_time_ms = 100.0
mock_result.metadata = {
"compressed_contexts": 1
} # Indicates compression happened
mock_context_engine.assemble_context = AsyncMock(return_value=mock_result)
mock_context_engine.get_budget_for_model = AsyncMock(
return_value=TokenBudget(
total=4000,
system=500,
knowledge=1500,
conversation=1000,
tools=500,
response_reserve=500,
)
)
response = client.post(
"/api/v1/context/assemble",
json={
"project_id": "test-project",
"agent_id": "test-agent",
"query": "How does authentication work?",
"knowledge_query": "authentication flow implementation",
"knowledge_limit": 5,
},
)
assert response.status_code == status.HTTP_200_OK
call_kwargs = mock_context_engine.assemble_context.call_args.kwargs
assert call_kwargs["knowledge_query"] == "authentication flow implementation"
assert call_kwargs["knowledge_limit"] == 5

View File

@@ -0,0 +1,646 @@
"""
Agent E2E Workflow Tests.
Tests complete workflows for AI agents including:
- Agent type management (admin-only)
- Agent instance spawning and lifecycle
- Agent status transitions (pause/resume/terminate)
- Authorization and access control
Usage:
make test-e2e # Run all E2E tests
"""
from uuid import uuid4
import pytest
pytestmark = [
pytest.mark.e2e,
pytest.mark.postgres,
pytest.mark.asyncio,
]
class TestAgentTypesAdminWorkflows:
"""Test agent type management (admin-only operations)."""
async def test_create_agent_type_requires_superuser(self, e2e_client):
"""Test that creating agent types requires superuser privileges."""
# Register regular user
email = f"regular-{uuid4().hex[:8]}@example.com"
password = "RegularPass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Regular",
"last_name": "User",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
# Try to create agent type
response = await e2e_client.post(
"/api/v1/agent-types",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={
"name": "Test Agent",
"slug": f"test-agent-{uuid4().hex[:8]}",
"personality_prompt": "You are a helpful assistant.",
"primary_model": "claude-3-sonnet",
},
)
assert response.status_code == 403
async def test_superuser_can_create_agent_type(self, e2e_client, e2e_superuser):
"""Test that superuser can create and manage agent types."""
slug = f"test-type-{uuid4().hex[:8]}"
# Create agent type
create_resp = await e2e_client.post(
"/api/v1/agent-types",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"name": "Product Owner Agent",
"slug": slug,
"description": "A product owner agent for requirements gathering",
"expertise": ["requirements", "user_stories", "prioritization"],
"personality_prompt": "You are a product owner focused on delivering value.",
"primary_model": "claude-3-opus",
"fallback_models": ["claude-3-sonnet"],
"model_params": {"temperature": 0.7, "max_tokens": 4000},
"mcp_servers": ["knowledge-base"],
"is_active": True,
},
)
assert create_resp.status_code == 201, f"Failed: {create_resp.text}"
agent_type = create_resp.json()
assert agent_type["name"] == "Product Owner Agent"
assert agent_type["slug"] == slug
assert agent_type["primary_model"] == "claude-3-opus"
assert agent_type["is_active"] is True
assert "requirements" in agent_type["expertise"]
async def test_list_agent_types_public(self, e2e_client, e2e_superuser):
"""Test that any authenticated user can list agent types."""
# First create an agent type as superuser
slug = f"list-test-{uuid4().hex[:8]}"
await e2e_client.post(
"/api/v1/agent-types",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"name": f"List Test Agent {slug}",
"slug": slug,
"personality_prompt": "Test agent.",
"primary_model": "claude-3-sonnet",
},
)
# Register regular user
email = f"lister-{uuid4().hex[:8]}@example.com"
password = "ListerPass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "List",
"last_name": "User",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
# List agent types as regular user
list_resp = await e2e_client.get(
"/api/v1/agent-types",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert list_resp.status_code == 200
data = list_resp.json()
assert "data" in data
assert "pagination" in data
assert data["pagination"]["total"] >= 1
async def test_get_agent_type_by_slug(self, e2e_client, e2e_superuser):
"""Test getting agent type by slug."""
slug = f"slug-test-{uuid4().hex[:8]}"
# Create agent type
await e2e_client.post(
"/api/v1/agent-types",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"name": f"Slug Test {slug}",
"slug": slug,
"personality_prompt": "Test agent.",
"primary_model": "claude-3-sonnet",
},
)
# Get by slug (route is /slug/{slug}, not /by-slug/{slug})
get_resp = await e2e_client.get(
f"/api/v1/agent-types/slug/{slug}",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert get_resp.status_code == 200
data = get_resp.json()
assert data["slug"] == slug
async def test_update_agent_type(self, e2e_client, e2e_superuser):
"""Test updating an agent type."""
slug = f"update-test-{uuid4().hex[:8]}"
# Create agent type
create_resp = await e2e_client.post(
"/api/v1/agent-types",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"name": "Original Name",
"slug": slug,
"personality_prompt": "Original prompt.",
"primary_model": "claude-3-sonnet",
},
)
agent_type_id = create_resp.json()["id"]
# Update agent type
update_resp = await e2e_client.patch(
f"/api/v1/agent-types/{agent_type_id}",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"name": "Updated Name",
"description": "Added description",
},
)
assert update_resp.status_code == 200
updated = update_resp.json()
assert updated["name"] == "Updated Name"
assert updated["description"] == "Added description"
assert updated["personality_prompt"] == "Original prompt." # Unchanged
class TestAgentInstanceWorkflows:
"""Test agent instance spawning and lifecycle."""
async def test_spawn_agent_workflow(self, e2e_client, e2e_superuser):
"""Test complete workflow: create type -> create project -> spawn agent."""
# 1. Create agent type as superuser
type_slug = f"spawn-test-type-{uuid4().hex[:8]}"
type_resp = await e2e_client.post(
"/api/v1/agent-types",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"name": "Spawn Test Agent",
"slug": type_slug,
"personality_prompt": "You are a helpful agent.",
"primary_model": "claude-3-sonnet",
},
)
assert type_resp.status_code == 201
agent_type = type_resp.json()
agent_type_id = agent_type["id"]
# 2. Create a project (superuser can create projects too)
project_slug = f"spawn-test-project-{uuid4().hex[:8]}"
project_resp = await e2e_client.post(
"/api/v1/projects",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"name": "Spawn Test Project", "slug": project_slug},
)
assert project_resp.status_code == 201
project = project_resp.json()
project_id = project["id"]
# 3. Spawn agent instance
spawn_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/agents",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"agent_type_id": agent_type_id,
"project_id": project_id,
"name": "My PO Agent",
},
)
assert spawn_resp.status_code == 201, f"Failed: {spawn_resp.text}"
agent = spawn_resp.json()
assert agent["name"] == "My PO Agent"
assert agent["status"] == "idle"
assert agent["project_id"] == project_id
assert agent["agent_type_id"] == agent_type_id
async def test_list_project_agents(self, e2e_client, e2e_superuser):
"""Test listing agents in a project."""
# Setup: Create agent type and project
type_slug = f"list-agents-type-{uuid4().hex[:8]}"
type_resp = await e2e_client.post(
"/api/v1/agent-types",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"name": "List Agents Type",
"slug": type_slug,
"personality_prompt": "Test agent.",
"primary_model": "claude-3-sonnet",
},
)
agent_type_id = type_resp.json()["id"]
project_slug = f"list-agents-project-{uuid4().hex[:8]}"
project_resp = await e2e_client.post(
"/api/v1/projects",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"name": "List Agents Project", "slug": project_slug},
)
project_id = project_resp.json()["id"]
# Spawn multiple agents
for i in range(3):
await e2e_client.post(
f"/api/v1/projects/{project_id}/agents",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"agent_type_id": agent_type_id,
"project_id": project_id,
"name": f"Agent {i + 1}",
},
)
# List agents
list_resp = await e2e_client.get(
f"/api/v1/projects/{project_id}/agents",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert list_resp.status_code == 200
data = list_resp.json()
assert data["pagination"]["total"] == 3
assert len(data["data"]) == 3
class TestAgentLifecycle:
"""Test agent lifecycle operations (pause/resume/terminate)."""
async def test_agent_pause_and_resume(self, e2e_client, e2e_superuser):
"""Test pausing and resuming an agent."""
# Setup: Create agent type, project, and agent
type_slug = f"pause-test-type-{uuid4().hex[:8]}"
type_resp = await e2e_client.post(
"/api/v1/agent-types",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"name": "Pause Test Type",
"slug": type_slug,
"personality_prompt": "Test agent.",
"primary_model": "claude-3-sonnet",
},
)
agent_type_id = type_resp.json()["id"]
project_slug = f"pause-test-project-{uuid4().hex[:8]}"
project_resp = await e2e_client.post(
"/api/v1/projects",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"name": "Pause Test Project", "slug": project_slug},
)
project_id = project_resp.json()["id"]
spawn_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/agents",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"agent_type_id": agent_type_id,
"project_id": project_id,
"name": "Pausable Agent",
},
)
agent_id = spawn_resp.json()["id"]
assert spawn_resp.json()["status"] == "idle"
# Pause agent
pause_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/agents/{agent_id}/pause",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert pause_resp.status_code == 200, f"Failed: {pause_resp.text}"
assert pause_resp.json()["status"] == "paused"
# Resume agent
resume_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/agents/{agent_id}/resume",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert resume_resp.status_code == 200, f"Failed: {resume_resp.text}"
assert resume_resp.json()["status"] == "idle"
async def test_agent_terminate(self, e2e_client, e2e_superuser):
"""Test terminating an agent."""
# Setup
type_slug = f"terminate-type-{uuid4().hex[:8]}"
type_resp = await e2e_client.post(
"/api/v1/agent-types",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"name": "Terminate Type",
"slug": type_slug,
"personality_prompt": "Test agent.",
"primary_model": "claude-3-sonnet",
},
)
agent_type_id = type_resp.json()["id"]
project_slug = f"terminate-project-{uuid4().hex[:8]}"
project_resp = await e2e_client.post(
"/api/v1/projects",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"name": "Terminate Project", "slug": project_slug},
)
project_id = project_resp.json()["id"]
spawn_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/agents",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"agent_type_id": agent_type_id,
"project_id": project_id,
"name": "To Be Terminated",
},
)
agent_id = spawn_resp.json()["id"]
# Terminate agent (returns MessageResponse, not agent status)
terminate_resp = await e2e_client.delete(
f"/api/v1/projects/{project_id}/agents/{agent_id}",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert terminate_resp.status_code == 200
assert "message" in terminate_resp.json()
# Verify terminated agent cannot be resumed (returns 400 or 422)
resume_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/agents/{agent_id}/resume",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert resume_resp.status_code in [400, 422] # Invalid transition
class TestAgentAccessControl:
"""Test agent access control and authorization."""
async def test_user_cannot_access_other_project_agents(
self, e2e_client, e2e_superuser
):
"""Test that users cannot access agents in projects they don't own."""
# Superuser creates agent type
type_slug = f"access-type-{uuid4().hex[:8]}"
type_resp = await e2e_client.post(
"/api/v1/agent-types",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"name": "Access Type",
"slug": type_slug,
"personality_prompt": "Test agent.",
"primary_model": "claude-3-sonnet",
},
)
agent_type_id = type_resp.json()["id"]
# Superuser creates project and spawns agent
project_slug = f"protected-project-{uuid4().hex[:8]}"
project_resp = await e2e_client.post(
"/api/v1/projects",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"name": "Protected Project", "slug": project_slug},
)
project_id = project_resp.json()["id"]
spawn_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/agents",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"agent_type_id": agent_type_id,
"project_id": project_id,
"name": "Protected Agent",
},
)
agent_id = spawn_resp.json()["id"]
# Create a different user
email = f"other-user-{uuid4().hex[:8]}@example.com"
password = "OtherPass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Other",
"last_name": "User",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
other_tokens = login_resp.json()
# Other user tries to access the agent
get_resp = await e2e_client.get(
f"/api/v1/projects/{project_id}/agents/{agent_id}",
headers={"Authorization": f"Bearer {other_tokens['access_token']}"},
)
# Should be forbidden or not found
assert get_resp.status_code in [403, 404]
async def test_cannot_spawn_with_inactive_agent_type(
self, e2e_client, e2e_superuser
):
"""Test that agents cannot be spawned from inactive agent types."""
# Create agent type
type_slug = f"inactive-type-{uuid4().hex[:8]}"
type_resp = await e2e_client.post(
"/api/v1/agent-types",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"name": "Inactive Type",
"slug": type_slug,
"personality_prompt": "Test agent.",
"primary_model": "claude-3-sonnet",
"is_active": True,
},
)
agent_type_id = type_resp.json()["id"]
# Deactivate the agent type
await e2e_client.patch(
f"/api/v1/agent-types/{agent_type_id}",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"is_active": False},
)
# Create project
project_slug = f"inactive-spawn-project-{uuid4().hex[:8]}"
project_resp = await e2e_client.post(
"/api/v1/projects",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"name": "Inactive Spawn Project", "slug": project_slug},
)
project_id = project_resp.json()["id"]
# Try to spawn agent with inactive type
spawn_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/agents",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"agent_type_id": agent_type_id,
"project_id": project_id,
"name": "Should Fail",
},
)
# 422 is correct for validation errors per REST conventions
assert spawn_resp.status_code == 422
class TestAgentMetrics:
"""Test agent metrics endpoint."""
async def test_get_agent_metrics(self, e2e_client, e2e_superuser):
"""Test retrieving agent metrics."""
# Setup
type_slug = f"metrics-type-{uuid4().hex[:8]}"
type_resp = await e2e_client.post(
"/api/v1/agent-types",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"name": "Metrics Type",
"slug": type_slug,
"personality_prompt": "Test agent.",
"primary_model": "claude-3-sonnet",
},
)
agent_type_id = type_resp.json()["id"]
project_slug = f"metrics-project-{uuid4().hex[:8]}"
project_resp = await e2e_client.post(
"/api/v1/projects",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={"name": "Metrics Project", "slug": project_slug},
)
project_id = project_resp.json()["id"]
spawn_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/agents",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"agent_type_id": agent_type_id,
"project_id": project_id,
"name": "Metrics Agent",
},
)
agent_id = spawn_resp.json()["id"]
# Get metrics
metrics_resp = await e2e_client.get(
f"/api/v1/projects/{project_id}/agents/{agent_id}/metrics",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert metrics_resp.status_code == 200
metrics = metrics_resp.json()
# Verify AgentInstanceMetrics structure
assert "total_instances" in metrics
assert "active_instances" in metrics
assert "idle_instances" in metrics
assert "total_tasks_completed" in metrics
assert "total_tokens_used" in metrics
assert "total_cost_incurred" in metrics

View File

@@ -0,0 +1,460 @@
"""
MCP and Context Engine E2E Workflow Tests.
Tests complete workflows involving MCP servers and the Context Engine
against real PostgreSQL. These tests verify:
- MCP server listing and tool discovery
- Context engine operations
- Admin-only MCP operations with proper authentication
- Error handling for MCP operations
Usage:
make test-e2e # Run all E2E tests
"""
from uuid import uuid4
import pytest
pytestmark = [
pytest.mark.e2e,
pytest.mark.postgres,
pytest.mark.asyncio,
]
class TestMCPServerDiscovery:
"""Test MCP server listing and discovery workflows."""
async def test_list_mcp_servers(self, e2e_client):
"""Test listing MCP servers returns expected configuration."""
response = await e2e_client.get("/api/v1/mcp/servers")
assert response.status_code == 200, f"Failed: {response.text}"
data = response.json()
# Should have servers configured
assert "servers" in data
assert "total" in data
assert isinstance(data["servers"], list)
# Should have at least llm-gateway and knowledge-base
server_names = [s["name"] for s in data["servers"]]
assert "llm-gateway" in server_names
assert "knowledge-base" in server_names
async def test_list_all_mcp_tools(self, e2e_client):
"""Test listing all tools from all MCP servers."""
response = await e2e_client.get("/api/v1/mcp/tools")
assert response.status_code == 200, f"Failed: {response.text}"
data = response.json()
assert "tools" in data
assert "total" in data
assert isinstance(data["tools"], list)
async def test_mcp_health_check(self, e2e_client):
"""Test MCP health check returns server status."""
response = await e2e_client.get("/api/v1/mcp/health")
assert response.status_code == 200, f"Failed: {response.text}"
data = response.json()
assert "servers" in data
assert "healthy_count" in data
assert "unhealthy_count" in data
assert "total" in data
async def test_list_circuit_breakers(self, e2e_client):
"""Test listing circuit breaker status."""
response = await e2e_client.get("/api/v1/mcp/circuit-breakers")
assert response.status_code == 200, f"Failed: {response.text}"
data = response.json()
assert "circuit_breakers" in data
assert isinstance(data["circuit_breakers"], list)
class TestMCPServerTools:
"""Test MCP server tool listing."""
async def test_list_llm_gateway_tools(self, e2e_client):
"""Test listing tools from LLM Gateway server."""
response = await e2e_client.get("/api/v1/mcp/servers/llm-gateway/tools")
# May return 200 with tools or 404 if server not connected
assert response.status_code in [200, 404, 502]
if response.status_code == 200:
data = response.json()
assert "tools" in data
assert "total" in data
async def test_list_knowledge_base_tools(self, e2e_client):
"""Test listing tools from Knowledge Base server."""
response = await e2e_client.get("/api/v1/mcp/servers/knowledge-base/tools")
# May return 200 with tools or 404/502 if server not connected
assert response.status_code in [200, 404, 502]
if response.status_code == 200:
data = response.json()
assert "tools" in data
assert "total" in data
async def test_invalid_server_returns_404(self, e2e_client):
"""Test that invalid server name returns 404."""
response = await e2e_client.get("/api/v1/mcp/servers/nonexistent-server/tools")
assert response.status_code == 404
class TestContextEngineWorkflows:
"""Test Context Engine operations."""
async def test_context_engine_health(self, e2e_client):
"""Test context engine health endpoint."""
response = await e2e_client.get("/api/v1/context/health")
assert response.status_code == 200, f"Failed: {response.text}"
data = response.json()
assert data["status"] == "healthy"
assert "mcp_connected" in data
assert "cache_enabled" in data
async def test_get_token_budget_claude_sonnet(self, e2e_client):
"""Test getting token budget for Claude 3 Sonnet."""
response = await e2e_client.get("/api/v1/context/budget/claude-3-sonnet")
assert response.status_code == 200, f"Failed: {response.text}"
data = response.json()
assert data["model"] == "claude-3-sonnet"
assert "total_tokens" in data
assert "system_tokens" in data
assert "knowledge_tokens" in data
assert "conversation_tokens" in data
assert "tool_tokens" in data
assert "response_reserve" in data
# Verify budget allocation makes sense
assert data["total_tokens"] > 0
total_allocated = (
data["system_tokens"]
+ data["knowledge_tokens"]
+ data["conversation_tokens"]
+ data["tool_tokens"]
+ data["response_reserve"]
)
assert total_allocated <= data["total_tokens"]
async def test_get_token_budget_with_custom_max(self, e2e_client):
"""Test getting token budget with custom max tokens."""
response = await e2e_client.get(
"/api/v1/context/budget/claude-3-sonnet",
params={"max_tokens": 50000},
)
assert response.status_code == 200, f"Failed: {response.text}"
data = response.json()
assert data["model"] == "claude-3-sonnet"
# Custom max should be respected or capped
assert data["total_tokens"] <= 50000
async def test_count_tokens(self, e2e_client):
"""Test token counting endpoint."""
response = await e2e_client.post(
"/api/v1/context/count-tokens",
json={
"content": "Hello, this is a test message for token counting.",
"model": "claude-3-sonnet",
},
)
assert response.status_code == 200, f"Failed: {response.text}"
data = response.json()
assert "token_count" in data
assert data["token_count"] > 0
assert data["model"] == "claude-3-sonnet"
class TestAdminMCPOperations:
"""Test admin-only MCP operations require authentication."""
async def test_tool_call_requires_auth(self, e2e_client):
"""Test that tool execution requires authentication."""
response = await e2e_client.post(
"/api/v1/mcp/call",
json={
"server": "llm-gateway",
"tool": "count_tokens",
"arguments": {"text": "test"},
},
)
# Should require authentication
assert response.status_code in [401, 403]
async def test_circuit_reset_requires_auth(self, e2e_client):
"""Test that circuit breaker reset requires authentication."""
response = await e2e_client.post(
"/api/v1/mcp/circuit-breakers/llm-gateway/reset"
)
assert response.status_code in [401, 403]
async def test_server_reconnect_requires_auth(self, e2e_client):
"""Test that server reconnect requires authentication."""
response = await e2e_client.post("/api/v1/mcp/servers/llm-gateway/reconnect")
assert response.status_code in [401, 403]
async def test_context_stats_requires_auth(self, e2e_client):
"""Test that context stats requires authentication."""
response = await e2e_client.get("/api/v1/context/stats")
assert response.status_code in [401, 403]
async def test_context_assemble_requires_auth(self, e2e_client):
"""Test that context assembly requires authentication."""
response = await e2e_client.post(
"/api/v1/context/assemble",
json={
"project_id": "test-project",
"agent_id": "test-agent",
"query": "test query",
"model": "claude-3-sonnet",
},
)
assert response.status_code in [401, 403]
async def test_cache_invalidate_requires_auth(self, e2e_client):
"""Test that cache invalidation requires authentication."""
response = await e2e_client.post("/api/v1/context/cache/invalidate")
assert response.status_code in [401, 403]
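# The authenticated tests below all build the same bearer header by hand; a
# hypothetical helper (not defined in this suite) captures the pattern:
def auth_headers(access_token: str) -> dict[str, str]:
    """Build the Authorization header expected by protected endpoints."""
    return {"Authorization": f"Bearer {access_token}"}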
class TestAdminMCPWithAuthentication:
"""Test admin MCP operations with superuser authentication."""
async def test_superuser_can_get_context_stats(self, e2e_client, e2e_superuser):
"""Test that superuser can get context engine stats."""
response = await e2e_client.get(
"/api/v1/context/stats",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert response.status_code == 200, f"Failed: {response.text}"
data = response.json()
assert "cache" in data
assert "settings" in data
@pytest.mark.skip(
reason="Requires MCP servers (llm-gateway, knowledge-base) to be running"
)
async def test_superuser_can_assemble_context(self, e2e_client, e2e_superuser):
"""Test that superuser can assemble context."""
response = await e2e_client.post(
"/api/v1/context/assemble",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"project_id": f"test-project-{uuid4().hex[:8]}",
"agent_id": f"test-agent-{uuid4().hex[:8]}",
"query": "What is the status of the project?",
"model": "claude-3-sonnet",
"system_prompt": "You are a helpful assistant.",
"compress": True,
"use_cache": False,
},
)
assert response.status_code == 200, f"Failed: {response.text}"
data = response.json()
assert "content" in data
assert "total_tokens" in data
assert "context_count" in data
assert "budget_used_percent" in data
assert "metadata" in data
async def test_superuser_can_invalidate_cache(self, e2e_client, e2e_superuser):
"""Test that superuser can invalidate cache."""
response = await e2e_client.post(
"/api/v1/context/cache/invalidate",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
params={"project_id": "test-project"},
)
assert response.status_code == 204
async def test_regular_user_cannot_access_admin_operations(self, e2e_client):
"""Test that regular (non-superuser) cannot access admin operations."""
email = f"regular-{uuid4().hex[:8]}@example.com"
password = "RegularUser123!"
# Register regular user
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Regular",
"last_name": "User",
},
)
# Login
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
# Try to access admin endpoint
response = await e2e_client.get(
"/api/v1/context/stats",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
# Should be forbidden for non-superuser
assert response.status_code == 403
class TestMCPInputValidation:
"""Test input validation for MCP endpoints."""
async def test_server_name_max_length(self, e2e_client):
"""Test that server name has max length validation."""
long_name = "a" * 100 # Exceeds 64 char limit
response = await e2e_client.get(f"/api/v1/mcp/servers/{long_name}/tools")
assert response.status_code == 422
async def test_server_name_invalid_characters(self, e2e_client):
"""Test that server name rejects invalid characters."""
invalid_name = "server@name!invalid"
response = await e2e_client.get(f"/api/v1/mcp/servers/{invalid_name}/tools")
assert response.status_code == 422
async def test_token_count_empty_content(self, e2e_client):
"""Test token counting with empty content."""
response = await e2e_client.post(
"/api/v1/context/count-tokens",
json={"content": ""},
)
# Empty content is valid, should return 0 tokens
if response.status_code == 200:
data = response.json()
assert data["token_count"] == 0
else:
# Or it might be rejected as invalid
assert response.status_code == 422
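# A server-name rule consistent with the two 422 cases above; the exact pattern
# is an assumption -- the real constraint lives in the API's path-parameter
# validation.
import re

SERVER_NAME_RE = re.compile(r"^[A-Za-z0-9_-]{1,64}$")
# SERVER_NAME_RE.match("llm-gateway")          -> matches (valid)
# SERVER_NAME_RE.match("a" * 100)              -> None (exceeds 64 chars)
# SERVER_NAME_RE.match("server@name!invalid")  -> None (disallowed characters)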
class TestMCPWorkflowIntegration:
"""Test complete MCP workflows end-to-end."""
async def test_discovery_to_budget_workflow(self, e2e_client):
"""Test complete workflow: discover servers -> check budget -> ready for use."""
# 1. Discover available servers
servers_resp = await e2e_client.get("/api/v1/mcp/servers")
assert servers_resp.status_code == 200
servers = servers_resp.json()["servers"]
assert len(servers) > 0
# 2. Check context engine health
health_resp = await e2e_client.get("/api/v1/context/health")
assert health_resp.status_code == 200
health = health_resp.json()
assert health["status"] == "healthy"
# 3. Get token budget for a model
budget_resp = await e2e_client.get("/api/v1/context/budget/claude-3-sonnet")
assert budget_resp.status_code == 200
budget = budget_resp.json()
# 4. Verify system is ready for context assembly
assert budget["total_tokens"] > 0
assert health["mcp_connected"] is True
@pytest.mark.skip(
reason="Requires MCP servers (llm-gateway, knowledge-base) to be running"
)
async def test_full_context_assembly_workflow(self, e2e_client, e2e_superuser):
"""Test complete context assembly workflow with superuser."""
project_id = f"e2e-project-{uuid4().hex[:8]}"
agent_id = f"e2e-agent-{uuid4().hex[:8]}"
# 1. Check budget before assembly
budget_resp = await e2e_client.get("/api/v1/context/budget/claude-3-sonnet")
assert budget_resp.status_code == 200
_ = budget_resp.json() # Verify valid response
# 2. Count tokens in sample content
count_resp = await e2e_client.post(
"/api/v1/context/count-tokens",
json={"content": "This is a test message for context assembly."},
)
assert count_resp.status_code == 200
token_count = count_resp.json()["token_count"]
assert token_count > 0
# 3. Assemble context
assemble_resp = await e2e_client.post(
"/api/v1/context/assemble",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
json={
"project_id": project_id,
"agent_id": agent_id,
"query": "Summarize the current project status",
"model": "claude-3-sonnet",
"system_prompt": "You are a project management assistant.",
"task_description": "Generate a status report",
"conversation_history": [
{"role": "user", "content": "What's the project status?"},
{
"role": "assistant",
"content": "Let me check the current status.",
},
],
"compress": True,
"use_cache": False,
},
)
assert assemble_resp.status_code == 200
assembled = assemble_resp.json()
# 4. Verify assembly results
assert assembled["total_tokens"] > 0
assert assembled["context_count"] > 0
assert assembled["budget_used_percent"] > 0
assert assembled["budget_used_percent"] <= 100
# 5. Get stats to verify the operation was recorded
stats_resp = await e2e_client.get(
"/api/v1/context/stats",
headers={
"Authorization": f"Bearer {e2e_superuser['tokens']['access_token']}"
},
)
assert stats_resp.status_code == 200


@@ -0,0 +1,684 @@
"""
Project and Agent E2E Workflow Tests.
Tests complete project management workflows with real PostgreSQL:
- Project CRUD and lifecycle management
- Agent spawning and lifecycle
- Issue management within projects
- Sprint planning and execution
Usage:
make test-e2e # Run all E2E tests
"""
from datetime import date, timedelta
from uuid import uuid4
import pytest
pytestmark = [
pytest.mark.e2e,
pytest.mark.postgres,
pytest.mark.asyncio,
]
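# Module-level pytestmark applies these markers to every test below, so each
# class runs as an async E2E test against real PostgreSQL without per-test
# decorators.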
class TestProjectCRUDWorkflows:
"""Test complete project CRUD workflows."""
async def test_create_project_workflow(self, e2e_client):
"""Test creating a project as authenticated user."""
# Register and login
email = f"project-owner-{uuid4().hex[:8]}@example.com"
password = "SecurePass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Project",
"last_name": "Owner",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
# Create project
project_slug = f"test-project-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/projects",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={
"name": "E2E Test Project",
"slug": project_slug,
"description": "A project for E2E testing",
"autonomy_level": "milestone",
},
)
assert create_resp.status_code == 201, f"Failed: {create_resp.text}"
project = create_resp.json()
assert project["name"] == "E2E Test Project"
assert project["slug"] == project_slug
assert project["status"] == "active"
assert project["agent_count"] == 0
assert project["issue_count"] == 0
async def test_list_projects_only_shows_owned(self, e2e_client):
"""Test that users only see their own projects."""
# Create two users with projects
users = []
for i in range(2):
email = f"user-{i}-{uuid4().hex[:8]}@example.com"
password = "SecurePass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": f"User{i}",
"last_name": "Test",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
# Each user creates their own project
project_slug = f"user{i}-project-{uuid4().hex[:8]}"
await e2e_client.post(
"/api/v1/projects",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={
"name": f"User {i} Project",
"slug": project_slug,
},
)
users.append({"email": email, "tokens": tokens, "slug": project_slug})
# User 0 should only see their project
list_resp = await e2e_client.get(
"/api/v1/projects",
headers={"Authorization": f"Bearer {users[0]['tokens']['access_token']}"},
)
assert list_resp.status_code == 200
data = list_resp.json()
slugs = [p["slug"] for p in data["data"]]
assert users[0]["slug"] in slugs
assert users[1]["slug"] not in slugs
async def test_project_lifecycle_pause_resume(self, e2e_client):
"""Test pausing and resuming a project."""
# Setup user and project
email = f"lifecycle-{uuid4().hex[:8]}@example.com"
password = "SecurePass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Lifecycle",
"last_name": "Test",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
project_slug = f"lifecycle-project-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/projects",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={"name": "Lifecycle Project", "slug": project_slug},
)
project = create_resp.json()
project_id = project["id"]
# Pause the project
pause_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/pause",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert pause_resp.status_code == 200
assert pause_resp.json()["status"] == "paused"
# Resume the project
resume_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/resume",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert resume_resp.status_code == 200
assert resume_resp.json()["status"] == "active"
async def test_project_archive(self, e2e_client):
"""Test archiving a project (soft delete)."""
# Setup user and project
email = f"archive-{uuid4().hex[:8]}@example.com"
password = "SecurePass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Archive",
"last_name": "Test",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
project_slug = f"archive-project-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/projects",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={"name": "Archive Project", "slug": project_slug},
)
project = create_resp.json()
project_id = project["id"]
# Archive the project
archive_resp = await e2e_client.delete(
f"/api/v1/projects/{project_id}",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert archive_resp.status_code == 200
assert archive_resp.json()["success"] is True
# Verify project is archived
get_resp = await e2e_client.get(
f"/api/v1/projects/{project_id}",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert get_resp.status_code == 200
assert get_resp.json()["status"] == "archived"
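# Every workflow in this file re-registers and logs in a throwaway user; a
# hypothetical helper like this (not part of the original suite) captures that
# setup in one place:
async def register_and_login(client, email: str, password: str) -> dict:
    """Register a user and return the login token payload."""
    await client.post(
        "/api/v1/auth/register",
        json={
            "email": email,
            "password": password,
            "first_name": "Test",
            "last_name": "User",
        },
    )
    resp = await client.post(
        "/api/v1/auth/login", json={"email": email, "password": password}
    )
    return resp.json()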
class TestIssueWorkflows:
"""Test issue management workflows within projects."""
async def test_create_and_list_issues(self, e2e_client):
"""Test creating and listing issues in a project."""
# Setup user and project
email = f"issue-test-{uuid4().hex[:8]}@example.com"
password = "SecurePass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Issue",
"last_name": "Tester",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
project_slug = f"issue-project-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/projects",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={"name": "Issue Test Project", "slug": project_slug},
)
project = create_resp.json()
project_id = project["id"]
# Create multiple issues
issues = []
for i in range(3):
issue_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/issues",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={
"project_id": project_id,
"title": f"Test Issue {i + 1}",
"body": f"Description for issue {i + 1}",
"priority": ["low", "medium", "high"][i],
},
)
assert issue_resp.status_code == 201, f"Failed: {issue_resp.text}"
issues.append(issue_resp.json())
# List issues
list_resp = await e2e_client.get(
f"/api/v1/projects/{project_id}/issues",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert list_resp.status_code == 200
data = list_resp.json()
assert data["pagination"]["total"] == 3
async def test_issue_status_transitions(self, e2e_client):
"""Test issue status workflow transitions."""
# Setup user and project
email = f"status-test-{uuid4().hex[:8]}@example.com"
password = "SecurePass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Status",
"last_name": "Tester",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
project_slug = f"status-project-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/projects",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={"name": "Status Test Project", "slug": project_slug},
)
project = create_resp.json()
project_id = project["id"]
# Create issue
issue_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/issues",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={
"project_id": project_id,
"title": "Status Workflow Issue",
"body": "Testing status transitions",
},
)
issue = issue_resp.json()
issue_id = issue["id"]
assert issue["status"] == "open"
# Transition through statuses
for new_status in ["in_progress", "in_review", "closed"]:
update_resp = await e2e_client.patch(
f"/api/v1/projects/{project_id}/issues/{issue_id}",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={"status": new_status},
)
assert update_resp.status_code == 200, f"Failed: {update_resp.text}"
assert update_resp.json()["status"] == new_status
async def test_issue_filtering(self, e2e_client):
"""Test issue filtering by status and priority."""
# Setup user and project
email = f"filter-test-{uuid4().hex[:8]}@example.com"
password = "SecurePass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Filter",
"last_name": "Tester",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
project_slug = f"filter-project-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/projects",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={"name": "Filter Test Project", "slug": project_slug},
)
project = create_resp.json()
project_id = project["id"]
# Create issues with different priorities
for priority in ["low", "medium", "high"]:
await e2e_client.post(
f"/api/v1/projects/{project_id}/issues",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={
"project_id": project_id,
"title": f"{priority.title()} Priority Issue",
"priority": priority,
},
)
# Filter by high priority
filter_resp = await e2e_client.get(
f"/api/v1/projects/{project_id}/issues",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
params={"priority": "high"},
)
assert filter_resp.status_code == 200
data = filter_resp.json()
assert data["pagination"]["total"] == 1
assert data["data"][0]["priority"] == "high"
class TestSprintWorkflows:
"""Test sprint planning and execution workflows."""
async def test_sprint_lifecycle(self, e2e_client):
"""Test complete sprint lifecycle: plan -> start -> complete."""
# Setup user and project
email = f"sprint-test-{uuid4().hex[:8]}@example.com"
password = "SecurePass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Sprint",
"last_name": "Tester",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
project_slug = f"sprint-project-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/projects",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={"name": "Sprint Test Project", "slug": project_slug},
)
project = create_resp.json()
project_id = project["id"]
# Create sprint
today = date.today()
sprint_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/sprints",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={
"project_id": project_id,
"name": "Sprint 1",
"number": 1,
"goal": "Complete initial features",
"start_date": today.isoformat(),
"end_date": (today + timedelta(days=14)).isoformat(),
},
)
assert sprint_resp.status_code == 201, f"Failed: {sprint_resp.text}"
sprint = sprint_resp.json()
sprint_id = sprint["id"]
assert sprint["status"] == "planned"
# Start sprint
start_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/sprints/{sprint_id}/start",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert start_resp.status_code == 200, f"Failed: {start_resp.text}"
assert start_resp.json()["status"] == "active"
# Complete sprint
complete_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/sprints/{sprint_id}/complete",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert complete_resp.status_code == 200, f"Failed: {complete_resp.text}"
assert complete_resp.json()["status"] == "completed"
async def test_add_issues_to_sprint(self, e2e_client):
"""Test adding issues to a sprint."""
# Setup user and project
email = f"sprint-issues-{uuid4().hex[:8]}@example.com"
password = "SecurePass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "SprintIssues",
"last_name": "Tester",
},
)
login_resp = await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
tokens = login_resp.json()
project_slug = f"sprint-issues-project-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/projects",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={"name": "Sprint Issues Project", "slug": project_slug},
)
project = create_resp.json()
project_id = project["id"]
# Create sprint
today = date.today()
sprint_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/sprints",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={
"project_id": project_id,
"name": "Sprint 1",
"number": 1,
"start_date": today.isoformat(),
"end_date": (today + timedelta(days=14)).isoformat(),
},
)
assert sprint_resp.status_code == 201, f"Failed: {sprint_resp.text}"
sprint = sprint_resp.json()
sprint_id = sprint["id"]
# Create issue
issue_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/issues",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={
"project_id": project_id,
"title": "Sprint Issue",
"story_points": 5,
},
)
issue = issue_resp.json()
issue_id = issue["id"]
# Add issue to sprint
add_resp = await e2e_client.post(
f"/api/v1/projects/{project_id}/sprints/{sprint_id}/issues",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
params={"issue_id": issue_id},
)
assert add_resp.status_code == 200, f"Failed: {add_resp.text}"
# Verify issue is in sprint
issue_check = await e2e_client.get(
f"/api/v1/projects/{project_id}/issues/{issue_id}",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert issue_check.json()["sprint_id"] == sprint_id
class TestCrossEntityValidation:
"""Test validation across related entities."""
async def test_cannot_access_other_users_project(self, e2e_client):
"""Test that users cannot access projects they don't own."""
# Create two users
owner_email = f"owner-{uuid4().hex[:8]}@example.com"
other_email = f"other-{uuid4().hex[:8]}@example.com"
password = "SecurePass123!"
# Register owner
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": owner_email,
"password": password,
"first_name": "Owner",
"last_name": "User",
},
)
owner_tokens = (
await e2e_client.post(
"/api/v1/auth/login",
json={"email": owner_email, "password": password},
)
).json()
# Register other user
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": other_email,
"password": password,
"first_name": "Other",
"last_name": "User",
},
)
other_tokens = (
await e2e_client.post(
"/api/v1/auth/login",
json={"email": other_email, "password": password},
)
).json()
# Owner creates project
project_slug = f"private-project-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/projects",
headers={"Authorization": f"Bearer {owner_tokens['access_token']}"},
json={"name": "Private Project", "slug": project_slug},
)
project = create_resp.json()
project_id = project["id"]
# Other user tries to access
access_resp = await e2e_client.get(
f"/api/v1/projects/{project_id}",
headers={"Authorization": f"Bearer {other_tokens['access_token']}"},
)
assert access_resp.status_code == 403
async def test_duplicate_project_slug_rejected(self, e2e_client):
"""Test that duplicate project slugs are rejected."""
email = f"dup-test-{uuid4().hex[:8]}@example.com"
password = "SecurePass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Dup",
"last_name": "Tester",
},
)
tokens = (
await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
).json()
slug = f"unique-slug-{uuid4().hex[:8]}"
# First creation should succeed
resp1 = await e2e_client.post(
"/api/v1/projects",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={"name": "First Project", "slug": slug},
)
assert resp1.status_code == 201
# Second creation with same slug should fail
resp2 = await e2e_client.post(
"/api/v1/projects",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={"name": "Second Project", "slug": slug},
)
assert resp2.status_code == 409 # Conflict
class TestIssueStats:
"""Test issue statistics endpoints."""
async def test_issue_stats_aggregation(self, e2e_client):
"""Test that issue stats are correctly aggregated."""
email = f"stats-test-{uuid4().hex[:8]}@example.com"
password = "SecurePass123!"
await e2e_client.post(
"/api/v1/auth/register",
json={
"email": email,
"password": password,
"first_name": "Stats",
"last_name": "Tester",
},
)
tokens = (
await e2e_client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
)
).json()
project_slug = f"stats-project-{uuid4().hex[:8]}"
create_resp = await e2e_client.post(
"/api/v1/projects",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={"name": "Stats Project", "slug": project_slug},
)
project = create_resp.json()
project_id = project["id"]
# Create issues with different priorities and story points
await e2e_client.post(
f"/api/v1/projects/{project_id}/issues",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={
"project_id": project_id,
"title": "High Priority",
"priority": "high",
"story_points": 8,
},
)
await e2e_client.post(
f"/api/v1/projects/{project_id}/issues",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
json={
"project_id": project_id,
"title": "Low Priority",
"priority": "low",
"story_points": 2,
},
)
# Get stats
stats_resp = await e2e_client.get(
f"/api/v1/projects/{project_id}/issues/stats",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert stats_resp.status_code == 200
stats = stats_resp.json()
assert stats["total"] == 2
assert stats["total_story_points"] == 10


@@ -0,0 +1 @@
"""Integration tests that require the full stack to be running."""


@@ -0,0 +1,322 @@
"""
Integration tests for MCP server connectivity.
These tests require the full stack to be running:
- docker compose -f docker-compose.dev.yml up
Run with:
RUN_INTEGRATION_TESTS=true pytest tests/integration/ -v
Or skip with:
pytest tests/ -v --ignore=tests/integration/
"""
import os
from typing import Any
import httpx
import pytest
# Skip all tests in this module if not running integration tests
pytestmark = pytest.mark.skipif(
os.getenv("RUN_INTEGRATION_TESTS", "false").lower() != "true",
reason="Integration tests require RUN_INTEGRATION_TESTS=true and running stack",
)
# Configuration from environment
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000")
LLM_GATEWAY_URL = os.getenv("LLM_GATEWAY_URL", "http://localhost:8001")
KNOWLEDGE_BASE_URL = os.getenv("KNOWLEDGE_BASE_URL", "http://localhost:8002")
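# The defaults above match the docker-compose.dev.yml port mappings; point the
# tests at another stack by overriding them, e.g. (hostnames are examples):
#   BACKEND_URL=http://staging:8000 RUN_INTEGRATION_TESTS=true pytest tests/integration/ -v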
class TestMCPServerHealth:
"""Test that MCP servers are healthy and reachable."""
@pytest.mark.asyncio
async def test_llm_gateway_health(self) -> None:
"""Test LLM Gateway health endpoint."""
async with httpx.AsyncClient() as client:
response = await client.get(f"{LLM_GATEWAY_URL}/health", timeout=10.0)
assert response.status_code == 200
data = response.json()
assert data.get("status") == "healthy" or data.get("healthy") is True
@pytest.mark.asyncio
async def test_knowledge_base_health(self) -> None:
"""Test Knowledge Base health endpoint."""
async with httpx.AsyncClient() as client:
response = await client.get(f"{KNOWLEDGE_BASE_URL}/health", timeout=10.0)
assert response.status_code == 200
data = response.json()
assert data.get("status") == "healthy" or data.get("healthy") is True
@pytest.mark.asyncio
async def test_backend_health(self) -> None:
"""Test Backend health endpoint."""
async with httpx.AsyncClient() as client:
response = await client.get(f"{BACKEND_URL}/health", timeout=10.0)
assert response.status_code == 200
class TestMCPClientManagerIntegration:
"""Test MCPClientManager can connect to real MCP servers."""
@pytest.mark.asyncio
async def test_mcp_servers_list(self) -> None:
"""Test that backend can list MCP servers via API."""
async with httpx.AsyncClient() as client:
# This endpoint lists configured MCP servers
response = await client.get(
f"{BACKEND_URL}/api/v1/mcp/servers",
timeout=10.0,
)
# Should return 200 or 401 (if auth required)
assert response.status_code in [200, 401, 403]
@pytest.mark.asyncio
async def test_mcp_health_check_endpoint(self) -> None:
"""Test backend's MCP health check endpoint."""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{BACKEND_URL}/api/v1/mcp/health",
timeout=30.0, # MCP health checks can take time
)
# Should return 200 or 401/403 (if auth required)
assert response.status_code in [200, 401, 403]
if response.status_code == 200:
data = response.json()
# Check structure
assert "servers" in data or "healthy" in data
class TestLLMGatewayIntegration:
"""Test LLM Gateway MCP server functionality."""
@pytest.mark.asyncio
async def test_list_models(self) -> None:
"""Test that LLM Gateway can list available models."""
async with httpx.AsyncClient() as client:
# MCP servers use JSON-RPC 2.0 protocol at /mcp endpoint
response = await client.post(
f"{LLM_GATEWAY_URL}/mcp",
json={
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {},
},
timeout=10.0,
)
assert response.status_code == 200
data = response.json()
# Should have tools listed
assert "result" in data or "error" in data
@pytest.mark.asyncio
async def test_count_tokens(self) -> None:
"""Test token counting functionality."""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{LLM_GATEWAY_URL}/mcp",
json={
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "count_tokens",
"arguments": {
"project_id": "test-project",
"agent_id": "test-agent",
"text": "Hello, world!",
},
},
},
timeout=10.0,
)
assert response.status_code == 200
data = response.json()
# Check for result or error
if "result" in data:
assert "content" in data["result"] or "token_count" in str(
data["result"]
)
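# The gateway tests above hand-roll the same JSON-RPC 2.0 envelope; a
# hypothetical helper (not part of this module) makes the protocol explicit:
async def _jsonrpc_call(
    client: httpx.AsyncClient, base_url: str, method: str, params: dict[str, Any]
) -> dict[str, Any]:
    """POST a JSON-RPC 2.0 request to an MCP server's /mcp endpoint."""
    response = await client.post(
        f"{base_url}/mcp",
        json={"jsonrpc": "2.0", "id": 1, "method": method, "params": params},
        timeout=10.0,
    )
    response.raise_for_status()
    return response.json()  # carries either "result" or "error"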
class TestKnowledgeBaseIntegration:
"""Test Knowledge Base MCP server functionality."""
@pytest.mark.asyncio
async def test_list_tools(self) -> None:
"""Test that Knowledge Base can list available tools."""
async with httpx.AsyncClient() as client:
# Knowledge Base uses GET /mcp/tools for listing
response = await client.get(
f"{KNOWLEDGE_BASE_URL}/mcp/tools",
timeout=10.0,
)
assert response.status_code == 200
data = response.json()
assert "tools" in data
@pytest.mark.asyncio
async def test_search_knowledge_empty(self) -> None:
"""Test search on empty knowledge base."""
async with httpx.AsyncClient() as client:
# Knowledge Base uses direct tool name as method
response = await client.post(
f"{KNOWLEDGE_BASE_URL}/mcp",
json={
"jsonrpc": "2.0",
"id": 1,
"method": "search_knowledge",
"params": {
"project_id": "test-project",
"agent_id": "test-agent",
"query": "test query",
"limit": 5,
},
},
timeout=10.0,
)
assert response.status_code == 200
data = response.json()
# Should return empty results or error for no collection
assert "result" in data or "error" in data
class TestEndToEndMCPFlow:
"""End-to-end tests for MCP integration flow."""
@pytest.mark.asyncio
async def test_full_mcp_discovery_flow(self) -> None:
"""Test the full flow of discovering and listing MCP tools."""
async with httpx.AsyncClient() as client:
# 1. Check backend health
health = await client.get(f"{BACKEND_URL}/health", timeout=10.0)
assert health.status_code == 200
# 2. Check LLM Gateway health
llm_health = await client.get(f"{LLM_GATEWAY_URL}/health", timeout=10.0)
assert llm_health.status_code == 200
# 3. Check Knowledge Base health
kb_health = await client.get(f"{KNOWLEDGE_BASE_URL}/health", timeout=10.0)
assert kb_health.status_code == 200
# 4. List tools from LLM Gateway (uses JSON-RPC at /mcp)
llm_tools = await client.post(
f"{LLM_GATEWAY_URL}/mcp",
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
timeout=10.0,
)
assert llm_tools.status_code == 200
# 5. List tools from Knowledge Base (uses GET /mcp/tools)
kb_tools = await client.get(
f"{KNOWLEDGE_BASE_URL}/mcp/tools",
timeout=10.0,
)
assert kb_tools.status_code == 200
class TestContextEngineIntegration:
"""Test Context Engine integration with MCP servers."""
@pytest.mark.asyncio
async def test_context_health_endpoint(self) -> None:
"""Test context engine health endpoint."""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{BACKEND_URL}/api/v1/context/health",
timeout=10.0,
)
assert response.status_code == 200
data = response.json()
assert data.get("status") == "healthy"
assert "mcp_connected" in data
assert "cache_enabled" in data
@pytest.mark.asyncio
async def test_context_budget_endpoint(self) -> None:
"""Test token budget endpoint."""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{BACKEND_URL}/api/v1/context/budget/claude-3-sonnet",
timeout=10.0,
)
assert response.status_code == 200
data = response.json()
assert "total_tokens" in data
assert "system_tokens" in data
assert data.get("model") == "claude-3-sonnet"
@pytest.mark.asyncio
async def test_context_assembly_requires_auth(self) -> None:
"""Test that context assembly requires authentication."""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{BACKEND_URL}/api/v1/context/assemble",
json={
"project_id": "test-project",
"agent_id": "test-agent",
"query": "test query",
"model": "claude-3-sonnet",
},
timeout=10.0,
)
# Should require auth
assert response.status_code in [401, 403]
def run_quick_health_check() -> dict[str, Any]:
"""
Quick synchronous health check for all services.
Can be run standalone to verify the stack is up.
"""
results: dict[str, Any] = {
"backend": False,
"llm_gateway": False,
"knowledge_base": False,
}
try:
with httpx.Client(timeout=5.0) as client:
try:
r = client.get(f"{BACKEND_URL}/health")
results["backend"] = r.status_code == 200
except Exception:
pass
try:
r = client.get(f"{LLM_GATEWAY_URL}/health")
results["llm_gateway"] = r.status_code == 200
except Exception:
pass
try:
r = client.get(f"{KNOWLEDGE_BASE_URL}/health")
results["knowledge_base"] = r.status_code == 200
except Exception:
pass
except Exception:
pass
return results
if __name__ == "__main__":
print("Checking service health...")
results = run_quick_health_check()
for service, healthy in results.items():
status = "OK" if healthy else "FAILED"
print(f" {service}: {status}")
all_healthy = all(results.values())
if all_healthy:
print("\nAll services healthy! Run integration tests with:")
print(" RUN_INTEGRATION_TESTS=true pytest tests/integration/ -v")
else:
print("\nSome services are not healthy. Start the stack with:")
print(" make dev")


@@ -48,6 +48,80 @@ services:
- app-network
restart: unless-stopped
# ==========================================================================
# MCP Servers - Model Context Protocol servers for AI agent capabilities
# ==========================================================================
mcp-llm-gateway:
# REPLACE THIS with your actual image from your container registry
image: YOUR_REGISTRY/YOUR_PROJECT_MCP_LLM_GATEWAY:latest
env_file:
- .env
environment:
- LLM_GATEWAY_HOST=0.0.0.0
- LLM_GATEWAY_PORT=8001
- REDIS_URL=redis://redis:6379/1
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ENVIRONMENT=production
depends_on:
redis:
condition: service_healthy
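    # The probe below shells out to Python + httpx because the slim base image
    # ships no curl; it mirrors the HEALTHCHECK baked into the server Dockerfile.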
healthcheck:
test: ["CMD", "python", "-c", "import httpx; httpx.get('http://localhost:8001/health').raise_for_status()"]
interval: 30s
timeout: 10s
retries: 3
start_period: 10s
networks:
- app-network
restart: unless-stopped
deploy:
resources:
limits:
cpus: '2.0'
memory: 2G
reservations:
cpus: '0.5'
memory: 512M
mcp-knowledge-base:
# REPLACE THIS with your actual image from your container registry
image: YOUR_REGISTRY/YOUR_PROJECT_MCP_KNOWLEDGE_BASE:latest
env_file:
- .env
environment:
# KB_ prefix required by pydantic-settings config
- KB_HOST=0.0.0.0
- KB_PORT=8002
- KB_DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB}
- KB_REDIS_URL=redis://redis:6379/2
- KB_LLM_GATEWAY_URL=http://mcp-llm-gateway:8001
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ENVIRONMENT=production
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
healthcheck:
test: ["CMD", "python", "-c", "import httpx; httpx.get('http://localhost:8002/health').raise_for_status()"]
interval: 30s
timeout: 10s
retries: 3
start_period: 10s
networks:
- app-network
restart: unless-stopped
deploy:
resources:
limits:
cpus: '1.0'
memory: 1G
reservations:
cpus: '0.25'
memory: 256M
backend:
# REPLACE THIS with your actual image from your container registry
# Examples:
@@ -64,11 +138,18 @@ services:
- DEBUG=false
- BACKEND_CORS_ORIGINS=${BACKEND_CORS_ORIGINS}
- REDIS_URL=redis://redis:6379/0
# MCP Server URLs
- LLM_GATEWAY_URL=http://mcp-llm-gateway:8001
- KNOWLEDGE_BASE_URL=http://mcp-knowledge-base:8002
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
mcp-llm-gateway:
condition: service_healthy
mcp-knowledge-base:
condition: service_healthy
networks:
- app-network
restart: unless-stopped
@@ -92,11 +173,18 @@ services:
- DATABASE_URL=${DATABASE_URL}
- REDIS_URL=redis://redis:6379/0
- CELERY_QUEUE=agent
# MCP Server URLs (agents need access to MCP)
- LLM_GATEWAY_URL=http://mcp-llm-gateway:8001
- KNOWLEDGE_BASE_URL=http://mcp-knowledge-base:8002
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
mcp-llm-gateway:
condition: service_healthy
mcp-knowledge-base:
condition: service_healthy
networks:
- app-network
restart: unless-stopped


@@ -32,6 +32,70 @@ services:
networks:
- app-network
# ==========================================================================
# MCP Servers - Model Context Protocol servers for AI agent capabilities
# ==========================================================================
mcp-llm-gateway:
build:
context: ./mcp-servers/llm-gateway
dockerfile: Dockerfile
ports:
- "8001:8001"
env_file:
- .env
environment:
- LLM_GATEWAY_HOST=0.0.0.0
- LLM_GATEWAY_PORT=8001
- REDIS_URL=redis://redis:6379/1
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ENVIRONMENT=development
depends_on:
redis:
condition: service_healthy
healthcheck:
test: ["CMD", "python", "-c", "import httpx; httpx.get('http://localhost:8001/health').raise_for_status()"]
interval: 30s
timeout: 10s
retries: 3
start_period: 10s
networks:
- app-network
restart: unless-stopped
mcp-knowledge-base:
build:
context: ./mcp-servers/knowledge-base
dockerfile: Dockerfile
ports:
- "8002:8002"
env_file:
- .env
environment:
# KB_ prefix required by pydantic-settings config
- KB_HOST=0.0.0.0
- KB_PORT=8002
- KB_DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB}
- KB_REDIS_URL=redis://redis:6379/2
- KB_LLM_GATEWAY_URL=http://mcp-llm-gateway:8001
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ENVIRONMENT=development
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
healthcheck:
test: ["CMD", "python", "-c", "import httpx; httpx.get('http://localhost:8002/health').raise_for_status()"]
interval: 30s
timeout: 10s
retries: 3
start_period: 10s
networks:
- app-network
restart: unless-stopped
backend:
build:
context: ./backend
@@ -52,11 +116,18 @@ services:
- DEBUG=true
- BACKEND_CORS_ORIGINS=${BACKEND_CORS_ORIGINS}
- REDIS_URL=redis://redis:6379/0
# MCP Server URLs
- LLM_GATEWAY_URL=http://mcp-llm-gateway:8001
- KNOWLEDGE_BASE_URL=http://mcp-knowledge-base:8002
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
mcp-llm-gateway:
condition: service_healthy
mcp-knowledge-base:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 10s
@@ -81,11 +152,18 @@ services:
- DATABASE_URL=${DATABASE_URL}
- REDIS_URL=redis://redis:6379/0
- CELERY_QUEUE=agent
# MCP Server URLs (agents need access to MCP)
- LLM_GATEWAY_URL=http://mcp-llm-gateway:8001
- KNOWLEDGE_BASE_URL=http://mcp-knowledge-base:8002
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
mcp-llm-gateway:
condition: service_healthy
mcp-knowledge-base:
condition: service_healthy
networks:
- app-network
command: ["celery", "-A", "app.celery_app", "worker", "-Q", "agent", "-l", "info", "-c", "4"]


@@ -32,6 +32,82 @@ services:
- app-network
restart: unless-stopped
# ==========================================================================
# MCP Servers - Model Context Protocol servers for AI agent capabilities
# ==========================================================================
mcp-llm-gateway:
build:
context: ./mcp-servers/llm-gateway
dockerfile: Dockerfile
env_file:
- .env
environment:
- LLM_GATEWAY_HOST=0.0.0.0
- LLM_GATEWAY_PORT=8001
- REDIS_URL=redis://redis:6379/1
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ENVIRONMENT=production
depends_on:
redis:
condition: service_healthy
healthcheck:
test: ["CMD", "python", "-c", "import httpx; httpx.get('http://localhost:8001/health').raise_for_status()"]
interval: 30s
timeout: 10s
retries: 3
start_period: 10s
networks:
- app-network
restart: unless-stopped
deploy:
resources:
limits:
cpus: '2.0'
memory: 2G
reservations:
cpus: '0.5'
memory: 512M
mcp-knowledge-base:
build:
context: ./mcp-servers/knowledge-base
dockerfile: Dockerfile
env_file:
- .env
environment:
# KB_ prefix required by pydantic-settings config
- KB_HOST=0.0.0.0
- KB_PORT=8002
- KB_DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB}
- KB_REDIS_URL=redis://redis:6379/2
- KB_LLM_GATEWAY_URL=http://mcp-llm-gateway:8001
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ENVIRONMENT=production
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
healthcheck:
test: ["CMD", "python", "-c", "import httpx; httpx.get('http://localhost:8002/health').raise_for_status()"]
interval: 30s
timeout: 10s
retries: 3
start_period: 10s
networks:
- app-network
restart: unless-stopped
deploy:
resources:
limits:
cpus: '1.0'
memory: 1G
reservations:
cpus: '0.25'
memory: 256M
backend:
build:
context: ./backend
@@ -48,11 +124,18 @@ services:
- DEBUG=false
- BACKEND_CORS_ORIGINS=${BACKEND_CORS_ORIGINS}
- REDIS_URL=redis://redis:6379/0
# MCP Server URLs
- LLM_GATEWAY_URL=http://mcp-llm-gateway:8001
- KNOWLEDGE_BASE_URL=http://mcp-knowledge-base:8002
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
mcp-llm-gateway:
condition: service_healthy
mcp-knowledge-base:
condition: service_healthy
networks:
- app-network
restart: unless-stopped
@@ -75,11 +158,18 @@ services:
- DATABASE_URL=${DATABASE_URL}
- REDIS_URL=redis://redis:6379/0
- CELERY_QUEUE=agent
# MCP Server URLs (agents need access to MCP)
- LLM_GATEWAY_URL=http://mcp-llm-gateway:8001
- KNOWLEDGE_BASE_URL=http://mcp-knowledge-base:8002
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
mcp-llm-gateway:
condition: service_healthy
mcp-knowledge-base:
condition: service_healthy
networks:
- app-network
restart: unless-stopped


@@ -386,10 +386,24 @@ describe('ActivityFeed', () => {
});
it('shows event count in group header', () => {
// Create fresh "today" events to avoid timezone/day boundary issues
const todayEvents: ProjectEvent[] = [
createMockEvent({
id: 'today-event-1',
type: EventType.APPROVAL_REQUESTED,
timestamp: new Date().toISOString(),
}),
createMockEvent({
id: 'today-event-2',
type: EventType.AGENT_MESSAGE,
timestamp: new Date().toISOString(),
}),
];
render(<ActivityFeed {...defaultProps} events={todayEvents} />);
const todayGroup = screen.getByTestId('event-group-today');
// Today has 2 events
expect(within(todayGroup).getByText('2')).toBeInTheDocument();
});
});


@@ -0,0 +1,79 @@
.PHONY: help install install-dev lint lint-fix format type-check test test-cov validate clean run
# Default target
help:
@echo "Knowledge Base MCP Server - Development Commands"
@echo ""
@echo "Setup:"
@echo " make install - Install production dependencies"
@echo " make install-dev - Install development dependencies"
@echo ""
@echo "Quality Checks:"
@echo " make lint - Run Ruff linter"
@echo " make lint-fix - Run Ruff linter with auto-fix"
@echo " make format - Format code with Ruff"
@echo " make type-check - Run mypy type checker"
@echo ""
@echo "Testing:"
@echo " make test - Run pytest"
@echo " make test-cov - Run pytest with coverage"
@echo ""
@echo "All-in-one:"
@echo " make validate - Run lint, type-check, and tests"
@echo ""
@echo "Running:"
@echo " make run - Run the server locally"
@echo ""
@echo "Cleanup:"
@echo " make clean - Remove cache and build artifacts"
# Setup
install:
@echo "Installing production dependencies..."
@uv pip install -e .
install-dev:
@echo "Installing development dependencies..."
@uv pip install -e ".[dev]"
# Quality checks
lint:
@echo "Running Ruff linter..."
@uv run ruff check .
lint-fix:
@echo "Running Ruff linter with auto-fix..."
@uv run ruff check --fix .
format:
@echo "Formatting code..."
@uv run ruff format .
type-check:
@echo "Running mypy..."
@uv run mypy . --ignore-missing-imports
# Testing
test:
@echo "Running tests..."
@uv run pytest tests/ -v
test-cov:
@echo "Running tests with coverage..."
@uv run pytest tests/ -v --cov=. --cov-report=term-missing --cov-report=html
# All-in-one validation
validate: lint type-check test
@echo "All validations passed!"
# Running
run:
@echo "Starting Knowledge Base server..."
@uv run python server.py
# Cleanup
clean:
@echo "Cleaning up..."
@rm -rf __pycache__ .pytest_cache .mypy_cache .ruff_cache .coverage htmlcov
@find . -type d -name "__pycache__" -exec rm -rf {} + 2>/dev/null || true
@find . -type f -name "*.pyc" -delete 2>/dev/null || true
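# Typical local loop: `make install-dev` once, then `make validate` before pushing.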


@@ -328,7 +328,7 @@ class CollectionManager:
"source_path": chunk.source_path or source_path,
"start_line": chunk.start_line,
"end_line": chunk.end_line,
"file_type": (chunk.file_type or file_type).value if (chunk.file_type or file_type) else None,
"file_type": effective_file_type.value if (effective_file_type := chunk.file_type or file_type) else None,
}
embeddings_data.append((
chunk.content,


@@ -284,41 +284,40 @@ class DatabaseManager:
)
        try:
            async with self.acquire() as conn, conn.transaction():
                # Wrap in transaction for all-or-nothing batch semantics
                for project_id, collection, content, embedding, chunk_type, metadata in embeddings:
                    content_hash = self.compute_content_hash(content)
                    source_path = metadata.get("source_path")
                    start_line = metadata.get("start_line")
                    end_line = metadata.get("end_line")
                    file_type = metadata.get("file_type")
                    embedding_id = await conn.fetchval(
                        """
                        INSERT INTO knowledge_embeddings
                        (project_id, collection, content, embedding, chunk_type,
                         source_path, start_line, end_line, file_type, metadata,
                         content_hash, expires_at)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
                        ON CONFLICT DO NOTHING
                        RETURNING id
                        """,
                        project_id,
                        collection,
                        content,
                        embedding,
                        chunk_type.value,
                        source_path,
                        start_line,
                        end_line,
                        file_type,
                        metadata,
                        content_hash,
                        expires_at,
                    )
                    if embedding_id:
                        ids.append(str(embedding_id))
            logger.info(f"Stored {len(ids)} embeddings in batch")
            return ids
@@ -566,59 +565,58 @@ class DatabaseManager:
)
        try:
            async with self.acquire() as conn, conn.transaction():
                # Use transaction for atomic replace
                # First, delete existing embeddings for this source
                delete_result = await conn.execute(
                    """
                    DELETE FROM knowledge_embeddings
                    WHERE project_id = $1 AND source_path = $2 AND collection = $3
                    """,
                    project_id,
                    source_path,
                    collection,
                )
                deleted_count = int(delete_result.split()[-1])
                # Then insert new embeddings
                new_ids = []
                for content, embedding, chunk_type, metadata in embeddings:
                    content_hash = self.compute_content_hash(content)
                    start_line = metadata.get("start_line")
                    end_line = metadata.get("end_line")
                    file_type = metadata.get("file_type")
                    embedding_id = await conn.fetchval(
                        """
                        INSERT INTO knowledge_embeddings
                        (project_id, collection, content, embedding, chunk_type,
                         source_path, start_line, end_line, file_type, metadata,
                         content_hash, expires_at)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
                        RETURNING id
                        """,
                        project_id,
                        collection,
                        content,
                        embedding,
                        chunk_type.value,
                        source_path,
                        start_line,
                        end_line,
                        file_type,
                        metadata,
                        content_hash,
                        expires_at,
                    )
                    if embedding_id:
                        new_ids.append(str(embedding_id))
                logger.info(
                    f"Replaced source {source_path}: deleted {deleted_count}, "
                    f"inserted {len(new_ids)} embeddings"
                )
                return deleted_count, new_ids
        except asyncpg.PostgresError as e:
            logger.error(f"Replace source error: {e}")


@@ -193,7 +193,7 @@ async def health_check() -> dict[str, Any]:
# Check Redis cache (non-critical - degraded without it)
try:
if _embeddings and _embeddings._redis:
await _embeddings._redis.ping()  # type: ignore[misc]
status["dependencies"]["redis"] = "connected"
else:
status["dependencies"]["redis"] = "not initialized"


@@ -1,8 +1,7 @@
"""Tests for server module and MCP tools."""
import json
from datetime import UTC, datetime
from unittest.mock import AsyncMock, MagicMock
import pytest
from fastapi.testclient import TestClient


@@ -1,39 +1,25 @@
# Syndarix LLM Gateway MCP Server
FROM python:3.12-slim

WORKDIR /app

# Install system dependencies (needed for tiktoken regex compilation)
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Install uv for fast package management
COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

# Copy project files
COPY pyproject.toml ./
COPY *.py ./

# Install dependencies to system Python
RUN uv pip install --system --no-cache .

# Create non-root user for security
RUN useradd --create-home --shell /bin/bash appuser

# Switch to non-root user
USER appuser
# Environment variables
@@ -47,7 +33,7 @@ EXPOSE 8001
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import httpx; httpx.get('http://localhost:8001/health').raise_for_status()"
# Run the server
CMD ["python", "server.py"]


@@ -0,0 +1,79 @@
.PHONY: help install install-dev lint lint-fix format type-check test test-cov validate clean run
# Default target
help:
@echo "LLM Gateway MCP Server - Development Commands"
@echo ""
@echo "Setup:"
@echo " make install - Install production dependencies"
@echo " make install-dev - Install development dependencies"
@echo ""
@echo "Quality Checks:"
@echo " make lint - Run Ruff linter"
@echo " make lint-fix - Run Ruff linter with auto-fix"
@echo " make format - Format code with Ruff"
@echo " make type-check - Run mypy type checker"
@echo ""
@echo "Testing:"
@echo " make test - Run pytest"
@echo " make test-cov - Run pytest with coverage"
@echo ""
@echo "All-in-one:"
@echo " make validate - Run lint, type-check, and tests"
@echo ""
@echo "Running:"
@echo " make run - Run the server locally"
@echo ""
@echo "Cleanup:"
@echo " make clean - Remove cache and build artifacts"
# Setup
install:
@echo "Installing production dependencies..."
@uv pip install -e .
install-dev:
@echo "Installing development dependencies..."
@uv pip install -e ".[dev]"
# Quality checks
lint:
@echo "Running Ruff linter..."
@uv run ruff check .
lint-fix:
@echo "Running Ruff linter with auto-fix..."
@uv run ruff check --fix .
format:
@echo "Formatting code..."
@uv run ruff format .
type-check:
@echo "Running mypy..."
@uv run mypy . --ignore-missing-imports
# Testing
test:
@echo "Running tests..."
@uv run pytest tests/ -v
test-cov:
@echo "Running tests with coverage..."
@uv run pytest tests/ -v --cov=. --cov-report=term-missing --cov-report=html
# All-in-one validation
validate: lint type-check test
@echo "All validations passed!"
# Running
run:
@echo "Starting LLM Gateway server..."
@uv run python server.py
# Cleanup
clean:
@echo "Cleaning up..."
@rm -rf __pycache__ .pytest_cache .mypy_cache .ruff_cache .coverage htmlcov
@find . -type d -name "__pycache__" -exec rm -rf {} + 2>/dev/null || true
@find . -type f -name "*.pyc" -delete 2>/dev/null || true


@@ -110,14 +110,13 @@ class CircuitBreaker:
"""
if self._state == CircuitState.OPEN:
time_in_open = time.time() - self._stats.state_changed_at
            # Double-check state after time calculation (for thread safety)
            if time_in_open >= self.recovery_timeout and self._state == CircuitState.OPEN:
                self._transition_to(CircuitState.HALF_OPEN)
                logger.info(
                    f"Circuit {self.name} transitioned to HALF_OPEN "
                    f"after {time_in_open:.1f}s"
                )
def _transition_to(self, new_state: CircuitState) -> None:
"""Transition to a new state."""