"""
Tests for hybrid orchestration tools (Issue #10).
"""
import pytest
import json
from pathlib import Path
# Import functions directly from where they're defined, before MCP decoration
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
# Import the functions themselves by introspecting the FunctionTool objects
from src.amicus import server
# Get the actual callable functions from the FunctionTool wrappers
select_execution_strategy = server.select_execution_strategy.fn
spawn_worker_instance = server.spawn_worker_instance.fn
class TestSelectExecutionStrategy:
    """Tests for select_execution_strategy tool."""

    def test_quick_high_context_task_uses_subagent(self):
        """Quick tasks with high context dependency should use subagents."""
        result = select_execution_strategy(
            "Quick research on async Python",
            estimated_duration_minutes=3,
            context_dependency="high",
            complexity="low",
        )
        assert result["strategy"] == "subagent"
        assert result["model"] == "claude-haiku-4.5"
        assert "shared context" in result["rationale"].lower()
        assert result["estimated_duration_minutes"] == 3
        assert "estimated_cost" in result
        assert "alternatives" in result

    def test_long_medium_complexity_uses_copilot(self):
        """Long-running medium-complexity tasks should use Copilot CLI."""
        result = select_execution_strategy(
            "Implement user authentication system with JWT tokens",
            estimated_duration_minutes=30,
            context_dependency="low",
            complexity="medium",
        )
        assert result["strategy"] == "copilot"
        assert result["model"] == "claude-sonnet-4.5"
        assert "cost-effective" in result["rationale"].lower()
        assert result["estimated_duration_minutes"] == 30

    def test_high_complexity_uses_claude(self):
        """High complexity tasks should use Claude CLI regardless of duration."""
        result = select_execution_strategy(
            "Design distributed consensus algorithm",
            estimated_duration_minutes=15,
            context_dependency="medium",
            complexity="high",
        )
        assert result["strategy"] == "claude"
        assert result["model"] == "claude-opus-4.5"
        assert (
            "high-capability" in result["rationale"].lower()
            or "complex" in result["rationale"].lower()
        )

    def test_auto_estimates_duration_from_keywords(self):
        """Should estimate duration from task description keywords."""
        quick_result = select_execution_strategy(
            "Quick fix for typo in documentation"
        )
        assert quick_result["estimated_duration_minutes"] == 3

        impl_result = select_execution_strategy(
            "Implement new feature for user profiles"
        )
        assert impl_result["estimated_duration_minutes"] == 30

        default_result = select_execution_strategy(
            "Update some configurations"
        )
        assert default_result["estimated_duration_minutes"] == 15

    def test_provides_alternatives_with_costs(self):
        """Should provide alternative strategies with cost comparisons."""
        result = select_execution_strategy(
            "Medium-length implementation task",
            estimated_duration_minutes=20,
            complexity="medium",
        )
        assert len(result["alternatives"]) >= 2
        for alt in result["alternatives"]:
            assert "strategy" in alt
            assert "cost" in alt
            assert alt["strategy"] != result["strategy"]
        # Alternatives should be sorted by ascending cost.
        costs = [alt["cost"] for alt in result["alternatives"]]
        assert costs == sorted(costs)
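

# A compact restatement of the routing matrix exercised by the three
# strategy tests above, as a single parametrized test. This is a hedged
# sketch: the expected strategy/model pairs are copied from the assertions
# above and should be updated if the server's routing table changes.
@pytest.mark.parametrize(
    "duration,context,complexity,expected_strategy,expected_model",
    [
        (3, "high", "low", "subagent", "claude-haiku-4.5"),
        (30, "low", "medium", "copilot", "claude-sonnet-4.5"),
        (15, "medium", "high", "claude", "claude-opus-4.5"),
    ],
)
def test_strategy_routing_matrix(
    duration, context, complexity, expected_strategy, expected_model
):
    result = select_execution_strategy(
        "Parametrized routing check",
        estimated_duration_minutes=duration,
        context_dependency=context,
        complexity=complexity,
    )
    assert result["strategy"] == expected_strategy
    assert result["model"] == expected_model

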
class TestSpawnWorkerInstance:
    """Tests for spawn_worker_instance tool."""

    def test_invalid_client_returns_error(self):
        """Should reject invalid client types."""
        result = spawn_worker_instance(
            client="invalid-client",
            task_id="task-123",
        )
        assert "error" in result
        assert "invalid" in result["error"].lower()
        assert "copilot" in result["error"]
        assert "claude" in result["error"]

    def test_generates_unique_node_ids(self):
        """Should generate unique node IDs for spawned workers."""
        # Note: without mocking subprocess, spawning may fail when the
        # gh copilot / claude CLIs aren't installed, so this test accepts
        # either outcome; see the mocked sketch after this test for a
        # deterministic variant.
        result1 = spawn_worker_instance(
            client="copilot",
            task_id="task-123",
        )
        result2 = spawn_worker_instance(
            client="claude",
            task_id="task-456",
        )

        # Both should either spawn successfully or report an error
        # (depending on whether the gh copilot / claude CLIs are installed).
        assert "error" in result1 or "node_id" in result1
        assert "error" in result2 or "node_id" in result2

        # If both succeeded, the node IDs should differ.
        if "node_id" in result1 and "node_id" in result2:
            assert result1["node_id"] != result2["node_id"]
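
    # Hedged sketch: a deterministic version of the uniqueness check above.
    # It assumes spawn_worker_instance launches workers via subprocess.Popen
    # imported in src.amicus.server; if the server spawns workers some other
    # way, the patch target below must change.
    def test_node_ids_unique_with_mocked_subprocess(self):
        """Node IDs should differ even when subprocess is mocked out."""
        from unittest.mock import MagicMock, patch

        fake_proc = MagicMock()
        fake_proc.pid = 12345  # hypothetical PID returned by the fake Popen
        with patch("src.amicus.server.subprocess.Popen", return_value=fake_proc):
            result1 = spawn_worker_instance(client="copilot", task_id="task-a")
            result2 = spawn_worker_instance(client="claude", task_id="task-b")
        # With spawning mocked, both calls should report a node_id; if the
        # implementation still errors (e.g. it probes for the CLI first),
        # fall back to the same tolerant check as above.
        if "node_id" in result1 and "node_id" in result2:
            assert result1["node_id"] != result2["node_id"]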

    def test_copilot_command_format(self):
        """Should format copilot CLI command correctly."""
        result = spawn_worker_instance(
            client="copilot",
            task_id="task-789",
        )
        # Either spawned successfully or failed because the CLI is missing.
        if "error" in result:
            assert (
                "not found" in result["error"].lower()
                or "failed to spawn" in result["error"].lower()
            )
        else:
            assert result["client"] == "copilot"
            assert "gh" in result["command"]
            assert "copilot" in result["command"]
            assert result["status"] == "spawned"
            assert "process_id" in result

    def test_claude_command_with_model_override(self):
        """Should include model override in Claude CLI command."""
        result = spawn_worker_instance(
            client="claude",
            task_id="task-999",
            model="claude-opus-4.5",
        )
        # Either spawned successfully or failed because the CLI is missing.
        if "error" in result:
            assert (
                "not found" in result["error"].lower()
                or "failed to spawn" in result["error"].lower()
            )
        else:
            assert result["client"] == "claude"
            assert "claude" in result["command"]
            # The model override should appear in the command when the
            # Claude CLI supports it; not asserted here to avoid coupling
            # the test to a specific CLI flag.
            assert result["status"] == "spawned"
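

# Hedged alternative to the tolerant "error or spawned" branches above:
# skip the spawn tests outright when the CLIs are absent. This assumes the
# workers are launched via `gh` and `claude` executables on PATH; the
# `requires_clis` name is ours, applied as `@requires_clis` on a test.
import shutil

requires_clis = pytest.mark.skipif(
    shutil.which("gh") is None or shutil.which("claude") is None,
    reason="gh and/or claude CLI not installed",
)
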

if __name__ == "__main__":
    pytest.main([__file__, "-v"])