"""
WinDbg 扩展插件,用于 MCP 集成
需要通过 PyKD 在 WinDbg 中加载此文件
"""
import os
import sys
import json
import threading
import http.server
from urllib.parse import urlparse
from typing import Any, Callable, get_type_hints, TypedDict, Optional, Annotated
# 导入 pykd 用于与 WinDbg 交互
try:
import pykd
PYKD_AVAILABLE = True
except ImportError:
PYKD_AVAILABLE = False
print("[WinDbg MCP] Warning: pykd not available, some features may not work")
class JSONRPCError(Exception):
def __init__(self, code: int, message: str, data: Any = None):
self.code = code
self.message = message
self.data = data
class RPCRegistry:
def __init__(self):
self.methods: dict[str, Callable] = {}
def register(self, func: Callable) -> Callable:
self.methods[func.__name__] = func
return func
def dispatch(self, method: str, params: Any) -> Any:
if method not in self.methods:
raise JSONRPCError(-32601, f"Method '{method}' not found")
func = self.methods[method]
hints = get_type_hints(func)
hints.pop("return", None)
if isinstance(params, list):
if len(params) != len(hints):
raise JSONRPCError(-32602, f"Invalid params: expected {len(hints)} arguments, got {len(params)}")
converted_params = []
for value, (param_name, expected_type) in zip(params, hints.items()):
try:
if not isinstance(value, expected_type):
value = expected_type(value)
converted_params.append(value)
except (ValueError, TypeError):
raise JSONRPCError(-32602, f"Invalid type for parameter '{param_name}': expected {expected_type.__name__}")
return func(*converted_params)
elif isinstance(params, dict):
if set(params.keys()) != set(hints.keys()):
raise JSONRPCError(-32602, f"Invalid params: expected {list(hints.keys())}")
converted_params = {}
for param_name, expected_type in hints.items():
value = params.get(param_name)
try:
if not isinstance(value, expected_type):
value = expected_type(value)
converted_params[param_name] = value
except (ValueError, TypeError):
raise JSONRPCError(-32602, f"Invalid type for parameter '{param_name}': expected {expected_type.__name__}")
return func(**converted_params)
else:
raise JSONRPCError(-32600, "Invalid Request: params must be array or object")
rpc_registry = RPCRegistry()
def jsonrpc(func: Callable) -> Callable:
"""Decorator to register a function as a JSON-RPC method"""
global rpc_registry
return rpc_registry.register(func)
class JSONRPCRequestHandler(http.server.BaseHTTPRequestHandler):
def send_jsonrpc_error(self, code: int, message: str, id: Any = None):
response = {
"jsonrpc": "2.0",
"error": {
"code": code,
"message": message
}
}
if id is not None:
response["id"] = id
response_body = json.dumps(response).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(response_body)))
self.end_headers()
self.wfile.write(response_body)
def do_POST(self):
global rpc_registry
parsed_path = urlparse(self.path)
if parsed_path.path != "/windbg":
self.send_jsonrpc_error(-32098, "Invalid endpoint", None)
return
content_length = int(self.headers.get("Content-Length", 0))
if content_length == 0:
self.send_jsonrpc_error(-32700, "Parse error: missing request body", None)
return
request_body = self.rfile.read(content_length)
try:
request = json.loads(request_body)
except json.JSONDecodeError:
self.send_jsonrpc_error(-32700, "Parse error: invalid JSON", None)
return
response: dict[str, Any] = {
"jsonrpc": "2.0"
}
if request.get("id") is not None:
response["id"] = request.get("id")
try:
if not isinstance(request, dict):
raise JSONRPCError(-32600, "Invalid Request")
if request.get("jsonrpc") != "2.0":
raise JSONRPCError(-32600, "Invalid JSON-RPC version")
if "method" not in request:
raise JSONRPCError(-32600, "Method not specified")
result = rpc_registry.dispatch(request["method"], request.get("params", []))
response["result"] = result
except JSONRPCError as e:
response["error"] = {
"code": e.code,
"message": e.message
}
if e.data is not None:
response["error"]["data"] = e.data
except Exception as e:
import traceback
response["error"] = {
"code": -32603,
"message": "Internal error",
"data": traceback.format_exc(),
}
try:
response_body = json.dumps(response).encode("utf-8")
except Exception:
import traceback
response_body = json.dumps({
"error": {
"code": -32603,
"message": "Internal error",
"data": traceback.format_exc(),
}
}).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(response_body)))
self.end_headers()
self.wfile.write(response_body)
def log_message(self, format, *args):
# Suppress logging
pass
class WinDbgHTTPServer(http.server.HTTPServer):
allow_reuse_address = True
class Server:
HOST = "localhost"
PORT = 13338 # WinDbg MCP 专用端口
def __init__(self):
self.server = None
self.server_thread = None
self.running = False
def start(self):
if self.running:
print("[WinDbg MCP] Server is already running")
return
self.server_thread = threading.Thread(target=self._run_server, daemon=True)
self.running = True
self.server_thread.start()
def stop(self):
if not self.running:
return
self.running = False
if self.server:
self.server.shutdown()
self.server.server_close()
if self.server_thread:
self.server_thread.join()
self.server = None
print("[WinDbg MCP] Server stopped")
def _run_server(self):
try:
self.server = WinDbgHTTPServer((Server.HOST, Server.PORT), JSONRPCRequestHandler)
print(f"[WinDbg MCP] Server started at http://{Server.HOST}:{Server.PORT}")
self.server.serve_forever()
except OSError as e:
if e.errno == 98 or e.errno == 10048:
print("[WinDbg MCP] Error: Port 13338 is already in use")
else:
print(f"[WinDbg MCP] Server error: {e}")
self.running = False
except Exception as e:
print(f"[WinDbg MCP] Server error: {e}")
finally:
self.running = False
# =============================================================================
# WinDbg 命令包装函数
# =============================================================================
class DebuggerInfo(TypedDict):
is_kernel_mode: bool
target_arch: str
debugger_version: str
@jsonrpc
def get_debugger_info() -> DebuggerInfo:
"""获取当前调试器会话的信息"""
if not PYKD_AVAILABLE:
return DebuggerInfo(
is_kernel_mode=False,
target_arch="unknown",
debugger_version="pykd not available"
)
return DebuggerInfo(
is_kernel_mode=pykd.isKernelDebugging(),
target_arch="x64" if pykd.is64bitSystem() else "x86",
debugger_version=pykd.getDebuggerVersion()
)
@jsonrpc
def execute_command(command: Annotated[str, "要执行的 WinDbg 命令"]) -> str:
"""执行 WinDbg 命令并返回输出"""
if not PYKD_AVAILABLE:
return "Error: pykd not available"
try:
result = pykd.dbgCommand(command)
return result if result else "Command executed successfully (no output)"
except Exception as e:
return f"Error executing command: {str(e)}"
@jsonrpc
def get_stack_trace() -> list[str]:
"""获取当前调用堆栈"""
if not PYKD_AVAILABLE:
return ["Error: pykd not available"]
try:
stack = pykd.getStack()
return [str(frame) for frame in stack]
except Exception as e:
return [f"Error getting stack trace: {str(e)}"]
@jsonrpc
def get_registers() -> dict[str, str]:
"""获取所有 CPU 寄存器"""
if not PYKD_AVAILABLE:
return {"error": "pykd not available"}
try:
result = pykd.dbgCommand("r")
registers = {}
for line in result.split('\n'):
if '=' in line:
parts = line.split('=')
if len(parts) >= 2:
reg_name = parts[0].strip()
reg_value = parts[1].strip().split()[0]
registers[reg_name] = reg_value
return registers
except Exception as e:
return {"error": str(e)}
@jsonrpc
def get_modules() -> list[dict[str, str]]:
"""获取已加载模块的信息"""
if not PYKD_AVAILABLE:
return [{"error": "pykd not available"}]
try:
result = pykd.dbgCommand("lm")
modules = []
for line in result.split('\n'):
if line.strip() and not line.startswith('start'):
modules.append({"module": line.strip()})
return modules
except Exception as e:
return [{"error": str(e)}]
@jsonrpc
def read_memory(
address: Annotated[str, "要读取的内存地址"],
size: Annotated[int, "要读取的字节数"]
) -> str:
"""读取指定地址的内存"""
if not PYKD_AVAILABLE:
return "Error: pykd not available"
try:
addr = int(address, 16) if isinstance(address, str) else address
result = pykd.dbgCommand(f"db {hex(addr)} L{size}")
return result
except Exception as e:
return f"Error reading memory: {str(e)}"
@jsonrpc
def analyze_exception() -> str:
"""对当前异常运行 !analyze -v"""
if not PYKD_AVAILABLE:
return "Error: pykd not available"
try:
result = pykd.dbgCommand("!analyze -v")
return result
except Exception as e:
return f"Error analyzing exception: {str(e)}"
@jsonrpc
def get_threads() -> list[dict[str, str]]:
"""获取所有线程的信息"""
if not PYKD_AVAILABLE:
return [{"error": "pykd not available"}]
try:
result = pykd.dbgCommand("~")
threads = []
for line in result.split('\n'):
if line.strip():
threads.append({"thread": line.strip()})
return threads
except Exception as e:
return [{"error": str(e)}]
# =============================================================================
# 服务器管理
# =============================================================================
server_instance = None
def start_server():
"""启动 WinDbg MCP 服务器"""
global server_instance
if server_instance is None:
server_instance = Server()
server_instance.start()
print("[WinDbg MCP] To use this with MCP clients, configure the server to connect to http://localhost:13338/windbg")
def stop_server():
"""停止 WinDbg MCP 服务器"""
global server_instance
if server_instance is not None:
server_instance.stop()
# 作为扩展加载时自动启动
if __name__ == "__main__" or True:
start_server()