import time
import subprocess
import shlex
import os
from pathlib import Path
from typing import Optional, List, Dict, Any, Union
from fastmcp import FastMCP
from .core import (
get_state_file,
read_with_lock,
write_with_lock,
is_tracking_enabled,
set_tracking_enabled,
get_context_bus_dir
)
from .config import ConfigManager, PathValidator
from .security import SensitiveDataScanner
# Try to import github, but make it optional
try:
from github import Github
GITHUB_AVAILABLE = True
except ImportError:
GITHUB_AVAILABLE = False
# Initialize the MCP server
mcp = FastMCP("Amicus")
scanner = SensitiveDataScanner()
@mcp.tool()
def register_node(
agent_id: str,
role: str,
model_name: str,
model_strength: Optional[str] = None
) -> str:
"""
Register a node with a specific role and model information.
Enforces max_agents constraint.
Args:
agent_id: Unique identifier for the agent
role: Role of the agent (e.g., bootstrap_manager, architect, developer)
model_name: Name of the model being used
model_strength: Optional manual strength assessment (low, medium, high)
Returns:
Success message or error if at capacity
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
state_data = {"summary": "Cluster Initialized", "next_steps": [], "active_files": []}
config = ConfigManager()
# Initialize cluster_metadata if not present
if "cluster_metadata" not in state_data:
cluster_settings = config.get("cluster_settings", {})
state_data["cluster_metadata"] = {
"max_agents": cluster_settings.get("max_agents", 4),
"active_agent_count": 0,
"idle_agent_count": 0,
"pending_task_count": 0,
"manager_id": None
}
# Check if at capacity (only count non-terminated nodes)
cluster_nodes = state_data.get("cluster_nodes", {})
active_count = sum(
1 for node in cluster_nodes.values()
if node.get("status") != "terminated"
)
max_agents = state_data["cluster_metadata"]["max_agents"]
if active_count >= max_agents and agent_id not in cluster_nodes:
return f"ERROR: Cluster at capacity ({max_agents} nodes). Registration rejected."
# Assess strength if not provided
if not model_strength:
strengths = config.get("model_strengths", {})
model_strength = strengths.get(model_name, "unknown")
# Register or update node with enhanced schema
now = time.time()
cluster_nodes[agent_id] = {
"role": role,
"model": {
"name": model_name,
"strength": model_strength
},
"last_heartbeat": now,
"status": "working", # Start as working
"current_task_id": None,
"idle_since": None,
"last_activity": now
}
# Update manager_id if this is a bootstrap_manager
if role == "bootstrap_manager":
state_data["cluster_metadata"]["manager_id"] = agent_id
# Automatically enable tracking when manager registers
set_tracking_enabled(True)
state_data["cluster_nodes"] = cluster_nodes
state_data["timestamp"] = now
# Update active count
state_data["cluster_metadata"]["active_agent_count"] = sum(
1 for node in cluster_nodes.values()
if node.get("status") != "terminated"
)
write_with_lock(state_file, state_data)
return f"Node {agent_id} registered as {role} ({model_strength}). Active nodes: {state_data['cluster_metadata']['active_agent_count']}/{max_agents}"
@mcp.tool()
def get_best_model(task_description: str) -> str:
"""
Select the best model for a given task based on keywords in the description.
Args:
task_description: A description of the task.
Returns:
The name of the best model for the task.
"""
config = ConfigManager()
model_selection_config = config.get("model_selection", {})
if not model_selection_config:
return "default_model_not_configured"
default_model = model_selection_config.get("default_model", "default_model_not_found")
keywords = model_selection_config.get("keywords", {})
# Simple keyword matching
for keyword, model_name in keywords.items():
if keyword in task_description.lower():
return model_name
return default_model
@mcp.tool()
def update_state(
summary: str,
next_steps: List[Dict[str, Any]],
active_files: List[str],
ask_user: bool = False,
messages: Optional[List[str]] = None,
model_info: Optional[Dict[str, Any]] = None
) -> str:
"""
Update the context bus state with the current agent's information.
Args:
summary: A summary of what has been done so far
next_steps: A list of task dictionaries. Example: [{"task": "Fix bug", "status": "pending", "assigned_to": "Node-1"}]
active_files: List of files currently being worked on
ask_user: Whether human input is required
messages: Optional list of messages for other agents
model_info: Optional dictionary containing information about the agent's model (e.g., {'name': 'gemini-1.5-flash', 'strength': 'high'})
Returns:
Success message
"""
# Scan for sensitive data in summary and messages
findings = scanner.scan(summary)
if messages:
for msg in messages:
findings.extend(scanner.scan(msg))
if findings:
report = "\n".join([f"- {name}: {snippet}" for name, snippet in findings])
return f"SECURITY WARNING: Sensitive data detected! State NOT updated.\nFindings:\n{report}"
if not is_tracking_enabled():
return "Tracking is disabled. State not updated."
state_file = get_state_file()
# Read existing state to preserve messages and model_info if not provided in this update
existing_state = {}
if state_file.exists():
existing_state = read_with_lock(state_file)
# Merge messages if provided
current_messages = existing_state.get("messages", [])
if messages:
current_messages.extend(messages)
# Keep only last 50 messages
current_messages = current_messages[-50:]
# Validate active_files
config = ConfigManager()
validator = PathValidator(Path(config.get("root_dir", Path.cwd())))
for path in active_files:
if not validator.is_safe(path):
return f"Error: Path '{path}' is outside the project root. State not updated."
now = time.time()
state_data = {
"summary": summary,
"next_steps": next_steps,
"active_files": active_files,
"ask_user": ask_user,
"messages": current_messages,
"timestamp": now,
"last_heartbeat": existing_state.get("last_heartbeat", now),
"model_info": model_info if model_info is not None else existing_state.get("model_info", None)
}
# Preserve existing cluster data
if "cluster_nodes" in existing_state:
state_data["cluster_nodes"] = existing_state["cluster_nodes"]
if "cluster_metadata" in existing_state:
state_data["cluster_metadata"] = existing_state["cluster_metadata"]
if "work_distribution" in existing_state:
state_data["work_distribution"] = existing_state["work_distribution"]
write_with_lock(state_file, state_data)
return "State updated successfully."
@mcp.tool()
def add_task(task_description: str, priority: str = "medium") -> str:
"""
Add a new task to the next_steps queue.
Args:
task_description: Description of the task to be done
priority: Priority of the task (low, medium, high)
Returns:
Success message
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
state_data = {
"summary": "Initialized via add_task",
"next_steps": [],
"active_files": [],
"timestamp": time.time()
}
steps = state_data.get("next_steps", [])
if not isinstance(steps, list):
steps = []
steps.append({
"task": task_description,
"status": "pending",
"priority": priority,
"created_at": time.time()
})
state_data["next_steps"] = steps
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
return f"Task '{task_description}' added successfully."
@mcp.tool()
def read_state() -> str:
"""
Read the current state from the context bus.
Returns:
The current state as a formatted string
"""
state_file = get_state_file()
try:
state_data = read_with_lock(state_file)
except FileNotFoundError:
return "No state available yet."
if not state_data:
return "No state available yet."
# Format the state
result = []
result.append("π Context Bus State")
result.append("=" * 50)
if "summary" in state_data:
result.append(f"\n**Summary:**\n{state_data['summary']}")
if "next_steps" in state_data:
result.append(f"\n**Next Steps:**")
steps = state_data['next_steps']
if isinstance(steps, list):
for i, task in enumerate(steps):
if isinstance(task, dict):
task_str = f"{i+1}. {task.get('task', 'No description')}"
status = task.get('status')
if status:
task_str += f" [{status}]"
assigned = task.get('assigned_to')
if assigned:
task_str += f" (assigned to {assigned})"
result.append(task_str)
else:
result.append(f"{i+1}. {str(task)}")
else:
result.append(str(steps))
if "active_files" in state_data and state_data["active_files"]:
result.append(f"\n**Active Files:**")
for file in state_data["active_files"]:
result.append(f" - {file}")
if "messages" in state_data and state_data["messages"]:
result.append(f"\n**Messages:**")
for msg in state_data["messages"]:
result.append(f" - {msg}")
if "timestamp" in state_data:
age = time.time() - state_data["timestamp"]
result.append(f"\n**Last Updated:** {age:.1f} seconds ago")
if "last_heartbeat" in state_data:
hb_age = time.time() - state_data["last_heartbeat"]
status = "π’ ACTIVE" if hb_age < 60 else "π΄ INACTIVE/STALE"
result.append(f"**Node Heartbeat:** {status} ({hb_age:.1f}s ago)")
if "cluster_nodes" in state_data and state_data["cluster_nodes"]:
result.append(f"\n**Active Cluster Swarm:**")
for node_id, info in state_data["cluster_nodes"].items():
hb_age = time.time() - info.get("last_heartbeat", 0)
hb_status = "π’" if hb_age < 60 else "π΄"
agent_status = info.get("status", "unknown")
current_task = info.get("current_task_id", "")
task_info = f" | Task: {current_task}" if current_task else ""
result.append(f" {hb_status} {node_id}: {info['role']} [{agent_status}] ({info['model']['name']} - {info['model']['strength']}){task_info}")
if "cluster_metadata" in state_data:
metadata = state_data["cluster_metadata"]
result.append(f"\n**Cluster Metadata:**")
result.append(f" Active Nodes: {metadata.get('active_agent_count', 0)}/{metadata.get('max_agents', 4)}")
result.append(f" Idle Nodes: {metadata.get('idle_agent_count', 0)}")
result.append(f" Pending Tasks: {metadata.get('pending_task_count', 0)}")
if metadata.get('manager_id'):
result.append(f" Manager: {metadata['manager_id']}")
if "work_distribution" in state_data:
work_dist = state_data["work_distribution"]
result.append(f"\n**Work Distribution:**")
result.append(f" Status: {work_dist.get('workload_status', 'unknown').upper()}")
result.append(f" Recommendation: {work_dist.get('spawn_recommendation', 'none')}")
last_assess = work_dist.get('last_assessment', 0)
if last_assess:
assess_age = time.time() - last_assess
result.append(f" Last Assessment: {assess_age:.1f}s ago")
if "model_info" in state_data and state_data["model_info"]:
model_name = state_data["model_info"].get("name", "Unknown Model")
model_strength = state_data["model_info"].get("strength", "N/A")
result.append(f"\n**Node Model:** {model_name} (Strength: {model_strength})")
# Display code audit summary
if "code_audits" in state_data and state_data["code_audits"]:
audits = state_data["code_audits"]
pending = sum(1 for a in audits.values() if a.get("status") == "pending")
approved = sum(1 for a in audits.values() if a.get("status") == "approved")
rejected = sum(1 for a in audits.values() if a.get("status") == "rejected")
result.append(f"\n**Code Audits:**")
result.append(f" β³ Pending: {pending}")
result.append(f" β
Approved: {approved}")
result.append(f" β Rejected: {rejected}")
# Show pending audits requiring review
if pending > 0:
result.append(f"\n π Pending Review:")
for audit_id, audit in audits.items():
if audit.get("status") == "pending":
files = ", ".join(audit.get("files", [])[:2])
if len(audit.get("files", [])) > 2:
files += f" +{len(audit.get('files', [])) - 2}"
result.append(f" - {audit_id}: {audit.get('requesting_agent', '?')} β {files}")
# Display file intents summary (Issue #7: File Coordination)
if "file_intents" in state_data and state_data["file_intents"]:
intents = state_data["file_intents"]
now = time.time()
# Filter to active (non-expired) intents
active_intents = {
k: v for k, v in intents.items()
if v.get("expires_at", 0) > now
}
if active_intents:
result.append(f"\n**File Intents (Active):**")
# Group by agent
by_agent: Dict[str, List[str]] = {}
for intent_id, intent in active_intents.items():
agent = intent.get("agent_id", "Unknown")
if agent not in by_agent:
by_agent[agent] = []
files_list = intent.get("files", [])
intent_type = intent.get("intent_type", "write")
ttl = int(intent.get("expires_at", 0) - now)
by_agent[agent].append(f"{', '.join(files_list[:2])} [{intent_type}] ({ttl}s)")
for agent, file_infos in by_agent.items():
result.append(f" π {agent}:")
for info in file_infos:
result.append(f" - {info}")
# Check for conflicts
all_files: Dict[str, List[str]] = {} # file -> list of agents
for intent in active_intents.values():
if intent.get("intent_type") == "write":
for f in intent.get("files", []):
if f not in all_files:
all_files[f] = []
all_files[f].append(intent.get("agent_id", "?"))
conflicts = {f: agents for f, agents in all_files.items() if len(agents) > 1}
if conflicts:
result.append(f"\n β οΈ FILE CONFLICTS:")
for f, agents in conflicts.items():
result.append(f" - {f}: {', '.join(agents)}")
# Critical: Add the elicitation pattern if ask_user is True
if state_data.get("ask_user", False):
result.append("\nπ¨ PREVIOUS AGENT REQUESTED HUMAN INPUT.")
return "\n".join(result)
@mcp.tool()
def broadcast_message(message: str) -> str:
"""
Send a message to all other agents via the context bus.
Args:
message: The message to broadcast
Returns:
Success message
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
messages = state_data.get("messages", [])
messages.append(f"[{time.strftime('%H:%M:%S')}] {message}")
state_data["messages"] = messages[-50:]
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
return "Message broadcast successfully."
@mcp.tool()
def claim_task(task_index: int, agent_id: str) -> str:
"""
Claim a task from the next_steps queue.
Args:
task_index: The 1-based index of the task to claim (as shown in read_state)
agent_id: The ID of the agent claiming the task
Returns:
Success or error message
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
steps = state_data.get("next_steps", [])
if not isinstance(steps, list) or task_index < 1 or task_index > len(steps):
return f"Error: Invalid task index {task_index}."
task = steps[task_index - 1]
if not isinstance(task, dict):
# Convert string task to dict if necessary
task = {"task": str(task), "status": "pending"}
steps[task_index - 1] = task
if task.get("status") == "in_progress" and task.get("assigned_to") != agent_id:
return f"Error: Task is already claimed by {task.get('assigned_to')}."
task["status"] = "in_progress"
task["assigned_to"] = agent_id
task["claimed_at"] = time.time()
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
return f"Task {task_index} claimed by {agent_id}."
@mcp.tool()
def complete_task(task_index: int, agent_id: str, outcome: str = "Success") -> str:
"""
Mark a task as completed.
Args:
task_index: The 1-based index of the task to complete
agent_id: The ID of the agent completing the task
outcome: Brief description of the result
Returns:
Success or error message
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
steps = state_data.get("next_steps", [])
if not isinstance(steps, list) or task_index < 1 or task_index > len(steps):
return f"Error: Invalid task index {task_index}."
task = steps[task_index - 1]
if not isinstance(task, dict):
task = {"task": str(task), "status": "completed"}
steps[task_index - 1] = task
task["status"] = "completed"
task["completed_by"] = agent_id
task["outcome"] = outcome
task["completed_at"] = time.time()
# Optionally move to a history or just leave it marked as completed
# For now, we'll leave it in next_steps but it will be filtered/shown as done
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
return f"Task {task_index} marked as completed by {agent_id}."
@mcp.tool()
def is_safe_path(path: str) -> bool:
"""
Check if a path is safe for operations (within project root).
Args:
path: The path to check
Returns:
True if safe, False otherwise
"""
config = ConfigManager()
validator = PathValidator(Path(config.get("root_dir", Path.cwd())))
return validator.is_safe(path)
@mcp.tool()
def execute_safe_command(command: str) -> str:
"""
Execute a shell command if it is in the whitelist.
Args:
command: The command to execute
Returns:
Command output or error message
"""
config = ConfigManager()
whitelist = config.get("command_whitelist", [])
try:
args = shlex.split(command)
except ValueError as e:
return f"Error parsing command: {str(e)}"
if not args:
return "Error: Empty command."
# Check if the program (first arg) is whitelisted
prog = args[0]
if prog not in whitelist:
return f"Error: Command '{prog}' is not in the whitelist."
try:
result = subprocess.run(
args,
shell=False, # Secure execution
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
return result.stdout
else:
return f"Command failed with exit code {result.returncode}:\n{result.stderr}"
except Exception as e:
return f"Error executing command: {str(e)}"
@mcp.tool()
def heartbeat(agent_id: Optional[str] = None) -> str:
"""
Signal that the current agent is still active.
Args:
agent_id: Optional ID of the agent to update specific cluster node status.
Returns:
Success message
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "No state available to heartbeat."
now = time.time()
state_data["last_heartbeat"] = now
if agent_id:
cluster_nodes = state_data.get("cluster_nodes", {})
if agent_id in cluster_nodes:
cluster_nodes[agent_id]["last_heartbeat"] = now
state_data["cluster_nodes"] = cluster_nodes
write_with_lock(state_file, state_data)
return "Heartbeat received."
@mcp.tool()
def toggle_tracking(enabled: bool) -> str:
"""
Enable or disable state tracking.
Args:
enabled: Whether tracking should be enabled
Returns:
Success message
"""
set_tracking_enabled(enabled)
status = "enabled" if enabled else "disabled"
return f"Tracking {status}."
@mcp.tool()
def set_agent_status(
agent_id: str,
status: str,
current_task_id: Optional[str] = None
) -> str:
"""
Update an agent's status and current task.
Args:
agent_id: The agent's unique identifier
status: One of: working, idle, waiting, terminated
current_task_id: Optional task ID the agent is working on (null if idle)
Returns:
Success message
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
cluster_nodes = state_data.get("cluster_nodes", {})
if agent_id not in cluster_nodes:
return f"ERROR: Agent {agent_id} not registered."
now = time.time()
node = cluster_nodes[agent_id]
# Track status transitions
old_status = node.get("status", "unknown")
node["status"] = status
node["current_task_id"] = current_task_id
node["last_activity"] = now
# Set idle_since timestamp when transitioning to idle
if status == "idle" and old_status != "idle":
node["idle_since"] = now
elif status != "idle":
node["idle_since"] = None
state_data["timestamp"] = now
write_with_lock(state_file, state_data)
return f"Agent {agent_id} status updated to {status}."
@mcp.tool()
def claim_best_task(agent_id: str, role: str) -> str:
"""
Intelligently claim the best available task for this agent.
Scores tasks by priority, role match, and age.
Args:
agent_id: The agent's unique identifier
role: The agent's role (architect, developer, bootstrap_manager)
Returns:
Task description and index, or message if no tasks available
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
steps = state_data.get("next_steps", [])
if not isinstance(steps, list):
return "No tasks available."
# Filter pending tasks
pending_tasks = []
for idx, task in enumerate(steps):
if isinstance(task, dict) and task.get("status") == "pending":
pending_tasks.append((idx, task))
if not pending_tasks:
return "No pending tasks available."
# Score tasks
def score_task(idx_task_tuple):
idx, task = idx_task_tuple
score = 0
# Priority scoring (high=30, medium=20, low=10)
priority = task.get("priority", "medium")
if priority == "high":
score += 30
elif priority == "medium":
score += 20
else:
score += 10
# Role match bonus (+15 points)
task_desc = task.get("task", "").lower()
if role == "architect" and ("design" in task_desc or "architect" in task_desc or "plan" in task_desc):
score += 15
elif role == "developer" and ("implement" in task_desc or "code" in task_desc or "test" in task_desc):
score += 15
# Age bonus (older tasks get priority, +1 per 60 seconds)
created_at = task.get("created_at", time.time())
age_minutes = (time.time() - created_at) / 60
score += age_minutes
return score
# Find best task
best_idx, best_task = max(pending_tasks, key=score_task)
# Claim it
best_task["status"] = "in_progress"
best_task["assigned_to"] = agent_id
best_task["claimed_at"] = time.time()
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
task_description = best_task.get("task", "No description")
return f"Claimed task {best_idx + 1}: {task_description}"
@mcp.tool()
def assess_workload() -> str:
"""
Assess current cluster workload and generate spawn/terminate recommendations.
Called by Bootstrap Manager every 20-30 seconds.
Returns:
Workload assessment summary with recommendations
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
cluster_nodes = state_data.get("cluster_nodes", {})
cluster_metadata = state_data.get("cluster_metadata", {})
steps = state_data.get("next_steps", [])
# Count metrics
active_agents = sum(
1 for node in cluster_nodes.values()
if node.get("status") not in ["terminated", "idle"]
)
idle_agents = sum(
1 for node in cluster_nodes.values()
if node.get("status") == "idle"
)
pending_tasks = sum(
1 for task in steps
if isinstance(task, dict) and task.get("status") == "pending"
)
in_progress_tasks = sum(
1 for task in steps
if isinstance(task, dict) and task.get("status") == "in_progress"
)
max_agents = cluster_metadata.get("max_agents", 4)
total_active = active_agents + idle_agents
# Determine workload status
if pending_tasks == 0 and in_progress_tasks == 0:
workload_status = "idle"
spawn_recommendation = "terminate_idle"
elif pending_tasks >= 3 and total_active < max_agents:
workload_status = "overloaded"
spawn_recommendation = "spawn_developer"
elif pending_tasks > 0 and active_agents == 0 and total_active < max_agents:
workload_status = "overloaded"
spawn_recommendation = "spawn_developer"
elif pending_tasks <= 1 and total_active > 1 and idle_agents > 0:
workload_status = "underutilized"
spawn_recommendation = "terminate_idle"
else:
workload_status = "balanced"
spawn_recommendation = "maintain"
# Update state with assessment
if "work_distribution" not in state_data:
state_data["work_distribution"] = {}
state_data["work_distribution"].update({
"last_assessment": time.time(),
"workload_status": workload_status,
"spawn_recommendation": spawn_recommendation
})
# Update cluster metadata
cluster_metadata.update({
"active_agent_count": total_active,
"idle_agent_count": idle_agents,
"pending_task_count": pending_tasks
})
state_data["cluster_metadata"] = cluster_metadata
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
# Format response
result = []
result.append("π Workload Assessment")
result.append("=" * 40)
result.append(f"Active Agents: {active_agents}/{max_agents}")
result.append(f"Idle Agents: {idle_agents}")
result.append(f"Pending Tasks: {pending_tasks}")
result.append(f"In Progress: {in_progress_tasks}")
result.append(f"Status: {workload_status.upper()}")
result.append(f"Recommendation: {spawn_recommendation}")
return "\n".join(result)
@mcp.tool()
def create_github_issue(
repo_owner: str,
repo_name: str,
title: str,
body: str,
labels: Optional[List[str]] = None,
assign_to_user: bool = False
) -> str:
"""
Create a new GitHub issue in the specified repository.
Args:
repo_owner: GitHub username or organization that owns the repository
repo_name: Name of the repository
title: Title of the issue
body: Body/description of the issue
labels: Optional list of label names to apply to the issue
assign_to_user: Whether to assign the issue to the authenticated user
Returns:
Success message with issue URL or error message
"""
if not GITHUB_AVAILABLE:
return "ERROR: PyGithub library not installed. Install with: pip install PyGithub"
# Get GitHub token from environment
github_token = os.environ.get("GITHUB_TOKEN")
if not github_token:
return "ERROR: GITHUB_TOKEN environment variable not set. Cannot create GitHub issue."
try:
# Initialize GitHub client
g = Github(github_token)
repo = g.get_repo(f"{repo_owner}/{repo_name}")
# Create the issue
issue = repo.create_issue(
title=title,
body=body,
labels=labels or []
)
# Assign to user if requested
if assign_to_user:
user = g.get_user()
issue.add_to_assignees(user)
return f"GitHub issue created successfully: {issue.html_url} (#{issue.number})"
except Exception as e:
return f"ERROR creating GitHub issue: {str(e)}"
@mcp.tool()
def list_github_issues(
repo_owner: str,
repo_name: str,
state: str = "open",
labels: Optional[List[str]] = None,
limit: int = 10
) -> str:
"""
List GitHub issues from the specified repository.
Args:
repo_owner: GitHub username or organization that owns the repository
repo_name: Name of the repository
state: Issue state to filter by: 'open', 'closed', or 'all'
labels: Optional list of label names to filter by
limit: Maximum number of issues to return (default: 10)
Returns:
Formatted list of issues or error message
"""
if not GITHUB_AVAILABLE:
return "ERROR: PyGithub library not installed. Install with: pip install PyGithub"
# Get GitHub token from environment
github_token = os.environ.get("GITHUB_TOKEN")
if not github_token:
return "ERROR: GITHUB_TOKEN environment variable not set. Cannot list GitHub issues."
try:
# Initialize GitHub client
g = Github(github_token)
repo = g.get_repo(f"{repo_owner}/{repo_name}")
# Get issues
issues = repo.get_issues(state=state, labels=labels or [])
# Format results
result = []
result.append(f"π GitHub Issues for {repo_owner}/{repo_name} (state: {state})")
result.append("=" * 60)
count = 0
for issue in issues:
if count >= limit:
break
if issue.pull_request: # Skip PRs
continue
result.append(f"\n#{issue.number}: {issue.title}")
result.append(f" State: {issue.state}")
result.append(f" URL: {issue.html_url}")
if issue.labels:
labels_str = ", ".join([label.name for label in issue.labels])
result.append(f" Labels: {labels_str}")
if issue.assignee:
result.append(f" Assignee: {issue.assignee.login}")
result.append(f" Created: {issue.created_at}")
result.append(f" Updated: {issue.updated_at}")
count += 1
if count == 0:
result.append("\nNo issues found.")
else:
result.append(f"\nShowing {count} of {issues.totalCount} total issues.")
return "\n".join(result)
except Exception as e:
return f"ERROR listing GitHub issues: {str(e)}"
@mcp.tool()
def import_github_issue_to_task(
repo_owner: str,
repo_name: str,
issue_number: int,
priority: str = "medium"
) -> str:
"""
Import a GitHub issue into the local task context.
This allows agents to work on GitHub issues as local tasks.
Args:
repo_owner: GitHub username or organization that owns the repository
repo_name: Name of the repository
issue_number: Number of the issue to import
priority: Priority level for the imported task (low, medium, high)
Returns:
Success message or error
"""
if not GITHUB_AVAILABLE:
return "ERROR: PyGithub library not installed. Install with: pip install PyGithub"
# Get GitHub token from environment
github_token = os.environ.get("GITHUB_TOKEN")
if not github_token:
return "ERROR: GITHUB_TOKEN environment variable not set. Cannot import GitHub issue."
try:
# Initialize GitHub client
g = Github(github_token)
repo = g.get_repo(f"{repo_owner}/{repo_name}")
issue = repo.get_issue(issue_number)
# Create task description with GitHub context
task_description = f"GitHub Issue #{issue_number}: {issue.title}"
# Add task to state
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
state_data = {
"summary": "Initialized via import_github_issue_to_task",
"next_steps": [],
"active_files": [],
"timestamp": time.time()
}
steps = state_data.get("next_steps", [])
if not isinstance(steps, list):
steps = []
# Check if this issue is already imported
for task in steps:
if isinstance(task, dict) and task.get("github_issue") == issue_number:
return f"GitHub issue #{issue_number} is already imported as a task."
# Add the task with GitHub metadata
steps.append({
"task": task_description,
"status": "pending",
"priority": priority,
"created_at": time.time(),
"github_issue": issue_number,
"github_repo": f"{repo_owner}/{repo_name}",
"github_url": issue.html_url
})
state_data["next_steps"] = steps
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
return f"GitHub issue #{issue_number} imported as local task: {issue.title}\nURL: {issue.html_url}"
except Exception as e:
return f"ERROR importing GitHub issue: {str(e)}"
@mcp.tool()
def sync_task_to_github_issue(
task_index: int,
comment: Optional[str] = None,
close_issue: bool = False
) -> str:
"""
Sync a local task's status back to its linked GitHub issue.
Updates the issue with a comment and optionally closes it.
Args:
task_index: The 1-based index of the task (as shown in read_state)
comment: Optional comment to add to the GitHub issue
close_issue: Whether to close the GitHub issue (typically when task is completed)
Returns:
Success message or error
"""
if not GITHUB_AVAILABLE:
return "ERROR: PyGithub library not installed. Install with: pip install PyGithub"
# Get GitHub token from environment
github_token = os.environ.get("GITHUB_TOKEN")
if not github_token:
return "ERROR: GITHUB_TOKEN environment variable not set. Cannot sync to GitHub issue."
try:
# Get the task
state_file = get_state_file()
state_data = read_with_lock(state_file)
steps = state_data.get("next_steps", [])
if not isinstance(steps, list) or task_index < 1 or task_index > len(steps):
return f"ERROR: Invalid task index {task_index}."
task = steps[task_index - 1]
if not isinstance(task, dict):
return f"ERROR: Task at index {task_index} is not properly formatted."
# Check if task is linked to a GitHub issue
github_issue = task.get("github_issue")
github_repo = task.get("github_repo")
if not github_issue or not github_repo:
return f"ERROR: Task at index {task_index} is not linked to a GitHub issue."
# Initialize GitHub client
g = Github(github_token)
repo = g.get_repo(github_repo)
issue = repo.get_issue(github_issue)
# Add comment if provided
if comment:
issue.create_comment(comment)
# Close issue if requested
if close_issue and issue.state == "open":
issue.edit(state="closed")
status_msg = "closed"
elif issue.state == "closed":
status_msg = "already closed"
else:
status_msg = "updated"
return f"GitHub issue #{github_issue} {status_msg} successfully. URL: {issue.html_url}"
except Exception as e:
return f"ERROR syncing to GitHub issue: {str(e)}"
# ============================================================================
# CODE AUDIT GOVERNANCE TOOLS
# ============================================================================
# These tools implement a governance layer where agents must submit code changes
# for review by a code reviewer agent before execution.
# ============================================================================
def _generate_audit_id() -> str:
"""Generate a unique audit ID."""
import uuid
return f"audit-{uuid.uuid4().hex[:8]}"
@mcp.tool()
def submit_code_audit_request(
requesting_agent: str,
files: List[str],
description: str,
proposed_changes: str,
priority: str = "medium"
) -> str:
"""
Submit a code audit request before making file changes.
The request must be approved by a code reviewer agent before execution.
Args:
requesting_agent: ID of the agent requesting the audit
files: List of file paths that will be modified
description: Description of what changes will be made and why
proposed_changes: The actual changes (diff, code snippet, or description)
priority: Priority level (low, medium, high)
Returns:
Audit ID and status message
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
state_data = {
"summary": "Initialized via code audit",
"next_steps": [],
"active_files": [],
"timestamp": time.time()
}
# Initialize code_audits if not present
if "code_audits" not in state_data:
state_data["code_audits"] = {}
# Validate files are within project root
config = ConfigManager()
validator = PathValidator(Path(config.get("root_dir", Path.cwd())))
for file_path in files:
if not validator.is_safe(file_path):
return f"ERROR: Path '{file_path}' is outside the project root. Audit rejected."
# Generate audit ID
audit_id = _generate_audit_id()
# Create the audit request
now = time.time()
state_data["code_audits"][audit_id] = {
"id": audit_id,
"requesting_agent": requesting_agent,
"reviewer_agent": None,
"files": files,
"description": description,
"proposed_changes": proposed_changes,
"priority": priority,
"status": "pending",
"feedback": None,
"created_at": now,
"reviewed_at": None
}
state_data["timestamp"] = now
write_with_lock(state_file, state_data)
return f"Code audit request submitted: {audit_id}\nFiles: {', '.join(files)}\nStatus: PENDING - Awaiting reviewer approval.\nPriority: {priority.upper()}"
@mcp.tool()
def get_pending_audits(reviewer_agent: Optional[str] = None) -> str:
"""
Get all pending code audit requests awaiting review.
Code reviewer agents should call this to see what needs review.
Args:
reviewer_agent: Optional agent ID to filter audits assigned to specific reviewer
Returns:
Formatted list of pending audits
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "No state available."
code_audits = state_data.get("code_audits", {})
# Filter pending audits
pending = []
for audit_id, audit in code_audits.items():
if audit.get("status") == "pending":
if reviewer_agent and audit.get("reviewer_agent") not in [None, reviewer_agent]:
continue
pending.append(audit)
if not pending:
return "No pending code audits to review."
# Sort by priority (high first) then by age (oldest first)
priority_order = {"high": 0, "medium": 1, "low": 2}
pending.sort(key=lambda a: (priority_order.get(a.get("priority", "medium"), 1), a.get("created_at", 0)))
result = []
result.append("π Pending Code Audits")
result.append("=" * 60)
for audit in pending:
age = time.time() - audit.get("created_at", time.time())
age_str = f"{age / 60:.1f}m" if age < 3600 else f"{age / 3600:.1f}h"
result.append(f"\nπ Audit ID: {audit['id']}")
result.append(f" Priority: {audit.get('priority', 'medium').upper()}")
result.append(f" Requesting Agent: {audit.get('requesting_agent', 'Unknown')}")
result.append(f" Files: {', '.join(audit.get('files', []))}")
result.append(f" Description: {audit.get('description', 'No description')}")
result.append(f" Age: {age_str}")
result.append(f" Proposed Changes:")
# Truncate proposed changes if too long
changes = audit.get("proposed_changes", "")
if len(changes) > 500:
changes = changes[:500] + "... [truncated]"
for line in changes.split("\n"):
result.append(f" {line}")
result.append(f"\nπ Total pending: {len(pending)}")
return "\n".join(result)
@mcp.tool()
def approve_code_audit(
audit_id: str,
reviewer_agent: str,
feedback: Optional[str] = None
) -> str:
"""
Approve a pending code audit request.
Once approved, the requesting agent may proceed with the file changes.
Args:
audit_id: The ID of the audit to approve
reviewer_agent: ID of the agent approving the audit
feedback: Optional feedback or comments for the requesting agent
Returns:
Success message or error
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "ERROR: No state available."
code_audits = state_data.get("code_audits", {})
if audit_id not in code_audits:
return f"ERROR: Audit ID '{audit_id}' not found."
audit = code_audits[audit_id]
if audit.get("status") != "pending":
return f"ERROR: Audit '{audit_id}' is not pending (current status: {audit.get('status')})."
# Cannot approve own audit
if audit.get("requesting_agent") == reviewer_agent:
return "ERROR: Cannot approve your own audit request. A different agent must review."
# Update audit status
now = time.time()
audit["status"] = "approved"
audit["reviewer_agent"] = reviewer_agent
audit["reviewed_at"] = now
if feedback:
audit["feedback"] = feedback
state_data["timestamp"] = now
write_with_lock(state_file, state_data)
return f"β
Code audit '{audit_id}' APPROVED by {reviewer_agent}.\nRequesting agent {audit.get('requesting_agent')} may proceed with changes to: {', '.join(audit.get('files', []))}\n{f'Feedback: {feedback}' if feedback else ''}"
@mcp.tool()
def reject_code_audit(
audit_id: str,
reviewer_agent: str,
feedback: str
) -> str:
"""
Reject a pending code audit request.
The requesting agent must address feedback and resubmit.
Args:
audit_id: The ID of the audit to reject
reviewer_agent: ID of the agent rejecting the audit
feedback: Required feedback explaining why the audit was rejected
Returns:
Success message or error
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "ERROR: No state available."
code_audits = state_data.get("code_audits", {})
if audit_id not in code_audits:
return f"ERROR: Audit ID '{audit_id}' not found."
audit = code_audits[audit_id]
if audit.get("status") != "pending":
return f"ERROR: Audit '{audit_id}' is not pending (current status: {audit.get('status')})."
# Cannot reject own audit
if audit.get("requesting_agent") == reviewer_agent:
return "ERROR: Cannot reject your own audit request. A different agent must review."
# Feedback is required for rejections
if not feedback or not feedback.strip():
return "ERROR: Feedback is required when rejecting an audit."
# Update audit status
now = time.time()
audit["status"] = "rejected"
audit["reviewer_agent"] = reviewer_agent
audit["reviewed_at"] = now
audit["feedback"] = feedback
state_data["timestamp"] = now
write_with_lock(state_file, state_data)
return f"β Code audit '{audit_id}' REJECTED by {reviewer_agent}.\nRequesting agent {audit.get('requesting_agent')} must address feedback and resubmit.\nFeedback: {feedback}"
@mcp.tool()
def get_audit_status(audit_id: str) -> str:
"""
Get the status and details of a specific code audit request.
Args:
audit_id: The ID of the audit to check
Returns:
Audit details and status
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "ERROR: No state available."
code_audits = state_data.get("code_audits", {})
if audit_id not in code_audits:
return f"ERROR: Audit ID '{audit_id}' not found."
audit = code_audits[audit_id]
status = audit.get("status", "unknown")
# Status emoji
status_emoji = {
"pending": "β³",
"approved": "β
",
"rejected": "β"
}.get(status, "β")
result = []
result.append(f"{status_emoji} Code Audit: {audit_id}")
result.append("=" * 50)
result.append(f"Status: {status.upper()}")
result.append(f"Requesting Agent: {audit.get('requesting_agent', 'Unknown')}")
result.append(f"Files: {', '.join(audit.get('files', []))}")
result.append(f"Priority: {audit.get('priority', 'medium').upper()}")
result.append(f"Description: {audit.get('description', 'No description')}")
created = audit.get("created_at", 0)
if created:
age = time.time() - created
result.append(f"Created: {age / 60:.1f} minutes ago")
if audit.get("reviewer_agent"):
result.append(f"Reviewed By: {audit['reviewer_agent']}")
reviewed = audit.get("reviewed_at")
if reviewed:
review_age = time.time() - reviewed
result.append(f"Reviewed: {review_age / 60:.1f} minutes ago")
if audit.get("feedback"):
result.append(f"Feedback: {audit['feedback']}")
result.append(f"\nProposed Changes:")
changes = audit.get("proposed_changes", "")
for line in changes.split("\n"):
result.append(f" {line}")
# Action guidance
result.append("")
if status == "pending":
result.append("β³ Awaiting review. A code reviewer agent must approve or reject.")
elif status == "approved":
result.append("β
APPROVED - You may proceed with the file changes.")
elif status == "rejected":
result.append("β REJECTED - Address the feedback and submit a new audit request.")
return "\n".join(result)
@mcp.tool()
def list_code_audits(
status_filter: Optional[str] = None,
agent_filter: Optional[str] = None,
limit: int = 20
) -> str:
"""
List all code audits with optional filtering.
Args:
status_filter: Filter by status (pending, approved, rejected)
agent_filter: Filter by requesting or reviewing agent ID
limit: Maximum number of audits to return (default: 20)
Returns:
Formatted list of code audits
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "No state available."
code_audits = state_data.get("code_audits", {})
if not code_audits:
return "No code audits found."
# Filter audits
filtered = []
for audit_id, audit in code_audits.items():
if status_filter and audit.get("status") != status_filter:
continue
if agent_filter:
if audit.get("requesting_agent") != agent_filter and audit.get("reviewer_agent") != agent_filter:
continue
filtered.append(audit)
if not filtered:
return f"No code audits found matching filters (status={status_filter}, agent={agent_filter})."
# Sort by created_at descending (newest first)
filtered.sort(key=lambda a: a.get("created_at", 0), reverse=True)
filtered = filtered[:limit]
result = []
result.append("π Code Audits")
result.append("=" * 60)
# Count by status
status_counts = {"pending": 0, "approved": 0, "rejected": 0}
for audit in code_audits.values():
s = audit.get("status", "unknown")
if s in status_counts:
status_counts[s] += 1
result.append(f"Summary: {status_counts['pending']} pending, {status_counts['approved']} approved, {status_counts['rejected']} rejected")
result.append("")
for audit in filtered:
status = audit.get("status", "unknown")
status_emoji = {"pending": "β³", "approved": "β
", "rejected": "β"}.get(status, "β")
age = time.time() - audit.get("created_at", time.time())
age_str = f"{age / 60:.1f}m" if age < 3600 else f"{age / 3600:.1f}h"
files_str = ", ".join(audit.get("files", [])[:2])
if len(audit.get("files", [])) > 2:
files_str += f" +{len(audit.get('files', [])) - 2} more"
result.append(f"{status_emoji} {audit['id']} | {audit.get('requesting_agent', '?')} | {files_str} | {age_str}")
return "\n".join(result)
# ============================================================================
# FILE COORDINATION GOVERNANCE TOOLS
# ============================================================================
# These tools implement lightweight intent-based file coordination to prevent
# agents from stomping on each other's file changes (GitHub Issue #7).
# ============================================================================
# Default intent expiration time (5 minutes)
FILE_INTENT_TTL_SECONDS = 300
def _generate_intent_id() -> str:
"""Generate a unique intent ID."""
import uuid
return f"intent-{uuid.uuid4().hex[:8]}"
def _cleanup_expired_intents(state_data: Dict[str, Any]) -> int:
"""Remove expired file intents. Returns count of removed intents."""
file_intents = state_data.get("file_intents", {})
now = time.time()
expired = []
for intent_id, intent in file_intents.items():
expires_at = intent.get("expires_at", 0)
if expires_at and now > expires_at:
expired.append(intent_id)
for intent_id in expired:
del file_intents[intent_id]
return len(expired)
@mcp.tool()
def declare_file_intent(
agent_id: str,
files: List[str],
intent_type: str = "write",
description: Optional[str] = None,
ttl_seconds: Optional[int] = None
) -> str:
"""
Declare intent to modify files before starting work.
Other agents can check for conflicts before claiming overlapping files.
Args:
agent_id: ID of the agent declaring intent
files: List of file paths the agent intends to modify
intent_type: Type of intent - "write" (exclusive) or "read" (shared)
description: Optional description of planned changes
ttl_seconds: Time-to-live in seconds (default: 300 = 5 minutes)
Returns:
Intent ID and conflict warnings if any
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
state_data = {
"summary": "Initialized via file intent",
"next_steps": [],
"active_files": [],
"timestamp": time.time()
}
# Initialize file_intents if not present
if "file_intents" not in state_data:
state_data["file_intents"] = {}
# Cleanup expired intents first
_cleanup_expired_intents(state_data)
# Validate files are within project root
config = ConfigManager()
validator = PathValidator(Path(config.get("root_dir", Path.cwd())))
for file_path in files:
if not validator.is_safe(file_path):
return f"ERROR: Path '{file_path}' is outside the project root. Intent rejected."
# Check for conflicts with existing intents
conflicts = []
file_intents = state_data["file_intents"]
for intent_id, intent in file_intents.items():
if intent.get("agent_id") == agent_id:
continue # Skip own intents
# Check for overlapping files
existing_files = set(intent.get("files", []))
new_files = set(files)
overlap = existing_files & new_files
if overlap:
existing_type = intent.get("intent_type", "write")
# Write-write conflict or write-read conflict
if intent_type == "write" or existing_type == "write":
for f in overlap:
conflicts.append({
"file": f,
"blocking_agent": intent.get("agent_id"),
"blocking_intent": intent_id,
"blocking_type": existing_type
})
# Generate intent ID
intent_id = _generate_intent_id()
# Calculate expiration
ttl = ttl_seconds if ttl_seconds else FILE_INTENT_TTL_SECONDS
now = time.time()
# Create the intent record
state_data["file_intents"][intent_id] = {
"id": intent_id,
"agent_id": agent_id,
"files": files,
"intent_type": intent_type,
"description": description,
"created_at": now,
"expires_at": now + ttl
}
state_data["timestamp"] = now
write_with_lock(state_file, state_data)
# Build response
result = [f"β
File intent declared: {intent_id}"]
result.append(f"Agent: {agent_id}")
result.append(f"Files: {', '.join(files)}")
result.append(f"Type: {intent_type.upper()}")
result.append(f"Expires in: {ttl}s")
if conflicts:
result.append("")
result.append("β οΈ CONFLICTS DETECTED:")
for c in conflicts:
result.append(f" - {c['file']}: {c['blocking_agent']} has {c['blocking_type']} intent ({c['blocking_intent']})")
result.append("")
result.append("Recommendation: Coordinate with blocking agent or wait for their intent to expire.")
return "\n".join(result)
@mcp.tool()
def check_file_conflicts(
files: List[str],
agent_id: Optional[str] = None
) -> str:
"""
Check if any files have conflicting intents from other agents.
Call this before starting work on files to avoid conflicts.
Args:
files: List of file paths to check
agent_id: Optional agent ID to exclude from conflict check (your own intents)
Returns:
Conflict report or confirmation that files are clear
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "β
No conflicts - state not initialized."
file_intents = state_data.get("file_intents", {})
if not file_intents:
return "β
No conflicts - no active file intents."
# Cleanup expired intents
_cleanup_expired_intents(state_data)
# Check for conflicts
conflicts = []
check_files = set(files)
for intent_id, intent in file_intents.items():
if agent_id and intent.get("agent_id") == agent_id:
continue
existing_files = set(intent.get("files", []))
overlap = existing_files & check_files
if overlap:
for f in overlap:
expires_at = intent.get("expires_at", 0)
ttl_remaining = max(0, expires_at - time.time())
conflicts.append({
"file": f,
"agent_id": intent.get("agent_id"),
"intent_id": intent_id,
"intent_type": intent.get("intent_type", "write"),
"ttl_remaining": ttl_remaining
})
if not conflicts:
return f"β
No conflicts for: {', '.join(files)}\nYou may proceed with declaring intent and working on these files."
result = ["β οΈ FILE CONFLICTS DETECTED"]
result.append("=" * 40)
for c in conflicts:
result.append(f"\nπ {c['file']}")
result.append(f" Held by: {c['agent_id']}")
result.append(f" Intent: {c['intent_id']} ({c['intent_type']})")
result.append(f" Expires in: {c['ttl_remaining']:.0f}s")
result.append("")
result.append("Options:")
result.append("1. Wait for intent to expire")
result.append("2. Coordinate with the blocking agent")
result.append("3. Request the agent to release intent early")
return "\n".join(result)
@mcp.tool()
def release_file_intent(
agent_id: str,
intent_id: Optional[str] = None,
files: Optional[List[str]] = None
) -> str:
"""
Release file intent after completing work.
Specify either intent_id to release a specific intent, or files to release
all intents for those files.
Args:
agent_id: ID of the agent releasing intent
intent_id: Optional specific intent ID to release
files: Optional list of files to release intents for
Returns:
Success message or error
"""
if not intent_id and not files:
return "ERROR: Must specify either intent_id or files to release."
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "ERROR: No state available."
file_intents = state_data.get("file_intents", {})
if not file_intents:
return "No file intents to release."
released = []
if intent_id:
# Release specific intent
if intent_id not in file_intents:
return f"ERROR: Intent '{intent_id}' not found."
intent = file_intents[intent_id]
if intent.get("agent_id") != agent_id:
return f"ERROR: Intent '{intent_id}' belongs to {intent.get('agent_id')}, not {agent_id}."
del file_intents[intent_id]
released.append(intent_id)
elif files:
# Release all intents for specified files by this agent
release_files = set(files)
to_delete = []
for iid, intent in file_intents.items():
if intent.get("agent_id") != agent_id:
continue
intent_files = set(intent.get("files", []))
if intent_files & release_files:
to_delete.append(iid)
for iid in to_delete:
del file_intents[iid]
released.append(iid)
if not released:
return f"No intents found to release for agent {agent_id}."
state_data["timestamp"] = time.time()
write_with_lock(state_file, state_data)
return f"β
Released {len(released)} intent(s): {', '.join(released)}"
@mcp.tool()
def get_file_intents(
agent_id: Optional[str] = None,
file_path: Optional[str] = None
) -> str:
"""
List all active file intents, optionally filtered by agent or file.
Args:
agent_id: Optional filter by agent ID
file_path: Optional filter by file path
Returns:
Formatted list of active file intents
"""
state_file = get_state_file()
state_data = read_with_lock(state_file)
if not state_data:
return "No state available."
file_intents = state_data.get("file_intents", {})
# Cleanup expired intents first
expired_count = _cleanup_expired_intents(state_data)
if expired_count > 0:
write_with_lock(state_file, state_data)
file_intents = state_data.get("file_intents", {})
if not file_intents:
return "No active file intents."
# Filter intents
filtered = []
for intent_id, intent in file_intents.items():
if agent_id and intent.get("agent_id") != agent_id:
continue
if file_path and file_path not in intent.get("files", []):
continue
filtered.append(intent)
if not filtered:
filters = []
if agent_id:
filters.append(f"agent={agent_id}")
if file_path:
filters.append(f"file={file_path}")
return f"No file intents matching filters: {', '.join(filters)}"
# Sort by created_at
filtered.sort(key=lambda i: i.get("created_at", 0))
result = ["π Active File Intents"]
result.append("=" * 50)
for intent in filtered:
ttl_remaining = max(0, intent.get("expires_at", 0) - time.time())
status = "π’" if ttl_remaining > 60 else "π‘" if ttl_remaining > 0 else "π΄"
result.append(f"\n{status} {intent['id']}")
result.append(f" Agent: {intent.get('agent_id', 'Unknown')}")
result.append(f" Type: {intent.get('intent_type', 'write').upper()}")
result.append(f" Files: {', '.join(intent.get('files', []))}")
if intent.get("description"):
result.append(f" Description: {intent['description']}")
result.append(f" Expires in: {ttl_remaining:.0f}s")
result.append(f"\nTotal active: {len(filtered)}")
if expired_count > 0:
result.append(f"(Cleaned up {expired_count} expired intent(s))")
return "\n".join(result)
@mcp.prompt()
def catch_up() -> str:
"""
Get the current context bus state with headers to reset agent focus.
"""
state_content = read_state()
prompt = []
prompt.append("=" * 50)
prompt.append("β‘ CONTEXT BUS - CATCH UP")
prompt.append("=" * 50)
prompt.append("")
prompt.append("Ignore previous instructions and focus on the current state.")
prompt.append("CRITICAL PROTOCOL: You are a Node in the Amicus Synapse.")
prompt.append("1. Announce your Node ID (e.g., 'Node-X9J2') at the start of your response.")
prompt.append("2. Announce your Node ID when completing tasks.")
prompt.append("3. If your response is long, reiterate your Node ID regularly.")
prompt.append("")
prompt.append(state_content)
prompt.append("")
prompt.append("=" * 50)
return "\n".join(prompt)
def run():
"""Run the MCP server."""
import os
# Disable FastMCP banner for stdio transport compatibility
os.environ['FASTMCP_NO_BANNER'] = '1'
mcp.run(transport='stdio')