Skip to main content
Glama

IDA Pro MCP

MIT License
3,847
  • Linux
  • Apple
mcp-plugin.py (77.1 kB)
import os import sys if sys.version_info < (3, 11): raise RuntimeError("Python 3.11 or higher is required for the MCP plugin") import json import struct import threading import http.server from urllib.parse import urlparse from typing import ( Any, Callable, get_type_hints, TypedDict, Optional, Annotated, TypeVar, Generic, NotRequired, overload, Literal, ) class JSONRPCError(Exception): def __init__(self, code: int, message: str, data: Any = None): self.code = code self.message = message self.data = data class RPCRegistry: def __init__(self): self.methods: dict[str, Callable] = {} self.unsafe: set[str] = set() def register(self, func: Callable) -> Callable: self.methods[func.__name__] = func return func def mark_unsafe(self, func: Callable) -> Callable: self.unsafe.add(func.__name__) return func def dispatch(self, method: str, params: Any) -> Any: if method not in self.methods: raise JSONRPCError(-32601, f"Method '{method}' not found") func = self.methods[method] hints = get_type_hints(func) # Remove return annotation if present hints.pop("return", None) if isinstance(params, list): if len(params) != len(hints): raise JSONRPCError(-32602, f"Invalid params: expected {len(hints)} arguments, got {len(params)}") # Validate and convert parameters converted_params = [] for value, (param_name, expected_type) in zip(params, hints.items()): try: if not isinstance(value, expected_type): value = expected_type(value) converted_params.append(value) except (ValueError, TypeError): raise JSONRPCError(-32602, f"Invalid type for parameter '{param_name}': expected {expected_type.__name__}") return func(*converted_params) elif isinstance(params, dict): if set(params.keys()) != set(hints.keys()): raise JSONRPCError(-32602, f"Invalid params: expected {list(hints.keys())}") # Validate and convert parameters converted_params = {} for param_name, expected_type in hints.items(): value = params.get(param_name) try: if not isinstance(value, expected_type): value = expected_type(value) 
converted_params[param_name] = value except (ValueError, TypeError): raise JSONRPCError(-32602, f"Invalid type for parameter '{param_name}': expected {expected_type.__name__}") return func(**converted_params) else: raise JSONRPCError(-32600, "Invalid Request: params must be array or object") rpc_registry = RPCRegistry() def jsonrpc(func: Callable) -> Callable: """Decorator to register a function as a JSON-RPC method""" global rpc_registry return rpc_registry.register(func) def unsafe(func: Callable) -> Callable: """Decorator to register mark a function as unsafe""" return rpc_registry.mark_unsafe(func) class JSONRPCRequestHandler(http.server.BaseHTTPRequestHandler): def send_jsonrpc_error(self, code: int, message: str, id: Any = None): response = { "jsonrpc": "2.0", "error": { "code": code, "message": message } } if id is not None: response["id"] = id response_body = json.dumps(response).encode("utf-8") self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(response_body))) self.end_headers() self.wfile.write(response_body) def do_POST(self): global rpc_registry parsed_path = urlparse(self.path) if parsed_path.path != "/mcp": self.send_jsonrpc_error(-32098, "Invalid endpoint", None) return content_length = int(self.headers.get("Content-Length", 0)) if content_length == 0: self.send_jsonrpc_error(-32700, "Parse error: missing request body", None) return request_body = self.rfile.read(content_length) try: request = json.loads(request_body) except json.JSONDecodeError: self.send_jsonrpc_error(-32700, "Parse error: invalid JSON", None) return # Prepare the response response: dict[str, Any] = { "jsonrpc": "2.0" } if request.get("id") is not None: response["id"] = request.get("id") try: # Basic JSON-RPC validation if not isinstance(request, dict): raise JSONRPCError(-32600, "Invalid Request") if request.get("jsonrpc") != "2.0": raise JSONRPCError(-32600, "Invalid JSON-RPC version") if "method" not in 
request: raise JSONRPCError(-32600, "Method not specified") # Dispatch the method result = rpc_registry.dispatch(request["method"], request.get("params", [])) response["result"] = result except JSONRPCError as e: response["error"] = { "code": e.code, "message": e.message } if e.data is not None: response["error"]["data"] = e.data except IDAError as e: response["error"] = { "code": -32000, "message": e.message, } except Exception as e: traceback.print_exc() response["error"] = { "code": -32603, "message": "Internal error (please report a bug)", "data": traceback.format_exc(), } try: response_body = json.dumps(response).encode("utf-8") except Exception as e: traceback.print_exc() response_body = json.dumps({ "error": { "code": -32603, "message": "Internal error (please report a bug)", "data": traceback.format_exc(), } }).encode("utf-8") self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(response_body))) self.end_headers() self.wfile.write(response_body) def log_message(self, format, *args): # Suppress logging pass class MCPHTTPServer(http.server.HTTPServer): allow_reuse_address = False class Server: HOST = "localhost" PORT = 13337 def __init__(self): self.server = None self.server_thread = None self.running = False def start(self): if self.running: print("[MCP] Server is already running") return self.server_thread = threading.Thread(target=self._run_server, daemon=True) self.running = True self.server_thread.start() def stop(self): if not self.running: return self.running = False if self.server: self.server.shutdown() self.server.server_close() if self.server_thread: self.server_thread.join() self.server = None print("[MCP] Server stopped") def _run_server(self): try: # Create server in the thread to handle binding self.server = MCPHTTPServer((Server.HOST, Server.PORT), JSONRPCRequestHandler) print(f"[MCP] Server started at http://{Server.HOST}:{Server.PORT}") self.server.serve_forever() except 
OSError as e: if e.errno == 98 or e.errno == 10048: # Port already in use (Linux/Windows) print("[MCP] Error: Port 13337 is already in use") else: print(f"[MCP] Server error: {e}") self.running = False except Exception as e: print(f"[MCP] Server error: {e}") finally: self.running = False # A module that helps with writing thread safe ida code. # Based on: # https://web.archive.org/web/20160305190440/http://www.williballenthin.com/blog/2015/09/04/idapython-synchronization-decorator/ import logging import queue import traceback import functools from enum import IntEnum, IntFlag import ida_hexrays import ida_kernwin import ida_funcs import ida_gdl import ida_lines import ida_idaapi import idc import idaapi import idautils import ida_nalt import ida_bytes import ida_typeinf import ida_xref import ida_entry import idautils import ida_idd import ida_dbg import ida_name import ida_ida import ida_frame ida_major, ida_minor = map(int, idaapi.get_kernel_version().split(".")) class IDAError(Exception): def __init__(self, message: str): super().__init__(message) @property def message(self) -> str: return self.args[0] class IDASyncError(Exception): pass # Important note: Always make sure the return value from your function f is a # copy of the data you have gotten from IDA, and not the original data. # # Example: # -------- # # Do this: # # @idaread # def ts_Functions(): # return list(idautils.Functions()) # # Don't do this: # # @idaread # def ts_Functions(): # return idautils.Functions() # logger = logging.getLogger(__name__) # Enum for safety modes. Higher means safer: class IDASafety(IntEnum): SAFE_NONE = ida_kernwin.MFF_FAST SAFE_READ = ida_kernwin.MFF_READ SAFE_WRITE = ida_kernwin.MFF_WRITE call_stack = queue.LifoQueue() def sync_wrapper(ff, safety_mode: IDASafety): """ Call a function ff with a specific IDA safety_mode. 
""" #logger.debug('sync_wrapper: {}, {}'.format(ff.__name__, safety_mode)) if safety_mode not in [IDASafety.SAFE_READ, IDASafety.SAFE_WRITE]: error_str = 'Invalid safety mode {} over function {}'\ .format(safety_mode, ff.__name__) logger.error(error_str) raise IDASyncError(error_str) # No safety level is set up: res_container = queue.Queue() def runned(): #logger.debug('Inside runned') # Make sure that we are not already inside a sync_wrapper: if not call_stack.empty(): last_func_name = call_stack.get() error_str = ('Call stack is not empty while calling the ' 'function {} from {}').format(ff.__name__, last_func_name) #logger.error(error_str) raise IDASyncError(error_str) call_stack.put((ff.__name__)) try: res_container.put(ff()) except Exception as x: res_container.put(x) finally: call_stack.get() #logger.debug('Finished runned') ret_val = idaapi.execute_sync(runned, safety_mode) res = res_container.get() if isinstance(res, Exception): raise res return res def idawrite(f): """ decorator for marking a function as modifying the IDB. schedules a request to be made in the main IDA loop to avoid IDB corruption. """ @functools.wraps(f) def wrapper(*args, **kwargs): ff = functools.partial(f, *args, **kwargs) ff.__name__ = f.__name__ # type: ignore return sync_wrapper(ff, idaapi.MFF_WRITE) return wrapper def idaread(f): """ decorator for marking a function as reading from the IDB. schedules a request to be made in the main IDA loop to avoid inconsistent results. 
MFF_READ constant via: http://www.openrce.org/forums/posts/1827 """ @functools.wraps(f) def wrapper(*args, **kwargs): ff = functools.partial(f, *args, **kwargs) ff.__name__ = f.__name__ # type: ignore return sync_wrapper(ff, idaapi.MFF_READ) return wrapper def is_window_active(): """Returns whether IDA is currently active""" try: from PyQt5.QtWidgets import QApplication except ImportError: return False app = QApplication.instance() if app is None: return False for widget in app.topLevelWidgets(): if widget.isActiveWindow(): return True return False class Metadata(TypedDict): path: str module: str base: str size: str md5: str sha256: str crc32: str filesize: str def get_image_size() -> int: try: # https://www.hex-rays.com/products/ida/support/sdkdoc/structidainfo.html info = idaapi.get_inf_structure() # type: ignore omin_ea = info.omin_ea omax_ea = info.omax_ea except AttributeError: import ida_ida omin_ea = ida_ida.inf_get_omin_ea() omax_ea = ida_ida.inf_get_omax_ea() # Bad heuristic for image size (bad if the relocations are the last section) image_size = omax_ea - omin_ea # Try to extract it from the PE header header = idautils.peutils_t().header() if header and header[:4] == b"PE\0\0": image_size = struct.unpack("<I", header[0x50:0x54])[0] return image_size @jsonrpc @idaread def get_metadata() -> Metadata: """Get metadata about the current IDB""" # Fat Mach-O binaries can return a None hash: # https://github.com/mrexodia/ida-pro-mcp/issues/26 def hash(f): try: return f().hex() except: return "" return Metadata(path=idaapi.get_input_file_path(), module=idaapi.get_root_filename(), base=hex(idaapi.get_imagebase()), size=hex(get_image_size()), md5=hash(ida_nalt.retrieve_input_file_md5), sha256=hash(ida_nalt.retrieve_input_file_sha256), crc32=hex(ida_nalt.retrieve_input_file_crc32()), filesize=hex(ida_nalt.retrieve_input_file_size())) def get_prototype(fn: ida_funcs.func_t) -> Optional[str]: try: prototype: ida_typeinf.tinfo_t = fn.get_prototype() if prototype is not 
None: return str(prototype) else: return None except AttributeError: try: return idc.get_type(fn.start_ea) except: tif = ida_typeinf.tinfo_t() if ida_nalt.get_tinfo(tif, fn.start_ea): return str(tif) return None except Exception as e: print(f"Error getting function prototype: {e}") return None class Function(TypedDict): address: str name: str size: str def parse_address(address: str | int) -> int: if isinstance(address, int): return address try: return int(address, 0) except ValueError: for ch in address: if ch not in "0123456789abcdefABCDEF": raise IDAError(f"Failed to parse address: {address}") raise IDAError(f"Failed to parse address (missing 0x prefix): {address}") @overload def get_function(address: int, *, raise_error: Literal[True]) -> Function: ... @overload def get_function(address: int) -> Function: ... @overload def get_function(address: int, *, raise_error: Literal[False]) -> Optional[Function]: ... def get_function(address, *, raise_error=True): fn = idaapi.get_func(address) if fn is None: if raise_error: raise IDAError(f"No function found at address {hex(address)}") return None try: name = fn.get_name() except AttributeError: name = ida_funcs.get_func_name(fn.start_ea) return Function(address=hex(address), name=name, size=hex(fn.end_ea - fn.start_ea)) DEMANGLED_TO_EA = {} def create_demangled_to_ea_map(): for ea in idautils.Functions(): # Get the function name and demangle it # MNG_NODEFINIT inhibits everything except the main name # where default demangling adds the function signature # and decorators (if any) demangled = idaapi.demangle_name( idc.get_name(ea, 0), idaapi.MNG_NODEFINIT) if demangled: DEMANGLED_TO_EA[demangled] = ea def get_type_by_name(type_name: str) -> ida_typeinf.tinfo_t: # 8-bit integers if type_name in ('int8', '__int8', 'int8_t', 'char', 'signed char'): return ida_typeinf.tinfo_t(ida_typeinf.BTF_INT8) elif type_name in ('uint8', '__uint8', 'uint8_t', 'unsigned char', 'byte', 'BYTE'): return 
ida_typeinf.tinfo_t(ida_typeinf.BTF_UINT8) # 16-bit integers elif type_name in ('int16', '__int16', 'int16_t', 'short', 'short int', 'signed short', 'signed short int'): return ida_typeinf.tinfo_t(ida_typeinf.BTF_INT16) elif type_name in ('uint16', '__uint16', 'uint16_t', 'unsigned short', 'unsigned short int', 'word', 'WORD'): return ida_typeinf.tinfo_t(ida_typeinf.BTF_UINT16) # 32-bit integers elif type_name in ('int32', '__int32', 'int32_t', 'int', 'signed int', 'long', 'long int', 'signed long', 'signed long int'): return ida_typeinf.tinfo_t(ida_typeinf.BTF_INT32) elif type_name in ('uint32', '__uint32', 'uint32_t', 'unsigned int', 'unsigned long', 'unsigned long int', 'dword', 'DWORD'): return ida_typeinf.tinfo_t(ida_typeinf.BTF_UINT32) # 64-bit integers elif type_name in ('int64', '__int64', 'int64_t', 'long long', 'long long int', 'signed long long', 'signed long long int'): return ida_typeinf.tinfo_t(ida_typeinf.BTF_INT64) elif type_name in ('uint64', '__uint64', 'uint64_t', 'unsigned int64', 'unsigned long long', 'unsigned long long int', 'qword', 'QWORD'): return ida_typeinf.tinfo_t(ida_typeinf.BTF_UINT64) # 128-bit integers elif type_name in ('int128', '__int128', 'int128_t', '__int128_t'): return ida_typeinf.tinfo_t(ida_typeinf.BTF_INT128) elif type_name in ('uint128', '__uint128', 'uint128_t', '__uint128_t', 'unsigned int128'): return ida_typeinf.tinfo_t(ida_typeinf.BTF_UINT128) # Floating point types elif type_name in ('float', ): return ida_typeinf.tinfo_t(ida_typeinf.BTF_FLOAT) elif type_name in ('double', ): return ida_typeinf.tinfo_t(ida_typeinf.BTF_DOUBLE) elif type_name in ('long double', 'ldouble'): return ida_typeinf.tinfo_t(ida_typeinf.BTF_LDOUBLE) # Boolean type elif type_name in ('bool', '_Bool', 'boolean'): return ida_typeinf.tinfo_t(ida_typeinf.BTF_BOOL) # Void type elif type_name in ('void', ): return ida_typeinf.tinfo_t(ida_typeinf.BTF_VOID) # If not a standard type, try to get a named type tif = ida_typeinf.tinfo_t() if 
tif.get_named_type(None, type_name, ida_typeinf.BTF_STRUCT): return tif if tif.get_named_type(None, type_name, ida_typeinf.BTF_TYPEDEF): return tif if tif.get_named_type(None, type_name, ida_typeinf.BTF_ENUM): return tif if tif.get_named_type(None, type_name, ida_typeinf.BTF_UNION): return tif if tif := ida_typeinf.tinfo_t(type_name): return tif raise IDAError(f"Unable to retrieve {type_name} type info object") @jsonrpc @idaread def get_function_by_name( name: Annotated[str, "Name of the function to get"] ) -> Function: """Get a function by its name""" function_address = idaapi.get_name_ea(idaapi.BADADDR, name) if function_address == idaapi.BADADDR: # If map has not been created yet, create it if len(DEMANGLED_TO_EA) == 0: create_demangled_to_ea_map() # Try to find the function in the map, else raise an error if name in DEMANGLED_TO_EA: function_address = DEMANGLED_TO_EA[name] else: raise IDAError(f"No function found with name {name}") return get_function(function_address) @jsonrpc @idaread def get_function_by_address( address: Annotated[str, "Address of the function to get"], ) -> Function: """Get a function by its address""" return get_function(parse_address(address)) @jsonrpc @idaread def get_current_address() -> str: """Get the address currently selected by the user""" return hex(idaapi.get_screen_ea()) @jsonrpc @idaread def get_current_function() -> Optional[Function]: """Get the function currently selected by the user""" return get_function(idaapi.get_screen_ea()) class ConvertedNumber(TypedDict): decimal: str hexadecimal: str bytes: str ascii: Optional[str] binary: str @jsonrpc def convert_number( text: Annotated[str, "Textual representation of the number to convert"], size: Annotated[Optional[int], "Size of the variable in bytes"], ) -> ConvertedNumber: """Convert a number (decimal, hexadecimal) to different representations""" try: value = int(text, 0) except ValueError: raise IDAError(f"Invalid number: {text}") # Estimate the size of the number if not 
size: size = 0 n = abs(value) while n: size += 1 n >>= 1 size += 7 size //= 8 # Convert the number to bytes try: bytes = value.to_bytes(size, "little", signed=True) except OverflowError: raise IDAError(f"Number {text} is too big for {size} bytes") # Convert the bytes to ASCII ascii = "" for byte in bytes.rstrip(b"\x00"): if byte >= 32 and byte <= 126: ascii += chr(byte) else: ascii = None break return ConvertedNumber( decimal=str(value), hexadecimal=hex(value), bytes=bytes.hex(" "), ascii=ascii, binary=bin(value), ) T = TypeVar("T") class Page(TypedDict, Generic[T]): data: list[T] next_offset: Optional[int] def paginate(data: list[T], offset: int, count: int) -> Page[T]: if count == 0: count = len(data) next_offset = offset + count if next_offset >= len(data): next_offset = None return { "data": data[offset:offset + count], "next_offset": next_offset, } def pattern_filter(data: list[T], pattern: str, key: str) -> list[T]: if not pattern: return data # TODO: implement /regex/ matching def matches(item) -> bool: return pattern.lower() in item[key].lower() return list(filter(matches, data)) @jsonrpc @idaread def list_functions( offset: Annotated[int, "Offset to start listing from (start at 0)"], count: Annotated[int, "Number of functions to list (100 is a good default, 0 means remainder)"], ) -> Page[Function]: """List all functions in the database (paginated)""" functions = [get_function(address) for address in idautils.Functions()] return paginate(functions, offset, count) class Global(TypedDict): address: str name: str @jsonrpc @idaread def list_globals_filter( offset: Annotated[int, "Offset to start listing from (start at 0)"], count: Annotated[int, "Number of globals to list (100 is a good default, 0 means remainder)"], filter: Annotated[str, "Filter to apply to the list (required parameter, empty string for no filter). 
Case-insensitive contains or /regex/ syntax"], ) -> Page[Global]: """List matching globals in the database (paginated, filtered)""" globals: list[Global] = [] for addr, name in idautils.Names(): # Skip functions and none if not idaapi.get_func(addr) or name is None: globals += [Global(address=hex(addr), name=name)] globals = pattern_filter(globals, filter, "name") return paginate(globals, offset, count) @jsonrpc def list_globals( offset: Annotated[int, "Offset to start listing from (start at 0)"], count: Annotated[int, "Number of globals to list (100 is a good default, 0 means remainder)"], ) -> Page[Global]: """List all globals in the database (paginated)""" return list_globals_filter(offset, count, "") class Import(TypedDict): address: str imported_name: str module: str @jsonrpc @idaread def list_imports( offset: Annotated[int, "Offset to start listing from (start at 0)"], count: Annotated[int, "Number of imports to list (100 is a good default, 0 means remainder)"], ) -> Page[Import]: """ List all imported symbols with their name and module (paginated) """ nimps = ida_nalt.get_import_module_qty() rv = [] for i in range(nimps): module_name = ida_nalt.get_import_module_name(i) if not module_name: module_name = "<unnamed>" def imp_cb(ea, symbol_name, ordinal, acc): if not symbol_name: symbol_name = f"#{ordinal}" acc += [Import(address=hex(ea), imported_name=symbol_name, module=module_name)] return True imp_cb_w_context = lambda ea, symbol_name, ordinal: imp_cb(ea, symbol_name, ordinal, rv) ida_nalt.enum_import_names(i, imp_cb_w_context) return paginate(rv, offset, count) class String(TypedDict): address: str length: int string: str @jsonrpc @idaread def list_strings_filter( offset: Annotated[int, "Offset to start listing from (start at 0)"], count: Annotated[int, "Number of strings to list (100 is a good default, 0 means remainder)"], filter: Annotated[str, "Filter to apply to the list (required parameter, empty string for no filter). 
Case-insensitive contains or /regex/ syntax"], ) -> Page[String]: """List matching strings in the database (paginated, filtered)""" strings: list[String] = [] for item in idautils.Strings(): if item is None: continue try: string = str(item) if string: strings += [ String(address=hex(item.ea), length=item.length, string=string), ] except: continue strings = pattern_filter(strings, filter, "string") return paginate(strings, offset, count) @jsonrpc def list_strings( offset: Annotated[int, "Offset to start listing from (start at 0)"], count: Annotated[int, "Number of strings to list (100 is a good default, 0 means remainder)"], ) -> Page[String]: """List all strings in the database (paginated)""" return list_strings_filter(offset, count, "") @jsonrpc @idaread def list_local_types(): """List all Local types in the database""" error = ida_hexrays.hexrays_failure_t() locals = [] idati = ida_typeinf.get_idati() type_count = ida_typeinf.get_ordinal_limit(idati) for ordinal in range(1, type_count): try: tif = ida_typeinf.tinfo_t() if tif.get_numbered_type(idati, ordinal): type_name = tif.get_type_name() if not type_name: type_name = f"<Anonymous Type #{ordinal}>" locals.append(f"\nType #{ordinal}: {type_name}") if tif.is_udt(): c_decl_flags = (ida_typeinf.PRTYPE_MULTI | ida_typeinf.PRTYPE_TYPE | ida_typeinf.PRTYPE_SEMI | ida_typeinf.PRTYPE_DEF | ida_typeinf.PRTYPE_METHODS | ida_typeinf.PRTYPE_OFFSETS) c_decl_output = tif._print(None, c_decl_flags) if c_decl_output: locals.append(f" C declaration:\n{c_decl_output}") else: simple_decl = tif._print(None, ida_typeinf.PRTYPE_1LINE | ida_typeinf.PRTYPE_TYPE | ida_typeinf.PRTYPE_SEMI) if simple_decl: locals.append(f" Simple declaration:\n{simple_decl}") else: message = f"\nType #{ordinal}: Failed to retrieve information." 
if error.str: message += f": {error.str}" if error.errea != idaapi.BADADDR: message += f"from (address: {hex(error.errea)})" raise IDAError(message) except: continue return locals def decompile_checked(address: int) -> ida_hexrays.cfunc_t: if not ida_hexrays.init_hexrays_plugin(): raise IDAError("Hex-Rays decompiler is not available") error = ida_hexrays.hexrays_failure_t() cfunc = ida_hexrays.decompile_func(address, error, ida_hexrays.DECOMP_WARNINGS) if not cfunc: if error.code == ida_hexrays.MERR_LICENSE: raise IDAError("Decompiler license is not available. Use `disassemble_function` to get the assembly code instead.") message = f"Decompilation failed at {hex(address)}" if error.str: message += f": {error.str}" if error.errea != idaapi.BADADDR: message += f" (address: {hex(error.errea)})" raise IDAError(message) return cfunc # type: ignore (this is a SWIG issue) @jsonrpc @idaread def decompile_function( address: Annotated[str, "Address of the function to decompile"], ) -> str: """Decompile a function at the given address""" start = parse_address(address) cfunc = decompile_checked(start) if is_window_active(): ida_hexrays.open_pseudocode(start, ida_hexrays.OPF_REUSE) sv = cfunc.get_pseudocode() pseudocode = "" for i, sl in enumerate(sv): sl: ida_kernwin.simpleline_t item = ida_hexrays.ctree_item_t() addr = None if i > 0 else cfunc.entry_ea if cfunc.get_line_item(sl.line, 0, False, None, item, None): # type: ignore (IDA SDK type hint wrong) dstr: str | None = item.dstr() if dstr: ds = dstr.split(": ") if len(ds) == 2: try: addr = int(ds[0], 16) except ValueError: pass line = ida_lines.tag_remove(sl.line) if len(pseudocode) > 0: pseudocode += "\n" if not addr: pseudocode += f"/* line: {i} */ {line}" else: pseudocode += f"/* line: {i}, address: {hex(addr)} */ {line}" return pseudocode class DisassemblyLine(TypedDict): segment: NotRequired[str] address: str label: NotRequired[str] instruction: str comments: NotRequired[list[str]] class Argument(TypedDict): name: str 
type: str class StackFrameVariable(TypedDict): name: str offset: str size: str type: str class DisassemblyFunction(TypedDict): name: str start_ea: str return_type: NotRequired[str] arguments: NotRequired[list[Argument]] stack_frame: list[StackFrameVariable] lines: list[DisassemblyLine] @jsonrpc @idaread def disassemble_function( start_address: Annotated[str, "Address of the function to disassemble"], ) -> DisassemblyFunction: """Get assembly code for a function (API-compatible with older IDA builds)""" start = parse_address(start_address) func = idaapi.get_func(start) if not func: raise IDAError(f"No function found at address {hex(start)}") if is_window_active(): ida_kernwin.jumpto(start) func_name: str = ida_funcs.get_func_name(func.start_ea) or "<unnamed>" lines: list[DisassemblyLine] = [] for ea in idautils.FuncItems(func.start_ea): if ea == idaapi.BADADDR: continue seg = idaapi.getseg(ea) segment: str | None = idaapi.get_segm_name(seg) if seg else None label: str | None = idc.get_name(ea, 0) if not label or (label == func_name and ea == func.start_ea): label = None comments: list[str] = [] c: str | None = idaapi.get_cmt(ea, False) if c: comments.append(c) c = idaapi.get_cmt(ea, True) if c: comments.append(c) mnem: str = idc.print_insn_mnem(ea) or "" ops: list[str] = [] for n in range(8): if idc.get_operand_type(ea, n) == idaapi.o_void: break ops.append(idc.print_operand(ea, n) or "") instruction = f"{mnem} {', '.join(ops)}".rstrip() line: DisassemblyLine = { "address": hex(ea), "instruction": instruction } if segment: line["segment"] = segment if label: line["label"] = label if comments: line["comments"] = comments lines.append(line) # prototype and args via tinfo (safe across versions) rettype = None args: Optional[list[Argument]] = None tif = ida_typeinf.tinfo_t() if ida_nalt.get_tinfo(tif, func.start_ea) and tif.is_func(): ftd = ida_typeinf.func_type_data_t() if tif.get_func_details(ftd): rettype = str(ftd.rettype) args = [Argument(name=(a.name or 
f"arg{i}"), type=str(a.type)) for i, a in enumerate(ftd)] out: DisassemblyFunction = { "name": func_name, "start_ea": hex(func.start_ea), "stack_frame": get_stack_frame_variables_internal(func.start_ea, False), "lines": lines, } if rettype: out["return_type"] = rettype if args is not None: out["arguments"] = args return out class Xref(TypedDict): address: str type: str function: Optional[Function] @jsonrpc @idaread def get_xrefs_to( address: Annotated[str, "Address to get cross references to"], ) -> list[Xref]: """Get all cross references to the given address""" xrefs = [] xref: ida_xref.xrefblk_t for xref in idautils.XrefsTo(parse_address(address)): # type: ignore (IDA SDK type hints are incorrect) xrefs += [ Xref(address=hex(xref.frm), type="code" if xref.iscode else "data", function=get_function(xref.frm, raise_error=False)) ] return xrefs @jsonrpc @idaread def get_xrefs_to_field( struct_name: Annotated[str, "Name of the struct (type) containing the field"], field_name: Annotated[str, "Name of the field (member) to get xrefs to"], ) -> list[Xref]: """Get all cross references to a named struct field (member)""" # Get the type library til = ida_typeinf.get_idati() if not til: raise IDAError("Failed to retrieve type library.") # Get the structure type info tif = ida_typeinf.tinfo_t() if not tif.get_named_type(til, struct_name, ida_typeinf.BTF_STRUCT, True, False): print(f"Structure '{struct_name}' not found.") return [] # Get The field index idx = ida_typeinf.get_udm_by_fullname(None, struct_name + '.' 
+ field_name) # type: ignore (IDA SDK type hints are incorrect) if idx == -1: print(f"Field '{field_name}' not found in structure '{struct_name}'.") return [] # Get the type identifier tid = tif.get_udm_tid(idx) if tid == ida_idaapi.BADADDR: raise IDAError(f"Unable to get tid for structure '{struct_name}' and field '{field_name}'.") # Get xrefs to the tid xrefs = [] xref: ida_xref.xrefblk_t for xref in idautils.XrefsTo(tid): # type: ignore (IDA SDK type hints are incorrect) xrefs += [ Xref(address=hex(xref.frm), type="code" if xref.iscode else "data", function=get_function(xref.frm, raise_error=False)) ] return xrefs @jsonrpc @idaread def get_callees( function_address: Annotated[str, "Address of the function to get callee functions"], ) -> list[dict[str, str]]: """Get all the functions called (callees) by the function at function_address""" func_start = parse_address(function_address) func = idaapi.get_func(func_start) if not func: raise IDAError(f"No function found containing address {function_address}") func_end = idc.find_func_end(func_start) callees: list[dict[str, str]] = [] current_ea = func_start while current_ea < func_end: insn = idaapi.insn_t() idaapi.decode_insn(insn, current_ea) if insn.itype in [idaapi.NN_call, idaapi.NN_callfi, idaapi.NN_callni]: target = idc.get_operand_value(current_ea, 0) target_type = idc.get_operand_type(current_ea, 0) # check if it's a direct call - avoid getting the indirect call offset if target_type in [idaapi.o_mem, idaapi.o_near, idaapi.o_far]: # in here, we do not use get_function because the target can be external function. # but, we should mark the target as internal/external function. 
func_type = ( "internal" if idaapi.get_func(target) is not None else "external" ) func_name = idc.get_name(target) if func_name is not None: callees.append( {"address": hex(target), "name": func_name, "type": func_type} ) current_ea = idc.next_head(current_ea, func_end) # deduplicate callees unique_callee_tuples = {tuple(callee.items()) for callee in callees} unique_callees = [dict(callee) for callee in unique_callee_tuples] return unique_callees # type: ignore @jsonrpc @idaread def get_callers( function_address: Annotated[str, "Address of the function to get callers"], ) -> list[Function]: """Get all callers of the given address""" callers = {} for caller_address in idautils.CodeRefsTo(parse_address(function_address), 0): # validate the xref address is a function func = get_function(caller_address, raise_error=False) if not func: continue # load the instruction at the xref address insn = idaapi.insn_t() idaapi.decode_insn(insn, caller_address) # check the instruction is a call if insn.itype not in [idaapi.NN_call, idaapi.NN_callfi, idaapi.NN_callni]: continue # deduplicate callers by address callers[func["address"]] = func return list(callers.values()) @jsonrpc @idaread def get_entry_points() -> list[Function]: """Get all entry points in the database""" result = [] for i in range(ida_entry.get_entry_qty()): ordinal = ida_entry.get_entry_ordinal(i) address = ida_entry.get_entry(ordinal) func = get_function(address, raise_error=False) if func is not None: result.append(func) return result @jsonrpc @idawrite def set_comment( address: Annotated[str, "Address in the function to set the comment for"], comment: Annotated[str, "Comment text"], ): """Set a comment for a given address in the function disassembly and pseudocode""" ea = parse_address(address) if not idaapi.set_cmt(ea, comment, False): raise IDAError(f"Failed to set disassembly comment at {hex(ea)}") if not ida_hexrays.init_hexrays_plugin(): return # Reference: 
https://cyber.wtf/2019/03/22/using-ida-python-to-analyze-trickbot/ # Check if the address corresponds to a line try: cfunc = decompile_checked(ea) except IDAError: # Skip decompiler comment if decompilation fails return # Special case for function entry comments if ea == cfunc.entry_ea: idc.set_func_cmt(ea, comment, True) cfunc.refresh_func_ctext() return eamap = cfunc.get_eamap() if ea not in eamap: print(f"Failed to set decompiler comment at {hex(ea)}") return nearest_ea = eamap[ea][0].ea # Remove existing orphan comments if cfunc.has_orphan_cmts(): cfunc.del_orphan_cmts() cfunc.save_user_cmts() # Set the comment by trying all possible item types tl = idaapi.treeloc_t() tl.ea = nearest_ea for itp in range(idaapi.ITP_SEMI, idaapi.ITP_COLON): tl.itp = itp cfunc.set_user_cmt(tl, comment) cfunc.save_user_cmts() cfunc.refresh_func_ctext() if not cfunc.has_orphan_cmts(): return cfunc.del_orphan_cmts() cfunc.save_user_cmts() print(f"Failed to set decompiler comment at {hex(ea)}") def refresh_decompiler_widget(): widget = ida_kernwin.get_current_widget() if widget is not None: vu = ida_hexrays.get_widget_vdui(widget) if vu is not None: vu.refresh_ctext() def refresh_decompiler_ctext(function_address: int): error = ida_hexrays.hexrays_failure_t() cfunc: ida_hexrays.cfunc_t = ida_hexrays.decompile_func(function_address, error, ida_hexrays.DECOMP_WARNINGS) if cfunc: cfunc.refresh_func_ctext() @jsonrpc @idawrite def rename_local_variable( function_address: Annotated[str, "Address of the function containing the variable"], old_name: Annotated[str, "Current name of the variable"], new_name: Annotated[str, "New name for the variable (empty for a default name)"], ): """Rename a local variable in a function""" func = idaapi.get_func(parse_address(function_address)) if not func: raise IDAError(f"No function found at address {function_address}") if not ida_hexrays.rename_lvar(func.start_ea, old_name, new_name): raise IDAError(f"Failed to rename local variable {old_name} in function 
{hex(func.start_ea)}") refresh_decompiler_ctext(func.start_ea) @jsonrpc @idawrite def rename_global_variable( old_name: Annotated[str, "Current name of the global variable"], new_name: Annotated[str, "New name for the global variable (empty for a default name)"], ): """Rename a global variable""" ea = idaapi.get_name_ea(idaapi.BADADDR, old_name) if not idaapi.set_name(ea, new_name): raise IDAError(f"Failed to rename global variable {old_name} to {new_name}") refresh_decompiler_ctext(ea) @jsonrpc @idawrite def set_global_variable_type( variable_name: Annotated[str, "Name of the global variable"], new_type: Annotated[str, "New type for the variable"], ): """Set a global variable's type""" ea = idaapi.get_name_ea(idaapi.BADADDR, variable_name) tif = get_type_by_name(new_type) if not tif: raise IDAError(f"Parsed declaration is not a variable type") if not ida_typeinf.apply_tinfo(ea, tif, ida_typeinf.PT_SIL): raise IDAError(f"Failed to apply type") def patch_address_assemble( ea: int, assemble: str, ) -> int: """Patch Address Assemble""" (check_assemble, bytes_to_patch) = idautils.Assemble(ea, assemble) if check_assemble == False: raise IDAError(f"Failed to assemble instruction: {assemble}") try: ida_bytes.patch_bytes(ea, bytes_to_patch) except: raise IDAError(f"Failed to patch bytes at address {hex(ea)}") return len(bytes_to_patch) @jsonrpc @idawrite def patch_address_assembles( address: Annotated[str, "Starting Address to apply patch"], instructions: Annotated[str, "Assembly instructions separated by ';'"], ) -> str: ea = parse_address(address) assembles = instructions.split(";") for assemble in assembles: assemble = assemble.strip() try: patch_bytes_len = patch_address_assemble(ea, assemble) except IDAError as e: raise IDAError(f"Failed to patch bytes at address {hex(ea)}: {e}") ea += patch_bytes_len return f"Patched {len(assembles)} instructions" @jsonrpc @idaread def get_global_variable_value_by_name(variable_name: Annotated[str, "Name of the global variable"]) -> 
str: """ Read a global variable's value (if known at compile-time) Prefer this function over the `data_read_*` functions. """ ea = idaapi.get_name_ea(idaapi.BADADDR, variable_name) if ea == idaapi.BADADDR: raise IDAError(f"Global variable {variable_name} not found") return get_global_variable_value_internal(ea) @jsonrpc @idaread def get_global_variable_value_at_address(address: Annotated[str, "Address of the global variable"]) -> str: """ Read a global variable's value by its address (if known at compile-time) Prefer this function over the `data_read_*` functions. """ ea = parse_address(address) return get_global_variable_value_internal(ea) def get_global_variable_value_internal(ea: int) -> str: # Get the type information for the variable tif = ida_typeinf.tinfo_t() if not ida_nalt.get_tinfo(tif, ea): # No type info, maybe we can figure out its size by its name if not ida_bytes.has_any_name(ea): raise IDAError(f"Failed to get type information for variable at {ea:#x}") size = ida_bytes.get_item_size(ea) if size == 0: raise IDAError(f"Failed to get type information for variable at {ea:#x}") else: # Determine the size of the variable size = tif.get_size() # Read the value based on the size if size == 0 and tif.is_array() and tif.get_array_element().is_decl_char(): return_string = idaapi.get_strlit_contents(ea, -1, 0).decode("utf-8").strip() return f"\"{return_string}\"" elif size == 1: return hex(ida_bytes.get_byte(ea)) elif size == 2: return hex(ida_bytes.get_word(ea)) elif size == 4: return hex(ida_bytes.get_dword(ea)) elif size == 8: return hex(ida_bytes.get_qword(ea)) else: # For other sizes, return the raw bytes return ' '.join(hex(x) for x in ida_bytes.get_bytes(ea, size)) @jsonrpc @idawrite def rename_function( function_address: Annotated[str, "Address of the function to rename"], new_name: Annotated[str, "New name for the function (empty for a default name)"], ): """Rename a function""" func = idaapi.get_func(parse_address(function_address)) if not func: raise 
IDAError(f"No function found at address {function_address}") if not idaapi.set_name(func.start_ea, new_name): raise IDAError(f"Failed to rename function {hex(func.start_ea)} to {new_name}") refresh_decompiler_ctext(func.start_ea) @jsonrpc @idawrite def set_function_prototype( function_address: Annotated[str, "Address of the function"], prototype: Annotated[str, "New function prototype"], ): """Set a function's prototype""" func = idaapi.get_func(parse_address(function_address)) if not func: raise IDAError(f"No function found at address {function_address}") try: tif = ida_typeinf.tinfo_t(prototype, None, ida_typeinf.PT_SIL) if not tif.is_func(): raise IDAError(f"Parsed declaration is not a function type") if not ida_typeinf.apply_tinfo(func.start_ea, tif, ida_typeinf.PT_SIL): raise IDAError(f"Failed to apply type") refresh_decompiler_ctext(func.start_ea) except Exception as e: raise IDAError(f"Failed to parse prototype string: {prototype}") class my_modifier_t(ida_hexrays.user_lvar_modifier_t): def __init__(self, var_name: str, new_type: ida_typeinf.tinfo_t): ida_hexrays.user_lvar_modifier_t.__init__(self) self.var_name = var_name self.new_type = new_type def modify_lvars(self, lvinf): for lvar_saved in lvinf.lvvec: lvar_saved: ida_hexrays.lvar_saved_info_t if lvar_saved.name == self.var_name: lvar_saved.type = self.new_type return True return False # NOTE: This is extremely hacky, but necessary to get errors out of IDA def parse_decls_ctypes(decls: str, hti_flags: int) -> tuple[int, list[str]]: if sys.platform == "win32": import ctypes assert isinstance(decls, str), "decls must be a string" assert isinstance(hti_flags, int), "hti_flags must be an int" c_decls = decls.encode("utf-8") c_til = None ida_dll = ctypes.CDLL("ida") ida_dll.parse_decls.argtypes = [ ctypes.c_void_p, ctypes.c_char_p, ctypes.c_void_p, ctypes.c_int, ] ida_dll.parse_decls.restype = ctypes.c_int messages: list[str] = [] @ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_char_p, ctypes.c_char_p) def 
magic_printer(fmt: bytes, arg1: bytes): if fmt.count(b"%") == 1 and b"%s" in fmt: formatted = fmt.replace(b"%s", arg1) messages.append(formatted.decode("utf-8")) return len(formatted) + 1 else: messages.append(f"unsupported magic_printer fmt: {repr(fmt)}") return 0 errors = ida_dll.parse_decls(c_til, c_decls, magic_printer, hti_flags) else: # NOTE: The approach above could also work on other platforms, but it's # not been tested and there are differences in the vararg ABIs. errors = ida_typeinf.parse_decls(None, decls, False, hti_flags) messages = [] return errors, messages @jsonrpc @idawrite def declare_c_type( c_declaration: Annotated[str, "C declaration of the type. Examples include: typedef int foo_t; struct bar { int a; bool b; };"], ): """Create or update a local type from a C declaration""" # PT_SIL: Suppress warning dialogs (although it seems unnecessary here) # PT_EMPTY: Allow empty types (also unnecessary?) # PT_TYP: Print back status messages with struct tags flags = ida_typeinf.PT_SIL | ida_typeinf.PT_EMPTY | ida_typeinf.PT_TYP errors, messages = parse_decls_ctypes(c_declaration, flags) pretty_messages = "\n".join(messages) if errors > 0: raise IDAError(f"Failed to parse type:\n{c_declaration}\n\nErrors:\n{pretty_messages}") return f"success\n\nInfo:\n{pretty_messages}" @jsonrpc @idawrite def set_local_variable_type( function_address: Annotated[str, "Address of the decompiled function containing the variable"], variable_name: Annotated[str, "Name of the variable"], new_type: Annotated[str, "New type for the variable"], ): """Set a local variable's type""" try: # Some versions of IDA don't support this constructor new_tif = ida_typeinf.tinfo_t(new_type, None, ida_typeinf.PT_SIL) except Exception: try: new_tif = ida_typeinf.tinfo_t() # parse_decl requires semicolon for the type ida_typeinf.parse_decl(new_tif, None, new_type + ";", ida_typeinf.PT_SIL) # type: ignore (IDA SDK type hints are incorrect) except Exception: raise IDAError(f"Failed to parse type: 
{new_type}") func = idaapi.get_func(parse_address(function_address)) if not func: raise IDAError(f"No function found at address {function_address}") if not ida_hexrays.rename_lvar(func.start_ea, variable_name, variable_name): raise IDAError(f"Failed to find local variable: {variable_name}") modifier = my_modifier_t(variable_name, new_tif) if not ida_hexrays.modify_user_lvars(func.start_ea, modifier): raise IDAError(f"Failed to modify local variable: {variable_name}") refresh_decompiler_ctext(func.start_ea) @jsonrpc @idaread def get_stack_frame_variables( function_address: Annotated[str, "Address of the disassembled function to retrieve the stack frame variables"] ) -> list[StackFrameVariable]: """ Retrieve the stack frame variables for a given function """ return get_stack_frame_variables_internal(parse_address(function_address), True) def get_stack_frame_variables_internal(function_address: int, raise_error: bool) -> list[StackFrameVariable]: # TODO: IDA 8.3 does not support tif.get_type_by_tid if ida_major < 9: return [] func = idaapi.get_func(function_address) if not func: if raise_error: raise IDAError(f"No function found at address {function_address}") return [] tif = ida_typeinf.tinfo_t() if not tif.get_type_by_tid(func.frame) or not tif.is_udt(): return [] members: list[StackFrameVariable] = [] udt = ida_typeinf.udt_type_data_t() tif.get_udt_details(udt) for udm in udt: if not udm.is_gap(): name = udm.name offset = udm.offset // 8 size = udm.size // 8 type = str(udm.type) members.append(StackFrameVariable( name=name, offset=hex(offset), size=hex(size), type=type )) return members class StructureMember(TypedDict): name: str offset: str size: str type: str class StructureDefinition(TypedDict): name: str size: str members: list[StructureMember] @jsonrpc @idaread def get_defined_structures() -> list[StructureDefinition]: """ Returns a list of all defined structures """ rv = [] limit = ida_typeinf.get_ordinal_limit() for ordinal in range(1, limit): tif = 
ida_typeinf.tinfo_t() tif.get_numbered_type(None, ordinal) if tif.is_udt(): udt = ida_typeinf.udt_type_data_t() members = [] if tif.get_udt_details(udt): members = [ StructureMember(name=x.name, offset=hex(x.offset // 8), size=hex(x.size // 8), type=str(x.type)) for _, x in enumerate(udt) ] rv += [StructureDefinition(name=tif.get_type_name(), # type: ignore (IDA SDK type hints are incorrect) size=hex(tif.get_size()), members=members)] return rv @jsonrpc @idaread def analyze_struct_detailed(name: Annotated[str, "Name of the structure to analyze"]) -> dict: """Detailed analysis of a structure with all fields""" # Get tinfo object tif = ida_typeinf.tinfo_t() if not tif.get_named_type(None, name): raise IDAError(f"Structure '{name}' not found!") result = { "name": name, "type": str(tif._print()), "size": tif.get_size(), "is_udt": tif.is_udt() } if not tif.is_udt(): result["error"] = "This is not a user-defined type!" return result # Get UDT (User Defined Type) details udt_data = ida_typeinf.udt_type_data_t() if not tif.get_udt_details(udt_data): result["error"] = "Failed to get structure details!" 
return result result["member_count"] = udt_data.size() result["is_union"] = udt_data.is_union result["udt_type"] = "Union" if udt_data.is_union else "Struct" # Output information about each field members = [] for i, member in enumerate(udt_data): offset = member.begin() // 8 # Convert bits to bytes size = member.size // 8 if member.size > 0 else member.type.get_size() member_type = member.type._print() member_name = member.name member_info = { "index": i, "offset": f"0x{offset:08X}", "size": size, "type": member_type, "name": member_name, "is_nested_udt": member.type.is_udt() } # If this is a nested structure, show additional information if member.type.is_udt(): member_info["nested_size"] = member.type.get_size() members.append(member_info) result["members"] = members result["total_size"] = tif.get_size() return result @jsonrpc @idaread def get_struct_at_address(address: Annotated[str, "Address to analyze structure at"], struct_name: Annotated[str, "Name of the structure"]) -> dict: """Get structure field values at a specific address""" addr = parse_address(address) # Get structure tinfo tif = ida_typeinf.tinfo_t() if not tif.get_named_type(None, struct_name): raise IDAError(f"Structure '{struct_name}' not found!") # Get structure details udt_data = ida_typeinf.udt_type_data_t() if not tif.get_udt_details(udt_data): raise IDAError("Failed to get structure details!") result = { "struct_name": struct_name, "address": f"0x{addr:X}", "members": [] } for member in udt_data: offset = member.begin() // 8 member_addr = addr + offset member_type = member.type._print() member_name = member.name member_size = member.type.get_size() # Try to get value based on size try: if member.type.is_ptr(): # Pointer is_64bit = ida_ida.inf_is_64bit() if ida_major >= 9 else idaapi.get_inf_structure().is_64bit() if is_64bit: value = idaapi.get_qword(member_addr) value_str = f"0x{value:016X}" else: value = idaapi.get_dword(member_addr) value_str = f"0x{value:08X}" elif member_size == 1: value 
= idaapi.get_byte(member_addr) value_str = f"0x{value:02X} ({value})" elif member_size == 2: value = idaapi.get_word(member_addr) value_str = f"0x{value:04X} ({value})" elif member_size == 4: value = idaapi.get_dword(member_addr) value_str = f"0x{value:08X} ({value})" elif member_size == 8: value = idaapi.get_qword(member_addr) value_str = f"0x{value:016X} ({value})" else: # For large structures, read first few bytes bytes_data = [] for i in range(min(member_size, 16)): try: byte_val = idaapi.get_byte(member_addr + i) bytes_data.append(f"{byte_val:02X}") except: break value_str = f"[{' '.join(bytes_data)}{'...' if member_size > 16 else ''}]" except: value_str = "<failed to read>" member_info = { "offset": f"0x{offset:08X}", "type": member_type, "name": member_name, "value": value_str } result["members"].append(member_info) return result @jsonrpc @idaread def get_struct_info_simple(name: Annotated[str, "Name of the structure"]) -> dict: """Simple function to get basic structure information""" tif = ida_typeinf.tinfo_t() if not tif.get_named_type(None, name): raise IDAError(f"Structure '{name}' not found!") info = { 'name': name, 'type': tif._print(), 'size': tif.get_size(), 'is_udt': tif.is_udt() } if tif.is_udt(): udt_data = ida_typeinf.udt_type_data_t() if tif.get_udt_details(udt_data): info['member_count'] = udt_data.size() info['is_union'] = udt_data.is_union members = [] for member in udt_data: members.append({ 'name': member.name, 'type': member.type._print(), 'offset': member.begin() // 8, 'size': member.type.get_size() }) info['members'] = members return info @jsonrpc @idaread def search_structures(filter: Annotated[str, "Filter pattern to search for structures (case-insensitive)"]) -> list[dict]: """Search for structures by name pattern""" results = [] limit = ida_typeinf.get_ordinal_limit() for ordinal in range(1, limit): tif = ida_typeinf.tinfo_t() if tif.get_numbered_type(None, ordinal): type_name: str = tif.get_type_name() # type: ignore (IDA SDK type 
hints are incorrect) if type_name and filter.lower() in type_name.lower(): if tif.is_udt(): udt_data = ida_typeinf.udt_type_data_t() member_count = 0 if tif.get_udt_details(udt_data): member_count = udt_data.size() results.append({ "name": type_name, "size": tif.get_size(), "member_count": member_count, "is_union": udt_data.is_union if tif.get_udt_details(udt_data) else False, "ordinal": ordinal }) return results @jsonrpc @idawrite def rename_stack_frame_variable( function_address: Annotated[str, "Address of the disassembled function to set the stack frame variables"], old_name: Annotated[str, "Current name of the variable"], new_name: Annotated[str, "New name for the variable (empty for a default name)"] ): """ Change the name of a stack variable for an IDA function """ func = idaapi.get_func(parse_address(function_address)) if not func: raise IDAError(f"No function found at address {function_address}") frame_tif = ida_typeinf.tinfo_t() if not ida_frame.get_func_frame(frame_tif, func): raise IDAError("No frame returned.") idx, udm = frame_tif.get_udm(old_name) # type: ignore (IDA SDK type hints are incorrect) if not udm: raise IDAError(f"{old_name} not found.") tid = frame_tif.get_udm_tid(idx) if ida_frame.is_special_frame_member(tid): raise IDAError(f"{old_name} is a special frame member. Will not change the name.") udm = ida_typeinf.udm_t() frame_tif.get_udm_by_tid(udm, tid) offset = udm.offset // 8 if ida_frame.is_funcarg_off(func, offset): raise IDAError(f"{old_name} is an argument member. 
Will not change the name.") sval = ida_frame.soff_to_fpoff(func, offset) if not ida_frame.define_stkvar(func, new_name, sval, udm.type): raise IDAError("failed to rename stack frame variable") @jsonrpc @idawrite def create_stack_frame_variable( function_address: Annotated[str, "Address of the disassembled function to set the stack frame variables"], offset: Annotated[str, "Offset of the stack frame variable"], variable_name: Annotated[str, "Name of the stack variable"], type_name: Annotated[str, "Type of the stack variable"] ): """ For a given function, create a stack variable at an offset and with a specific type """ func = idaapi.get_func(parse_address(function_address)) if not func: raise IDAError(f"No function found at address {function_address}") ea = parse_address(offset) frame_tif = ida_typeinf.tinfo_t() if not ida_frame.get_func_frame(frame_tif, func): raise IDAError("No frame returned.") tif = get_type_by_name(type_name) if not ida_frame.define_stkvar(func, variable_name, ea, tif): raise IDAError("failed to define stack frame variable") @jsonrpc @idawrite def set_stack_frame_variable_type( function_address: Annotated[str, "Address of the disassembled function to set the stack frame variables"], variable_name: Annotated[str, "Name of the stack variable"], type_name: Annotated[str, "Type of the stack variable"] ): """ For a given disassembled function, set the type of a stack variable """ func = idaapi.get_func(parse_address(function_address)) if not func: raise IDAError(f"No function found at address {function_address}") frame_tif = ida_typeinf.tinfo_t() if not ida_frame.get_func_frame(frame_tif, func): raise IDAError("No frame returned.") idx, udm = frame_tif.get_udm(variable_name) # type: ignore (IDA SDK type hints are incorrect) if not udm: raise IDAError(f"{variable_name} not found.") tid = frame_tif.get_udm_tid(idx) udm = ida_typeinf.udm_t() frame_tif.get_udm_by_tid(udm, tid) offset = udm.offset // 8 tif = get_type_by_name(type_name) if not 
ida_frame.set_frame_member_type(func, offset, tif): raise IDAError("failed to set stack frame variable type") @jsonrpc @idawrite def delete_stack_frame_variable( function_address: Annotated[str, "Address of the function to set the stack frame variables"], variable_name: Annotated[str, "Name of the stack variable"] ): """ Delete the named stack variable for a given function """ func = idaapi.get_func(parse_address(function_address)) if not func: raise IDAError(f"No function found at address {function_address}") frame_tif = ida_typeinf.tinfo_t() if not ida_frame.get_func_frame(frame_tif, func): raise IDAError("No frame returned.") idx, udm = frame_tif.get_udm(variable_name) # type: ignore (IDA SDK type hints are incorrect) if not udm: raise IDAError(f"{variable_name} not found.") tid = frame_tif.get_udm_tid(idx) if ida_frame.is_special_frame_member(tid): raise IDAError(f"{variable_name} is a special frame member. Will not delete.") udm = ida_typeinf.udm_t() frame_tif.get_udm_by_tid(udm, tid) offset = udm.offset // 8 size = udm.size // 8 if ida_frame.is_funcarg_off(func, offset): raise IDAError(f"{variable_name} is an argument member. Will not delete.") if not ida_frame.delete_frame_members(func, offset, offset+size): raise IDAError("failed to delete stack frame variable") @jsonrpc @idaread def read_memory_bytes( memory_address: Annotated[str, "Address of the memory value to be read"], size: Annotated[int, "size of memory to read"] ) -> str: """ Read bytes at a given address. Only use this function if `get_global_variable_at` and `get_global_variable_by_name` both failed. """ return ' '.join(f'{x:#02x}' for x in ida_bytes.get_bytes(parse_address(memory_address), size)) @jsonrpc @idaread def data_read_byte( address: Annotated[str, "Address to get 1 byte value from"], ) -> int: """ Read the 1 byte value at the specified address. Only use this function if `get_global_variable_at` failed. 
""" ea = parse_address(address) return ida_bytes.get_wide_byte(ea) @jsonrpc @idaread def data_read_word( address: Annotated[str, "Address to get 2 bytes value from"], ) -> int: """ Read the 2 byte value at the specified address as a WORD. Only use this function if `get_global_variable_at` failed. """ ea = parse_address(address) return ida_bytes.get_wide_word(ea) @jsonrpc @idaread def data_read_dword( address: Annotated[str, "Address to get 4 bytes value from"], ) -> int: """ Read the 4 byte value at the specified address as a DWORD. Only use this function if `get_global_variable_at` failed. """ ea = parse_address(address) return ida_bytes.get_wide_dword(ea) @jsonrpc @idaread def data_read_qword( address: Annotated[str, "Address to get 8 bytes value from"] ) -> int: """ Read the 8 byte value at the specified address as a QWORD. Only use this function if `get_global_variable_at` failed. """ ea = parse_address(address) return ida_bytes.get_qword(ea) @jsonrpc @idaread def data_read_string( address: Annotated[str, "Address to get string from"] ) -> str: """ Read the string at the specified address. Only use this function if `get_global_variable_at` failed. """ try: return idaapi.get_strlit_contents(parse_address(address),-1,0).decode("utf-8") except Exception as e: return "Error:" + str(e) class RegisterValue(TypedDict): name: str value: str class ThreadRegisters(TypedDict): thread_id: int registers: list[RegisterValue] def dbg_ensure_running() -> "ida_idd.debugger_t": dbg = ida_idd.get_dbg() if not dbg: raise IDAError("Debugger not running") if ida_dbg.get_ip_val() is None: raise IDAError("Debugger not running") return dbg @jsonrpc @idaread @unsafe def dbg_get_registers() -> list[ThreadRegisters]: """Get all registers and their values. 
This function is only available when debugging.""" result: list[ThreadRegisters] = [] dbg = dbg_ensure_running() for thread_index in range(ida_dbg.get_thread_qty()): tid = ida_dbg.getn_thread(thread_index) regs = [] regvals: ida_idd.regvals_t = ida_dbg.get_reg_vals(tid) for reg_index, rv in enumerate(regvals): rv: ida_idd.regval_t reg_info = dbg.regs(reg_index) # NOTE: Apparently this can fail under some circumstances try: reg_value = rv.pyval(reg_info.dtype) except ValueError: reg_value = ida_idaapi.BADADDR if isinstance(reg_value, int): reg_value = hex(reg_value) if isinstance(reg_value, bytes): reg_value = reg_value.hex(" ") else: reg_value = str(reg_value) regs.append({ "name": reg_info.name, "value": reg_value, }) result.append({ "thread_id": tid, "registers": regs, }) return result @jsonrpc @idaread @unsafe def dbg_get_call_stack() -> list[dict[str, str]]: """Get the current call stack.""" callstack = [] try: tid = ida_dbg.get_current_thread() trace = ida_idd.call_stack_t() if not ida_dbg.collect_stack_trace(tid, trace): return [] for frame in trace: frame_info = { "address": hex(frame.callea), } try: module_info = ida_idd.modinfo_t() if ida_dbg.get_module_info(frame.callea, module_info): frame_info["module"] = os.path.basename(module_info.name) else: frame_info["module"] = "<unknown>" name = ( ida_name.get_nice_colored_name( frame.callea, ida_name.GNCN_NOCOLOR | ida_name.GNCN_NOLABEL | ida_name.GNCN_NOSEG | ida_name.GNCN_PREFDBG, ) or "<unnamed>" ) frame_info["symbol"] = name except Exception as e: frame_info["module"] = "<error>" frame_info["symbol"] = str(e) callstack.append(frame_info) except Exception as e: pass return callstack class Breakpoint(TypedDict): ea: str enabled: bool condition: Optional[str] def list_breakpoints(): breakpoints: list[Breakpoint] = [] for i in range(ida_dbg.get_bpt_qty()): bpt = ida_dbg.bpt_t() if ida_dbg.getn_bpt(i, bpt): breakpoints.append(Breakpoint( ea=hex(bpt.ea), enabled=bpt.flags & ida_dbg.BPT_ENABLED, 
condition=str(bpt.condition) if bpt.condition else None, )) return breakpoints @jsonrpc @idaread @unsafe def dbg_list_breakpoints(): """List all breakpoints in the program.""" return list_breakpoints() @jsonrpc @idaread @unsafe def dbg_start_process(): """Start the debugger, returns the current instruction pointer""" if len(list_breakpoints()) == 0: for i in range(ida_entry.get_entry_qty()): ordinal = ida_entry.get_entry_ordinal(i) address = ida_entry.get_entry(ordinal) if address != ida_idaapi.BADADDR: ida_dbg.add_bpt(address, 0, idaapi.BPT_SOFT) if idaapi.start_process("", "", "") == 1: ip = ida_dbg.get_ip_val() if ip is not None: return hex(ip) raise IDAError("Failed to start debugger (did the user configure the debugger manually one time?)") @jsonrpc @idaread @unsafe def dbg_exit_process(): """Exit the debugger""" dbg_ensure_running() if idaapi.exit_process(): return raise IDAError("Failed to exit debugger") @jsonrpc @idaread @unsafe def dbg_continue_process() -> str: """Continue the debugger, returns the current instruction pointer""" dbg_ensure_running() if idaapi.continue_process(): ip = ida_dbg.get_ip_val() if ip is not None: return hex(ip) raise IDAError("Failed to continue debugger") @jsonrpc @idaread @unsafe def dbg_run_to( address: Annotated[str, "Run the debugger to the specified address"], ): """Run the debugger to the specified address""" dbg_ensure_running() ea = parse_address(address) if idaapi.run_to(ea): ip = ida_dbg.get_ip_val() if ip is not None: return hex(ip) raise IDAError(f"Failed to run to address {hex(ea)}") @jsonrpc @idaread @unsafe def dbg_set_breakpoint( address: Annotated[str, "Set a breakpoint at the specified address"], ): """Set a breakpoint at the specified address""" ea = parse_address(address) if idaapi.add_bpt(ea, 0, idaapi.BPT_SOFT): return f"Breakpoint set at {hex(ea)}" breakpoints = list_breakpoints() for bpt in breakpoints: if bpt["ea"] == hex(ea): return raise IDAError(f"Failed to set breakpoint at address {hex(ea)}") 
@jsonrpc
@idaread
@unsafe
def dbg_step_into():
    """Step into the current instruction

    Returns the instruction pointer after the step as a hex string.
    Raises IDAError if the debugger is not running (via dbg_ensure_running),
    if the step request is rejected, or if no instruction pointer value is
    available afterwards.
    """
    dbg_ensure_running()
    if idaapi.step_into():
        ip = ida_dbg.get_ip_val()
        if ip is not None:
            return hex(ip)
    raise IDAError("Failed to step into")


@jsonrpc
@idaread
@unsafe
def dbg_step_over():
    """Step over the current instruction

    Returns the instruction pointer after the step as a hex string.
    Raises IDAError if the debugger is not running (via dbg_ensure_running),
    if the step request is rejected, or if no instruction pointer value is
    available afterwards.
    """
    dbg_ensure_running()
    if idaapi.step_over():
        ip = ida_dbg.get_ip_val()
        if ip is not None:
            return hex(ip)
    raise IDAError("Failed to step over")


@jsonrpc
@idaread
@unsafe
def dbg_delete_breakpoint(
    address: Annotated[str, "del a breakpoint at the specified address"],
):
    """Delete a breakpoint at the specified address

    Returns None on success; raises IDAError when idaapi.del_bpt reports
    failure.
    """
    ea = parse_address(address)
    if idaapi.del_bpt(ea):
        return
    raise IDAError(f"Failed to delete breakpoint at address {hex(ea)}")


@jsonrpc
@idaread
@unsafe
def dbg_enable_breakpoint(
    address: Annotated[str, "Enable or disable a breakpoint at the specified address"],
    enable: Annotated[bool, "Enable or disable a breakpoint"],
):
    """Enable or disable a breakpoint at the specified address

    Returns None on success; raises IDAError when idaapi.enable_bpt reports
    failure.
    """
    ea = parse_address(address)
    if idaapi.enable_bpt(ea, enable):
        return
    # Name the attempted action in both directions; the message previously
    # read "Failed to breakpoint ..." when enabling failed.
    action = "enable" if enable else "disable"
    raise IDAError(f"Failed to {action} breakpoint at address {hex(ea)}")


class MCP(idaapi.plugin_t):
    """IDA plugin wrapper around the MCP server.

    init() creates the Server instance, run() starts it and term() stops it.
    """

    # Plugin metadata attributes read by IDA's plugin loader.
    flags = idaapi.PLUGIN_KEEP
    comment = "MCP Plugin"
    help = "MCP"
    wanted_name = "MCP"
    wanted_hotkey = "Ctrl-Alt-M"

    def init(self):
        """Create the server object and print a hint on how to start it."""
        self.server = Server()
        # Only the printed hint is adapted (e.g. "Ctrl+Option+M" on macOS);
        # wanted_hotkey itself is left untouched.
        hotkey = MCP.wanted_hotkey.replace("-", "+")
        if sys.platform == "darwin":
            hotkey = hotkey.replace("Alt", "Option")

        print(f"[MCP] Plugin loaded, use Edit -> Plugins -> MCP ({hotkey}) to start the server")
        return idaapi.PLUGIN_KEEP

    def run(self, arg):
        """Start the server when the plugin is invoked; `arg` is unused."""
        self.server.start()

    def term(self):
        """Stop the server when the plugin is terminated."""
        self.server.stop()


def PLUGIN_ENTRY():
    """Return a new MCP plugin instance."""
    return MCP()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mrexodia/ida-pro-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.