"""多实例协调器 (内存) + 工具转发 /call
设计目的
====================
在多个 IDA 实例并行时, 需要一种轻量方式发现彼此并统一转发工具调用。本模块提供一个**内存驻留**的小型 HTTP 服务, 仅占用本地 ``127.0.0.1:11337`` 端口。
角色与职责
--------------------
1. 第一个尝试注册的实例若发现 11337 空闲 → 直接绑定并成为协调器。
2. 其余实例仅向该协调器 POST /register 进行登记。
3. 无任何磁盘持久化; 退出后状态自动丢弃。
HTTP 接口
--------------------
* ``GET /instances`` : 返回当前注册的全部实例列表 (数组)
* ``POST /register`` : 注册或刷新单个实例 (覆盖 pid 相同的旧记录)
* ``POST /deregister`` : 注销实例 (进程退出 / 插件关闭)
* ``POST /call`` : 将工具调用转发到指定实例 (通过 pid 或 port 识别)
实例结构字段示例
--------------------
```
{
"pid": 1234,
"port": 10000,
"input_file": "/path/to/bin",
"idb": "/path/to/db.i64",
"started": 1730000000.123, # 启动时间戳
"python": "3.11.9"
}
```
转发机制 (/call)
--------------------
1. 客户端 (或代理) 提交: { tool, params, pid|port }
2. 协调器定位目标实例端口, 使用 fastmcp.Client 临时发起一次真实工具调用。
3. 对返回对象做 “可 JSON 序列化” 处理 (递归转普通结构) 后返回。
并发与线程
--------------------
* 采用 RLock 保护 _instances 列表。
* 协调器 HTTPServer 运行在守护线程, 不阻塞调用方。
扩展建议
--------------------
* 增加心跳(定期刷新时间戳) + 过期清理。
* 增加权限限制 (只允许本地请求 / 简单 token)。
* 支持广播调用 (例如对所有实例同步执行某工具)。
公开辅助函数
--------------------
* ``init_and_register`` : 保证协调器存在并注册当前实例。
* ``get_instances`` : 查询实例列表 (本地 or 远程)。
* ``deregister`` : 注销当前实例。
* ``call_tool`` : 调用 /call 进行一次转发。
"""
from __future__ import annotations
import threading
import json
import time
import socket
import http.server
import urllib.request
import urllib.error
from typing import List, Dict, Any, Optional
import os
import atexit
import sys
import ida_kernwin # type: ignore
# 内部通信固定使用本地回环地址
LOCALHOST = "127.0.0.1"
# 从配置文件加载,若失败则使用默认值
try:
from .config import (
get_coordinator_host,
get_coordinator_port,
get_request_timeout,
is_debug_enabled as _config_debug,
get_ida_host,
)
COORD_HOST = get_coordinator_host() # 监听地址(可配置为 0.0.0.0 等)
COORD_PORT = get_coordinator_port()
REQUEST_TIMEOUT = get_request_timeout()
DEBUG_ENABLED = _config_debug()
IDA_HOST = get_ida_host() # 实例监听地址
except Exception:
COORD_HOST = "127.0.0.1"
COORD_PORT = 11337
REQUEST_TIMEOUT = 30
DEBUG_ENABLED = False
IDA_HOST = "127.0.0.1"
DEBUG_MAX_LEN = 1000
_instances: List[Dict[str, Any]] = []
_lock = threading.RLock()
_is_coordinator = False
_server_thread: Optional[threading.Thread] = None
_self_pid = os.getpid()
_current_instance_port: Optional[int] = None
def _short(v: Any) -> str:
try:
s = json.dumps(v, ensure_ascii=False)
except Exception:
s = str(v)
if len(s) > DEBUG_MAX_LEN:
return s[:DEBUG_MAX_LEN] + "..."
return s
def _debug_log(event: str, **fields: Any): # pragma: no cover
if not DEBUG_ENABLED:
return
ts = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
kv = ' '.join(f"{k}={_short(v)}" for k, v in fields.items())
line = f"[{ts}] [registry] {event} {kv}\n"
try:
if hasattr(ida_kernwin, 'execute_sync') and hasattr(ida_kernwin, 'msg'):
def _emit(): # type: ignore
try:
ida_kernwin.msg(line) # type: ignore
except Exception:
try:
print(line, end='')
except Exception:
pass
return 0
try:
ida_kernwin.execute_sync(_emit, ida_kernwin.MFF_READ) # type: ignore
except Exception:
try:
ida_kernwin.msg(line) # type: ignore
except Exception:
try:
print(line, end='')
except Exception:
pass
else:
print(line, end='')
except Exception:
pass
def set_debug(enable: bool):
global DEBUG_ENABLED
DEBUG_ENABLED = bool(enable)
def is_debug_enabled() -> bool:
return DEBUG_ENABLED
class _Handler(http.server.BaseHTTPRequestHandler): # pragma: no cover
def log_message(self, format, *args):
return
def _send(self, code: int, obj: Any):
data = json.dumps(obj).encode('utf-8')
self.send_response(code)
self.send_header('Content-Type', 'application/json')
self.send_header('Content-Length', str(len(data)))
self.end_headers()
try:
self.wfile.write(data)
except (ConnectionAbortedError, BrokenPipeError, OSError) as e: # pragma: no cover
# 客户端在响应发送途中断开 (WinError 10053/10054 或 POSIX EPIPE/ECONNRESET),忽略即可,不影响后续请求。
# 选择静默处理 (方案A),避免在频繁探测/超时重试场景刷屏。
pass
def do_GET(self): # type: ignore
if self.path == '/instances':
with _lock:
_debug_log('GET /instances', count=len(_instances))
self._send(200, _instances)
elif self.path == '/current_instance':
with _lock:
_debug_log('GET /current_instance', port=_current_instance_port)
self._send(200, {"port": _current_instance_port})
elif self.path == '/debug':
self._send(200, {"enabled": DEBUG_ENABLED})
else:
self._send(404, {"error": "not found"})
def do_POST(self): # type: ignore
global _current_instance_port
length = int(self.headers.get('Content-Length', '0'))
raw = self.rfile.read(length) if length else b''
try:
payload = json.loads(raw.decode('utf-8') or '{}')
except Exception:
payload = {}
_debug_log('POST', path=self.path, client=f"{self.client_address}", content_length=length)
if self.path == '/register':
needed = {'pid', 'port'}
if not needed.issubset(payload):
self._send(400, {"error": "missing fields"})
return
with _lock:
pid = payload['pid']
existing = [e for e in _instances if e.get('pid') != pid]
_instances.clear()
_instances.extend(existing)
_instances.append(payload)
_debug_log('REGISTER', pid=payload.get('pid'), port=payload.get('port'), input_file=payload.get('input_file'), idb=payload.get('idb'))
self._send(200, {"status": "ok"})
elif self.path == '/deregister':
pid = payload.get('pid')
if pid is None:
self._send(400, {"error": "missing pid"})
return
with _lock:
remaining = [e for e in _instances if e.get('pid') != pid]
if _current_instance_port and not any(e.get('port') == _current_instance_port for e in remaining):
_current_instance_port = None
_instances.clear()
_instances.extend(remaining)
_debug_log('DEREGISTER', pid=pid, remaining=len(_instances))
self._send(200, {"status": "ok"})
elif self.path == '/select_instance':
port = payload.get('port')
with _lock:
if port is None:
# Auto-select if port is not provided
if not _instances:
self._send(404, {"error": "No instances to select from"})
return
# Prioritize 10000, then earliest started
sorted_instances = sorted(_instances, key=lambda x: (x.get('port') != 10000, x.get('started', float('inf'))))
_current_instance_port = sorted_instances[0].get('port')
else:
if not any(e.get('port') == port for e in _instances):
self._send(404, {"error": f"Instance with port {port} not found"})
return
_current_instance_port = port
_debug_log('SELECT_INSTANCE', requested=port, selected=_current_instance_port)
self._send(200, {"status": "ok", "selected_port": _current_instance_port})
elif self.path == '/debug':
# Toggle debug logging: payload { enable: bool }
enable = bool(payload.get('enable') if 'enable' in payload else payload.get('enabled', False))
set_debug(enable)
self._send(200, {"status": "ok", "enabled": DEBUG_ENABLED})
elif self.path == '/call':
# payload: { pid | port, tool, params }
target_pid = payload.get('pid')
target_port = payload.get('port')
tool = payload.get('tool')
params = payload.get('params') or {}
if not tool:
self._send(400, {"error": "missing tool"})
return
with _lock:
target = None
if target_pid is not None:
for e in _instances:
if e.get('pid') == target_pid:
target = e
break
elif target_port is not None:
for e in _instances:
if e.get('port') == target_port:
target = e
break
if target is None:
self._send(404, {"error": "instance not found"})
return
port = target.get('port')
if not isinstance(port, int):
self._send(500, {"error": "bad target port"})
return
t0 = time.time()
_debug_log('CALL_BEGIN', tool=tool, target_port=port, pid=target.get('pid'), params_keys=list((params or {}).keys()))
# Forward the tool call over SSE MCP (JSON-RPC) using fastmcp Client dynamically.
# 内部通信固定使用 127.0.0.1(协调器与实例在同一台机器上)
try:
from fastmcp import Client # type: ignore
import asyncio
async def _do():
async with Client(f"http://{LOCALHOST}:{port}/mcp/", timeout=REQUEST_TIMEOUT) as c: # type: ignore
resp = await c.call_tool(tool, params)
# Extract data from response content (JSON text)
# fastmcp returns data in resp.content[0].text as JSON string
data = None
if hasattr(resp, 'content') and resp.content:
for item in resp.content:
text = getattr(item, 'text', None)
if text:
try:
data = json.loads(text)
break
except (json.JSONDecodeError, TypeError):
continue
# Fallback: try resp.data with normalization
if data is None and hasattr(resp, 'data') and resp.data is not None:
def norm(x):
if isinstance(x, list):
return [norm(i) for i in x]
if isinstance(x, dict):
return {k: norm(v) for k, v in x.items()}
if hasattr(x, 'model_dump'):
return x.model_dump()
if hasattr(x, '__dict__') and x.__dict__:
return norm(vars(x))
return x
data = norm(resp.data)
return {"tool": tool, "data": data}
result = asyncio.run(_do())
dt_ms = int((time.time() - t0) * 1000)
# Attempt to estimate response size
try:
resp_size = len(json.dumps(result, ensure_ascii=False))
except Exception:
resp_size = 0
_debug_log('CALL_OK', tool=tool, target_port=port, elapsed_ms=dt_ms, resp_size=resp_size)
self._send(200, result)
except Exception as e: # pragma: no cover
dt_ms = int((time.time() - t0) * 1000)
_debug_log('CALL_FAIL', tool=tool, target_port=port, elapsed_ms=dt_ms, error=str(e))
self._send(500, {"error": f"call failed: {e}"})
else:
self._send(404, {"error": "not found"})
def _start_coordinator(): # pragma: no cover
global _server_thread
if _server_thread and _server_thread.is_alive():
return
def run():
try:
httpd = http.server.HTTPServer((COORD_HOST, COORD_PORT), _Handler)
httpd.serve_forever()
except Exception:
pass
_server_thread = threading.Thread(target=run, name="IDA-MCP-Registry", daemon=True)
_server_thread.start()
# 根据配置启动 HTTP 代理
_try_start_http_proxy()
def _try_start_http_proxy(): # pragma: no cover
"""启动 HTTP MCP 代理(始终启动,同时支持 stdio 和 HTTP 两种连接方式)。"""
try:
from .config import get_http_host, get_http_port, get_http_path
from .http import start_http_proxy, get_http_url
host = get_http_host()
port = get_http_port()
path = get_http_path()
if start_http_proxy(host, port, path):
url = get_http_url()
_log_info(f"HTTP MCP proxy started at {url}")
else:
_log_info("Failed to start HTTP MCP proxy")
except Exception as e:
# HTTP 模块不可用时静默忽略
_log_info(f"HTTP proxy not started: {e}")
def _log_info(msg: str): # pragma: no cover
"""输出日志到 IDA 消息窗口或控制台。"""
ts = time.strftime('%H:%M:%S', time.localtime())
line = f"[IDA-MCP][INFO][{ts}] {msg}\n"
try:
if hasattr(ida_kernwin, 'execute_sync') and hasattr(ida_kernwin, 'msg'):
def _emit():
try:
ida_kernwin.msg(line)
except Exception:
print(line, end='')
return 0
try:
ida_kernwin.execute_sync(_emit, ida_kernwin.MFF_READ)
except Exception:
try:
ida_kernwin.msg(line)
except Exception:
print(line, end='')
else:
print(line, end='')
except Exception:
pass
def _coordinator_alive() -> bool:
"""检测协调器是否存活(内部通信,固定使用 127.0.0.1)。"""
try:
with socket.create_connection((LOCALHOST, COORD_PORT), timeout=0.3):
return True
except OSError:
return False
def init_and_register(port: int, input_file: str | None, idb_path: str | None):
"""确保协调器运行, 若不存在则当前进程抢占成为协调器, 然后注册本实例。
参数:
port: 当前实例监听的 MCP 端口
input_file: 输入文件路径 (可能为 None)
idb_path: IDB 路径 (可能为 None)
逻辑:
1. 尝试连接 11337; 若失败则尝试 bind -> 成为协调器并启动 HTTP 服务。
2. 构造实例 payload 并 POST /register。
3. 注册 atexit 钩子, 确保正常退出时自动注销。
"""
global _is_coordinator
if not _coordinator_alive():
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((COORD_HOST, COORD_PORT))
s.close()
_is_coordinator = True
_start_coordinator()
except OSError:
_is_coordinator = False
payload = {
'pid': _self_pid,
'port': port,
'host': IDA_HOST, # 实例监听地址,用于远程访问
'input_file': input_file,
'idb': idb_path,
'started': time.time(),
'python': sys.version.split()[0],
}
# 如果是协调器,直接操作内存;否则通过 HTTP 注册
if _is_coordinator:
_register_local(payload)
else:
_post_json('/register', payload)
atexit.register(deregister)
def is_coordinator() -> bool:
"""返回当前进程是否为协调器 (第一个绑定 11337 的实例).
供外部在完成 init_and_register 调用后输出额外日志。
"""
return _is_coordinator
def _post_json(path: str, obj: Any):
"""通过 HTTP POST 发送 JSON 到协调器(内部通信,固定使用 127.0.0.1)。"""
data = json.dumps(obj).encode('utf-8')
req = urllib.request.Request(f'http://{LOCALHOST}:{COORD_PORT}{path}', data=data, method='POST', headers={'Content-Type': 'application/json'})
try:
urllib.request.urlopen(req, timeout=REQUEST_TIMEOUT)
except Exception:
pass
def _register_local(payload: dict):
"""本地注册实例(协调器进程直接操作内存)。"""
with _lock:
pid = payload.get('pid')
existing = [e for e in _instances if e.get('pid') != pid]
_instances.clear()
_instances.extend(existing)
_instances.append(payload)
_debug_log('REGISTER_LOCAL', pid=payload.get('pid'), port=payload.get('port'))
def get_instances() -> List[Dict[str, Any]]:
"""获取所有已注册实例(内部通信,固定使用 127.0.0.1)。"""
if _is_coordinator:
with _lock:
return list(_instances)
try:
with urllib.request.urlopen(f'http://{LOCALHOST}:{COORD_PORT}/instances', timeout=REQUEST_TIMEOUT) as resp: # type: ignore
raw = resp.read()
data = json.loads(raw.decode('utf-8') or '[]')
if isinstance(data, list):
return data
except Exception:
return []
return []
def deregister(): # pragma: no cover
_post_json('/deregister', {'pid': _self_pid})
def call_tool(pid: int | None = None, port: int | None = None, tool: str = '', params: dict | None = None) -> dict:
"""调用指定实例的工具(内部通信,固定使用 127.0.0.1)。"""
body = json.dumps({"pid": pid, "port": port, "tool": tool, "params": params or {}}).encode('utf-8')
req = urllib.request.Request(f'http://{LOCALHOST}:{COORD_PORT}/call', data=body, method='POST', headers={'Content-Type': 'application/json'})
try:
with urllib.request.urlopen(req, timeout=REQUEST_TIMEOUT) as resp: # type: ignore
raw = resp.read()
return json.loads(raw.decode('utf-8') or '{}')
except Exception as e:
return {"error": str(e)}
def check_connection() -> dict:
"""检查当前是否存在至少一个已注册的 IDA MCP 实例。
返回:
{"ok": True, "count": n} 当 n>0 时
{"ok": False, "count": 0} 当没有实例或协调器不可达
说明:
* 供外部快速健康探测使用, 不会抛异常, 统一结构。
"""
inst = get_instances()
return {"ok": bool(inst), "count": len(inst)}