import time
import subprocess
import shlex
import os
import asyncio
from pathlib import Path
from typing import Optional, List, Dict, Any, Union
from fastmcp import FastMCP
from .core import (
get_state_file,
read_with_lock,
write_with_lock,
is_tracking_enabled,
set_tracking_enabled,
get_context_bus_dir
)
from .config import ConfigManager, PathValidator
from .security import SensitiveDataScanner
from .communication import UnifiedCommunicationLayer
# Try to import github, but make it optional
try:
from github import Github
GITHUB_AVAILABLE = True
except ImportError:
GITHUB_AVAILABLE = False
# Initialize the MCP server
mcp = FastMCP("Amicus")
scanner = SensitiveDataScanner()
comm_layer: Optional[UnifiedCommunicationLayer] = None
@mcp.tool()
def register_node(
agent_id: str,
role: str,
model_name: str,
model_strength: Optional[str] = None
) -> str:
"""
Register a node with a specific role and model information.
Enforces max_agents constraint.
Args:
agent_id: Unique identifier for the agent
role: Role of the agent (e.g., bootstrap_manager, architect, developer)
model_name: Name of the model being used
model_strength: Optional manual strength assessment (low, medium, high)
Returns:
Success message or error if at capacity
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
state_data = {"summary": "Cluster Initialized", "next_steps": [], "active_files": []}
config = ConfigManager()
# Initialize cluster_metadata if not present
if "cluster_metadata" not in state_data:
cluster_settings = config.get("cluster_settings", {})
state_data["cluster_metadata"] = {
"max_agents": cluster_settings.get("max_agents", 4),
"active_agent_count": 0,
"idle_agent_count": 0,
"pending_task_count": 0,
"manager_id": None
}
# Check if at capacity (only count non-terminated nodes)
cluster_nodes = state_data.get("cluster_nodes", {})
active_count = sum(
1 for node in cluster_nodes.values()
if node.get("status") != "terminated"
)
max_agents = state_data["cluster_metadata"]["max_agents"]
if active_count >= max_agents and agent_id not in cluster_nodes:
return f"ERROR: Cluster at capacity ({max_agents} nodes). Registration rejected."
# Assess strength if not provided
if not model_strength:
strengths = config.get("model_strengths", {})
model_strength = strengths.get(model_name, "unknown")
# Register or update node with enhanced schema
now = time.time()
cluster_nodes[agent_id] = {
"role": role,
"model": {
"name": model_name,
"strength": model_strength
},
"last_heartbeat": now,
"status": "working", # Start as working
"current_task_id": None,
"idle_since": None,
"last_activity": now
}
# Update manager_id if this is a bootstrap_manager
if role == "bootstrap_manager":
state_data["cluster_metadata"]["manager_id"] = agent_id
# Automatically enable tracking when manager registers
set_tracking_enabled(True)
state_data["cluster_nodes"] = cluster_nodes
state_data["timestamp"] = now
# Update active count
state_data["cluster_metadata"]["active_agent_count"] = sum(
1 for node in cluster_nodes.values()
if node.get("status") != "terminated"
)
write_with_lock(state_file, state_data)
return f"Node {agent_id} registered as {role} ({model_strength}). Active nodes: {state_data['cluster_metadata']['active_agent_count']}/{max_agents}"
@mcp.tool()
def get_best_model(task_description: str) -> str:
"""
Select the best model for a given task based on keywords in the description.
Args:
task_description: A description of the task.
Returns:
The name of the best model for the task.
"""
config = ConfigManager()
model_selection_config = config.get("model_selection", {})
if not model_selection_config:
return "default_model_not_configured"
default_model = model_selection_config.get("default_model", "default_model_not_found")
keywords = model_selection_config.get("keywords", {})
# Simple keyword matching
for keyword, model_name in keywords.items():
if keyword in task_description.lower():
return model_name
return default_model
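# Configuration sketch (hypothetical keywords and model names) showing how the
# keyword match resolves. Given a config fragment like:
#
#   "model_selection": {
#       "default_model": "gemini-1.5-flash",
#       "keywords": {"design": "claude-opus-4.5", "implement": "claude-sonnet-4.5"}
#   }
#
# get_best_model("Design the plugin architecture") returns "claude-opus-4.5",
# while a description with no keyword hit falls back to the default model.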
@mcp.tool()
async def trigger_update(summary: str) -> str:
"""A test tool to trigger update_state on the server."""
await update_state.fn(summary=summary, next_steps=[], active_files=[])
return "Update triggered."
@mcp.tool()
async def update_state(
summary: str,
next_steps: List[Dict[str, Any]],
active_files: List[str],
ask_user: bool = False,
messages: Optional[List[str]] = None,
model_info: Optional[Dict[str, Any]] = None
) -> str:
"""
Update the context bus state with the current agent's information.
Args:
summary: A summary of what has been done so far
next_steps: A list of task dictionaries. Example: [{"task": "Fix bug", "status": "pending", "assigned_to": "Node-1"}]
active_files: List of files currently being worked on
ask_user: Whether human input is required
messages: Optional list of messages for other agents
model_info: Optional dictionary containing information about the agent's model (e.g., {'name': 'gemini-1.5-flash', 'strength': 'high'})
Returns:
Success message
"""
# Scan for sensitive data in summary and messages
findings = scanner.scan(summary)
if messages:
for msg in messages:
findings.extend(scanner.scan(msg))
if findings:
report = "\n".join([f"- {name}: {snippet}" for name, snippet in findings])
return f"SECURITY WARNING: Sensitive data detected! State NOT updated.\nFindings:\n{report}"
if not is_tracking_enabled():
return "Tracking is disabled. State not updated."
state_file = get_state_file()
# Read existing state to preserve messages and model_info if not provided in this update
existing_state = {}
if state_file.exists():
existing_state = read_with_lock(state_file) or {}
# Merge messages if provided
current_messages = existing_state.get("messages", [])
if messages:
current_messages.extend(messages)
# Keep only last 50 messages
current_messages = current_messages[-50:]
# Validate active_files
config = ConfigManager()
validator = PathValidator(Path(config.get("root_dir", Path.cwd())))
for path in active_files:
if not validator.is_safe(path):
return f"Error: Path '{path}' is outside the project root. State not updated."
now = time.time()
state_data = {
"summary": summary,
"next_steps": next_steps,
"active_files": active_files,
"ask_user": ask_user,
"messages": current_messages,
"timestamp": now,
"last_heartbeat": existing_state.get("last_heartbeat", now),
"model_info": model_info if model_info is not None else existing_state.get("model_info", None)
}
# Preserve existing cluster data
if "cluster_nodes" in existing_state:
state_data["cluster_nodes"] = existing_state["cluster_nodes"]
if "cluster_metadata" in existing_state:
state_data["cluster_metadata"] = existing_state["cluster_metadata"]
if "work_distribution" in existing_state:
state_data["work_distribution"] = existing_state["work_distribution"]
write_with_lock(state_file, state_data)
# Broadcast state change notification
if comm_layer:
await comm_layer.publish(
"state_updated",
{"summary": summary, "timestamp": now}
)
return "State updated successfully."
@mcp.tool()
def add_task(task_description: str, priority: str = "medium") -> str:
"""
Add a new task to the next_steps queue.
Args:
task_description: Description of the task to be done
priority: Priority of the task (low, medium, high)
Returns:
Success message
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
state_data = {
"summary": "Initialized via add_task",
"next_steps": [],
"active_files": [],
"timestamp": time.time()
}
steps = state_data.get("next_steps", [])
if not isinstance(steps, list):
steps = []
steps.append({
"task": task_description,
"status": "pending",
"priority": priority,
"created_at": time.time()
})
state_data["next_steps"] = steps
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
return f"Task '{task_description}' added successfully."
@mcp.tool()
def read_state() -> str:
"""
Read the current state from the context bus.
Returns:
The current state as a formatted string
"""
state_file = get_state_file()
try:
state_data = read_with_lock(state_file)
except FileNotFoundError:
return "No state available yet."
if not state_data:
return "No state available yet."
# Format the state
result = []
result.append("π Context Bus State")
result.append("=" * 50)
if "summary" in state_data:
result.append(f"\n**Summary:**\n{state_data['summary']}")
if "next_steps" in state_data:
result.append(f"\n**Next Steps:**")
steps = state_data['next_steps']
if isinstance(steps, list):
for i, task in enumerate(steps):
if isinstance(task, dict):
task_str = f"{i+1}. {task.get('task', 'No description')}"
status = task.get('status')
if status:
task_str += f" [{status}]"
assigned = task.get('assigned_to')
if assigned:
task_str += f" (assigned to {assigned})"
result.append(task_str)
else:
result.append(f"{i+1}. {str(task)}")
else:
result.append(str(steps))
if "active_files" in state_data and state_data["active_files"]:
result.append(f"\n**Active Files:**")
for file in state_data["active_files"]:
result.append(f" - {file}")
if "messages" in state_data and state_data["messages"]:
result.append(f"\n**Messages:**")
for msg in state_data["messages"]:
result.append(f" - {msg}")
if "timestamp" in state_data:
age = time.time() - state_data["timestamp"]
result.append(f"\n**Last Updated:** {age:.1f} seconds ago")
if "last_heartbeat" in state_data:
hb_age = time.time() - state_data["last_heartbeat"]
status = "π’ ACTIVE" if hb_age < 60 else "π΄ INACTIVE/STALE"
result.append(f"**Node Heartbeat:** {status} ({hb_age:.1f}s ago)")
if "cluster_nodes" in state_data and state_data["cluster_nodes"]:
result.append(f"\n**Active Cluster Swarm:**")
for node_id, info in state_data["cluster_nodes"].items():
hb_age = time.time() - info.get("last_heartbeat", 0)
hb_status = "π’" if hb_age < 60 else "π΄"
agent_status = info.get("status", "unknown")
current_task = info.get("current_task_id", "")
task_info = f" | Task: {current_task}" if current_task else ""
result.append(f" {hb_status} {node_id}: {info['role']} [{agent_status}] ({info['model']['name']} - {info['model']['strength']}){task_info}")
if "cluster_metadata" in state_data:
metadata = state_data["cluster_metadata"]
result.append(f"\n**Cluster Metadata:**")
result.append(f" Active Nodes: {metadata.get('active_agent_count', 0)}/{metadata.get('max_agents', 4)}")
result.append(f" Idle Nodes: {metadata.get('idle_agent_count', 0)}")
result.append(f" Pending Tasks: {metadata.get('pending_task_count', 0)}")
if metadata.get('manager_id'):
result.append(f" Manager: {metadata['manager_id']}")
if "work_distribution" in state_data:
work_dist = state_data["work_distribution"]
result.append(f"\n**Work Distribution:**")
result.append(f" Status: {work_dist.get('workload_status', 'unknown').upper()}")
result.append(f" Recommendation: {work_dist.get('spawn_recommendation', 'none')}")
last_assess = work_dist.get('last_assessment', 0)
if last_assess:
assess_age = time.time() - last_assess
result.append(f" Last Assessment: {assess_age:.1f}s ago")
if "model_info" in state_data and state_data["model_info"]:
model_name = state_data["model_info"].get("name", "Unknown Model")
model_strength = state_data["model_info"].get("strength", "N/A")
result.append(f"\n**Node Model:** {model_name} (Strength: {model_strength})")
# Display code audit summary
if "code_audits" in state_data and state_data["code_audits"]:
audits = state_data["code_audits"]
pending = sum(1 for a in audits.values() if a.get("status") == "pending")
approved = sum(1 for a in audits.values() if a.get("status") == "approved")
rejected = sum(1 for a in audits.values() if a.get("status") == "rejected")
result.append(f"\n**Code Audits:**")
result.append(f" β³ Pending: {pending}")
result.append(f" β
Approved: {approved}")
result.append(f" β Rejected: {rejected}")
# Show pending audits requiring review
if pending > 0:
result.append(f"\n π Pending Review:")
for audit_id, audit in audits.items():
if audit.get("status") == "pending":
files = ", ".join(audit.get("files", [])[:2])
if len(audit.get("files", [])) > 2:
files += f" +{len(audit.get('files', [])) - 2}"
result.append(f" - {audit_id}: {audit.get('requesting_agent', '?')} β {files}")
# Display file intents summary (Issue #7: File Coordination)
if "file_intents" in state_data and state_data["file_intents"]:
intents = state_data["file_intents"]
now = time.time()
# Filter to active (non-expired) intents
active_intents = {
k: v for k, v in intents.items()
if v.get("expires_at", 0) > now
}
if active_intents:
result.append(f"\n**File Intents (Active):**")
# Group by agent
by_agent: Dict[str, List[str]] = {}
for intent_id, intent in active_intents.items():
agent = intent.get("agent_id", "Unknown")
if agent not in by_agent:
by_agent[agent] = []
files_list = intent.get("files", [])
intent_type = intent.get("intent_type", "write")
ttl = int(intent.get("expires_at", 0) - now)
by_agent[agent].append(f"{', '.join(files_list[:2])} [{intent_type}] ({ttl}s)")
for agent, file_infos in by_agent.items():
result.append(f" π {agent}:")
for info in file_infos:
result.append(f" - {info}")
# Check for conflicts
all_files: Dict[str, List[str]] = {} # file -> list of agents
for intent in active_intents.values():
if intent.get("intent_type") == "write":
for f in intent.get("files", []):
if f not in all_files:
all_files[f] = []
all_files[f].append(intent.get("agent_id", "?"))
conflicts = {f: agents for f, agents in all_files.items() if len(agents) > 1}
if conflicts:
result.append(f"\n β οΈ FILE CONFLICTS:")
for f, agents in conflicts.items():
result.append(f" - {f}: {', '.join(agents)}")
# Critical: Add the elicitation pattern if ask_user is True
if state_data.get("ask_user", False):
result.append("\nπ¨ PREVIOUS AGENT REQUESTED HUMAN INPUT.")
return "\n".join(result)
@mcp.tool()
def broadcast_message(message: str) -> str:
"""
Send a message to all other agents via the context bus.
Args:
message: The message to broadcast
Returns:
Success message
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
messages = state_data.get("messages", [])
messages.append(f"[{time.strftime('%H:%M:%S')}] {message}")
state_data["messages"] = messages[-50:]
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
return "Message broadcast successfully."
@mcp.tool()
def claim_task(task_index: int, agent_id: str) -> str:
"""
Claim a task from the next_steps queue.
Args:
task_index: The 1-based index of the task to claim (as shown in read_state)
agent_id: The ID of the agent claiming the task
Returns:
Success or error message
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
steps = state_data.get("next_steps", [])
if not isinstance(steps, list) or task_index < 1 or task_index > len(steps):
return f"Error: Invalid task index {task_index}."
task = steps[task_index - 1]
if not isinstance(task, dict):
# Convert string task to dict if necessary
task = {"task": str(task), "status": "pending"}
steps[task_index - 1] = task
if task.get("status") == "in_progress" and task.get("assigned_to") != agent_id:
return f"Error: Task is already claimed by {task.get('assigned_to')}."
task["status"] = "in_progress"
task["assigned_to"] = agent_id
task["claimed_at"] = time.time()
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
return f"Task {task_index} claimed by {agent_id}."
@mcp.tool()
def complete_task(task_index: int, agent_id: str, outcome: str = "Success") -> str:
"""
Mark a task as completed.
Args:
task_index: The 1-based index of the task to complete
agent_id: The ID of the agent completing the task
outcome: Brief description of the result
Returns:
Success or error message
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
steps = state_data.get("next_steps", [])
if not isinstance(steps, list) or task_index < 1 or task_index > len(steps):
return f"Error: Invalid task index {task_index}."
task = steps[task_index - 1]
if not isinstance(task, dict):
task = {"task": str(task), "status": "completed"}
steps[task_index - 1] = task
task["status"] = "completed"
task["completed_by"] = agent_id
task["outcome"] = outcome
task["completed_at"] = time.time()
# Optionally move to a history or just leave it marked as completed
# For now, we'll leave it in next_steps but it will be filtered/shown as done
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
return f"Task {task_index} marked as completed by {agent_id}."
@mcp.tool()
def is_safe_path(path: str) -> bool:
"""
Check if a path is safe for operations (within project root).
Args:
path: The path to check
Returns:
True if safe, False otherwise
"""
config = ConfigManager()
validator = PathValidator(Path(config.get("root_dir", Path.cwd())))
return validator.is_safe(path)
@mcp.tool()
def execute_safe_command(command: str) -> str:
"""
Execute a shell command if it is in the whitelist.
Args:
command: The command to execute
Returns:
Command output or error message
"""
config = ConfigManager()
whitelist = config.get("command_whitelist", [])
try:
args = shlex.split(command)
except ValueError as e:
return f"Error parsing command: {str(e)}"
if not args:
return "Error: Empty command."
# Check if the program (first arg) is whitelisted
prog = args[0]
if prog not in whitelist:
return f"Error: Command '{prog}' is not in the whitelist."
try:
result = subprocess.run(
args,
shell=False, # Secure execution
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
return result.stdout
else:
return f"Command failed with exit code {result.returncode}:\n{result.stderr}"
except Exception as e:
return f"Error executing command: {str(e)}"
@mcp.tool()
def reset_context(
preserve_cluster_config: bool = False,
preserve_messages: bool = False,
confirm: bool = False
) -> str:
"""
Reset the shared context to a clean state.
Args:
preserve_cluster_config: Keep max_agents and other cluster settings
preserve_messages: Keep message history
confirm: Must be True to execute (safety check)
Returns:
Success message or error if confirm=False
"""
if not confirm:
return "ERROR: This will clear all shared context. Call with confirm=True to proceed."
state_file = get_state_file()
state_data = read_with_lock(state_file)
# Preserve selected data if requested
preserved_metadata = {}
preserved_messages = []
if preserve_cluster_config and "cluster_metadata" in state_data:
preserved_metadata = {
"max_agents": state_data["cluster_metadata"].get("max_agents"),
"active_agent_count": 0, # Reset count
}
if preserve_messages and "messages" in state_data:
preserved_messages = state_data["messages"]
# Create fresh state
new_state = {
"summary": "",
"next_steps": [],
"active_files": [],
"messages": preserved_messages,
"last_heartbeat": time.time(),
"cluster_nodes": {},
"cluster_metadata": preserved_metadata if preserved_metadata else {
"max_agents": 5,
"active_agent_count": 0
},
"work_distribution": {},
"code_audits": {},
"file_intents": {}
}
write_with_lock(state_file, new_state)
preserved_items = []
if preserve_cluster_config:
preserved_items.append("cluster_config")
if preserve_messages:
preserved_items.append("messages")
preserved_note = f" (preserved: {', '.join(preserved_items)})" if preserved_items else ""
return f"β
Context reset. All state cleared{preserved_note}."
@mcp.tool()
def heartbeat(agent_id: Optional[str] = None) -> str:
"""
Signal that the current agent is still active.
Args:
agent_id: Optional ID of the agent to update specific cluster node status.
Returns:
Success message
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "No state available to heartbeat."
now = time.time()
state_data["last_heartbeat"] = now
if agent_id:
cluster_nodes = state_data.get("cluster_nodes", {})
if agent_id in cluster_nodes:
cluster_nodes[agent_id]["last_heartbeat"] = now
state_data["cluster_nodes"] = cluster_nodes
write_with_lock(state_file, state_data)
return "Heartbeat received."
@mcp.tool()
def toggle_tracking(enabled: bool) -> str:
"""
Enable or disable state tracking.
Args:
enabled: Whether tracking should be enabled
Returns:
Success message
"""
set_tracking_enabled(enabled)
status = "enabled" if enabled else "disabled"
return f"Tracking {status}."
@mcp.tool()
def set_agent_status(
agent_id: str,
status: str,
current_task_id: Optional[str] = None
) -> str:
"""
Update an agent's status and current task.
Args:
agent_id: The agent's unique identifier
status: One of: working, idle, waiting, terminated
current_task_id: Optional task ID the agent is working on (null if idle)
Returns:
Success message
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
cluster_nodes = state_data.get("cluster_nodes", {})
if agent_id not in cluster_nodes:
return f"ERROR: Agent {agent_id} not registered."
now = time.time()
node = cluster_nodes[agent_id]
# Track status transitions
old_status = node.get("status", "unknown")
node["status"] = status
node["current_task_id"] = current_task_id
node["last_activity"] = now
# Set idle_since timestamp when transitioning to idle
if status == "idle" and old_status != "idle":
node["idle_since"] = now
elif status != "idle":
node["idle_since"] = None
state_data["timestamp"] = now
write_with_lock(state_file, state_data)
return f"Agent {agent_id} status updated to {status}."
@mcp.tool()
def claim_best_task(agent_id: str, role: str) -> str:
"""
Intelligently claim the best available task for this agent.
Scores tasks by priority, role match, and age.
Args:
agent_id: The agent's unique identifier
role: The agent's role (architect, developer, bootstrap_manager)
Returns:
Task description and index, or message if no tasks available
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
steps = state_data.get("next_steps", [])
if not isinstance(steps, list):
return "No tasks available."
# Filter pending tasks
pending_tasks = []
for idx, task in enumerate(steps):
if isinstance(task, dict) and task.get("status") == "pending":
pending_tasks.append((idx, task))
if not pending_tasks:
return "No pending tasks available."
# Score tasks
def score_task(idx_task_tuple):
idx, task = idx_task_tuple
score = 0
# Priority scoring (high=30, medium=20, low=10)
priority = task.get("priority", "medium")
if priority == "high":
score += 30
elif priority == "medium":
score += 20
else:
score += 10
# Role match bonus (+15 points)
task_desc = task.get("task", "").lower()
if role == "architect" and ("design" in task_desc or "architect" in task_desc or "plan" in task_desc):
score += 15
elif role == "developer" and ("implement" in task_desc or "code" in task_desc or "test" in task_desc):
score += 15
# Age bonus (older tasks get priority, +1 per 60 seconds)
created_at = task.get("created_at", time.time())
age_minutes = (time.time() - created_at) / 60
score += age_minutes
return score
# Find best task
best_idx, best_task = max(pending_tasks, key=score_task)
# Claim it
best_task["status"] = "in_progress"
best_task["assigned_to"] = agent_id
best_task["claimed_at"] = time.time()
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
task_description = best_task.get("task", "No description")
return f"Claimed task {best_idx + 1}: {task_description}"
@mcp.tool()
def assess_workload() -> str:
"""
Assess current cluster workload and generate spawn/terminate recommendations.
Called by Bootstrap Manager every 20-30 seconds.
Returns:
Workload assessment summary with recommendations
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
cluster_nodes = state_data.get("cluster_nodes", {})
cluster_metadata = state_data.get("cluster_metadata", {})
steps = state_data.get("next_steps", [])
# Count metrics
active_agents = sum(
1 for node in cluster_nodes.values()
if node.get("status") not in ["terminated", "idle"]
)
idle_agents = sum(
1 for node in cluster_nodes.values()
if node.get("status") == "idle"
)
pending_tasks = sum(
1 for task in steps
if isinstance(task, dict) and task.get("status") == "pending"
)
in_progress_tasks = sum(
1 for task in steps
if isinstance(task, dict) and task.get("status") == "in_progress"
)
max_agents = cluster_metadata.get("max_agents", 4)
total_active = active_agents + idle_agents
# Determine workload status
if pending_tasks == 0 and in_progress_tasks == 0:
workload_status = "idle"
spawn_recommendation = "terminate_idle"
elif pending_tasks >= 3 and total_active < max_agents:
workload_status = "overloaded"
spawn_recommendation = "spawn_developer"
elif pending_tasks > 0 and active_agents == 0 and total_active < max_agents:
workload_status = "overloaded"
spawn_recommendation = "spawn_developer"
elif pending_tasks <= 1 and total_active > 1 and idle_agents > 0:
workload_status = "underutilized"
spawn_recommendation = "terminate_idle"
else:
workload_status = "balanced"
spawn_recommendation = "maintain"
# Update state with assessment
if "work_distribution" not in state_data:
state_data["work_distribution"] = {}
state_data["work_distribution"].update({
"last_assessment": time.time(),
"workload_status": workload_status,
"spawn_recommendation": spawn_recommendation
})
# Update cluster metadata
cluster_metadata.update({
"active_agent_count": total_active,
"idle_agent_count": idle_agents,
"pending_task_count": pending_tasks
})
state_data["cluster_metadata"] = cluster_metadata
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
# Format response
result = []
result.append("π Workload Assessment")
result.append("=" * 40)
result.append(f"Active Agents: {active_agents}/{max_agents}")
result.append(f"Idle Agents: {idle_agents}")
result.append(f"Pending Tasks: {pending_tasks}")
result.append(f"In Progress: {in_progress_tasks}")
result.append(f"Status: {workload_status.upper()}")
result.append(f"Recommendation: {spawn_recommendation}")
return "\n".join(result)
@mcp.tool()
def create_github_issue(
repo_owner: str,
repo_name: str,
title: str,
body: str,
labels: Optional[List[str]] = None,
assign_to_user: bool = False
) -> str:
"""
Create a new GitHub issue in the specified repository.
Args:
repo_owner: GitHub username or organization that owns the repository
repo_name: Name of the repository
title: Title of the issue
body: Body/description of the issue
labels: Optional list of label names to apply to the issue
assign_to_user: Whether to assign the issue to the authenticated user
Returns:
Success message with issue URL or error message
"""
if not GITHUB_AVAILABLE:
return "ERROR: PyGithub library not installed. Install with: pip install PyGithub"
# Get GitHub token from environment
github_token = os.environ.get("GITHUB_TOKEN")
if not github_token:
return "ERROR: GITHUB_TOKEN environment variable not set. Cannot create GitHub issue."
try:
# Initialize GitHub client
g = Github(github_token)
repo = g.get_repo(f"{repo_owner}/{repo_name}")
# Create the issue
issue = repo.create_issue(
title=title,
body=body,
labels=labels or []
)
# Assign to user if requested
if assign_to_user:
user = g.get_user()
issue.add_to_assignees(user)
return f"GitHub issue created successfully: {issue.html_url} (#{issue.number})"
except Exception as e:
return f"ERROR creating GitHub issue: {str(e)}"
@mcp.tool()
def list_github_issues(
repo_owner: str,
repo_name: str,
state: str = "open",
labels: Optional[List[str]] = None,
limit: int = 10
) -> str:
"""
List GitHub issues from the specified repository.
Args:
repo_owner: GitHub username or organization that owns the repository
repo_name: Name of the repository
state: Issue state to filter by: 'open', 'closed', or 'all'
labels: Optional list of label names to filter by
limit: Maximum number of issues to return (default: 10)
Returns:
Formatted list of issues or error message
"""
if not GITHUB_AVAILABLE:
return "ERROR: PyGithub library not installed. Install with: pip install PyGithub"
# Get GitHub token from environment
github_token = os.environ.get("GITHUB_TOKEN")
if not github_token:
return "ERROR: GITHUB_TOKEN environment variable not set. Cannot list GitHub issues."
try:
# Initialize GitHub client
g = Github(github_token)
repo = g.get_repo(f"{repo_owner}/{repo_name}")
# Get issues
issues = repo.get_issues(state=state, labels=labels or [])
# Format results
result = []
result.append(f"π GitHub Issues for {repo_owner}/{repo_name} (state: {state})")
result.append("=" * 60)
count = 0
for issue in issues:
if count >= limit:
break
if issue.pull_request: # Skip PRs
continue
result.append(f"\n#{issue.number}: {issue.title}")
result.append(f" State: {issue.state}")
result.append(f" URL: {issue.html_url}")
if issue.labels:
labels_str = ", ".join([label.name for label in issue.labels])
result.append(f" Labels: {labels_str}")
if issue.assignee:
result.append(f" Assignee: {issue.assignee.login}")
result.append(f" Created: {issue.created_at}")
result.append(f" Updated: {issue.updated_at}")
count += 1
if count == 0:
result.append("\nNo issues found.")
else:
result.append(f"\nShowing {count} of {issues.totalCount} total issues.")
return "\n".join(result)
except Exception as e:
return f"ERROR listing GitHub issues: {str(e)}"
@mcp.tool()
def import_github_issue_to_task(
repo_owner: str,
repo_name: str,
issue_number: int,
priority: str = "medium"
) -> str:
"""
Import a GitHub issue into the local task context.
This allows agents to work on GitHub issues as local tasks.
Args:
repo_owner: GitHub username or organization that owns the repository
repo_name: Name of the repository
issue_number: Number of the issue to import
priority: Priority level for the imported task (low, medium, high)
Returns:
Success message or error
"""
if not GITHUB_AVAILABLE:
return "ERROR: PyGithub library not installed. Install with: pip install PyGithub"
# Get GitHub token from environment
github_token = os.environ.get("GITHUB_TOKEN")
if not github_token:
return "ERROR: GITHUB_TOKEN environment variable not set. Cannot import GitHub issue."
try:
# Initialize GitHub client
g = Github(github_token)
repo = g.get_repo(f"{repo_owner}/{repo_name}")
issue = repo.get_issue(issue_number)
# Create task description with GitHub context
task_description = f"GitHub Issue #{issue_number}: {issue.title}"
# Add task to state
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
state_data = {
"summary": "Initialized via import_github_issue_to_task",
"next_steps": [],
"active_files": [],
"timestamp": time.time()
}
steps = state_data.get("next_steps", [])
if not isinstance(steps, list):
steps = []
# Check if this issue is already imported
for task in steps:
if isinstance(task, dict) and task.get("github_issue") == issue_number:
return f"GitHub issue #{issue_number} is already imported as a task."
# Add the task with GitHub metadata
steps.append({
"task": task_description,
"status": "pending",
"priority": priority,
"created_at": time.time(),
"github_issue": issue_number,
"github_repo": f"{repo_owner}/{repo_name}",
"github_url": issue.html_url
})
state_data["next_steps"] = steps
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
return f"GitHub issue #{issue_number} imported as local task: {issue.title}\nURL: {issue.html_url}"
except Exception as e:
return f"ERROR importing GitHub issue: {str(e)}"
@mcp.tool()
def sync_task_to_github_issue(
task_index: int,
comment: Optional[str] = None,
close_issue: bool = False
) -> str:
"""
Sync a local task's status back to its linked GitHub issue.
Updates the issue with a comment and optionally closes it.
Args:
task_index: The 1-based index of the task (as shown in read_state)
comment: Optional comment to add to the GitHub issue
close_issue: Whether to close the GitHub issue (typically when task is completed)
Returns:
Success message or error
"""
if not GITHUB_AVAILABLE:
return "ERROR: PyGithub library not installed. Install with: pip install PyGithub"
# Get GitHub token from environment
github_token = os.environ.get("GITHUB_TOKEN")
if not github_token:
return "ERROR: GITHUB_TOKEN environment variable not set. Cannot sync to GitHub issue."
try:
# Get the task
state_file = get_state_file()
state_data = read_with_lock(state_file)
steps = state_data.get("next_steps", [])
if not isinstance(steps, list) or task_index < 1 or task_index > len(steps):
return f"ERROR: Invalid task index {task_index}."
task = steps[task_index - 1]
if not isinstance(task, dict):
return f"ERROR: Task at index {task_index} is not properly formatted."
# Check if task is linked to a GitHub issue
github_issue = task.get("github_issue")
github_repo = task.get("github_repo")
if not github_issue or not github_repo:
return f"ERROR: Task at index {task_index} is not linked to a GitHub issue."
# Initialize GitHub client
g = Github(github_token)
repo = g.get_repo(github_repo)
issue = repo.get_issue(github_issue)
# Add comment if provided
if comment:
issue.create_comment(comment)
# Close issue if requested
if close_issue and issue.state == "open":
issue.edit(state="closed")
status_msg = "closed"
elif issue.state == "closed":
status_msg = "already closed"
else:
status_msg = "updated"
return f"GitHub issue #{github_issue} {status_msg} successfully. URL: {issue.html_url}"
except Exception as e:
return f"ERROR syncing to GitHub issue: {str(e)}"
# ============================================================================
# CODE AUDIT GOVERNANCE TOOLS
# ============================================================================
# These tools implement a governance layer where agents must submit code changes
# for review by a code reviewer agent before execution.
# ============================================================================
def _generate_audit_id() -> str:
"""Generate a unique audit ID."""
import uuid
return f"audit-{uuid.uuid4().hex[:8]}"
@mcp.tool()
def submit_code_audit_request(
requesting_agent: str,
files: List[str],
description: str,
proposed_changes: str,
priority: str = "medium"
) -> str:
"""
Submit a code audit request before making file changes.
The request must be approved by a code reviewer agent before execution.
Args:
requesting_agent: ID of the agent requesting the audit
files: List of file paths that will be modified
description: Description of what changes will be made and why
proposed_changes: The actual changes (diff, code snippet, or description)
priority: Priority level (low, medium, high)
Returns:
Audit ID and status message
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
state_data = {
"summary": "Initialized via code audit",
"next_steps": [],
"active_files": [],
"timestamp": time.time()
}
# Initialize code_audits if not present
if "code_audits" not in state_data:
state_data["code_audits"] = {}
# Validate files are within project root
config = ConfigManager()
validator = PathValidator(Path(config.get("root_dir", Path.cwd())))
for file_path in files:
if not validator.is_safe(file_path):
return f"ERROR: Path '{file_path}' is outside the project root. Audit rejected."
# Generate audit ID
audit_id = _generate_audit_id()
# Create the audit request
now = time.time()
state_data["code_audits"][audit_id] = {
"id": audit_id,
"requesting_agent": requesting_agent,
"reviewer_agent": None,
"files": files,
"description": description,
"proposed_changes": proposed_changes,
"priority": priority,
"status": "pending",
"feedback": None,
"created_at": now,
"reviewed_at": None
}
state_data["timestamp"] = now
write_with_lock(state_file, state_data)
return f"Code audit request submitted: {audit_id}\nFiles: {', '.join(files)}\nStatus: PENDING - Awaiting reviewer approval.\nPriority: {priority.upper()}"
@mcp.tool()
def get_pending_audits(reviewer_agent: Optional[str] = None) -> str:
"""
Get all pending code audit requests awaiting review.
Code reviewer agents should call this to see what needs review.
Args:
reviewer_agent: Optional agent ID to filter audits assigned to specific reviewer
Returns:
Formatted list of pending audits
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "No state available."
code_audits = state_data.get("code_audits", {})
# Filter pending audits
pending = []
for audit_id, audit in code_audits.items():
if audit.get("status") == "pending":
if reviewer_agent and audit.get("reviewer_agent") not in [None, reviewer_agent]:
continue
pending.append(audit)
if not pending:
return "No pending code audits to review."
# Sort by priority (high first) then by age (oldest first)
priority_order = {"high": 0, "medium": 1, "low": 2}
pending.sort(key=lambda a: (priority_order.get(a.get("priority", "medium"), 1), a.get("created_at", 0)))
result = []
result.append("π Pending Code Audits")
result.append("=" * 60)
for audit in pending:
age = time.time() - audit.get("created_at", time.time())
age_str = f"{age / 60:.1f}m" if age < 3600 else f"{age / 3600:.1f}h"
result.append(f"\nπ Audit ID: {audit['id']}")
result.append(f" Priority: {audit.get('priority', 'medium').upper()}")
result.append(f" Requesting Agent: {audit.get('requesting_agent', 'Unknown')}")
result.append(f" Files: {', '.join(audit.get('files', []))}")
result.append(f" Description: {audit.get('description', 'No description')}")
result.append(f" Age: {age_str}")
result.append(f" Proposed Changes:")
# Truncate proposed changes if too long
changes = audit.get("proposed_changes", "")
if len(changes) > 500:
changes = changes[:500] + "... [truncated]"
for line in changes.split("\n"):
result.append(f" {line}")
result.append(f"\nπ Total pending: {len(pending)}")
return "\n".join(result)
@mcp.tool()
def approve_code_audit(
audit_id: str,
reviewer_agent: str,
feedback: Optional[str] = None
) -> str:
"""
Approve a pending code audit request.
Once approved, the requesting agent may proceed with the file changes.
Args:
audit_id: The ID of the audit to approve
reviewer_agent: ID of the agent approving the audit
feedback: Optional feedback or comments for the requesting agent
Returns:
Success message or error
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "ERROR: No state available."
code_audits = state_data.get("code_audits", {})
if audit_id not in code_audits:
return f"ERROR: Audit ID '{audit_id}' not found."
audit = code_audits[audit_id]
if audit.get("status") != "pending":
return f"ERROR: Audit '{audit_id}' is not pending (current status: {audit.get('status')})."
# Cannot approve own audit
if audit.get("requesting_agent") == reviewer_agent:
return "ERROR: Cannot approve your own audit request. A different agent must review."
# Update audit status
now = time.time()
audit["status"] = "approved"
audit["reviewer_agent"] = reviewer_agent
audit["reviewed_at"] = now
if feedback:
audit["feedback"] = feedback
state_data["timestamp"] = now
write_with_lock(state_file, state_data)
return f"β
Code audit '{audit_id}' APPROVED by {reviewer_agent}.\nRequesting agent {audit.get('requesting_agent')} may proceed with changes to: {', '.join(audit.get('files', []))}\n{f'Feedback: {feedback}' if feedback else ''}"
@mcp.tool()
def reject_code_audit(
audit_id: str,
reviewer_agent: str,
feedback: str
) -> str:
"""
Reject a pending code audit request.
The requesting agent must address feedback and resubmit.
Args:
audit_id: The ID of the audit to reject
reviewer_agent: ID of the agent rejecting the audit
feedback: Required feedback explaining why the audit was rejected
Returns:
Success message or error
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "ERROR: No state available."
code_audits = state_data.get("code_audits", {})
if audit_id not in code_audits:
return f"ERROR: Audit ID '{audit_id}' not found."
audit = code_audits[audit_id]
if audit.get("status") != "pending":
return f"ERROR: Audit '{audit_id}' is not pending (current status: {audit.get('status')})."
# Cannot reject own audit
if audit.get("requesting_agent") == reviewer_agent:
return "ERROR: Cannot reject your own audit request. A different agent must review."
# Feedback is required for rejections
if not feedback or not feedback.strip():
return "ERROR: Feedback is required when rejecting an audit."
# Update audit status
now = time.time()
audit["status"] = "rejected"
audit["reviewer_agent"] = reviewer_agent
audit["reviewed_at"] = now
audit["feedback"] = feedback
state_data["timestamp"] = now
write_with_lock(state_file, state_data)
return f"β Code audit '{audit_id}' REJECTED by {reviewer_agent}.\nRequesting agent {audit.get('requesting_agent')} must address feedback and resubmit.\nFeedback: {feedback}"
@mcp.tool()
def get_audit_status(audit_id: str) -> str:
"""
Get the status and details of a specific code audit request.
Args:
audit_id: The ID of the audit to check
Returns:
Audit details and status
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "ERROR: No state available."
code_audits = state_data.get("code_audits", {})
if audit_id not in code_audits:
return f"ERROR: Audit ID '{audit_id}' not found."
audit = code_audits[audit_id]
status = audit.get("status", "unknown")
# Status emoji
status_emoji = {
"pending": "⏳",
"approved": "✅",
"rejected": "❌"
}.get(status, "❓")
result = []
result.append(f"{status_emoji} Code Audit: {audit_id}")
result.append("=" * 50)
result.append(f"Status: {status.upper()}")
result.append(f"Requesting Agent: {audit.get('requesting_agent', 'Unknown')}")
result.append(f"Files: {', '.join(audit.get('files', []))}")
result.append(f"Priority: {audit.get('priority', 'medium').upper()}")
result.append(f"Description: {audit.get('description', 'No description')}")
created = audit.get("created_at", 0)
if created:
age = time.time() - created
result.append(f"Created: {age / 60:.1f} minutes ago")
if audit.get("reviewer_agent"):
result.append(f"Reviewed By: {audit['reviewer_agent']}")
reviewed = audit.get("reviewed_at")
if reviewed:
review_age = time.time() - reviewed
result.append(f"Reviewed: {review_age / 60:.1f} minutes ago")
if audit.get("feedback"):
result.append(f"Feedback: {audit['feedback']}")
result.append(f"\nProposed Changes:")
changes = audit.get("proposed_changes", "")
for line in changes.split("\n"):
result.append(f" {line}")
# Action guidance
result.append("")
if status == "pending":
result.append("β³ Awaiting review. A code reviewer agent must approve or reject.")
elif status == "approved":
result.append("β
APPROVED - You may proceed with the file changes.")
elif status == "rejected":
result.append("β REJECTED - Address the feedback and submit a new audit request.")
return "\n".join(result)
@mcp.tool()
def list_code_audits(
status_filter: Optional[str] = None,
agent_filter: Optional[str] = None,
limit: int = 20
) -> str:
"""
List all code audits with optional filtering.
Args:
status_filter: Filter by status (pending, approved, rejected)
agent_filter: Filter by requesting or reviewing agent ID
limit: Maximum number of audits to return (default: 20)
Returns:
Formatted list of code audits
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "No state available."
code_audits = state_data.get("code_audits", {})
if not code_audits:
return "No code audits found."
# Filter audits
filtered = []
for audit_id, audit in code_audits.items():
if status_filter and audit.get("status") != status_filter:
continue
if agent_filter:
if audit.get("requesting_agent") != agent_filter and audit.get("reviewer_agent") != agent_filter:
continue
filtered.append(audit)
if not filtered:
return f"No code audits found matching filters (status={status_filter}, agent={agent_filter})."
# Sort by created_at descending (newest first)
filtered.sort(key=lambda a: a.get("created_at", 0), reverse=True)
filtered = filtered[:limit]
result = []
result.append("π Code Audits")
result.append("=" * 60)
# Count by status
status_counts = {"pending": 0, "approved": 0, "rejected": 0}
for audit in code_audits.values():
s = audit.get("status", "unknown")
if s in status_counts:
status_counts[s] += 1
result.append(f"Summary: {status_counts['pending']} pending, {status_counts['approved']} approved, {status_counts['rejected']} rejected")
result.append("")
for audit in filtered:
status = audit.get("status", "unknown")
status_emoji = {"pending": "β³", "approved": "β
", "rejected": "β"}.get(status, "β")
age = time.time() - audit.get("created_at", time.time())
age_str = f"{age / 60:.1f}m" if age < 3600 else f"{age / 3600:.1f}h"
files_str = ", ".join(audit.get("files", [])[:2])
if len(audit.get("files", [])) > 2:
files_str += f" +{len(audit.get('files', [])) - 2} more"
result.append(f"{status_emoji} {audit['id']} | {audit.get('requesting_agent', '?')} | {files_str} | {age_str}")
return "\n".join(result)
# ============================================================================
# FILE COORDINATION GOVERNANCE TOOLS
# ============================================================================
# These tools implement lightweight intent-based file coordination to prevent
# agents from stomping on each other's file changes (GitHub Issue #7).
# ============================================================================
# Default intent expiration time (5 minutes)
FILE_INTENT_TTL_SECONDS = 300
def _generate_intent_id() -> str:
"""Generate a unique intent ID."""
import uuid
return f"intent-{uuid.uuid4().hex[:8]}"
def _cleanup_expired_intents(state_data: Dict[str, Any]) -> int:
"""Remove expired file intents. Returns count of removed intents."""
file_intents = state_data.get("file_intents", {})
now = time.time()
expired = []
for intent_id, intent in file_intents.items():
expires_at = intent.get("expires_at", 0)
if expires_at and now > expires_at:
expired.append(intent_id)
for intent_id in expired:
del file_intents[intent_id]
return len(expired)
@mcp.tool()
def declare_file_intent(
agent_id: str,
files: List[str],
intent_type: str = "write",
description: Optional[str] = None,
ttl_seconds: Optional[int] = None
) -> str:
"""
Declare intent to modify files before starting work.
Other agents can check for conflicts before claiming overlapping files.
Args:
agent_id: ID of the agent declaring intent
files: List of file paths the agent intends to modify
intent_type: Type of intent - "write" (exclusive) or "read" (shared)
description: Optional description of planned changes
ttl_seconds: Time-to-live in seconds (default: 300 = 5 minutes)
Returns:
Intent ID and conflict warnings if any
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
state_data = {
"summary": "Initialized via file intent",
"next_steps": [],
"active_files": [],
"timestamp": time.time()
}
# Initialize file_intents if not present
if "file_intents" not in state_data:
state_data["file_intents"] = {}
# Cleanup expired intents first
_cleanup_expired_intents(state_data)
# Validate files are within project root
config = ConfigManager()
validator = PathValidator(Path(config.get("root_dir", Path.cwd())))
for file_path in files:
if not validator.is_safe(file_path):
return f"ERROR: Path '{file_path}' is outside the project root. Intent rejected."
# Check for conflicts with existing intents
conflicts = []
file_intents = state_data["file_intents"]
for intent_id, intent in file_intents.items():
if intent.get("agent_id") == agent_id:
continue # Skip own intents
# Check for overlapping files
existing_files = set(intent.get("files", []))
new_files = set(files)
overlap = existing_files & new_files
if overlap:
existing_type = intent.get("intent_type", "write")
# Write-write conflict or write-read conflict
if intent_type == "write" or existing_type == "write":
for f in overlap:
conflicts.append({
"file": f,
"blocking_agent": intent.get("agent_id"),
"blocking_intent": intent_id,
"blocking_type": existing_type
})
# Generate intent ID
intent_id = _generate_intent_id()
# Calculate expiration
ttl = ttl_seconds if ttl_seconds else FILE_INTENT_TTL_SECONDS
now = time.time()
# Create the intent record
state_data["file_intents"][intent_id] = {
"id": intent_id,
"agent_id": agent_id,
"files": files,
"intent_type": intent_type,
"description": description,
"created_at": now,
"expires_at": now + ttl
}
state_data["timestamp"] = now
write_with_lock(state_file, state_data)
# Build response
result = [f"β
File intent declared: {intent_id}"]
result.append(f"Agent: {agent_id}")
result.append(f"Files: {', '.join(files)}")
result.append(f"Type: {intent_type.upper()}")
result.append(f"Expires in: {ttl}s")
if conflicts:
result.append("")
result.append("β οΈ CONFLICTS DETECTED:")
for c in conflicts:
result.append(f" - {c['file']}: {c['blocking_agent']} has {c['blocking_type']} intent ({c['blocking_intent']})")
result.append("")
result.append("Recommendation: Coordinate with blocking agent or wait for their intent to expire.")
return "\n".join(result)
@mcp.tool()
def check_file_conflicts(
files: List[str],
agent_id: Optional[str] = None
) -> str:
"""
Check if any files have conflicting intents from other agents.
Call this before starting work on files to avoid conflicts.
Args:
files: List of file paths to check
agent_id: Optional agent ID to exclude from conflict check (your own intents)
Returns:
Conflict report or confirmation that files are clear
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "β
No conflicts - state not initialized."
file_intents = state_data.get("file_intents", {})
if not file_intents:
return "β
No conflicts - no active file intents."
# Cleanup expired intents
_cleanup_expired_intents(state_data)
# Check for conflicts
conflicts = []
check_files = set(files)
for intent_id, intent in file_intents.items():
if agent_id and intent.get("agent_id") == agent_id:
continue
existing_files = set(intent.get("files", []))
overlap = existing_files & check_files
if overlap:
for f in overlap:
expires_at = intent.get("expires_at", 0)
ttl_remaining = max(0, expires_at - time.time())
conflicts.append({
"file": f,
"agent_id": intent.get("agent_id"),
"intent_id": intent_id,
"intent_type": intent.get("intent_type", "write"),
"ttl_remaining": ttl_remaining
})
if not conflicts:
return f"β
No conflicts for: {', '.join(files)}\nYou may proceed with declaring intent and working on these files."
result = ["β οΈ FILE CONFLICTS DETECTED"]
result.append("=" * 40)
for c in conflicts:
result.append(f"\nπ {c['file']}")
result.append(f" Held by: {c['agent_id']}")
result.append(f" Intent: {c['intent_id']} ({c['intent_type']})")
result.append(f" Expires in: {c['ttl_remaining']:.0f}s")
result.append("")
result.append("Options:")
result.append("1. Wait for intent to expire")
result.append("2. Coordinate with the blocking agent")
result.append("3. Request the agent to release intent early")
return "\n".join(result)
@mcp.tool()
def release_file_intent(
agent_id: str,
intent_id: Optional[str] = None,
files: Optional[List[str]] = None
) -> str:
"""
Release file intent after completing work.
Specify either intent_id to release a specific intent, or files to release
all intents for those files.
Args:
agent_id: ID of the agent releasing intent
intent_id: Optional specific intent ID to release
files: Optional list of files to release intents for
Returns:
Success message or error
"""
if not intent_id and not files:
return "ERROR: Must specify either intent_id or files to release."
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "ERROR: No state available."
file_intents = state_data.get("file_intents", {})
if not file_intents:
return "No file intents to release."
released = []
if intent_id:
# Release specific intent
if intent_id not in file_intents:
return f"ERROR: Intent '{intent_id}' not found."
intent = file_intents[intent_id]
if intent.get("agent_id") != agent_id:
return f"ERROR: Intent '{intent_id}' belongs to {intent.get('agent_id')}, not {agent_id}."
del file_intents[intent_id]
released.append(intent_id)
elif files:
# Release all intents for specified files by this agent
release_files = set(files)
to_delete = []
for iid, intent in file_intents.items():
if intent.get("agent_id") != agent_id:
continue
intent_files = set(intent.get("files", []))
if intent_files & release_files:
to_delete.append(iid)
for iid in to_delete:
del file_intents[iid]
released.append(iid)
if not released:
return f"No intents found to release for agent {agent_id}."
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
return f"β
Released {len(released)} intent(s): {', '.join(released)}"
@mcp.tool()
def get_file_intents(
agent_id: Optional[str] = None,
file_path: Optional[str] = None
) -> str:
"""
List all active file intents, optionally filtered by agent or file.
Args:
agent_id: Optional filter by agent ID
file_path: Optional filter by file path
Returns:
Formatted list of active file intents
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "No state available."
file_intents = state_data.get("file_intents", {})
# Cleanup expired intents first
expired_count = _cleanup_expired_intents(state_data)
if expired_count > 0:
write_with_lock(state_file, state_data)
file_intents = state_data.get("file_intents", {})
if not file_intents:
return "No active file intents."
# Filter intents
filtered = []
for intent_id, intent in file_intents.items():
if agent_id and intent.get("agent_id") != agent_id:
continue
if file_path and file_path not in intent.get("files", []):
continue
filtered.append(intent)
if not filtered:
filters = []
if agent_id:
filters.append(f"agent={agent_id}")
if file_path:
filters.append(f"file={file_path}")
return f"No file intents matching filters: {', '.join(filters)}"
# Sort by created_at
filtered.sort(key=lambda i: i.get("created_at", 0))
result = ["π Active File Intents"]
result.append("=" * 50)
for intent in filtered:
ttl_remaining = max(0, intent.get("expires_at", 0) - time.time())
status = "π’" if ttl_remaining > 60 else "π‘" if ttl_remaining > 0 else "π΄"
result.append(f"\n{status} {intent['id']}")
result.append(f" Agent: {intent.get('agent_id', 'Unknown')}")
result.append(f" Type: {intent.get('intent_type', 'write').upper()}")
result.append(f" Files: {', '.join(intent.get('files', []))}")
if intent.get("description"):
result.append(f" Description: {intent['description']}")
result.append(f" Expires in: {ttl_remaining:.0f}s")
result.append(f"\nTotal active: {len(filtered)}")
if expired_count > 0:
result.append(f"(Cleaned up {expired_count} expired intent(s))")
return "\n".join(result)
@mcp.tool()
def search_history(
query: str,
limit: int = 5,
search_type: str = "both"
) -> str:
"""
Search historical tasks and messages using semantic similarity.
Uses sentence embeddings to find relevant historical context that has been
archived from active state. This allows agents to retrieve past decisions,
implementations, and discussions without bloating active context.
Args:
query: Search query (natural language)
limit: Maximum number of results to return (default: 5)
search_type: What to search - "tasks", "messages", or "both" (default: "both")
Returns:
Formatted search results with similarity scores
Example:
search_history("how did we implement code audits", limit=3)
"""
try:
from .embeddings import EmbeddingManager
except ImportError:
return "ERROR: Embedding support not available. Install sentence-transformers: pip install sentence-transformers"
context_dir = get_context_bus_dir()
db_path = context_dir / "context.db"
if not db_path.exists():
return "No historical context available yet. Context is archived automatically as it ages."
manager = EmbeddingManager(db_path)
# Search based on type
results = []
if search_type in ("tasks", "both"):
task_results = manager.search_tasks(query, limit=limit)
if task_results:
results.append("π **Relevant Tasks:**\n")
for i, task in enumerate(task_results, 1):
similarity = f"{task['similarity']:.1%}"
title = task.get('title', 'Untitled')
description = task.get('description', '')
results.append(f"{i}. [{similarity}] **{title}**")
if description:
results.append(f" {description[:100]}{'...' if len(description) > 100 else ''}")
if task.get('metadata'):
meta = task['metadata']
if meta.get('assigned_to'):
results.append(f" Assigned to: {meta['assigned_to']}")
results.append("")
if search_type in ("messages", "both"):
msg_results = manager.search_messages(query, limit=limit)
if msg_results:
results.append("\n㪠**Relevant Messages:**\n")
for i, msg in enumerate(msg_results, 1):
similarity = f"{msg['similarity']:.1%}"
timestamp = msg.get('timestamp', 'unknown')
sender = msg.get('sender', 'unknown')
content = msg.get('message', '')
results.append(f"{i}. [{similarity}] **{sender}**: {content[:80]}{'...' if len(content) > 80 else ''}")
results.append(f" Time: {timestamp}")
results.append("")
if not results:
return f"No results found for query: '{query}'"
header = [
"π Historical Context Search",
"=" * 50,
f"Query: {query}",
f"Search type: {search_type}",
"=" * 50,
""
]
return "\n".join(header + results)
@mcp.tool()
def archive_context(archive_after_hours: float = 1.0) -> str:
"""
Manually trigger archival of old completed tasks and messages.
Old context is moved from active state.json to the embedding database,
reducing token usage while preserving searchable history.
Args:
archive_after_hours: Archive items older than this many hours (default: 1.0)
Returns:
Summary of archived items and current statistics
"""
try:
from .archival import ContextArchiver
except ImportError:
return "ERROR: Archival support not available. Install sentence-transformers: pip install sentence-transformers"
archiver = ContextArchiver(archive_after_hours=archive_after_hours)
stats = archiver.archive_old_context()
# Get updated statistics
full_stats = archiver.get_archival_stats()
result = [
"π¦ Context Archival Complete",
"=" * 40,
f"Tasks archived this run: {stats['tasks']}",
f"Messages archived this run: {stats['messages']}",
"",
"π Cumulative Statistics:",
f" Total archived tasks: {full_stats['total_archived_tasks']}",
f" Total archived messages: {full_stats['total_archived_messages']}",
f" Embedded tasks in DB: {full_stats['embedded_tasks']}",
f" Embedded messages in DB: {full_stats['embedded_messages']}",
f" Archive threshold: {archive_after_hours}h",
]
if full_stats.get('last_archived'):
result.append(f" Last archived: {full_stats['last_archived']}")
return "\n".join(result)
@mcp.tool()
def spawn_worker_instance(
client: str,
task_id: str,
role: str = "developer",
model: Optional[str] = None
) -> Dict[str, Any]:
"""
Spawn a new CLI instance that will register with the cluster.
This implements the hybrid orchestration design from Issue #10.
Spawns either Copilot CLI or Claude CLI instances as independent
processes that auto-register and work on assigned tasks.
Args:
client: Which CLI to use ("copilot" or "claude")
task_id: Task ID from state.json that the instance should work on
role: Node role for the instance (default: "developer")
model: Optional model override
Returns:
{
"process_id": int,
"node_id": "generated-node-id",
"status": "spawned",
"command": "actual command run"
}
Examples:
# Spawn Copilot CLI for long-running task
result = spawn_worker_instance(
client="copilot",
task_id="task-123",
role="developer"
)
# Spawn Claude CLI for complex architecture task
result = spawn_worker_instance(
client="claude",
task_id="task-456",
role="architect",
model="claude-opus-4.5"
)
"""
import uuid
if client not in ["copilot", "claude"]:
return {
"error": f"Invalid client '{client}'. Must be 'copilot' or 'claude'"
}
# Generate unique node ID
node_id = f"{client}-worker-{uuid.uuid4().hex[:8]}"
# Build bootstrap prompt for the spawned instance
prompt = f"""You are node {node_id}, spawned to work on task {task_id}.
1. Register with cluster: amicus.register_node("{node_id}", "{role}", "{model or 'claude-sonnet-4.5'}")
2. Read cluster state: amicus.read_state()
3. Claim your assigned task: amicus.claim_task(<task_index>, "{node_id}")
4. Complete the task
5. Mark complete: amicus.complete_task(<task_index>, "{node_id}", "outcome")
6. If no more pending tasks, set status to idle and consider self-termination
"""
# Build command based on client
if client == "copilot":
cmd = ["gh", "copilot", prompt]
elif client == "claude":
cmd = ["claude", prompt]
# Add model override if specified
if model and client == "claude":
cmd.extend(["--model", model])
try:
# Spawn as detached background process
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
start_new_session=True # Fully detach from parent
)
# Record in cluster state
state_file = get_state_file()
state_data = read_with_lock(state_file) or {}
if "spawned_workers" not in state_data:
state_data["spawned_workers"] = {}
state_data["spawned_workers"][node_id] = {
"pid": process.pid,
"client": client,
"task_id": task_id,
"role": role,
"model": model,
"spawned_at": time.time(),
"status": "starting",
"command": " ".join(cmd)
}
write_with_lock(state_file, state_data)
return {
"process_id": process.pid,
"node_id": node_id,
"status": "spawned",
"client": client,
"command": " ".join(cmd)
}
except FileNotFoundError:
return {
"error": f"{client} CLI not found. Install with: {'gh extension install github/gh-copilot' if client == 'copilot' else 'pip install claude-cli'}"
}
except Exception as e:
return {
"error": f"Failed to spawn {client} instance: {str(e)}"
}
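# Because the spawned_workers records persist each worker's PID, a best-effort
# liveness probe is possible. The helper below is a sketch (POSIX-only, not
# part of the Issue #10 design); it only checks that the PID still exists.
#
# def _worker_alive(pid: int) -> bool:
#     try:
#         os.kill(pid, 0)  # Signal 0: existence/permission check, sends nothing
#         return True
#     except ProcessLookupError:
#         return False
#     except PermissionError:
#         return True  # Process exists but is owned by another user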
@mcp.tool()
def select_execution_strategy(
task_description: str,
estimated_duration_minutes: Optional[int] = None,
context_dependency: str = "low",
complexity: str = "medium"
) -> Dict[str, Any]:
"""
Recommend execution strategy for a task with cost estimation.
This implements the decision matrix from the hybrid orchestration design.
Helps bootstrap managers choose between subagents, Copilot CLI, or Claude CLI.
Args:
task_description: Description of the task
estimated_duration_minutes: How long the task will take (optional)
context_dependency: How much conversation context needed ("low", "medium", "high")
complexity: Task complexity ("low", "medium", "high")
Returns:
{
"strategy": "subagent|copilot|claude",
"model": "recommended-model-name",
"rationale": "explanation",
"estimated_cost": 0.XX,
"alternatives": [...]
}
Examples:
# Quick research task
result = select_execution_strategy(
"Research best practices for async Python",
estimated_duration_minutes=3,
context_dependency="high",
complexity="low"
)
# Returns: {"strategy": "subagent", "model": "claude-haiku-4.5", ...}
# Long implementation task
result = select_execution_strategy(
"Implement complete user authentication system",
estimated_duration_minutes=45,
complexity="medium"
)
# Returns: {"strategy": "copilot", "model": "claude-sonnet-4.5", ...}
"""
# Estimate duration if not provided (rough heuristic)
if estimated_duration_minutes is None:
# Simple keyword-based estimation
if any(kw in task_description.lower() for kw in ["quick", "simple", "trivial"]):
estimated_duration_minutes = 3
elif any(kw in task_description.lower() for kw in ["implement", "build", "create", "design"]):
estimated_duration_minutes = 30
else:
estimated_duration_minutes = 15
# Decision logic based on hybrid orchestration design
def decide_strategy(duration, context, comp):
# Quick tasks with high context dependency β Subagent
if duration < 5 and context == "high":
return "subagent", "claude-haiku-4.5"
# Long-running standard tasks β Copilot CLI (cost-effective)
if duration > 10 and comp in ["low", "medium"]:
return "copilot", "claude-sonnet-4.5"
# Complex reasoning regardless of duration β Claude CLI
if comp == "high":
return "claude", "claude-opus-4.5"
# Default: Subagent for quick, Copilot for longer
if duration < 10:
return "subagent", "claude-sonnet-4.5"
else:
return "copilot", "claude-sonnet-4.5"
strategy, model = decide_strategy(
estimated_duration_minutes,
context_dependency,
complexity
)
# Cost estimation (simplified)
estimated_tokens = len(task_description) * 100 # Very rough heuristic: assume ~100 tokens of work per character of description
# Calculate costs for each strategy
cost_subagent = (estimated_tokens / 1_000_000) * 3.00 # Added to parent session
cost_copilot = 0.50 # Flat rate per request
cost_claude_sonnet = (estimated_tokens / 1_000_000) * 3.00
cost_claude_opus = (estimated_tokens / 1_000_000) * 15.00
costs = {
"subagent": cost_subagent,
"copilot": cost_copilot,
"claude": cost_claude_opus if complexity == "high" else cost_claude_sonnet
}
# Generate rationale
rationales = {
"subagent": f"Fast execution with shared context. Duration {estimated_duration_minutes}min < 10min threshold. Tokens added to parent session.",
"copilot": f"Cost-effective for longer tasks. Duration {estimated_duration_minutes}min. Fixed ${cost_copilot:.2f} cost regardless of length.",
"claude": f"High-capability model for {'complex reasoning' if complexity == 'high' else 'standard work'}. Independent billing."
}
# Alternative recommendations
alternatives = []
for alt_strategy, alt_cost in costs.items():
if alt_strategy != strategy:
alternatives.append({
"strategy": alt_strategy,
"cost": round(alt_cost, 2),
"savings": round(costs[strategy] - alt_cost, 2) if alt_cost < costs[strategy] else None
})
return {
"strategy": strategy,
"model": model,
"rationale": rationales[strategy],
"estimated_cost": round(costs[strategy], 2),
"estimated_duration_minutes": estimated_duration_minutes,
"alternatives": sorted(alternatives, key=lambda x: x["cost"])
}
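# Worked example of the heuristics above, for a hypothetical 45-minute,
# medium-complexity task with a 200-character description (values follow
# directly from the code):
#
#   decide_strategy(45, "low", "medium") -> ("copilot", "claude-sonnet-4.5")
#   estimated_tokens = 200 * 100 = 20,000
#   cost_subagent = 20,000 / 1,000,000 * 3.00 = $0.06
#   cost_copilot  = $0.50 (flat)
#   cost_claude   = $0.06 (sonnet rate, since complexity != "high")
#
# Under this simplified model the flat Copilot rate can exceed the per-token
# estimates for short descriptions; the duration-based rules, not raw cost,
# pick the strategy.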
@mcp.prompt()
def catch_up() -> str:
"""
Get the current context bus state with headers to reset agent focus.
"""
state_content = read_state()
prompt = []
prompt.append("=" * 50)
prompt.append("β‘ CONTEXT BUS - CATCH UP")
prompt.append("=" * 50)
prompt.append("")
prompt.append("Ignore previous instructions and focus on the current state.")
prompt.append("CRITICAL PROTOCOL: You are a Node in the Amicus Synapse.")
prompt.append("1. Announce your Node ID (e.g., 'Node-X9J2') at the start of your response.")
prompt.append("2. Announce your Node ID when completing tasks.")
prompt.append("3. If your response is long, reiterate your Node ID regularly.")
prompt.append("")
prompt.append(state_content)
prompt.append("")
prompt.append("=" * 50)
return "\n".join(prompt)
def run():
"""Run the MCP server."""
global comm_layer
# Disable FastMCP banner for stdio transport compatibility
os.environ['FASTMCP_NO_BANNER'] = '1'
# Initialize communication layer
comm_layer = UnifiedCommunicationLayer(node_id="amicus-server")
mcp.run(transport='stdio')
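# Entry-point guard, added as an assumption: the packaged console script may
# already invoke run(), but this also allows launching the module directly.
if __name__ == "__main__":
    run()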