"""Agent tool implementation for MCP Claude Code.
This module implements the AgentTool that allows Claude to delegate tasks to sub-agents,
enabling concurrent execution of multiple operations and specialized processing.
"""
import asyncio
import json
import time
from typing import Any, final, override
import litellm
from mcp.server.fastmcp import Context as MCPContext
from mcp.server.fastmcp import FastMCP
from openai.types.chat import ChatCompletionMessageParam, ChatCompletionToolParam
from mcp_claude_code.tools.agent.prompt import (
get_allowed_agent_tools,
get_default_model,
get_model_parameters,
get_system_prompt,
)
from mcp_claude_code.tools.agent.tool_adapter import (
convert_tools_to_openai_functions,
)
from mcp_claude_code.tools.common.base import BaseTool
from mcp_claude_code.tools.common.context import DocumentContext, ToolContext, create_tool_context
from mcp_claude_code.tools.common.permissions import PermissionManager
from mcp_claude_code.tools.filesystem import get_read_only_filesystem_tools
from mcp_claude_code.tools.jupyter import get_read_only_jupyter_tools
from mcp_claude_code.tools.project import get_project_tools
from mcp_claude_code.tools.shell.command_executor import CommandExecutor
@final
class AgentTool(BaseTool):
"""Tool for delegating tasks to sub-agents.
The AgentTool allows Claude to create and manage sub-agents for performing
specialized tasks concurrently, such as code search, analysis, and more.
"""
@property
@override
def name(self) -> str:
"""Get the tool name.
Returns:
Tool name
"""
return "dispatch_agent"
@property
@override
def description(self) -> str:
"""Get the tool description.
Returns:
Tool description
"""
return """Launch one or more agents that can perform tasks using read-only tools.
This tool creates agents for delegation of tasks such as multi-step searches, complex analyses,
or other operations that benefit from focused processing. Multiple agents can work concurrently
on independent tasks, improving performance for complex operations.
Each agent works with its own context and provides a response containing the results of its work.
Results from all agents are combined in the final response.
Args:
    prompts: A single task description, or a list of task descriptions; each item launches an independent agent.
Returns:
Combined results from all agent executions
"""
@property
@override
def parameters(self) -> dict[str, Any]:
"""Get the parameter specifications for the tool.
Returns:
Parameter specifications
"""
return {
"properties": {
"prompts": {
"anyOf": [
{
"type": "string",
"description": "Single task for an agent to perform"
},
{
"type": "array",
"items": {
"type": "string"
},
"description": "List of tasks for agents to perform concurrently"
}
],
"description": "Task(s) for agent(s) to perform"
}
},
"required": ["prompts"],
"type": "object",
}
@property
@override
def required(self) -> list[str]:
"""Get the list of required parameter names.
Returns:
List of required parameter names
"""
return ["prompts"]
    def __init__(
        self,
        document_context: DocumentContext,
        permission_manager: PermissionManager,
        command_executor: CommandExecutor,
        model: str | None = None,
        api_key: str | None = None,
        max_tokens: int | None = None,
    ) -> None:
"""Initialize the agent tool.
Args:
document_context: Document context for tracking file contents
permission_manager: Permission manager for access control
command_executor: Command executor for running shell commands
model: Optional model name override in LiteLLM format (e.g., "openai/gpt-4o")
api_key: Optional API key for the model provider
max_tokens: Optional maximum tokens for model responses
"""
self.document_context = document_context
self.permission_manager = permission_manager
self.command_executor = command_executor
self.model_override = model
self.api_key_override = api_key
self.max_tokens_override = max_tokens
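        # Sub-agents work from a restricted tool surface: read-only
        # filesystem and Jupyter tools plus project inspection tools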
        self.available_tools: list[BaseTool] = []
        self.available_tools.extend(
            get_read_only_filesystem_tools(self.document_context, self.permission_manager)
        )
        self.available_tools.extend(
            get_read_only_jupyter_tools(self.document_context, self.permission_manager)
        )
        self.available_tools.extend(
            get_project_tools(
                self.document_context, self.permission_manager, self.command_executor
            )
        )
@override
async def call(self, ctx: MCPContext, **params: Any) -> str:
"""Execute the tool with the given parameters.
Args:
ctx: MCP context
**params: Tool parameters
Returns:
Tool execution result
"""
start_time = time.time()
# Create tool context
tool_ctx = create_tool_context(ctx)
tool_ctx.set_tool_info(self.name)
# Extract parameters
prompts = params.get("prompts")
if prompts is None:
await tool_ctx.error("Parameter 'prompts' is required but was not provided")
return "Error: Parameter 'prompts' is required but was not provided"
# Validate prompts parameter
if isinstance(prompts, str):
# Handle single string case
if not prompts.strip(): # Empty string
await tool_ctx.error("Prompt cannot be empty")
return "Error: Prompt cannot be empty"
prompts = [prompts] # Convert to list for uniform handling
elif isinstance(prompts, list):
# Handle list case
if not prompts: # Empty list
await tool_ctx.error("At least one prompt must be provided in the array")
return "Error: At least one prompt must be provided in the array"
# Check for empty strings in the list
if any(not isinstance(p, str) or not p.strip() for p in prompts):
await tool_ctx.error("All prompts must be non-empty strings")
return "Error: All prompts must be non-empty strings"
else:
# Invalid type
await tool_ctx.error("Parameter 'prompts' must be a string or an array of strings")
return "Error: Parameter 'prompts' must be a string or an array of strings"
# Decide execution strategy based on number of prompts
if len(prompts) == 1:
# Single agent case
await tool_ctx.info(f"Launching agent with prompt: {prompts[0][:100]}...")
result = await self._execute_single_agent(prompts[0], tool_ctx)
else:
# Multiple agents case
await tool_ctx.info(f"Launching {len(prompts)} agents in parallel")
result = await self._execute_multiple_agents(prompts, tool_ctx)
# Calculate execution time
execution_time = time.time() - start_time
# Format the final result with metrics
formatted_result = self._format_result(result, execution_time)
# Log completion
await tool_ctx.info(f"Agent execution completed in {execution_time:.2f}s")
return formatted_result
async def _execute_single_agent(self, prompt: str, tool_ctx: ToolContext) -> str:
"""Execute a single agent with the given prompt.
Args:
prompt: The prompt for the agent
tool_ctx: Tool context for logging
Returns:
Agent execution result
"""
try:
# Get available tools for the agent
# The agent has access to the same tools we do, but filtered for safety
# Apply permissions filtering
agent_tools = get_allowed_agent_tools(
self.available_tools,
self.permission_manager,
)
# Create system prompt
system_prompt = get_system_prompt(
agent_tools,
self.permission_manager,
).format(prompt=prompt)
# Convert tools to OpenAI format
openai_tools = convert_tools_to_openai_functions(agent_tools)
# Execute the agent
await tool_ctx.info(f"Agent executing with {len(agent_tools)} available tools")
# Execute agent with tool handling
result = await self._execute_agent_with_tools(
system_prompt,
agent_tools,
openai_tools,
tool_ctx
)
return result
        except Exception as e:
            # Log and surface the error instead of crashing the dispatcher
            error_message = f"Error executing agent: {str(e)}"
            await tool_ctx.error(error_message)
            return error_message
async def _execute_multiple_agents(self, prompts: list[str], tool_ctx: ToolContext) -> str:
"""Execute multiple agents concurrently with the given prompts.
Args:
prompts: List of prompts for the agents
tool_ctx: Tool context for logging
Returns:
Combined agent execution results
"""
# Get available tools for the agents (do this once to avoid redundant work)
agent_tools = get_allowed_agent_tools(
self.available_tools,
self.permission_manager,
)
# Convert tools to OpenAI format (do this once to avoid redundant work)
openai_tools = convert_tools_to_openai_functions(agent_tools)
# Log execution start
await tool_ctx.info(f"Starting parallel execution of {len(prompts)} agents")
        # Build one coroutine per prompt, recording per-agent setup errors
        # without aborting the whole batch
        tasks = []
        results = []
for i, prompt in enumerate(prompts):
try:
# Create system prompt for this agent
system_prompt = get_system_prompt(
agent_tools,
self.permission_manager,
).format(prompt=prompt)
# Execute agent and collect the task
await tool_ctx.info(f"Launching agent {i+1}/{len(prompts)}: {prompt[:50]}...")
task = self._execute_agent_with_tools(
system_prompt,
agent_tools,
openai_tools,
tool_ctx
)
tasks.append(task)
except Exception as e:
# Log and add error result
error_message = f"Error preparing agent {i+1}: {str(e)}"
await tool_ctx.error(error_message)
results.append(f"Agent {i+1} Error: {error_message}")
# Execute all pending tasks concurrently
if tasks:
try:
# Wait for all tasks to complete
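                # return_exceptions=True keeps one failed agent from
                # cancelling the others; failures come back as exception
                # objects alongside normal results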
completed_results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results, handling any exceptions
for i, result in enumerate(completed_results):
if isinstance(result, Exception):
results.append(f"Agent {i+1} Error: {str(result)}")
else:
results.append(f"Agent {i+1} Result:\n{result}")
except Exception as e:
# Handle any unexpected exceptions during gathering
error_message = f"Error executing agents concurrently: {str(e)}"
await tool_ctx.error(error_message)
results.append(f"Error: {error_message}")
# Combine results
combined_result = "\n\n" + "\n\n---\n\n".join(results)
return combined_result
async def _execute_agent_with_tools(
self,
system_prompt: str,
available_tools: list[BaseTool],
openai_tools: list[ChatCompletionToolParam],
tool_ctx: ToolContext,
) -> str:
"""Execute agent with tool handling.
Args:
system_prompt: System prompt for the agent
available_tools: List of available tools
openai_tools: List of tools in OpenAI format
tool_ctx: Tool context for logging
Returns:
Agent execution result
"""
# Get model parameters and name
model = get_default_model(self.model_override)
params = get_model_parameters(max_tokens=self.max_tokens_override)
        # Seed the conversation with the system prompt; a mutable list is
        # required here because messages are appended on every iteration
        messages: list[ChatCompletionMessageParam] = [
            {"role": "system", "content": system_prompt}
        ]
        # Track per-tool usage counts for metrics
        tool_usage: dict[str, int] = {}
max_tool_uses = 15 # Safety limit to prevent infinite loops
total_tool_use_count = 0
iteration_count = 0
        max_iterations = 10  # Hard cap on model-call iterations for safety
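        # Tool-calling loop: the model either answers directly or requests
        # tool calls, whose results are appended as "tool" messages for the
        # next iteration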
# Execute until the agent completes or reaches the limit
while total_tool_use_count < max_tool_uses and iteration_count < max_iterations:
iteration_count += 1
await tool_ctx.info(f"Calling model (iteration {iteration_count})...")
try:
# Configure model parameters based on capabilities
completion_params = {
"model": model,
"messages": messages,
"tools": openai_tools,
"tool_choice": "auto",
"temperature": params["temperature"],
"timeout": params["timeout"],
}
if self.api_key_override:
completion_params["api_key"] = self.api_key_override
# Add max_tokens if provided
if params.get("max_tokens"):
completion_params["max_tokens"] = params.get("max_tokens")
# Make the model call
                # Use the async client so concurrently dispatched agents do
                # not block the event loop
                response = await litellm.acompletion(
                    **completion_params  # pyright: ignore
                )
if len(response.choices) == 0: #pyright: ignore
raise ValueError("No response choices returned")
message = response.choices[0].message #pyright: ignore
# Add message to conversation history
messages.append(message) #pyright: ignore
# If no tool calls, we're done
if not message.tool_calls:
return message.content or "Agent completed with no response."
# Process tool calls
tool_call_count = len(message.tool_calls)
await tool_ctx.info(f"Processing {tool_call_count} tool calls")
for tool_call in message.tool_calls:
                    if total_tool_use_count >= max_tool_uses:
                        await tool_ctx.info("Reached maximum tool usage limit")
                        # Answer the skipped call anyway: providers reject a
                        # follow-up request if any tool_call_id lacks a result
                        messages.append(
                            {
                                "role": "tool",
                                "tool_call_id": tool_call.id,
                                "content": "Error: maximum tool usage limit reached; call skipped.",
                            }
                        )
                        continue
total_tool_use_count += 1
function_name = tool_call.function.name
# Track usage
tool_usage[function_name] = tool_usage.get(function_name, 0) + 1
# Log tool usage
await tool_ctx.info(f"Agent using tool: {function_name}")
# Parse the arguments
try:
function_args = json.loads(tool_call.function.arguments)
except json.JSONDecodeError:
function_args = {}
# Find the matching tool
tool = next((t for t in available_tools if t.name == function_name), None)
if not tool:
tool_result = f"Error: Tool '{function_name}' not found"
else:
try:
tool_result = await tool.call(ctx=tool_ctx.mcp_context, **function_args)
except Exception as e:
tool_result = f"Error executing {function_name}: {str(e)}"
# Add the tool result to messages
messages.append(
{
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_result,
}
)
# Log progress
await tool_ctx.info(f"Processed {len(message.tool_calls)} tool calls. Total: {total_tool_use_count}")
except Exception as e:
await tool_ctx.error(f"Error in model call: {str(e)}")
# Avoid trying to JSON serialize message objects
await tool_ctx.error(f"Message count: {len(messages)}")
return f"Error in agent execution: {str(e)}"
# If we've reached the limit, add a warning and get final response
if total_tool_use_count >= max_tool_uses or iteration_count >= max_iterations:
limit_type = "tool usage" if total_tool_use_count >= max_tool_uses else "iterations"
await tool_ctx.info(f"Reached maximum {limit_type} limit. Getting final response.")
messages.append(
{
"role": "system",
"content": f"You have reached the maximum number of {limit_type}. Please provide your final response.",
}
)
try:
# Make a final call to get the result
                final_response = await litellm.acompletion(
                    model=model,
                    messages=messages,
                    temperature=params["temperature"],
                    timeout=params["timeout"],
                    max_tokens=params.get("max_tokens"),
                )
return final_response.choices[0].message.content or f"Agent reached {limit_type} limit without a response." #pyright: ignore
except Exception as e:
await tool_ctx.error(f"Error in final model call: {str(e)}")
return f"Error in final response: {str(e)}"
# Should not reach here but just in case
return "Agent execution completed after maximum iterations."
def _format_result(self, result: str, execution_time: float) -> str:
"""Format agent result with metrics.
Args:
result: Raw result from agent(s)
execution_time: Execution time in seconds
Returns:
Formatted result with metrics
"""
# Check if this is a multi-agent result (contains our separator pattern)
if "\n\n---\n\n" in result and "Agent " in result:
# This is a multi-agent response
agent_count = result.count("Agent ")
return f"""Multi-agent execution completed in {execution_time:.2f} seconds ({agent_count} agents).
{result}
"""
else:
# Single agent response
return f"""Agent execution completed in {execution_time:.2f} seconds.
AGENT RESPONSE:
{result}
"""
@override
    def register(self, mcp_server: FastMCP) -> None:
        """Register this tool with the MCP server.

        Args:
            mcp_server: The FastMCP server instance to register with
        """
        tool_self = self  # Create a reference to self for use in the closure
@mcp_server.tool(name=self.name, description=self.mcp_description)
async def dispatch_agent(ctx: MCPContext, prompts: str | list[str]) -> str:
return await tool_self.call(ctx, prompts=prompts)
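
# Usage sketch (illustrative only): wiring the tool into a FastMCP server.
# The constructor arguments shown here are assumptions about the host
# application; consult the real entry point for the actual wiring.
#
#   mcp = FastMCP("claude-code")
#   agent_tool = AgentTool(
#       document_context=document_context,
#       permission_manager=permission_manager,
#       command_executor=command_executor,
#       model="openai/gpt-4o",  # optional LiteLLM-style override
#   )
#   agent_tool.register(mcp)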