import os
import sys
# Fail fast at import time: the typing import below pulls in NotRequired,
# which only exists in Python 3.11 and newer.
if sys.version_info < (3, 11):
    raise RuntimeError("Python 3.11 or higher is required for the MCP plugin")
import json
import struct
import threading
import http.server
from urllib.parse import urlparse
from typing import Any, Callable, get_type_hints, TypedDict, Optional, Annotated, TypeVar, Generic, NotRequired
class JSONRPCError(Exception):
    """Exception carrying a JSON-RPC error code, message and optional payload.

    Attributes:
        code: numeric JSON-RPC error code.
        message: human-readable description of the error.
        data: optional extra payload; ``None`` when there is none.
    """

    def __init__(self, code: int, message: str, data: Any = None):
        # Forward the message to Exception so that str(e), e.args and
        # tracebacks show it instead of being empty.
        super().__init__(message)
        self.code = code
        self.message = message
        self.data = data
class RPCRegistry:
    """Registry of JSON-RPC methods, keyed by function name."""

    def __init__(self):
        # Method name -> callable invoked by dispatch().
        self.methods: dict[str, Callable] = {}
        # Names of the functions flagged through mark_unsafe().
        self.unsafe: set[str] = set()

    def register(self, func: Callable) -> Callable:
        """Register ``func`` under its ``__name__`` and return it unchanged."""
        self.methods[func.__name__] = func
        return func

    def mark_unsafe(self, func: Callable) -> Callable:
        """Record ``func``'s name in the unsafe set and return it unchanged."""
        self.unsafe.add(func.__name__)
        return func

    @staticmethod
    def _coerce(param_name: str, expected_type: Any, value: Any) -> Any:
        """Return ``value`` as an instance of ``expected_type``, converting if needed.

        Raises:
            JSONRPCError: (-32602) when the value cannot be checked or converted.
        """
        # Local import: get_origin is not part of the file-level typing import.
        from typing import get_origin

        try:
            try:
                matches = isinstance(value, expected_type)
            except TypeError:
                # Parameterized generics such as list[str] cannot be used with
                # isinstance(); check and convert against their runtime origin
                # (list) instead of rejecting every value.
                origin = get_origin(expected_type)
                if origin is None:
                    raise
                expected_type = origin
                matches = isinstance(value, expected_type)
            return value if matches else expected_type(value)
        except (ValueError, TypeError) as exc:
            raise JSONRPCError(-32602, f"Invalid type for parameter '{param_name}': expected {expected_type.__name__}") from exc

    def dispatch(self, method: str, params: Any) -> Any:
        """Validate ``params`` against the method's type hints and call it.

        Args:
            method: name of a registered method.
            params: positional (list) or named (dict) arguments.

        Returns:
            Whatever the registered function returns.

        Raises:
            JSONRPCError: -32601 for an unknown method, -32602 for a wrong
                argument count/names or an unconvertible value, -32600 when
                ``params`` is neither a list nor a dict.
        """
        if method not in self.methods:
            raise JSONRPCError(-32601, f"Method '{method}' not found")

        func = self.methods[method]
        hints = get_type_hints(func)

        # Remove return annotation if present
        hints.pop("return", None)

        if isinstance(params, list):
            if len(params) != len(hints):
                raise JSONRPCError(-32602, f"Invalid params: expected {len(hints)} arguments, got {len(params)}")
            # Positional values are matched to hints in declaration order.
            converted_args = [
                self._coerce(param_name, expected_type, value)
                for value, (param_name, expected_type) in zip(params, hints.items())
            ]
            return func(*converted_args)

        if isinstance(params, dict):
            if set(params.keys()) != set(hints.keys()):
                raise JSONRPCError(-32602, f"Invalid params: expected {list(hints.keys())}")
            converted_kwargs = {
                param_name: self._coerce(param_name, expected_type, params.get(param_name))
                for param_name, expected_type in hints.items()
            }
            return func(**converted_kwargs)

        raise JSONRPCError(-32600, "Invalid Request: params must be array or object")
# Module-level registry shared by the @jsonrpc / @unsafe decorators and by
# JSONRPCRequestHandler.do_POST, which dispatches incoming calls through it.
rpc_registry = RPCRegistry()
def jsonrpc(func: Callable) -> Callable:
    """Decorator: expose ``func`` as a JSON-RPC method on the shared registry.

    The function is registered under its ``__name__`` and returned unchanged.
    """
    registered = rpc_registry.register(func)
    return registered
def unsafe(func: Callable) -> Callable:
    """Decorator to mark a function as unsafe"""
    # Only records the function name in rpc_registry.unsafe; the function
    # itself is returned unchanged.
    return rpc_registry.mark_unsafe(func)
class JSONRPCRequestHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler serving JSON-RPC 2.0 calls POSTed to ``/mcp``.

    Every reply, including errors, is sent with HTTP status 200 and a JSON body.
    """

    def _send_json_body(self, response_body: bytes):
        """Write an HTTP 200 response carrying an already-encoded JSON body."""
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(response_body)))
        self.end_headers()
        self.wfile.write(response_body)

    def send_jsonrpc_error(self, code: int, message: str, id: Any = None):
        """Send a JSON-RPC error object; ``id`` is included only when not None."""
        response = {
            "jsonrpc": "2.0",
            "error": {
                "code": code,
                "message": message
            }
        }
        if id is not None:
            response["id"] = id
        self._send_json_body(json.dumps(response).encode("utf-8"))

    def do_POST(self):
        """Parse one JSON-RPC request, dispatch it and write the response."""
        parsed_path = urlparse(self.path)
        if parsed_path.path != "/mcp":
            self.send_jsonrpc_error(-32098, "Invalid endpoint", None)
            return

        # A missing, non-numeric or non-positive Content-Length all mean that
        # there is no body that can be read safely.
        try:
            content_length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            content_length = 0
        if content_length <= 0:
            self.send_jsonrpc_error(-32700, "Parse error: missing request body", None)
            return

        request_body = self.rfile.read(content_length)
        try:
            request = json.loads(request_body)
        except (json.JSONDecodeError, UnicodeDecodeError):
            # json.loads decodes bytes itself; undecodable input is a parse
            # error as well.
            self.send_jsonrpc_error(-32700, "Parse error: invalid JSON", None)
            return

        # Basic JSON-RPC validation: the payload must be an object. This has
        # to be checked before any request.get() call, otherwise an array or
        # scalar payload raises AttributeError and no reply is sent at all.
        if not isinstance(request, dict):
            self.send_jsonrpc_error(-32600, "Invalid Request", None)
            return

        # Prepare the response
        response = {
            "jsonrpc": "2.0"
        }
        if request.get("id") is not None:
            response["id"] = request.get("id")

        try:
            if request.get("jsonrpc") != "2.0":
                raise JSONRPCError(-32600, "Invalid JSON-RPC version")
            if "method" not in request:
                raise JSONRPCError(-32600, "Method not specified")

            # Dispatch the method
            result = rpc_registry.dispatch(request["method"], request.get("params", []))
            response["result"] = result

        except JSONRPCError as e:
            response["error"] = {
                "code": e.code,
                "message": e.message
            }
            if e.data is not None:
                response["error"]["data"] = e.data
        except IDAError as e:
            response["error"] = {
                "code": -32000,
                "message": e.message,
            }
        except Exception:
            traceback.print_exc()
            response["error"] = {
                "code": -32603,
                "message": "Internal error (please report a bug)",
                "data": traceback.format_exc(),
            }

        try:
            response_body = json.dumps(response).encode("utf-8")
        except Exception:
            # The result was not JSON-serializable: report an internal error
            # but keep the envelope (version and id) of the original response.
            traceback.print_exc()
            fallback = {
                "jsonrpc": "2.0",
                "error": {
                    "code": -32603,
                    "message": "Internal error (please report a bug)",
                    "data": traceback.format_exc(),
                }
            }
            if "id" in response:
                fallback["id"] = response["id"]
            response_body = json.dumps(fallback).encode("utf-8")

        self._send_json_body(response_body)

    def log_message(self, format, *args):
        # Suppress logging
        pass
class MCPHTTPServer(http.server.HTTPServer):
    # Address reuse is switched off for the listening socket. Presumably this
    # makes a second instance fail to bind, so that Server._run_server can
    # report the port as already in use -- TODO confirm intent.
    allow_reuse_address = False
class Server:
    """Runs the MCP JSON-RPC HTTP server on a background daemon thread."""

    HOST = "localhost"
    PORT = 13337

    def __init__(self):
        self.server = None          # MCPHTTPServer instance, created on the server thread
        self.server_thread = None   # thread executing _run_server
        self.running = False        # True between start() and the server thread exiting

    def start(self):
        """Start the server thread; does nothing if it is already running."""
        if self.running:
            print("[MCP] Server is already running")
            return

        self.server_thread = threading.Thread(target=self._run_server, daemon=True)
        self.running = True
        self.server_thread.start()

    def stop(self):
        """Shut the server down and wait for its thread; no-op when not running."""
        if not self.running:
            return

        self.running = False
        if self.server:
            self.server.shutdown()
            self.server.server_close()
        if self.server_thread:
            self.server_thread.join()
            self.server = None
        print("[MCP] Server stopped")

    def _run_server(self):
        """Thread body: bind the HTTP server and serve until shutdown."""
        # Local import: errno is not among the file-level imports.
        import errno

        try:
            # Create server in the thread to handle binding
            self.server = MCPHTTPServer((Server.HOST, Server.PORT), JSONRPCRequestHandler)
            print(f"[MCP] Server started at http://{Server.HOST}:{Server.PORT}")
            self.server.serve_forever()
        except OSError as e:
            # EADDRINUSE has a platform-specific value (98 on Linux, 48 on
            # macOS); 10048 is the Windows socket error for the same condition.
            if e.errno in (errno.EADDRINUSE, 98, 10048):
                print(f"[MCP] Error: Port {Server.PORT} is already in use")
            else:
                print(f"[MCP] Server error: {e}")
        except Exception as e:
            print(f"[MCP] Server error: {e}")
        finally:
            # Runs on every exit path, so start() can be called again.
            self.running = False
# A module that helps with writing thread-safe IDA code.
# Based on:
# https://web.archive.org/web/20160305190440/http://www.williballenthin.com/blog/2015/09/04/idapython-synchronization-decorator/
import logging
import queue
import traceback
import functools
import ida_hexrays
import ida_kernwin
import ida_funcs
import ida_gdl
import ida_lines
import ida_idaapi
import idc
import idaapi
import idautils
import ida_nalt
import ida_bytes
import ida_typeinf
import ida_xref
import ida_entry
import idautils
import ida_idd
import ida_dbg
import ida_name
import ida_ida
import ida_frame
class IDAError(Exception):
    """Error raised by the IDA helpers; its text is exposed as ``message``."""

    def __init__(self, message: str):
        Exception.__init__(self, message)

    @property
    def message(self) -> str:
        """The message given at construction (first element of ``args``)."""
        text = self.args[0]
        return text
class IDASyncError(Exception):
    """Raised by sync_wrapper for an invalid safety mode or a nested synchronized call."""
    pass
class DecompilerLicenseError(IDAError):
    """Raised by decompile_checked when decompilation fails with MERR_LICENSE."""
    pass
# Important note: Always make sure the return value from your function f is a
# copy of the data you have gotten from IDA, and not the original data.
#
# Example:
# --------
#
# Do this:
#
# @idaread
# def ts_Functions():
# return list(idautils.Functions())
#
# Don't do this:
#
# @idaread
# def ts_Functions():
# return idautils.Functions()
#
# Module-level logger; sync_wrapper reports invalid safety modes through it.
logger = logging.getLogger(__name__)
# Enum for safety modes. Higher means safer:
class IDASafety:
    """Named aliases for the ida_kernwin MFF_* flags passed to execute_sync.

    sync_wrapper only accepts SAFE_READ and SAFE_WRITE.
    """
    # (A stray no-op `ida_kernwin.MFF_READ` expression statement used to sit
    # here; it had no effect and was removed.)
    SAFE_NONE = ida_kernwin.MFF_FAST
    SAFE_READ = ida_kernwin.MFF_READ
    SAFE_WRITE = ida_kernwin.MFF_WRITE
# Names of the functions currently executing inside sync_wrapper; used there
# to detect a synchronized function being called from another one.
call_stack = queue.LifoQueue()
def sync_wrapper(ff, safety_mode: IDASafety):
    """
    Call a function ff with a specific IDA safety_mode.

    ff is run through idaapi.execute_sync; its return value is returned and
    any exception it raises is re-raised in the calling thread.

    Raises:
        IDASyncError: if safety_mode is not SAFE_READ/SAFE_WRITE, or if ff is
            invoked while another synchronized function is still running.
    """
    #logger.debug('sync_wrapper: {}, {}'.format(ff.__name__, safety_mode))

    if safety_mode not in [IDASafety.SAFE_READ, IDASafety.SAFE_WRITE]:
        error_str = 'Invalid safety mode {} over function {}'\
                .format(safety_mode, ff.__name__)
        logger.error(error_str)
        raise IDASyncError(error_str)

    # Carries either the result of ff() or the exception to re-raise.
    res_container = queue.Queue()

    def runned():
        #logger.debug('Inside runned')

        # Make sure that we are not already inside a sync_wrapper:
        if not call_stack.empty():
            # Peek at the running function's name: pop it and push it straight
            # back, so the outer call's cleanup still finds its own entry.
            last_func_name = call_stack.get()
            call_stack.put(last_func_name)
            error_str = ('Call stack is not empty while calling the '
                'function {} from {}').format(ff.__name__, last_func_name)
            #logger.error(error_str)
            # Hand the error over through the container instead of raising
            # here: raising would leave the container empty and the
            # res_container.get() below would block forever.
            res_container.put(IDASyncError(error_str))
            return

        call_stack.put((ff.__name__))
        try:
            res_container.put(ff())
        except Exception as x:
            res_container.put(x)
        finally:
            call_stack.get()
            #logger.debug('Finished runned')

    idaapi.execute_sync(runned, safety_mode)
    res = res_container.get()
    if isinstance(res, Exception):
        raise res
    return res
def idawrite(f):
    """
    Decorator for functions that modify the IDB.

    Each call is bound to its arguments and handed to sync_wrapper with
    idaapi.MFF_WRITE, so it is scheduled in the main IDA loop to avoid IDB
    corruption.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        bound_call = functools.partial(f, *args, **kwargs)
        # partial objects have no __name__ of their own; sync_wrapper reads it
        # when building its error messages.
        setattr(bound_call, "__name__", f.__name__)
        return sync_wrapper(bound_call, idaapi.MFF_WRITE)
    return wrapper
def idaread(f):
    """
    Decorator for functions that read from the IDB.

    Each call is bound to its arguments and handed to sync_wrapper with
    idaapi.MFF_READ, so it is scheduled in the main IDA loop to avoid
    inconsistent results.

    MFF_READ constant via: http://www.openrce.org/forums/posts/1827
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        bound_call = functools.partial(f, *args, **kwargs)
        # partial objects have no __name__ of their own; sync_wrapper reads it
        # when building its error messages.
        setattr(bound_call, "__name__", f.__name__)
        return sync_wrapper(bound_call, idaapi.MFF_READ)
    return wrapper
def is_window_active():
    """Return True when any top-level Qt widget of the application is the active window.

    Returns False when PyQt5 cannot be imported or no QApplication exists.
    """
    try:
        from PyQt5.QtWidgets import QApplication
    except ImportError:
        return False

    qt_app = QApplication.instance()
    if qt_app is None:
        return False

    return any(widget.isActiveWindow() for widget in qt_app.topLevelWidgets())
class Metadata(TypedDict):
    # Shape of the get_metadata() result; numeric values are hex() strings.
    path: str      # idaapi.get_input_file_path()
    module: str    # idaapi.get_root_filename()
    base: str      # hex of idaapi.get_imagebase()
    size: str      # hex of get_image_size()
    md5: str       # hex digest; get_metadata stores None when it cannot be retrieved
    sha256: str    # hex digest; get_metadata stores None when it cannot be retrieved
    crc32: str     # hex of ida_nalt.retrieve_input_file_crc32()
    filesize: str  # hex of ida_nalt.retrieve_input_file_size()
def get_image_size() -> int:
    """Return the size of the loaded image.

    For input with a PE header the 32-bit little-endian field at header offset
    0x50 is used (presumably SizeOfImage -- confirm against the PE format);
    otherwise the size is estimated as omax_ea - omin_ea.
    """
    try:
        # https://www.hex-rays.com/products/ida/support/sdkdoc/structidainfo.html
        inf = idaapi.get_inf_structure()
        lowest_ea = inf.omin_ea
        highest_ea = inf.omax_ea
    except AttributeError:
        # Fall back to the ida_ida accessors when the inf structure API is missing.
        import ida_ida
        lowest_ea = ida_ida.inf_get_omin_ea()
        highest_ea = ida_ida.inf_get_omax_ea()

    # Try to extract it from the PE header
    pe_header = idautils.peutils_t().header()
    if pe_header and pe_header[:4] == b"PE\0\0":
        (size_from_header,) = struct.unpack("<I", pe_header[0x50:0x54])
        return size_from_header

    # Bad heuristic for image size (bad if the relocations are the last section)
    return highest_ea - lowest_ea
@jsonrpc
@idaread
def get_metadata() -> Metadata:
    """Get metadata about the current IDB"""
    # Fat Mach-O binaries can return a None hash:
    # https://github.com/mrexodia/ida-pro-mcp/issues/26
    def hex_digest(retrieve):
        # Named hex_digest rather than hash, to avoid shadowing the builtin.
        # `except Exception` rather than a bare except, so KeyboardInterrupt
        # and SystemExit are not swallowed.
        try:
            return retrieve().hex()
        except Exception:
            return None

    return Metadata(path=idaapi.get_input_file_path(),
                    module=idaapi.get_root_filename(),
                    base=hex(idaapi.get_imagebase()),
                    size=hex(get_image_size()),
                    md5=hex_digest(ida_nalt.retrieve_input_file_md5),
                    sha256=hex_digest(ida_nalt.retrieve_input_file_sha256),
                    crc32=hex(ida_nalt.retrieve_input_file_crc32()),
                    filesize=hex(ida_nalt.retrieve_input_file_size()))
def get_prototype(fn: ida_funcs.func_t) -> Optional[str]:
    """Return the prototype of ``fn`` as text, or None when it is unavailable.

    Uses fn.get_prototype() when that method exists; otherwise falls back to
    idc.get_type() and finally to ida_nalt.get_tinfo(). Unexpected errors are
    printed and reported as None.
    """
    try:
        prototype = fn.get_prototype()
        return str(prototype) if prototype is not None else None
    except AttributeError:
        # fn has no usable get_prototype(): try the other APIs.
        try:
            return idc.get_type(fn.start_ea)
        except Exception:
            # `except Exception` rather than a bare except, so
            # KeyboardInterrupt and SystemExit are not swallowed.
            tif = ida_typeinf.tinfo_t()
            if ida_nalt.get_tinfo(tif, fn.start_ea):
                return str(tif)
            return None
    except Exception as e:
        print(f"Error getting function prototype: {e}")
        return None
class Function(TypedDict):
    # Summary of a function as built by get_function().
    address: str  # hex() of the address that was passed to get_function
    name: str     # function name
    size: str     # hex() of end_ea - start_ea
def parse_address(address: str) -> int:
    """Parse an address string with int(address, 0).

    Raises:
        IDAError: if the text is not a valid integer literal. When it consists
            only of hex digits the message points out the missing 0x prefix.
    """
    try:
        parsed = int(address, 0)
    except ValueError:
        hex_digits = "0123456789abcdefABCDEF"
        if any(ch not in hex_digits for ch in address):
            raise IDAError(f"Failed to parse address: {address}")
        raise IDAError(f"Failed to parse address (missing 0x prefix): {address}")
    return parsed
def get_function(address: int, *, raise_error=True) -> Function:
    """Describe the function containing ``address``.

    Returns None instead of raising when no function is found and
    ``raise_error`` is False.

    Raises:
        IDAError: if no function is found and ``raise_error`` is True.
    """
    fn = idaapi.get_func(address)
    if fn is None and raise_error:
        raise IDAError(f"No function found at address {hex(address)}")
    if fn is None:
        return None

    try:
        name = fn.get_name()
    except AttributeError:
        # func_t without get_name(): look the name up by start address.
        name = ida_funcs.get_func_name(fn.start_ea)

    # NOTE(review): the reported address is the queried one, not fn.start_ea.
    byte_size = fn.end_ea - fn.start_ea
    return Function(address=hex(address), name=name, size=hex(byte_size))
# Cache: demangled function name -> function address. Filled by
# create_demangled_to_ea_map() and consulted by get_function_by_name().
DEMANGLED_TO_EA = {}
def create_demangled_to_ea_map():
    """Fill DEMANGLED_TO_EA with every function whose name demangles to a non-empty string."""
    for func_ea in idautils.Functions():
        raw_name = idc.get_name(func_ea, 0)
        # MNG_NODEFINIT inhibits everything except the main name
        # where default demangling adds the function signature
        # and decorators (if any)
        demangled = idaapi.demangle_name(raw_name, idaapi.MNG_NODEFINIT)
        if not demangled:
            continue
        DEMANGLED_TO_EA[demangled] = func_ea
def get_type_by_name(type_name: str) -> ida_typeinf.tinfo_t:
    """Resolve a type name to a tinfo_t.

    Built-in scalar aliases are tried first, then named struct / typedef /
    enum / union types, and finally tinfo_t(type_name) itself.

    Raises:
        IDAError: if none of the lookups yields a type.
    """
    # (aliases, name of the ida_typeinf BTF_* constant). The constant is
    # looked up only for the group that matches.
    builtin_aliases = (
        # 8-bit integers
        (('int8', '__int8', 'int8_t', 'char', 'signed char'), 'BTF_INT8'),
        (('uint8', '__uint8', 'uint8_t', 'unsigned char', 'byte', 'BYTE'), 'BTF_UINT8'),
        # 16-bit integers
        (('int16', '__int16', 'int16_t', 'short', 'short int', 'signed short', 'signed short int'), 'BTF_INT16'),
        (('uint16', '__uint16', 'uint16_t', 'unsigned short', 'unsigned short int', 'word', 'WORD'), 'BTF_UINT16'),
        # 32-bit integers
        (('int32', '__int32', 'int32_t', 'int', 'signed int', 'long', 'long int', 'signed long', 'signed long int'), 'BTF_INT32'),
        (('uint32', '__uint32', 'uint32_t', 'unsigned int', 'unsigned long', 'unsigned long int', 'dword', 'DWORD'), 'BTF_UINT32'),
        # 64-bit integers
        (('int64', '__int64', 'int64_t', 'long long', 'long long int', 'signed long long', 'signed long long int'), 'BTF_INT64'),
        (('uint64', '__uint64', 'uint64_t', 'unsigned int64', 'unsigned long long', 'unsigned long long int', 'qword', 'QWORD'), 'BTF_UINT64'),
        # 128-bit integers
        (('int128', '__int128', 'int128_t', '__int128_t'), 'BTF_INT128'),
        (('uint128', '__uint128', 'uint128_t', '__uint128_t', 'unsigned int128'), 'BTF_UINT128'),
        # Floating point types
        (('float', ), 'BTF_FLOAT'),
        (('double', ), 'BTF_DOUBLE'),
        (('long double', 'ldouble'), 'BTF_LDOUBLE'),
        # Boolean type
        (('bool', '_Bool', 'boolean'), 'BTF_BOOL'),
        # Void type
        (('void', ), 'BTF_VOID'),
    )
    for aliases, flag_name in builtin_aliases:
        if type_name in aliases:
            return ida_typeinf.tinfo_t(getattr(ida_typeinf, flag_name))

    # If not a standard type, try to get a named type
    tif = ida_typeinf.tinfo_t()
    for kind_name in ('BTF_STRUCT', 'BTF_TYPEDEF', 'BTF_ENUM', 'BTF_UNION'):
        if tif.get_named_type(None, type_name, getattr(ida_typeinf, kind_name)):
            return tif

    parsed = ida_typeinf.tinfo_t(type_name)
    if parsed:
        return parsed

    raise IDAError(f"Unable to retrieve {type_name} type info object")
@jsonrpc
@idaread
def get_function_by_name(
    name: Annotated[str, "Name of the function to get"]
) -> Function:
    """Get a function by its name"""
    function_address = idaapi.get_name_ea(idaapi.BADADDR, name)
    if function_address != idaapi.BADADDR:
        return get_function(function_address)

    # Not a raw name: fall back to the demangled-name map, building it on
    # first use.
    if not DEMANGLED_TO_EA:
        create_demangled_to_ea_map()
    if name not in DEMANGLED_TO_EA:
        raise IDAError(f"No function found with name {name}")
    return get_function(DEMANGLED_TO_EA[name])
@jsonrpc
@idaread
def get_function_by_address(
    address: Annotated[str, "Address of the function to get"],
) -> Function:
    """Get a function by its address"""
    # parse_address raises IDAError for malformed input; get_function raises
    # it when no function contains the address.
    target_ea = parse_address(address)
    return get_function(target_ea)
@jsonrpc
@idaread
def get_current_address() -> str:
    """Get the address currently selected by the user"""
    screen_ea = idaapi.get_screen_ea()
    return hex(screen_ea)
@jsonrpc
@idaread
def get_current_function() -> Optional[Function]:
    """Get the function currently selected by the user"""
    # get_function is called with its default raise_error=True, so IDAError
    # is raised when the cursor is not inside a function.
    screen_ea = idaapi.get_screen_ea()
    return get_function(screen_ea)
class ConvertedNumber(TypedDict):
    # Result of convert_number(): the same value in several representations.
    decimal: str          # str(value)
    hexadecimal: str      # hex(value)
    bytes: str            # little-endian bytes as hex pairs separated by spaces
    ascii: Optional[str]  # printable-ASCII text of the bytes, or None if any byte is not printable
    binary: str           # bin(value)
@jsonrpc
def convert_number(
    text: Annotated[str, "Textual representation of the number to convert"],
    size: Annotated[Optional[int], "Size of the variable in bytes"],
) -> ConvertedNumber:
    """Convert a number (decimal, hexadecimal) to different representations"""
    try:
        value = int(text, 0)
    except ValueError:
        raise IDAError(f"Invalid number: {text}")

    # Non-negative values are encoded unsigned, negative ones as two's
    # complement. Always encoding as signed rejected values such as 0xFF in
    # one byte, although they fit.
    is_negative = value < 0

    # Estimate the size of the number
    if not size:
        if is_negative:
            # Two's complement needs one bit more than the magnitude of ~value.
            bits = (~value).bit_length() + 1
        else:
            bits = value.bit_length()
        # At least one byte, so that 0 is rendered as "00".
        size = max(1, (bits + 7) // 8)

    # Convert the number to bytes
    try:
        raw = value.to_bytes(size, "little", signed=is_negative)
    except OverflowError:
        raise IDAError(f"Number {text} is too big for {size} bytes")

    # Convert the bytes to ASCII (trailing NUL bytes are ignored); None as
    # soon as one byte is not printable.
    stripped = raw.rstrip(b"\x00")
    if all(32 <= byte <= 126 for byte in stripped):
        ascii_text = stripped.decode("ascii")
    else:
        ascii_text = None

    return ConvertedNumber(
        decimal=str(value),
        hexadecimal=hex(value),
        bytes=raw.hex(" "),
        ascii=ascii_text,
        binary=bin(value),
    )
# Element type of the lists handled by Page, paginate and pattern_filter.
T = TypeVar("T")
class Page(TypedDict, Generic[T]):
    # One page of results as produced by paginate().
    data: list[T]               # the items of this page
    next_offset: Optional[int]  # offset of the next page, or None when there are no more items
def paginate(data: list[T], offset: int, count: int) -> Page[T]:
    """Slice ``count`` items of ``data`` starting at ``offset``.

    A ``count`` of 0 means "everything from ``offset`` on". ``next_offset`` is
    None when the page reaches the end of ``data``.
    """
    page_size = count if count != 0 else len(data)
    end = offset + page_size
    return {
        "data": data[offset:end],
        "next_offset": end if end < len(data) else None,
    }
def pattern_filter(data: list[T], pattern: str, key: str) -> list[T]:
    """Return the items of ``data`` whose ``item[key]`` matches ``pattern``.

    Args:
        data: items indexable by ``key``; the indexed value must be a string.
        pattern: empty for no filtering, ``/regex/`` for a case-insensitive
            regular-expression search, anything else for a case-insensitive
            substring test.
        key: name of the field to match against.

    Returns:
        ``data`` itself when ``pattern`` is empty, otherwise a new list.
    """
    # Local import: re is not among the file-level imports.
    import re

    if not pattern:
        return data

    # /regex/ syntax, as advertised by the list_*_filter tools. A non-empty
    # body is required so that a plain "//" keeps meaning the literal text.
    if len(pattern) > 2 and pattern.startswith("/") and pattern.endswith("/"):
        try:
            regex = re.compile(pattern[1:-1], re.IGNORECASE)
        except re.error:
            # Not a valid regular expression: treat the whole pattern as
            # literal text below.
            regex = None
        if regex is not None:
            return [item for item in data if regex.search(item[key])]

    # Lower-case the pattern once instead of once per item.
    needle = pattern.lower()
    return [item for item in data if needle in item[key].lower()]
@jsonrpc
@idaread
def list_functions(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of functions to list (100 is a good default, 0 means remainder)"],
) -> Page[Function]:
    """List all functions in the database (paginated)"""
    # Every function is described first; the requested page is cut afterwards.
    all_functions = list(map(get_function, idautils.Functions()))
    return paginate(all_functions, offset, count)
class Global(TypedDict):
    # A named address that is not inside a function (see list_globals_filter).
    address: str  # hex() of the address
    name: str     # name reported by idautils.Names()
@jsonrpc
@idaread
def list_globals_filter(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of globals to list (100 is a good default, 0 means remainder)"],
    filter: Annotated[str, "Filter to apply to the list (required parameter, empty string for no filter). Case-insensitive contains or /regex/ syntax"],
) -> Page[Global]:
    """List matching globals in the database (paginated, filtered)"""
    # Names that lie inside a function are skipped.
    named_globals = [
        Global(address=hex(addr), name=name)
        for addr, name in idautils.Names()
        if not idaapi.get_func(addr)
    ]
    matching = pattern_filter(named_globals, filter, "name")
    return paginate(matching, offset, count)
@jsonrpc
def list_globals(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of globals to list (100 is a good default, 0 means remainder)"],
) -> Page[Global]:
    """List all globals in the database (paginated)"""
    # Same as the filtered listing with an empty (match-all) filter.
    no_filter = ""
    return list_globals_filter(offset, count, no_filter)
class Import(TypedDict):
    # One imported symbol as collected by list_imports().
    address: str        # hex() of the import's address
    imported_name: str  # symbol name, or "#<ordinal>" when the symbol has no name
    module: str         # module name, or "<unnamed>" when the module has no name
@jsonrpc
@idaread
def list_imports(
        offset: Annotated[int, "Offset to start listing from (start at 0)"],
        count: Annotated[int, "Number of imports to list (100 is a good default, 0 means remainder)"],
) -> Page[Import]:
    """ List all imported symbols with their name and module (paginated) """
    collected = []
    for module_index in range(ida_nalt.get_import_module_qty()):
        module_name = ida_nalt.get_import_module_name(module_index) or "<unnamed>"

        def on_import(ea, symbol_name, ordinal):
            # Invoked for each import of the current module; returning True
            # asks for the enumeration to continue.
            display_name = symbol_name if symbol_name else f"#{ordinal}"
            collected.append(Import(address=hex(ea), imported_name=display_name, module=module_name))
            return True

        ida_nalt.enum_import_names(module_index, on_import)

    return paginate(collected, offset, count)
class String(TypedDict):
    # One string item as collected by list_strings_filter().
    address: str  # hex() of the item's ea
    length: int   # the item's length attribute
    string: str   # str() of the item
@jsonrpc
@idaread
def list_strings_filter(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of strings to list (100 is a good default, 0 means remainder)"],
    filter: Annotated[str, "Filter to apply to the list (required parameter, empty string for no filter). Case-insensitive contains or /regex/ syntax"],
) -> Page[String]:
    """List matching strings in the database (paginated, filtered)"""
    strings = []
    for item in idautils.Strings():
        try:
            text = str(item)
            if text:
                strings.append(
                    String(address=hex(item.ea), length=item.length, string=text)
                )
        except Exception:
            # Best effort: skip items that cannot be converted. `except
            # Exception` rather than a bare except, so KeyboardInterrupt and
            # SystemExit still propagate.
            continue
    strings = pattern_filter(strings, filter, "string")
    return paginate(strings, offset, count)
@jsonrpc
def list_strings(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of strings to list (100 is a good default, 0 means remainder)"],
) -> Page[String]:
    """List all strings in the database (paginated)"""
    # Same as the filtered listing with an empty (match-all) filter.
    no_filter = ""
    return list_strings_filter(offset, count, no_filter)
@jsonrpc
@idaread
def list_local_types():
    """List all Local types in the database"""
    # Returns a flat list of text entries: a "Type #N: name" header per type,
    # followed by its declaration when one could be printed.
    described = []
    idati = ida_typeinf.get_idati()
    type_count = ida_typeinf.get_ordinal_limit(idati)
    for ordinal in range(1, type_count):
        try:
            tif = ida_typeinf.tinfo_t()
            if not tif.get_numbered_type(idati, ordinal):
                # Ordinals that cannot be retrieved are skipped. The old code
                # built an error from a never-populated hexrays_failure_t and
                # raised it, only for its own bare except to swallow it.
                continue

            type_name = tif.get_type_name()
            if not type_name:
                type_name = f"<Anonymous Type #{ordinal}>"
            described.append(f"\nType #{ordinal}: {type_name}")

            if tif.is_udt():
                c_decl_flags = (ida_typeinf.PRTYPE_MULTI | ida_typeinf.PRTYPE_TYPE | ida_typeinf.PRTYPE_SEMI | ida_typeinf.PRTYPE_DEF | ida_typeinf.PRTYPE_METHODS | ida_typeinf.PRTYPE_OFFSETS)
                c_decl_output = tif._print(None, c_decl_flags)
                if c_decl_output:
                    described.append(f"  C declaration:\n{c_decl_output}")
            else:
                simple_decl = tif._print(None, ida_typeinf.PRTYPE_1LINE | ida_typeinf.PRTYPE_TYPE | ida_typeinf.PRTYPE_SEMI)
                if simple_decl:
                    described.append(f"  Simple declaration:\n{simple_decl}")
        except Exception:
            # Best effort: a failure on one type must not abort the listing.
            # `except Exception` rather than a bare except, so
            # KeyboardInterrupt and SystemExit still propagate.
            continue
    return described
def decompile_checked(address: int) -> ida_hexrays.cfunc_t:
    """Decompile the function at ``address`` and return its cfunc.

    Raises:
        DecompilerLicenseError: if decompilation fails with MERR_LICENSE.
        IDAError: if Hex-Rays is not available or decompilation fails for any
            other reason.
    """
    if not ida_hexrays.init_hexrays_plugin():
        raise IDAError("Hex-Rays decompiler is not available")

    failure = ida_hexrays.hexrays_failure_t()
    cfunc = ida_hexrays.decompile_func(address, failure, ida_hexrays.DECOMP_WARNINGS)
    if cfunc:
        return cfunc

    if failure.code == ida_hexrays.MERR_LICENSE:
        raise DecompilerLicenseError("Decompiler licence is not available. Use `disassemble_function` to get the assembly code instead.")

    # Assemble the failure message from whatever details are available.
    details = [f"Decompilation failed at {hex(address)}"]
    if failure.str:
        details.append(f": {failure.str}")
    if failure.errea != idaapi.BADADDR:
        details.append(f" (address: {hex(failure.errea)})")
    raise IDAError("".join(details))
@jsonrpc
@idaread
def decompile_function(
    address: Annotated[str, "Address of the function to decompile"],
) -> str:
    """Decompile a function at the given address"""
    address = parse_address(address)
    cfunc = decompile_checked(address)
    if is_window_active():
        ida_hexrays.open_pseudocode(address, ida_hexrays.OPF_REUSE)

    annotated_lines = []
    for index, sl in enumerate(cfunc.get_pseudocode()):
        item = ida_hexrays.ctree_item_t()
        # Only the first line defaults to the function entry address.
        line_ea = cfunc.entry_ea if index == 0 else None
        if cfunc.get_line_item(sl.line, 0, False, None, item, None):
            # An item description of the form "<hex>: <text>" yields the
            # line's address.
            parts = item.dstr().split(": ")
            if len(parts) == 2:
                try:
                    line_ea = int(parts[0], 16)
                except ValueError:
                    pass

        text = ida_lines.tag_remove(sl.line)
        if line_ea:
            annotated_lines.append(f"/* line: {index}, address: {hex(line_ea)} */ {text}")
        else:
            annotated_lines.append(f"/* line: {index} */ {text}")

    return "\n".join(annotated_lines)
class DisassemblyLine(TypedDict):
    # One instruction as emitted by disassemble_function(); the NotRequired
    # keys are only set there when they have a non-empty value.
    segment: NotRequired[str]         # name of the segment containing the address
    address: str                      # address formatted with "#x"
    label: NotRequired[str]           # name at the address
    instruction: str                  # mnemonic followed by the comma-separated operands
    comments: NotRequired[list[str]]  # comments at the address plus generated symbol/value notes
class Argument(TypedDict):
    # One function argument taken from the prototype in disassemble_function().
    name: str  # argument name
    type: str  # argument type rendered as text
class DisassemblyFunction(TypedDict):
    # Result of disassemble_function().
    name: str                             # function name
    start_ea: str                         # start address formatted with "#x"
    return_type: NotRequired[str]         # set only when a prototype is available
    arguments: NotRequired[list[Argument]]  # set only when the prototype yields arguments
    stack_frame: list[dict]               # result of get_stack_frame_variables_internal()
    lines: list[DisassemblyLine]          # one entry per item of the function
@jsonrpc
@idaread
def disassemble_function(
    start_address: Annotated[str, "Address of the function to disassemble"],
) -> DisassemblyFunction:
    """Get assembly code for a function"""
    # Raises IDAError when the address cannot be parsed or no function
    # contains it.
    start = parse_address(start_address)
    func: ida_funcs.func_t = idaapi.get_func(start)
    if not func:
        raise IDAError(f"No function found containing address {start_address}")
    if is_window_active():
        ida_kernwin.jumpto(start)

    lines = []
    for address in ida_funcs.func_item_iterator_t(func):
        seg = idaapi.getseg(address)
        segment = idaapi.get_segm_name(seg) if seg else None

        # The function's own name at its entry point is not repeated as a label.
        label = idc.get_name(address, 0)
        if label and label == func.name and address == func.start_ea:
            label = None
        if label == "":
            label = None

        # Both comment kinds (second argument False, then True) are collected.
        comments = []
        if comment := idaapi.get_cmt(address, False):
            comments += [comment]
        if comment := idaapi.get_cmt(address, True):
            comments += [comment]

        # The tagged line is split into sections so that the mnemonic and
        # each operand can be extracted separately.
        raw_instruction = idaapi.generate_disasm_line(address, 0)
        tls = ida_kernwin.tagged_line_sections_t()
        ida_kernwin.parse_tagged_line_sections(tls, raw_instruction)
        insn_section = tls.first(ida_lines.COLOR_INSN)

        operands = []
        for op_tag in range(ida_lines.COLOR_OPND1, ida_lines.COLOR_OPND8 + 1):
            op_n = tls.first(op_tag)
            if not op_n:
                break

            op: str = op_n.substr(raw_instruction)
            op_str = ida_lines.tag_remove(op)

            # Do a lot of work to add address comments for symbols
            # (scan the tagged operand text for COLOR_ON + COLOR_ADDR markers;
            # idx is advanced by hand within one iteration, the for loop
            # resets it on the next one)
            for idx in range(len(op) - 2):
                if op[idx] != idaapi.COLOR_ON:
                    continue

                idx += 1
                if ord(op[idx]) != idaapi.COLOR_ADDR:
                    continue

                idx += 1
                # The marker is followed by COLOR_ADDR_SIZE hex digits.
                addr_string = op[idx:idx + idaapi.COLOR_ADDR_SIZE]
                idx += idaapi.COLOR_ADDR_SIZE

                addr = int(addr_string, 16)

                # Find the next color and slice until there
                symbol = op[idx:op.find(idaapi.COLOR_OFF, idx)]

                if symbol == '':
                    # We couldn't figure out the symbol, so use the whole op_str
                    symbol = op_str

                comments += [f"{symbol}={addr:#x}"]

                # print its value if its type is available
                try:
                    value = get_global_variable_value_internal(addr)
                except:
                    continue

                comments += [f"*{symbol}={value}"]

            operands += [op_str]

        mnem = ida_lines.tag_remove(insn_section.substr(raw_instruction))
        instruction = f"{mnem} {', '.join(operands)}"

        line = DisassemblyLine(
            address=f"{address:#x}",
            instruction=instruction,
        )

        # Optional keys are only added when they carry a value.
        if len(comments) > 0:
            line.update(comments=comments)

        if segment:
            line.update(segment=segment)

        if label:
            line.update(label=label)

        lines += [line]

    prototype = func.get_prototype()
    arguments: list[Argument] = [Argument(name=arg.name, type=f"{arg.type}") for arg in prototype.iter_func()] if prototype else None

    disassembly_function = DisassemblyFunction(
        name=func.name,
        start_ea=f"{func.start_ea:#x}",
        stack_frame=get_stack_frame_variables_internal(func.start_ea),
        lines=lines
    )

    if prototype:
        disassembly_function.update(return_type=f"{prototype.get_rettype()}")

    if arguments:
        disassembly_function.update(arguments=arguments)

    return disassembly_function
class Xref(TypedDict):
    # One cross reference as reported by get_xrefs_to / get_xrefs_to_field.
    address: str                  # hex() of the referencing address (xref.frm)
    type: str                     # "code" or "data"
    function: Optional[Function]  # function containing the reference, or None if there is none
@jsonrpc
@idaread
def get_xrefs_to(
    address: Annotated[str, "Address to get cross references to"],
) -> list[Xref]:
    """Get all cross references to the given address"""
    target_ea = parse_address(address)
    # References outside any function get function=None.
    return [
        Xref(address=hex(xref.frm),
             type="code" if xref.iscode else "data",
             function=get_function(xref.frm, raise_error=False))
        for xref in idautils.XrefsTo(target_ea)
    ]
@jsonrpc
@idaread
def get_xrefs_to_field(
    struct_name: Annotated[str, "Name of the struct (type) containing the field"],
    field_name: Annotated[str, "Name of the field (member) to get xrefs to"],
) -> list[Xref]:
    """Get all cross references to a named struct field (member)"""
    # An unknown struct or field prints a message and returns an empty list;
    # a missing type library or member tid raises IDAError.

    # Get the type library
    type_library = ida_typeinf.get_idati()
    if not type_library:
        raise IDAError("Failed to retrieve type library.")

    # Get the structure type info
    struct_tif = ida_typeinf.tinfo_t()
    if not struct_tif.get_named_type(type_library, struct_name, ida_typeinf.BTF_STRUCT, True, False):
        print(f"Structure '{struct_name}' not found.")
        return []

    # Get The field index
    member_index = ida_typeinf.get_udm_by_fullname(None, struct_name + '.' + field_name)
    if member_index == -1:
        print(f"Field '{field_name}' not found in structure '{struct_name}'.")
        return []

    # Get the type identifier
    member_tid = struct_tif.get_udm_tid(member_index)
    if member_tid == ida_idaapi.BADADDR:
        raise IDAError(f"Unable to get tid for structure '{struct_name}' and field '{field_name}'.")

    # Get xrefs to the tid
    return [
        Xref(address=hex(xref.frm),
             type="code" if xref.iscode else "data",
             function=get_function(xref.frm, raise_error=False))
        for xref in idautils.XrefsTo(member_tid)
    ]
@jsonrpc
@idaread
def get_entry_points() -> list[Function]:
    """Get all entry points in the database"""
    # Entry points that do not resolve to a function are left out.
    return [
        func
        for index in range(ida_entry.get_entry_qty())
        if (func := get_function(
            ida_entry.get_entry(ida_entry.get_entry_ordinal(index)),
            raise_error=False,
        )) is not None
    ]
@jsonrpc
@idawrite
def set_comment(
    address: Annotated[str, "Address in the function to set the comment for"],
    comment: Annotated[str, "Comment text"],
):
    """Set a comment for a given address in the function disassembly and pseudocode"""
    # Raises IDAError when the address cannot be parsed, when the disassembly
    # comment cannot be set, or when decompilation fails for a reason other
    # than the license. Failures to place the pseudocode comment are only
    # printed.
    address = parse_address(address)

    if not idaapi.set_cmt(address, comment, False):
        raise IDAError(f"Failed to set disassembly comment at {hex(address)}")

    # Without Hex-Rays only the disassembly comment is set.
    if not ida_hexrays.init_hexrays_plugin():
        return

    # Reference: https://cyber.wtf/2019/03/22/using-ida-python-to-analyze-trickbot/
    # Check if the address corresponds to a line
    try:
        cfunc = decompile_checked(address)
    except DecompilerLicenseError:
        # We failed to decompile the function due to a decompiler license error
        return

    # Special case for function entry comments
    if address == cfunc.entry_ea:
        idc.set_func_cmt(address, comment, True)
        cfunc.refresh_func_ctext()
        return

    eamap = cfunc.get_eamap()
    if address not in eamap:
        print(f"Failed to set decompiler comment at {hex(address)}")
        return
    # The first item mapped to the address provides the comment location.
    nearest_ea = eamap[address][0].ea

    # Remove existing orphan comments
    if cfunc.has_orphan_cmts():
        cfunc.del_orphan_cmts()
        cfunc.save_user_cmts()

    # Set the comment by trying all possible item types
    # (a placement that leaves the comment orphaned is undone before the
    # next type is tried)
    tl = idaapi.treeloc_t()
    tl.ea = nearest_ea
    for itp in range(idaapi.ITP_SEMI, idaapi.ITP_COLON):
        tl.itp = itp
        cfunc.set_user_cmt(tl, comment)
        cfunc.save_user_cmts()
        cfunc.refresh_func_ctext()
        if not cfunc.has_orphan_cmts():
            return
        cfunc.del_orphan_cmts()
        cfunc.save_user_cmts()
    print(f"Failed to set decompiler comment at {hex(address)}")
def refresh_decompiler_widget():
    """Refresh the pseudocode text of the current widget, if it is a decompiler view."""
    widget = ida_kernwin.get_current_widget()
    if widget is None:
        return
    vu = ida_hexrays.get_widget_vdui(widget)
    if vu is None:
        return
    vu.refresh_ctext()
def refresh_decompiler_ctext(function_address: int):
    """Decompile the function at ``function_address`` and refresh its pseudocode text.

    Does nothing when decompilation yields no result.
    """
    failure = ida_hexrays.hexrays_failure_t()
    cfunc = ida_hexrays.decompile_func(function_address, failure, ida_hexrays.DECOMP_WARNINGS)
    if not cfunc:
        return
    cfunc.refresh_func_ctext()
@jsonrpc
@idawrite
def rename_local_variable(
    function_address: Annotated[str, "Address of the function containing the variable"],
    old_name: Annotated[str, "Current name of the variable"],
    new_name: Annotated[str, "New name for the variable (empty for a default name)"],
):
    """Rename a local variable in a function"""
    # Raises IDAError when the function is not found or the rename fails.
    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")

    renamed = ida_hexrays.rename_lvar(func.start_ea, old_name, new_name)
    if not renamed:
        raise IDAError(f"Failed to rename local variable {old_name} in function {hex(func.start_ea)}")

    # Re-decompile so that the pseudocode shows the new name.
    refresh_decompiler_ctext(func.start_ea)
@jsonrpc
@idawrite
def rename_global_variable(
    old_name: Annotated[str, "Current name of the global variable"],
    new_name: Annotated[str, "New name for the global variable (empty for a default name)"],
):
    """Rename a global variable"""
    ea = idaapi.get_name_ea(idaapi.BADADDR, old_name)
    # Report an unknown name explicitly instead of passing BADADDR on to
    # set_name (same check and message as get_global_variable_value_by_name).
    if ea == idaapi.BADADDR:
        raise IDAError(f"Global variable {old_name} not found")
    if not idaapi.set_name(ea, new_name):
        raise IDAError(f"Failed to rename global variable {old_name} to {new_name}")
    refresh_decompiler_ctext(ea)
@jsonrpc
@idawrite
def set_global_variable_type(
    variable_name: Annotated[str, "Name of the global variable"],
    new_type: Annotated[str, "New type for the variable"],
):
    """Set a global variable's type"""
    ea = idaapi.get_name_ea(idaapi.BADADDR, variable_name)
    # An unknown name resolves to BADADDR; fail early with a precise message
    # rather than attempting to apply a type to an invalid address.
    if ea == idaapi.BADADDR:
        raise IDAError(f"Global variable {variable_name} not found")
    tif = get_type_by_name(new_type)
    if not tif:
        raise IDAError("Parsed declaration is not a variable type")
    if not ida_typeinf.apply_tinfo(ea, tif, ida_typeinf.PT_SIL):
        raise IDAError("Failed to apply type")
@jsonrpc
@idaread
def get_global_variable_value_by_name(variable_name: Annotated[str, "Name of the global variable"]) -> str:
    """
    Read a global variable's value (if known at compile-time)
    Prefer this function over the `data_read_*` functions.
    """
    # Resolve the name to an address, then delegate to the shared reader.
    target = idaapi.get_name_ea(idaapi.BADADDR, variable_name)
    if target != idaapi.BADADDR:
        return get_global_variable_value_internal(target)
    raise IDAError(f"Global variable {variable_name} not found")
@jsonrpc
@idaread
def get_global_variable_value_at_address(ea: Annotated[str, "Address of the global variable"]) -> str:
    """
    Read a global variable's value by its address (if known at compile-time)
    Prefer this function over the `data_read_*` functions.
    """
    # The address arrives as text; parse it and delegate to the shared reader.
    return get_global_variable_value_internal(parse_address(ea))
def get_global_variable_value_internal(ea: int) -> str:
    """Read the value stored at `ea` and render it as text.

    The size comes from the type information at `ea` when available,
    otherwise from the item size of a named location. Sizes 1, 2, 4 and 8
    are returned as a single hex number, a zero-sized char array is read as
    a quoted string literal, and any other size is returned as
    space-separated hex bytes.

    Raises:
        IDAError: if the size cannot be determined or the data cannot be read.
    """
    # Get the type information for the variable
    tif = ida_typeinf.tinfo_t()
    if not ida_nalt.get_tinfo(tif, ea):
        # No type info, maybe we can figure out its size by its name.
        # has_any_name() inspects item flags, so fetch the flags for the
        # address instead of passing the address itself.
        if not ida_bytes.has_any_name(ida_bytes.get_flags(ea)):
            raise IDAError(f"Failed to get type information for variable at {ea:#x}")
        size = ida_bytes.get_item_size(ea)
        if size == 0:
            raise IDAError(f"Failed to get type information for variable at {ea:#x}")
    else:
        # Determine the size of the variable
        size = tif.get_size()

    # Read the value based on the size
    if size == 0 and tif.is_array() and tif.get_array_element().is_decl_char():
        contents = idaapi.get_strlit_contents(ea, -1, 0)
        # No string at this address: fail clearly instead of with an
        # AttributeError on None.
        if contents is None:
            raise IDAError(f"Failed to read string literal at {ea:#x}")
        return_string = contents.decode("utf-8").strip()
        return f'"{return_string}"'
    elif size == 1:
        return hex(ida_bytes.get_byte(ea))
    elif size == 2:
        return hex(ida_bytes.get_word(ea))
    elif size == 4:
        return hex(ida_bytes.get_dword(ea))
    elif size == 8:
        return hex(ida_bytes.get_qword(ea))
    else:
        # For other sizes, return the raw bytes
        raw = ida_bytes.get_bytes(ea, size)
        if raw is None:
            raise IDAError(f"Failed to read {size} bytes at {ea:#x}")
        return ' '.join(hex(x) for x in raw)
@jsonrpc
@idawrite
def rename_function(
    function_address: Annotated[str, "Address of the function to rename"],
    new_name: Annotated[str, "New name for the function (empty for a default name)"],
):
    """Rename a function"""
    target = idaapi.get_func(parse_address(function_address))
    if not target:
        raise IDAError(f"No function found at address {function_address}")
    # The name is always applied to the function's entry address.
    entry = target.start_ea
    if not idaapi.set_name(entry, new_name):
        raise IDAError(f"Failed to rename function {hex(entry)} to {new_name}")
    refresh_decompiler_ctext(entry)
@jsonrpc
@idawrite
def set_function_prototype(
    function_address: Annotated[str, "Address of the function"],
    prototype: Annotated[str, "New function prototype"],
):
    """Set a function's prototype"""
    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")
    # Only the parsing step is guarded, so the more specific errors raised
    # below are no longer caught and rewritten as a parse failure.
    try:
        tif = ida_typeinf.tinfo_t(prototype, None, ida_typeinf.PT_SIL)
    except Exception as e:
        raise IDAError(f"Failed to parse prototype string: {prototype}") from e
    if not tif.is_func():
        raise IDAError("Parsed declaration is not a function type")
    if not ida_typeinf.apply_tinfo(func.start_ea, tif, ida_typeinf.PT_SIL):
        raise IDAError("Failed to apply type")
    refresh_decompiler_ctext(func.start_ea)
class my_modifier_t(ida_hexrays.user_lvar_modifier_t):
    """Local-variable modifier that assigns a new type to one named variable."""

    def __init__(self, var_name: str, new_type: ida_typeinf.tinfo_t):
        ida_hexrays.user_lvar_modifier_t.__init__(self)
        self.var_name = var_name
        self.new_type = new_type

    def modify_lvars(self, lvars):
        """Retype the first saved variable named `var_name`.

        Returns True when a matching entry was updated, False otherwise.
        """
        for saved in lvars.lvvec:
            if saved.name != self.var_name:
                continue
            saved.type = self.new_type
            return True
        return False
# NOTE: This is extremely hacky, but necessary to get errors out of IDA
def parse_decls_ctypes(decls: str, hti_flags: int) -> tuple[int, list[str]]:
    """Parse C declarations and collect the parser's diagnostic messages.

    On Windows the `parse_decls` export of the IDA library is called through
    ctypes with a printer callback so the messages can be captured. On other
    platforms `ida_typeinf.parse_decls` is used and no messages are captured.

    Args:
        decls: C declarations to parse.
        hti_flags: Flags passed through to the parser.

    Returns:
        A tuple of (error count, list of captured messages).
    """
    if sys.platform == "win32":
        import ctypes

        assert isinstance(decls, str), "decls must be a string"
        assert isinstance(hti_flags, int), "hti_flags must be an int"
        c_decls = decls.encode("utf-8")
        c_til = None
        ida_dll = ctypes.CDLL("ida")
        ida_dll.parse_decls.argtypes = [
            ctypes.c_void_p,
            ctypes.c_char_p,
            ctypes.c_void_p,
            ctypes.c_int,
        ]
        ida_dll.parse_decls.restype = ctypes.c_int

        messages: list[str] = []

        @ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_char_p, ctypes.c_char_p)
        def magic_printer(fmt: bytes, arg1: bytes):
            # Only a format with exactly one "%s" is supported; anything else
            # is recorded verbatim as unsupported.
            if fmt.count(b"%") == 1 and b"%s" in fmt:
                # A NULL argument arrives as None. Substitute an empty string
                # so the message is still recorded; an exception raised in a
                # ctypes callback would otherwise drop it.
                formatted = fmt.replace(b"%s", arg1 or b"")
                # Replace undecodable bytes for the same reason.
                messages.append(formatted.decode("utf-8", errors="replace"))
                return len(formatted) + 1
            else:
                messages.append(f"unsupported magic_printer fmt: {repr(fmt)}")
                return 0

        errors = ida_dll.parse_decls(c_til, c_decls, magic_printer, hti_flags)
    else:
        # NOTE: The approach above could also work on other platforms, but it's
        # not been tested and there are differences in the vararg ABIs.
        errors = ida_typeinf.parse_decls(None, decls, False, hti_flags)
        messages = []
    return errors, messages
@jsonrpc
@idawrite
def declare_c_type(
    c_declaration: Annotated[str, "C declaration of the type. Examples include: typedef int foo_t; struct bar { int a; bool b; };"],
):
    """Create or update a local type from a C declaration"""
    parse_flags = (
        ida_typeinf.PT_SIL      # Suppress warning dialogs (although it seems unnecessary here)
        | ida_typeinf.PT_EMPTY  # Allow empty types (also unnecessary?)
        | ida_typeinf.PT_TYP    # Print back status messages with struct tags
    )
    error_count, diagnostics = parse_decls_ctypes(c_declaration, parse_flags)
    pretty_messages = "\n".join(diagnostics)
    if error_count <= 0:
        return f"success\n\nInfo:\n{pretty_messages}"
    raise IDAError(f"Failed to parse type:\n{c_declaration}\n\nErrors:\n{pretty_messages}")
@jsonrpc
@idawrite
def set_local_variable_type(
    function_address: Annotated[str, "Address of the decompiled function containing the variable"],
    variable_name: Annotated[str, "Name of the variable"],
    new_type: Annotated[str, "New type for the variable"],
):
    """Set a local variable's type"""
    try:
        # Some versions of IDA don't support this constructor
        new_tif = ida_typeinf.tinfo_t(new_type, None, ida_typeinf.PT_SIL)
    except Exception:
        new_tif = ida_typeinf.tinfo_t()
        try:
            # parse_decl requires semicolon for the type
            parsed = ida_typeinf.parse_decl(new_tif, None, new_type + ";", ida_typeinf.PT_SIL)
        except Exception:
            parsed = None
        # parse_decl signals failure through a None result rather than an
        # exception; without this check an empty type would be applied.
        if parsed is None:
            raise IDAError(f"Failed to parse type: {new_type}")
    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")
    # Renaming the variable to its own name serves as an existence check.
    if not ida_hexrays.rename_lvar(func.start_ea, variable_name, variable_name):
        raise IDAError(f"Failed to find local variable: {variable_name}")
    modifier = my_modifier_t(variable_name, new_tif)
    if not ida_hexrays.modify_user_lvars(func.start_ea, modifier):
        raise IDAError(f"Failed to modify local variable: {variable_name}")
    refresh_decompiler_ctext(func.start_ea)
class StackFrameVariable(TypedDict):
    """Description of one stack frame member; every field is a string."""
    # Member name as stored in the frame type.
    name: str
    # Member offset, rendered with hex().
    offset: str
    # Member size, rendered with hex().
    size: str
    # Textual form of the member's type.
    type: str
@jsonrpc
@idaread
def get_stack_frame_variables(
    function_address: Annotated[str, "Address of the disassembled function to retrieve the stack frame variables"]
) -> list[StackFrameVariable]:
    """ Retrieve the stack frame variables for a given function """
    # Parse the textual address, then delegate to the internal helper.
    resolved = parse_address(function_address)
    return get_stack_frame_variables_internal(resolved)
def get_stack_frame_variables_internal(function_address: int) -> list[dict]:
    """List the non-gap members of the stack frame of the function at `function_address`.

    Returns an empty list when the frame type cannot be loaded or is not a
    user-defined type. Raises IDAError when no function contains the address.
    """
    func = idaapi.get_func(function_address)
    if not func:
        raise IDAError(f"No function found at address {function_address}")

    frame_tif = ida_typeinf.tinfo_t()
    if not (frame_tif.get_type_by_tid(func.frame) and frame_tif.is_udt()):
        return []

    details = ida_typeinf.udt_type_data_t()
    frame_tif.get_udt_details(details)
    # Offsets and sizes are divided by 8 before formatting
    # (presumably bits -> bytes; confirm against the IDA SDK).
    return [
        StackFrameVariable(
            name=member.name,
            offset=hex(member.offset // 8),
            size=hex(member.size // 8),
            type=str(member.type),
        )
        for member in details
        if not member.is_gap()
    ]
class StructureMember(TypedDict):
    """Description of one structure member; every field is a string."""
    # Member name.
    name: str
    # Member offset, rendered with hex().
    offset: str
    # Member size, rendered with hex().
    size: str
    # Textual form of the member's type.
    type: str
class StructureDefinition(TypedDict):
    """Description of one structure type and its members."""
    # Type name of the structure.
    name: str
    # Total size of the structure, rendered with hex().
    size: str
    # Members of the structure (empty when details could not be retrieved).
    members: list[StructureMember]
@jsonrpc
@idaread
def get_defined_structures() -> list[StructureDefinition]:
    """ Returns a list of all defined structures """
    structures = []
    # Walk every numbered type and keep only the user-defined types.
    for ordinal in range(1, ida_typeinf.get_ordinal_limit()):
        tif = ida_typeinf.tinfo_t()
        tif.get_numbered_type(None, ordinal)
        if not tif.is_udt():
            continue
        details = ida_typeinf.udt_type_data_t()
        if tif.get_udt_details(details):
            members = [
                StructureMember(
                    name=member.name,
                    offset=hex(member.offset // 8),
                    size=hex(member.size // 8),
                    type=str(member.type),
                )
                for member in details
            ]
        else:
            # Details unavailable: report the structure without members.
            members = []
        structures.append(
            StructureDefinition(
                name=tif.get_type_name(),
                size=hex(tif.get_size()),
                members=members,
            )
        )
    return structures
@jsonrpc
@idawrite
def rename_stack_frame_variable(
    function_address: Annotated[str, "Address of the disassembled function to set the stack frame variables"],
    old_name: Annotated[str, "Current name of the variable"],
    new_name: Annotated[str, "New name for the variable (empty for a default name)"]
):
    """ Change the name of a stack variable for an IDA function """
    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")

    frame_tif = ida_typeinf.tinfo_t()
    if not ida_frame.get_func_frame(frame_tif, func):
        raise IDAError("No frame returned.")

    # Look the member up by name first, then re-fetch it through its type id.
    member_index, found = frame_tif.get_udm(old_name)
    if not found:
        raise IDAError(f"{old_name} not found.")

    member_tid = frame_tif.get_udm_tid(member_index)
    if ida_frame.is_special_frame_member(member_tid):
        raise IDAError(f"{old_name} is a special frame member. Will not change the name.")

    member = ida_typeinf.udm_t()
    frame_tif.get_udm_by_tid(member, member_tid)
    # Member offset divided by 8 (presumably bits -> bytes).
    member_offset = member.offset // 8
    if ida_frame.is_funcarg_off(func, member_offset):
        raise IDAError(f"{old_name} is an argument member. Will not change the name.")

    # Redefine the variable at the same location with the new name and its
    # existing type.
    frame_pointer_offset = ida_frame.soff_to_fpoff(func, member_offset)
    if not ida_frame.define_stkvar(func, new_name, frame_pointer_offset, member.type):
        raise IDAError("failed to rename stack frame variable")
@jsonrpc
@idawrite
def create_stack_frame_variable(
    function_address: Annotated[str, "Address of the disassembled function to set the stack frame variables"],
    offset: Annotated[str, "Offset of the stack frame variable"],
    variable_name: Annotated[str, "Name of the stack variable"],
    type_name: Annotated[str, "Type of the stack variable"]
):
    """ For a given function, create a stack variable at an offset and with a specific type """
    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")

    # The offset arrives as text and is parsed with the address parser.
    numeric_offset = parse_address(offset)

    # The frame is fetched only to confirm that the function has one.
    frame_tif = ida_typeinf.tinfo_t()
    if not ida_frame.get_func_frame(frame_tif, func):
        raise IDAError("No frame returned.")

    variable_tif = get_type_by_name(type_name)
    created = ida_frame.define_stkvar(func, variable_name, numeric_offset, variable_tif)
    if not created:
        raise IDAError("failed to define stack frame variable")
@jsonrpc
@idawrite
def set_stack_frame_variable_type(
    function_address: Annotated[str, "Address of the disassembled function to set the stack frame variables"],
    variable_name: Annotated[str, "Name of the stack variable"],
    type_name: Annotated[str, "Type of the stack variable"]
):
    """ For a given disassembled function, set the type of a stack variable """
    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")

    frame_tif = ida_typeinf.tinfo_t()
    if not ida_frame.get_func_frame(frame_tif, func):
        raise IDAError("No frame returned.")

    # Look the member up by name first, then re-fetch it through its type id.
    member_index, found = frame_tif.get_udm(variable_name)
    if not found:
        raise IDAError(f"{variable_name} not found.")

    member_tid = frame_tif.get_udm_tid(member_index)
    member = ida_typeinf.udm_t()
    frame_tif.get_udm_by_tid(member, member_tid)
    # Member offset divided by 8 (presumably bits -> bytes).
    member_offset = member.offset // 8

    requested_tif = get_type_by_name(type_name)
    if not ida_frame.set_frame_member_type(func, member_offset, requested_tif):
        raise IDAError("failed to set stack frame variable type")
@jsonrpc
@idawrite
def delete_stack_frame_variable(
    function_address: Annotated[str, "Address of the function to set the stack frame variables"],
    variable_name: Annotated[str, "Name of the stack variable"]
):
    """ Delete the named stack variable for a given function """
    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")

    frame_tif = ida_typeinf.tinfo_t()
    if not ida_frame.get_func_frame(frame_tif, func):
        raise IDAError("No frame returned.")

    # Look the member up by name first, then re-fetch it through its type id.
    member_index, found = frame_tif.get_udm(variable_name)
    if not found:
        raise IDAError(f"{variable_name} not found.")

    member_tid = frame_tif.get_udm_tid(member_index)
    if ida_frame.is_special_frame_member(member_tid):
        raise IDAError(f"{variable_name} is a special frame member. Will not delete.")

    member = ida_typeinf.udm_t()
    frame_tif.get_udm_by_tid(member, member_tid)
    # Offset and size divided by 8 (presumably bits -> bytes).
    start = member.offset // 8
    length = member.size // 8
    if ida_frame.is_funcarg_off(func, start):
        raise IDAError(f"{variable_name} is an argument member. Will not delete.")

    # Remove the range [start, start + length) from the frame.
    if not ida_frame.delete_frame_members(func, start, start + length):
        raise IDAError("failed to delete stack frame variable")
@jsonrpc
@idaread
def read_memory_bytes(
    memory_address: Annotated[str, "Address of the memory value to be read"],
    size: Annotated[int, "size of memory to read"]
) -> str:
    """
    Read bytes at a given address.
    Only use this function if `get_global_variable_value_at_address` and
    `get_global_variable_value_by_name` both failed.
    """
    ea = parse_address(memory_address)
    data = ida_bytes.get_bytes(ea, size)
    # A failed read yields None; report it instead of failing on iteration.
    if data is None:
        raise IDAError(f"Failed to read {size} bytes at {hex(ea)}")
    # The width counts the "0x" prefix, so 4 is needed for two hex digits
    # (the previous width of 2 never padded).
    return ' '.join(f'{x:#04x}' for x in data)
@jsonrpc
@idaread
def data_read_byte(
    address: Annotated[str, "Address to get 1 byte value from"],
) -> int:
    """
    Read the 1 byte value at the specified address.
    Only use this function if `get_global_variable_value_at_address` failed.
    """
    ea = parse_address(address)
    return ida_bytes.get_wide_byte(ea)
@jsonrpc
@idaread
def data_read_word(
    address: Annotated[str, "Address to get 2 bytes value from"],
) -> int:
    """
    Read the 2 byte value at the specified address as a WORD.
    Only use this function if `get_global_variable_value_at_address` failed.
    """
    ea = parse_address(address)
    return ida_bytes.get_wide_word(ea)
@jsonrpc
@idaread
def data_read_dword(
    address: Annotated[str, "Address to get 4 bytes value from"],
) -> int:
    """
    Read the 4 byte value at the specified address as a DWORD.
    Only use this function if `get_global_variable_value_at_address` failed.
    """
    ea = parse_address(address)
    return ida_bytes.get_wide_dword(ea)
@jsonrpc
@idaread
def data_read_qword(
    address: Annotated[str, "Address to get 8 bytes value from"]
) -> int:
    """
    Read the 8 byte value at the specified address as a QWORD.
    Only use this function if `get_global_variable_value_at_address` failed.
    """
    ea = parse_address(address)
    return ida_bytes.get_qword(ea)
@jsonrpc
@idaread
def data_read_string(
    address: Annotated[str, "Address to get string from"]
) -> str:
    """
    Read the string at the specified address.
    Only use this function if `get_global_variable_value_at_address` failed.
    """
    # Failures are reported in-band as an "Error:..." string, as before.
    try:
        contents = idaapi.get_strlit_contents(parse_address(address), -1, 0)
        # No string at this address: give a meaningful message instead of the
        # opaque "'NoneType' object has no attribute 'decode'".
        if contents is None:
            return f"Error:No string found at {address}"
        return contents.decode("utf-8")
    except Exception as e:
        return "Error:" + str(e)
@jsonrpc
@idaread
@unsafe
def dbg_get_registers() -> list[dict[str, str]]:
    """Get all registers and their values. This function is only available when debugging."""
    debugger = ida_idd.get_dbg()
    threads = []
    # TODO: raise an exception when not debugging?
    for index in range(ida_dbg.get_thread_qty()):
        thread_id = ida_dbg.getn_thread(index)
        registers = []
        for position, raw in enumerate(ida_dbg.get_reg_vals(thread_id)):
            info = debugger.regs(position)
            value = raw.pyval(info.dtype)
            # Integers become hex strings and byte strings become
            # space-separated hex; any other value is passed through.
            if isinstance(value, int):
                value = hex(value)
            elif isinstance(value, bytes):
                value = value.hex(" ")
            registers.append({"name": info.name, "value": value})
        threads.append({"thread_id": thread_id, "registers": registers})
    return threads
@jsonrpc
@idaread
@unsafe
def dbg_get_call_stack() -> list[dict[str, str]]:
    """Get the current call stack."""
    callstack = []
    try:
        tid = ida_dbg.get_current_thread()
        trace = ida_idd.call_stack_t()

        if not ida_dbg.collect_stack_trace(tid, trace):
            return []
        for frame in trace:
            frame_info = {
                "address": hex(frame.callea),
            }
            try:
                module_info = ida_idd.modinfo_t()
                if ida_dbg.get_module_info(frame.callea, module_info):
                    frame_info["module"] = os.path.basename(module_info.name)
                else:
                    frame_info["module"] = "<unknown>"

                name = (
                    ida_name.get_nice_colored_name(
                        frame.callea,
                        ida_name.GNCN_NOCOLOR
                        | ida_name.GNCN_NOLABEL
                        | ida_name.GNCN_NOSEG
                        | ida_name.GNCN_PREFDBG,
                    )
                    or "<unnamed>"
                )
                frame_info["symbol"] = name
            except Exception as e:
                # Per-frame failures are reported inside the frame entry.
                frame_info["module"] = "<error>"
                frame_info["symbol"] = str(e)
            callstack.append(frame_info)
    except Exception as e:
        # Best effort: keep the frames gathered so far, but log the failure
        # instead of discarding it silently.
        print(f"[MCP] Failed to collect call stack: {e}")
    return callstack
def list_breakpoints():
    """Return a description of every breakpoint known to the debugger.

    Each entry is a dict with the keys "ea" (hex string), "type", "enabled"
    (bool) and "condition" (None when no condition is set).
    """
    breakpoints = []
    # Enumerate the breakpoint list directly rather than probing every item
    # head in the database, which is slow and depends on the address range.
    for index in range(ida_dbg.get_bpt_qty()):
        bpt = ida_dbg.bpt_t()
        if not ida_dbg.getn_bpt(index, bpt):
            continue
        breakpoints.append(
            {
                "ea": hex(bpt.ea),
                "type": bpt.type,
                # Report a real boolean rather than the raw masked flag value.
                "enabled": bool(bpt.flags & ida_dbg.BPT_ENABLED),
                "condition": bpt.condition if bpt.condition else None,
            }
        )
    return breakpoints
@jsonrpc
@idaread
@unsafe
def dbg_list_breakpoints():
    """List all breakpoints in the program."""
    # Exposes the module-level list_breakpoints() helper through the decorators above.
    return list_breakpoints()
@jsonrpc
@idaread
@unsafe
def dbg_start_process() -> str:
    """Start the debugger"""
    # Three empty strings are passed as the start_process arguments.
    started = idaapi.start_process("", "", "")
    return "Debugger started" if started else "Failed to start debugger"
@jsonrpc
@idaread
@unsafe
def dbg_exit_process() -> str:
    """Exit the debugger"""
    # Report the outcome as text rather than raising.
    exited = idaapi.exit_process()
    return "Debugger exited" if exited else "Failed to exit debugger"
@jsonrpc
@idaread
@unsafe
def dbg_continue_process() -> str:
    """Continue the debugger"""
    # Report the outcome as text rather than raising.
    resumed = idaapi.continue_process()
    return "Debugger continued" if resumed else "Failed to continue debugger"
@jsonrpc
@idaread
@unsafe
def dbg_run_to(
    address: Annotated[str, "Run the debugger to the specified address"],
) -> str:
    """Run the debugger to the specified address"""
    ea = parse_address(address)
    # Report the outcome as text rather than raising.
    if not idaapi.run_to(ea):
        return f"Failed to run to address {hex(ea)}"
    return f"Debugger run to {hex(ea)}"
@jsonrpc
@idaread
@unsafe
def dbg_set_breakpoint(
    address: Annotated[str, "Set a breakpoint at the specified address"],
) -> str:
    """Set a breakpoint at the specified address"""
    ea = parse_address(address)
    location = hex(ea)
    if idaapi.add_bpt(ea, 0, idaapi.BPT_SOFT):
        return f"Breakpoint set at {location}"
    # Adding failed: distinguish "already present" from a genuine failure.
    if any(existing["ea"] == location for existing in list_breakpoints()):
        return f"Breakpoint already exists at {location}"
    return f"Failed to set breakpoint at address {location}"
@jsonrpc
@idaread
@unsafe
def dbg_delete_breakpoint(
    address: Annotated[str, "del a breakpoint at the specified address"],
) -> str:
    """del a breakpoint at the specified address"""
    ea = parse_address(address)
    # Report the outcome as text rather than raising.
    if not idaapi.del_bpt(ea):
        return f"Failed to delete breakpoint at address {hex(ea)}"
    return f"Breakpoint deleted at {hex(ea)}"
@jsonrpc
@idaread
@unsafe
def dbg_enable_breakpoint(
    address: Annotated[str, "Enable or disable a breakpoint at the specified address"],
    enable: Annotated[bool, "Enable or disable a breakpoint"],
) -> str:
    """Enable or disable a breakpoint at the specified address"""
    ea = parse_address(address)
    # Name the action once so the success and failure messages agree; the
    # failure message used to drop the verb when enabling.
    action = "enable" if enable else "disable"
    if idaapi.enable_bpt(ea, enable):
        return f"Breakpoint {action}d at {hex(ea)}"
    return f"Failed to {action} breakpoint at address {hex(ea)}"
class MCP(idaapi.plugin_t):
    """IDA plugin that owns the MCP server and starts it on demand."""

    flags = idaapi.PLUGIN_KEEP
    comment = "MCP Plugin"
    help = "MCP"
    wanted_name = "MCP"
    wanted_hotkey = "Ctrl-Alt-M"

    def init(self):
        """Create the server (without starting it) and print a usage hint."""
        self.server = Server()
        # Build a display form of the hotkey for the hint message.
        shortcut = MCP.wanted_hotkey.replace("-", "+")
        if sys.platform == "darwin":
            shortcut = shortcut.replace("Alt", "Option")
        print(f"[MCP] Plugin loaded, use Edit -> Plugins -> MCP ({shortcut}) to start the server")
        return idaapi.PLUGIN_KEEP

    def run(self, args):
        """Start the server when the plugin is invoked."""
        self.server.start()

    def term(self):
        """Stop the server when the plugin is terminated."""
        self.server.stop()
def PLUGIN_ENTRY():
    """Create and return a new instance of the MCP plugin class."""
    return MCP()