IDA Pro MCP Server

by fdrechsler
Verified
""" IDA Pro Remote Control Plugin This plugin creates an HTTP server to remotely control certain IDA functions. It exposes endpoints for executing scripts, getting strings, imports, exports, and functions. Author: Florian Drechsler (@fdrechsler) fd@fdrechsler.com """ import idaapi import idautils import idc import ida_funcs import ida_bytes import ida_nalt import ida_name import json from http.server import HTTPServer, BaseHTTPRequestHandler import threading import socket import ssl import base64 import traceback from urllib.parse import parse_qs, urlparse import time # Default settings DEFAULT_HOST = "127.0.0.1" # Localhost only for security DEFAULT_PORT = 9045 PLUGIN_NAME = "IDA Pro Remote Control" PLUGIN_VERSION = "1.0.0" AUTO_START = True # Automatically start server on plugin load # Global variables g_server = None g_server_thread = None # Synchronization flags for execute_sync MFF_FAST = 0x0 # Execute as soon as possible MFF_READ = 0x1 # Wait for the database to be read-ready MFF_WRITE = 0x2 # Wait for the database to be write-ready class RemoteControlHandler(BaseHTTPRequestHandler): """HTTP request handler for the IDA Pro remote control plugin.""" # Add timeout for HTTP requests timeout = 60 # 60-second timeout for HTTP requests def log_message(self, format, *args): """Override logging to use IDA's console.""" print(f"[RemoteControl] {format % args}") def _send_response(self, status_code, content_type, content): """Helper method to send HTTP response.""" try: self.send_response(status_code) self.send_header('Content-Type', content_type) self.send_header('Content-Length', len(content)) self.end_headers() self.wfile.write(content) except (ConnectionResetError, BrokenPipeError, socket.error) as e: print(f"[RemoteControl] Connection error when sending response: {e}") def _send_json_response(self, data, status_code=200): """Helper method to send JSON response.""" try: content = json.dumps(data).encode('utf-8') self._send_response(status_code, 'application/json', content) except Exception as e: print(f"[RemoteControl] Error preparing JSON response: {e}") # Try to send a simplified error response try: simple_error = json.dumps({'error': 'Internal server error'}).encode('utf-8') self._send_response(500, 'application/json', simple_error) except: # Silently fail if we can't even send the error pass def _send_error_response(self, message, status_code=400): """Helper method to send error response.""" self._send_json_response({'error': message}, status_code) def _parse_post_data(self): """Parse POST data from request.""" content_length = int(self.headers.get('Content-Length', 0)) post_data = self.rfile.read(content_length).decode('utf-8') # Handle different content types content_type = self.headers.get('Content-Type', '') if 'application/json' in content_type: return json.loads(post_data) elif 'application/x-www-form-urlencoded' in content_type: parsed_data = parse_qs(post_data) # Convert lists to single values where appropriate return {k: v[0] if len(v) == 1 else v for k, v in parsed_data.items()} else: return {'raw_data': post_data} def do_GET(self): """Handle GET requests.""" path = self.path.lower() try: if path == '/api/info': self._handle_info() elif path == '/api/strings': self._handle_get_strings() elif path == '/api/exports': self._handle_get_exports() elif path == '/api/imports': self._handle_get_imports() elif path == '/api/functions': self._handle_get_functions() elif path.startswith('/api/search/immediate'): self._handle_search_immediate() elif path.startswith('/api/search/text'): self._handle_search_text() elif path.startswith('/api/search/bytes'): self._handle_search_bytes() elif path.startswith('/api/search/names'): self._handle_search_in_names() elif path.startswith('/api/xrefs/to'): self._handle_get_xrefs_to() elif path.startswith('/api/xrefs/from'): self._handle_get_xrefs_from() elif path.startswith('/api/disassembly'): self._handle_get_disassembly() else: self._send_error_response('Endpoint not found', 404) except Exception as e: error_msg = f"Error processing request: {str(e)}\n{traceback.format_exc()}" print(f"[RemoteControl] {error_msg}") self._send_error_response(error_msg, 500) def do_POST(self): """Handle POST requests.""" path = self.path.lower() try: if path == '/api/execute': self._handle_execute_script() elif path == '/api/executebypath': self._handle_execute_by_path() elif path == '/api/executebody': self._handle_execute_body() else: self._send_error_response('Endpoint not found', 404) except Exception as e: error_msg = f"Error processing request: {str(e)}\n{traceback.format_exc()}" print(f"[RemoteControl] {error_msg}") self._send_error_response(error_msg, 500) def _handle_info(self): """Handle info request.""" result = self._execute_in_main_thread(self._get_info_impl) self._send_json_response(result) def _get_info_impl(self): """Implementation of getting info - runs in main thread.""" info = { 'plugin_name': PLUGIN_NAME, 'plugin_version': PLUGIN_VERSION, 'ida_version': idaapi.get_kernel_version(), 'file_name': idaapi.get_input_file_path(), 'endpoints': [ {'path': '/api/info', 'method': 'GET', 'description': 'Get plugin information'}, {'path': '/api/strings', 'method': 'GET', 'description': 'Get strings from binary'}, {'path': '/api/exports', 'method': 'GET', 'description': 'Get exports from binary'}, {'path': '/api/imports', 'method': 'GET', 'description': 'Get imports from binary'}, {'path': '/api/functions', 'method': 'GET', 'description': 'Get function list'}, {'path': '/api/search/immediate', 'method': 'GET', 'description': 'Search for immediate values'}, {'path': '/api/search/text', 'method': 'GET', 'description': 'Search for text in binary'}, {'path': '/api/search/bytes', 'method': 'GET', 'description': 'Search for byte sequence'}, {'path': '/api/search/names', 'method': 'GET', 'description': 'Search for names/symbols in binary'}, {'path': '/api/xrefs/to', 'method': 'GET', 'description': 'Get cross-references to an address'}, {'path': '/api/xrefs/from', 'method': 'GET', 'description': 'Get cross-references from an address'}, {'path': '/api/disassembly', 'method': 'GET', 'description': 'Get disassembly for an address range'}, {'path': '/api/execute', 'method': 'POST', 'description': 'Execute Python script (JSON/Form)'}, {'path': '/api/executebypath', 'method': 'POST', 'description': 'Execute Python script from file path'}, {'path': '/api/executebody', 'method': 'POST', 'description': 'Execute Python script from raw body'}, ] } return info def _handle_execute_script(self): """Handle script execution request.""" post_data = self._parse_post_data() if 'script' not in post_data: self._send_error_response('No script provided') return script = post_data['script'] # Execute script in the main thread result = self._execute_in_main_thread(self._execute_script_impl, script) if 'error' in result: self._send_error_response(result['error'], 500) else: self._send_json_response(result) def _handle_execute_by_path(self): """Handle script execution from a file path.""" post_data = self._parse_post_data() if 'path' not in post_data: self._send_error_response('No script path provided') return script_path = post_data['path'] try: # Use IDA's main thread to read the file def read_script_file(): try: with open(script_path, 'r') as f: return {'script': f.read()} except Exception as e: return {'error': f"Could not read script file: {str(e)}"} file_result = self._execute_in_main_thread(read_script_file) if 'error' in file_result: self._send_error_response(file_result['error'], 400) return script = file_result['script'] # Execute the script using our existing method result = self._execute_in_main_thread(self._execute_script_impl, script) if 'error' in result: self._send_error_response(result['error'], 500) else: self._send_json_response(result) except Exception as e: error_msg = f"Error executing script from path: {str(e)}\n{traceback.format_exc()}" print(f"[RemoteControl] {error_msg}") self._send_error_response(error_msg, 500) def _handle_execute_body(self): """Handle script execution from raw body content.""" try: # Read raw body content content_length = int(self.headers.get('Content-Length', 0)) if content_length > 1000000: # 1MB limit self._send_error_response('Script too large (>1MB)', 413) return script = self.rfile.read(content_length).decode('utf-8') # Execute the script using our existing method result = self._execute_in_main_thread(self._execute_script_impl, script) if 'error' in result: self._send_error_response(result['error'], 500) else: self._send_json_response(result) except Exception as e: error_msg = f"Error executing script from body: {str(e)}\n{traceback.format_exc()}" print(f"[RemoteControl] {error_msg}") self._send_error_response(error_msg, 500) def _execute_script_impl(self, script): """Implementation of script execution - runs in main thread with safety measures.""" # Create a safe execution environment with IDA modules exec_globals = { 'idaapi': idaapi, 'idautils': idautils, 'idc': idc, 'ida_funcs': ida_funcs, 'ida_bytes': ida_bytes, 'ida_nalt': ida_nalt, 'ida_name': ida_name, } # Redirect stdout to capture output import io import sys import signal original_stdout = sys.stdout captured_output = io.StringIO() sys.stdout = captured_output # Create hooks to automatically respond to IDA prompts original_funcs = {} # Store original functions we're going to override original_funcs['ask_yn'] = idaapi.ask_yn original_funcs['ask_buttons'] = idaapi.ask_buttons original_funcs['ask_text'] = idaapi.ask_text original_funcs['ask_str'] = idaapi.ask_str original_funcs['ask_file'] = idaapi.ask_file original_funcs['display_copyright_warning'] = idaapi.display_copyright_warning # Also handle lower-level IDA UI functions if hasattr(idaapi, "get_kernel_version") and idaapi.get_kernel_version() >= "7.0": # IDA 7+ has these functions if hasattr(idaapi, "warning"): original_funcs['warning'] = idaapi.warning idaapi.warning = lambda *args, **kwargs: print(f"[AUTO-CONFIRM] Warning suppressed: {args}") if hasattr(idaapi, "info"): original_funcs['info'] = idaapi.info idaapi.info = lambda *args, **kwargs: print(f"[AUTO-CONFIRM] Info suppressed: {args}") # For specific known dialogs like the "bad digit" dialog if hasattr(idc, "set_inf_attr"): # Suppress "bad digit" dialogs with this setting original_funcs['INFFL_ALLASM'] = idc.get_inf_attr(idc.INF_AF) idc.set_inf_attr(idc.INF_AF, idc.get_inf_attr(idc.INF_AF) | 0x2000) # Set INFFL_ALLASM flag # Create a UI hook to capture any other dialogs class DialogHook(idaapi.UI_Hooks): def populating_widget_popup(self, widget, popup): # Just suppress all popups print("[AUTO-CONFIRM] Suppressing popup") return 1 def finish_populating_widget_popup(self, widget, popup): # Also suppress here print("[AUTO-CONFIRM] Suppressing popup finish") return 1 def ready_to_run(self): # Always continue return 1 def updating_actions(self, ctx): # Always continue return 1 def updated_actions(self): # Always continue return 1 def ui_refresh(self, cnd): # Suppress UI refreshes return 1 # Install UI hook ui_hook = DialogHook() ui_hook.hook() # Functions to automatically respond to various prompts def auto_yes_no(*args, **kwargs): print(f"[AUTO-CONFIRM] Prompt intercepted (Yes/No): {args}") return idaapi.ASKBTN_YES # Always respond YES def auto_buttons(*args, **kwargs): print(f"[AUTO-CONFIRM] Prompt intercepted (Buttons): {args}") return 0 # Return first button (usually OK/Yes/Continue) def auto_text(*args, **kwargs): print(f"[AUTO-CONFIRM] Prompt intercepted (Text): {args}") return "" # Return empty string def auto_file(*args, **kwargs): print(f"[AUTO-CONFIRM] Prompt intercepted (File): {args}") return "" # Return empty string def auto_ignore(*args, **kwargs): print(f"[AUTO-CONFIRM] Warning intercepted: {args}") return 0 # Just return something # Override IDA's prompt functions with our auto-response versions idaapi.ask_yn = auto_yes_no idaapi.ask_buttons = auto_buttons idaapi.ask_text = auto_text idaapi.ask_str = auto_text idaapi.ask_file = auto_file idaapi.display_copyright_warning = auto_ignore # IMPORTANT: Also override searching functions with safer versions # The "Bad digit" dialog is often triggered by these if hasattr(idc, "find_binary"): original_funcs['find_binary'] = idc.find_binary def safe_find_binary(ea, flag, searchstr, radix=16): # Always treat as a string by adding quotes if not present if '"' not in searchstr and "'" not in searchstr: searchstr = f'"{searchstr}"' print(f"[AUTO-CONFIRM] Making search safe: {searchstr}") return original_funcs['find_binary'](ea, flag, searchstr, radix) idc.find_binary = safe_find_binary # Set batch mode to minimize UI interactions (stronger settings) orig_batch = idaapi.set_script_timeout(1) # Set script timeout to suppress dialogs # Additional batch mode settings orig_user_screen_ea = idaapi.get_screen_ea() # Save current IDA settings try: # Enable batch mode if available if hasattr(idaapi, "batch_mode_enabled"): original_funcs['batch_mode'] = idaapi.batch_mode_enabled() idaapi.enable_batch_mode(True) # Disable analysis wait box if hasattr(idaapi, "set_flag"): idaapi.set_flag(idaapi.SW_SHHID_ITEM, True) # Hide wait dialogs idaapi.set_flag(idaapi.SW_HIDE_UNDEF, True) # Hide undefined items idaapi.set_flag(idaapi.SW_HIDE_SEGADDRS, True) # Hide segment addressing # For newer versions of IDA if hasattr(idc, "batch"): original_funcs['batch_mode_idc'] = idc.batch(1) # Enable batch mode except Exception as e: print(f"[AUTO-CONFIRM] Error setting batch mode: {e}") # Script timeout handling class TimeoutException(Exception): pass def timeout_handler(signum, frame): raise TimeoutException("Script execution timed out") # Set timeout for script execution (10 seconds) old_handler = None try: # Only set alarm on platforms that support it (not Windows) if hasattr(signal, 'SIGALRM'): old_handler = signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(10) # 10 second timeout except (AttributeError, ValueError): # Signal module might not have SIGALRM on Windows pass try: # Execute the script with size limit to prevent memory issues if len(script) > 1000000: # 1MB limit return {'error': 'Script too large (>1MB)'} # Execute the script exec(script, exec_globals) output = captured_output.getvalue() # Get return value if set return_value = exec_globals.get('return_value', None) response = { 'success': True, 'output': output[:1000000] # Limit output size to 1MB } if return_value is not None: try: # Try to serialize return_value to JSON with size limit json_str = json.dumps(return_value) if len(json_str) <= 1000000: # 1MB limit response['return_value'] = return_value else: response['return_value'] = str(return_value)[:1000000] + "... (truncated)" except (TypeError, OverflowError): # If not JSON serializable, convert to string with limit response['return_value'] = str(return_value)[:1000000] + ( "... (truncated)" if len(str(return_value)) > 1000000 else "") return response except TimeoutException: error_msg = "Script execution timed out (exceeded 10 seconds)" print(f"[RemoteControl] {error_msg}") return {'error': error_msg} except MemoryError: error_msg = "Script caused a memory error" print(f"[RemoteControl] {error_msg}") return {'error': error_msg} except Exception as e: error_msg = f"Script execution error: {str(e)}\n{traceback.format_exc()}" print(f"[RemoteControl] {error_msg}") return {'error': error_msg} finally: # Restore stdout sys.stdout = original_stdout # Restore original IDA functions for func_name, original_func in original_funcs.items(): # Special case for INFFL_ALLASM flag if func_name == 'INFFL_ALLASM': idc.set_inf_attr(idc.INF_AF, original_func) # Special case for batch mode elif func_name == 'batch_mode': if hasattr(idaapi, "enable_batch_mode"): idaapi.enable_batch_mode(original_func) elif func_name == 'batch_mode_idc': if hasattr(idc, "batch"): idc.batch(original_func) else: # For all other functions try: if hasattr(idaapi, func_name): setattr(idaapi, func_name, original_func) elif hasattr(idc, func_name): setattr(idc, func_name, original_func) except: print(f"[RemoteControl] Failed to restore {func_name}") # Restore screen position idaapi.jumpto(orig_user_screen_ea) # Unhook UI hooks ui_hook.unhook() # Restore original batch mode idaapi.set_script_timeout(orig_batch) # Cancel alarm if set (for non-Windows platforms) try: if hasattr(signal, 'SIGALRM'): signal.alarm(0) if old_handler is not None: signal.signal(signal.SIGALRM, old_handler) except (AttributeError, ValueError, UnboundLocalError): pass def _handle_get_strings(self): """Handle get strings request.""" result = self._execute_in_main_thread(self._get_strings_impl) self._send_json_response(result) def _get_strings_impl(self): """Implementation of getting strings - runs in main thread.""" min_length = 4 # Minimum string length to include strings_list = [] # Get all strings from binary for ea in idautils.Strings(): if ea.length >= min_length: string_value = str(ea) string_address = ea.ea string_info = { 'address': f"0x{string_address:X}", 'value': string_value, 'length': ea.length, 'type': 'pascal' if ea.strtype == 1 else 'c' } strings_list.append(string_info) return { 'count': len(strings_list), 'strings': strings_list } def _handle_get_exports(self): """Handle get exports request.""" result = self._execute_in_main_thread(self._get_exports_impl) self._send_json_response(result) def _get_exports_impl(self): """Implementation of getting exports - runs in main thread.""" exports_list = [] # Process exports for ordinal, ea, name in idautils.Entries(): exports_list.append({ 'address': f"0x{ea:X}", 'name': name, 'ordinal': ordinal }) return { 'count': len(exports_list), 'exports': exports_list } def _handle_get_imports(self): """Handle get imports request.""" result = self._execute_in_main_thread(self._get_imports_impl) self._send_json_response(result) def _get_imports_impl(self): """Implementation of getting imports - runs in main thread.""" imports_list = [] # Process imports nimps = ida_nalt.get_import_module_qty() for i in range(0, nimps): name = ida_nalt.get_import_module_name(i) if not name: continue def imp_cb(ea, name, ordinal): if name: imports_list.append({ 'address': f"0x{ea:X}", 'name': name, 'ordinal': ordinal }) return True ida_nalt.enum_import_names(i, imp_cb) return { 'count': len(imports_list), 'imports': imports_list } def _handle_get_functions(self): """Handle get functions request.""" result = self._execute_in_main_thread(self._get_functions_impl) self._send_json_response(result) def _get_functions_impl(self): """Implementation of getting functions - runs in main thread.""" functions_list = [] # Get all functions for ea in idautils.Functions(): func = ida_funcs.get_func(ea) if func: func_name = ida_name.get_ea_name(ea) function_info = { 'address': f"0x{ea:X}", 'name': func_name, 'size': func.size(), 'start': f"0x{func.start_ea:X}", 'end': f"0x{func.end_ea:X}", 'flags': func.flags } functions_list.append(function_info) return { 'count': len(functions_list), 'functions': functions_list } def _handle_search_immediate(self): """Handle search for immediate value request.""" # Parse query parameters parsed_url = urlparse(self.path) params = parse_qs(parsed_url.query) # Get parameters with defaults value = params.get('value', [''])[0] if not value: self._send_error_response('Missing required parameter: value') return # Optional parameters try: radix = int(params.get('radix', ['16'])[0]) except ValueError: radix = 16 try: start_ea = int(params.get('start', ['0'])[0], 0) except ValueError: start_ea = 0 try: end_ea = int(params.get('end', ['0'])[0], 0) except ValueError: end_ea = idc.BADADDR # Execute search in main thread result = self._execute_in_main_thread( self._search_immediate_impl, value, radix, start_ea, end_ea ) self._send_json_response(result) def _search_immediate_impl(self, value, radix, start_ea, end_ea): """Implementation of searching for immediate values - runs in main thread.""" results = [] try: # Convert value to integer if it's a number if isinstance(value, str) and value.isdigit(): value = int(value, radix) # Search for immediate values for ea in idautils.Functions(): func = ida_funcs.get_func(ea) if not func: continue # Skip if outside specified range if start_ea > 0 and func.start_ea < start_ea: continue if end_ea > 0 and func.start_ea >= end_ea: continue # Iterate through instructions in the function current_ea = func.start_ea while current_ea < func.end_ea: insn = idaapi.insn_t() insn_len = idaapi.decode_insn(insn, current_ea) if insn_len > 0: # Check operands for immediate values for i in range(len(insn.ops)): op = insn.ops[i] if op.type == idaapi.o_imm: # If searching for a specific value if isinstance(value, int) and op.value == value: disasm = idc.generate_disasm_line(current_ea, 0) results.append({ 'address': f"0x{current_ea:X}", 'instruction': disasm, 'value': op.value, 'operand_index': i }) # If searching for a string pattern in the disassembly elif isinstance(value, str) and value in idc.generate_disasm_line(current_ea, 0): disasm = idc.generate_disasm_line(current_ea, 0) results.append({ 'address': f"0x{current_ea:X}", 'instruction': disasm, 'value': op.value, 'operand_index': i }) current_ea += insn_len else: current_ea += 1 except Exception as e: return {'error': f"Error searching for immediate values: {str(e)}"} return { 'count': len(results), 'results': results } def _handle_search_text(self): """Handle search for text request.""" # Parse query parameters parsed_url = urlparse(self.path) params = parse_qs(parsed_url.query) # Get parameters with defaults text = params.get('text', [''])[0] if not text: self._send_error_response('Missing required parameter: text') return # Optional parameters try: start_ea = int(params.get('start', ['0'])[0], 0) except ValueError: start_ea = 0 try: end_ea = int(params.get('end', ['0'])[0], 0) except ValueError: end_ea = idc.BADADDR case_sensitive = params.get('case_sensitive', ['false'])[0].lower() == 'true' # Execute search in main thread result = self._execute_in_main_thread( self._search_text_impl, text, case_sensitive, start_ea, end_ea ) self._send_json_response(result) def _search_text_impl(self, text, case_sensitive, start_ea, end_ea): """Implementation of searching for text - runs in main thread.""" results = [] try: # Get all strings from binary for string_item in idautils.Strings(): if string_item.ea < start_ea: continue if end_ea > 0 and string_item.ea >= end_ea: continue string_value = str(string_item) # Check if text is in string if (case_sensitive and text in string_value) or \ (not case_sensitive and text.lower() in string_value.lower()): results.append({ 'address': f"0x{string_item.ea:X}", 'value': string_value, 'length': string_item.length, 'type': 'pascal' if string_item.strtype == 1 else 'c' }) except Exception as e: return {'error': f"Error searching for text: {str(e)}"} return { 'count': len(results), 'results': results } def _handle_search_bytes(self): """Handle search for byte sequence request.""" # Parse query parameters parsed_url = urlparse(self.path) params = parse_qs(parsed_url.query) # Get parameters with defaults byte_str = params.get('bytes', [''])[0] if not byte_str: self._send_error_response('Missing required parameter: bytes') return # Optional parameters try: start_ea = int(params.get('start', ['0'])[0], 0) except ValueError: start_ea = 0 try: end_ea = int(params.get('end', ['0'])[0], 0) except ValueError: end_ea = idc.BADADDR # Execute search in main thread result = self._execute_in_main_thread( self._search_bytes_impl, byte_str, start_ea, end_ea ) self._send_json_response(result) def _handle_search_in_names(self): """Handle search for names/symbols in the binary.""" # Parse query parameters parsed_url = urlparse(self.path) params = parse_qs(parsed_url.query) # Get parameters with defaults pattern = params.get('pattern', [''])[0] if not pattern: self._send_error_response('Missing required parameter: pattern') return # Optional parameters case_sensitive = params.get('case_sensitive', ['false'])[0].lower() == 'true' # Get name type if specified name_type = params.get('type', ['all'])[0].lower() # Execute search in main thread result = self._execute_in_main_thread( self._search_in_names_impl, pattern, case_sensitive, name_type ) self._send_json_response(result) def _search_bytes_impl(self, byte_str, start_ea, end_ea): """Implementation of searching for byte sequence - runs in main thread.""" results = [] try: # Ensure byte_str is properly formatted for IDA's find_binary # IDA expects a string like "41 42 43" or "41 ?? 43" where ?? is a wildcard # Clean up the input to ensure it's in the right format byte_str = byte_str.strip() if not byte_str.startswith('"') and not byte_str.startswith("'"): byte_str = f'"{byte_str}"' # Start searching ea = start_ea while ea != idc.BADADDR: ea = idc.find_binary(ea, idc.SEARCH_DOWN | idc.SEARCH_NEXT, byte_str) if ea == idc.BADADDR or (end_ea > 0 and ea >= end_ea): break # Get some context around the found bytes disasm = idc.generate_disasm_line(ea, 0) # Add to results results.append({ 'address': f"0x{ea:X}", 'disassembly': disasm, 'bytes': ' '.join([f"{idc.get_wide_byte(ea + i):02X}" for i in range(8)]) # Show 8 bytes }) # Move to next byte to continue search ea += 1 except Exception as e: return {'error': f"Error searching for byte sequence: {str(e)}"} return { 'count': len(results), 'results': results } def _search_in_names_impl(self, pattern, case_sensitive, name_type): """Implementation of searching in names/symbols - runs in main thread.""" results = [] try: # Prepare name type filters is_func = name_type in ['function', 'func', 'functions', 'all'] is_data = name_type in ['data', 'variable', 'variables', 'all'] is_import = name_type in ['import', 'imports', 'all'] is_export = name_type in ['export', 'exports', 'all'] is_label = name_type in ['label', 'labels', 'all'] # Get all names in the database for ea, name in idautils.Names(): # Skip null names if not name: continue # Apply pattern matching based on case sensitivity if (case_sensitive and pattern in name) or \ (not case_sensitive and pattern.lower() in name.lower()): # Determine the type of the name name_info = { 'address': f"0x{ea:X}", 'name': name, 'type': 'unknown' } # Check if it's a function if is_func and ida_funcs.get_func(ea) is not None: name_info['type'] = 'function' if ida_funcs.get_func(ea).start_ea == ea: # Function start name_info['disassembly'] = idc.generate_disasm_line(ea, 0) name_info['is_start'] = True # Check if it's part of imports (using IDA's import list) elif is_import and ida_nalt.is_imported(ea): name_info['type'] = 'import' # Check if it's an export elif is_export and ida_nalt.is_exported(ea): name_info['type'] = 'export' # Check if it's a data variable elif is_data and ida_bytes.is_data(ida_bytes.get_flags(ea)): name_info['type'] = 'data' name_info['data_type'] = idc.get_type_name(ea) # Check if it's a label (non-function named location) elif is_label and not ida_funcs.get_func(ea): name_info['type'] = 'label' name_info['disassembly'] = idc.generate_disasm_line(ea, 0) # Filter out if it doesn't match the requested type if name_type != 'all' and name_info['type'] != name_type and \ not (name_type in ['function', 'func', 'functions'] and name_info['type'] == 'function') and \ not (name_type in ['import', 'imports'] and name_info['type'] == 'import') and \ not (name_type in ['export', 'exports'] and name_info['type'] == 'export') and \ not (name_type in ['data', 'variable', 'variables'] and name_info['type'] == 'data') and \ not (name_type in ['label', 'labels'] and name_info['type'] == 'label'): continue # Add to results results.append(name_info) # Sort results by address results.sort(key=lambda x: int(x['address'], 16)) except Exception as e: return {'error': f"Error searching in names: {str(e)}\n{traceback.format_exc()}"} return { 'count': len(results), 'results': results } def _handle_get_disassembly(self): """Handle get disassembly request.""" # Parse query parameters parsed_url = urlparse(self.path) params = parse_qs(parsed_url.query) # Get parameters with defaults try: start_ea = int(params.get('start', ['0'])[0], 0) except ValueError: self._send_error_response('Invalid start address') return # Optional parameters try: end_ea = int(params.get('end', ['0'])[0], 0) except ValueError: end_ea = 0 try: count = int(params.get('count', ['10'])[0]) except ValueError: count = 10 # Execute in main thread result = self._execute_in_main_thread( self._get_disassembly_impl, start_ea, end_ea, count ) self._send_json_response(result) def _get_disassembly_impl(self, start_ea, end_ea, count): """Implementation of getting disassembly - runs in main thread.""" disassembly = [] try: # If end_ea is specified, use it, otherwise use count if end_ea > 0: current_ea = start_ea while current_ea < end_ea: disasm = idc.generate_disasm_line(current_ea, 0) bytes_str = ' '.join([f"{idc.get_wide_byte(current_ea + i):02X}" for i in range(min(16, idc.get_item_size(current_ea)))]) disassembly.append({ 'address': f"0x{current_ea:X}", 'disassembly': disasm, 'bytes': bytes_str, 'size': idc.get_item_size(current_ea) }) current_ea += idc.get_item_size(current_ea) if len(disassembly) >= 1000: # Limit to 1000 instructions for safety break else: # Use count to limit the number of instructions current_ea = start_ea for _ in range(min(count, 1000)): # Limit to 1000 instructions for safety disasm = idc.generate_disasm_line(current_ea, 0) bytes_str = ' '.join([f"{idc.get_wide_byte(current_ea + i):02X}" for i in range(min(16, idc.get_item_size(current_ea)))]) disassembly.append({ 'address': f"0x{current_ea:X}", 'disassembly': disasm, 'bytes': bytes_str, 'size': idc.get_item_size(current_ea) }) current_ea += idc.get_item_size(current_ea) if current_ea == idc.BADADDR: break except Exception as e: return {'error': f"Error getting disassembly: {str(e)}"} return { 'count': len(disassembly), 'disassembly': disassembly, 'start_address': f"0x{start_ea:X}", 'end_address': f"0x{end_ea:X}" if end_ea > 0 else None } def _handle_get_xrefs_to(self): """Handle get xrefs to address request.""" # Parse query parameters parsed_url = urlparse(self.path) params = parse_qs(parsed_url.query) # Get parameters with defaults try: address = int(params.get('address', ['0'])[0], 0) except ValueError: self._send_error_response('Invalid address') return # Optional parameters xref_type = params.get('type', ['all'])[0].lower() # Execute in main thread result = self._execute_in_main_thread( self._get_xrefs_to_impl, address, xref_type ) self._send_json_response(result) def _get_xrefs_to_impl(self, address, xref_type): """Implementation of getting xrefs to address - runs in main thread.""" xrefs = [] try: # Get all cross-references to the specified address for xref in idautils.XrefsTo(address, 0): # Determine xref type xref_info = { 'from_address': f"0x{xref.frm:X}", 'to_address': f"0x{xref.to:X}", 'type': self._get_xref_type_name(xref.type), 'is_code': xref.iscode } # Filter by type if specified if xref_type != 'all': if xref_type == 'code' and not xref.iscode: continue if xref_type == 'data' and xref.iscode: continue # Get function name if available func = ida_funcs.get_func(xref.frm) if func: xref_info['function_name'] = ida_name.get_ea_name(func.start_ea) xref_info['function_address'] = f"0x{func.start_ea:X}" # Get disassembly for context xref_info['disassembly'] = idc.generate_disasm_line(xref.frm, 0) xrefs.append(xref_info) # Sort by address xrefs.sort(key=lambda x: int(x['from_address'], 16)) except Exception as e: return {'error': f"Error getting xrefs to address: {str(e)}\n{traceback.format_exc()}"} return { 'count': len(xrefs), 'xrefs': xrefs, 'address': f"0x{address:X}", 'name': ida_name.get_ea_name(address) } def _handle_get_xrefs_from(self): """Handle get xrefs from address request.""" # Parse query parameters parsed_url = urlparse(self.path) params = parse_qs(parsed_url.query) # Get parameters with defaults try: address = int(params.get('address', ['0'])[0], 0) except ValueError: self._send_error_response('Invalid address') return # Optional parameters xref_type = params.get('type', ['all'])[0].lower() # Execute in main thread result = self._execute_in_main_thread( self._get_xrefs_from_impl, address, xref_type ) self._send_json_response(result) def _get_xrefs_from_impl(self, address, xref_type): """Implementation of getting xrefs from address - runs in main thread.""" xrefs = [] try: # Get all cross-references from the specified address for xref in idautils.XrefsFrom(address, 0): # Determine xref type xref_info = { 'from_address': f"0x{xref.frm:X}", 'to_address': f"0x{xref.to:X}", 'type': self._get_xref_type_name(xref.type), 'is_code': xref.iscode } # Filter by type if specified if xref_type != 'all': if xref_type == 'code' and not xref.iscode: continue if xref_type == 'data' and xref.iscode: continue # Get target name if available target_name = ida_name.get_ea_name(xref.to) if target_name: xref_info['target_name'] = target_name # Check if target is a function func = ida_funcs.get_func(xref.to) if func and func.start_ea == xref.to: xref_info['target_is_function'] = True xref_info['target_function_name'] = ida_name.get_ea_name(func.start_ea) # Get disassembly for context xref_info['target_disassembly'] = idc.generate_disasm_line(xref.to, 0) xrefs.append(xref_info) # Sort by address xrefs.sort(key=lambda x: int(x['to_address'], 16)) except Exception as e: return {'error': f"Error getting xrefs from address: {str(e)}\n{traceback.format_exc()}"} return { 'count': len(xrefs), 'xrefs': xrefs, 'address': f"0x{address:X}", 'name': ida_name.get_ea_name(address) } def _get_xref_type_name(self, xref_type): """Convert IDA xref type code to human-readable name.""" # Code cross-reference types if xref_type == idaapi.fl_CF: return "call_far" elif xref_type == idaapi.fl_CN: return "call_near" elif xref_type == idaapi.fl_JF: return "jump_far" elif xref_type == idaapi.fl_JN: return "jump_near" # Data cross-reference types elif xref_type == idaapi.dr_O: return "data_offset" elif xref_type == idaapi.dr_W: return "data_write" elif xref_type == idaapi.dr_R: return "data_read" elif xref_type == idaapi.dr_T: return "data_text" elif xref_type == idaapi.dr_I: return "data_informational" else: return f"unknown_{xref_type}" def _execute_in_main_thread(self, func, *args, **kwargs): """Execute a function in the main thread with additional safeguards.""" result_container = {} execution_done = threading.Event() def sync_wrapper(): """Wrapper function to capture the result safely.""" try: result_container['result'] = func(*args, **kwargs) except Exception as e: result_container['error'] = str(e) result_container['traceback'] = traceback.format_exc() finally: # Signal that execution has finished execution_done.set() return 0 # Must return an integer # Schedule execution in the main thread idaapi.execute_sync(sync_wrapper, MFF_READ) # Wait for the result with a timeout max_wait = 30 # Maximum wait time in seconds if not execution_done.wait(max_wait): error_msg = f"Operation timed out after {max_wait} seconds" print(f"[RemoteControl] {error_msg}") return {'error': error_msg} if 'error' in result_container: print(f"[RemoteControl] Error in main thread: {result_container['error']}") print(result_container.get('traceback', '')) return {'error': result_container['error']} return result_container.get('result', {'error': 'Unknown error occurred'}) class RemoteControlServer: """HTTP server for IDA Pro remote control.""" def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT): self.host = host self.port = port self.server = None self.server_thread = None self.running = False def start(self): """Start the HTTP server.""" if self.running: print("[RemoteControl] Server is already running") return False try: self.server = HTTPServer((self.host, self.port), RemoteControlHandler) self.server_thread = threading.Thread(target=self.server.serve_forever) self.server_thread.daemon = True self.server_thread.start() self.running = True print(f"[RemoteControl] Server started on http://{self.host}:{self.port}") return True except Exception as e: print(f"[RemoteControl] Failed to start server: {str(e)}") return False def stop(self): """Stop the HTTP server.""" if not self.running: print("[RemoteControl] Server is not running") return False try: self.server.shutdown() self.server.server_close() self.server_thread.join() self.running = False print("[RemoteControl] Server stopped") return True except Exception as e: print(f"[RemoteControl] Failed to stop server: {str(e)}") return False def is_running(self): """Check if the server is running.""" return self.running class RemoteControlPlugin(idaapi.plugin_t): """IDA Pro plugin for remote control.""" flags = idaapi.PLUGIN_KEEP comment = "Remote control for IDA through HTTP" help = "Provides HTTP endpoints to control IDA Pro remotely" wanted_name = PLUGIN_NAME wanted_hotkey = "Alt-R" def init(self): """Initialize the plugin.""" print(f"[{PLUGIN_NAME}] Initializing...") # Auto-start server if configured if AUTO_START: global g_server g_server = RemoteControlServer(DEFAULT_HOST, DEFAULT_PORT) success = g_server.start() if success: print(f"[{PLUGIN_NAME}] Server auto-started on http://{DEFAULT_HOST}:{DEFAULT_PORT}") print(f"[{PLUGIN_NAME}] Available endpoints:") else: g_server = None print(f"[{PLUGIN_NAME}] Failed to auto-start server") return idaapi.PLUGIN_KEEP def run(self, arg): """Run the plugin when activated manually.""" global g_server # Check if server is already running if g_server and g_server.is_running(): response = idaapi.ask_yn(idaapi.ASKBTN_NO, "Remote control server is already running.\nDo you want to stop it?") if response == idaapi.ASKBTN_YES: g_server.stop() g_server = None return # If AUTO_START is enabled but server isn't running, start with default settings if AUTO_START: g_server = RemoteControlServer(DEFAULT_HOST, DEFAULT_PORT) success = g_server.start() if success: print(f"[{PLUGIN_NAME}] Server started on http://{DEFAULT_HOST}:{DEFAULT_PORT}") else: g_server = None print(f"[{PLUGIN_NAME}] Failed to start server") return # Manual configuration if AUTO_START is disabled # Get host and port from user host = idaapi.ask_str(DEFAULT_HOST, 0, "Enter host address (e.g. 127.0.0.1):") if not host: host = DEFAULT_HOST port_str = idaapi.ask_str(str(DEFAULT_PORT), 0, "Enter port number:") try: port = int(port_str) except (ValueError, TypeError): port = DEFAULT_PORT # Start server g_server = RemoteControlServer(host, port) success = g_server.start() if success: print(f"[{PLUGIN_NAME}] Server started on http://{host}:{port}") print(f"[{PLUGIN_NAME}] Available endpoints:") else: g_server = None print(f"[{PLUGIN_NAME}] Failed to start server") def term(self): """Terminate the plugin.""" global g_server if g_server and g_server.is_running(): g_server.stop() g_server = None print(f"[{PLUGIN_NAME}] Plugin terminated") # Register the plugin def PLUGIN_ENTRY(): """Return the plugin instance.""" return RemoteControlPlugin() # For testing/debugging in the script editor if __name__ == "__main__": # This will only run when executed in the IDA script editor plugin = RemoteControlPlugin() plugin.run(0)