Files
syndarix/mcp-servers/git-ops/server.py
Felipe Cardoso 1779239c07 feat(git-ops): add GitHub provider with auto-detection
Implements GitHub API provider following the same pattern as Gitea:
- Full PR operations (create, get, list, merge, update, close)
- Branch operations via API
- Comment and label management
- Reviewer request support
- Rate limit error handling

Server enhancements:
- Auto-detect provider from repository URL (github.com vs custom Gitea)
- Initialize GitHub provider when token is configured
- Health check includes both provider statuses
- Token selection based on repo URL for clone/push operations

Refs: #110

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-06 20:55:22 +01:00

1280 lines
48 KiB
Python

"""
Git Operations MCP Server.
Provides git repository management, branching, commits, and PR workflows
for Syndarix AI agents.
"""
import inspect
import logging
import re
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from datetime import UTC, datetime
from typing import Any, get_type_hints
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastmcp import FastMCP
from pydantic import Field
from config import Settings, get_settings
from exceptions import ErrorCode, GitOpsError
from git_wrapper import GitWrapper
from models import MergeStrategy, PRState
from providers import BaseProvider, GiteaProvider, GitHubProvider
from workspace import WorkspaceManager
# Input validation patterns
# Each pattern is anchored with `\Z` rather than `$`: in Python `$` also matches
# just before a trailing newline, so "main\n" would otherwise be accepted.
ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]{1,128}\Z")
BRANCH_PATTERN = re.compile(r"^[a-zA-Z0-9_/.-]{1,256}\Z")
# Accepts http(s)://host/path and scp-style git@host:path, optional ".git" suffix.
URL_PATTERN = re.compile(
    r"^(https?://|git@)[a-zA-Z0-9._-]+[:/][a-zA-Z0-9._/-]+(?:\.git)?\Z"
)
def _validate_id(value: str, field_name: str) -> str | None:
"""Validate project_id or agent_id format."""
if not isinstance(value, str):
return f"{field_name} must be a string"
if not value:
return f"{field_name} is required"
if not ID_PATTERN.match(value):
return f"Invalid {field_name}: must be 1-128 alphanumeric characters, hyphens, or underscores"
return None
def _validate_branch(value: str) -> str | None:
"""Validate branch name format."""
if not isinstance(value, str):
return "Branch name must be a string"
if not value:
return "Branch name is required"
if not BRANCH_PATTERN.match(value):
return "Invalid branch name: must be 1-256 alphanumeric characters, hyphens, underscores, dots, or slashes"
return None
def _validate_url(value: str) -> str | None:
"""Validate repository URL format."""
if not isinstance(value, str):
return "Repository URL must be a string"
if not value:
return "Repository URL is required"
if not URL_PATTERN.match(value):
return "Invalid repository URL: must be a valid HTTPS or SSH git URL"
return None
# Configure logging
# Root logger: INFO level, "timestamp - logger name - level - message" format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Global instances
# All four start as None and are populated by `lifespan` on startup.
# The providers stay None when their credentials are not configured.
_settings: Settings | None = None
_workspace_manager: WorkspaceManager | None = None
_gitea_provider: GiteaProvider | None = None
_github_provider: GitHubProvider | None = None
def _extract_host(url: str) -> str:
    """Return the lower-cased host of an HTTP(S) or scp-style (``git@host:path``) URL.

    Returns an empty string when no host can be determined.
    """
    from urllib.parse import urlsplit

    candidate = url.strip().lower()
    if not candidate:
        return ""
    if "://" in candidate:
        try:
            return urlsplit(candidate).hostname or ""
        except ValueError:
            # urlsplit rejects malformed netlocs (e.g. unbalanced IPv6 brackets).
            return ""
    # scp-like syntax "[user@]host:path", or a bare "host/path".
    host_part = candidate.split(":", 1)[0].split("/", 1)[0]
    return host_part.rsplit("@", 1)[-1]


def _is_github_url(repo_url: str) -> bool:
    """Tell whether ``repo_url`` is hosted on GitHub or the configured GitHub Enterprise.

    The decision is made on the URL's *host*, never on a substring of the whole
    URL: a substring test would classify ``https://evil.example/github/x.git``
    as GitHub and hand the GitHub token to an unrelated server.
    """
    host = _extract_host(repo_url)
    if not host:
        return False
    if host == "github.com" or host.endswith(".github.com"):
        return True
    # GitHub Enterprise: compare against the host of the configured API URL
    # (e.g. "https://ghe.corp/api/v3" -> "ghe.corp", "https://api.x.ghe.com" -> "x.ghe.com").
    api_url = _settings.github_api_url if _settings else None
    if api_url and api_url != "https://api.github.com":
        enterprise_host = _extract_host(api_url).removeprefix("api.")
        return bool(enterprise_host) and host == enterprise_host
    return False


def _get_provider_for_url(repo_url: str) -> BaseProvider | None:
    """Get the appropriate provider for a repository URL.

    Returns the GitHub provider for github.com / configured GitHub Enterprise
    hosts and the Gitea provider for everything else (self-hosted default).
    The result is ``None`` when settings are not loaded or the selected
    provider was never initialised.
    """
    if not _settings:
        return None
    if _is_github_url(repo_url):
        return _github_provider
    # Default to Gitea for self-hosted instances
    return _gitea_provider


def _get_auth_token_for_url(repo_url: str) -> str | None:
    """Get the appropriate auth token for a repository URL.

    Uses the same host classification as ``_get_provider_for_url`` so that the
    provider and the token chosen for a repository always agree. Returns
    ``None`` when settings are not loaded or the relevant token is empty.
    """
    if not _settings:
        return None
    if _is_github_url(repo_url):
        return _settings.github_token or None
    # NOTE(review): the Gitea token is still offered to every non-GitHub host;
    # consider restricting it to the host of `gitea_base_url`.
    return _settings.gitea_token or None
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
    """Application lifespan handler."""
    # Startup: load settings, build the workspace manager and whichever
    # providers have credentials configured. Shutdown: close the providers.
    global _settings, _workspace_manager, _gitea_provider, _github_provider
    logger.info("Starting Git Operations MCP Server...")
    # Load settings
    _settings = get_settings()
    # Initialize workspace manager
    _workspace_manager = WorkspaceManager(_settings)
    # Initialize providers
    if _settings.gitea_base_url and _settings.gitea_token:
        _gitea_provider = GiteaProvider(
            base_url=_settings.gitea_base_url,
            token=_settings.gitea_token,
            settings=_settings,
        )
        logger.info(f"Gitea provider initialized: {_settings.gitea_base_url}")
    if _settings.github_token:
        _github_provider = GitHubProvider(
            token=_settings.github_token,
            settings=_settings,
        )
        logger.info("GitHub provider initialized")
    logger.info("Git Operations MCP Server started successfully")
    try:
        yield
    finally:
        # Cleanup runs even when the application exits through an exception,
        # and one provider failing to close must not leak the other.
        logger.info("Shutting down Git Operations MCP Server...")
        for label, provider in (("Gitea", _gitea_provider), ("GitHub", _github_provider)):
            if not provider:
                continue
            try:
                await provider.close()
            except Exception:
                logger.exception("Failed to close %s provider", label)
        logger.info("Git Operations MCP Server shut down")
# Create FastMCP server
# The tool functions in this module are registered on it via @mcp.tool().
mcp = FastMCP("syndarix-git-ops")
# Create FastAPI app with lifespan
# `lifespan` performs startup initialisation and provider cleanup on shutdown.
app = FastAPI(
    title="Git Operations MCP Server",
    description="Repository management, branching, commits, and PR workflows",
    version="0.1.0",
    lifespan=lifespan,
)
@app.get("/health")
async def health_check() -> dict[str, Any]:
    """Health check endpoint."""
    # Reports service metadata plus one entry per dependency (gitea, github,
    # workspace). Any failing dependency flips the overall status to "degraded".
    dependencies: dict[str, Any] = {}
    report: dict[str, Any] = {
        "status": "healthy",
        "service": "git-ops",
        "version": "0.1.0",
        "timestamp": datetime.now(UTC).isoformat(),
        "dependencies": dependencies,
    }
    degraded = False

    # Gitea first, then GitHub: both providers are probed the same way.
    for label, provider in (("gitea", _gitea_provider), ("github", _github_provider)):
        if not provider:
            dependencies[label] = "not configured"
            continue
        try:
            if await provider.is_connected():
                user = await provider.get_authenticated_user()
                dependencies[label] = f"connected as {user}"
            else:
                dependencies[label] = "not connected"
                degraded = True
        except Exception as exc:
            dependencies[label] = f"error: {exc}"
            degraded = True

    # Workspace directory: report its base path and the number of workspaces.
    if not _workspace_manager:
        dependencies["workspace"] = "not initialized"
    else:
        try:
            active = await _workspace_manager.list_workspaces()
            dependencies["workspace"] = {
                "path": str(_workspace_manager.base_path),
                "active_workspaces": len(active),
            }
        except Exception as exc:
            dependencies["workspace"] = f"error: {exc}"
            degraded = True

    if degraded:
        report["status"] = "degraded"
    return report
# Tool registry for JSON-RPC
# Maps tool name -> {"func": callable, "description": str, "schema": JSON Schema dict};
# filled by `_register_tool` and read by the /mcp and /mcp/tools endpoints.
_tool_registry: dict[str, Any] = {}
def _python_type_to_json_schema(python_type: Any) -> dict[str, Any]:
"""Convert Python type annotation to JSON Schema."""
type_name = getattr(python_type, "__name__", str(python_type))
if python_type is str or type_name == "str":
return {"type": "string"}
elif python_type is int or type_name == "int":
return {"type": "integer"}
elif python_type is float or type_name == "float":
return {"type": "number"}
elif python_type is bool or type_name == "bool":
return {"type": "boolean"}
elif type_name == "NoneType":
return {"type": "null"}
elif hasattr(python_type, "__origin__"):
origin = python_type.__origin__
args = getattr(python_type, "__args__", ())
if origin is list:
item_type = args[0] if args else Any
return {"type": "array", "items": _python_type_to_json_schema(item_type)}
elif origin is dict:
return {"type": "object"}
elif origin is type(None) or str(origin) == "typing.Union":
non_none_args = [a for a in args if a is not type(None)]
if len(non_none_args) == 1:
schema = _python_type_to_json_schema(non_none_args[0])
schema["nullable"] = True
return schema
return {"type": "object"}
return {"type": "object"}
def _get_tool_schema(func: Any) -> dict[str, Any]:
    """Extract JSON Schema from a tool function.

    Each parameter becomes a property. Its type comes from the annotation;
    description, bounds and default are read from a ``Field(...)``-style
    default object when one is present.

    Args:
        func: The tool callable to inspect.

    Returns:
        ``{"type": "object", "properties": {...}, "required": [...]}``.
        A parameter is required when it has no default at all, or when its
        field-style default is unset (``...`` or a ``PydanticUndefined`` sentinel).
    """

    def _is_unset(value: Any) -> bool:
        # Ellipsis and Pydantic's undefined sentinel both mean "no default".
        return value is ... or "PydanticUndefined" in type(value).__name__

    def _constraint(field: Any, attr: str) -> Any:
        # Bounds may live directly on the field object or inside its
        # `metadata` list as small constraint objects carrying `ge` / `le`.
        direct = getattr(field, attr, None)
        if direct is not None:
            return direct
        for meta in getattr(field, "metadata", None) or ():
            value = getattr(meta, attr, None)
            if value is not None:
                return value
        return None

    sig = inspect.signature(func)
    hints = get_type_hints(func) if hasattr(func, "__annotations__") else {}
    properties: dict[str, Any] = {}
    required: list[str] = []
    variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)

    for name, param in sig.parameters.items():
        # Bound-instance placeholders and *args/**kwargs are not named inputs.
        if name in ("self", "cls") or param.kind in variadic:
            continue

        prop: dict[str, Any] = {}
        if name in hints:
            prop = _python_type_to_json_schema(hints[name])

        default_val = param.default
        description = getattr(default_val, "description", None)
        if description:
            prop["description"] = description
        minimum = _constraint(default_val, "ge")
        if minimum is not None:
            prop["minimum"] = minimum
        maximum = _constraint(default_val, "le")
        if maximum is not None:
            prop["maximum"] = maximum

        if default_val is inspect.Parameter.empty:
            required.append(name)
        elif hasattr(default_val, "default"):
            field_default = default_val.default
            if _is_unset(field_default):
                required.append(name)
            else:
                prop["default"] = field_default

        properties[name] = prop

    return {
        "type": "object",
        "properties": properties,
        "required": required,
    }
def _register_tool(
    name: str, tool_or_func: Any, description: str | None = None
) -> None:
    """Register a tool in the registry.

    Args:
        name: Key under which the tool is stored in ``_tool_registry``.
        tool_or_func: Either a plain callable or a wrapper object exposing the
            callable as ``.fn`` (and optionally a ``.description``).
        description: Explicit description. When omitted, the wrapper's
            description is used, then the callable's stripped docstring.
    """
    is_wrapper = hasattr(tool_or_func, "fn")
    func = tool_or_func.fn if is_wrapper else tool_or_func

    # Only wrapper objects can contribute a fallback description of their own.
    if is_wrapper and not description:
        wrapper_description = getattr(tool_or_func, "description", None)
        if wrapper_description:
            description = wrapper_description

    entry = {
        "func": func,
        "description": description or (func.__doc__ or "").strip(),
        "schema": _get_tool_schema(func),
    }
    _tool_registry[name] = entry
@app.get("/mcp/tools")
async def list_mcp_tools() -> dict[str, Any]:
    """Return list of available MCP tools with their schemas."""
    # One entry per registered tool, in registry insertion order.
    return {
        "tools": [
            {
                "name": tool_name,
                "description": entry["description"],
                "inputSchema": entry["schema"],
            }
            for tool_name, entry in _tool_registry.items()
        ]
    }
@app.post("/mcp")
async def mcp_rpc(request: Request) -> JSONResponse:
    """JSON-RPC 2.0 endpoint for MCP tool execution."""
    # Flow: parse body -> validate envelope -> look up tool -> resolve and
    # validate arguments -> run tool -> wrap result.
    # Error codes: -32700 parse error, -32600 invalid request, -32601 unknown
    # method, -32602 invalid params, -32000 failure while running the tool.

    def _error(
        http_status: int, code: int, message: str, req_id: Any = None
    ) -> JSONResponse:
        # Single place that shapes every JSON-RPC error response.
        return JSONResponse(
            status_code=http_status,
            content={
                "jsonrpc": "2.0",
                "error": {"code": code, "message": message},
                "id": req_id,
            },
        )

    def _is_unset(value: Any) -> bool:
        # Ellipsis and Pydantic's undefined sentinel both mean "no default".
        return value is ... or "PydanticUndefined" in type(value).__name__

    try:
        body = await request.json()
    except Exception as e:
        return _error(400, -32700, f"Parse error: {e}")

    # Valid JSON that is not an object (e.g. a list or a string) has no
    # `.get`; reject it instead of crashing with an unhandled AttributeError.
    if not isinstance(body, dict):
        return _error(400, -32600, "Invalid Request: body must be a JSON object")

    jsonrpc = body.get("jsonrpc")
    method = body.get("method")
    params = body.get("params", {})
    request_id = body.get("id")

    if jsonrpc != "2.0":
        return _error(
            400, -32600, "Invalid Request: jsonrpc must be '2.0'", request_id
        )
    if not method:
        return _error(400, -32600, "Invalid Request: method is required", request_id)
    if not isinstance(method, str):
        # A non-string (possibly unhashable) method cannot be a registry key.
        return _error(
            400, -32600, "Invalid Request: method must be a string", request_id
        )

    tool_info = _tool_registry.get(method)
    if not tool_info:
        return _error(404, -32601, f"Method not found: {method}", request_id)

    # Tools are invoked with keyword arguments only.
    if params is None:
        params = {}
    if not isinstance(params, dict):
        return _error(
            400,
            -32602,
            "Invalid params: params must be an object of named arguments",
            request_id,
        )

    func = tool_info["func"]
    sig = inspect.signature(func)
    resolved_params = dict(params)
    missing: list[str] = []
    variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    for name, param in sig.parameters.items():
        if name in resolved_params or param.kind in variadic:
            continue
        default_val = param.default
        if default_val is inspect.Parameter.empty:
            missing.append(name)
        elif hasattr(default_val, "default"):
            # Field(...) style default: unwrap it, otherwise the tool would
            # receive the Field object itself as the argument value.
            field_default = default_val.default
            if not _is_unset(field_default):
                resolved_params[name] = field_default
            elif getattr(default_val, "default_factory", None) is None:
                missing.append(name)

    if missing:
        return _error(
            400,
            -32602,
            f"Invalid params: missing required argument(s): {', '.join(missing)}",
            request_id,
        )

    # Check the call shape up front so that only genuine argument mismatches
    # are reported as "Invalid params", not TypeErrors raised inside the tool.
    try:
        sig.bind(**resolved_params)
    except TypeError as e:
        return _error(400, -32602, f"Invalid params: {e}", request_id)

    try:
        result = await func(**resolved_params)
        return JSONResponse(
            content={
                "jsonrpc": "2.0",
                "result": result,
                "id": request_id,
            }
        )
    except Exception as e:
        logger.exception(f"Tool execution error: {e}")
        return _error(500, -32000, f"Server error: {e}", request_id)
# MCP Tools - Repository Operations
@mcp.tool()
async def clone_repository(
    project_id: str = Field(..., description="Project ID for scoping"),
    agent_id: str = Field(..., description="Agent ID making the request"),
    repo_url: str = Field(..., description="Repository URL to clone"),
    branch: str | None = Field(default=None, description="Branch to checkout after clone"),
    depth: int | None = Field(default=None, ge=1, description="Shallow clone depth"),
) -> dict[str, Any]:
    """
    Clone a repository into a project workspace.
    Creates an isolated workspace for the project and clones the repository.
    """
    # Returns {"success": True, project_id, workspace_path, branch, commit_sha}
    # on success, or {"success": False, "error", "code"} on any failure; the
    # function never raises.
    try:
        # Validate inputs
        if error := _validate_id(project_id, "project_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        if error := _validate_id(agent_id, "agent_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        if error := _validate_url(repo_url):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        # Optional inputs get the same scrutiny as in `create_branch`: the
        # JSON-RPC endpoint calls this function directly, so the Field
        # constraints in the signature are not enforced on that path.
        if branch is not None and (error := _validate_branch(branch)):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        if depth is not None and (
            isinstance(depth, bool) or not isinstance(depth, int) or depth < 1
        ):
            return {
                "success": False,
                "error": "depth must be a positive integer",
                "code": ErrorCode.INVALID_REQUEST.value,
            }

        # Create workspace
        workspace = await _workspace_manager.create_workspace(project_id, repo_url)  # type: ignore[union-attr]
        # Get auth token appropriate for the repository URL
        auth_token = _get_auth_token_for_url(repo_url)
        # Clone repository
        git = GitWrapper(workspace.path, _settings)
        result = await git.clone(repo_url, branch=branch, depth=depth, auth_token=auth_token)
        # Update workspace metadata
        await _workspace_manager.update_workspace_branch(project_id, result.branch)  # type: ignore[union-attr]
        return {
            "success": True,
            "project_id": project_id,
            "workspace_path": result.workspace_path,
            "branch": result.branch,
            "commit_sha": result.commit_sha,
        }
    except GitOpsError as e:
        logger.error(f"Clone error: {e}")
        return {"success": False, "error": e.message, "code": e.code.value}
    except Exception as e:
        logger.error(f"Unexpected clone error: {e}")
        return {"success": False, "error": str(e), "code": ErrorCode.INTERNAL_ERROR.value}
@mcp.tool()
async def git_status(
    project_id: str = Field(..., description="Project ID for scoping"),
    agent_id: str = Field(..., description="Agent ID making the request"),
    include_untracked: bool = Field(default=True, description="Include untracked files"),
) -> dict[str, Any]:
    """
    Get git status for a project workspace.
    Returns current branch, staged/unstaged changes, and untracked files.
    """
    # Every outcome is reported as a dict with "success"; failures carry
    # "error" and "code" instead of raising.
    try:
        for candidate, label in ((project_id, "project_id"), (agent_id, "agent_id")):
            problem = _validate_id(candidate, label)
            if problem:
                return {"success": False, "error": problem, "code": ErrorCode.INVALID_REQUEST.value}

        ws = await _workspace_manager.get_workspace(project_id)  # type: ignore[union-attr]
        if not ws:
            return {
                "success": False,
                "error": f"Workspace not found for project: {project_id}",
                "code": ErrorCode.WORKSPACE_NOT_FOUND.value,
            }

        state = await GitWrapper(ws.path, _settings).status(include_untracked=include_untracked)
        return {
            "success": True,
            "project_id": project_id,
            "branch": state.branch,
            "commit_sha": state.commit_sha,
            "is_clean": state.is_clean,
            "staged": state.staged,
            "unstaged": state.unstaged,
            "untracked": state.untracked,
            "ahead": state.ahead,
            "behind": state.behind,
        }
    except GitOpsError as exc:
        logger.error(f"Status error: {exc}")
        return {"success": False, "error": exc.message, "code": exc.code.value}
    except Exception as exc:
        logger.error(f"Unexpected status error: {exc}")
        return {"success": False, "error": str(exc), "code": ErrorCode.INTERNAL_ERROR.value}
# MCP Tools - Branch Operations
@mcp.tool()
async def create_branch(
    project_id: str = Field(..., description="Project ID for scoping"),
    agent_id: str = Field(..., description="Agent ID making the request"),
    branch_name: str = Field(..., description="Name for the new branch"),
    from_ref: str | None = Field(default=None, description="Reference to create from (default: HEAD)"),
    checkout: bool = Field(default=True, description="Checkout after creation"),
) -> dict[str, Any]:
    """
    Create a new branch in the project workspace.
    """
    # Validation order: project_id, agent_id, then the branch name. Failures
    # are returned as {"success": False, "error", "code"} rather than raised.
    try:
        checks = (
            _validate_id(project_id, "project_id"),
            lambda: _validate_id(agent_id, "agent_id"),
            lambda: _validate_branch(branch_name),
        )
        problem = checks[0]
        for deferred in checks[1:]:
            if problem:
                break
            problem = deferred()
        if problem:
            return {"success": False, "error": problem, "code": ErrorCode.INVALID_REQUEST.value}

        ws = await _workspace_manager.get_workspace(project_id)  # type: ignore[union-attr]
        if not ws:
            return {
                "success": False,
                "error": f"Workspace not found for project: {project_id}",
                "code": ErrorCode.WORKSPACE_NOT_FOUND.value,
            }

        created = await GitWrapper(ws.path, _settings).create_branch(
            branch_name, from_ref=from_ref, checkout=checkout
        )
        # The workspace's recorded branch only changes when we switched to it.
        if checkout:
            await _workspace_manager.update_workspace_branch(project_id, branch_name)  # type: ignore[union-attr]
        return {
            "success": True,
            "branch": created.branch,
            "commit_sha": created.commit_sha,
            "is_current": created.is_current,
        }
    except GitOpsError as exc:
        logger.error(f"Create branch error: {exc}")
        return {"success": False, "error": exc.message, "code": exc.code.value}
    except Exception as exc:
        logger.error(f"Unexpected create branch error: {exc}")
        return {"success": False, "error": str(exc), "code": ErrorCode.INTERNAL_ERROR.value}
@mcp.tool()
async def list_branches(
    project_id: str = Field(..., description="Project ID for scoping"),
    agent_id: str = Field(..., description="Agent ID making the request"),
    include_remote: bool = Field(default=False, description="Include remote branches"),
) -> dict[str, Any]:
    """
    List branches in the project workspace.
    """
    # Success payload: project_id, current_branch, local_branches,
    # remote_branches. Failures carry "error" and "code".
    try:
        for candidate, label in ((project_id, "project_id"), (agent_id, "agent_id")):
            problem = _validate_id(candidate, label)
            if problem:
                return {"success": False, "error": problem, "code": ErrorCode.INVALID_REQUEST.value}

        ws = await _workspace_manager.get_workspace(project_id)  # type: ignore[union-attr]
        if not ws:
            return {
                "success": False,
                "error": f"Workspace not found for project: {project_id}",
                "code": ErrorCode.WORKSPACE_NOT_FOUND.value,
            }

        listing = await GitWrapper(ws.path, _settings).list_branches(include_remote=include_remote)
        return {
            "success": True,
            "project_id": project_id,
            "current_branch": listing.current_branch,
            "local_branches": listing.local_branches,
            "remote_branches": listing.remote_branches,
        }
    except GitOpsError as exc:
        logger.error(f"List branches error: {exc}")
        return {"success": False, "error": exc.message, "code": exc.code.value}
    except Exception as exc:
        logger.error(f"Unexpected list branches error: {exc}")
        return {"success": False, "error": str(exc), "code": ErrorCode.INTERNAL_ERROR.value}
@mcp.tool()
async def checkout(
    project_id: str = Field(..., description="Project ID for scoping"),
    agent_id: str = Field(..., description="Agent ID making the request"),
    ref: str = Field(..., description="Branch, tag, or commit to checkout"),
    create_branch: bool = Field(default=False, description="Create new branch"),
    force: bool = Field(default=False, description="Force checkout (discard changes)"),
) -> dict[str, Any]:
    """
    Checkout a branch, tag, or commit in the project workspace.
    """
    # Returns {"success": True, "ref", "commit_sha"} on success, otherwise
    # {"success": False, "error", "code"}; the function never raises.
    try:
        if error := _validate_id(project_id, "project_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        if error := _validate_id(agent_id, "agent_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        if not isinstance(ref, str) or not ref:
            return {
                "success": False,
                "error": "ref is required and must be a string",
                "code": ErrorCode.INVALID_REQUEST.value,
            }
        # With create_branch=True, `ref` names a new branch, so it must obey
        # the same naming rules the `create_branch` tool enforces.
        if create_branch and (error := _validate_branch(ref)):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}

        workspace = await _workspace_manager.get_workspace(project_id)  # type: ignore[union-attr]
        if not workspace:
            return {"success": False, "error": f"Workspace not found for project: {project_id}", "code": ErrorCode.WORKSPACE_NOT_FOUND.value}
        git = GitWrapper(workspace.path, _settings)
        result = await git.checkout(ref, create_branch=create_branch, force=force)
        # NOTE(review): the requested ref is recorded as the workspace branch
        # even when it is a tag or commit; confirm that is intended.
        await _workspace_manager.update_workspace_branch(project_id, ref)  # type: ignore[union-attr]
        return {
            "success": True,
            "ref": result.ref,
            "commit_sha": result.commit_sha,
        }
    except GitOpsError as e:
        logger.error(f"Checkout error: {e}")
        return {"success": False, "error": e.message, "code": e.code.value}
    except Exception as e:
        logger.error(f"Unexpected checkout error: {e}")
        return {"success": False, "error": str(e), "code": ErrorCode.INTERNAL_ERROR.value}
# MCP Tools - Commit Operations
@mcp.tool()
async def commit(
    project_id: str = Field(..., description="Project ID for scoping"),
    agent_id: str = Field(..., description="Agent ID making the request"),
    message: str = Field(..., description="Commit message"),
    files: list[str] | None = Field(default=None, description="Specific files to commit (None = all staged)"),
    author_name: str | None = Field(default=None, description="Author name override"),
    author_email: str | None = Field(default=None, description="Author email override"),
) -> dict[str, Any]:
    """
    Create a commit in the project workspace.
    Stages all changes by default, or specific files if provided.
    """
    # Message, file list and author overrides are forwarded unchanged to
    # GitWrapper.commit. Failures are returned as dicts, never raised.
    try:
        for candidate, label in ((project_id, "project_id"), (agent_id, "agent_id")):
            problem = _validate_id(candidate, label)
            if problem:
                return {"success": False, "error": problem, "code": ErrorCode.INVALID_REQUEST.value}

        ws = await _workspace_manager.get_workspace(project_id)  # type: ignore[union-attr]
        if not ws:
            return {
                "success": False,
                "error": f"Workspace not found for project: {project_id}",
                "code": ErrorCode.WORKSPACE_NOT_FOUND.value,
            }

        commit_args = {
            "message": message,
            "files": files,
            "author_name": author_name,
            "author_email": author_email,
        }
        made = await GitWrapper(ws.path, _settings).commit(**commit_args)
        return {
            "success": True,
            "commit_sha": made.commit_sha,
            "short_sha": made.short_sha,
            "message": made.message,
            "files_changed": made.files_changed,
            "insertions": made.insertions,
            "deletions": made.deletions,
        }
    except GitOpsError as exc:
        logger.error(f"Commit error: {exc}")
        return {"success": False, "error": exc.message, "code": exc.code.value}
    except Exception as exc:
        logger.error(f"Unexpected commit error: {exc}")
        return {"success": False, "error": str(exc), "code": ErrorCode.INTERNAL_ERROR.value}
@mcp.tool()
async def push(
    project_id: str = Field(..., description="Project ID for scoping"),
    agent_id: str = Field(..., description="Agent ID making the request"),
    branch: str | None = Field(default=None, description="Branch to push (None = current)"),
    remote: str = Field(default="origin", description="Remote name"),
    force: bool = Field(default=False, description="Force push"),
    set_upstream: bool = Field(default=True, description="Set upstream tracking"),
) -> dict[str, Any]:
    """
    Push commits to remote repository.
    """
    # Returns {"success": True, "branch", "remote", "commits_pushed"} on
    # success, otherwise {"success": False, "error", "code"}; never raises.
    try:
        if error := _validate_id(project_id, "project_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        if error := _validate_id(agent_id, "agent_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        # An explicit branch must satisfy the same naming rules as
        # `create_branch`; None means "current branch" and is passed through.
        if branch is not None and (error := _validate_branch(branch)):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}

        workspace = await _workspace_manager.get_workspace(project_id)  # type: ignore[union-attr]
        if not workspace:
            return {"success": False, "error": f"Workspace not found for project: {project_id}", "code": ErrorCode.WORKSPACE_NOT_FOUND.value}
        # Get auth token appropriate for the repository URL
        auth_token = _get_auth_token_for_url(workspace.repo_url or "")
        git = GitWrapper(workspace.path, _settings)
        result = await git.push(
            branch=branch,
            remote=remote,
            force=force,
            set_upstream=set_upstream,
            auth_token=auth_token,
        )
        return {
            "success": True,
            "branch": result.branch,
            "remote": result.remote,
            "commits_pushed": result.commits_pushed,
        }
    except GitOpsError as e:
        logger.error(f"Push error: {e}")
        return {"success": False, "error": e.message, "code": e.code.value}
    except Exception as e:
        logger.error(f"Unexpected push error: {e}")
        return {"success": False, "error": str(e), "code": ErrorCode.INTERNAL_ERROR.value}
@mcp.tool()
async def pull(
    project_id: str = Field(..., description="Project ID for scoping"),
    agent_id: str = Field(..., description="Agent ID making the request"),
    branch: str | None = Field(default=None, description="Branch to pull (None = current)"),
    remote: str = Field(default="origin", description="Remote name"),
    rebase: bool = Field(default=False, description="Rebase instead of merge"),
) -> dict[str, Any]:
    """
    Pull changes from remote repository.
    """
    # Returns {"success": True, "branch", "commits_received", "fast_forward",
    # "conflicts"} on success, otherwise {"success": False, "error", "code"}.
    try:
        if error := _validate_id(project_id, "project_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        if error := _validate_id(agent_id, "agent_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        # An explicit branch must satisfy the same naming rules as
        # `create_branch`; None means "current branch" and is passed through.
        if branch is not None and (error := _validate_branch(branch)):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}

        workspace = await _workspace_manager.get_workspace(project_id)  # type: ignore[union-attr]
        if not workspace:
            return {"success": False, "error": f"Workspace not found for project: {project_id}", "code": ErrorCode.WORKSPACE_NOT_FOUND.value}
        git = GitWrapper(workspace.path, _settings)
        # NOTE(review): unlike `push`, no auth token is handed to the wrapper
        # here; confirm GitWrapper.pull obtains credentials another way.
        result = await git.pull(branch=branch, remote=remote, rebase=rebase)
        return {
            "success": True,
            "branch": result.branch,
            "commits_received": result.commits_received,
            "fast_forward": result.fast_forward,
            "conflicts": result.conflicts,
        }
    except GitOpsError as e:
        logger.error(f"Pull error: {e}")
        return {"success": False, "error": e.message, "code": e.code.value}
    except Exception as e:
        logger.error(f"Unexpected pull error: {e}")
        return {"success": False, "error": str(e), "code": ErrorCode.INTERNAL_ERROR.value}
# MCP Tools - Diff and Log
@mcp.tool()
async def diff(
    project_id: str = Field(..., description="Project ID for scoping"),
    agent_id: str = Field(..., description="Agent ID making the request"),
    base: str | None = Field(default=None, description="Base reference"),
    head: str | None = Field(default=None, description="Head reference"),
    files: list[str] | None = Field(default=None, description="Specific files to diff"),
    context_lines: int = Field(default=3, ge=0, description="Context lines"),
) -> dict[str, Any]:
    """
    Get diff between references or working tree.
    """
    # base/head/files/context_lines are forwarded unchanged to GitWrapper.diff.
    # Failures are returned as {"success": False, "error", "code"}.
    try:
        for candidate, label in ((project_id, "project_id"), (agent_id, "agent_id")):
            problem = _validate_id(candidate, label)
            if problem:
                return {"success": False, "error": problem, "code": ErrorCode.INVALID_REQUEST.value}

        ws = await _workspace_manager.get_workspace(project_id)  # type: ignore[union-attr]
        if not ws:
            return {
                "success": False,
                "error": f"Workspace not found for project: {project_id}",
                "code": ErrorCode.WORKSPACE_NOT_FOUND.value,
            }

        delta = await GitWrapper(ws.path, _settings).diff(
            base=base, head=head, files=files, context_lines=context_lines
        )
        return {
            "success": True,
            "project_id": project_id,
            "base": delta.base,
            "head": delta.head,
            "files": delta.files,
            "total_additions": delta.total_additions,
            "total_deletions": delta.total_deletions,
            "files_changed": delta.files_changed,
        }
    except GitOpsError as exc:
        logger.error(f"Diff error: {exc}")
        return {"success": False, "error": exc.message, "code": exc.code.value}
    except Exception as exc:
        logger.error(f"Unexpected diff error: {exc}")
        return {"success": False, "error": str(exc), "code": ErrorCode.INTERNAL_ERROR.value}
@mcp.tool()
async def log(
project_id: str = Field(..., description="Project ID for scoping"),
agent_id: str = Field(..., description="Agent ID making the request"),
ref: str | None = Field(default=None, description="Reference to start from"),
limit: int = Field(default=20, ge=1, le=100, description="Max commits"),
skip: int = Field(default=0, ge=0, description="Commits to skip"),
path: str | None = Field(default=None, description="Filter by path"),
) -> dict[str, Any]:
"""
Get commit history.
"""
try:
if error := _validate_id(project_id, "project_id"):
return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
if error := _validate_id(agent_id, "agent_id"):
return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
workspace = await _workspace_manager.get_workspace(project_id) # type: ignore[union-attr]
if not workspace:
return {"success": False, "error": f"Workspace not found for project: {project_id}", "code": ErrorCode.WORKSPACE_NOT_FOUND.value}
git = GitWrapper(workspace.path, _settings)
result = await git.log(ref=ref, limit=limit, skip=skip, path=path)
return {
"success": True,
"project_id": project_id,
"commits": result.commits,
"total_commits": result.total_commits,
}
except GitOpsError as e:
logger.error(f"Log error: {e}")
return {"success": False, "error": e.message, "code": e.code.value}
except Exception as e:
logger.error(f"Unexpected log error: {e}")
return {"success": False, "error": str(e), "code": ErrorCode.INTERNAL_ERROR.value}
# MCP Tools - Pull Request Operations
@mcp.tool()
async def create_pull_request(
    project_id: str = Field(..., description="Project ID for scoping"),
    agent_id: str = Field(..., description="Agent ID making the request"),
    title: str = Field(..., description="PR title"),
    source_branch: str = Field(..., description="Source branch"),
    body: str = Field(default="", description="PR description"),
    target_branch: str = Field(default="main", description="Target branch"),
    draft: bool = Field(default=False, description="Create as draft"),
    labels: list[str] | None = Field(default=None, description="Labels to add"),
    assignees: list[str] | None = Field(default=None, description="Users to assign"),
    reviewers: list[str] | None = Field(default=None, description="Users to request review from"),
) -> dict[str, Any]:
    """
    Create a pull request on the remote provider.
    """
    # The provider is chosen from the workspace's repository URL; owner and
    # repo are parsed from that URL by the provider itself. The returned dict
    # mirrors the provider result: success, pr_number, pr_url, error.
    try:
        for candidate, label in ((project_id, "project_id"), (agent_id, "agent_id")):
            problem = _validate_id(candidate, label)
            if problem:
                return {"success": False, "error": problem, "code": ErrorCode.INVALID_REQUEST.value}

        ws = await _workspace_manager.get_workspace(project_id)  # type: ignore[union-attr]
        if not ws:
            return {
                "success": False,
                "error": f"Workspace not found for project: {project_id}",
                "code": ErrorCode.WORKSPACE_NOT_FOUND.value,
            }
        remote_url = ws.repo_url
        if not remote_url:
            return {
                "success": False,
                "error": "Workspace has no repository URL",
                "code": ErrorCode.INVALID_REQUEST.value,
            }

        provider = _get_provider_for_url(remote_url)
        if not provider:
            return {
                "success": False,
                "error": "No provider configured for this repository",
                "code": ErrorCode.PROVIDER_NOT_FOUND.value,
            }

        owner, repo = provider.parse_repo_url(remote_url)
        outcome = await provider.create_pr(
            owner=owner,
            repo=repo,
            title=title,
            body=body,
            source_branch=source_branch,
            target_branch=target_branch,
            draft=draft,
            labels=labels,
            assignees=assignees,
            reviewers=reviewers,
        )
        return {
            "success": outcome.success,
            "pr_number": outcome.pr_number,
            "pr_url": outcome.pr_url,
            "error": outcome.error,
        }
    except GitOpsError as exc:
        logger.error(f"Create PR error: {exc}")
        return {"success": False, "error": exc.message, "code": exc.code.value}
    except Exception as exc:
        logger.error(f"Unexpected create PR error: {exc}")
        return {"success": False, "error": str(exc), "code": ErrorCode.INTERNAL_ERROR.value}
@mcp.tool()
async def get_pull_request(
    project_id: str = Field(..., description="Project ID for scoping"),
    agent_id: str = Field(..., description="Agent ID making the request"),
    pr_number: int = Field(..., description="Pull request number"),
) -> dict[str, Any]:
    """
    Get pull request details.
    """
    # The provider is chosen from the workspace's repository URL. The returned
    # dict mirrors the provider result: success, pr, error.
    try:
        for candidate, label in ((project_id, "project_id"), (agent_id, "agent_id")):
            problem = _validate_id(candidate, label)
            if problem:
                return {"success": False, "error": problem, "code": ErrorCode.INVALID_REQUEST.value}

        ws = await _workspace_manager.get_workspace(project_id)  # type: ignore[union-attr]
        if not ws:
            return {
                "success": False,
                "error": f"Workspace not found for project: {project_id}",
                "code": ErrorCode.WORKSPACE_NOT_FOUND.value,
            }
        remote_url = ws.repo_url
        if not remote_url:
            return {
                "success": False,
                "error": "Workspace has no repository URL",
                "code": ErrorCode.INVALID_REQUEST.value,
            }

        provider = _get_provider_for_url(remote_url)
        if not provider:
            return {
                "success": False,
                "error": "No provider configured for this repository",
                "code": ErrorCode.PROVIDER_NOT_FOUND.value,
            }

        owner, repo = provider.parse_repo_url(remote_url)
        fetched = await provider.get_pr(owner, repo, pr_number)
        return {
            "success": fetched.success,
            "pr": fetched.pr,
            "error": fetched.error,
        }
    except GitOpsError as exc:
        logger.error(f"Get PR error: {exc}")
        return {"success": False, "error": exc.message, "code": exc.code.value}
    except Exception as exc:
        logger.error(f"Unexpected get PR error: {exc}")
        return {"success": False, "error": str(exc), "code": ErrorCode.INTERNAL_ERROR.value}
@mcp.tool()
async def list_pull_requests(
    project_id: str = Field(..., description="Project ID for scoping"),
    agent_id: str = Field(..., description="Agent ID making the request"),
    state: str | None = Field(default=None, description="Filter by state: open, closed, merged"),
    author: str | None = Field(default=None, description="Filter by author"),
    limit: int = Field(default=20, ge=1, le=100, description="Max PRs"),
) -> dict[str, Any]:
    """
    List pull requests for the repository.

    ``state`` is matched case-insensitively; an unrecognised value is
    rejected as an invalid request. Returns a dict with ``success``,
    ``pull_requests``, ``total_count`` and ``error`` taken from the provider
    result, or ``success`` False with ``error`` and ``code`` on failure.
    """
    # Errors are never propagated to the caller: every failure path below is
    # turned into a {"success": False, ...} response.
    try:
        # Both IDs are format-checked; agent_id is not used beyond that here.
        if error := _validate_id(project_id, "project_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        if error := _validate_id(agent_id, "agent_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}

        workspace = await _workspace_manager.get_workspace(project_id)  # type: ignore[union-attr]
        if not workspace:
            return {"success": False, "error": f"Workspace not found for project: {project_id}", "code": ErrorCode.WORKSPACE_NOT_FOUND.value}
        if not workspace.repo_url:
            return {"success": False, "error": "Workspace has no repository URL", "code": ErrorCode.INVALID_REQUEST.value}

        # The provider is chosen from the workspace's repository URL.
        provider = _get_provider_for_url(workspace.repo_url)
        if not provider:
            return {"success": False, "error": "No provider configured for this repository", "code": ErrorCode.PROVIDER_NOT_FOUND.value}

        # Parse state. A falsy state (None or "") means "no filter".
        pr_state = None
        if state:
            try:
                pr_state = PRState(state.lower())
            except (AttributeError, ValueError):
                # ValueError: unknown state name. AttributeError: a non-string
                # value has no .lower(); that is a caller mistake, so report it
                # as an invalid request rather than an internal error.
                return {"success": False, "error": f"Invalid state: {state}. Valid: open, closed, merged", "code": ErrorCode.INVALID_REQUEST.value}

        owner, repo = provider.parse_repo_url(workspace.repo_url)
        result = await provider.list_prs(owner, repo, state=pr_state, author=author, limit=limit)
        return {
            "success": result.success,
            "pull_requests": result.pull_requests,
            "total_count": result.total_count,
            "error": result.error,
        }
    except GitOpsError as e:
        # Known domain error: its own message and code are returned as-is.
        logger.error(f"List PRs error: {e}")
        return {"success": False, "error": e.message, "code": e.code.value}
    except Exception as e:
        # Unexpected failure: logger.exception keeps the traceback in the log,
        # which logger.error would drop.
        logger.exception(f"Unexpected list PRs error: {e}")
        return {"success": False, "error": str(e), "code": ErrorCode.INTERNAL_ERROR.value}
@mcp.tool()
async def merge_pull_request(
    project_id: str = Field(..., description="Project ID for scoping"),
    agent_id: str = Field(..., description="Agent ID making the request"),
    pr_number: int = Field(..., description="Pull request number"),
    merge_strategy: str = Field(default="merge", description="Strategy: merge, squash, rebase"),
    commit_message: str | None = Field(default=None, description="Custom commit message"),
    delete_branch: bool = Field(default=True, description="Delete source branch after merge"),
) -> dict[str, Any]:
    """
    Merge a pull request.

    ``merge_strategy`` is matched case-insensitively; an unrecognised value
    is rejected as an invalid request. Returns a dict with ``success``,
    ``merge_commit_sha``, ``branch_deleted`` and ``error`` taken from the
    provider result, or ``success`` False with ``error`` and ``code`` on
    failure.
    """
    # Errors are never propagated to the caller: every failure path below is
    # turned into a {"success": False, ...} response.
    try:
        # Both IDs are format-checked; agent_id is not used beyond that here.
        if error := _validate_id(project_id, "project_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        if error := _validate_id(agent_id, "agent_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}

        workspace = await _workspace_manager.get_workspace(project_id)  # type: ignore[union-attr]
        if not workspace:
            return {"success": False, "error": f"Workspace not found for project: {project_id}", "code": ErrorCode.WORKSPACE_NOT_FOUND.value}
        if not workspace.repo_url:
            return {"success": False, "error": "Workspace has no repository URL", "code": ErrorCode.INVALID_REQUEST.value}

        # The provider is chosen from the workspace's repository URL.
        provider = _get_provider_for_url(workspace.repo_url)
        if not provider:
            return {"success": False, "error": "No provider configured for this repository", "code": ErrorCode.PROVIDER_NOT_FOUND.value}

        # Parse merge strategy
        try:
            strategy = MergeStrategy(merge_strategy.lower())
        except (AttributeError, ValueError):
            # ValueError: unknown strategy name. AttributeError: a non-string
            # value has no .lower(); that is a caller mistake, so report it as
            # an invalid request rather than an internal error.
            return {"success": False, "error": f"Invalid strategy: {merge_strategy}. Valid: merge, squash, rebase", "code": ErrorCode.INVALID_REQUEST.value}

        owner, repo = provider.parse_repo_url(workspace.repo_url)
        result = await provider.merge_pr(
            owner, repo, pr_number,
            merge_strategy=strategy,
            commit_message=commit_message,
            delete_branch=delete_branch,
        )
        return {
            "success": result.success,
            "merge_commit_sha": result.merge_commit_sha,
            "branch_deleted": result.branch_deleted,
            "error": result.error,
        }
    except GitOpsError as e:
        # Known domain error: its own message and code are returned as-is.
        logger.error(f"Merge PR error: {e}")
        return {"success": False, "error": e.message, "code": e.code.value}
    except Exception as e:
        # Unexpected failure: logger.exception keeps the traceback in the log,
        # which logger.error would drop.
        logger.exception(f"Unexpected merge PR error: {e}")
        return {"success": False, "error": str(e), "code": ErrorCode.INTERNAL_ERROR.value}
# MCP Tools - Workspace Operations
@mcp.tool()
async def get_workspace(
    project_id: str = Field(..., description="Project ID"),
    agent_id: str = Field(..., description="Agent ID making the request"),
) -> dict[str, Any]:
    """
    Get workspace information for a project.

    Returns ``success`` True with the workspace serialised under
    ``workspace``, or ``success`` False with ``error`` and ``code`` when
    validation fails or no workspace exists for the project.
    """
    # Errors are never propagated to the caller: every failure path below is
    # turned into a {"success": False, ...} response.
    try:
        # Both IDs are format-checked; agent_id is not used beyond that here.
        if error := _validate_id(project_id, "project_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        if error := _validate_id(agent_id, "agent_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}

        workspace = await _workspace_manager.get_workspace(project_id)  # type: ignore[union-attr]
        if not workspace:
            return {"success": False, "error": f"Workspace not found for project: {project_id}", "code": ErrorCode.WORKSPACE_NOT_FOUND.value}

        return {
            "success": True,
            "workspace": workspace.to_dict(),
        }
    except GitOpsError as e:
        # Known domain error: its own message and code are returned as-is.
        logger.error(f"Get workspace error: {e}")
        return {"success": False, "error": e.message, "code": e.code.value}
    except Exception as e:
        # Unexpected failure: logger.exception keeps the traceback in the log,
        # which logger.error would drop.
        logger.exception(f"Unexpected get workspace error: {e}")
        return {"success": False, "error": str(e), "code": ErrorCode.INTERNAL_ERROR.value}
@mcp.tool()
async def lock_workspace(
    project_id: str = Field(..., description="Project ID"),
    agent_id: str = Field(..., description="Agent ID requesting lock"),
    timeout: int = Field(default=300, ge=10, le=3600, description="Lock timeout in seconds"),
) -> dict[str, Any]:
    """
    Acquire a lock on a workspace.
    Prevents other agents from making changes during critical operations.

    Returns a dict with ``success`` (the result of the lock attempt),
    ``lock_holder`` and ``lock_expires`` (ISO 8601 string or None), or
    ``success`` False with ``error`` and ``code`` when validation fails or an
    exception occurs.
    """
    # Errors are never propagated to the caller: every failure path below is
    # turned into a {"success": False, ...} response.
    try:
        if error := _validate_id(project_id, "project_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        if error := _validate_id(agent_id, "agent_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}

        success = await _workspace_manager.lock_workspace(project_id, agent_id, timeout)  # type: ignore[union-attr]

        # The workspace is re-read after the attempt regardless of its outcome,
        # so the holder/expiry reported are whatever the manager returns now.
        workspace = await _workspace_manager.get_workspace(project_id)  # type: ignore[union-attr]
        return {
            "success": success,
            "lock_holder": workspace.lock_holder if workspace else None,
            "lock_expires": workspace.lock_expires.isoformat() if workspace and workspace.lock_expires else None,
        }
    except GitOpsError as e:
        # Known domain error: its own message and code are returned as-is.
        logger.error(f"Lock workspace error: {e}")
        return {"success": False, "error": e.message, "code": e.code.value}
    except Exception as e:
        # Unexpected failure: logger.exception keeps the traceback in the log,
        # which logger.error would drop.
        logger.exception(f"Unexpected lock workspace error: {e}")
        return {"success": False, "error": str(e), "code": ErrorCode.INTERNAL_ERROR.value}
@mcp.tool()
async def unlock_workspace(
    project_id: str = Field(..., description="Project ID"),
    agent_id: str = Field(..., description="Agent ID releasing lock"),
    force: bool = Field(default=False, description="Force unlock (admin only)"),
) -> dict[str, Any]:
    """
    Release a lock on a workspace.

    Returns a dict whose ``success`` is the result reported by the workspace
    manager, or ``success`` False with ``error`` and ``code`` when validation
    fails or an exception occurs.
    """
    # Errors are never propagated to the caller: every failure path below is
    # turned into a {"success": False, ...} response.
    try:
        if error := _validate_id(project_id, "project_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}
        if error := _validate_id(agent_id, "agent_id"):
            return {"success": False, "error": error, "code": ErrorCode.INVALID_REQUEST.value}

        # NOTE(review): `force` is forwarded unchanged; no admin check is
        # performed in this function. Presumably it is enforced by the
        # workspace manager or an upstream layer - confirm.
        success = await _workspace_manager.unlock_workspace(project_id, agent_id, force)  # type: ignore[union-attr]
        return {"success": success}
    except GitOpsError as e:
        # Known domain error: its own message and code are returned as-is.
        logger.error(f"Unlock workspace error: {e}")
        return {"success": False, "error": e.message, "code": e.code.value}
    except Exception as e:
        # Unexpected failure: logger.exception keeps the traceback in the log,
        # which logger.error would drop.
        logger.exception(f"Unexpected unlock workspace error: {e}")
        return {"success": False, "error": str(e), "code": ErrorCode.INTERNAL_ERROR.value}
# Register all tools.
# Each tool object defined above is handed to _register_tool together with the
# name it is registered under; every name string equals the identifier it is
# paired with, and the calls run once at import time in the order listed.
# NOTE(review): _register_tool is defined outside this excerpt - presumably it
# fills a registry separate from the @mcp.tool() decoration; confirm.
_register_tool("clone_repository", clone_repository)
_register_tool("git_status", git_status)
_register_tool("create_branch", create_branch)
_register_tool("list_branches", list_branches)
_register_tool("checkout", checkout)
_register_tool("commit", commit)
_register_tool("push", push)
_register_tool("pull", pull)
_register_tool("diff", diff)
_register_tool("log", log)
_register_tool("create_pull_request", create_pull_request)
_register_tool("get_pull_request", get_pull_request)
_register_tool("list_pull_requests", list_pull_requests)
_register_tool("merge_pull_request", merge_pull_request)
_register_tool("get_workspace", get_workspace)
_register_tool("lock_workspace", lock_workspace)
_register_tool("unlock_workspace", unlock_workspace)
def main() -> None:
    """Launch the ``server:app`` application under uvicorn.

    Host, port and auto-reload are read from the settings object returned by
    ``get_settings()``; the log level is fixed to ``info``.
    """
    # Imported lazily so uvicorn is only needed when the server is started.
    import uvicorn

    cfg = get_settings()
    run_options = {
        "host": cfg.host,
        "port": cfg.port,
        "reload": cfg.debug,
        "log_level": "info",
    }
    uvicorn.run("server:app", **run_options)


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()