forked from cardosofelipe/fast-next-template
- Updated line breaks and indentation across test modules to enhance clarity and maintain consistent style. - Applied changes to workspace, provider, server, and GitWrapper-related test cases. No functional changes introduced.
359 lines
13 KiB
Python
359 lines
13 KiB
Python
"""
|
|
Tests for the workspace management module.
|
|
"""
|
|
|
|
from datetime import UTC, datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from exceptions import WorkspaceLockedError, WorkspaceNotFoundError
|
|
from models import WorkspaceState
|
|
from workspace import FileLockManager, WorkspaceLock
|
|
|
|
|
|
class TestWorkspaceManager:
    """Create / get / delete / list behaviour of the workspace manager."""

    @pytest.mark.asyncio
    async def test_create_workspace(self, workspace_manager, valid_project_id):
        """A new workspace reports INITIALIZING and its path exists."""
        created = await workspace_manager.create_workspace(valid_project_id)

        assert created.project_id == valid_project_id
        assert created.state == WorkspaceState.INITIALIZING
        assert Path(created.path).exists()

    @pytest.mark.asyncio
    async def test_create_workspace_with_repo_url(
        self, workspace_manager, valid_project_id, sample_repo_url
    ):
        """The repo URL passed at creation is stored on the workspace."""
        created = await workspace_manager.create_workspace(
            valid_project_id,
            repo_url=sample_repo_url,
        )

        assert created.repo_url == sample_repo_url

    @pytest.mark.asyncio
    async def test_get_workspace(self, workspace_manager, valid_project_id):
        """A workspace that was created can be fetched by project id."""
        await workspace_manager.create_workspace(valid_project_id)

        fetched = await workspace_manager.get_workspace(valid_project_id)

        assert fetched is not None
        assert fetched.project_id == valid_project_id

    @pytest.mark.asyncio
    async def test_get_workspace_not_found(self, workspace_manager):
        """Looking up an unknown project id yields None."""
        missing = await workspace_manager.get_workspace("nonexistent")

        assert missing is None

    @pytest.mark.asyncio
    async def test_delete_workspace(self, workspace_manager, valid_project_id):
        """Deleting a workspace returns True and removes its path."""
        created = await workspace_manager.create_workspace(valid_project_id)
        on_disk = Path(created.path)
        assert on_disk.exists()

        deleted = await workspace_manager.delete_workspace(valid_project_id)

        assert deleted is True
        assert not on_disk.exists()

    @pytest.mark.asyncio
    async def test_delete_nonexistent_workspace(self, workspace_manager):
        """Deleting an unknown project id still returns True."""
        deleted = await workspace_manager.delete_workspace("nonexistent")

        assert deleted is True

    @pytest.mark.asyncio
    async def test_list_workspaces(self, workspace_manager):
        """Every created workspace shows up in the listing."""
        expected_ids = ("project-1", "project-2", "project-3")
        for project_id in expected_ids:
            await workspace_manager.create_workspace(project_id)

        listed = await workspace_manager.list_workspaces()

        # ">=" rather than "==": the listing may contain other workspaces too.
        assert len(listed) >= 3
        seen_ids = [entry.project_id for entry in listed]
        for project_id in expected_ids:
            assert project_id in seen_ids
|
|
|
|
|
|
class TestWorkspaceLocking:
    """Lock / unlock behaviour of the workspace manager."""

    @pytest.mark.asyncio
    async def test_lock_workspace(
        self, workspace_manager, valid_project_id, valid_agent_id
    ):
        """Locking returns True and records LOCKED state plus the holder."""
        await workspace_manager.create_workspace(valid_project_id)

        locked = await workspace_manager.lock_workspace(
            valid_project_id,
            valid_agent_id,
            timeout=60,
        )
        assert locked is True

        current = await workspace_manager.get_workspace(valid_project_id)
        assert current.state == WorkspaceState.LOCKED
        assert current.lock_holder == valid_agent_id

    @pytest.mark.asyncio
    async def test_lock_already_locked(self, workspace_manager, valid_project_id):
        """A second holder locking a held workspace gets WorkspaceLockedError."""
        await workspace_manager.create_workspace(valid_project_id)
        await workspace_manager.lock_workspace(valid_project_id, "agent-1", timeout=60)

        with pytest.raises(WorkspaceLockedError):
            await workspace_manager.lock_workspace(
                valid_project_id,
                "agent-2",
                timeout=60,
            )

    @pytest.mark.asyncio
    async def test_lock_same_holder(
        self, workspace_manager, valid_project_id, valid_agent_id
    ):
        """The current holder may lock again; the call returns True."""
        await workspace_manager.create_workspace(valid_project_id)
        await workspace_manager.lock_workspace(
            valid_project_id,
            valid_agent_id,
            timeout=60,
        )

        # Second lock request from the same agent, with a different timeout.
        relocked = await workspace_manager.lock_workspace(
            valid_project_id,
            valid_agent_id,
            timeout=120,
        )

        assert relocked is True

    @pytest.mark.asyncio
    async def test_unlock_workspace(
        self, workspace_manager, valid_project_id, valid_agent_id
    ):
        """Unlocking by the holder returns True, state READY, holder cleared."""
        await workspace_manager.create_workspace(valid_project_id)
        await workspace_manager.lock_workspace(valid_project_id, valid_agent_id)

        unlocked = await workspace_manager.unlock_workspace(
            valid_project_id,
            valid_agent_id,
        )
        assert unlocked is True

        current = await workspace_manager.get_workspace(valid_project_id)
        assert current.state == WorkspaceState.READY
        assert current.lock_holder is None

    @pytest.mark.asyncio
    async def test_unlock_wrong_holder(self, workspace_manager, valid_project_id):
        """Unlocking with a non-holder id raises WorkspaceLockedError."""
        await workspace_manager.create_workspace(valid_project_id)
        await workspace_manager.lock_workspace(valid_project_id, "agent-1")

        with pytest.raises(WorkspaceLockedError):
            await workspace_manager.unlock_workspace(valid_project_id, "agent-2")

    @pytest.mark.asyncio
    async def test_force_unlock(self, workspace_manager, valid_project_id):
        """With force=True a non-holder can unlock; the call returns True."""
        await workspace_manager.create_workspace(valid_project_id)
        await workspace_manager.lock_workspace(valid_project_id, "agent-1")

        unlocked = await workspace_manager.unlock_workspace(
            valid_project_id,
            "admin",
            force=True,
        )

        assert unlocked is True

    @pytest.mark.asyncio
    async def test_lock_nonexistent_workspace(self, workspace_manager, valid_agent_id):
        """Locking an unknown project id raises WorkspaceNotFoundError."""
        with pytest.raises(WorkspaceNotFoundError):
            await workspace_manager.lock_workspace("nonexistent", valid_agent_id)
|
|
|
|
|
|
class TestWorkspaceLockContextManager:
    """Tests for the WorkspaceLock async context manager."""

    @pytest.mark.asyncio
    async def test_lock_context_manager(
        self, workspace_manager, valid_project_id, valid_agent_id
    ):
        """The workspace is LOCKED inside the block and has no holder after it."""
        await workspace_manager.create_workspace(valid_project_id)

        # The value produced by entering the context is not needed here, so it
        # is deliberately not bound to a name.
        async with WorkspaceLock(workspace_manager, valid_project_id, valid_agent_id):
            workspace = await workspace_manager.get_workspace(valid_project_id)
            assert workspace.state == WorkspaceState.LOCKED

        # After exiting the context, the lock must have been released.
        workspace = await workspace_manager.get_workspace(valid_project_id)
        assert workspace.lock_holder is None

    @pytest.mark.asyncio
    async def test_lock_context_manager_error(
        self, workspace_manager, valid_project_id, valid_agent_id
    ):
        """The lock is released when the body raises, and the error propagates."""
        await workspace_manager.create_workspace(valid_project_id)

        # pytest.raises (instead of try/except/pass) additionally fails the test
        # if the ValueError is swallowed rather than propagated out of the block.
        with pytest.raises(ValueError, match="Test error"):
            async with WorkspaceLock(
                workspace_manager, valid_project_id, valid_agent_id
            ):
                raise ValueError("Test error")

        workspace = await workspace_manager.get_workspace(valid_project_id)
        assert workspace.lock_holder is None
|
|
|
|
|
|
class TestWorkspaceMetadata:
    """Metadata updates: access time and current branch."""

    @pytest.mark.asyncio
    async def test_touch_workspace(self, workspace_manager, valid_project_id):
        """Touching a workspace never moves last_accessed backwards."""
        created = await workspace_manager.create_workspace(valid_project_id)
        accessed_before = created.last_accessed

        await workspace_manager.touch_workspace(valid_project_id)

        refreshed = await workspace_manager.get_workspace(valid_project_id)
        assert refreshed.last_accessed >= accessed_before

    @pytest.mark.asyncio
    async def test_update_workspace_branch(self, workspace_manager, valid_project_id):
        """The branch set via update_workspace_branch is read back afterwards."""
        await workspace_manager.create_workspace(valid_project_id)

        await workspace_manager.update_workspace_branch(
            valid_project_id,
            "feature-branch",
        )

        refreshed = await workspace_manager.get_workspace(valid_project_id)
        assert refreshed.current_branch == "feature-branch"
|
|
|
|
|
|
class TestWorkspaceSize:
    """Size accounting and size-limit checks."""

    @pytest.mark.asyncio
    async def test_check_size_within_limit(self, workspace_manager, valid_project_id):
        """check_size_limit returns True for a freshly created workspace."""
        await workspace_manager.create_workspace(valid_project_id)

        # Expected to return normally rather than raise.
        within_limit = await workspace_manager.check_size_limit(valid_project_id)

        assert within_limit is True

    @pytest.mark.asyncio
    async def test_get_total_size(self, workspace_manager, valid_project_id):
        """The reported total covers at least the bytes written into a workspace."""
        created = await workspace_manager.create_workspace(valid_project_id)

        # Put 1000 characters of content into the workspace directory.
        payload = "x" * 1000
        (Path(created.path) / "content.txt").write_text(payload)

        reported = await workspace_manager.get_total_size()

        assert reported >= 1000
|
|
|
|
|
|
class TestFileLockManager:
    """Tests for file-based locking via FileLockManager."""

    def test_acquire_lock(self, temp_dir):
        """Acquiring a fresh key returns True."""
        manager = FileLockManager(temp_dir / "locks")

        result = manager.acquire("test-key")
        try:
            assert result is True
        finally:
            # Release even when the assertion fails so the lock is not leaked.
            manager.release("test-key")

    def test_release_lock(self, temp_dir):
        """Releasing a held key returns True."""
        manager = FileLockManager(temp_dir / "locks")
        manager.acquire("test-key")

        result = manager.release("test-key")

        assert result is True

    def test_is_locked(self, temp_dir):
        """is_locked is False before acquire and True while the key is held."""
        manager = FileLockManager(temp_dir / "locks")

        assert manager.is_locked("test-key") is False

        manager.acquire("test-key")
        try:
            assert manager.is_locked("test-key") is True
        finally:
            # Release even when the assertion fails so the lock is not leaked.
            manager.release("test-key")

    def test_release_nonexistent_lock(self, temp_dir):
        """Releasing a key that was never acquired returns False."""
        manager = FileLockManager(temp_dir / "locks")

        # Expected to return normally rather than raise.
        result = manager.release("nonexistent")

        assert result is False
|
|
|
|
|
|
class TestWorkspaceCleanup:
    """Tests for workspace cleanup and deletion of locked workspaces."""

    @pytest.mark.asyncio
    async def test_cleanup_stale_workspaces(self, workspace_manager, test_settings):
        """A workspace last accessed 30 days ago is removed by cleanup.

        NOTE(review): ``test_settings`` is requested but not referenced in the
        body; presumably it is needed for its fixture side effects -- confirm
        before removing it.
        """
        # Only the side effect of creation is needed; the returned object is unused.
        await workspace_manager.create_workspace("stale-project")

        # Backdate the access time through the manager's private metadata hook.
        await workspace_manager._update_metadata(
            "stale-project",
            last_accessed=(datetime.now(UTC) - timedelta(days=30)).isoformat(),
        )

        cleaned = await workspace_manager.cleanup_stale_workspaces()

        # ">=" rather than "==": other stale workspaces may be cleaned as well.
        assert cleaned >= 1

    @pytest.mark.asyncio
    async def test_delete_locked_workspace_blocked(
        self, workspace_manager, valid_project_id, valid_agent_id
    ):
        """Deleting a locked workspace without force raises WorkspaceLockedError."""
        await workspace_manager.create_workspace(valid_project_id)
        await workspace_manager.lock_workspace(valid_project_id, valid_agent_id)

        with pytest.raises(WorkspaceLockedError):
            await workspace_manager.delete_workspace(valid_project_id)

    @pytest.mark.asyncio
    async def test_delete_locked_workspace_force(
        self, workspace_manager, valid_project_id, valid_agent_id
    ):
        """Deleting a locked workspace with force=True returns True."""
        await workspace_manager.create_workspace(valid_project_id)
        await workspace_manager.lock_workspace(valid_project_id, valid_agent_id)

        result = await workspace_manager.delete_workspace(valid_project_id, force=True)

        assert result is True
|