Skip to main content
Glama

IDA-MCP

by jelasin
ida_mcp.py16.6 kB
"""IDA Pro MCP 插件 (SSE + 多实例协调器注册) 功能综述 ==================== 本插件为每个启动的 IDA 实例提供一个最小化 **FastMCP SSE** 服务, 暴露逆向分析能力给支持 MCP 的外部客户端。 核心特性: 1. 启动/关闭采用“切换式”触发(再次运行插件即关闭)。 2. 自动选择空闲端口 (从 8765 开始向上扫描), SSE 路径固定为 ``/mcp``。 3. 首个成功启动的实例会在 ``127.0.0.1:11337`` 上创建一个 **内存型协调器(coordinator)**。 4. 后续实例向协调器注册, 仅在内存维护实例列表, 不落盘 (避免文件锁 / 清理问题)。 5. 工具最小化: 仅保留 ``list_functions`` 与 ``instances`` (实例列表)。 6. 可配合独立进程型代理 ``ida_mcp_proxy.py`` 统一访问多个实例。 运行时架构 -------------------- ``IDA 实例 (N 个)`` → 各自运行 uvicorn FastMCP (SSE) → 向协调器登记元信息(pid, port, input_file 等)。 ``协调器`` 负责: 记录活跃实例; 接收代理或其他客户端的 /call 请求并转发至目标实例。 线程与生命周期 -------------------- * uvicorn 服务器在 **后台守护线程** 中运行, 便于主线程继续响应 IDA 事件。 * 关闭流程: 设置 ``_uv_server.should_exit = True`` → 等待线程退出 → 调用协调器注销。 * IDA 退出或插件终止时, 若仍在运行则自动停止并反注册。 端口选择策略 -------------------- * 若设置环境变量 ``IDA_MCP_PORT`` 且合法, 则使用该端口 (不再扫描)。 * 否则从 ``DEFAULT_PORT (=8765)`` 起向上扫描 (最大 50 次)。 * 允许多个 IDA 实例并行, 避免端口冲突。 环境变量 (可选) -------------------- * ``IDA_MCP_PORT``: 指定固定端口。 * ``IDA_MCP_HOST``: 监听地址, 默认 ``127.0.0.1``。 * ``IDA_MCP_NAME``: MCP 服务名, 默认 ``IDA-MCP``。 主要内部变量 -------------------- * ``_server_thread``: 后台 uvicorn 线程对象。 * ``_uv_server``: uvicorn Server 实例 (用于发出停止信号)。 * ``_active_port``: 当前实例实际使用端口。 * ``_stop_lock``: 防止并发关闭竞争。 公共函数概览 -------------------- * ``start_server_async(host, port)``: 启动 MCP 服务器 (线程)。 * ``stop_server()``: 发送退出信号并等待线程结束, 注销协调器。 * ``is_running()``: 判断当前服务器线程是否存活。 扩展建议 -------------------- 未来可在 ``ida_mcp/server.py`` 内增量添加更多工具 (反编译、交叉引用、数据段搜索等)。协调器 ``registry.py`` 已支持 /call 转发, 添加工具仅需在每个实例服务端注册, 代理端(可选)补一层转发包装。 使用方式 -------------------- 1. 将本文件与 ``ida_mcp`` 目录复制到 IDA ``plugins/``。 2. 打开目标二进制, 分析完成后在菜单或快捷键中执行插件 (第一次执行 = 启动)。 3. 再次执行插件 = 停止并反注册。 4. 可启动多个 IDA 实例重复步骤 2, 通过协调器配合代理统一访问。 调试提示 -------------------- * 如果端口被占用, 会自动向上扫描; 如全部失败, 仍可能抛出绑定异常 (检查是否被防火墙或安全软件占用)。 * 服务器崩溃日志会打印堆栈; 若需更详细日志可将 uvicorn log_level 改为 info/debug。 本文件只包含逻辑入口与生命周期管理, 实际工具定义在 ``ida_mcp/server.py``。 """ import threading import os import traceback import socket import time try: # IDA imports (only available inside IDA) import idaapi # type: ignore import ida_kernwin # type: ignore except Exception: # pragma: no cover - outside IDA idaapi = None # type: ignore ida_kernwin = None # type: ignore from ida_mcp.server import create_mcp_server, DEFAULT_PORT from ida_mcp import registry _server_thread: threading.Thread | None = None # 后台 uvicorn 线程 (运行 FastMCP ASGI 服务) _uv_server = None # type: ignore # uvicorn.Server 实例引用, 用于优雅关闭 (should_exit) _stop_lock = threading.Lock() # 防止 stop_server 并发重入的互斥锁 _active_port: int | None = None # 当前实例实际监听的 MCP 端口 (启动后写入, 停止时清空) _hb_thread: threading.Thread | None = None # 心跳/保活线程对象 (负责检测协调器状态与定期刷新注册) _hb_stop = threading.Event() # 心跳线程停止信号 (stop_server 中置位) _last_register_ts: float | None = None # 最近一次成功调用 registry.init_and_register 的时间戳 (仅在缺失后重注册时更新) _ENABLE_PERIODIC_REFRESH = False # 设为 True 才会启用“超时周期刷新”逻辑,默认只在缺失时重注册 _REGISTER_INTERVAL = 300 # (可选) 原本用于周期 refresh 的阈值; 默认禁用 _HEARTBEAT_INTERVAL = 60 # 心跳循环唤醒/巡检间隔 _cached_input_file: str | None = None # 缓存的输入二进制路径 (仅主线程初始化; 心跳线程避免直接调用 IDA API) _cached_idb_path: str | None = None # 缓存的 IDB 路径 (同上, 避免后台线程访问 IDA C 接口) def _heartbeat_loop(): """后台心跳: 定期确认协调器仍可访问且本实例记录存在, 否则重新注册。 触发条件: * 协调器列表为空 (所有实例丢失) -> 重新注册 (可能重建协调器) * 本实例 pid 未出现在 get_instances() 结果中 -> 重新注册 * 正常情况下每 _REGISTER_INTERVAL 秒做一次 refresh (覆盖 started 时间, 保持活跃) 设计考量: * registry 当前无心跳超时机制, 但某些情况下协调器线程可能被系统/异常终止。 * 使用轻量轮询, 避免对 IDA 主线程的调用; 仅访问 registry (纯网络/内存操作)。 * 若服务器已停止 (_active_port 为空) 则直接退出。 """ global _last_register_ts pid = os.getpid() while not _hb_stop.is_set(): # 若服务已经关闭, 退出 if _active_port is None or _uv_server is None: break try: inst_list = registry.get_instances() except Exception: inst_list = [] need_register = False now = time.time() if not inst_list: need_register = True else: found = any(e.get('pid') == pid for e in inst_list) if not found: need_register = True # 不再默认进行“时间驱动的强制 refresh”,仅在实例缺失或协调器重建时重注册。 if (not need_register and _ENABLE_PERIODIC_REFRESH and _last_register_ts and (now - _last_register_ts) > _REGISTER_INTERVAL): need_register = True # 可选:用户显式启用时恢复旧逻辑 if need_register and _active_port is not None: try: # 仅用缓存的路径/文件, 避免后台线程再触碰 IDA API registry.init_and_register(_active_port, _cached_input_file, _cached_idb_path) _last_register_ts = now if inst_list: _info("Heartbeat re-register (periodic refresh) done.") if _ENABLE_PERIODIC_REFRESH else None else: _info("Heartbeat re-register successful (coordinator rebuilt or entry missing).") except Exception as e: # pragma: no cover _warn(f"Heartbeat re-register failed: {e}") _hb_stop.wait(_HEARTBEAT_INTERVAL) _info("Heartbeat thread exit.") # ---------------- Logging Helpers (INFO/WARN/ERROR) ----------------- def _now_ts() -> str: return time.strftime("%H:%M:%S") + f".{int(time.time()*1000)%1000:03d}" def _log(level: str, msg: str): """Unified log output with timestamp (HH:MM:SS.mmm).""" print(f"[IDA-MCP][{level}][{_now_ts()}] {msg}") def _info(msg: str): _log("INFO", msg) def _warn(msg: str): _log("WARN", msg) def _error(msg: str): _log("ERROR", msg) def _find_free_port(preferred: int, max_scan: int = 50) -> int: """端口扫描: 从 preferred 起向上尝试绑定, 返回第一个可用端口; 若全部失败则返回 preferred (保底)。""" for i in range(max_scan): p = preferred + i with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: s.bind(("127.0.0.1", p)) except OSError: continue return p _warn(f"Port scan exhausted; falling back to preferred {preferred}") return preferred def _register_with_coordinator(port: int): """向协调器注册当前实例元信息。 参数: port: 当前实例 FastMCP SSE 监听端口。 说明: * 首个实例若发现协调器端口空闲会由 registry 内部启动协调器。 * 注册内容包括: pid / port / 输入文件路径 / idb 路径 / Python 版本等。 """ if idaapi is None: return global _cached_input_file, _cached_idb_path if _cached_input_file is None: _cached_input_file = getattr(idaapi, 'get_input_file_path', lambda: None)() # type: ignore if _cached_idb_path is None and hasattr(idaapi, 'get_path'): try: _cached_idb_path = idaapi.get_path(idaapi.PATH_TYPE_IDB) # type: ignore except Exception: _cached_idb_path = None try: registry.init_and_register(port, _cached_input_file, _cached_idb_path) _info(f"Registered instance at port={port} pid={os.getpid()} input='{_cached_input_file}' idb='{_cached_idb_path}'") # 若本实例成为协调器, 追加一条提示日志 (用户需求) try: if getattr(registry, 'is_coordinator', lambda: False)(): # type: ignore[attr-defined] _info("This instance is COORDINATOR (registry listening on 127.0.0.1:11337)") except Exception: pass except Exception as e: # pragma: no cover _error(f"Coordinator registration failed: {e}") traceback.print_exc() def is_running() -> bool: return _server_thread is not None and _server_thread.is_alive() def stop_server(): """停止服务器 (切换)。 步骤: 1. 设置 ``_uv_server.should_exit`` 触发 uvicorn 事件循环退出。 2. join 后台线程 (最多 5 秒)。 3. 若已注册协调器则执行注销。 并发安全: 使用 ``_stop_lock`` 以防多次同时调用。 """ global _uv_server, _server_thread with _stop_lock: if _uv_server is None: _info("Stop requested, but server not running.") return try: # Graceful shutdown _uv_server.should_exit = True # type: ignore[attr-defined] _info("Shutdown signal sent to uvicorn server.") except Exception as e: # pragma: no cover _error(f"Failed to signal shutdown: {e}") if _server_thread: # Join server thread with timeout _server_thread.join(timeout=5) global _active_port _server_thread = None _uv_server = None if _active_port is not None: try: registry.deregister() except Exception as e: # pragma: no cover _warn(f"Deregister failed: {e}") _active_port = None # 停止心跳线程 global _hb_thread if _hb_thread and _hb_thread.is_alive(): _hb_stop.set() _hb_thread.join(timeout=3) _hb_thread = None _info("Server stopped.") def PLUGIN_ENTRY(): # IDA looks for this symbol return IDAMCPPlugin() class IDAMCPPlugin(idaapi.plugin_t if idaapi else object): # type: ignore flags = 0 comment = "FastMCP SSE server for IDA" help = "Expose IDA features through Model Context Protocol" wanted_name = "IDA-MCP" wanted_hotkey = "" def init(self): # type: ignore if idaapi is None: _warn("Outside IDA environment; plugin inactive.") return idaapi.PLUGIN_SKIP if idaapi else 0 # 不自动启动, 等待用户菜单/快捷方式显式触发。 _info("Plugin initialized and ready (not auto-starting).") return idaapi.PLUGIN_KEEP # type: ignore def run(self, arg): # type: ignore # 切换行为: 运行中 -> 停止; 否则启动。仅打印日志, 不弹出对话框。 if not idaapi: _warn("Run invoked but not inside IDA.") return if is_running(): _info("Server running -> toggling to stop.") stop_server() return # 端口选择: 优先使用环境变量; 否则自动扫描以支持多实例。 env_port = os.getenv("IDA_MCP_PORT") if env_port and env_port.isdigit(): port = int(env_port) else: port = _find_free_port(DEFAULT_PORT) host = os.getenv("IDA_MCP_HOST", "127.0.0.1") _info(f"Starting SSE server http://{host}:{port}/mcp/ (toggle to stop)") start_server_async(host, port) def term(self): # type: ignore _info("Plugin terminating.") if is_running(): stop_server() def start_server_async(host: str, port: int): """异步(线程)启动 uvicorn FastMCP 服务。 设计要点: * 使用守护线程避免阻塞 IDA 主线程。 * 通过保存 ``_uv_server`` 引用实现优雅关闭 (设置 should_exit)。 * 线程启动后立即向协调器注册 (保持实例可发现性)。 """ global _server_thread, _uv_server if is_running(): _info("Server already running; start request ignored.") return def worker(): global _uv_server try: # Windows 控制台噪音抑制: 使用 Selector 事件循环替代 Proactor, # 规避 asyncio 在 _ProactorBasePipeTransport._call_connection_lost 中 # 打印的 ConnectionResetError(WinError 10054) 回调异常。 if os.name == "nt": try: import asyncio # type: ignore if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"): asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # type: ignore[attr-defined] except Exception: pass # 策略设置失败时不影响后续逻辑,最多产生原有控制台提示 server = create_mcp_server() # 构建 ASGI 应用 (包含 SSE 端点), 挂载路径 '/mcp' app = server.http_app(path="/mcp") # type: ignore[attr-defined] import uvicorn # Local import to avoid overhead if never started # 使用 warning 日志级别并关闭 access log, 避免输出无意义的 CTRL+C 提示。 config = uvicorn.Config(app, host=host, port=port, log_level="warning", access_log=False) _uv_server = uvicorn.Server(config) _uv_server.run() except Exception as e: # pragma: no cover _error(f"Server crashed: {e}") traceback.print_exc() finally: _uv_server = None _info("Server thread exit.") _server_thread = threading.Thread(target=worker, name="IDA-MCP-Server", daemon=True) _server_thread.start() # Record chosen port after thread start global _active_port _active_port = port _register_with_coordinator(port) # 记录注册时间并启动心跳线程 global _hb_thread, _last_register_ts _last_register_ts = time.time() if _hb_thread is None or not _hb_thread.is_alive(): _hb_stop.clear() _hb_thread = threading.Thread(target=_heartbeat_loop, name="IDA-MCP-Heartbeat", daemon=True) _hb_thread.start() _info("Heartbeat thread started.") if __name__ == "__main__": _info("Standalone mode: starting server.") start_server_async("127.0.0.1", DEFAULT_PORT) if _server_thread: _server_thread.join()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jelasin/IDA-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server