server.py•30.5 kB
import os
import sys
import json
import shutil
import argparse
import http.client
import tempfile
import traceback
import tomllib
import tomli_w
from typing import TYPE_CHECKING
from urllib.parse import urlparse
import glob
if TYPE_CHECKING:
from ida_pro_mcp.ida_mcp.zeromcp import McpServer
from ida_pro_mcp.ida_mcp.zeromcp.jsonrpc import JsonRpcResponse, JsonRpcRequest
else:
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "ida_mcp"))
from zeromcp import McpServer
from zeromcp.jsonrpc import JsonRpcResponse, JsonRpcRequest
sys.path.pop(0) # Clean up
IDA_HOST = "127.0.0.1"
IDA_PORT = 13337
mcp = McpServer("ida-pro-mcp")
dispatch_original = mcp.registry.dispatch
def dispatch_proxy(request: dict | str | bytes | bytearray) -> JsonRpcResponse | None:
"""Dispatch JSON-RPC requests to the MCP server registry"""
if not isinstance(request, dict):
request_obj: JsonRpcRequest = json.loads(request)
else:
request_obj: JsonRpcRequest = request # type: ignore
if request_obj["method"] == "initialize":
return dispatch_original(request)
elif request_obj["method"].startswith("notifications/"):
return dispatch_original(request)
conn = http.client.HTTPConnection(IDA_HOST, IDA_PORT, timeout=30)
try:
if isinstance(request, dict):
request = json.dumps(request)
elif isinstance(request, str):
request = request.encode("utf-8")
conn.request("POST", "/mcp", request, {"Content-Type": "application/json"})
response = conn.getresponse()
data = response.read().decode()
return json.loads(data)
except Exception as e:
full_info = traceback.format_exc()
id = request_obj.get("id")
if id is None:
return None # Notification, no response needed
if sys.platform == "darwin":
shortcut = "Ctrl+Option+M"
else:
shortcut = "Ctrl+Alt+M"
return JsonRpcResponse(
{
"jsonrpc": "2.0",
"error": {
"code": -32000,
"message": f"Failed to connect to IDA Pro! Did you run Edit -> Plugins -> MCP ({shortcut}) to start the server?\n{full_info}",
"data": str(e),
},
"id": id,
}
)
finally:
conn.close()
mcp.registry.dispatch = dispatch_proxy
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
IDA_PLUGIN_PKG = os.path.join(SCRIPT_DIR, "ida_mcp")
IDA_PLUGIN_LOADER = os.path.join(SCRIPT_DIR, "ida_mcp.py")
# NOTE: This is in the global scope on purpose
if not os.path.exists(IDA_PLUGIN_PKG):
raise RuntimeError(
f"IDA plugin package not found at {IDA_PLUGIN_PKG} (did you move it?)"
)
if not os.path.exists(IDA_PLUGIN_LOADER):
raise RuntimeError(
f"IDA plugin loader not found at {IDA_PLUGIN_LOADER} (did you move it?)"
)
def get_python_executable():
"""Get the path to the Python executable"""
venv = os.environ.get("VIRTUAL_ENV")
if venv:
if sys.platform == "win32":
python = os.path.join(venv, "Scripts", "python.exe")
else:
python = os.path.join(venv, "bin", "python3")
if os.path.exists(python):
return python
for path in sys.path:
if sys.platform == "win32":
path = path.replace("/", "\\")
split = path.split(os.sep)
if split[-1].endswith(".zip"):
path = os.path.dirname(path)
if sys.platform == "win32":
python_executable = os.path.join(path, "python.exe")
else:
python_executable = os.path.join(path, "..", "bin", "python3")
python_executable = os.path.abspath(python_executable)
if os.path.exists(python_executable):
return python_executable
return sys.executable
def copy_python_env(env: dict[str, str]):
# Reference: https://docs.python.org/3/using/cmdline.html#environment-variables
python_vars = [
"PYTHONHOME",
"PYTHONPATH",
"PYTHONSAFEPATH",
"PYTHONPLATLIBDIR",
"PYTHONPYCACHEPREFIX",
"PYTHONNOUSERSITE",
"PYTHONUSERBASE",
]
# MCP servers are run without inheriting the environment, so we need to forward
# the environment variables that affect Python's dependency resolution by hand.
# Issue: https://github.com/mrexodia/ida-pro-mcp/issues/111
result = False
for var in python_vars:
value = os.environ.get(var)
if value:
result = True
env[var] = value
return result
def generate_mcp_config(*, stdio: bool):
if stdio:
mcp_config = {
"command": get_python_executable(),
"args": [
__file__,
"--ida-rpc",
f"http://{IDA_HOST}:{IDA_PORT}",
],
}
env = {}
if copy_python_env(env):
print("[WARNING] Custom Python environment variables detected")
mcp_config["env"] = env
return mcp_config
else:
return {"type": "http", "url": f"http://{IDA_HOST}:{IDA_PORT}/mcp"}
def print_mcp_config():
print("[HTTP MCP CONFIGURATION]")
print(
json.dumps(
{"mcpServers": {mcp.name: generate_mcp_config(stdio=False)}}, indent=2
)
)
print("\n[STDIO MCP CONFIGURATION]")
print(
json.dumps(
{"mcpServers": {mcp.name: generate_mcp_config(stdio=True)}}, indent=2
)
)
def install_mcp_servers(*, stdio: bool = False, uninstall=False, quiet=False):
# Map client names to their JSON key paths for clients that don't use "mcpServers"
# Format: client_name -> (top_level_key, nested_key)
# None means use default "mcpServers" at top level
special_json_structures = {
"VS Code": ("mcp", "servers"),
"Visual Studio 2022": (None, "servers"), # servers at top level
}
if sys.platform == "win32":
configs = {
"Cline": (
os.path.join(
os.getenv("APPDATA", ""),
"Code",
"User",
"globalStorage",
"saoudrizwan.claude-dev",
"settings",
),
"cline_mcp_settings.json",
),
"Roo Code": (
os.path.join(
os.getenv("APPDATA", ""),
"Code",
"User",
"globalStorage",
"rooveterinaryinc.roo-cline",
"settings",
),
"mcp_settings.json",
),
"Kilo Code": (
os.path.join(
os.getenv("APPDATA", ""),
"Code",
"User",
"globalStorage",
"kilocode.kilo-code",
"settings",
),
"mcp_settings.json",
),
"Claude": (
os.path.join(os.getenv("APPDATA", ""), "Claude"),
"claude_desktop_config.json",
),
"Cursor": (os.path.join(os.path.expanduser("~"), ".cursor"), "mcp.json"),
"Windsurf": (
os.path.join(os.path.expanduser("~"), ".codeium", "windsurf"),
"mcp_config.json",
),
"Claude Code": (os.path.join(os.path.expanduser("~")), ".claude.json"),
"LM Studio": (
os.path.join(os.path.expanduser("~"), ".lmstudio"),
"mcp.json",
),
"Codex": (os.path.join(os.path.expanduser("~"), ".codex"), "config.toml"),
"Zed": (
os.path.join(os.getenv("APPDATA", ""), "Zed"),
"settings.json",
),
"Gemini CLI": (
os.path.join(os.path.expanduser("~"), ".gemini"),
"settings.json",
),
"Qwen Coder": (
os.path.join(os.path.expanduser("~"), ".qwen"),
"settings.json",
),
"Copilot CLI": (
os.path.join(os.path.expanduser("~"), ".copilot"),
"mcp-config.json",
),
"Crush": (
os.path.join(os.path.expanduser("~")),
"crush.json",
),
"Augment Code": (
os.path.join(
os.getenv("APPDATA", ""),
"Code",
"User",
),
"settings.json",
),
"Qodo Gen": (
os.path.join(
os.getenv("APPDATA", ""),
"Code",
"User",
),
"settings.json",
),
"Antigravity IDE": (
os.path.join(os.path.expanduser("~"), ".gemini", "antigravity"),
"mcp_config.json",
),
"Warp": (
os.path.join(os.path.expanduser("~"), ".warp"),
"mcp_config.json",
),
"Amazon Q": (
os.path.join(os.path.expanduser("~"), ".aws", "amazonq"),
"mcp_config.json",
),
"Opencode": (
os.path.join(os.path.expanduser("~"), ".opencode"),
"mcp_config.json",
),
"Kiro": (
os.path.join(os.path.expanduser("~"), ".kiro"),
"mcp_config.json",
),
"Trae": (
os.path.join(os.path.expanduser("~"), ".trae"),
"mcp_config.json",
),
"VS Code": (
os.path.join(
os.getenv("APPDATA", ""),
"Code",
"User",
),
"settings.json",
),
}
elif sys.platform == "darwin":
configs = {
"Cline": (
os.path.join(
os.path.expanduser("~"),
"Library",
"Application Support",
"Code",
"User",
"globalStorage",
"saoudrizwan.claude-dev",
"settings",
),
"cline_mcp_settings.json",
),
"Roo Code": (
os.path.join(
os.path.expanduser("~"),
"Library",
"Application Support",
"Code",
"User",
"globalStorage",
"rooveterinaryinc.roo-cline",
"settings",
),
"mcp_settings.json",
),
"Kilo Code": (
os.path.join(
os.path.expanduser("~"),
"Library",
"Application Support",
"Code",
"User",
"globalStorage",
"kilocode.kilo-code",
"settings",
),
"mcp_settings.json",
),
"Claude": (
os.path.join(
os.path.expanduser("~"), "Library", "Application Support", "Claude"
),
"claude_desktop_config.json",
),
"Cursor": (os.path.join(os.path.expanduser("~"), ".cursor"), "mcp.json"),
"Windsurf": (
os.path.join(os.path.expanduser("~"), ".codeium", "windsurf"),
"mcp_config.json",
),
"Claude Code": (os.path.join(os.path.expanduser("~")), ".claude.json"),
"LM Studio": (
os.path.join(os.path.expanduser("~"), ".lmstudio"),
"mcp.json",
),
"Codex": (os.path.join(os.path.expanduser("~"), ".codex"), "config.toml"),
"Antigravity IDE": (
os.path.join(os.path.expanduser("~"), ".gemini", "antigravity"),
"mcp_config.json",
),
"Zed": (
os.path.join(
os.path.expanduser("~"), "Library", "Application Support", "Zed"
),
"settings.json",
),
"Gemini CLI": (
os.path.join(os.path.expanduser("~"), ".gemini"),
"settings.json",
),
"Qwen Coder": (
os.path.join(os.path.expanduser("~"), ".qwen"),
"settings.json",
),
"Copilot CLI": (
os.path.join(os.path.expanduser("~"), ".copilot"),
"mcp-config.json",
),
"Crush": (
os.path.join(os.path.expanduser("~")),
"crush.json",
),
"Augment Code": (
os.path.join(
os.path.expanduser("~"),
"Library",
"Application Support",
"Code",
"User",
),
"settings.json",
),
"Qodo Gen": (
os.path.join(
os.path.expanduser("~"),
"Library",
"Application Support",
"Code",
"User",
),
"settings.json",
),
"BoltAI": (
os.path.join(
os.path.expanduser("~"),
"Library",
"Application Support",
"BoltAI",
),
"config.json",
),
"Perplexity": (
os.path.join(
os.path.expanduser("~"),
"Library",
"Application Support",
"Perplexity",
),
"mcp_config.json",
),
"Warp": (
os.path.join(os.path.expanduser("~"), ".warp"),
"mcp_config.json",
),
"Amazon Q": (
os.path.join(os.path.expanduser("~"), ".aws", "amazonq"),
"mcp_config.json",
),
"Opencode": (
os.path.join(os.path.expanduser("~"), ".opencode"),
"mcp_config.json",
),
"Kiro": (
os.path.join(os.path.expanduser("~"), ".kiro"),
"mcp_config.json",
),
"Trae": (
os.path.join(os.path.expanduser("~"), ".trae"),
"mcp_config.json",
),
"VS Code": (
os.path.join(
os.path.expanduser("~"),
"Library",
"Application Support",
"Code",
"User",
),
"settings.json",
),
}
elif sys.platform == "linux":
configs = {
"Cline": (
os.path.join(
os.path.expanduser("~"),
".config",
"Code",
"User",
"globalStorage",
"saoudrizwan.claude-dev",
"settings",
),
"cline_mcp_settings.json",
),
"Roo Code": (
os.path.join(
os.path.expanduser("~"),
".config",
"Code",
"User",
"globalStorage",
"rooveterinaryinc.roo-cline",
"settings",
),
"mcp_settings.json",
),
"Kilo Code": (
os.path.join(
os.path.expanduser("~"),
".config",
"Code",
"User",
"globalStorage",
"kilocode.kilo-code",
"settings",
),
"mcp_settings.json",
),
# Claude not supported on Linux
"Cursor": (os.path.join(os.path.expanduser("~"), ".cursor"), "mcp.json"),
"Windsurf": (
os.path.join(os.path.expanduser("~"), ".codeium", "windsurf"),
"mcp_config.json",
),
"Claude Code": (os.path.join(os.path.expanduser("~")), ".claude.json"),
"LM Studio": (
os.path.join(os.path.expanduser("~"), ".lmstudio"),
"mcp.json",
),
"Codex": (os.path.join(os.path.expanduser("~"), ".codex"), "config.toml"),
"Antigravity IDE": (
os.path.join(os.path.expanduser("~"), ".gemini", "antigravity"),
"mcp_config.json",
),
"Zed": (
os.path.join(os.path.expanduser("~"), ".config", "zed"),
"settings.json",
),
"Gemini CLI": (
os.path.join(os.path.expanduser("~"), ".gemini"),
"settings.json",
),
"Qwen Coder": (
os.path.join(os.path.expanduser("~"), ".qwen"),
"settings.json",
),
"Copilot CLI": (
os.path.join(os.path.expanduser("~"), ".copilot"),
"mcp-config.json",
),
"Crush": (
os.path.join(os.path.expanduser("~")),
"crush.json",
),
"Augment Code": (
os.path.join(
os.path.expanduser("~"),
".config",
"Code",
"User",
),
"settings.json",
),
"Qodo Gen": (
os.path.join(
os.path.expanduser("~"),
".config",
"Code",
"User",
),
"settings.json",
),
"Warp": (
os.path.join(os.path.expanduser("~"), ".warp"),
"mcp_config.json",
),
"Amazon Q": (
os.path.join(os.path.expanduser("~"), ".aws", "amazonq"),
"mcp_config.json",
),
"Opencode": (
os.path.join(os.path.expanduser("~"), ".opencode"),
"mcp_config.json",
),
"Kiro": (
os.path.join(os.path.expanduser("~"), ".kiro"),
"mcp_config.json",
),
"Trae": (
os.path.join(os.path.expanduser("~"), ".trae"),
"mcp_config.json",
),
"VS Code": (
os.path.join(
os.path.expanduser("~"),
".config",
"Code",
"User",
),
"settings.json",
),
}
else:
print(f"Unsupported platform: {sys.platform}")
return
installed = 0
for name, (config_dir, config_file) in configs.items():
config_path = os.path.join(config_dir, config_file)
is_toml = config_file.endswith(".toml")
if not os.path.exists(config_dir):
action = "uninstall" if uninstall else "installation"
if not quiet:
print(f"Skipping {name} {action}\n Config: {config_path} (not found)")
continue
# Read existing config
if not os.path.exists(config_path):
config = {}
else:
with open(
config_path,
"rb" if is_toml else "r",
encoding=None if is_toml else "utf-8",
) as f:
if is_toml:
data = f.read()
if len(data) == 0:
config = {}
else:
try:
config = tomllib.loads(data.decode("utf-8"))
except tomllib.TOMLDecodeError:
if not quiet:
print(
f"Skipping {name} uninstall\n Config: {config_path} (invalid TOML)"
)
continue
else:
data = f.read().strip()
if len(data) == 0:
config = {}
else:
try:
config = json.loads(data)
except json.decoder.JSONDecodeError:
if not quiet:
print(
f"Skipping {name} uninstall\n Config: {config_path} (invalid JSON)"
)
continue
# Handle TOML vs JSON structure
if is_toml:
if "mcp_servers" not in config:
config["mcp_servers"] = {}
mcp_servers = config["mcp_servers"]
else:
# Check if this client uses a special JSON structure
if name in special_json_structures:
top_key, nested_key = special_json_structures[name]
if top_key is None:
# servers at top level (e.g., Visual Studio 2022)
if nested_key not in config:
config[nested_key] = {}
mcp_servers = config[nested_key]
else:
# nested structure (e.g., VS Code uses mcp.servers)
if top_key not in config:
config[top_key] = {}
if nested_key not in config[top_key]:
config[top_key][nested_key] = {}
mcp_servers = config[top_key][nested_key]
else:
# Default: mcpServers at top level
if "mcpServers" not in config:
config["mcpServers"] = {}
mcp_servers = config["mcpServers"]
# Migrate old name
old_name = "github.com/mrexodia/ida-pro-mcp"
if old_name in mcp_servers:
mcp_servers[mcp.name] = mcp_servers[old_name]
del mcp_servers[old_name]
if uninstall:
if mcp.name not in mcp_servers:
if not quiet:
print(
f"Skipping {name} uninstall\n Config: {config_path} (not installed)"
)
continue
del mcp_servers[mcp.name]
else:
mcp_servers[mcp.name] = generate_mcp_config(stdio=stdio)
# Atomic write: temp file + rename
suffix = ".toml" if is_toml else ".json"
fd, temp_path = tempfile.mkstemp(
dir=config_dir, prefix=".tmp_", suffix=suffix, text=True
)
try:
with os.fdopen(
fd, "wb" if is_toml else "w", encoding=None if is_toml else "utf-8"
) as f:
if is_toml:
f.write(tomli_w.dumps(config).encode("utf-8"))
else:
json.dump(config, f, indent=2)
os.replace(temp_path, config_path)
except:
os.unlink(temp_path)
raise
if not quiet:
action = "Uninstalled" if uninstall else "Installed"
print(
f"{action} {name} MCP server (restart required)\n Config: {config_path}"
)
installed += 1
if not uninstall and installed == 0:
print(
"No MCP servers installed. For unsupported MCP clients, use the following config:\n"
)
print_mcp_config()
def install_ida_plugin(
*, uninstall: bool = False, quiet: bool = False, allow_ida_free: bool = False
):
if sys.platform == "win32":
ida_folder = os.path.join(os.environ["APPDATA"], "Hex-Rays", "IDA Pro")
else:
ida_folder = os.path.join(os.path.expanduser("~"), ".idapro")
if not allow_ida_free:
free_licenses = glob.glob(os.path.join(ida_folder, "idafree_*.hexlic"))
if len(free_licenses) > 0:
print(
"IDA Free does not support plugins and cannot be used. Purchase and install IDA Pro instead."
)
sys.exit(1)
ida_plugin_folder = os.path.join(ida_folder, "plugins")
# Install both the loader file and package directory
loader_source = IDA_PLUGIN_LOADER
loader_destination = os.path.join(ida_plugin_folder, "ida_mcp.py")
pkg_source = IDA_PLUGIN_PKG
pkg_destination = os.path.join(ida_plugin_folder, "ida_mcp")
# Clean up old plugin if it exists
old_plugin = os.path.join(ida_plugin_folder, "mcp-plugin.py")
if uninstall:
# Remove loader
if os.path.lexists(loader_destination):
os.remove(loader_destination)
if not quiet:
print(f"Uninstalled IDA plugin loader\n Path: {loader_destination}")
# Remove package
if os.path.exists(pkg_destination):
if os.path.isdir(pkg_destination) and not os.path.islink(pkg_destination):
shutil.rmtree(pkg_destination)
else:
os.remove(pkg_destination)
if not quiet:
print(f"Uninstalled IDA plugin package\n Path: {pkg_destination}")
# Remove old plugin if it exists
if os.path.lexists(old_plugin):
os.remove(old_plugin)
if not quiet:
print(f"Removed old plugin\n Path: {old_plugin}")
else:
# Create IDA plugins folder
if not os.path.exists(ida_plugin_folder):
os.makedirs(ida_plugin_folder)
# Remove old plugin if it exists
if os.path.lexists(old_plugin):
os.remove(old_plugin)
if not quiet:
print(f"Removed old plugin file\n Path: {old_plugin}")
installed_items = []
# Install loader file
loader_realpath = (
os.path.realpath(loader_destination)
if os.path.lexists(loader_destination)
else None
)
if loader_realpath != loader_source:
if os.path.lexists(loader_destination):
os.remove(loader_destination)
try:
os.symlink(loader_source, loader_destination)
installed_items.append(f"loader: {loader_destination}")
except OSError:
shutil.copy(loader_source, loader_destination)
installed_items.append(f"loader: {loader_destination}")
# Install package directory
pkg_realpath = (
os.path.realpath(pkg_destination)
if os.path.lexists(pkg_destination)
else None
)
if pkg_realpath != pkg_source:
if os.path.lexists(pkg_destination):
if os.path.isdir(pkg_destination) and not os.path.islink(
pkg_destination
):
shutil.rmtree(pkg_destination)
else:
os.remove(pkg_destination)
try:
os.symlink(pkg_source, pkg_destination)
installed_items.append(f"package: {pkg_destination}")
except OSError:
shutil.copytree(pkg_source, pkg_destination)
installed_items.append(f"package: {pkg_destination}")
if not quiet:
if installed_items:
print("Installed IDA Pro plugin (IDA restart required)")
for item in installed_items:
print(f" {item}")
else:
print("Skipping IDA plugin installation (already up to date)")
def main():
global IDA_HOST, IDA_PORT
parser = argparse.ArgumentParser(description="IDA Pro MCP Server")
parser.add_argument(
"--install", action="store_true", help="Install the MCP Server and IDA plugin"
)
parser.add_argument(
"--uninstall",
action="store_true",
help="Uninstall the MCP Server and IDA plugin",
)
parser.add_argument(
"--allow-ida-free",
action="store_true",
help="Allow installation despite IDA Free being installed",
)
parser.add_argument(
"--transport",
type=str,
default="stdio",
help="MCP transport protocol to use (stdio or http://127.0.0.1:8744)",
)
parser.add_argument(
"--ida-rpc",
type=str,
default=f"http://{IDA_HOST}:{IDA_PORT}",
help=f"IDA RPC server to use (default: http://{IDA_HOST}:{IDA_PORT})",
)
parser.add_argument(
"--config", action="store_true", help="Generate MCP config JSON"
)
args = parser.parse_args()
# Parse IDA RPC server argument
ida_rpc = urlparse(args.ida_rpc)
if ida_rpc.hostname is None or ida_rpc.port is None:
raise Exception(f"Invalid IDA RPC server: {args.ida_rpc}")
IDA_HOST = ida_rpc.hostname
IDA_PORT = ida_rpc.port
if args.install and args.uninstall:
print("Cannot install and uninstall at the same time")
return
if args.install:
install_ida_plugin(allow_ida_free=args.allow_ida_free)
install_mcp_servers(stdio=(args.transport == "stdio"))
return
if args.uninstall:
install_ida_plugin(uninstall=True, allow_ida_free=args.allow_ida_free)
install_mcp_servers(uninstall=True)
return
if args.config:
print_mcp_config()
return
try:
if args.transport == "stdio":
mcp.stdio()
else:
url = urlparse(args.transport)
if url.hostname is None or url.port is None:
raise Exception(f"Invalid transport URL: {args.transport}")
# NOTE: npx -y @modelcontextprotocol/inspector for debugging
mcp.serve(url.hostname, url.port)
input("Server is running, press Enter or Ctrl+C to stop.")
except (KeyboardInterrupt, EOFError):
pass
if __name__ == "__main__":
main()