# test_agent_discovery.py • 14.3 kB
"""Tests for agent discovery module."""
import asyncio
import json
import pytest
from pathlib import Path
import tempfile
from unittest.mock import AsyncMock, MagicMock, patch
from delegation_mcp.agent_discovery import AgentDiscovery, AgentMetadata
@pytest.fixture
def temp_cache_file():
    """Yield a cache-file path inside a throwaway ``.cache/delegation-mcp`` tree.

    Only the parent directories are created; the JSON file itself is not.
    The temporary directory is cleaned up when the consuming test finishes.
    """
    with tempfile.TemporaryDirectory() as scratch:
        cache_root = Path(scratch).joinpath(".cache", "delegation-mcp")
        cache_root.mkdir(parents=True, exist_ok=True)
        yield cache_root / "test_cache.json"
@pytest.fixture
def discovery(temp_cache_file):
    """Build an AgentDiscovery instance bound to the temporary cache file.

    ``Path.home`` is patched only while the constructor runs, so the home
    directory appears to be the temp tree that contains the cache file
    (presumably so the constructor's cache-path validation accepts it).
    """
    # The cache file sits at <home>/.cache/delegation-mcp/<file>, so the
    # fake home directory is three levels above it.
    fake_home = temp_cache_file.parents[2]
    with patch("pathlib.Path.home", return_value=fake_home), patch(
        "delegation_mcp.agent_discovery.Path.home", return_value=fake_home
    ):
        instance = AgentDiscovery(cache_file=temp_cache_file)
    return instance
def test_agent_metadata_creation():
    """AgentMetadata exposes the field values it was constructed with."""
    expected = {
        "name": "claude",
        "command": "claude",
        "version": "1.0.0",
        "path": "/usr/local/bin/claude",
    }
    agent = AgentMetadata(
        available=True,
        capabilities=["reasoning", "code_generation"],
        **expected,
    )

    for attr, value in expected.items():
        assert getattr(agent, attr) == value
    assert agent.available is True
    assert "reasoning" in agent.capabilities
def test_discovery_initialization(discovery, temp_cache_file):
    """A new instance keeps its cache path and holds a dict of agents."""
    registry = discovery._discovered_agents
    known_count = len(AgentDiscovery.KNOWN_AGENTS)

    assert discovery.cache_file == temp_cache_file
    assert isinstance(registry, dict)
    assert known_count > 0
def test_cache_save_and_load(discovery, temp_cache_file):
    """An agent saved to the cache is restored by a second instance."""
    agent_key = "test_agent"
    discovery._discovered_agents[agent_key] = AgentMetadata(
        name=agent_key,
        command="test",
        version="1.0.0",
        available=True,
        path="/usr/bin/test",
    )

    # Persist to disk and confirm the cache file was written.
    discovery._save_cache()
    assert temp_cache_file.exists()

    # A second instance needs the same Path.home patching as the fixture
    # while its constructor runs.
    fake_home = temp_cache_file.parents[2]
    with patch("pathlib.Path.home", return_value=fake_home), patch(
        "delegation_mcp.agent_discovery.Path.home", return_value=fake_home
    ):
        reloaded = AgentDiscovery(cache_file=temp_cache_file)

    assert agent_key in reloaded._discovered_agents
    restored = reloaded._discovered_agents[agent_key]
    assert restored.name == "test_agent"
    assert restored.version == "1.0.0"
@pytest.mark.asyncio
async def test_resolve_command_path(discovery):
    """_resolve_command_path passes through whatever ``shutil.which`` returns."""
    # Command present on PATH.
    with patch("shutil.which", return_value="/usr/bin/python3"):
        found = discovery._resolve_command_path("python3")
    assert found == "/usr/bin/python3"

    # Command absent from PATH.
    with patch("shutil.which", return_value=None):
        missing = discovery._resolve_command_path("nonexistent_command")
    assert missing is None
@pytest.mark.asyncio
async def test_verify_agent_success(discovery):
    """A zero exit code with version output is reported as available."""
    fake_proc = AsyncMock()
    fake_proc.stdin = MagicMock()  # stdin is deliberately a non-async mock
    fake_proc.returncode = 0
    fake_proc.communicate = AsyncMock(
        return_value=(b"claude version 1.0.0\n", b"")
    )

    with patch("asyncio.create_subprocess_exec", return_value=fake_proc):
        outcome = await discovery._verify_agent("claude", "claude", "--version")

    available, version, error = outcome
    assert available is True
    assert "1.0.0" in version
    assert error is None
@pytest.mark.asyncio
async def test_verify_agent_failure(discovery):
    """A non-zero exit code yields an unavailable result or an error."""
    fake_proc = AsyncMock()
    fake_proc.stdin = MagicMock()  # stdin is deliberately a non-async mock
    fake_proc.returncode = 1
    fake_proc.communicate = AsyncMock(
        return_value=(b"", b"command not found\n")
    )

    # The same failing process is handed back for every subprocess spawn,
    # so any retry (e.g. with --help) sees the same failure.
    with patch("asyncio.create_subprocess_exec", return_value=fake_proc):
        outcome = await discovery._verify_agent(
            "nonexistent", "nonexistent", "--version"
        )

    available, _version, error = outcome
    assert available is False or error is not None
@pytest.mark.asyncio
async def test_verify_agent_timeout(discovery):
    """A timeout while communicating is reported as unavailable."""
    stalled_proc = AsyncMock()
    stalled_proc.stdin = MagicMock()  # stdin is deliberately a non-async mock
    # communicate() raises instead of returning, simulating a hung process.
    stalled_proc.communicate = AsyncMock(side_effect=asyncio.TimeoutError())

    with patch("asyncio.create_subprocess_exec", return_value=stalled_proc):
        outcome = await discovery._verify_agent(
            "slow_agent", "slow_agent", "--version"
        )

    available, version, error = outcome
    assert available is False
    assert version is None
    assert "timed out" in error.lower()
@pytest.mark.asyncio
async def test_verify_agent_not_found(discovery):
    """A FileNotFoundError on spawn is reported as 'not found'."""
    spawn_failure = FileNotFoundError("Command not found")

    with patch("asyncio.create_subprocess_exec", side_effect=spawn_failure):
        outcome = await discovery._verify_agent("missing", "missing", "--version")

    available, version, error = outcome
    assert available is False
    assert version is None
    assert "not found" in error.lower()
@pytest.mark.asyncio
async def test_discover_single_agent(discovery):
    """_discover_single_agent combines path, verification and config data."""
    agent_config = {
        "command": "python3",
        "version_flag": "--version",
        "capabilities": ["scripting", "general"],
    }

    # Stub both the PATH lookup and the verification step.
    with patch("shutil.which", return_value="/usr/bin/python3"), patch.object(
        discovery, "_verify_agent", return_value=(True, "Python 3.9.0", None)
    ):
        metadata = await discovery._discover_single_agent("python", agent_config)

    assert metadata.name == "python"
    assert metadata.available is True
    assert metadata.version == "Python 3.9.0"
    assert metadata.path == "/usr/bin/python3"
    assert "scripting" in metadata.capabilities
@pytest.mark.asyncio
async def test_discover_agents_with_cache(discovery, temp_cache_file):
    """Cached agents are returned unless a refresh is forced."""
    discovery._discovered_agents["cached_agent"] = AgentMetadata(
        name="cached_agent",
        command="cached",
        version="1.0.0",
        available=True,
    )

    # Without force_refresh the pre-seeded entry must be in the result.
    cached_result = await discovery.discover_agents(force_refresh=False)
    assert "cached_agent" in cached_result

    # With force_refresh the per-agent discovery routine must be invoked.
    stub_metadata = AgentMetadata(
        name="test",
        command="test",
        version="2.0.0",
        available=True,
    )
    with patch.object(
        discovery, "_discover_single_agent", return_value=stub_metadata
    ) as single_agent_stub:
        await discovery.discover_agents(force_refresh=True)

    assert single_agent_stub.called
@pytest.mark.asyncio
async def test_discover_agents_parallel(discovery):
    """A forced refresh probes every known agent exactly once.

    The stub sleeps briefly to mimic work; the test checks only the number
    of invocations, not that they actually overlap in time.
    """
    probed = []

    async def fake_probe(name, config):
        probed.append(name)
        await asyncio.sleep(0.01)  # simulate a short-running probe
        return AgentMetadata(
            name=name,
            command=config["command"],
            available=False,
            error_message="Not found",
        )

    with patch.object(discovery, "_discover_single_agent", side_effect=fake_probe):
        await discovery.discover_agents(force_refresh=True)

    assert len(probed) == len(AgentDiscovery.KNOWN_AGENTS)
def test_get_available_agents(discovery):
    """get_available_agents returns only entries flagged as available."""
    seed = [
        ("agent1", "a1", True),
        ("agent2", "a2", False),
        ("agent3", "a3", True),
    ]
    discovery._discovered_agents = {
        label: AgentMetadata(name=label, command=cmd, available=flag)
        for label, cmd, flag in seed
    }

    usable = discovery.get_available_agents()

    assert len(usable) == 2
    assert all(entry.available for entry in usable)
def test_get_unavailable_agents(discovery):
    """get_unavailable_agents returns only entries flagged as unavailable."""
    seed = [
        ("agent1", "a1", True),
        ("agent2", "a2", False),
        ("agent3", "a3", False),
    ]
    discovery._discovered_agents = {
        label: AgentMetadata(name=label, command=cmd, available=flag)
        for label, cmd, flag in seed
    }

    missing = discovery.get_unavailable_agents()

    assert len(missing) == 2
    assert all(not entry.available for entry in missing)
def test_is_agent_available(discovery):
    """is_agent_available reflects the flag and is False for unknown names."""
    claude_meta = AgentMetadata(name="claude", command="claude", available=True)
    gemini_meta = AgentMetadata(name="gemini", command="gemini", available=False)
    discovery._discovered_agents = {"claude": claude_meta, "gemini": gemini_meta}

    assert discovery.is_agent_available("claude") is True
    assert discovery.is_agent_available("gemini") is False
    # A name that was never registered counts as unavailable.
    assert discovery.is_agent_available("nonexistent") is False
def test_get_agent_metadata(discovery):
    """get_agent_metadata returns the stored entry, or None if unknown."""
    stored = AgentMetadata(
        name="claude",
        command="claude",
        version="1.0.0",
        available=True,
    )
    discovery._discovered_agents["claude"] = stored

    hit = discovery.get_agent_metadata("claude")
    assert hit == stored

    miss = discovery.get_agent_metadata("nonexistent")
    assert miss is None
def test_get_discovery_summary(discovery):
    """The summary reports counts and per-agent lists split by availability."""
    installed = AgentMetadata(
        name="agent1",
        command="a1",
        version="1.0.0",
        available=True,
        path="/usr/bin/a1",
    )
    absent = AgentMetadata(
        name="agent2",
        command="a2",
        available=False,
        error_message="Not found",
    )
    discovery._discovered_agents = {"agent1": installed, "agent2": absent}

    summary = discovery.get_discovery_summary()

    # Aggregate counters.
    assert summary["total_agents"] == 2
    assert summary["available"] == 1
    assert summary["unavailable"] == 1

    # Per-agent listings.
    assert len(summary["available_agents"]) == 1
    assert len(summary["unavailable_agents"]) == 1
    assert summary["available_agents"][0]["name"] == "agent1"
    assert summary["unavailable_agents"][0]["name"] == "agent2"

    assert "system_info" in summary
def test_clear_cache(discovery, temp_cache_file):
    """clear_cache empties the in-memory registry and removes the cache file."""
    discovery._discovered_agents["test"] = AgentMetadata(
        name="test",
        command="test",
        available=True,
    )
    discovery._save_cache()

    # Precondition: there is something on disk and in memory to clear.
    assert temp_cache_file.exists()
    assert len(discovery._discovered_agents) > 0

    discovery.clear_cache()

    assert len(discovery._discovered_agents) == 0
    assert not temp_cache_file.exists()
def test_get_install_message(discovery):
    """Install messages mention the product and an install/documentation hint."""
    # (agent name, text expected verbatim, text expected case-insensitively)
    expectations = [
        ("claude", "Claude Code", "install"),
        ("gemini", "Gemini", "npm install"),
        ("unknown_agent", "unknown_agent", "documentation"),
    ]

    for agent, verbatim, lowered in expectations:
        message = discovery._get_install_message(agent)
        assert verbatim in message
        assert lowered in message.lower()
@pytest.mark.asyncio
async def test_discover_agents_convenience_function():
    """The module-level discover_agents forwards force_refresh to the class."""
    from delegation_mcp.agent_discovery import discover_agents

    stub_instance = MagicMock()
    stub_instance.discover_agents = AsyncMock(return_value={})

    # Replace the class so that instantiating it yields the stub.
    with patch(
        "delegation_mcp.agent_discovery.AgentDiscovery",
        return_value=stub_instance,
    ):
        await discover_agents(force_refresh=True)

    stub_instance.discover_agents.assert_called_once_with(force_refresh=True)
@pytest.mark.asyncio
async def test_discover_specific_agents(discovery):
    """Only the agents named in agents_to_check are probed."""
    requested = ["claude", "gemini"]
    stub_metadata = AgentMetadata(
        name="test",
        command="test",
        available=False,
    )

    with patch.object(
        discovery, "_discover_single_agent", return_value=stub_metadata
    ) as single_agent_stub:
        await discovery.discover_agents(
            force_refresh=True,
            agents_to_check=requested,
        )

    # One probe per requested agent, and no others.
    assert single_agent_stub.call_count == len(requested)
def test_known_agents_structure():
    """Every KNOWN_AGENTS entry has the required keys and some capabilities."""
    required_keys = ("command", "version_flag", "capabilities")

    for entry in AgentDiscovery.KNOWN_AGENTS.values():
        for key in required_keys:
            assert key in entry
        capabilities = entry["capabilities"]
        assert isinstance(capabilities, list)
        assert len(capabilities) > 0