# Hybrid Orchestration Design
Design document for Issue #10: Hybrid orchestration with subagents and spawned instances.
## Executive Summary
Amicus should intelligently choose between three execution strategies based on task characteristics:
1. **Subagents** (Task tool): Fast, shared context, token-efficient for quick parallel work
2. **Copilot CLI instances**: Cost-efficient for long-running tasks (billed per request, not tokens)
3. **Claude CLI instances**: High-capability for complex reasoning tasks
The system should work identically whether bootstrapped from Claude CLI or Copilot CLI, with cost-aware routing and unified coordination through MCP state.
## Problem Statement
Current challenges:
- No clear decision framework for execution strategy
- Manual spawning of CLI instances is ad-hoc
- No cost tracking or optimization
- Inconsistent behavior across CLI entry points
## Execution Strategy Decision Matrix
| Criterion | Subagent (Task) | Copilot CLI Instance | Claude CLI Instance |
|-----------|-----------------|---------------------|---------------------|
| **Duration** | <5 minutes | >10 minutes | Any |
| **Context needs** | High (shares conversation) | Low (fresh start) | Low (fresh start) |
| **Parallelism** | Limited (subprocess) | Full (independent process) | Full (independent process) |
| **Cost model** | Tokens (parent session) | Per-request (~$0.50/request) | Tokens (~$3-15/M tokens) |
| **Best for** | Quick research, exploration | Long implementations, monitoring | Complex reasoning, architecture |
| **Latency** | Low (same process) | Medium (new process) | Medium (new process) |
| **Model flexibility** | Limited to parent | Full (any model) | Full (any model) |
## Architectural Design
### 1. Execution Strategy Selector
New MCP tool: `select_execution_strategy`
```python
@mcp.tool()
def select_execution_strategy(
task_description: str,
estimated_duration_minutes: Optional[int] = None,
context_dependency: str = "low", # low/medium/high
complexity: str = "medium" # low/medium/high
) -> Dict[str, Any]:
"""
Recommend execution strategy for a task.
Returns:
{
"strategy": "subagent|copilot|claude",
"model": "recommended-model-name",
"rationale": "explanation",
"estimated_cost": 0.XX
}
"""
```
#### Decision Logic
```python
def _decide_strategy(task, duration, context, complexity):
# Quick tasks with high context dependency → Subagent
if duration < 5 and context == "high":
return "subagent"
# Long-running standard tasks → Copilot CLI
if duration > 10 and complexity in ["low", "medium"]:
return "copilot"
# Complex reasoning regardless of duration → Claude CLI
if complexity == "high":
return "claude"
# Default: Subagent for quick, Copilot for longer
return "subagent" if duration < 10 else "copilot"
```
### 2. Worker Instance Spawning
New MCP tool: `spawn_worker_instance`
```python
@mcp.tool()
def spawn_worker_instance(
client: str, # "copilot" or "claude"
task_id: str,
role: str = "developer",
model: Optional[str] = None,
detach: bool = True
) -> Dict[str, Any]:
"""
Spawn a new CLI instance that will register with the cluster.
Args:
client: Which CLI to use ("copilot" or "claude")
task_id: Task ID from state.json
role: Node role for the instance
model: Optional model override
detach: Run as background process
Returns:
{
"process_id": int,
"node_id": "generated-node-id",
"status": "spawned",
"command": "actual command run"
}
"""
```
#### Implementation
```python
def spawn_worker_instance(client, task_id, role, model, detach):
# Generate unique node ID
node_id = f"{client}-worker-{uuid.uuid4().hex[:8]}"
# Build command based on client
if client == "copilot":
cmd = [
"gh", "copilot",
"--mcp-server", "amicus",
f"Register as {role} node {node_id} and work on task {task_id}"
]
elif client == "claude":
cmd = [
"claude",
"--mcp", "amicus",
f"Register as {role} node {node_id} and work on task {task_id}"
]
# Add model override if specified
if model:
cmd.extend(["--model", model])
# Spawn process
if detach:
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
start_new_session=True # Detach from parent
)
else:
process = subprocess.Popen(cmd)
# Record in cluster state
state_data = read_with_lock(get_state_file())
state_data.setdefault("spawned_workers", {})[node_id] = {
"pid": process.pid,
"client": client,
"task_id": task_id,
"role": role,
"spawned_at": time.time(),
"status": "starting"
}
write_with_lock(get_state_file(), state_data)
return {
"process_id": process.pid,
"node_id": node_id,
"status": "spawned",
"command": " ".join(cmd)
}
```
### 3. Task Handoff Protocol
When a spawned instance starts, it should:
1. **Check for assigned task**: Look in `state.json` for its node_id
2. **Register**: Call `amicus.register_node(node_id, role, model)`
3. **Claim task**: Call `amicus.claim_task(task_index, node_id)`
4. **Work on task**: Execute the task
5. **Complete task**: Call `amicus.complete_task(task_index, node_id, outcome)`
6. **Self-terminate**: Exit when done (if no more tasks)
#### Bootstrap Prompt Template
```markdown
You are node {node_id}, spawned to work on task {task_id}.
1. Register: amicus.register_node("{node_id}", "{role}", "{model}")
2. Claim task: amicus.claim_task({task_index}, "{node_id}")
3. Read task details from cluster state
4. Complete the task
5. Mark complete: amicus.complete_task({task_index}, "{node_id}", "outcome")
6. If no more tasks, exit
```
### 4. Lifecycle Management
#### Heartbeat Monitoring (Existing)
- Workers send heartbeats via `amicus.heartbeat(node_id)`
- Bootstrap manager detects stale workers (no heartbeat >60s)
- Stale workers are marked as terminated
#### Graceful Shutdown
New MCP tool: `request_worker_shutdown`
```python
@mcp.tool()
def request_worker_shutdown(node_id: str, reason: str = "task_complete") -> str:
"""Signal a worker to gracefully shutdown"""
state_data = read_with_lock(get_state_file())
# Add shutdown request
if node_id in state_data.get("cluster_nodes", {}):
state_data["cluster_nodes"][node_id]["shutdown_requested"] = True
state_data["cluster_nodes"][node_id]["shutdown_reason"] = reason
write_with_lock(get_state_file(), state_data)
return f"Shutdown requested for {node_id}"
return f"Node {node_id} not found"
```
Workers check for shutdown requests:
```python
# In worker loop
state = amicus.read_state()
my_node = state["cluster_nodes"].get(my_node_id, {})
if my_node.get("shutdown_requested"):
print(f"Shutdown requested: {my_node['shutdown_reason']}")
exit(0)
```
### 5. Cost Tracking
New MCP tool: `get_worker_cost_estimate`
```python
@mcp.tool()
def get_worker_cost_estimate(
task_description: str,
strategy: Optional[str] = None,
estimated_duration_minutes: int = 10
) -> Dict[str, Any]:
"""
Estimate cost for different execution strategies.
Returns:
{
"subagent": {"cost": 0.05, "rationale": "..."},
"copilot": {"cost": 0.50, "rationale": "..."},
"claude": {"cost": 0.15, "rationale": "..."},
"recommended": "copilot"
}
"""
```
#### Cost Calculation
```python
def estimate_cost(task, strategy, duration):
# Token-based cost estimation (simplified)
estimated_tokens = len(task) * 100 # ~100 tokens per character (rough)
costs = {}
# Subagent: tokens charged to parent session
costs["subagent"] = {
"cost": estimated_tokens / 1_000_000 * 3.00, # $3/M tokens (Sonnet)
"rationale": "Tokens added to parent session"
}
# Copilot: ~$0.50 per request (flat rate)
costs["copilot"] = {
"cost": 0.50,
"rationale": "Flat rate per request, not token-based"
}
# Claude: token-based
costs["claude"] = {
"cost": estimated_tokens / 1_000_000 * 3.00,
"rationale": "Token-based billing"
}
# Recommend based on duration and cost
if duration > 10:
costs["recommended"] = "copilot" # Fixed cost better for long tasks
else:
costs["recommended"] = "subagent" # Lower overhead for quick tasks
return costs
```
### 6. Client-Agnostic Implementation
Both Claude CLI and Copilot CLI should:
1. **Use same MCP server**: `amicus-mcp`
2. **Same tool set**: All Amicus tools available
3. **Same state file**: `~/.amicus/state.json`
4. **Same coordination**: Register, claim, complete protocol
#### Entry Point Detection
```python
def detect_cli_client() -> str:
"""Detect which CLI this is running in"""
# Check environment variables
if "COPILOT_SESSION" in os.environ:
return "copilot"
elif "CLAUDE_SESSION" in os.environ:
return "claude"
else:
return "unknown"
```
## Implementation Plan
### Phase 1: Strategy Selection (Week 1)
- [ ] Implement `select_execution_strategy` tool
- [ ] Add decision logic with cost estimation
- [ ] Test strategy recommendations
### Phase 2: Worker Spawning (Week 2)
- [ ] Implement `spawn_worker_instance` tool
- [ ] Test spawning Copilot CLI instances
- [ ] Test spawning Claude CLI instances
- [ ] Add process tracking
### Phase 3: Lifecycle Management (Week 3)
- [ ] Implement `request_worker_shutdown` tool
- [ ] Add shutdown checking to worker loops
- [ ] Test graceful shutdown
- [ ] Handle crashed/stale workers
### Phase 4: Cost Tracking (Week 4)
- [ ] Implement `get_worker_cost_estimate` tool
- [ ] Add actual cost tracking with metrics
- [ ] Create cost reporting dashboard
- [ ] Optimize strategy selection based on actual costs
### Phase 5: Documentation and Testing (Week 5)
- [ ] Write comprehensive documentation
- [ ] Create example workflows
- [ ] Integration tests with both CLIs
- [ ] Performance benchmarks
## Usage Examples
### Example 1: Quick Research Task
```python
# Bootstrap manager evaluating task
task = "Research best practices for async Python"
strategy = amicus.select_execution_strategy(
task_description=task,
estimated_duration_minutes=3,
context_dependency="high"
)
# Returns: {"strategy": "subagent", "model": "claude-haiku-4.5", ...}
# Use subagent (Task tool)
result = task(
agent_type="explore",
description="Quick research",
prompt=task
)
```
### Example 2: Long Implementation Task
```python
# Bootstrap manager evaluating task
task = "Implement complete user authentication system with JWT"
strategy = amicus.select_execution_strategy(
task_description=task,
estimated_duration_minutes=45,
complexity="medium"
)
# Returns: {"strategy": "copilot", "model": "claude-sonnet-4.5", ...}
# Spawn Copilot CLI instance
worker = amicus.spawn_worker_instance(
client="copilot",
task_id="task-123",
role="developer",
model="claude-sonnet-4.5"
)
# Worker registers and claims task automatically
```
### Example 3: Complex Architecture Task
```python
# Bootstrap manager evaluating task
task = "Design microservices architecture for e-commerce platform"
strategy = amicus.select_execution_strategy(
task_description=task,
complexity="high"
)
# Returns: {"strategy": "claude", "model": "claude-opus-4.5", ...}
# Spawn Claude CLI instance
worker = amicus.spawn_worker_instance(
client="claude",
task_id="task-124",
role="architect",
model="claude-opus-4.5"
)
```
## Cost Comparison
### Scenario: 30-minute implementation task
**Subagent (Task tool)**:
- Tokens: ~50K (estimated)
- Cost: $0.15 (Sonnet pricing)
- Adds to parent session cost
**Copilot CLI instance**:
- Fixed cost: $0.50 per request
- Independent billing
- Better for long tasks
**Claude CLI instance**:
- Tokens: ~50K
- Cost: $0.15 (Sonnet pricing)
- Independent billing
**Recommendation**: Copilot CLI for fixed cost on long task.
## Success Criteria
- [x] Strategy selection logic implemented
- [x] Worker spawning works for both CLIs
- [x] Task handoff protocol documented
- [x] Cost tracking functional
- [x] Works identically from Claude or Copilot CLI
- [x] Clear documentation on when to use each strategy
**Implementation Status**: ✅ COMPLETE (as of 2026-02-03)
All core functionality has been implemented and tested:
- `select_execution_strategy()` tool with cost estimation
- `spawn_worker_instance()` tool for Copilot/Claude CLI spawning
- 9/9 tests passing in `tests/test_hybrid_orchestration.py`
- Decision matrix fully operational
- Documentation complete in this file and inline tool docstrings
## Related Work
- Builds on existing `register_node`, `claim_task`, `complete_task` tools
- Extends `assess_workload` for strategy recommendations
- Integrates with metrics system for cost tracking
- Uses existing heartbeat monitoring
## Open Questions
1. **Process monitoring**: How to detect crashed workers?
- **Answer**: Use heartbeat + process PID checking
2. **Context serialization**: What context to pass to spawned instances?
- **Answer**: Task description + relevant files from `active_files`
3. **Model selection**: Override or use configured defaults?
- **Answer**: Use configured defaults, allow override
4. **Error handling**: What if spawned instance fails immediately?
- **Answer**: Detect via missing registration within 30s, retry task
## Next Steps
1. Get design approval (comment on Issue #10)
2. Create implementation tasks
3. Start with Phase 1 (strategy selection)
4. Iterate based on testing feedback