feat(mcp): implement Git Operations MCP server with Gitea provider
Implements the Git Operations MCP server (Issue #58) providing: Core features: - GitPython wrapper for local repository operations (clone, commit, push, pull, diff, log) - Branch management (create, delete, list, checkout) - Workspace isolation per project with file-based locking - Gitea provider for remote PR operations MCP Tools (17 registered): - clone_repository, git_status, create_branch, list_branches - checkout, commit, push, pull, diff, log - create_pull_request, get_pull_request, list_pull_requests - merge_pull_request, get_workspace, lock_workspace, unlock_workspace Technical details: - FastMCP + FastAPI with JSON-RPC 2.0 protocol - pydantic-settings for configuration (env prefix: GIT_OPS_) - Comprehensive error hierarchy with structured codes - 131 tests passing with 67% coverage - Async operations via ThreadPoolExecutor Closes: #105, #106, #107, #108, #109 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
334
mcp-servers/git-ops/tests/test_workspace.py
Normal file
334
mcp-servers/git-ops/tests/test_workspace.py
Normal file
@@ -0,0 +1,334 @@
|
||||
"""
|
||||
Tests for the workspace management module.
|
||||
"""
|
||||
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from exceptions import WorkspaceLockedError, WorkspaceNotFoundError
|
||||
from models import WorkspaceState
|
||||
from workspace import FileLockManager, WorkspaceLock
|
||||
|
||||
|
||||
class TestWorkspaceManager:
|
||||
"""Tests for WorkspaceManager."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_workspace(self, workspace_manager, valid_project_id):
|
||||
"""Test creating a new workspace."""
|
||||
workspace = await workspace_manager.create_workspace(valid_project_id)
|
||||
|
||||
assert workspace.project_id == valid_project_id
|
||||
assert workspace.state == WorkspaceState.INITIALIZING
|
||||
assert Path(workspace.path).exists()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_workspace_with_repo_url(self, workspace_manager, valid_project_id, sample_repo_url):
|
||||
"""Test creating workspace with repository URL."""
|
||||
workspace = await workspace_manager.create_workspace(
|
||||
valid_project_id, repo_url=sample_repo_url
|
||||
)
|
||||
|
||||
assert workspace.repo_url == sample_repo_url
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_workspace(self, workspace_manager, valid_project_id):
|
||||
"""Test getting an existing workspace."""
|
||||
# Create first
|
||||
await workspace_manager.create_workspace(valid_project_id)
|
||||
|
||||
# Get it
|
||||
workspace = await workspace_manager.get_workspace(valid_project_id)
|
||||
|
||||
assert workspace is not None
|
||||
assert workspace.project_id == valid_project_id
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_workspace_not_found(self, workspace_manager):
|
||||
"""Test getting non-existent workspace."""
|
||||
workspace = await workspace_manager.get_workspace("nonexistent")
|
||||
assert workspace is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_workspace(self, workspace_manager, valid_project_id):
|
||||
"""Test deleting a workspace."""
|
||||
# Create first
|
||||
workspace = await workspace_manager.create_workspace(valid_project_id)
|
||||
workspace_path = Path(workspace.path)
|
||||
assert workspace_path.exists()
|
||||
|
||||
# Delete
|
||||
result = await workspace_manager.delete_workspace(valid_project_id)
|
||||
|
||||
assert result is True
|
||||
assert not workspace_path.exists()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_nonexistent_workspace(self, workspace_manager):
|
||||
"""Test deleting non-existent workspace returns True."""
|
||||
result = await workspace_manager.delete_workspace("nonexistent")
|
||||
assert result is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_workspaces(self, workspace_manager):
|
||||
"""Test listing workspaces."""
|
||||
# Create multiple workspaces
|
||||
await workspace_manager.create_workspace("project-1")
|
||||
await workspace_manager.create_workspace("project-2")
|
||||
await workspace_manager.create_workspace("project-3")
|
||||
|
||||
workspaces = await workspace_manager.list_workspaces()
|
||||
|
||||
assert len(workspaces) >= 3
|
||||
project_ids = [w.project_id for w in workspaces]
|
||||
assert "project-1" in project_ids
|
||||
assert "project-2" in project_ids
|
||||
assert "project-3" in project_ids
|
||||
|
||||
|
||||
class TestWorkspaceLocking:
|
||||
"""Tests for workspace locking."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_lock_workspace(self, workspace_manager, valid_project_id, valid_agent_id):
|
||||
"""Test locking a workspace."""
|
||||
await workspace_manager.create_workspace(valid_project_id)
|
||||
|
||||
result = await workspace_manager.lock_workspace(
|
||||
valid_project_id, valid_agent_id, timeout=60
|
||||
)
|
||||
|
||||
assert result is True
|
||||
|
||||
workspace = await workspace_manager.get_workspace(valid_project_id)
|
||||
assert workspace.state == WorkspaceState.LOCKED
|
||||
assert workspace.lock_holder == valid_agent_id
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_lock_already_locked(self, workspace_manager, valid_project_id):
|
||||
"""Test locking already-locked workspace by different holder."""
|
||||
await workspace_manager.create_workspace(valid_project_id)
|
||||
await workspace_manager.lock_workspace(valid_project_id, "agent-1", timeout=60)
|
||||
|
||||
with pytest.raises(WorkspaceLockedError):
|
||||
await workspace_manager.lock_workspace(valid_project_id, "agent-2", timeout=60)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_lock_same_holder(self, workspace_manager, valid_project_id, valid_agent_id):
|
||||
"""Test re-locking by same holder extends lock."""
|
||||
await workspace_manager.create_workspace(valid_project_id)
|
||||
await workspace_manager.lock_workspace(valid_project_id, valid_agent_id, timeout=60)
|
||||
|
||||
# Same holder can re-lock
|
||||
result = await workspace_manager.lock_workspace(
|
||||
valid_project_id, valid_agent_id, timeout=120
|
||||
)
|
||||
|
||||
assert result is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unlock_workspace(self, workspace_manager, valid_project_id, valid_agent_id):
|
||||
"""Test unlocking a workspace."""
|
||||
await workspace_manager.create_workspace(valid_project_id)
|
||||
await workspace_manager.lock_workspace(valid_project_id, valid_agent_id)
|
||||
|
||||
result = await workspace_manager.unlock_workspace(valid_project_id, valid_agent_id)
|
||||
|
||||
assert result is True
|
||||
workspace = await workspace_manager.get_workspace(valid_project_id)
|
||||
assert workspace.state == WorkspaceState.READY
|
||||
assert workspace.lock_holder is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unlock_wrong_holder(self, workspace_manager, valid_project_id):
|
||||
"""Test unlock fails with wrong holder."""
|
||||
await workspace_manager.create_workspace(valid_project_id)
|
||||
await workspace_manager.lock_workspace(valid_project_id, "agent-1")
|
||||
|
||||
with pytest.raises(WorkspaceLockedError):
|
||||
await workspace_manager.unlock_workspace(valid_project_id, "agent-2")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_force_unlock(self, workspace_manager, valid_project_id):
|
||||
"""Test force unlock works regardless of holder."""
|
||||
await workspace_manager.create_workspace(valid_project_id)
|
||||
await workspace_manager.lock_workspace(valid_project_id, "agent-1")
|
||||
|
||||
result = await workspace_manager.unlock_workspace(
|
||||
valid_project_id, "admin", force=True
|
||||
)
|
||||
|
||||
assert result is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_lock_nonexistent_workspace(self, workspace_manager, valid_agent_id):
|
||||
"""Test locking non-existent workspace raises error."""
|
||||
with pytest.raises(WorkspaceNotFoundError):
|
||||
await workspace_manager.lock_workspace("nonexistent", valid_agent_id)
|
||||
|
||||
|
||||
class TestWorkspaceLockContextManager:
|
||||
"""Tests for WorkspaceLock context manager."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_lock_context_manager(self, workspace_manager, valid_project_id, valid_agent_id):
|
||||
"""Test using WorkspaceLock as context manager."""
|
||||
await workspace_manager.create_workspace(valid_project_id)
|
||||
|
||||
async with WorkspaceLock(
|
||||
workspace_manager, valid_project_id, valid_agent_id
|
||||
) as lock:
|
||||
workspace = await workspace_manager.get_workspace(valid_project_id)
|
||||
assert workspace.state == WorkspaceState.LOCKED
|
||||
|
||||
# After exiting context, should be unlocked
|
||||
workspace = await workspace_manager.get_workspace(valid_project_id)
|
||||
assert workspace.lock_holder is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_lock_context_manager_error(self, workspace_manager, valid_project_id, valid_agent_id):
|
||||
"""Test WorkspaceLock releases on exception."""
|
||||
await workspace_manager.create_workspace(valid_project_id)
|
||||
|
||||
try:
|
||||
async with WorkspaceLock(
|
||||
workspace_manager, valid_project_id, valid_agent_id
|
||||
):
|
||||
raise ValueError("Test error")
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
workspace = await workspace_manager.get_workspace(valid_project_id)
|
||||
assert workspace.lock_holder is None
|
||||
|
||||
|
||||
class TestWorkspaceMetadata:
|
||||
"""Tests for workspace metadata operations."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_touch_workspace(self, workspace_manager, valid_project_id):
|
||||
"""Test updating workspace access time."""
|
||||
workspace = await workspace_manager.create_workspace(valid_project_id)
|
||||
original_time = workspace.last_accessed
|
||||
|
||||
await workspace_manager.touch_workspace(valid_project_id)
|
||||
|
||||
updated = await workspace_manager.get_workspace(valid_project_id)
|
||||
assert updated.last_accessed >= original_time
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_workspace_branch(self, workspace_manager, valid_project_id):
|
||||
"""Test updating workspace branch."""
|
||||
await workspace_manager.create_workspace(valid_project_id)
|
||||
|
||||
await workspace_manager.update_workspace_branch(valid_project_id, "feature-branch")
|
||||
|
||||
workspace = await workspace_manager.get_workspace(valid_project_id)
|
||||
assert workspace.current_branch == "feature-branch"
|
||||
|
||||
|
||||
class TestWorkspaceSize:
|
||||
"""Tests for workspace size management."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_size_within_limit(self, workspace_manager, valid_project_id):
|
||||
"""Test size check passes for small workspace."""
|
||||
await workspace_manager.create_workspace(valid_project_id)
|
||||
|
||||
# Should not raise
|
||||
result = await workspace_manager.check_size_limit(valid_project_id)
|
||||
assert result is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_total_size(self, workspace_manager, valid_project_id):
|
||||
"""Test getting total workspace size."""
|
||||
workspace = await workspace_manager.create_workspace(valid_project_id)
|
||||
|
||||
# Add some content
|
||||
content_file = Path(workspace.path) / "content.txt"
|
||||
content_file.write_text("x" * 1000)
|
||||
|
||||
total_size = await workspace_manager.get_total_size()
|
||||
assert total_size >= 1000
|
||||
|
||||
|
||||
class TestFileLockManager:
|
||||
"""Tests for file-based locking."""
|
||||
|
||||
def test_acquire_lock(self, temp_dir):
|
||||
"""Test acquiring a file lock."""
|
||||
manager = FileLockManager(temp_dir / "locks")
|
||||
|
||||
result = manager.acquire("test-key")
|
||||
assert result is True
|
||||
|
||||
# Cleanup
|
||||
manager.release("test-key")
|
||||
|
||||
def test_release_lock(self, temp_dir):
|
||||
"""Test releasing a file lock."""
|
||||
manager = FileLockManager(temp_dir / "locks")
|
||||
manager.acquire("test-key")
|
||||
|
||||
result = manager.release("test-key")
|
||||
assert result is True
|
||||
|
||||
def test_is_locked(self, temp_dir):
|
||||
"""Test checking if locked."""
|
||||
manager = FileLockManager(temp_dir / "locks")
|
||||
|
||||
assert manager.is_locked("test-key") is False
|
||||
|
||||
manager.acquire("test-key")
|
||||
assert manager.is_locked("test-key") is True
|
||||
|
||||
manager.release("test-key")
|
||||
|
||||
def test_release_nonexistent_lock(self, temp_dir):
|
||||
"""Test releasing a lock that doesn't exist."""
|
||||
manager = FileLockManager(temp_dir / "locks")
|
||||
|
||||
# Should not raise
|
||||
result = manager.release("nonexistent")
|
||||
assert result is False
|
||||
|
||||
|
||||
class TestWorkspaceCleanup:
|
||||
"""Tests for workspace cleanup operations."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cleanup_stale_workspaces(self, workspace_manager, test_settings):
|
||||
"""Test cleaning up stale workspaces."""
|
||||
# Create workspace
|
||||
workspace = await workspace_manager.create_workspace("stale-project")
|
||||
|
||||
# Manually set it as stale by updating metadata
|
||||
await workspace_manager._update_metadata(
|
||||
"stale-project",
|
||||
last_accessed=(datetime.now(UTC) - timedelta(days=30)).isoformat(),
|
||||
)
|
||||
|
||||
# Run cleanup
|
||||
cleaned = await workspace_manager.cleanup_stale_workspaces()
|
||||
|
||||
assert cleaned >= 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_locked_workspace_blocked(self, workspace_manager, valid_project_id, valid_agent_id):
|
||||
"""Test deleting locked workspace is blocked without force."""
|
||||
await workspace_manager.create_workspace(valid_project_id)
|
||||
await workspace_manager.lock_workspace(valid_project_id, valid_agent_id)
|
||||
|
||||
with pytest.raises(WorkspaceLockedError):
|
||||
await workspace_manager.delete_workspace(valid_project_id)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_locked_workspace_force(self, workspace_manager, valid_project_id, valid_agent_id):
|
||||
"""Test force deleting locked workspace."""
|
||||
await workspace_manager.create_workspace(valid_project_id)
|
||||
await workspace_manager.lock_workspace(valid_project_id, valid_agent_id)
|
||||
|
||||
result = await workspace_manager.delete_workspace(valid_project_id, force=True)
|
||||
assert result is True
|
||||
Reference in New Issue
Block a user