Skip to main content
Glama

IDA Pro MCP Server

by fdrechsler
ida_remote_server.py55.9 kB
""" IDA Pro Remote Control Plugin This plugin creates an HTTP server to remotely control certain IDA functions. It exposes endpoints for executing scripts, getting strings, imports, exports, and functions. Author: Florian Drechsler (@fdrechsler) fd@fdrechsler.com """ import idaapi import idautils import idc import ida_funcs import ida_bytes import ida_nalt import ida_name import json from http.server import HTTPServer, BaseHTTPRequestHandler import threading import socket import ssl import base64 import traceback from urllib.parse import parse_qs, urlparse import time # Default settings DEFAULT_HOST = "127.0.0.1" # Localhost only for security DEFAULT_PORT = 9045 PLUGIN_NAME = "IDA Pro Remote Control" PLUGIN_VERSION = "1.0.0" AUTO_START = True # Automatically start server on plugin load # Global variables g_server = None g_server_thread = None # Synchronization flags for execute_sync MFF_FAST = 0x0 # Execute as soon as possible MFF_READ = 0x1 # Wait for the database to be read-ready MFF_WRITE = 0x2 # Wait for the database to be write-ready class RemoteControlHandler(BaseHTTPRequestHandler): """HTTP request handler for the IDA Pro remote control plugin.""" # Add timeout for HTTP requests timeout = 60 # 60-second timeout for HTTP requests def log_message(self, format, *args): """Override logging to use IDA's console.""" print(f"[RemoteControl] {format % args}") def _send_response(self, status_code, content_type, content): """Helper method to send HTTP response.""" try: self.send_response(status_code) self.send_header('Content-Type', content_type) self.send_header('Content-Length', len(content)) self.end_headers() self.wfile.write(content) except (ConnectionResetError, BrokenPipeError, socket.error) as e: print(f"[RemoteControl] Connection error when sending response: {e}") def _send_json_response(self, data, status_code=200): """Helper method to send JSON response.""" try: content = json.dumps(data).encode('utf-8') self._send_response(status_code, 'application/json', content) except Exception as e: print(f"[RemoteControl] Error preparing JSON response: {e}") # Try to send a simplified error response try: simple_error = json.dumps({'error': 'Internal server error'}).encode('utf-8') self._send_response(500, 'application/json', simple_error) except: # Silently fail if we can't even send the error pass def _send_error_response(self, message, status_code=400): """Helper method to send error response.""" self._send_json_response({'error': message}, status_code) def _parse_post_data(self): """Parse POST data from request.""" content_length = int(self.headers.get('Content-Length', 0)) post_data = self.rfile.read(content_length).decode('utf-8') # Handle different content types content_type = self.headers.get('Content-Type', '') if 'application/json' in content_type: return json.loads(post_data) elif 'application/x-www-form-urlencoded' in content_type: parsed_data = parse_qs(post_data) # Convert lists to single values where appropriate return {k: v[0] if len(v) == 1 else v for k, v in parsed_data.items()} else: return {'raw_data': post_data} def do_GET(self): """Handle GET requests.""" path = self.path.lower() try: if path == '/api/info': self._handle_info() elif path == '/api/strings': self._handle_get_strings() elif path == '/api/exports': self._handle_get_exports() elif path == '/api/imports': self._handle_get_imports() elif path == '/api/functions': self._handle_get_functions() elif path.startswith('/api/search/immediate'): self._handle_search_immediate() elif path.startswith('/api/search/text'): self._handle_search_text() elif path.startswith('/api/search/bytes'): self._handle_search_bytes() elif path.startswith('/api/search/names'): self._handle_search_in_names() elif path.startswith('/api/xrefs/to'): self._handle_get_xrefs_to() elif path.startswith('/api/xrefs/from'): self._handle_get_xrefs_from() elif path.startswith('/api/disassembly'): self._handle_get_disassembly() else: self._send_error_response('Endpoint not found', 404) except Exception as e: error_msg = f"Error processing request: {str(e)}\n{traceback.format_exc()}" print(f"[RemoteControl] {error_msg}") self._send_error_response(error_msg, 500) def do_POST(self): """Handle POST requests.""" path = self.path.lower() try: if path == '/api/execute': self._handle_execute_script() elif path == '/api/executebypath': self._handle_execute_by_path() elif path == '/api/executebody': self._handle_execute_body() else: self._send_error_response('Endpoint not found', 404) except Exception as e: error_msg = f"Error processing request: {str(e)}\n{traceback.format_exc()}" print(f"[RemoteControl] {error_msg}") self._send_error_response(error_msg, 500) def _handle_info(self): """Handle info request.""" result = self._execute_in_main_thread(self._get_info_impl) self._send_json_response(result) def _get_info_impl(self): """Implementation of getting info - runs in main thread.""" info = { 'plugin_name': PLUGIN_NAME, 'plugin_version': PLUGIN_VERSION, 'ida_version': idaapi.get_kernel_version(), 'file_name': idaapi.get_input_file_path(), 'endpoints': [ {'path': '/api/info', 'method': 'GET', 'description': 'Get plugin information'}, {'path': '/api/strings', 'method': 'GET', 'description': 'Get strings from binary'}, {'path': '/api/exports', 'method': 'GET', 'description': 'Get exports from binary'}, {'path': '/api/imports', 'method': 'GET', 'description': 'Get imports from binary'}, {'path': '/api/functions', 'method': 'GET', 'description': 'Get function list'}, {'path': '/api/search/immediate', 'method': 'GET', 'description': 'Search for immediate values'}, {'path': '/api/search/text', 'method': 'GET', 'description': 'Search for text in binary'}, {'path': '/api/search/bytes', 'method': 'GET', 'description': 'Search for byte sequence'}, {'path': '/api/search/names', 'method': 'GET', 'description': 'Search for names/symbols in binary'}, {'path': '/api/xrefs/to', 'method': 'GET', 'description': 'Get cross-references to an address'}, {'path': '/api/xrefs/from', 'method': 'GET', 'description': 'Get cross-references from an address'}, {'path': '/api/disassembly', 'method': 'GET', 'description': 'Get disassembly for an address range'}, {'path': '/api/execute', 'method': 'POST', 'description': 'Execute Python script (JSON/Form)'}, {'path': '/api/executebypath', 'method': 'POST', 'description': 'Execute Python script from file path'}, {'path': '/api/executebody', 'method': 'POST', 'description': 'Execute Python script from raw body'}, ] } return info def _handle_execute_script(self): """Handle script execution request.""" post_data = self._parse_post_data() if 'script' not in post_data: self._send_error_response('No script provided') return script = post_data['script'] # Execute script in the main thread result = self._execute_in_main_thread(self._execute_script_impl, script) if 'error' in result: self._send_error_response(result['error'], 500) else: self._send_json_response(result) def _handle_execute_by_path(self): """Handle script execution from a file path.""" post_data = self._parse_post_data() if 'path' not in post_data: self._send_error_response('No script path provided') return script_path = post_data['path'] try: # Use IDA's main thread to read the file def read_script_file(): try: with open(script_path, 'r') as f: return {'script': f.read()} except Exception as e: return {'error': f"Could not read script file: {str(e)}"} file_result = self._execute_in_main_thread(read_script_file) if 'error' in file_result: self._send_error_response(file_result['error'], 400) return script = file_result['script'] # Execute the script using our existing method result = self._execute_in_main_thread(self._execute_script_impl, script) if 'error' in result: self._send_error_response(result['error'], 500) else: self._send_json_response(result) except Exception as e: error_msg = f"Error executing script from path: {str(e)}\n{traceback.format_exc()}" print(f"[RemoteControl] {error_msg}") self._send_error_response(error_msg, 500) def _handle_execute_body(self): """Handle script execution from raw body content.""" try: # Read raw body content content_length = int(self.headers.get('Content-Length', 0)) if content_length > 1000000: # 1MB limit self._send_error_response('Script too large (>1MB)', 413) return script = self.rfile.read(content_length).decode('utf-8') # Execute the script using our existing method result = self._execute_in_main_thread(self._execute_script_impl, script) if 'error' in result: self._send_error_response(result['error'], 500) else: self._send_json_response(result) except Exception as e: error_msg = f"Error executing script from body: {str(e)}\n{traceback.format_exc()}" print(f"[RemoteControl] {error_msg}") self._send_error_response(error_msg, 500) def _execute_script_impl(self, script): """Implementation of script execution - runs in main thread with safety measures.""" # Create a safe execution environment with IDA modules exec_globals = { 'idaapi': idaapi, 'idautils': idautils, 'idc': idc, 'ida_funcs': ida_funcs, 'ida_bytes': ida_bytes, 'ida_nalt': ida_nalt, 'ida_name': ida_name, } # Redirect stdout to capture output import io import sys import signal original_stdout = sys.stdout captured_output = io.StringIO() sys.stdout = captured_output # Create hooks to automatically respond to IDA prompts original_funcs = {} # Store original functions we're going to override original_funcs['ask_yn'] = idaapi.ask_yn original_funcs['ask_buttons'] = idaapi.ask_buttons original_funcs['ask_text'] = idaapi.ask_text original_funcs['ask_str'] = idaapi.ask_str original_funcs['ask_file'] = idaapi.ask_file original_funcs['display_copyright_warning'] = idaapi.display_copyright_warning # Also handle lower-level IDA UI functions if hasattr(idaapi, "get_kernel_version") and idaapi.get_kernel_version() >= "7.0": # IDA 7+ has these functions if hasattr(idaapi, "warning"): original_funcs['warning'] = idaapi.warning idaapi.warning = lambda *args, **kwargs: print(f"[AUTO-CONFIRM] Warning suppressed: {args}") if hasattr(idaapi, "info"): original_funcs['info'] = idaapi.info idaapi.info = lambda *args, **kwargs: print(f"[AUTO-CONFIRM] Info suppressed: {args}") # For specific known dialogs like the "bad digit" dialog if hasattr(idc, "set_inf_attr"): # Suppress "bad digit" dialogs with this setting original_funcs['INFFL_ALLASM'] = idc.get_inf_attr(idc.INF_AF) idc.set_inf_attr(idc.INF_AF, idc.get_inf_attr(idc.INF_AF) | 0x2000) # Set INFFL_ALLASM flag # Create a UI hook to capture any other dialogs class DialogHook(idaapi.UI_Hooks): def populating_widget_popup(self, widget, popup): # Just suppress all popups print("[AUTO-CONFIRM] Suppressing popup") return 1 def finish_populating_widget_popup(self, widget, popup): # Also suppress here print("[AUTO-CONFIRM] Suppressing popup finish") return 1 def ready_to_run(self): # Always continue return 1 def updating_actions(self, ctx): # Always continue return 1 def updated_actions(self): # Always continue return 1 def ui_refresh(self, cnd): # Suppress UI refreshes return 1 # Install UI hook ui_hook = DialogHook() ui_hook.hook() # Functions to automatically respond to various prompts def auto_yes_no(*args, **kwargs): print(f"[AUTO-CONFIRM] Prompt intercepted (Yes/No): {args}") return idaapi.ASKBTN_YES # Always respond YES def auto_buttons(*args, **kwargs): print(f"[AUTO-CONFIRM] Prompt intercepted (Buttons): {args}") return 0 # Return first button (usually OK/Yes/Continue) def auto_text(*args, **kwargs): print(f"[AUTO-CONFIRM] Prompt intercepted (Text): {args}") return "" # Return empty string def auto_file(*args, **kwargs): print(f"[AUTO-CONFIRM] Prompt intercepted (File): {args}") return "" # Return empty string def auto_ignore(*args, **kwargs): print(f"[AUTO-CONFIRM] Warning intercepted: {args}") return 0 # Just return something # Override IDA's prompt functions with our auto-response versions idaapi.ask_yn = auto_yes_no idaapi.ask_buttons = auto_buttons idaapi.ask_text = auto_text idaapi.ask_str = auto_text idaapi.ask_file = auto_file idaapi.display_copyright_warning = auto_ignore # IMPORTANT: Also override searching functions with safer versions # The "Bad digit" dialog is often triggered by these if hasattr(idc, "find_binary"): original_funcs['find_binary'] = idc.find_binary def safe_find_binary(ea, flag, searchstr, radix=16): # Always treat as a string by adding quotes if not present if '"' not in searchstr and "'" not in searchstr: searchstr = f'"{searchstr}"' print(f"[AUTO-CONFIRM] Making search safe: {searchstr}") return original_funcs['find_binary'](ea, flag, searchstr, radix) idc.find_binary = safe_find_binary # Set batch mode to minimize UI interactions (stronger settings) orig_batch = idaapi.set_script_timeout(1) # Set script timeout to suppress dialogs # Additional batch mode settings orig_user_screen_ea = idaapi.get_screen_ea() # Save current IDA settings try: # Enable batch mode if available if hasattr(idaapi, "batch_mode_enabled"): original_funcs['batch_mode'] = idaapi.batch_mode_enabled() idaapi.enable_batch_mode(True) # Disable analysis wait box if hasattr(idaapi, "set_flag"): idaapi.set_flag(idaapi.SW_SHHID_ITEM, True) # Hide wait dialogs idaapi.set_flag(idaapi.SW_HIDE_UNDEF, True) # Hide undefined items idaapi.set_flag(idaapi.SW_HIDE_SEGADDRS, True) # Hide segment addressing # For newer versions of IDA if hasattr(idc, "batch"): original_funcs['batch_mode_idc'] = idc.batch(1) # Enable batch mode except Exception as e: print(f"[AUTO-CONFIRM] Error setting batch mode: {e}") # Script timeout handling class TimeoutException(Exception): pass def timeout_handler(signum, frame): raise TimeoutException("Script execution timed out") # Set timeout for script execution (10 seconds) old_handler = None try: # Only set alarm on platforms that support it (not Windows) if hasattr(signal, 'SIGALRM'): old_handler = signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(10) # 10 second timeout except (AttributeError, ValueError): # Signal module might not have SIGALRM on Windows pass try: # Execute the script with size limit to prevent memory issues if len(script) > 1000000: # 1MB limit return {'error': 'Script too large (>1MB)'} # Execute the script exec(script, exec_globals) output = captured_output.getvalue() # Get return value if set return_value = exec_globals.get('return_value', None) response = { 'success': True, 'output': output[:1000000] # Limit output size to 1MB } if return_value is not None: try: # Try to serialize return_value to JSON with size limit json_str = json.dumps(return_value) if len(json_str) <= 1000000: # 1MB limit response['return_value'] = return_value else: response['return_value'] = str(return_value)[:1000000] + "... (truncated)" except (TypeError, OverflowError): # If not JSON serializable, convert to string with limit response['return_value'] = str(return_value)[:1000000] + ( "... (truncated)" if len(str(return_value)) > 1000000 else "") return response except TimeoutException: error_msg = "Script execution timed out (exceeded 10 seconds)" print(f"[RemoteControl] {error_msg}") return {'error': error_msg} except MemoryError: error_msg = "Script caused a memory error" print(f"[RemoteControl] {error_msg}") return {'error': error_msg} except Exception as e: error_msg = f"Script execution error: {str(e)}\n{traceback.format_exc()}" print(f"[RemoteControl] {error_msg}") return {'error': error_msg} finally: # Restore stdout sys.stdout = original_stdout # Restore original IDA functions for func_name, original_func in original_funcs.items(): # Special case for INFFL_ALLASM flag if func_name == 'INFFL_ALLASM': idc.set_inf_attr(idc.INF_AF, original_func) # Special case for batch mode elif func_name == 'batch_mode': if hasattr(idaapi, "enable_batch_mode"): idaapi.enable_batch_mode(original_func) elif func_name == 'batch_mode_idc': if hasattr(idc, "batch"): idc.batch(original_func) else: # For all other functions try: if hasattr(idaapi, func_name): setattr(idaapi, func_name, original_func) elif hasattr(idc, func_name): setattr(idc, func_name, original_func) except: print(f"[RemoteControl] Failed to restore {func_name}") # Restore screen position idaapi.jumpto(orig_user_screen_ea) # Unhook UI hooks ui_hook.unhook() # Restore original batch mode idaapi.set_script_timeout(orig_batch) # Cancel alarm if set (for non-Windows platforms) try: if hasattr(signal, 'SIGALRM'): signal.alarm(0) if old_handler is not None: signal.signal(signal.SIGALRM, old_handler) except (AttributeError, ValueError, UnboundLocalError): pass def _handle_get_strings(self): """Handle get strings request.""" result = self._execute_in_main_thread(self._get_strings_impl) self._send_json_response(result) def _get_strings_impl(self): """Implementation of getting strings - runs in main thread.""" min_length = 4 # Minimum string length to include strings_list = [] # Get all strings from binary for ea in idautils.Strings(): if ea.length >= min_length: string_value = str(ea) string_address = ea.ea string_info = { 'address': f"0x{string_address:X}", 'value': string_value, 'length': ea.length, 'type': 'pascal' if ea.strtype == 1 else 'c' } strings_list.append(string_info) return { 'count': len(strings_list), 'strings': strings_list } def _handle_get_exports(self): """Handle get exports request.""" result = self._execute_in_main_thread(self._get_exports_impl) self._send_json_response(result) def _get_exports_impl(self): """Implementation of getting exports - runs in main thread.""" exports_list = [] # Process exports for ordinal, ea, name in idautils.Entries(): exports_list.append({ 'address': f"0x{ea:X}", 'name': name, 'ordinal': ordinal }) return { 'count': len(exports_list), 'exports': exports_list } def _handle_get_imports(self): """Handle get imports request.""" result = self._execute_in_main_thread(self._get_imports_impl) self._send_json_response(result) def _get_imports_impl(self): """Implementation of getting imports - runs in main thread.""" imports_list = [] # Process imports nimps = ida_nalt.get_import_module_qty() for i in range(0, nimps): name = ida_nalt.get_import_module_name(i) if not name: continue def imp_cb(ea, name, ordinal): if name: imports_list.append({ 'address': f"0x{ea:X}", 'name': name, 'ordinal': ordinal }) return True ida_nalt.enum_import_names(i, imp_cb) return { 'count': len(imports_list), 'imports': imports_list } def _handle_get_functions(self): """Handle get functions request.""" result = self._execute_in_main_thread(self._get_functions_impl) self._send_json_response(result) def _get_functions_impl(self): """Implementation of getting functions - runs in main thread.""" functions_list = [] # Get all functions for ea in idautils.Functions(): func = ida_funcs.get_func(ea) if func: func_name = ida_name.get_ea_name(ea) function_info = { 'address': f"0x{ea:X}", 'name': func_name, 'size': func.size(), 'start': f"0x{func.start_ea:X}", 'end': f"0x{func.end_ea:X}", 'flags': func.flags } functions_list.append(function_info) return { 'count': len(functions_list), 'functions': functions_list } def _handle_search_immediate(self): """Handle search for immediate value request.""" # Parse query parameters parsed_url = urlparse(self.path) params = parse_qs(parsed_url.query) # Get parameters with defaults value = params.get('value', [''])[0] if not value: self._send_error_response('Missing required parameter: value') return # Optional parameters try: radix = int(params.get('radix', ['16'])[0]) except ValueError: radix = 16 try: start_ea = int(params.get('start', ['0'])[0], 0) except ValueError: start_ea = 0 try: end_ea = int(params.get('end', ['0'])[0], 0) except ValueError: end_ea = idc.BADADDR # Execute search in main thread result = self._execute_in_main_thread( self._search_immediate_impl, value, radix, start_ea, end_ea ) self._send_json_response(result) def _search_immediate_impl(self, value, radix, start_ea, end_ea): """Implementation of searching for immediate values - runs in main thread.""" results = [] try: # Convert value to integer if it's a number if isinstance(value, str) and value.isdigit(): value = int(value, radix) # Search for immediate values for ea in idautils.Functions(): func = ida_funcs.get_func(ea) if not func: continue # Skip if outside specified range if start_ea > 0 and func.start_ea < start_ea: continue if end_ea > 0 and func.start_ea >= end_ea: continue # Iterate through instructions in the function current_ea = func.start_ea while current_ea < func.end_ea: insn = idaapi.insn_t() insn_len = idaapi.decode_insn(insn, current_ea) if insn_len > 0: # Check operands for immediate values for i in range(len(insn.ops)): op = insn.ops[i] if op.type == idaapi.o_imm: # If searching for a specific value if isinstance(value, int) and op.value == value: disasm = idc.generate_disasm_line(current_ea, 0) results.append({ 'address': f"0x{current_ea:X}", 'instruction': disasm, 'value': op.value, 'operand_index': i }) # If searching for a string pattern in the disassembly elif isinstance(value, str) and value in idc.generate_disasm_line(current_ea, 0): disasm = idc.generate_disasm_line(current_ea, 0) results.append({ 'address': f"0x{current_ea:X}", 'instruction': disasm, 'value': op.value, 'operand_index': i }) current_ea += insn_len else: current_ea += 1 except Exception as e: return {'error': f"Error searching for immediate values: {str(e)}"} return { 'count': len(results), 'results': results } def _handle_search_text(self): """Handle search for text request.""" # Parse query parameters parsed_url = urlparse(self.path) params = parse_qs(parsed_url.query) # Get parameters with defaults text = params.get('text', [''])[0] if not text: self._send_error_response('Missing required parameter: text') return # Optional parameters try: start_ea = int(params.get('start', ['0'])[0], 0) except ValueError: start_ea = 0 try: end_ea = int(params.get('end', ['0'])[0], 0) except ValueError: end_ea = idc.BADADDR case_sensitive = params.get('case_sensitive', ['false'])[0].lower() == 'true' # Execute search in main thread result = self._execute_in_main_thread( self._search_text_impl, text, case_sensitive, start_ea, end_ea ) self._send_json_response(result) def _search_text_impl(self, text, case_sensitive, start_ea, end_ea): """Implementation of searching for text - runs in main thread.""" results = [] try: # Get all strings from binary for string_item in idautils.Strings(): if string_item.ea < start_ea: continue if end_ea > 0 and string_item.ea >= end_ea: continue string_value = str(string_item) # Check if text is in string if (case_sensitive and text in string_value) or \ (not case_sensitive and text.lower() in string_value.lower()): results.append({ 'address': f"0x{string_item.ea:X}", 'value': string_value, 'length': string_item.length, 'type': 'pascal' if string_item.strtype == 1 else 'c' }) except Exception as e: return {'error': f"Error searching for text: {str(e)}"} return { 'count': len(results), 'results': results } def _handle_search_bytes(self): """Handle search for byte sequence request.""" # Parse query parameters parsed_url = urlparse(self.path) params = parse_qs(parsed_url.query) # Get parameters with defaults byte_str = params.get('bytes', [''])[0] if not byte_str: self._send_error_response('Missing required parameter: bytes') return # Optional parameters try: start_ea = int(params.get('start', ['0'])[0], 0) except ValueError: start_ea = 0 try: end_ea = int(params.get('end', ['0'])[0], 0) except ValueError: end_ea = idc.BADADDR # Execute search in main thread result = self._execute_in_main_thread( self._search_bytes_impl, byte_str, start_ea, end_ea ) self._send_json_response(result) def _handle_search_in_names(self): """Handle search for names/symbols in the binary.""" # Parse query parameters parsed_url = urlparse(self.path) params = parse_qs(parsed_url.query) # Get parameters with defaults pattern = params.get('pattern', [''])[0] if not pattern: self._send_error_response('Missing required parameter: pattern') return # Optional parameters case_sensitive = params.get('case_sensitive', ['false'])[0].lower() == 'true' # Get name type if specified name_type = params.get('type', ['all'])[0].lower() # Execute search in main thread result = self._execute_in_main_thread( self._search_in_names_impl, pattern, case_sensitive, name_type ) self._send_json_response(result) def _search_bytes_impl(self, byte_str, start_ea, end_ea): """Implementation of searching for byte sequence - runs in main thread.""" results = [] try: # Ensure byte_str is properly formatted for IDA's find_binary # IDA expects a string like "41 42 43" or "41 ?? 43" where ?? is a wildcard # Clean up the input to ensure it's in the right format byte_str = byte_str.strip() if not byte_str.startswith('"') and not byte_str.startswith("'"): byte_str = f'"{byte_str}"' # Start searching ea = start_ea while ea != idc.BADADDR: ea = idc.find_binary(ea, idc.SEARCH_DOWN | idc.SEARCH_NEXT, byte_str) if ea == idc.BADADDR or (end_ea > 0 and ea >= end_ea): break # Get some context around the found bytes disasm = idc.generate_disasm_line(ea, 0) # Add to results results.append({ 'address': f"0x{ea:X}", 'disassembly': disasm, 'bytes': ' '.join([f"{idc.get_wide_byte(ea + i):02X}" for i in range(8)]) # Show 8 bytes }) # Move to next byte to continue search ea += 1 except Exception as e: return {'error': f"Error searching for byte sequence: {str(e)}"} return { 'count': len(results), 'results': results } def _search_in_names_impl(self, pattern, case_sensitive, name_type): """Implementation of searching in names/symbols - runs in main thread.""" results = [] try: # Prepare name type filters is_func = name_type in ['function', 'func', 'functions', 'all'] is_data = name_type in ['data', 'variable', 'variables', 'all'] is_import = name_type in ['import', 'imports', 'all'] is_export = name_type in ['export', 'exports', 'all'] is_label = name_type in ['label', 'labels', 'all'] # Get all names in the database for ea, name in idautils.Names(): # Skip null names if not name: continue # Apply pattern matching based on case sensitivity if (case_sensitive and pattern in name) or \ (not case_sensitive and pattern.lower() in name.lower()): # Determine the type of the name name_info = { 'address': f"0x{ea:X}", 'name': name, 'type': 'unknown' } # Check if it's a function if is_func and ida_funcs.get_func(ea) is not None: name_info['type'] = 'function' if ida_funcs.get_func(ea).start_ea == ea: # Function start name_info['disassembly'] = idc.generate_disasm_line(ea, 0) name_info['is_start'] = True # Check if it's part of imports (using IDA's import list) elif is_import and ida_nalt.is_imported(ea): name_info['type'] = 'import' # Check if it's an export elif is_export and ida_nalt.is_exported(ea): name_info['type'] = 'export' # Check if it's a data variable elif is_data and ida_bytes.is_data(ida_bytes.get_flags(ea)): name_info['type'] = 'data' name_info['data_type'] = idc.get_type_name(ea) # Check if it's a label (non-function named location) elif is_label and not ida_funcs.get_func(ea): name_info['type'] = 'label' name_info['disassembly'] = idc.generate_disasm_line(ea, 0) # Filter out if it doesn't match the requested type if name_type != 'all' and name_info['type'] != name_type and \ not (name_type in ['function', 'func', 'functions'] and name_info['type'] == 'function') and \ not (name_type in ['import', 'imports'] and name_info['type'] == 'import') and \ not (name_type in ['export', 'exports'] and name_info['type'] == 'export') and \ not (name_type in ['data', 'variable', 'variables'] and name_info['type'] == 'data') and \ not (name_type in ['label', 'labels'] and name_info['type'] == 'label'): continue # Add to results results.append(name_info) # Sort results by address results.sort(key=lambda x: int(x['address'], 16)) except Exception as e: return {'error': f"Error searching in names: {str(e)}\n{traceback.format_exc()}"} return { 'count': len(results), 'results': results } def _handle_get_disassembly(self): """Handle get disassembly request.""" # Parse query parameters parsed_url = urlparse(self.path) params = parse_qs(parsed_url.query) # Get parameters with defaults try: start_ea = int(params.get('start', ['0'])[0], 0) except ValueError: self._send_error_response('Invalid start address') return # Optional parameters try: end_ea = int(params.get('end', ['0'])[0], 0) except ValueError: end_ea = 0 try: count = int(params.get('count', ['10'])[0]) except ValueError: count = 10 # Execute in main thread result = self._execute_in_main_thread( self._get_disassembly_impl, start_ea, end_ea, count ) self._send_json_response(result) def _get_disassembly_impl(self, start_ea, end_ea, count): """Implementation of getting disassembly - runs in main thread.""" disassembly = [] try: # If end_ea is specified, use it, otherwise use count if end_ea > 0: current_ea = start_ea while current_ea < end_ea: disasm = idc.generate_disasm_line(current_ea, 0) bytes_str = ' '.join([f"{idc.get_wide_byte(current_ea + i):02X}" for i in range(min(16, idc.get_item_size(current_ea)))]) disassembly.append({ 'address': f"0x{current_ea:X}", 'disassembly': disasm, 'bytes': bytes_str, 'size': idc.get_item_size(current_ea) }) current_ea += idc.get_item_size(current_ea) if len(disassembly) >= 1000: # Limit to 1000 instructions for safety break else: # Use count to limit the number of instructions current_ea = start_ea for _ in range(min(count, 1000)): # Limit to 1000 instructions for safety disasm = idc.generate_disasm_line(current_ea, 0) bytes_str = ' '.join([f"{idc.get_wide_byte(current_ea + i):02X}" for i in range(min(16, idc.get_item_size(current_ea)))]) disassembly.append({ 'address': f"0x{current_ea:X}", 'disassembly': disasm, 'bytes': bytes_str, 'size': idc.get_item_size(current_ea) }) current_ea += idc.get_item_size(current_ea) if current_ea == idc.BADADDR: break except Exception as e: return {'error': f"Error getting disassembly: {str(e)}"} return { 'count': len(disassembly), 'disassembly': disassembly, 'start_address': f"0x{start_ea:X}", 'end_address': f"0x{end_ea:X}" if end_ea > 0 else None } def _handle_get_xrefs_to(self): """Handle get xrefs to address request.""" # Parse query parameters parsed_url = urlparse(self.path) params = parse_qs(parsed_url.query) # Get parameters with defaults try: address = int(params.get('address', ['0'])[0], 0) except ValueError: self._send_error_response('Invalid address') return # Optional parameters xref_type = params.get('type', ['all'])[0].lower() # Execute in main thread result = self._execute_in_main_thread( self._get_xrefs_to_impl, address, xref_type ) self._send_json_response(result) def _get_xrefs_to_impl(self, address, xref_type): """Implementation of getting xrefs to address - runs in main thread.""" xrefs = [] try: # Get all cross-references to the specified address for xref in idautils.XrefsTo(address, 0): # Determine xref type xref_info = { 'from_address': f"0x{xref.frm:X}", 'to_address': f"0x{xref.to:X}", 'type': self._get_xref_type_name(xref.type), 'is_code': xref.iscode } # Filter by type if specified if xref_type != 'all': if xref_type == 'code' and not xref.iscode: continue if xref_type == 'data' and xref.iscode: continue # Get function name if available func = ida_funcs.get_func(xref.frm) if func: xref_info['function_name'] = ida_name.get_ea_name(func.start_ea) xref_info['function_address'] = f"0x{func.start_ea:X}" # Get disassembly for context xref_info['disassembly'] = idc.generate_disasm_line(xref.frm, 0) xrefs.append(xref_info) # Sort by address xrefs.sort(key=lambda x: int(x['from_address'], 16)) except Exception as e: return {'error': f"Error getting xrefs to address: {str(e)}\n{traceback.format_exc()}"} return { 'count': len(xrefs), 'xrefs': xrefs, 'address': f"0x{address:X}", 'name': ida_name.get_ea_name(address) } def _handle_get_xrefs_from(self): """Handle get xrefs from address request.""" # Parse query parameters parsed_url = urlparse(self.path) params = parse_qs(parsed_url.query) # Get parameters with defaults try: address = int(params.get('address', ['0'])[0], 0) except ValueError: self._send_error_response('Invalid address') return # Optional parameters xref_type = params.get('type', ['all'])[0].lower() # Execute in main thread result = self._execute_in_main_thread( self._get_xrefs_from_impl, address, xref_type ) self._send_json_response(result) def _get_xrefs_from_impl(self, address, xref_type): """Implementation of getting xrefs from address - runs in main thread.""" xrefs = [] try: # Get all cross-references from the specified address for xref in idautils.XrefsFrom(address, 0): # Determine xref type xref_info = { 'from_address': f"0x{xref.frm:X}", 'to_address': f"0x{xref.to:X}", 'type': self._get_xref_type_name(xref.type), 'is_code': xref.iscode } # Filter by type if specified if xref_type != 'all': if xref_type == 'code' and not xref.iscode: continue if xref_type == 'data' and xref.iscode: continue # Get target name if available target_name = ida_name.get_ea_name(xref.to) if target_name: xref_info['target_name'] = target_name # Check if target is a function func = ida_funcs.get_func(xref.to) if func and func.start_ea == xref.to: xref_info['target_is_function'] = True xref_info['target_function_name'] = ida_name.get_ea_name(func.start_ea) # Get disassembly for context xref_info['target_disassembly'] = idc.generate_disasm_line(xref.to, 0) xrefs.append(xref_info) # Sort by address xrefs.sort(key=lambda x: int(x['to_address'], 16)) except Exception as e: return {'error': f"Error getting xrefs from address: {str(e)}\n{traceback.format_exc()}"} return { 'count': len(xrefs), 'xrefs': xrefs, 'address': f"0x{address:X}", 'name': ida_name.get_ea_name(address) } def _get_xref_type_name(self, xref_type): """Convert IDA xref type code to human-readable name.""" # Code cross-reference types if xref_type == idaapi.fl_CF: return "call_far" elif xref_type == idaapi.fl_CN: return "call_near" elif xref_type == idaapi.fl_JF: return "jump_far" elif xref_type == idaapi.fl_JN: return "jump_near" # Data cross-reference types elif xref_type == idaapi.dr_O: return "data_offset" elif xref_type == idaapi.dr_W: return "data_write" elif xref_type == idaapi.dr_R: return "data_read" elif xref_type == idaapi.dr_T: return "data_text" elif xref_type == idaapi.dr_I: return "data_informational" else: return f"unknown_{xref_type}" def _execute_in_main_thread(self, func, *args, **kwargs): """Execute a function in the main thread with additional safeguards.""" result_container = {} execution_done = threading.Event() def sync_wrapper(): """Wrapper function to capture the result safely.""" try: result_container['result'] = func(*args, **kwargs) except Exception as e: result_container['error'] = str(e) result_container['traceback'] = traceback.format_exc() finally: # Signal that execution has finished execution_done.set() return 0 # Must return an integer # Schedule execution in the main thread idaapi.execute_sync(sync_wrapper, MFF_READ) # Wait for the result with a timeout max_wait = 30 # Maximum wait time in seconds if not execution_done.wait(max_wait): error_msg = f"Operation timed out after {max_wait} seconds" print(f"[RemoteControl] {error_msg}") return {'error': error_msg} if 'error' in result_container: print(f"[RemoteControl] Error in main thread: {result_container['error']}") print(result_container.get('traceback', '')) return {'error': result_container['error']} return result_container.get('result', {'error': 'Unknown error occurred'}) class RemoteControlServer: """HTTP server for IDA Pro remote control.""" def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT): self.host = host self.port = port self.server = None self.server_thread = None self.running = False def start(self): """Start the HTTP server.""" if self.running: print("[RemoteControl] Server is already running") return False try: self.server = HTTPServer((self.host, self.port), RemoteControlHandler) self.server_thread = threading.Thread(target=self.server.serve_forever) self.server_thread.daemon = True self.server_thread.start() self.running = True print(f"[RemoteControl] Server started on http://{self.host}:{self.port}") return True except Exception as e: print(f"[RemoteControl] Failed to start server: {str(e)}") return False def stop(self): """Stop the HTTP server.""" if not self.running: print("[RemoteControl] Server is not running") return False try: self.server.shutdown() self.server.server_close() self.server_thread.join() self.running = False print("[RemoteControl] Server stopped") return True except Exception as e: print(f"[RemoteControl] Failed to stop server: {str(e)}") return False def is_running(self): """Check if the server is running.""" return self.running class RemoteControlPlugin(idaapi.plugin_t): """IDA Pro plugin for remote control.""" flags = idaapi.PLUGIN_KEEP comment = "Remote control for IDA through HTTP" help = "Provides HTTP endpoints to control IDA Pro remotely" wanted_name = PLUGIN_NAME wanted_hotkey = "Alt-R" def init(self): """Initialize the plugin.""" print(f"[{PLUGIN_NAME}] Initializing...") # Auto-start server if configured if AUTO_START: global g_server g_server = RemoteControlServer(DEFAULT_HOST, DEFAULT_PORT) success = g_server.start() if success: print(f"[{PLUGIN_NAME}] Server auto-started on http://{DEFAULT_HOST}:{DEFAULT_PORT}") print(f"[{PLUGIN_NAME}] Available endpoints:") else: g_server = None print(f"[{PLUGIN_NAME}] Failed to auto-start server") return idaapi.PLUGIN_KEEP def run(self, arg): """Run the plugin when activated manually.""" global g_server # Check if server is already running if g_server and g_server.is_running(): response = idaapi.ask_yn(idaapi.ASKBTN_NO, "Remote control server is already running.\nDo you want to stop it?") if response == idaapi.ASKBTN_YES: g_server.stop() g_server = None return # If AUTO_START is enabled but server isn't running, start with default settings if AUTO_START: g_server = RemoteControlServer(DEFAULT_HOST, DEFAULT_PORT) success = g_server.start() if success: print(f"[{PLUGIN_NAME}] Server started on http://{DEFAULT_HOST}:{DEFAULT_PORT}") else: g_server = None print(f"[{PLUGIN_NAME}] Failed to start server") return # Manual configuration if AUTO_START is disabled # Get host and port from user host = idaapi.ask_str(DEFAULT_HOST, 0, "Enter host address (e.g. 127.0.0.1):") if not host: host = DEFAULT_HOST port_str = idaapi.ask_str(str(DEFAULT_PORT), 0, "Enter port number:") try: port = int(port_str) except (ValueError, TypeError): port = DEFAULT_PORT # Start server g_server = RemoteControlServer(host, port) success = g_server.start() if success: print(f"[{PLUGIN_NAME}] Server started on http://{host}:{port}") print(f"[{PLUGIN_NAME}] Available endpoints:") else: g_server = None print(f"[{PLUGIN_NAME}] Failed to start server") def term(self): """Terminate the plugin.""" global g_server if g_server and g_server.is_running(): g_server.stop() g_server = None print(f"[{PLUGIN_NAME}] Plugin terminated") # Register the plugin def PLUGIN_ENTRY(): """Return the plugin instance.""" return RemoteControlPlugin() # For testing/debugging in the script editor if __name__ == "__main__": # This will only run when executed in the IDA script editor plugin = RemoteControlPlugin() plugin.run(0)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fdrechsler/mcp-server-idapro'

If you have feedback or need assistance with the MCP directory API, please join our Discord server