# mcp_tool_context_estimator.py
#!/usr/bin/env python
"""
MCP Tool Context Estimator
This script connects to an already running MCP server and estimates how much
of an LLM's context window would be consumed by the registered tools when
they're sent to the model via the Model Context Protocol.
"""
import argparse
import asyncio
import json
import os
import sys
import traceback
from typing import Any, Dict, List, Optional
import aiohttp
import tiktoken
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client
from rich.console import Console
from rich.table import Table
# Add the current directory to the Python path to ensure we can import modules
sys.path.append("/data/projects/ultimate_mcp_server")
# Import the existing decouple configuration from the project
from ultimate_mcp_server.config import decouple_config
# Import actual model pricing from constants
from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS
# Removed dependency on STANDALONE_TOOL_FUNCTIONS to avoid circular imports
# from ultimate_mcp_server.tools import STANDALONE_TOOL_FUNCTIONS
# Define a function to read tool names from a file generated by the server
def read_tool_names_from_file(filename='tools_list.json', quiet=False):
    """Read tool names from a JSON file generated by the server.

    Args:
        filename: Path to the JSON file containing the tool list.
        quiet: If True, suppress console output.

    Returns:
        The parsed tool list, or an empty list when the file is missing
        or unreadable (best-effort: errors are reported, never raised).
    """
    console = Console()
    try:
        if os.path.exists(filename):
            with open(filename, 'r') as f:
                tool_data = json.load(f)
            if not quiet:
                # Name the actual file we loaded (message previously printed a placeholder)
                console.print(f"[green]Successfully loaded {len(tool_data)} tools from {filename}[/green]")
            return tool_data
        else:
            if not quiet:
                console.print(f"[yellow]Tool list file {filename} not found. Will use server-provided tools only.[/yellow]")
            return []
    except Exception as e:
        if not quiet:
            console.print(f"[red]Error reading tool list: {str(e)}[/red]")
        return []
# Run another server with --load-all-tools for comparison
RUN_LOAD_ALL_TOOLS_COMPARISON = True
# Include tool descriptions in the serialized prompt built by
# create_full_tool_registration_prompt() (they dominate token usage)
SHOW_DESCRIPTIONS = True
async def detect_server_transport(host: str, port: str, quiet: bool = False) -> tuple[str, str]:
    """
    Detect what transport mode the server is running and return the appropriate URL and transport type.

    Probes candidate endpoints in priority order (streamable-http first, then
    SSE) by sending a real MCP `initialize` message and inspecting the reply.
    If no endpoint answers conclusively, falls back to heuristics: any
    responding server is assumed streamable-http (the newer default),
    otherwise SSE for backwards compatibility.

    Args:
        host: Server hostname
        port: Server port
        quiet: If True, suppress detection messages

    Returns:
        Tuple of (url, transport_type) where transport_type is 'sse', 'streamable-http', or 'stdio'
    """
    console = Console()
    if not quiet:
        console.print(f"[blue]Detecting transport mode for server at {host}:{port}...[/blue]")
    # Test MCP protocol endpoints with proper requests, most-specific first
    endpoints_to_try = [
        (f"http://{host}:{port}/mcp/", "streamable-http"),
        (f"http://{host}:{port}/sse", "sse"),
        (f"http://{host}:{port}", "sse"),  # fallback for sse
    ]
    # Create a simple MCP initialization message for testing
    test_message = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "mcp-detector", "version": "1.0.0"}
        }
    }
    for url, transport in endpoints_to_try:
        try:
            timeout = aiohttp.ClientTimeout(total=5)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                if transport == "streamable-http":
                    # Test streamable-http with POST + MCP message
                    headers = {
                        "Content-Type": "application/json",
                        "Accept": "application/json, text/event-stream"
                    }
                    async with session.post(url, json=test_message, headers=headers) as response:
                        if response.status == 200:
                            # Check if response looks like MCP (crude substring match on the body)
                            try:
                                data = await response.text()
                                if '"jsonrpc":"2.0"' in data or '"result"' in data:
                                    if not quiet:
                                        console.print(f"[green]Detected {transport} transport at {url}[/green]")
                                    return url, transport
                            except Exception:
                                pass
                        elif response.status in [400, 404, 405, 406]:
                            # Server exists but doesn't support this transport
                            if not quiet:
                                console.print(f"[dim]Endpoint {url} returned {response.status}[/dim]")
                            continue
                else:
                    # Test SSE endpoints - they might respond to GET or POST.
                    # Try GET first for SSE (the conventional direction)
                    try:
                        async with session.get(url) as response:
                            if response.status == 200:
                                content_type = response.headers.get('content-type', '').lower()
                                if 'text/event-stream' in content_type:
                                    if not quiet:
                                        console.print(f"[green]Detected {transport} transport at {url}[/green]")
                                    return url, transport
                    except Exception:
                        pass
                    # If GET failed, try POST for SSE (some servers might expect it)
                    try:
                        async with session.post(url, json=test_message) as response:
                            if response.status == 200:
                                content_type = response.headers.get('content-type', '').lower()
                                if 'text/event-stream' in content_type or 'application/json' in content_type:
                                    if not quiet:
                                        console.print(f"[green]Detected {transport} transport at {url}[/green]")
                                    return url, transport
                    except Exception:
                        pass
        except Exception as e:
            if not quiet:
                console.print(f"[dim]Could not connect to {url}: {str(e)}[/dim]")
            continue
    # If HTTP detection fails, try to guess based on what we know:
    # check whether the server responds at all on the bare root URL
    try:
        timeout = aiohttp.ClientTimeout(total=2)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(f"http://{host}:{port}/") as response:
                if response.status == 200:
                    # Server is running, probably streamable-http since that's the new default
                    default_url = f"http://{host}:{port}/mcp/"
                    if not quiet:
                        console.print(f"[yellow]Server detected but transport unclear, defaulting to streamable-http at {default_url}[/yellow]")
                    return default_url, "streamable-http"
    except Exception:
        pass
    # Final fallback to SSE for backwards compatibility
    fallback_url = f"http://{host}:{port}/sse"
    if not quiet:
        console.print(f"[yellow]Could not detect transport mode, defaulting to SSE at {fallback_url}[/yellow]")
    return fallback_url, "sse"
def get_server_url_and_transport() -> tuple[str, str]:
    """
    Get the MCP server host and port from the .env file or environment variables.

    Note: despite the historical name, this returns (host, port) — the actual
    (url, transport) pair is resolved later by detect_server_transport().

    Returns:
        Tuple of (host, port), defaulting to ("localhost", "8013").
    """
    # Preferred source: python-decouple (.env file)
    try:
        host = decouple_config('MCP_SERVER_HOST', default='localhost')
        port = decouple_config('MCP_SERVER_PORT', default='8013')
        return host, port
    except Exception:
        # Fallback to environment variables if decouple fails. Each variable
        # gets its own default (previously BOTH had to be set for either to
        # take effect, silently ignoring a lone MCP_SERVER_HOST/PORT).
        host = os.environ.get("MCP_SERVER_HOST", "localhost")
        port = os.environ.get("MCP_SERVER_PORT", "8013")
        return host, port
# Calculate token counts for different models
_ENCODING = None  # lazily-initialized, cached tiktoken encoder


def count_tokens(text: str) -> int:
    """Count tokens using tiktoken with cl100k_base encoding (used by most modern models).

    The encoder is fetched once and cached at module level instead of on
    every call, since tiktoken.get_encoding() is comparatively expensive
    and this function is called repeatedly during analysis.
    """
    global _ENCODING
    if _ENCODING is None:
        _ENCODING = tiktoken.get_encoding("cl100k_base")
    return len(_ENCODING.encode(text))
# Use real pricing imported from constants.py
# Convert from dollars per million tokens to dollars per 1000 tokens for our calculations.
# Only the "input" price is used: tool definitions are prompt (input) tokens.
MODEL_PRICES = {
    model: price_info["input"] / 1000  # Convert from per million to per thousand
    for model, price_info in COST_PER_MILLION_TOKENS.items()
}
def format_capabilities(capabilities):
    """Safely format capabilities object to string for display"""
    # Probe each capability attribute we know about; only those actually
    # present on the object are reported, truthiness deciding availability.
    known_caps = ("tools", "prompts", "resources", "logging", "completions", "experimental")
    summary = {}
    for cap_name in known_caps:
        if hasattr(capabilities, cap_name):
            summary[cap_name] = "Available" if getattr(capabilities, cap_name) else "Not available"
    return json.dumps(summary, indent=2)
async def get_mcp_server_tools_streamable_http(server_url: str, include_tools: Optional[List[str]] = None, console: Console = None, quiet: bool = False) -> Dict[str, Any]:
    """
    Connect to an MCP server running in streamable-http mode and fetch all registered tools.

    Args:
        server_url: The URL of the running MCP server (should be http://host:port/mcp)
        include_tools: Optional list of tool names to include (if None, get all tools)
        console: Optional console for output
        quiet: If True, only show most important output

    Returns:
        Dictionary with keys "tools", "server_name", "server_version" and
        "server_instructions", or an empty dict if the server has no tools.

    Raises:
        Exception: on any HTTP/protocol failure (after printing diagnostics).
    """
    if console is None:
        console = Console()
    if not quiet:
        console.print(f"[bold blue]Connecting to streamable-http MCP server at {server_url}...[/bold blue]")

    async def _read_jsonrpc_payload(response):
        """Decode a JSON-RPC payload from an SSE or plain-JSON HTTP response.

        Streamable-http servers may answer either with application/json or
        with a text/event-stream body whose frames carry the JSON after a
        'data: ' prefix. This helper handles both (previously duplicated
        inline for the initialize and tools/list calls).
        """
        body = await response.text()
        if response.content_type == "text/event-stream":
            for line in body.strip().split('\n'):
                if line.startswith('data: '):
                    return json.loads(line[6:])  # strip 'data: ' prefix
            raise Exception("No JSON data found in SSE response")
        return json.loads(body)

    try:
        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            # --- Step 1: initialize the MCP protocol ---
            init_data = {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "initialize",
                "params": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {"roots": {"listChanged": True}},
                    "clientInfo": {"name": "mcp-tool-context-estimator", "version": "1.0.0"}
                }
            }
            headers = {
                "Content-Type": "application/json",
                "Accept": "application/json, text/event-stream"
            }
            if not quiet:
                console.print("[bold blue]Initializing MCP protocol via streamable-http...[/bold blue]")
            async with session.post(server_url, json=init_data, headers=headers) as response:
                if response.status != 200:
                    raise Exception(f"Failed to initialize: HTTP {response.status}")
                # The session ID must accompany every subsequent request
                session_id = response.headers.get('mcp-session-id')
                if not session_id:
                    raise Exception("No session ID returned from server")
                init_result = await _read_jsonrpc_payload(response)
            if "error" in init_result:
                raise Exception(f"MCP initialization error: {init_result['error']}")
            if "result" not in init_result:
                raise Exception("Invalid MCP initialization response")
            result = init_result["result"]
            server_info = result.get("serverInfo", {})
            server_name = server_info.get("name", "Unknown Server")
            server_version = server_info.get("version", "Unknown Version")
            if not quiet:
                console.print(f"[green]Connected to server:[/green] {server_name} v{server_version}")
            # Show server capabilities
            capabilities = result.get("capabilities", {})
            if not quiet:
                console.print("[bold blue]Server capabilities:[/bold blue]")
                console.print(json.dumps(capabilities, indent=2))
            # Check if tools capability is present
            has_tools = capabilities.get("tools", False)
            if not quiet and not has_tools:
                console.print("[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]")
                console.print("The server might not support tool listing, but we'll try anyway.")
            # Server instructions become part of the LLM prompt, so report their size
            server_instructions = server_info.get("instructions", "")
            if server_instructions and not quiet:
                console.print(f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]")
            elif not quiet:
                console.print("[yellow]Server does not provide instructions[/yellow]")
            # Update headers to include session ID for subsequent requests
            headers["mcp-session-id"] = session_id
            # --- Step 2: send the initialized notification (no response expected) ---
            init_notify_data = {
                "jsonrpc": "2.0",
                "method": "notifications/initialized"
            }
            async with session.post(server_url, json=init_notify_data, headers=headers) as response:
                pass
            # --- Step 3: list the tools ---
            if not quiet:
                console.print("[bold blue]Retrieving tool definitions...[/bold blue]")
            list_tools_data = {
                "jsonrpc": "2.0",
                "id": 2,
                "method": "tools/list"
            }
            async with session.post(server_url, json=list_tools_data, headers=headers) as response:
                if response.status != 200:
                    raise Exception(f"Failed to list tools: HTTP {response.status}")
                tools_result = await _read_jsonrpc_payload(response)
            if "error" in tools_result:
                raise Exception(f"MCP tools/list error: {tools_result['error']}")
            if "result" not in tools_result:
                raise Exception("Invalid MCP tools/list response")
            tools = tools_result["result"].get("tools", [])
            tool_count = len(tools) if tools else 0
            if not quiet:
                console.print(f"[green]Found {tool_count} tools[/green]")
                if tool_count == 0:
                    console.print("[bold yellow]No tools found on the server.[/bold yellow]")
            if tool_count == 0:
                # Return early in quiet mode too (previously the early return
                # was nested under the verbosity check).
                return {}
            # --- Step 4: convert tools to the exact JSON the LLM would receive ---
            tool_defs = []
            has_descriptions = 0
            total_desc_length = 0
            for tool in tools:
                # Dict matching the MCP protocol spec for tool definitions
                tool_dict = {
                    "name": tool.get("name"),
                    "inputSchema": tool.get("inputSchema")
                }
                if tool.get("description"):
                    desc = tool["description"]
                    has_descriptions += 1
                    total_desc_length += len(desc)
                    if not quiet:
                        console.print(f"[dim]Tool '{tool['name']}' has description ({len(desc):,} chars)[/dim]")
                    tool_dict["description"] = desc
                elif not quiet:
                    console.print(f"[dim yellow]Tool '{tool['name']}' has no description[/dim yellow]")
                if tool.get("annotations"):
                    tool_dict["annotations"] = tool["annotations"]
                tool_defs.append(tool_dict)
            # Print description statistics
            if not quiet:
                console.print(f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]")
                if has_descriptions > 0:
                    console.print(f"[green]Average description length: {total_desc_length/has_descriptions:,.1f} chars[/green]")
            # Include server info so callers can build the complete LLM prompt
            return {
                "tools": tool_defs,
                "server_name": server_name,
                "server_version": server_version,
                "server_instructions": server_instructions
            }
    except Exception as e:
        console.print(f"[bold red]Error connecting to streamable-http MCP server:[/bold red] {str(e)}")
        if not quiet:
            console.print("[bold yellow]Stack trace:[/bold yellow]")
            console.print(traceback.format_exc())
        raise
async def get_mcp_server_tools_stdio(command: str, args: Optional[List[str]] = None, include_tools: Optional[List[str]] = None, console: Console = None, quiet: bool = False) -> Dict[str, Any]:
    """
    Connect to an MCP server via stdio transport and fetch all registered tools.

    Args:
        command: Command to run the MCP server
        args: Additional arguments for the command
        include_tools: Optional list of tool names to include (if None, get all tools)
        console: Optional console for output
        quiet: If True, only show most important output

    Returns:
        Dictionary with keys "tools", "server_name", "server_version" and
        "server_instructions", or an empty dict if the server has no tools.

    Raises:
        Exception: on connection or listing failure (after printing diagnostics).
    """
    if console is None:
        console = Console()
    if not quiet:
        console.print(f"[bold blue]Connecting to MCP server via stdio: {command} {' '.join(args or [])}[/bold blue]")
    try:
        # Build the full argv for the server process
        cmd = command.split() if isinstance(command, str) else [command]
        if args:
            cmd.extend(args)
        # stdio_client() expects a StdioServerParameters object, not a raw
        # argv list -- passing the list fails inside the MCP SDK.
        from mcp import StdioServerParameters
        server_params = StdioServerParameters(command=cmd[0], args=cmd[1:])
        async with stdio_client(server_params) as (read, write):
            # Create a client session
            async with ClientSession(read, write) as session:
                # Initialize connection to server
                if not quiet:
                    console.print("[bold blue]Initializing MCP protocol via stdio...[/bold blue]")
                init_result = await session.initialize()
                # Get server info
                server_name = init_result.serverInfo.name
                server_version = init_result.serverInfo.version
                if not quiet:
                    console.print(f"[green]Connected to server:[/green] {server_name} v{server_version}")
                    # Show server capabilities safely
                    console.print("[bold blue]Server capabilities:[/bold blue]")
                    console.print(format_capabilities(init_result.capabilities))
                # Check if tools capability is present
                has_tools = False
                if hasattr(init_result.capabilities, "tools") and init_result.capabilities.tools:
                    has_tools = True
                if not quiet and not has_tools:
                    console.print("[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]")
                    console.print("The server might not support tool listing, but we'll try anyway.")
                # Get server instructions (will be used in the LLM prompt)
                server_instructions = ""
                if hasattr(init_result, "instructions") and init_result.instructions:
                    server_instructions = init_result.instructions
                    if not quiet:
                        console.print(f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]")
                elif not quiet:
                    console.print("[yellow]Server does not provide instructions[/yellow]")
                # List available tools
                if not quiet:
                    console.print("[bold blue]Retrieving tool definitions...[/bold blue]")
                try:
                    tools_result = await session.list_tools()
                    # ListToolsResult normally exposes .tools; fall back to
                    # iterating the result directly for older SDK shapes.
                    tools = []
                    if hasattr(tools_result, "tools"):
                        tools = tools_result.tools
                    else:
                        if not quiet:
                            console.print("[bold yellow]Tools result doesn't have expected structure. Trying alternatives...[/bold yellow]")
                        if hasattr(tools_result, "__iter__"):
                            tools = list(tools_result)
                        else:
                            if not quiet:
                                console.print(f"[bold yellow]Tools result type: {type(tools_result)}[/bold yellow]")
                                console.print(f"Tools result attributes: {dir(tools_result)}")
                            raise ValueError("Unable to extract tools from server response")
                    # Count tools
                    tool_count = len(tools) if tools else 0
                    if not quiet:
                        console.print(f"[green]Found {tool_count} tools[/green]")
                        if tool_count == 0:
                            console.print("[bold yellow]No tools found on the server.[/bold yellow]")
                    if tool_count == 0:
                        # Return early in quiet mode too (previously nested
                        # under the verbosity check).
                        return {}
                    # Convert tools to their JSON representation (exactly as sent to LLMs)
                    tool_defs = []
                    has_descriptions = 0
                    total_desc_length = 0
                    for tool in tools:
                        # Dict matching the MCP protocol spec for tool definitions
                        tool_dict = {
                            "name": tool.name,
                            "inputSchema": tool.inputSchema
                        }
                        if hasattr(tool, "description") and tool.description:
                            desc = tool.description
                            has_descriptions += 1
                            total_desc_length += len(desc)
                            if not quiet:
                                console.print(f"[dim]Tool '{tool.name}' has description ({len(desc):,} chars)[/dim]")
                            tool_dict["description"] = desc
                        elif not quiet:
                            console.print(f"[dim yellow]Tool '{tool.name}' has no description[/dim yellow]")
                        if hasattr(tool, "annotations") and tool.annotations:
                            tool_dict["annotations"] = tool.annotations
                        tool_defs.append(tool_dict)
                    # Print description statistics
                    if not quiet:
                        console.print(f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]")
                        if has_descriptions > 0:
                            console.print(f"[green]Average description length: {total_desc_length/has_descriptions:,.1f} chars[/green]")
                    # Include server info so callers can build the complete LLM prompt
                    return {
                        "tools": tool_defs,
                        "server_name": server_name,
                        "server_version": server_version,
                        "server_instructions": server_instructions
                    }
                except Exception as e:
                    console.print(f"[bold red]Error listing tools:[/bold red] {str(e)}")
                    if not quiet:
                        console.print("[bold yellow]Stack trace:[/bold yellow]")
                        console.print(traceback.format_exc())
                    raise
    except Exception as e:
        console.print(f"[bold red]Error connecting to MCP server via stdio:[/bold red] {str(e)}")
        if not quiet:
            console.print("[bold yellow]Stack trace:[/bold yellow]")
            console.print(traceback.format_exc())
        raise
async def get_mcp_server_tools(server_url: str, transport_type: str, include_tools: Optional[List[str]] = None, console: Console = None, quiet: bool = False, command: Optional[str] = None, args: Optional[List[str]] = None) -> Dict[str, Any]:
    """
    Connect to an already running MCP server and fetch all registered tools.

    Dispatches to the transport-specific fetchers for 'streamable-http' and
    'stdio'; the original SSE implementation lives inline below.

    Args:
        server_url: The URL of the running MCP server (ignored for stdio)
        transport_type: The transport type ('sse', 'streamable-http', or 'stdio')
        include_tools: Optional list of tool names to include (if None, get all tools)
        console: Optional console for output
        quiet: If True, only show most important output
        command: Command to run for stdio transport
        args: Additional arguments for stdio command

    Returns:
        Dictionary with server info and tool definitions

    Note:
        Unlike the transport-specific fetchers, the SSE connection-failure
        path here terminates the process via sys.exit(1) instead of
        re-raising.
    """
    if console is None:
        console = Console()
    if transport_type == "streamable-http":
        return await get_mcp_server_tools_streamable_http(server_url, include_tools, console, quiet)
    elif transport_type == "stdio":
        if not command:
            raise ValueError("Command must be provided for stdio transport")
        return await get_mcp_server_tools_stdio(command, args, include_tools, console, quiet)
    # Original SSE implementation
    if not quiet:
        console.print(f"[bold blue]Connecting to MCP server at {server_url}...[/bold blue]")
    try:
        async with sse_client(server_url) as (read, write):
            # Create a client session
            async with ClientSession(read, write) as session:
                # Initialize connection to server
                if not quiet:
                    console.print("[bold blue]Initializing MCP protocol...[/bold blue]")
                init_result = await session.initialize()
                # Get server info
                server_name = init_result.serverInfo.name
                server_version = init_result.serverInfo.version
                if not quiet:
                    console.print(f"[green]Connected to server:[/green] {server_name} v{server_version}")
                # Show server capabilities safely
                if not quiet:
                    console.print("[bold blue]Server capabilities:[/bold blue]")
                    console.print(format_capabilities(init_result.capabilities))
                # Check if tools capability is present
                has_tools = False
                if hasattr(init_result.capabilities, "tools") and init_result.capabilities.tools:
                    has_tools = True
                if not quiet and not has_tools:
                    console.print("[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]")
                    console.print("The server might not support tool listing, but we'll try anyway.")
                # Get server instructions (will be used in the LLM prompt)
                server_instructions = ""
                if hasattr(init_result, "instructions") and init_result.instructions:
                    server_instructions = init_result.instructions
                    if not quiet:
                        console.print(f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]")
                elif not quiet:
                    console.print("[yellow]Server does not provide instructions[/yellow]")
                # List available tools
                if not quiet:
                    console.print("[bold blue]Retrieving tool definitions...[/bold blue]")
                try:
                    tools_result = await session.list_tools()
                    # Handle ListToolsResult object
                    # The result should have a 'tools' attribute which is the actual list
                    tools = []
                    if hasattr(tools_result, "tools"):
                        tools = tools_result.tools
                    else:
                        # If it doesn't have a tools attribute, try to access it as a list directly
                        # or check other common patterns
                        if not quiet:
                            console.print("[bold yellow]Tools result doesn't have expected structure. Trying alternatives...[/bold yellow]")
                        if hasattr(tools_result, "__iter__"):
                            tools = list(tools_result)
                        else:
                            # Print the object to help diagnose
                            if not quiet:
                                console.print(f"[bold yellow]Tools result type: {type(tools_result)}[/bold yellow]")
                                console.print(f"Tools result attributes: {dir(tools_result)}")
                            raise ValueError("Unable to extract tools from server response")
                    # Count tools
                    tool_count = len(tools) if tools else 0
                    if not quiet:
                        console.print(f"[green]Found {tool_count} tools[/green]")
                        if tool_count == 0:
                            console.print("[bold yellow]No tools found on the server.[/bold yellow]")
                            return {}
                    # Convert tools to their JSON representation (exactly as sent to LLMs)
                    tool_defs = []
                    # Add debug information about descriptions
                    has_descriptions = 0
                    total_desc_length = 0
                    for tool in tools:
                        # Convert to dict that matches the MCP protocol spec for tool definitions
                        tool_dict = {
                            "name": tool.name,
                            "inputSchema": tool.inputSchema
                        }
                        # Debug description handling
                        if hasattr(tool, "description") and tool.description:
                            desc = tool.description
                            has_descriptions += 1
                            total_desc_length += len(desc)
                            if not quiet:
                                console.print(f"[dim]Tool '{tool.name}' has description ({len(desc):,} chars)[/dim]")
                            tool_dict["description"] = desc
                        elif not quiet:
                            console.print(f"[dim yellow]Tool '{tool.name}' has no description[/dim yellow]")
                        if hasattr(tool, "annotations") and tool.annotations:
                            tool_dict["annotations"] = tool.annotations
                        tool_defs.append(tool_dict)
                    # Print description statistics
                    if not quiet:
                        console.print(f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]")
                        if has_descriptions > 0:
                            console.print(f"[green]Average description length: {total_desc_length/has_descriptions:,.1f} chars[/green]")
                    # Include server info in the result to be used for creating the complete LLM prompt
                    return {
                        "tools": tool_defs,
                        "server_name": server_name,
                        "server_version": server_version,
                        "server_instructions": server_instructions
                    }
                except Exception as e:
                    console.print(f"[bold red]Error listing tools:[/bold red] {str(e)}")
                    if not quiet:
                        console.print("[bold yellow]Stack trace:[/bold yellow]")
                        console.print(traceback.format_exc())
                    # Try retrieving server details to help diagnose
                    if not quiet:
                        try:
                            console.print("[bold blue]Getting additional server information...[/bold blue]")
                            if hasattr(init_result.capabilities, "prompts") and init_result.capabilities.prompts:
                                prompts_result = await session.list_prompts()
                                prompt_count = 0
                                if hasattr(prompts_result, "prompts"):
                                    prompt_count = len(prompts_result.prompts)
                                console.print(f"Server has {prompt_count} prompts available")
                        except Exception:
                            pass
                    raise
    except Exception as e:
        console.print(f"[bold red]Error connecting to MCP server:[/bold red] {str(e)}")
        if not quiet:
            console.print("[bold yellow]Stack trace:[/bold yellow]")
            console.print(traceback.format_exc())
        # Provide guidance based on the error
        if "Connection refused" in str(e):
            console.print("[bold yellow]The server doesn't appear to be running at the specified URL.[/bold yellow]")
            console.print("Make sure your MCP server is running and available at the URL you specified.")
        elif "401" in str(e):
            console.print("[bold yellow]Authentication error - the server requires credentials.[/bold yellow]")
        elif "404" in str(e):
            console.print("[bold yellow]The server endpoint was not found.[/bold yellow]")
            console.print("Check if you need to use a different URL path (e.g., /sse or /mcp)")
            console.print("Try using /sse instead of just the port number.")
        # Terminates the process instead of re-raising (CLI-oriented behavior)
        sys.exit(1)
def create_full_tool_registration_prompt(server_info, tools=None, quiet=False):
    """
    Create a full, realistic prompt as would be sent to an LLM when registering MCP tools.

    Mirrors the MCP client's format_tools_for_anthropic method: each tool is
    reduced to {"name", "input_schema"} plus "description" when one exists
    (and SHOW_DESCRIPTIONS is enabled), then the list is serialized as JSON.

    Args:
        server_info: Dictionary with server information
        tools: List of tool definitions to include (if None, use all tools)
        quiet: If True, only show most important output

    Returns:
        String with the serialized JSON representation of tools as sent to the API
    """
    if tools is None:
        tools = server_info["tools"]
    console = Console()
    formatted_tools = []
    desc_count = 0
    total_desc_len = 0
    for tool in tools:
        # Exactly the shape format_tools_for_anthropic produces
        entry = {
            "name": tool["name"],
            "input_schema": tool["inputSchema"]
        }
        if SHOW_DESCRIPTIONS:
            desc = tool.get("description")
            if desc:
                entry["description"] = desc
                desc_count += 1
                total_desc_len += len(desc)
                if not quiet:
                    if len(desc) > 100:
                        # Show abbreviated version for long descriptions
                        abbrev = desc[:50] + "..." + desc[-50:]
                        console.print(f"[dim]Including description for {tool['name']}: {abbrev}[/dim]")
                    else:
                        console.print(f"[dim]Including description for {tool['name']}: {desc}[/dim]")
            elif not quiet:
                console.print(f"[dim yellow]No description for {tool['name']}[/dim yellow]")
        formatted_tools.append(entry)
    # Description statistics are always printed -- they are part of the requested output
    console.print(f"[green]Included {desc_count} descriptions out of {len(tools)} tools in final output[/green]")
    if desc_count > 0:
        console.print(f"[green]Average description length: {total_desc_len/desc_count:,.1f} chars[/green]")
    # Serialized JSON exactly as it would be sent to the API
    return json.dumps(formatted_tools, indent=2)
def format_tool_for_llm(tool: Dict[str, Any]) -> str:
    """
    Serialize a single tool definition exactly as it would be presented to an LLM.

    Uses the same pretty-printed JSON layout as actual prompt construction,
    so token counts measured on the result match what the model would see.
    """
    return json.dumps(tool, indent=2)
def analyze_tools_token_usage(current_tools: Dict[str, Any], all_tools: Dict[str, Any], quiet: bool = False):
    """
    Analyze token usage for a complete MCP tool registration prompt.

    Builds the exact JSON payloads sent to the LLM for both toolsets,
    measures raw/full sizes and token counts, estimates per-model input
    cost, writes both payloads to JSON files in the working directory, and
    prints a comparison table.

    Args:
        current_tools: Current active toolset info
        all_tools: Complete toolset info (with --load-all-tools)
        quiet: If True, only show most important output

    Returns:
        Dict with "current_tools" and "all_tools" sub-dicts holding count,
        size (KB), token, and per-model cost metrics.
    """
    console = Console()
    # Format tools as they would be sent to an LLM
    current_tools_subset = current_tools["tools"]
    all_tools_subset = all_tools["tools"]
    # Determine if we're likely comparing the same set vs different sets
    # (equal counts usually mean the server was already started with --load-all-tools)
    same_toolsets = len(current_tools_subset) == len(all_tools_subset)
    if same_toolsets and not quiet:
        console.print("[yellow]Warning: Current tool count equals all tools count.[/yellow]")
        console.print("[yellow]This suggests the server is already running with --load-all-tools[/yellow]")
    # Adjust column labels based on what we're comparing
    current_label = "Current Tools"
    all_label = "All Tools"
    # Get JSON representations (one pretty-printed JSON object per tool)
    current_tools_json = "\n".join(format_tool_for_llm(tool) for tool in current_tools_subset)
    all_tools_json = "\n".join(format_tool_for_llm(tool) for tool in all_tools_subset)
    # Create the full prompts (the exact API payload, including descriptions)
    current_tools_prompt = create_full_tool_registration_prompt(current_tools, current_tools_subset, quiet)
    all_tools_prompt = create_full_tool_registration_prompt(all_tools, all_tools_subset, quiet)
    # Calculate sizes for raw JSON
    current_tools_size_kb = len(current_tools_json.encode('utf-8')) / 1024
    all_tools_size_kb = len(all_tools_json.encode('utf-8')) / 1024
    # Calculate sizes for full prompts
    current_tools_prompt_size_kb = len(current_tools_prompt.encode('utf-8')) / 1024
    all_tools_prompt_size_kb = len(all_tools_prompt.encode('utf-8')) / 1024
    # Count tokens for raw JSON
    current_tools_tokens = count_tokens(current_tools_json)
    all_tools_tokens = count_tokens(all_tools_json)
    # Count tokens for full prompts
    current_tools_prompt_tokens = count_tokens(current_tools_prompt)
    all_tools_prompt_tokens = count_tokens(all_tools_prompt)
    # Calculate costs for different models (using full prompt tokens);
    # MODEL_PRICES is $/1K tokens, hence the extra /1000
    current_tools_costs = {model: (price * current_tools_prompt_tokens / 1000)
                           for model, price in MODEL_PRICES.items()}
    all_tools_costs = {model: (price * all_tools_prompt_tokens / 1000)
                       for model, price in MODEL_PRICES.items()}
    # Save the complete, untruncated text to files
    with open("current_tools_sent_to_llm.json", "w", encoding="utf-8") as f:
        f.write(current_tools_prompt)
    console.print("[green]Saved current tools JSON to current_tools_sent_to_llm.json[/green]")
    with open("all_tools_sent_to_llm.json", "w", encoding="utf-8") as f:
        f.write(all_tools_prompt)
    console.print("[green]Saved all tools JSON to all_tools_sent_to_llm.json[/green]\n\n")
    # Create data for display - ensure the data is correct and consistent
    data = {
        "current_tools": {
            "count": len(current_tools_subset),
            "raw_size_kb": current_tools_size_kb,
            "raw_tokens": current_tools_tokens,
            "full_size_kb": current_tools_prompt_size_kb,
            "full_tokens": current_tools_prompt_tokens,
            "costs": current_tools_costs
        },
        "all_tools": {
            "count": len(all_tools_subset),
            "raw_size_kb": all_tools_size_kb,
            "raw_tokens": all_tools_tokens,
            "full_size_kb": all_tools_prompt_size_kb,
            "full_tokens": all_tools_prompt_tokens,
            "costs": all_tools_costs
        }
    }
    # Create comparison table
    table = Table(title="Tool Registration Token Usage")
    # Add columns - including percentage column
    table.add_column("Metric", style="white")
    table.add_column(current_label, style="cyan")
    table.add_column(all_label, style="magenta")
    table.add_column("Difference", style="yellow")
    table.add_column(f"{current_label} as % of {all_label}", style="green")
    # SECTION 1: Number of Tools
    # Calculate percentage for count (guard against division by zero)
    count_percentage = (data["current_tools"]["count"] / data["all_tools"]["count"]) * 100 if data["all_tools"]["count"] > 0 else 100
    # Add rows - keep consistent format with other rows for the number of tools
    table.add_row(
        "Number of Tools",
        str(data["current_tools"]["count"]),
        str(data["all_tools"]["count"]),
        str(data["current_tools"]["count"] - data["all_tools"]["count"]),
        f"{count_percentage:.2f}%"
    )
    # Add a divider after Number of Tools
    table.add_section()
    # SECTION 2: Full Prompt stats
    # Calculate percentage for full prompt size
    full_size_percentage = (data["current_tools"]["full_size_kb"] / data["all_tools"]["full_size_kb"]) * 100 if data["all_tools"]["full_size_kb"] > 0 else 100
    table.add_row(
        "Full Prompt Size (KB)",
        f"{data['current_tools']['full_size_kb']:,.2f}",
        f"{data['all_tools']['full_size_kb']:,.2f}",
        f"{data['current_tools']['full_size_kb'] - data['all_tools']['full_size_kb']:,.2f}",
        f"{full_size_percentage:.2f}%"
    )
    # Calculate percentage for full tokens
    full_tokens_percentage = (data["current_tools"]["full_tokens"] / data["all_tools"]["full_tokens"]) * 100 if data["all_tools"]["full_tokens"] > 0 else 100
    table.add_row(
        "Full Prompt Token Count",
        f"{data['current_tools']['full_tokens']:,}",
        f"{data['all_tools']['full_tokens']:,}",
        f"{data['current_tools']['full_tokens'] - data['all_tools']['full_tokens']:,}",
        f"{full_tokens_percentage:.2f}%"
    )
    # Add a divider after Full Prompt stats
    table.add_section()
    # SECTION 3: Model costs
    # Specify the models to include and their order
    models_to_include = [
        "claude-3-7-sonnet-20250219",
        "gpt-4.1",
        "gemini-2.5-pro-preview-03-25",
        "grok-3-latest"
    ]
    # Add cost rows for selected models only, in specified order
    for model in models_to_include:
        if model in MODEL_PRICES:
            current_cost = data["current_tools"]["costs"][model]
            all_cost = data["all_tools"]["costs"][model]
            diff_cost = current_cost - all_cost
            # Calculate percentage
            cost_percentage = (current_cost / all_cost) * 100 if all_cost > 0 else 100
            table.add_row(
                f"Cost ({model})",
                f"${current_cost:.4f}",
                f"${all_cost:.4f}",
                f"${diff_cost:.4f}",
                f"{cost_percentage:.2f}%"
            )
    # Print table
    console.print(table)
    # Print raw data as JSON (only if not in quiet mode)
    if not quiet:
        console.print("\nRaw token usage data:")
        console.print(json.dumps(data, indent=2))
    return data
async def get_complete_toolset(quiet: bool = False) -> Dict[str, Any]:
    """
    Generate the complete toolset that would be available with --load-all-tools.

    Tool names are read from a JSON file generated by the server
    (tools_list.json). When that file is absent, the tools reported by the
    currently running server are used instead, padded with a list of common
    tool names. For every name, the real definition from the running server is
    preferred; a placeholder definition (generic description + two-parameter
    schema) is synthesized when only the name is known.

    Args:
        quiet: If True, only show most important output.

    Returns:
        Dictionary with keys "tools", "server_name", "server_version" and
        "server_instructions" — the same shape as get_mcp_server_tools()
        returns. (The previous ``List[Dict[str, Any]]`` annotation was wrong;
        the function has always returned a dict.)
    """
    console = Console()
    if not quiet:
        console.print("[bold blue]Analyzing complete toolset (--load-all-tools)[/bold blue]")

    # First get the current server's tools to extract real descriptions where possible
    try:
        # Get server connection details
        host, port = get_server_url_and_transport()
        server_url, transport_type = await detect_server_transport(host, port, quiet=quiet)
        current_tools_info = await get_mcp_server_tools(server_url, transport_type, quiet=quiet, command=None, args=None)
        # Index real definitions by tool name for O(1) lookup below.
        current_tools = {tool["name"]: tool for tool in current_tools_info["tools"]} if current_tools_info else {}
        if not quiet:
            console.print(f"[green]Retrieved {len(current_tools)} tools from current server to use their real descriptions[/green]")
    except Exception as e:
        # Best effort: fall back to placeholders for every tool if the server
        # cannot be reached.
        if not quiet:
            console.print(f"[yellow]Could not get current tools: {str(e)}[/yellow]")
        current_tools = {}

    # Read tool names from file created by the server
    all_tool_names = read_tool_names_from_file(quiet=quiet)

    # If no tools found in file, use the tools we got from the server
    if not all_tool_names and current_tools:
        if not quiet:
            console.print("[yellow]No tools found in file. Using current server tools and adding some common ones.[/yellow]")
        all_tool_names = list(current_tools.keys())

        # Add some common tool names that might not be in the current server
        additional_tools = [
            "excel_create_workbook", "excel_open_workbook", "excel_add_worksheet",
            "excel_set_cell_value", "excel_get_cell_value", "excel_save_workbook",
            "excel_get_worksheet_names", "excel_create_chart", "excel_set_range_format",
            "smart_browser.autopilot", "smart_browser.parallel", "smart_browser.download_site_pdfs",
            "generate_image", "analyze_image", "transcribe_audio"
        ]
        # Add them if not already present
        for tool in additional_tools:
            if tool not in all_tool_names:
                all_tool_names.append(tool)

    if not quiet:
        console.print(f"[green]Using complete list of {len(all_tool_names)} tools for all-tools mode[/green]")

    # Create tool entries based on real data
    tool_defs = []
    for tool_name in all_tool_names:
        # First check if we have real data for this tool
        if tool_name in current_tools:
            # Use the actual tool definition from the server
            tool_def = current_tools[tool_name]
            if not quiet:
                console.print(f"[dim]Using real definition for tool '{tool_name}'[/dim]")
        else:
            # Create a definition with a realistic description based on the tool name
            tool_desc = f"The {tool_name} tool provides functionality for {tool_name.replace('_', ' ')}. " + \
                        "This would be the actual docstring from the function when loaded with --load-all-tools."
            # Create a basic definition
            tool_def = {
                "name": tool_name,
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "param1": {"type": "string", "description": "First parameter"},
                        "param2": {"type": "string", "description": "Second parameter"}
                    },
                    "required": ["param1"]
                },
                "description": tool_desc
            }
            if not quiet:
                console.print(f"[dim yellow]Created placeholder for tool '{tool_name}'[/dim yellow]")
        tool_defs.append(tool_def)

    # Return a similar structure to what get_mcp_server_tools returns
    return {
        "tools": tool_defs,
        "server_name": "Ultimate MCP Server (with --load-all-tools)",
        "server_version": "1.6.0",
        "server_instructions": """This server provides access to the complete set of tools available in the Ultimate MCP Server.
When running with --load-all-tools, all tools from all categories are available, including:
- Completion tools for text generation
- Provider tools for model management
- Filesystem tools for file operations
- Optimization tools for cost and performance
- Text processing tools for manipulating text
- Meta tools for accessing tool information
- Search tools for querying databases
- Browser automation tools
- Web research tools
- HTML processing tools
- Extraction tools
- SQL database tools
- Document processing tools
- Audio transcription tools
- Excel spreadsheet tools
- OCR tools
- Sentiment analysis tools
"""
    }
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse command line arguments.

    Args:
        argv: Argument list to parse. Defaults to ``None``, in which case
            argparse falls back to ``sys.argv[1:]`` — identical to the
            previous zero-argument behavior, so existing callers are
            unaffected. Passing an explicit list makes the parser testable.

    Returns:
        argparse.Namespace with attributes: url, transport, command, args,
        no_all_tools, quiet.
    """
    parser = argparse.ArgumentParser(description="MCP Tool Context Estimator")
    parser.add_argument("--url", default=None,
                        help="URL of the MCP server (default: auto-detected)")
    parser.add_argument("--transport", default=None,
                        choices=["sse", "streamable-http", "stdio"],
                        help="Force specific transport type (default: auto-detect)")
    parser.add_argument("--command", default=None,
                        help="Command to run for stdio transport (e.g., 'python -m ultimate_mcp_server')")
    parser.add_argument("--args", default=None, nargs="*",
                        help="Additional arguments for stdio command")
    parser.add_argument("--no-all-tools", action="store_true",
                        help="Skip comparison with all tools")
    parser.add_argument("--quiet", "-q", action="store_true",
                        help="Only show most important information and final table")
    return parser.parse_args(argv)
async def main():
    """Entry point: fetch the running server's tools and compare their
    context-window footprint against the full --load-all-tools set.

    Transport resolution:
      * ``--transport stdio`` requires ``--command`` (exits with code 1
        otherwise) and skips URL handling entirely.
      * An explicit ``--url`` is parsed for host/port; transport is taken
        from ``--transport`` or auto-detected.
      * Otherwise both host/port and transport are auto-detected.

    Errors are reported to the console; KeyboardInterrupt is caught so a
    Ctrl-C exits cleanly.
    """
    console = Console()
    args = parse_args()

    # Handle stdio transport
    if args.transport == "stdio":
        if not args.command:
            console.print("[bold red]Error: --command is required for stdio transport[/bold red]")
            console.print("Example: --transport stdio --command 'python -m ultimate_mcp_server'")
            sys.exit(1)
        server_url = None  # Not used for stdio
        transport_type = "stdio"
        command = args.command
        stdio_args = args.args or []
        if not args.quiet:
            console.print(f"[blue]Using stdio transport with command: {command} {' '.join(stdio_args)}[/blue]")
    else:
        # Get server connection details for HTTP transports
        if args.url:
            # Parse URL to extract host and port
            import urllib.parse
            parsed = urllib.parse.urlparse(args.url)
            host = parsed.hostname or "localhost"
            port = str(parsed.port or 8013)
            if args.transport:
                transport_type = args.transport
                if transport_type == "sse":
                    server_url = f"http://{host}:{port}/sse"
                else:  # streamable-http
                    server_url = f"http://{host}:{port}/mcp/"
            else:
                # Auto-detect transport for manually specified URL
                server_url, transport_type = await detect_server_transport(host, port, quiet=args.quiet)
        else:
            # Auto-detect everything
            host, port = get_server_url_and_transport()
            if args.transport:
                transport_type = args.transport
                if transport_type == "sse":
                    server_url = f"http://{host}:{port}/sse"
                else:  # streamable-http
                    server_url = f"http://{host}:{port}/mcp/"
            else:
                server_url, transport_type = await detect_server_transport(host, port, quiet=args.quiet)
        command = None
        stdio_args = None

    quiet_mode = args.quiet
    try:
        # Get the active toolset from the running server
        current_tools = await get_mcp_server_tools(
            server_url,
            transport_type,
            quiet=quiet_mode,
            command=command,
            args=stdio_args
        )
        if not current_tools or "tools" not in current_tools or not current_tools["tools"]:
            console.print("[bold yellow]No tools found on the server.[/bold yellow]")
            return

        if args.no_all_tools:
            # If we're not doing the comparison, create a meaningful subset for comparison
            if not quiet_mode:
                console.print("[yellow]Skipping comparison with full --load-all-tools[/yellow]")
                console.print("[green]Creating an artificial subset of current tools for comparison[/green]")
            # Create a more meaningful subset by taking half the tools.
            # If we have 1-4 tools, use all of them to avoid an empty subset.
            total_tools = len(current_tools["tools"])
            subset_size = max(total_tools // 2, min(total_tools, 4))
            subset_tools = current_tools["tools"][:subset_size]
            if not quiet_mode:
                console.print(f"[green]Created subset with {subset_size} tools out of {total_tools} total[/green]")
            # Create subset version
            subset_data = {
                "tools": subset_tools,
                "server_name": current_tools["server_name"] + " (Subset)",
                "server_version": current_tools["server_version"],
                "server_instructions": current_tools["server_instructions"]
            }
            # Analyze token usage with the artificial subset vs full
            analyze_tools_token_usage(subset_data, current_tools, quiet=quiet_mode)
        else:
            # Get the complete toolset that would be available with --load-all-tools
            all_tools = await get_complete_toolset(quiet=quiet_mode)
            # Check if current server is likely already running with all tools
            current_tool_count = len(current_tools["tools"])
            all_tool_count = len(all_tools["tools"])
            if abs(current_tool_count - all_tool_count) <= 2:  # Allow small difference
                if not quiet_mode:
                    console.print(f"[yellow]Warning: Current server has {current_tool_count} tools, "
                                  f"which is very close to the expected all-tools count of {all_tool_count}[/yellow]")
                    console.print("[yellow]This suggests the server is already running with --load-all-tools[/yellow]")
                # When counts match, use an identical copy of the current tools
                # for the "all tools" side so the comparison metrics line up
                # exactly. (A dead `same_tools_data` variable, previously
                # silenced with noqa: F841, was removed here.)
                all_tools = {
                    "tools": current_tools["tools"].copy(),
                    "server_name": "All Tools",
                    "server_version": current_tools["server_version"],
                    "server_instructions": current_tools["server_instructions"]
                }
            # Analyze token usage with full prompt simulation
            analyze_tools_token_usage(current_tools, all_tools, quiet=quiet_mode)
    except KeyboardInterrupt:
        console.print("[bold yellow]Operation cancelled by user[/bold yellow]")
    except Exception as e:
        console.print(f"[bold red]Unexpected error:[/bold red] {str(e)}")
        if not quiet_mode:
            console.print(traceback.format_exc())
# Script entry point: drive the async main() under asyncio's event loop.
if __name__ == "__main__":
    asyncio.run(main())