2 Commits

Author SHA1 Message Date
Felipe Cardoso
0a624a94af **test(git-ops): add comprehensive tests for server and API tools**
- Introduced extensive test coverage for FastAPI endpoints, including health check, MCP tools, and JSON-RPC operations.
- Added tests for Git operations MCP tools, including cloning, status, branching, committing, and provider detection.
- Mocked dependencies and ensured reliable test isolation with unittest.mock and pytest fixtures.
- Validated error handling, workspace management, tool execution, and type conversion functions.
2026-01-07 09:17:32 +01:00
Felipe Cardoso
011b21bf0a refactor(tests): adjust formatting for consistency and readability
- Updated line breaks and indentation across test modules to enhance clarity and maintain consistent style.
- Applied changes to workspace, provider, server, and GitWrapper-related test cases. No functional changes introduced.
2026-01-07 09:17:26 +01:00
8 changed files with 2273 additions and 85 deletions

View File

@@ -248,9 +248,7 @@ def mock_fastapi_app():
@pytest.fixture
async def async_workspace_manager(
temp_dir: Path, test_settings
) -> AsyncIterator:
async def async_workspace_manager(temp_dir: Path, test_settings) -> AsyncIterator:
"""Async fixture for workspace manager."""
from workspace import WorkspaceManager

View File

@@ -0,0 +1,440 @@
"""
Tests for FastAPI endpoints.
Tests health check and MCP JSON-RPC endpoints.
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
class TestHealthEndpoint:
"""Tests for health check endpoint."""
@pytest.mark.asyncio
async def test_health_no_providers(self):
"""Test health check when no providers configured."""
from httpx import ASGITransport, AsyncClient
from server import app
with (
patch("server._settings", MagicMock()),
patch("server._workspace_manager", None),
patch("server._gitea_provider", None),
patch("server._github_provider", None),
):
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] in ["healthy", "degraded"]
assert data["service"] == "git-ops"
@pytest.mark.asyncio
async def test_health_with_gitea_connected(self):
"""Test health check with Gitea provider connected."""
from httpx import ASGITransport, AsyncClient
from server import app
mock_gitea = AsyncMock()
mock_gitea.is_connected = AsyncMock(return_value=True)
mock_gitea.get_authenticated_user = AsyncMock(return_value="test-user")
with (
patch("server._settings", MagicMock()),
patch("server._workspace_manager", None),
patch("server._gitea_provider", mock_gitea),
patch("server._github_provider", None),
):
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert "gitea" in data["dependencies"]
@pytest.mark.asyncio
async def test_health_with_gitea_not_connected(self):
"""Test health check when Gitea is not connected."""
from httpx import ASGITransport, AsyncClient
from server import app
mock_gitea = AsyncMock()
mock_gitea.is_connected = AsyncMock(return_value=False)
with (
patch("server._settings", MagicMock()),
patch("server._workspace_manager", None),
patch("server._gitea_provider", mock_gitea),
patch("server._github_provider", None),
):
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "degraded"
@pytest.mark.asyncio
async def test_health_with_gitea_error(self):
"""Test health check when Gitea throws error."""
from httpx import ASGITransport, AsyncClient
from server import app
mock_gitea = AsyncMock()
mock_gitea.is_connected = AsyncMock(side_effect=Exception("Connection failed"))
with (
patch("server._settings", MagicMock()),
patch("server._workspace_manager", None),
patch("server._gitea_provider", mock_gitea),
patch("server._github_provider", None),
):
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "degraded"
@pytest.mark.asyncio
async def test_health_with_github_connected(self):
"""Test health check with GitHub provider connected."""
from httpx import ASGITransport, AsyncClient
from server import app
mock_github = AsyncMock()
mock_github.is_connected = AsyncMock(return_value=True)
mock_github.get_authenticated_user = AsyncMock(return_value="github-user")
with (
patch("server._settings", MagicMock()),
patch("server._workspace_manager", None),
patch("server._gitea_provider", None),
patch("server._github_provider", mock_github),
):
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert "github" in data["dependencies"]
@pytest.mark.asyncio
async def test_health_with_github_not_connected(self):
"""Test health check when GitHub is not connected."""
from httpx import ASGITransport, AsyncClient
from server import app
mock_github = AsyncMock()
mock_github.is_connected = AsyncMock(return_value=False)
with (
patch("server._settings", MagicMock()),
patch("server._workspace_manager", None),
patch("server._gitea_provider", None),
patch("server._github_provider", mock_github),
):
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "degraded"
@pytest.mark.asyncio
async def test_health_with_github_error(self):
"""Test health check when GitHub throws error."""
from httpx import ASGITransport, AsyncClient
from server import app
mock_github = AsyncMock()
mock_github.is_connected = AsyncMock(side_effect=Exception("Auth failed"))
with (
patch("server._settings", MagicMock()),
patch("server._workspace_manager", None),
patch("server._gitea_provider", None),
patch("server._github_provider", mock_github),
):
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "degraded"
@pytest.mark.asyncio
async def test_health_with_workspace_manager(self):
"""Test health check with workspace manager."""
from pathlib import Path
from httpx import ASGITransport, AsyncClient
from server import app
mock_manager = AsyncMock()
mock_manager.base_path = Path("/tmp/workspaces")
mock_manager.list_workspaces = AsyncMock(return_value=[])
with (
patch("server._settings", MagicMock()),
patch("server._workspace_manager", mock_manager),
patch("server._gitea_provider", None),
patch("server._github_provider", None),
):
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert "workspace" in data["dependencies"]
@pytest.mark.asyncio
async def test_health_workspace_error(self):
"""Test health check when workspace manager throws error."""
from pathlib import Path
from httpx import ASGITransport, AsyncClient
from server import app
mock_manager = AsyncMock()
mock_manager.base_path = Path("/tmp/workspaces")
mock_manager.list_workspaces = AsyncMock(side_effect=Exception("Disk full"))
with (
patch("server._settings", MagicMock()),
patch("server._workspace_manager", mock_manager),
patch("server._gitea_provider", None),
patch("server._github_provider", None),
):
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "degraded"
class TestMCPToolsEndpoint:
"""Tests for MCP tools list endpoint."""
@pytest.mark.asyncio
async def test_list_mcp_tools(self):
"""Test listing MCP tools."""
from httpx import ASGITransport, AsyncClient
from server import app
with patch("server._settings", MagicMock()):
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
response = await client.get("/mcp/tools")
assert response.status_code == 200
data = response.json()
assert "tools" in data
class TestMCPRPCEndpoint:
"""Tests for MCP JSON-RPC endpoint."""
@pytest.mark.asyncio
async def test_mcp_rpc_invalid_json(self):
"""Test RPC with invalid JSON."""
from httpx import ASGITransport, AsyncClient
from server import app
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
response = await client.post(
"/mcp",
content="not valid json",
headers={"Content-Type": "application/json"},
)
assert response.status_code == 400
data = response.json()
assert data["error"]["code"] == -32700
@pytest.mark.asyncio
async def test_mcp_rpc_invalid_jsonrpc(self):
"""Test RPC with invalid jsonrpc version."""
from httpx import ASGITransport, AsyncClient
from server import app
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
response = await client.post(
"/mcp", json={"jsonrpc": "1.0", "method": "test", "id": 1}
)
assert response.status_code == 400
data = response.json()
assert data["error"]["code"] == -32600
@pytest.mark.asyncio
async def test_mcp_rpc_missing_method(self):
"""Test RPC with missing method."""
from httpx import ASGITransport, AsyncClient
from server import app
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
response = await client.post("/mcp", json={"jsonrpc": "2.0", "id": 1})
assert response.status_code == 400
data = response.json()
assert data["error"]["code"] == -32600
@pytest.mark.asyncio
async def test_mcp_rpc_method_not_found(self):
"""Test RPC with unknown method."""
from httpx import ASGITransport, AsyncClient
from server import app
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
response = await client.post(
"/mcp",
json={
"jsonrpc": "2.0",
"method": "unknown_method",
"params": {},
"id": 1,
},
)
assert response.status_code == 404
data = response.json()
assert data["error"]["code"] == -32601
class TestTypeSchemaConversion:
"""Tests for type to JSON schema conversion."""
def test_python_type_to_json_schema_str(self):
"""Test converting str type to JSON schema."""
from server import _python_type_to_json_schema
result = _python_type_to_json_schema(str)
assert result == {"type": "string"}
def test_python_type_to_json_schema_int(self):
"""Test converting int type to JSON schema."""
from server import _python_type_to_json_schema
result = _python_type_to_json_schema(int)
assert result == {"type": "integer"}
def test_python_type_to_json_schema_float(self):
"""Test converting float type to JSON schema."""
from server import _python_type_to_json_schema
result = _python_type_to_json_schema(float)
assert result == {"type": "number"}
def test_python_type_to_json_schema_bool(self):
"""Test converting bool type to JSON schema."""
from server import _python_type_to_json_schema
result = _python_type_to_json_schema(bool)
assert result == {"type": "boolean"}
def test_python_type_to_json_schema_none(self):
"""Test converting NoneType to JSON schema."""
from server import _python_type_to_json_schema
result = _python_type_to_json_schema(type(None))
assert result == {"type": "null"}
def test_python_type_to_json_schema_list(self):
"""Test converting list type to JSON schema."""
from server import _python_type_to_json_schema
result = _python_type_to_json_schema(list[str])
assert result["type"] == "array"
def test_python_type_to_json_schema_dict(self):
"""Test converting dict type to JSON schema."""
from server import _python_type_to_json_schema
result = _python_type_to_json_schema(dict[str, int])
assert result == {"type": "object"}
def test_python_type_to_json_schema_optional(self):
"""Test converting Optional type to JSON schema."""
from server import _python_type_to_json_schema
result = _python_type_to_json_schema(str | None)
# The function returns object type for complex union types
assert "type" in result
class TestToolSchema:
"""Tests for tool schema extraction."""
def test_get_tool_schema_simple(self):
"""Test getting schema from simple function."""
from server import _get_tool_schema
def simple_func(name: str, count: int) -> str:
return f"{name}: {count}"
result = _get_tool_schema(simple_func)
assert "properties" in result
assert "name" in result["properties"]
assert "count" in result["properties"]
def test_register_and_get_tool(self):
"""Test registering a tool."""
from server import _register_tool, _tool_registry
async def test_tool(x: str) -> str:
"""A test tool."""
return x
_register_tool("test_tool", test_tool, "Test description")
assert "test_tool" in _tool_registry
assert _tool_registry["test_tool"]["description"] == "Test description"
# Clean up
del _tool_registry["test_tool"]

View File

@@ -3,17 +3,22 @@ Tests for the git_wrapper module.
"""
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from git import GitCommandError
from exceptions import (
BranchExistsError,
BranchNotFoundError,
CheckoutError,
CloneError,
CommitError,
GitError,
PullError,
PushError,
)
from git_wrapper import GitWrapper
from git_wrapper import GitWrapper, run_in_executor
from models import FileChangeType
@@ -116,7 +121,9 @@ class TestGitWrapperBranch:
@pytest.mark.asyncio
async def test_create_branch_without_checkout(self, git_wrapper_with_repo):
"""Test creating branch without checkout."""
result = await git_wrapper_with_repo.create_branch("feature-no-checkout", checkout=False)
result = await git_wrapper_with_repo.create_branch(
"feature-no-checkout", checkout=False
)
assert result.success is True
assert result.branch == "feature-no-checkout"
@@ -432,3 +439,505 @@ class TestGitWrapperStash:
result = await git_wrapper_with_repo.stash()
assert result is None
@pytest.mark.asyncio
async def test_stash_pop(self, git_wrapper_with_repo, git_repo):
"""Test popping a stash."""
# Make changes and stash them
readme = Path(git_repo.working_dir) / "README.md"
original_content = readme.read_text()
readme.write_text("Modified for stash pop test")
git_repo.index.add(["README.md"])
stash_ref = await git_wrapper_with_repo.stash("Test stash for pop")
if stash_ref:
# Pop the stash
result = await git_wrapper_with_repo.stash_pop()
assert result is True
class TestGitWrapperRepoProperty:
"""Tests for repo property edge cases."""
def test_repo_property_path_not_exists(self, test_settings):
"""Test that accessing repo on non-existent path raises error."""
wrapper = GitWrapper(
Path("/nonexistent/path/that/does/not/exist"), test_settings
)
with pytest.raises(GitError, match="Path does not exist"):
_ = wrapper.repo
def test_refresh_repo(self, git_wrapper_with_repo):
"""Test _refresh_repo clears cached repo."""
# Access repo to cache it
_ = git_wrapper_with_repo.repo
assert git_wrapper_with_repo._repo is not None
# Refresh should clear it
git_wrapper_with_repo._refresh_repo()
assert git_wrapper_with_repo._repo is None
class TestGitWrapperBranchAdvanced:
"""Advanced tests for branch operations."""
@pytest.mark.asyncio
async def test_create_branch_from_ref(self, git_wrapper_with_repo, git_repo):
"""Test creating branch from specific ref."""
# Get current HEAD SHA
head_sha = git_repo.head.commit.hexsha
result = await git_wrapper_with_repo.create_branch(
"feature-from-ref",
from_ref=head_sha,
checkout=False,
)
assert result.success is True
assert result.branch == "feature-from-ref"
@pytest.mark.asyncio
async def test_delete_branch_force(self, git_wrapper_with_repo, git_repo):
"""Test force deleting a branch."""
# Create branch and add unmerged commit
await git_wrapper_with_repo.create_branch("unmerged-branch", checkout=True)
new_file = Path(git_repo.working_dir) / "unmerged.txt"
new_file.write_text("unmerged content")
git_repo.index.add(["unmerged.txt"])
git_repo.index.commit("Unmerged commit")
# Switch back to main
await git_wrapper_with_repo.checkout("main")
# Force delete
result = await git_wrapper_with_repo.delete_branch(
"unmerged-branch", force=True
)
assert result.success is True
class TestGitWrapperListBranchesRemote:
"""Tests for listing remote branches."""
@pytest.mark.asyncio
async def test_list_branches_with_remote(self, git_wrapper_with_repo):
"""Test listing branches including remote."""
# Even without remotes, this should work
result = await git_wrapper_with_repo.list_branches(include_remote=True)
assert result.current_branch == "main"
# Remote branches list should be empty for local repo
assert len(result.remote_branches) == 0
class TestGitWrapperCheckoutAdvanced:
"""Advanced tests for checkout operations."""
@pytest.mark.asyncio
async def test_checkout_create_existing_error(self, git_wrapper_with_repo):
"""Test error when creating branch that already exists."""
with pytest.raises(BranchExistsError):
await git_wrapper_with_repo.checkout("main", create_branch=True)
@pytest.mark.asyncio
async def test_checkout_force(self, git_wrapper_with_repo, git_repo):
"""Test force checkout discards local changes."""
# Create branch
await git_wrapper_with_repo.create_branch("force-test", checkout=False)
# Make local changes
readme = Path(git_repo.working_dir) / "README.md"
readme.write_text("local changes")
# Force checkout should work
result = await git_wrapper_with_repo.checkout("force-test", force=True)
assert result.success is True
class TestGitWrapperCommitAdvanced:
"""Advanced tests for commit operations."""
@pytest.mark.asyncio
async def test_commit_specific_files(self, git_wrapper_with_repo, git_repo):
"""Test committing specific files only."""
# Create multiple files
file1 = Path(git_repo.working_dir) / "commit_specific1.txt"
file2 = Path(git_repo.working_dir) / "commit_specific2.txt"
file1.write_text("content 1")
file2.write_text("content 2")
result = await git_wrapper_with_repo.commit(
"Commit specific file",
files=["commit_specific1.txt"],
)
assert result.success is True
assert result.files_changed == 1
@pytest.mark.asyncio
async def test_commit_with_partial_author(self, git_wrapper_with_repo, git_repo):
"""Test commit with only author name."""
new_file = Path(git_repo.working_dir) / "partial_author.txt"
new_file.write_text("content")
result = await git_wrapper_with_repo.commit(
"Partial author commit",
author_name="Test Author",
)
assert result.success is True
@pytest.mark.asyncio
async def test_commit_allow_empty(self, git_wrapper_with_repo):
"""Test allowing empty commits."""
result = await git_wrapper_with_repo.commit(
"Empty commit allowed",
allow_empty=True,
)
assert result.success is True
class TestGitWrapperUnstageAdvanced:
"""Advanced tests for unstaging operations."""
@pytest.mark.asyncio
async def test_unstage_specific_files(self, git_wrapper_with_repo, git_repo):
"""Test unstaging specific files."""
# Create and stage files
file1 = Path(git_repo.working_dir) / "unstage1.txt"
file2 = Path(git_repo.working_dir) / "unstage2.txt"
file1.write_text("content 1")
file2.write_text("content 2")
git_repo.index.add(["unstage1.txt", "unstage2.txt"])
count = await git_wrapper_with_repo.unstage(["unstage1.txt"])
assert count == 1
class TestGitWrapperResetAdvanced:
"""Advanced tests for reset operations."""
@pytest.mark.asyncio
async def test_reset_hard(self, git_wrapper_with_repo, git_repo):
"""Test hard reset."""
# Create a commit
file1 = Path(git_repo.working_dir) / "reset_hard.txt"
file1.write_text("content")
git_repo.index.add(["reset_hard.txt"])
git_repo.index.commit("Commit for hard reset")
result = await git_wrapper_with_repo.reset("HEAD~1", mode="hard")
assert result is True
# File should be gone after hard reset
assert not file1.exists()
@pytest.mark.asyncio
async def test_reset_specific_files(self, git_wrapper_with_repo, git_repo):
"""Test resetting specific files."""
# Create and stage a file
file1 = Path(git_repo.working_dir) / "reset_file.txt"
file1.write_text("content")
git_repo.index.add(["reset_file.txt"])
result = await git_wrapper_with_repo.reset("HEAD", files=["reset_file.txt"])
assert result is True
class TestGitWrapperDiffAdvanced:
"""Advanced tests for diff operations."""
@pytest.mark.asyncio
async def test_diff_between_refs(self, git_wrapper_with_repo, git_repo):
"""Test diff between two refs."""
# Create initial commit
file1 = Path(git_repo.working_dir) / "diff_ref.txt"
file1.write_text("initial")
git_repo.index.add(["diff_ref.txt"])
commit1 = git_repo.index.commit("First commit for diff")
# Create second commit
file1.write_text("modified")
git_repo.index.add(["diff_ref.txt"])
commit2 = git_repo.index.commit("Second commit for diff")
result = await git_wrapper_with_repo.diff(
base=commit1.hexsha,
head=commit2.hexsha,
)
assert result.files_changed > 0
@pytest.mark.asyncio
async def test_diff_specific_files(self, git_wrapper_with_repo, git_repo):
"""Test diff for specific files only."""
# Create files
file1 = Path(git_repo.working_dir) / "diff_specific1.txt"
file2 = Path(git_repo.working_dir) / "diff_specific2.txt"
file1.write_text("content 1")
file2.write_text("content 2")
result = await git_wrapper_with_repo.diff(files=["diff_specific1.txt"])
# Should only show changes for specified file
for f in result.files:
assert "diff_specific2.txt" not in f.get("path", "")
@pytest.mark.asyncio
async def test_diff_base_only(self, git_wrapper_with_repo, git_repo):
"""Test diff with base ref only (vs HEAD)."""
# Create commit
file1 = Path(git_repo.working_dir) / "diff_base.txt"
file1.write_text("content")
git_repo.index.add(["diff_base.txt"])
commit = git_repo.index.commit("Commit for diff base test")
# Get parent commit
parent = commit.parents[0] if commit.parents else commit
result = await git_wrapper_with_repo.diff(base=parent.hexsha)
assert isinstance(result.files_changed, int)
class TestGitWrapperLogAdvanced:
"""Advanced tests for log operations."""
@pytest.mark.asyncio
async def test_log_with_ref(self, git_wrapper_with_repo, git_repo):
"""Test log starting from specific ref."""
# Create branch with commits
await git_wrapper_with_repo.create_branch("log-test", checkout=True)
file1 = Path(git_repo.working_dir) / "log_ref.txt"
file1.write_text("content")
git_repo.index.add(["log_ref.txt"])
git_repo.index.commit("Commit on log-test branch")
result = await git_wrapper_with_repo.log(ref="log-test", limit=5)
assert result.total_commits > 0
@pytest.mark.asyncio
async def test_log_with_path(self, git_wrapper_with_repo, git_repo):
"""Test log filtered by path."""
# Create file and commit
file1 = Path(git_repo.working_dir) / "log_path.txt"
file1.write_text("content")
git_repo.index.add(["log_path.txt"])
git_repo.index.commit("Commit for path log")
result = await git_wrapper_with_repo.log(path="log_path.txt")
assert result.total_commits >= 1
@pytest.mark.asyncio
async def test_log_with_skip(self, git_wrapper_with_repo, git_repo):
"""Test log with skip parameter."""
# Create multiple commits
for i in range(3):
file_path = Path(git_repo.working_dir) / f"skip_test{i}.txt"
file_path.write_text(f"content {i}")
git_repo.index.add([f"skip_test{i}.txt"])
git_repo.index.commit(f"Skip test commit {i}")
result = await git_wrapper_with_repo.log(skip=1, limit=2)
# Should have skipped first commit
assert len(result.commits) <= 2
class TestGitWrapperRemoteUrl:
"""Tests for remote URL operations."""
@pytest.mark.asyncio
async def test_get_remote_url_nonexistent(self, git_wrapper_with_repo):
"""Test getting URL for non-existent remote."""
url = await git_wrapper_with_repo.get_remote_url("nonexistent")
assert url is None
class TestGitWrapperConfig:
"""Tests for git config operations."""
@pytest.mark.asyncio
async def test_set_and_get_config(self, git_wrapper_with_repo):
"""Test setting and getting config value."""
await git_wrapper_with_repo.set_config("test.key", "test_value")
value = await git_wrapper_with_repo.get_config("test.key")
assert value == "test_value"
@pytest.mark.asyncio
async def test_get_config_nonexistent(self, git_wrapper_with_repo):
"""Test getting non-existent config value."""
value = await git_wrapper_with_repo.get_config("nonexistent.key")
assert value is None
class TestGitWrapperClone:
"""Tests for clone operations."""
@pytest.mark.asyncio
async def test_clone_success(self, temp_workspace, test_settings):
"""Test successful clone."""
wrapper = GitWrapper(temp_workspace, test_settings)
# Mock the clone operation
with patch("git_wrapper.GitRepo") as mock_repo_class:
mock_repo = MagicMock()
mock_repo.active_branch.name = "main"
mock_repo.head.commit.hexsha = "abc123"
mock_repo_class.clone_from.return_value = mock_repo
result = await wrapper.clone("https://github.com/test/repo.git")
assert result.success is True
assert result.branch == "main"
assert result.commit_sha == "abc123"
@pytest.mark.asyncio
async def test_clone_with_auth_token(self, temp_workspace, test_settings):
"""Test clone with auth token."""
wrapper = GitWrapper(temp_workspace, test_settings)
with patch("git_wrapper.GitRepo") as mock_repo_class:
mock_repo = MagicMock()
mock_repo.active_branch.name = "main"
mock_repo.head.commit.hexsha = "abc123"
mock_repo_class.clone_from.return_value = mock_repo
result = await wrapper.clone(
"https://github.com/test/repo.git",
auth_token="test-token",
)
assert result.success is True
# Verify token was injected in URL
call_args = mock_repo_class.clone_from.call_args
assert "test-token@" in call_args.kwargs["url"]
@pytest.mark.asyncio
async def test_clone_with_branch_and_depth(self, temp_workspace, test_settings):
"""Test clone with branch and depth parameters."""
wrapper = GitWrapper(temp_workspace, test_settings)
with patch("git_wrapper.GitRepo") as mock_repo_class:
mock_repo = MagicMock()
mock_repo.active_branch.name = "develop"
mock_repo.head.commit.hexsha = "def456"
mock_repo_class.clone_from.return_value = mock_repo
result = await wrapper.clone(
"https://github.com/test/repo.git",
branch="develop",
depth=1,
)
assert result.success is True
call_args = mock_repo_class.clone_from.call_args
assert call_args.kwargs["branch"] == "develop"
assert call_args.kwargs["depth"] == 1
@pytest.mark.asyncio
async def test_clone_failure(self, temp_workspace, test_settings):
"""Test clone failure raises CloneError."""
wrapper = GitWrapper(temp_workspace, test_settings)
with patch("git_wrapper.GitRepo") as mock_repo_class:
mock_repo_class.clone_from.side_effect = GitCommandError(
"git clone", 128, stderr="Authentication failed"
)
with pytest.raises(CloneError):
await wrapper.clone("https://github.com/test/repo.git")
class TestGitWrapperPush:
"""Tests for push operations."""
@pytest.mark.asyncio
async def test_push_force_disabled(self, git_wrapper_with_repo, git_repo):
"""Test force push is disabled by default."""
git_repo.create_remote("origin", "https://github.com/test/repo.git")
with pytest.raises(PushError, match="Force push is disabled"):
await git_wrapper_with_repo.push(force=True)
@pytest.mark.asyncio
async def test_push_remote_not_found(self, git_wrapper_with_repo):
"""Test push to non-existent remote."""
with pytest.raises(PushError, match="Remote not found"):
await git_wrapper_with_repo.push(remote="nonexistent")
class TestGitWrapperPull:
"""Tests for pull operations."""
@pytest.mark.asyncio
async def test_pull_remote_not_found(self, git_wrapper_with_repo):
"""Test pull from non-existent remote."""
with pytest.raises(PullError, match="Remote not found"):
await git_wrapper_with_repo.pull(remote="nonexistent")
class TestGitWrapperFetch:
"""Tests for fetch operations."""
@pytest.mark.asyncio
async def test_fetch_remote_not_found(self, git_wrapper_with_repo):
"""Test fetch from non-existent remote."""
with pytest.raises(GitError, match="Remote not found"):
await git_wrapper_with_repo.fetch(remote="nonexistent")
class TestGitWrapperDiffHeadOnly:
"""Tests for diff with head ref only."""
@pytest.mark.asyncio
async def test_diff_head_only(self, git_wrapper_with_repo, git_repo):
"""Test diff with head ref only (working tree vs ref)."""
# Make some changes
readme = Path(git_repo.working_dir) / "README.md"
readme.write_text("modified content")
# This tests the head-only branch (base=None, head=specified)
result = await git_wrapper_with_repo.diff(head="HEAD")
assert isinstance(result.files_changed, int)
class TestGitWrapperRemoteWithUrl:
"""Tests for getting remote URL when remote exists."""
@pytest.mark.asyncio
async def test_get_remote_url_exists(self, git_wrapper_with_repo, git_repo):
"""Test getting URL for existing remote."""
git_repo.create_remote("origin", "https://github.com/test/repo.git")
url = await git_wrapper_with_repo.get_remote_url("origin")
assert url == "https://github.com/test/repo.git"
class TestRunInExecutor:
"""Tests for run_in_executor utility."""
@pytest.mark.asyncio
async def test_run_in_executor(self):
"""Test running function in executor."""
def blocking_func(x, y):
return x + y
result = await run_in_executor(blocking_func, 1, 2)
assert result == 3

View File

@@ -141,7 +141,9 @@ class TestGitHubProviderConnection:
await provider.close()
@pytest.mark.asyncio
async def test_get_authenticated_user(self, github_provider, mock_github_httpx_client):
async def test_get_authenticated_user(
self, github_provider, mock_github_httpx_client
):
"""Test getting authenticated user."""
mock_github_httpx_client.request.return_value.json = MagicMock(
return_value={"login": "test-user"}
@@ -210,7 +212,9 @@ class TestGitHubPROperations:
assert result.pr_url == "https://github.com/owner/repo/pull/42"
@pytest.mark.asyncio
async def test_create_pr_with_draft(self, github_provider, mock_github_httpx_client):
async def test_create_pr_with_draft(
self, github_provider, mock_github_httpx_client
):
"""Test creating a draft PR."""
mock_github_httpx_client.request.return_value.json = MagicMock(
return_value={
@@ -233,15 +237,22 @@ class TestGitHubPROperations:
assert result.pr_number == 43
@pytest.mark.asyncio
async def test_create_pr_with_options(self, github_provider, mock_github_httpx_client):
async def test_create_pr_with_options(
self, github_provider, mock_github_httpx_client
):
"""Test creating PR with labels, assignees, reviewers."""
mock_responses = [
{"number": 44, "html_url": "https://github.com/owner/repo/pull/44"}, # Create PR
{
"number": 44,
"html_url": "https://github.com/owner/repo/pull/44",
}, # Create PR
[{"name": "enhancement"}], # POST add labels
{}, # POST add assignees
{}, # POST request reviewers
]
mock_github_httpx_client.request.return_value.json = MagicMock(side_effect=mock_responses)
mock_github_httpx_client.request.return_value.json = MagicMock(
side_effect=mock_responses
)
result = await github_provider.create_pr(
owner="owner",
@@ -258,7 +269,9 @@ class TestGitHubPROperations:
assert result.success is True
@pytest.mark.asyncio
async def test_get_pr(self, github_provider, mock_github_httpx_client, github_pr_data):
async def test_get_pr(
self, github_provider, mock_github_httpx_client, github_pr_data
):
"""Test getting a pull request."""
mock_github_httpx_client.request.return_value.json = MagicMock(
return_value=github_pr_data
@@ -274,14 +287,18 @@ class TestGitHubPROperations:
async def test_get_pr_not_found(self, github_provider, mock_github_httpx_client):
"""Test getting non-existent PR."""
mock_github_httpx_client.request.return_value.status_code = 404
mock_github_httpx_client.request.return_value.json = MagicMock(return_value=None)
mock_github_httpx_client.request.return_value.json = MagicMock(
return_value=None
)
result = await github_provider.get_pr("owner", "repo", 999)
assert result.success is False
@pytest.mark.asyncio
async def test_list_prs(self, github_provider, mock_github_httpx_client, github_pr_data):
async def test_list_prs(
self, github_provider, mock_github_httpx_client, github_pr_data
):
"""Test listing pull requests."""
mock_github_httpx_client.request.return_value.json = MagicMock(
return_value=[github_pr_data, github_pr_data]
@@ -293,20 +310,22 @@ class TestGitHubPROperations:
assert len(result.pull_requests) == 2
@pytest.mark.asyncio
async def test_list_prs_with_state_filter(self, github_provider, mock_github_httpx_client, github_pr_data):
async def test_list_prs_with_state_filter(
self, github_provider, mock_github_httpx_client, github_pr_data
):
"""Test listing PRs with state filter."""
mock_github_httpx_client.request.return_value.json = MagicMock(
return_value=[github_pr_data]
)
result = await github_provider.list_prs(
"owner", "repo", state=PRState.OPEN
)
result = await github_provider.list_prs("owner", "repo", state=PRState.OPEN)
assert result.success is True
@pytest.mark.asyncio
async def test_merge_pr(self, github_provider, mock_github_httpx_client, github_pr_data):
async def test_merge_pr(
self, github_provider, mock_github_httpx_client, github_pr_data
):
"""Test merging a pull request."""
# Merge returns sha, then get_pr returns the PR data, then delete branch
mock_responses = [
@@ -319,7 +338,9 @@ class TestGitHubPROperations:
)
result = await github_provider.merge_pr(
"owner", "repo", 42,
"owner",
"repo",
42,
merge_strategy=MergeStrategy.SQUASH,
)
@@ -327,7 +348,9 @@ class TestGitHubPROperations:
assert result.merge_commit_sha == "merge-commit-sha"
@pytest.mark.asyncio
async def test_merge_pr_rebase(self, github_provider, mock_github_httpx_client, github_pr_data):
async def test_merge_pr_rebase(
self, github_provider, mock_github_httpx_client, github_pr_data
):
"""Test merging with rebase strategy."""
mock_responses = [
{"sha": "rebase-commit-sha", "merged": True}, # PUT merge
@@ -339,21 +362,27 @@ class TestGitHubPROperations:
)
result = await github_provider.merge_pr(
"owner", "repo", 42,
"owner",
"repo",
42,
merge_strategy=MergeStrategy.REBASE,
)
assert result.success is True
@pytest.mark.asyncio
async def test_update_pr(self, github_provider, mock_github_httpx_client, github_pr_data):
async def test_update_pr(
self, github_provider, mock_github_httpx_client, github_pr_data
):
"""Test updating a pull request."""
mock_github_httpx_client.request.return_value.json = MagicMock(
return_value=github_pr_data
)
result = await github_provider.update_pr(
"owner", "repo", 42,
"owner",
"repo",
42,
title="Updated Title",
body="Updated body",
)
@@ -361,7 +390,9 @@ class TestGitHubPROperations:
assert result.success is True
@pytest.mark.asyncio
async def test_close_pr(self, github_provider, mock_github_httpx_client, github_pr_data):
async def test_close_pr(
self, github_provider, mock_github_httpx_client, github_pr_data
):
"""Test closing a pull request."""
github_pr_data["state"] = "closed"
mock_github_httpx_client.request.return_value.json = MagicMock(
@@ -391,11 +422,15 @@ class TestGitHubBranchOperations:
assert result["name"] == "main"
@pytest.mark.asyncio
async def test_delete_remote_branch(self, github_provider, mock_github_httpx_client):
async def test_delete_remote_branch(
self, github_provider, mock_github_httpx_client
):
"""Test deleting a remote branch."""
mock_github_httpx_client.request.return_value.status_code = 204
result = await github_provider.delete_remote_branch("owner", "repo", "old-branch")
result = await github_provider.delete_remote_branch(
"owner", "repo", "old-branch"
)
assert result is True
@@ -455,12 +490,12 @@ class TestGitHubLabelOperations:
None, # DELETE label
{"labels": []}, # GET issue
]
mock_github_httpx_client.request.return_value.json = MagicMock(side_effect=mock_responses)
result = await github_provider.remove_label(
"owner", "repo", 42, "bug"
mock_github_httpx_client.request.return_value.json = MagicMock(
side_effect=mock_responses
)
result = await github_provider.remove_label("owner", "repo", 42, "bug")
assert isinstance(result, list)
@@ -483,7 +518,9 @@ class TestGitHubErrorHandling:
"""Tests for error handling in GitHub provider."""
@pytest.mark.asyncio
async def test_authentication_error(self, github_provider, mock_github_httpx_client):
async def test_authentication_error(
self, github_provider, mock_github_httpx_client
):
"""Test handling authentication errors."""
mock_github_httpx_client.request.return_value.status_code = 401

View File

@@ -17,10 +17,7 @@ class TestBaseProvider:
def test_parse_repo_url_https(self, mock_gitea_provider):
"""Test parsing HTTPS repo URL."""
# The mock needs parse_repo_url to work
provider = GiteaProvider(
base_url="https://gitea.test.com",
token="test-token"
)
provider = GiteaProvider(base_url="https://gitea.test.com", token="test-token")
owner, repo = provider.parse_repo_url("https://gitea.test.com/owner/repo.git")
@@ -29,10 +26,7 @@ class TestBaseProvider:
def test_parse_repo_url_https_no_git(self):
"""Test parsing HTTPS URL without .git suffix."""
provider = GiteaProvider(
base_url="https://gitea.test.com",
token="test-token"
)
provider = GiteaProvider(base_url="https://gitea.test.com", token="test-token")
owner, repo = provider.parse_repo_url("https://gitea.test.com/owner/repo")
@@ -41,10 +35,7 @@ class TestBaseProvider:
def test_parse_repo_url_ssh(self):
"""Test parsing SSH repo URL."""
provider = GiteaProvider(
base_url="https://gitea.test.com",
token="test-token"
)
provider = GiteaProvider(base_url="https://gitea.test.com", token="test-token")
owner, repo = provider.parse_repo_url("git@gitea.test.com:owner/repo.git")
@@ -53,10 +44,7 @@ class TestBaseProvider:
def test_parse_repo_url_invalid(self):
"""Test error on invalid URL."""
provider = GiteaProvider(
base_url="https://gitea.test.com",
token="test-token"
)
provider = GiteaProvider(base_url="https://gitea.test.com", token="test-token")
with pytest.raises(ValueError, match="Unable to parse"):
provider.parse_repo_url("invalid-url")
@@ -166,14 +154,19 @@ class TestGiteaPROperations:
# 5. PATCH add assignees
# 6. POST request reviewers
mock_responses = [
{"number": 43, "html_url": "https://gitea.test.com/owner/repo/pull/43"}, # Create PR
{
"number": 43,
"html_url": "https://gitea.test.com/owner/repo/pull/43",
}, # Create PR
[{"id": 1, "name": "enhancement"}], # GET labels (found)
{}, # POST add labels to PR
{"labels": [{"name": "enhancement"}]}, # GET issue to return current labels
{}, # PATCH add assignees
{}, # POST request reviewers
]
mock_httpx_client.request.return_value.json = MagicMock(side_effect=mock_responses)
mock_httpx_client.request.return_value.json = MagicMock(
side_effect=mock_responses
)
result = await gitea_provider.create_pr(
owner="owner",
@@ -225,15 +218,15 @@ class TestGiteaPROperations:
assert len(result.pull_requests) == 2
@pytest.mark.asyncio
async def test_list_prs_with_state_filter(self, gitea_provider, mock_httpx_client, sample_pr_data):
async def test_list_prs_with_state_filter(
self, gitea_provider, mock_httpx_client, sample_pr_data
):
"""Test listing PRs with state filter."""
mock_httpx_client.request.return_value.json = MagicMock(
return_value=[sample_pr_data]
)
result = await gitea_provider.list_prs(
"owner", "repo", state=PRState.OPEN
)
result = await gitea_provider.list_prs("owner", "repo", state=PRState.OPEN)
assert result.success is True
@@ -246,7 +239,9 @@ class TestGiteaPROperations:
)
result = await gitea_provider.merge_pr(
"owner", "repo", 42,
"owner",
"repo",
42,
merge_strategy=MergeStrategy.SQUASH,
)
@@ -261,7 +256,9 @@ class TestGiteaPROperations:
)
result = await gitea_provider.update_pr(
"owner", "repo", 42,
"owner",
"repo",
42,
title="Updated Title",
body="Updated body",
)
@@ -303,7 +300,9 @@ class TestGiteaBranchOperations:
"""Test deleting a remote branch."""
mock_httpx_client.request.return_value.status_code = 204
result = await gitea_provider.delete_remote_branch("owner", "repo", "old-branch")
result = await gitea_provider.delete_remote_branch(
"owner", "repo", "old-branch"
)
assert result is True
@@ -355,17 +354,20 @@ class TestGiteaLabelOperations:
mock_responses = [
[{"id": 1, "name": "existing"}], # GET labels (bug not found)
{"id": 2, "name": "bug"}, # POST create bug
[{"id": 1, "name": "existing"}, {"id": 2, "name": "bug"}], # GET labels (urgent not found)
[
{"id": 1, "name": "existing"},
{"id": 2, "name": "bug"},
], # GET labels (urgent not found)
{"id": 3, "name": "urgent"}, # POST create urgent
{}, # POST add labels to PR
{"labels": [{"name": "bug"}, {"name": "urgent"}]}, # GET issue
]
mock_httpx_client.request.return_value.json = MagicMock(side_effect=mock_responses)
result = await gitea_provider.add_labels(
"owner", "repo", 42, ["bug", "urgent"]
mock_httpx_client.request.return_value.json = MagicMock(
side_effect=mock_responses
)
result = await gitea_provider.add_labels("owner", "repo", 42, ["bug", "urgent"])
# Should return updated label list
assert isinstance(result, list)
@@ -381,12 +383,12 @@ class TestGiteaLabelOperations:
{}, # DELETE label
{"labels": []}, # GET issue
]
mock_httpx_client.request.return_value.json = MagicMock(side_effect=mock_responses)
result = await gitea_provider.remove_label(
"owner", "repo", 42, "bug"
mock_httpx_client.request.return_value.json = MagicMock(
side_effect=mock_responses
)
result = await gitea_provider.remove_label("owner", "repo", 42, "bug")
assert isinstance(result, list)

View File

@@ -85,8 +85,10 @@ class TestHealthCheck:
"""Test health check returns proper structure."""
from server import health_check
with patch("server._gitea_provider", None), \
patch("server._workspace_manager", None):
with (
patch("server._gitea_provider", None),
patch("server._workspace_manager", None),
):
result = await health_check()
assert "status" in result
@@ -100,8 +102,10 @@ class TestHealthCheck:
"""Test health check without providers configured."""
from server import health_check
with patch("server._gitea_provider", None), \
patch("server._workspace_manager", None):
with (
patch("server._gitea_provider", None),
patch("server._workspace_manager", None),
):
result = await health_check()
assert result["dependencies"]["gitea"] == "not configured"
@@ -371,8 +375,10 @@ class TestPROperations:
mock_provider = AsyncMock()
mock_provider.parse_repo_url = MagicMock(return_value=("owner", "repo"))
with patch("server._workspace_manager", mock_manager), \
patch("server._get_provider_for_url", return_value=mock_provider):
with (
patch("server._workspace_manager", mock_manager),
patch("server._get_provider_for_url", return_value=mock_provider),
):
result = await list_pull_requests.fn(
project_id="test-project",
agent_id="agent-1",
@@ -401,8 +407,10 @@ class TestPROperations:
mock_provider = AsyncMock()
mock_provider.parse_repo_url = MagicMock(return_value=("owner", "repo"))
with patch("server._workspace_manager", mock_manager), \
patch("server._get_provider_for_url", return_value=mock_provider):
with (
patch("server._workspace_manager", mock_manager),
patch("server._get_provider_for_url", return_value=mock_provider),
):
result = await merge_pull_request.fn(
project_id="test-project",
agent_id="agent-1",

File diff suppressed because it is too large Load Diff

View File

@@ -25,7 +25,9 @@ class TestWorkspaceManager:
assert Path(workspace.path).exists()
@pytest.mark.asyncio
async def test_create_workspace_with_repo_url(self, workspace_manager, valid_project_id, sample_repo_url):
async def test_create_workspace_with_repo_url(
self, workspace_manager, valid_project_id, sample_repo_url
):
"""Test creating workspace with repository URL."""
workspace = await workspace_manager.create_workspace(
valid_project_id, repo_url=sample_repo_url
@@ -92,7 +94,9 @@ class TestWorkspaceLocking:
"""Tests for workspace locking."""
@pytest.mark.asyncio
async def test_lock_workspace(self, workspace_manager, valid_project_id, valid_agent_id):
async def test_lock_workspace(
self, workspace_manager, valid_project_id, valid_agent_id
):
"""Test locking a workspace."""
await workspace_manager.create_workspace(valid_project_id)
@@ -113,13 +117,19 @@ class TestWorkspaceLocking:
await workspace_manager.lock_workspace(valid_project_id, "agent-1", timeout=60)
with pytest.raises(WorkspaceLockedError):
await workspace_manager.lock_workspace(valid_project_id, "agent-2", timeout=60)
await workspace_manager.lock_workspace(
valid_project_id, "agent-2", timeout=60
)
@pytest.mark.asyncio
async def test_lock_same_holder(self, workspace_manager, valid_project_id, valid_agent_id):
async def test_lock_same_holder(
self, workspace_manager, valid_project_id, valid_agent_id
):
"""Test re-locking by same holder extends lock."""
await workspace_manager.create_workspace(valid_project_id)
await workspace_manager.lock_workspace(valid_project_id, valid_agent_id, timeout=60)
await workspace_manager.lock_workspace(
valid_project_id, valid_agent_id, timeout=60
)
# Same holder can re-lock
result = await workspace_manager.lock_workspace(
@@ -129,12 +139,16 @@ class TestWorkspaceLocking:
assert result is True
@pytest.mark.asyncio
async def test_unlock_workspace(self, workspace_manager, valid_project_id, valid_agent_id):
async def test_unlock_workspace(
self, workspace_manager, valid_project_id, valid_agent_id
):
"""Test unlocking a workspace."""
await workspace_manager.create_workspace(valid_project_id)
await workspace_manager.lock_workspace(valid_project_id, valid_agent_id)
result = await workspace_manager.unlock_workspace(valid_project_id, valid_agent_id)
result = await workspace_manager.unlock_workspace(
valid_project_id, valid_agent_id
)
assert result is True
workspace = await workspace_manager.get_workspace(valid_project_id)
@@ -173,7 +187,9 @@ class TestWorkspaceLockContextManager:
"""Tests for WorkspaceLock context manager."""
@pytest.mark.asyncio
async def test_lock_context_manager(self, workspace_manager, valid_project_id, valid_agent_id):
async def test_lock_context_manager(
self, workspace_manager, valid_project_id, valid_agent_id
):
"""Test using WorkspaceLock as context manager."""
await workspace_manager.create_workspace(valid_project_id)
@@ -188,7 +204,9 @@ class TestWorkspaceLockContextManager:
assert workspace.lock_holder is None
@pytest.mark.asyncio
async def test_lock_context_manager_error(self, workspace_manager, valid_project_id, valid_agent_id):
async def test_lock_context_manager_error(
self, workspace_manager, valid_project_id, valid_agent_id
):
"""Test WorkspaceLock releases on exception."""
await workspace_manager.create_workspace(valid_project_id)
@@ -223,7 +241,9 @@ class TestWorkspaceMetadata:
"""Test updating workspace branch."""
await workspace_manager.create_workspace(valid_project_id)
await workspace_manager.update_workspace_branch(valid_project_id, "feature-branch")
await workspace_manager.update_workspace_branch(
valid_project_id, "feature-branch"
)
workspace = await workspace_manager.get_workspace(valid_project_id)
assert workspace.current_branch == "feature-branch"
@@ -316,7 +336,9 @@ class TestWorkspaceCleanup:
assert cleaned >= 1
@pytest.mark.asyncio
async def test_delete_locked_workspace_blocked(self, workspace_manager, valid_project_id, valid_agent_id):
async def test_delete_locked_workspace_blocked(
self, workspace_manager, valid_project_id, valid_agent_id
):
"""Test deleting locked workspace is blocked without force."""
await workspace_manager.create_workspace(valid_project_id)
await workspace_manager.lock_workspace(valid_project_id, valid_agent_id)
@@ -325,7 +347,9 @@ class TestWorkspaceCleanup:
await workspace_manager.delete_workspace(valid_project_id)
@pytest.mark.asyncio
async def test_delete_locked_workspace_force(self, workspace_manager, valid_project_id, valid_agent_id):
async def test_delete_locked_workspace_force(
self, workspace_manager, valid_project_id, valid_agent_id
):
"""Test force deleting locked workspace."""
await workspace_manager.create_workspace(valid_project_id)
await workspace_manager.lock_workspace(valid_project_id, valid_agent_id)