ida_remote_server.py•55.9 kB
"""
IDA Pro Remote Control Plugin
This plugin creates an HTTP server to remotely control certain IDA functions.
It exposes endpoints for executing scripts, getting strings, imports, exports, and functions.
Author: Florian Drechsler (@fdrechsler) fd@fdrechsler.com
"""
import idaapi
import idautils
import idc
import ida_funcs
import ida_bytes
import ida_nalt
import ida_name
import json
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import socket
import ssl
import base64
import traceback
from urllib.parse import parse_qs, urlparse
import time
# Default settings
DEFAULT_HOST = "127.0.0.1" # Localhost only for security
DEFAULT_PORT = 9045
PLUGIN_NAME = "IDA Pro Remote Control"
PLUGIN_VERSION = "1.0.0"
AUTO_START = True # Automatically start server on plugin load
# Global variables
g_server = None
g_server_thread = None
# Synchronization flags for execute_sync
MFF_FAST = 0x0 # Execute as soon as possible
MFF_READ = 0x1 # Wait for the database to be read-ready
MFF_WRITE = 0x2 # Wait for the database to be write-ready
class RemoteControlHandler(BaseHTTPRequestHandler):
"""HTTP request handler for the IDA Pro remote control plugin."""
# Add timeout for HTTP requests
timeout = 60 # 60-second timeout for HTTP requests
def log_message(self, format, *args):
"""Override logging to use IDA's console."""
print(f"[RemoteControl] {format % args}")
def _send_response(self, status_code, content_type, content):
"""Helper method to send HTTP response."""
try:
self.send_response(status_code)
self.send_header('Content-Type', content_type)
self.send_header('Content-Length', len(content))
self.end_headers()
self.wfile.write(content)
except (ConnectionResetError, BrokenPipeError, socket.error) as e:
print(f"[RemoteControl] Connection error when sending response: {e}")
def _send_json_response(self, data, status_code=200):
"""Helper method to send JSON response."""
try:
content = json.dumps(data).encode('utf-8')
self._send_response(status_code, 'application/json', content)
except Exception as e:
print(f"[RemoteControl] Error preparing JSON response: {e}")
# Try to send a simplified error response
try:
simple_error = json.dumps({'error': 'Internal server error'}).encode('utf-8')
self._send_response(500, 'application/json', simple_error)
except:
# Silently fail if we can't even send the error
pass
def _send_error_response(self, message, status_code=400):
"""Helper method to send error response."""
self._send_json_response({'error': message}, status_code)
def _parse_post_data(self):
"""Parse POST data from request."""
content_length = int(self.headers.get('Content-Length', 0))
post_data = self.rfile.read(content_length).decode('utf-8')
# Handle different content types
content_type = self.headers.get('Content-Type', '')
if 'application/json' in content_type:
return json.loads(post_data)
elif 'application/x-www-form-urlencoded' in content_type:
parsed_data = parse_qs(post_data)
# Convert lists to single values where appropriate
return {k: v[0] if len(v) == 1 else v for k, v in parsed_data.items()}
else:
return {'raw_data': post_data}
def do_GET(self):
"""Handle GET requests."""
path = self.path.lower()
try:
if path == '/api/info':
self._handle_info()
elif path == '/api/strings':
self._handle_get_strings()
elif path == '/api/exports':
self._handle_get_exports()
elif path == '/api/imports':
self._handle_get_imports()
elif path == '/api/functions':
self._handle_get_functions()
elif path.startswith('/api/search/immediate'):
self._handle_search_immediate()
elif path.startswith('/api/search/text'):
self._handle_search_text()
elif path.startswith('/api/search/bytes'):
self._handle_search_bytes()
elif path.startswith('/api/search/names'):
self._handle_search_in_names()
elif path.startswith('/api/xrefs/to'):
self._handle_get_xrefs_to()
elif path.startswith('/api/xrefs/from'):
self._handle_get_xrefs_from()
elif path.startswith('/api/disassembly'):
self._handle_get_disassembly()
else:
self._send_error_response('Endpoint not found', 404)
except Exception as e:
error_msg = f"Error processing request: {str(e)}\n{traceback.format_exc()}"
print(f"[RemoteControl] {error_msg}")
self._send_error_response(error_msg, 500)
def do_POST(self):
"""Handle POST requests."""
path = self.path.lower()
try:
if path == '/api/execute':
self._handle_execute_script()
elif path == '/api/executebypath':
self._handle_execute_by_path()
elif path == '/api/executebody':
self._handle_execute_body()
else:
self._send_error_response('Endpoint not found', 404)
except Exception as e:
error_msg = f"Error processing request: {str(e)}\n{traceback.format_exc()}"
print(f"[RemoteControl] {error_msg}")
self._send_error_response(error_msg, 500)
def _handle_info(self):
"""Handle info request."""
result = self._execute_in_main_thread(self._get_info_impl)
self._send_json_response(result)
def _get_info_impl(self):
"""Implementation of getting info - runs in main thread."""
info = {
'plugin_name': PLUGIN_NAME,
'plugin_version': PLUGIN_VERSION,
'ida_version': idaapi.get_kernel_version(),
'file_name': idaapi.get_input_file_path(),
'endpoints': [
{'path': '/api/info', 'method': 'GET', 'description': 'Get plugin information'},
{'path': '/api/strings', 'method': 'GET', 'description': 'Get strings from binary'},
{'path': '/api/exports', 'method': 'GET', 'description': 'Get exports from binary'},
{'path': '/api/imports', 'method': 'GET', 'description': 'Get imports from binary'},
{'path': '/api/functions', 'method': 'GET', 'description': 'Get function list'},
{'path': '/api/search/immediate', 'method': 'GET', 'description': 'Search for immediate values'},
{'path': '/api/search/text', 'method': 'GET', 'description': 'Search for text in binary'},
{'path': '/api/search/bytes', 'method': 'GET', 'description': 'Search for byte sequence'},
{'path': '/api/search/names', 'method': 'GET', 'description': 'Search for names/symbols in binary'},
{'path': '/api/xrefs/to', 'method': 'GET', 'description': 'Get cross-references to an address'},
{'path': '/api/xrefs/from', 'method': 'GET', 'description': 'Get cross-references from an address'},
{'path': '/api/disassembly', 'method': 'GET', 'description': 'Get disassembly for an address range'},
{'path': '/api/execute', 'method': 'POST', 'description': 'Execute Python script (JSON/Form)'},
{'path': '/api/executebypath', 'method': 'POST', 'description': 'Execute Python script from file path'},
{'path': '/api/executebody', 'method': 'POST', 'description': 'Execute Python script from raw body'},
]
}
return info
def _handle_execute_script(self):
"""Handle script execution request."""
post_data = self._parse_post_data()
if 'script' not in post_data:
self._send_error_response('No script provided')
return
script = post_data['script']
# Execute script in the main thread
result = self._execute_in_main_thread(self._execute_script_impl, script)
if 'error' in result:
self._send_error_response(result['error'], 500)
else:
self._send_json_response(result)
def _handle_execute_by_path(self):
"""Handle script execution from a file path."""
post_data = self._parse_post_data()
if 'path' not in post_data:
self._send_error_response('No script path provided')
return
script_path = post_data['path']
try:
# Use IDA's main thread to read the file
def read_script_file():
try:
with open(script_path, 'r') as f:
return {'script': f.read()}
except Exception as e:
return {'error': f"Could not read script file: {str(e)}"}
file_result = self._execute_in_main_thread(read_script_file)
if 'error' in file_result:
self._send_error_response(file_result['error'], 400)
return
script = file_result['script']
# Execute the script using our existing method
result = self._execute_in_main_thread(self._execute_script_impl, script)
if 'error' in result:
self._send_error_response(result['error'], 500)
else:
self._send_json_response(result)
except Exception as e:
error_msg = f"Error executing script from path: {str(e)}\n{traceback.format_exc()}"
print(f"[RemoteControl] {error_msg}")
self._send_error_response(error_msg, 500)
def _handle_execute_body(self):
"""Handle script execution from raw body content."""
try:
# Read raw body content
content_length = int(self.headers.get('Content-Length', 0))
if content_length > 1000000: # 1MB limit
self._send_error_response('Script too large (>1MB)', 413)
return
script = self.rfile.read(content_length).decode('utf-8')
# Execute the script using our existing method
result = self._execute_in_main_thread(self._execute_script_impl, script)
if 'error' in result:
self._send_error_response(result['error'], 500)
else:
self._send_json_response(result)
except Exception as e:
error_msg = f"Error executing script from body: {str(e)}\n{traceback.format_exc()}"
print(f"[RemoteControl] {error_msg}")
self._send_error_response(error_msg, 500)
def _execute_script_impl(self, script):
"""Implementation of script execution - runs in main thread with safety measures."""
# Create a safe execution environment with IDA modules
exec_globals = {
'idaapi': idaapi,
'idautils': idautils,
'idc': idc,
'ida_funcs': ida_funcs,
'ida_bytes': ida_bytes,
'ida_nalt': ida_nalt,
'ida_name': ida_name,
}
# Redirect stdout to capture output
import io
import sys
import signal
original_stdout = sys.stdout
captured_output = io.StringIO()
sys.stdout = captured_output
# Create hooks to automatically respond to IDA prompts
original_funcs = {}
# Store original functions we're going to override
original_funcs['ask_yn'] = idaapi.ask_yn
original_funcs['ask_buttons'] = idaapi.ask_buttons
original_funcs['ask_text'] = idaapi.ask_text
original_funcs['ask_str'] = idaapi.ask_str
original_funcs['ask_file'] = idaapi.ask_file
original_funcs['display_copyright_warning'] = idaapi.display_copyright_warning
# Also handle lower-level IDA UI functions
if hasattr(idaapi, "get_kernel_version") and idaapi.get_kernel_version() >= "7.0":
# IDA 7+ has these functions
if hasattr(idaapi, "warning"):
original_funcs['warning'] = idaapi.warning
idaapi.warning = lambda *args, **kwargs: print(f"[AUTO-CONFIRM] Warning suppressed: {args}")
if hasattr(idaapi, "info"):
original_funcs['info'] = idaapi.info
idaapi.info = lambda *args, **kwargs: print(f"[AUTO-CONFIRM] Info suppressed: {args}")
# For specific known dialogs like the "bad digit" dialog
if hasattr(idc, "set_inf_attr"):
# Suppress "bad digit" dialogs with this setting
original_funcs['INFFL_ALLASM'] = idc.get_inf_attr(idc.INF_AF)
idc.set_inf_attr(idc.INF_AF, idc.get_inf_attr(idc.INF_AF) | 0x2000) # Set INFFL_ALLASM flag
# Create a UI hook to capture any other dialogs
class DialogHook(idaapi.UI_Hooks):
def populating_widget_popup(self, widget, popup):
# Just suppress all popups
print("[AUTO-CONFIRM] Suppressing popup")
return 1
def finish_populating_widget_popup(self, widget, popup):
# Also suppress here
print("[AUTO-CONFIRM] Suppressing popup finish")
return 1
def ready_to_run(self):
# Always continue
return 1
def updating_actions(self, ctx):
# Always continue
return 1
def updated_actions(self):
# Always continue
return 1
def ui_refresh(self, cnd):
# Suppress UI refreshes
return 1
# Install UI hook
ui_hook = DialogHook()
ui_hook.hook()
# Functions to automatically respond to various prompts
def auto_yes_no(*args, **kwargs):
print(f"[AUTO-CONFIRM] Prompt intercepted (Yes/No): {args}")
return idaapi.ASKBTN_YES # Always respond YES
def auto_buttons(*args, **kwargs):
print(f"[AUTO-CONFIRM] Prompt intercepted (Buttons): {args}")
return 0 # Return first button (usually OK/Yes/Continue)
def auto_text(*args, **kwargs):
print(f"[AUTO-CONFIRM] Prompt intercepted (Text): {args}")
return "" # Return empty string
def auto_file(*args, **kwargs):
print(f"[AUTO-CONFIRM] Prompt intercepted (File): {args}")
return "" # Return empty string
def auto_ignore(*args, **kwargs):
print(f"[AUTO-CONFIRM] Warning intercepted: {args}")
return 0 # Just return something
# Override IDA's prompt functions with our auto-response versions
idaapi.ask_yn = auto_yes_no
idaapi.ask_buttons = auto_buttons
idaapi.ask_text = auto_text
idaapi.ask_str = auto_text
idaapi.ask_file = auto_file
idaapi.display_copyright_warning = auto_ignore
# IMPORTANT: Also override searching functions with safer versions
# The "Bad digit" dialog is often triggered by these
if hasattr(idc, "find_binary"):
original_funcs['find_binary'] = idc.find_binary
def safe_find_binary(ea, flag, searchstr, radix=16):
# Always treat as a string by adding quotes if not present
if '"' not in searchstr and "'" not in searchstr:
searchstr = f'"{searchstr}"'
print(f"[AUTO-CONFIRM] Making search safe: {searchstr}")
return original_funcs['find_binary'](ea, flag, searchstr, radix)
idc.find_binary = safe_find_binary
# Set batch mode to minimize UI interactions (stronger settings)
orig_batch = idaapi.set_script_timeout(1) # Set script timeout to suppress dialogs
# Additional batch mode settings
orig_user_screen_ea = idaapi.get_screen_ea()
# Save current IDA settings
try:
# Enable batch mode if available
if hasattr(idaapi, "batch_mode_enabled"):
original_funcs['batch_mode'] = idaapi.batch_mode_enabled()
idaapi.enable_batch_mode(True)
# Disable analysis wait box
if hasattr(idaapi, "set_flag"):
idaapi.set_flag(idaapi.SW_SHHID_ITEM, True) # Hide wait dialogs
idaapi.set_flag(idaapi.SW_HIDE_UNDEF, True) # Hide undefined items
idaapi.set_flag(idaapi.SW_HIDE_SEGADDRS, True) # Hide segment addressing
# For newer versions of IDA
if hasattr(idc, "batch"):
original_funcs['batch_mode_idc'] = idc.batch(1) # Enable batch mode
except Exception as e:
print(f"[AUTO-CONFIRM] Error setting batch mode: {e}")
# Script timeout handling
class TimeoutException(Exception):
pass
def timeout_handler(signum, frame):
raise TimeoutException("Script execution timed out")
# Set timeout for script execution (10 seconds)
old_handler = None
try:
# Only set alarm on platforms that support it (not Windows)
if hasattr(signal, 'SIGALRM'):
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(10) # 10 second timeout
except (AttributeError, ValueError):
# Signal module might not have SIGALRM on Windows
pass
try:
# Execute the script with size limit to prevent memory issues
if len(script) > 1000000: # 1MB limit
return {'error': 'Script too large (>1MB)'}
# Execute the script
exec(script, exec_globals)
output = captured_output.getvalue()
# Get return value if set
return_value = exec_globals.get('return_value', None)
response = {
'success': True,
'output': output[:1000000] # Limit output size to 1MB
}
if return_value is not None:
try:
# Try to serialize return_value to JSON with size limit
json_str = json.dumps(return_value)
if len(json_str) <= 1000000: # 1MB limit
response['return_value'] = return_value
else:
response['return_value'] = str(return_value)[:1000000] + "... (truncated)"
except (TypeError, OverflowError):
# If not JSON serializable, convert to string with limit
response['return_value'] = str(return_value)[:1000000] + (
"... (truncated)" if len(str(return_value)) > 1000000 else "")
return response
except TimeoutException:
error_msg = "Script execution timed out (exceeded 10 seconds)"
print(f"[RemoteControl] {error_msg}")
return {'error': error_msg}
except MemoryError:
error_msg = "Script caused a memory error"
print(f"[RemoteControl] {error_msg}")
return {'error': error_msg}
except Exception as e:
error_msg = f"Script execution error: {str(e)}\n{traceback.format_exc()}"
print(f"[RemoteControl] {error_msg}")
return {'error': error_msg}
finally:
# Restore stdout
sys.stdout = original_stdout
# Restore original IDA functions
for func_name, original_func in original_funcs.items():
# Special case for INFFL_ALLASM flag
if func_name == 'INFFL_ALLASM':
idc.set_inf_attr(idc.INF_AF, original_func)
# Special case for batch mode
elif func_name == 'batch_mode':
if hasattr(idaapi, "enable_batch_mode"):
idaapi.enable_batch_mode(original_func)
elif func_name == 'batch_mode_idc':
if hasattr(idc, "batch"):
idc.batch(original_func)
else:
# For all other functions
try:
if hasattr(idaapi, func_name):
setattr(idaapi, func_name, original_func)
elif hasattr(idc, func_name):
setattr(idc, func_name, original_func)
except:
print(f"[RemoteControl] Failed to restore {func_name}")
# Restore screen position
idaapi.jumpto(orig_user_screen_ea)
# Unhook UI hooks
ui_hook.unhook()
# Restore original batch mode
idaapi.set_script_timeout(orig_batch)
# Cancel alarm if set (for non-Windows platforms)
try:
if hasattr(signal, 'SIGALRM'):
signal.alarm(0)
if old_handler is not None:
signal.signal(signal.SIGALRM, old_handler)
except (AttributeError, ValueError, UnboundLocalError):
pass
def _handle_get_strings(self):
"""Handle get strings request."""
result = self._execute_in_main_thread(self._get_strings_impl)
self._send_json_response(result)
def _get_strings_impl(self):
"""Implementation of getting strings - runs in main thread."""
min_length = 4 # Minimum string length to include
strings_list = []
# Get all strings from binary
for ea in idautils.Strings():
if ea.length >= min_length:
string_value = str(ea)
string_address = ea.ea
string_info = {
'address': f"0x{string_address:X}",
'value': string_value,
'length': ea.length,
'type': 'pascal' if ea.strtype == 1 else 'c'
}
strings_list.append(string_info)
return {
'count': len(strings_list),
'strings': strings_list
}
def _handle_get_exports(self):
"""Handle get exports request."""
result = self._execute_in_main_thread(self._get_exports_impl)
self._send_json_response(result)
def _get_exports_impl(self):
"""Implementation of getting exports - runs in main thread."""
exports_list = []
# Process exports
for ordinal, ea, name in idautils.Entries():
exports_list.append({
'address': f"0x{ea:X}",
'name': name,
'ordinal': ordinal
})
return {
'count': len(exports_list),
'exports': exports_list
}
def _handle_get_imports(self):
"""Handle get imports request."""
result = self._execute_in_main_thread(self._get_imports_impl)
self._send_json_response(result)
def _get_imports_impl(self):
"""Implementation of getting imports - runs in main thread."""
imports_list = []
# Process imports
nimps = ida_nalt.get_import_module_qty()
for i in range(0, nimps):
name = ida_nalt.get_import_module_name(i)
if not name:
continue
def imp_cb(ea, name, ordinal):
if name:
imports_list.append({
'address': f"0x{ea:X}",
'name': name,
'ordinal': ordinal
})
return True
ida_nalt.enum_import_names(i, imp_cb)
return {
'count': len(imports_list),
'imports': imports_list
}
def _handle_get_functions(self):
"""Handle get functions request."""
result = self._execute_in_main_thread(self._get_functions_impl)
self._send_json_response(result)
def _get_functions_impl(self):
"""Implementation of getting functions - runs in main thread."""
functions_list = []
# Get all functions
for ea in idautils.Functions():
func = ida_funcs.get_func(ea)
if func:
func_name = ida_name.get_ea_name(ea)
function_info = {
'address': f"0x{ea:X}",
'name': func_name,
'size': func.size(),
'start': f"0x{func.start_ea:X}",
'end': f"0x{func.end_ea:X}",
'flags': func.flags
}
functions_list.append(function_info)
return {
'count': len(functions_list),
'functions': functions_list
}
def _handle_search_immediate(self):
"""Handle search for immediate value request."""
# Parse query parameters
parsed_url = urlparse(self.path)
params = parse_qs(parsed_url.query)
# Get parameters with defaults
value = params.get('value', [''])[0]
if not value:
self._send_error_response('Missing required parameter: value')
return
# Optional parameters
try:
radix = int(params.get('radix', ['16'])[0])
except ValueError:
radix = 16
try:
start_ea = int(params.get('start', ['0'])[0], 0)
except ValueError:
start_ea = 0
try:
end_ea = int(params.get('end', ['0'])[0], 0)
except ValueError:
end_ea = idc.BADADDR
# Execute search in main thread
result = self._execute_in_main_thread(
self._search_immediate_impl,
value,
radix,
start_ea,
end_ea
)
self._send_json_response(result)
def _search_immediate_impl(self, value, radix, start_ea, end_ea):
"""Implementation of searching for immediate values - runs in main thread."""
results = []
try:
# Convert value to integer if it's a number
if isinstance(value, str) and value.isdigit():
value = int(value, radix)
# Search for immediate values
for ea in idautils.Functions():
func = ida_funcs.get_func(ea)
if not func:
continue
# Skip if outside specified range
if start_ea > 0 and func.start_ea < start_ea:
continue
if end_ea > 0 and func.start_ea >= end_ea:
continue
# Iterate through instructions in the function
current_ea = func.start_ea
while current_ea < func.end_ea:
insn = idaapi.insn_t()
insn_len = idaapi.decode_insn(insn, current_ea)
if insn_len > 0:
# Check operands for immediate values
for i in range(len(insn.ops)):
op = insn.ops[i]
if op.type == idaapi.o_imm:
# If searching for a specific value
if isinstance(value, int) and op.value == value:
disasm = idc.generate_disasm_line(current_ea, 0)
results.append({
'address': f"0x{current_ea:X}",
'instruction': disasm,
'value': op.value,
'operand_index': i
})
# If searching for a string pattern in the disassembly
elif isinstance(value, str) and value in idc.generate_disasm_line(current_ea, 0):
disasm = idc.generate_disasm_line(current_ea, 0)
results.append({
'address': f"0x{current_ea:X}",
'instruction': disasm,
'value': op.value,
'operand_index': i
})
current_ea += insn_len
else:
current_ea += 1
except Exception as e:
return {'error': f"Error searching for immediate values: {str(e)}"}
return {
'count': len(results),
'results': results
}
def _handle_search_text(self):
"""Handle search for text request."""
# Parse query parameters
parsed_url = urlparse(self.path)
params = parse_qs(parsed_url.query)
# Get parameters with defaults
text = params.get('text', [''])[0]
if not text:
self._send_error_response('Missing required parameter: text')
return
# Optional parameters
try:
start_ea = int(params.get('start', ['0'])[0], 0)
except ValueError:
start_ea = 0
try:
end_ea = int(params.get('end', ['0'])[0], 0)
except ValueError:
end_ea = idc.BADADDR
case_sensitive = params.get('case_sensitive', ['false'])[0].lower() == 'true'
# Execute search in main thread
result = self._execute_in_main_thread(
self._search_text_impl,
text,
case_sensitive,
start_ea,
end_ea
)
self._send_json_response(result)
def _search_text_impl(self, text, case_sensitive, start_ea, end_ea):
"""Implementation of searching for text - runs in main thread."""
results = []
try:
# Get all strings from binary
for string_item in idautils.Strings():
if string_item.ea < start_ea:
continue
if end_ea > 0 and string_item.ea >= end_ea:
continue
string_value = str(string_item)
# Check if text is in string
if (case_sensitive and text in string_value) or \
(not case_sensitive and text.lower() in string_value.lower()):
results.append({
'address': f"0x{string_item.ea:X}",
'value': string_value,
'length': string_item.length,
'type': 'pascal' if string_item.strtype == 1 else 'c'
})
except Exception as e:
return {'error': f"Error searching for text: {str(e)}"}
return {
'count': len(results),
'results': results
}
def _handle_search_bytes(self):
"""Handle search for byte sequence request."""
# Parse query parameters
parsed_url = urlparse(self.path)
params = parse_qs(parsed_url.query)
# Get parameters with defaults
byte_str = params.get('bytes', [''])[0]
if not byte_str:
self._send_error_response('Missing required parameter: bytes')
return
# Optional parameters
try:
start_ea = int(params.get('start', ['0'])[0], 0)
except ValueError:
start_ea = 0
try:
end_ea = int(params.get('end', ['0'])[0], 0)
except ValueError:
end_ea = idc.BADADDR
# Execute search in main thread
result = self._execute_in_main_thread(
self._search_bytes_impl,
byte_str,
start_ea,
end_ea
)
self._send_json_response(result)
def _handle_search_in_names(self):
"""Handle search for names/symbols in the binary."""
# Parse query parameters
parsed_url = urlparse(self.path)
params = parse_qs(parsed_url.query)
# Get parameters with defaults
pattern = params.get('pattern', [''])[0]
if not pattern:
self._send_error_response('Missing required parameter: pattern')
return
# Optional parameters
case_sensitive = params.get('case_sensitive', ['false'])[0].lower() == 'true'
# Get name type if specified
name_type = params.get('type', ['all'])[0].lower()
# Execute search in main thread
result = self._execute_in_main_thread(
self._search_in_names_impl,
pattern,
case_sensitive,
name_type
)
self._send_json_response(result)
def _search_bytes_impl(self, byte_str, start_ea, end_ea):
"""Implementation of searching for byte sequence - runs in main thread."""
results = []
try:
# Ensure byte_str is properly formatted for IDA's find_binary
# IDA expects a string like "41 42 43" or "41 ?? 43" where ?? is a wildcard
# Clean up the input to ensure it's in the right format
byte_str = byte_str.strip()
if not byte_str.startswith('"') and not byte_str.startswith("'"):
byte_str = f'"{byte_str}"'
# Start searching
ea = start_ea
while ea != idc.BADADDR:
ea = idc.find_binary(ea, idc.SEARCH_DOWN | idc.SEARCH_NEXT, byte_str)
if ea == idc.BADADDR or (end_ea > 0 and ea >= end_ea):
break
# Get some context around the found bytes
disasm = idc.generate_disasm_line(ea, 0)
# Add to results
results.append({
'address': f"0x{ea:X}",
'disassembly': disasm,
'bytes': ' '.join([f"{idc.get_wide_byte(ea + i):02X}" for i in range(8)]) # Show 8 bytes
})
# Move to next byte to continue search
ea += 1
except Exception as e:
return {'error': f"Error searching for byte sequence: {str(e)}"}
return {
'count': len(results),
'results': results
}
def _search_in_names_impl(self, pattern, case_sensitive, name_type):
"""Implementation of searching in names/symbols - runs in main thread."""
results = []
try:
# Prepare name type filters
is_func = name_type in ['function', 'func', 'functions', 'all']
is_data = name_type in ['data', 'variable', 'variables', 'all']
is_import = name_type in ['import', 'imports', 'all']
is_export = name_type in ['export', 'exports', 'all']
is_label = name_type in ['label', 'labels', 'all']
# Get all names in the database
for ea, name in idautils.Names():
# Skip null names
if not name:
continue
# Apply pattern matching based on case sensitivity
if (case_sensitive and pattern in name) or \
(not case_sensitive and pattern.lower() in name.lower()):
# Determine the type of the name
name_info = {
'address': f"0x{ea:X}",
'name': name,
'type': 'unknown'
}
# Check if it's a function
if is_func and ida_funcs.get_func(ea) is not None:
name_info['type'] = 'function'
if ida_funcs.get_func(ea).start_ea == ea: # Function start
name_info['disassembly'] = idc.generate_disasm_line(ea, 0)
name_info['is_start'] = True
# Check if it's part of imports (using IDA's import list)
elif is_import and ida_nalt.is_imported(ea):
name_info['type'] = 'import'
# Check if it's an export
elif is_export and ida_nalt.is_exported(ea):
name_info['type'] = 'export'
# Check if it's a data variable
elif is_data and ida_bytes.is_data(ida_bytes.get_flags(ea)):
name_info['type'] = 'data'
name_info['data_type'] = idc.get_type_name(ea)
# Check if it's a label (non-function named location)
elif is_label and not ida_funcs.get_func(ea):
name_info['type'] = 'label'
name_info['disassembly'] = idc.generate_disasm_line(ea, 0)
# Filter out if it doesn't match the requested type
if name_type != 'all' and name_info['type'] != name_type and \
not (name_type in ['function', 'func', 'functions'] and name_info['type'] == 'function') and \
not (name_type in ['import', 'imports'] and name_info['type'] == 'import') and \
not (name_type in ['export', 'exports'] and name_info['type'] == 'export') and \
not (name_type in ['data', 'variable', 'variables'] and name_info['type'] == 'data') and \
not (name_type in ['label', 'labels'] and name_info['type'] == 'label'):
continue
# Add to results
results.append(name_info)
# Sort results by address
results.sort(key=lambda x: int(x['address'], 16))
except Exception as e:
return {'error': f"Error searching in names: {str(e)}\n{traceback.format_exc()}"}
return {
'count': len(results),
'results': results
}
def _handle_get_disassembly(self):
"""Handle get disassembly request."""
# Parse query parameters
parsed_url = urlparse(self.path)
params = parse_qs(parsed_url.query)
# Get parameters with defaults
try:
start_ea = int(params.get('start', ['0'])[0], 0)
except ValueError:
self._send_error_response('Invalid start address')
return
# Optional parameters
try:
end_ea = int(params.get('end', ['0'])[0], 0)
except ValueError:
end_ea = 0
try:
count = int(params.get('count', ['10'])[0])
except ValueError:
count = 10
# Execute in main thread
result = self._execute_in_main_thread(
self._get_disassembly_impl,
start_ea,
end_ea,
count
)
self._send_json_response(result)
def _get_disassembly_impl(self, start_ea, end_ea, count):
"""Implementation of getting disassembly - runs in main thread."""
disassembly = []
try:
# If end_ea is specified, use it, otherwise use count
if end_ea > 0:
current_ea = start_ea
while current_ea < end_ea:
disasm = idc.generate_disasm_line(current_ea, 0)
bytes_str = ' '.join([f"{idc.get_wide_byte(current_ea + i):02X}" for i in range(min(16, idc.get_item_size(current_ea)))])
disassembly.append({
'address': f"0x{current_ea:X}",
'disassembly': disasm,
'bytes': bytes_str,
'size': idc.get_item_size(current_ea)
})
current_ea += idc.get_item_size(current_ea)
if len(disassembly) >= 1000: # Limit to 1000 instructions for safety
break
else:
# Use count to limit the number of instructions
current_ea = start_ea
for _ in range(min(count, 1000)): # Limit to 1000 instructions for safety
disasm = idc.generate_disasm_line(current_ea, 0)
bytes_str = ' '.join([f"{idc.get_wide_byte(current_ea + i):02X}" for i in range(min(16, idc.get_item_size(current_ea)))])
disassembly.append({
'address': f"0x{current_ea:X}",
'disassembly': disasm,
'bytes': bytes_str,
'size': idc.get_item_size(current_ea)
})
current_ea += idc.get_item_size(current_ea)
if current_ea == idc.BADADDR:
break
except Exception as e:
return {'error': f"Error getting disassembly: {str(e)}"}
return {
'count': len(disassembly),
'disassembly': disassembly,
'start_address': f"0x{start_ea:X}",
'end_address': f"0x{end_ea:X}" if end_ea > 0 else None
}
def _handle_get_xrefs_to(self):
"""Handle get xrefs to address request."""
# Parse query parameters
parsed_url = urlparse(self.path)
params = parse_qs(parsed_url.query)
# Get parameters with defaults
try:
address = int(params.get('address', ['0'])[0], 0)
except ValueError:
self._send_error_response('Invalid address')
return
# Optional parameters
xref_type = params.get('type', ['all'])[0].lower()
# Execute in main thread
result = self._execute_in_main_thread(
self._get_xrefs_to_impl,
address,
xref_type
)
self._send_json_response(result)
def _get_xrefs_to_impl(self, address, xref_type):
"""Implementation of getting xrefs to address - runs in main thread."""
xrefs = []
try:
# Get all cross-references to the specified address
for xref in idautils.XrefsTo(address, 0):
# Determine xref type
xref_info = {
'from_address': f"0x{xref.frm:X}",
'to_address': f"0x{xref.to:X}",
'type': self._get_xref_type_name(xref.type),
'is_code': xref.iscode
}
# Filter by type if specified
if xref_type != 'all':
if xref_type == 'code' and not xref.iscode:
continue
if xref_type == 'data' and xref.iscode:
continue
# Get function name if available
func = ida_funcs.get_func(xref.frm)
if func:
xref_info['function_name'] = ida_name.get_ea_name(func.start_ea)
xref_info['function_address'] = f"0x{func.start_ea:X}"
# Get disassembly for context
xref_info['disassembly'] = idc.generate_disasm_line(xref.frm, 0)
xrefs.append(xref_info)
# Sort by address
xrefs.sort(key=lambda x: int(x['from_address'], 16))
except Exception as e:
return {'error': f"Error getting xrefs to address: {str(e)}\n{traceback.format_exc()}"}
return {
'count': len(xrefs),
'xrefs': xrefs,
'address': f"0x{address:X}",
'name': ida_name.get_ea_name(address)
}
def _handle_get_xrefs_from(self):
"""Handle get xrefs from address request."""
# Parse query parameters
parsed_url = urlparse(self.path)
params = parse_qs(parsed_url.query)
# Get parameters with defaults
try:
address = int(params.get('address', ['0'])[0], 0)
except ValueError:
self._send_error_response('Invalid address')
return
# Optional parameters
xref_type = params.get('type', ['all'])[0].lower()
# Execute in main thread
result = self._execute_in_main_thread(
self._get_xrefs_from_impl,
address,
xref_type
)
self._send_json_response(result)
def _get_xrefs_from_impl(self, address, xref_type):
"""Implementation of getting xrefs from address - runs in main thread."""
xrefs = []
try:
# Get all cross-references from the specified address
for xref in idautils.XrefsFrom(address, 0):
# Determine xref type
xref_info = {
'from_address': f"0x{xref.frm:X}",
'to_address': f"0x{xref.to:X}",
'type': self._get_xref_type_name(xref.type),
'is_code': xref.iscode
}
# Filter by type if specified
if xref_type != 'all':
if xref_type == 'code' and not xref.iscode:
continue
if xref_type == 'data' and xref.iscode:
continue
# Get target name if available
target_name = ida_name.get_ea_name(xref.to)
if target_name:
xref_info['target_name'] = target_name
# Check if target is a function
func = ida_funcs.get_func(xref.to)
if func and func.start_ea == xref.to:
xref_info['target_is_function'] = True
xref_info['target_function_name'] = ida_name.get_ea_name(func.start_ea)
# Get disassembly for context
xref_info['target_disassembly'] = idc.generate_disasm_line(xref.to, 0)
xrefs.append(xref_info)
# Sort by address
xrefs.sort(key=lambda x: int(x['to_address'], 16))
except Exception as e:
return {'error': f"Error getting xrefs from address: {str(e)}\n{traceback.format_exc()}"}
return {
'count': len(xrefs),
'xrefs': xrefs,
'address': f"0x{address:X}",
'name': ida_name.get_ea_name(address)
}
def _get_xref_type_name(self, xref_type):
"""Convert IDA xref type code to human-readable name."""
# Code cross-reference types
if xref_type == idaapi.fl_CF:
return "call_far"
elif xref_type == idaapi.fl_CN:
return "call_near"
elif xref_type == idaapi.fl_JF:
return "jump_far"
elif xref_type == idaapi.fl_JN:
return "jump_near"
# Data cross-reference types
elif xref_type == idaapi.dr_O:
return "data_offset"
elif xref_type == idaapi.dr_W:
return "data_write"
elif xref_type == idaapi.dr_R:
return "data_read"
elif xref_type == idaapi.dr_T:
return "data_text"
elif xref_type == idaapi.dr_I:
return "data_informational"
else:
return f"unknown_{xref_type}"
def _execute_in_main_thread(self, func, *args, **kwargs):
"""Execute a function in the main thread with additional safeguards."""
result_container = {}
execution_done = threading.Event()
def sync_wrapper():
"""Wrapper function to capture the result safely."""
try:
result_container['result'] = func(*args, **kwargs)
except Exception as e:
result_container['error'] = str(e)
result_container['traceback'] = traceback.format_exc()
finally:
# Signal that execution has finished
execution_done.set()
return 0 # Must return an integer
# Schedule execution in the main thread
idaapi.execute_sync(sync_wrapper, MFF_READ)
# Wait for the result with a timeout
max_wait = 30 # Maximum wait time in seconds
if not execution_done.wait(max_wait):
error_msg = f"Operation timed out after {max_wait} seconds"
print(f"[RemoteControl] {error_msg}")
return {'error': error_msg}
if 'error' in result_container:
print(f"[RemoteControl] Error in main thread: {result_container['error']}")
print(result_container.get('traceback', ''))
return {'error': result_container['error']}
return result_container.get('result', {'error': 'Unknown error occurred'})
class RemoteControlServer:
"""HTTP server for IDA Pro remote control."""
def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT):
self.host = host
self.port = port
self.server = None
self.server_thread = None
self.running = False
def start(self):
"""Start the HTTP server."""
if self.running:
print("[RemoteControl] Server is already running")
return False
try:
self.server = HTTPServer((self.host, self.port), RemoteControlHandler)
self.server_thread = threading.Thread(target=self.server.serve_forever)
self.server_thread.daemon = True
self.server_thread.start()
self.running = True
print(f"[RemoteControl] Server started on http://{self.host}:{self.port}")
return True
except Exception as e:
print(f"[RemoteControl] Failed to start server: {str(e)}")
return False
def stop(self):
"""Stop the HTTP server."""
if not self.running:
print("[RemoteControl] Server is not running")
return False
try:
self.server.shutdown()
self.server.server_close()
self.server_thread.join()
self.running = False
print("[RemoteControl] Server stopped")
return True
except Exception as e:
print(f"[RemoteControl] Failed to stop server: {str(e)}")
return False
def is_running(self):
"""Check if the server is running."""
return self.running
class RemoteControlPlugin(idaapi.plugin_t):
"""IDA Pro plugin for remote control."""
flags = idaapi.PLUGIN_KEEP
comment = "Remote control for IDA through HTTP"
help = "Provides HTTP endpoints to control IDA Pro remotely"
wanted_name = PLUGIN_NAME
wanted_hotkey = "Alt-R"
def init(self):
"""Initialize the plugin."""
print(f"[{PLUGIN_NAME}] Initializing...")
# Auto-start server if configured
if AUTO_START:
global g_server
g_server = RemoteControlServer(DEFAULT_HOST, DEFAULT_PORT)
success = g_server.start()
if success:
print(f"[{PLUGIN_NAME}] Server auto-started on http://{DEFAULT_HOST}:{DEFAULT_PORT}")
print(f"[{PLUGIN_NAME}] Available endpoints:")
else:
g_server = None
print(f"[{PLUGIN_NAME}] Failed to auto-start server")
return idaapi.PLUGIN_KEEP
def run(self, arg):
"""Run the plugin when activated manually."""
global g_server
# Check if server is already running
if g_server and g_server.is_running():
response = idaapi.ask_yn(idaapi.ASKBTN_NO,
"Remote control server is already running.\nDo you want to stop it?")
if response == idaapi.ASKBTN_YES:
g_server.stop()
g_server = None
return
# If AUTO_START is enabled but server isn't running, start with default settings
if AUTO_START:
g_server = RemoteControlServer(DEFAULT_HOST, DEFAULT_PORT)
success = g_server.start()
if success:
print(f"[{PLUGIN_NAME}] Server started on http://{DEFAULT_HOST}:{DEFAULT_PORT}")
else:
g_server = None
print(f"[{PLUGIN_NAME}] Failed to start server")
return
# Manual configuration if AUTO_START is disabled
# Get host and port from user
host = idaapi.ask_str(DEFAULT_HOST, 0, "Enter host address (e.g. 127.0.0.1):")
if not host:
host = DEFAULT_HOST
port_str = idaapi.ask_str(str(DEFAULT_PORT), 0, "Enter port number:")
try:
port = int(port_str)
except (ValueError, TypeError):
port = DEFAULT_PORT
# Start server
g_server = RemoteControlServer(host, port)
success = g_server.start()
if success:
print(f"[{PLUGIN_NAME}] Server started on http://{host}:{port}")
print(f"[{PLUGIN_NAME}] Available endpoints:")
else:
g_server = None
print(f"[{PLUGIN_NAME}] Failed to start server")
def term(self):
"""Terminate the plugin."""
global g_server
if g_server and g_server.is_running():
g_server.stop()
g_server = None
print(f"[{PLUGIN_NAME}] Plugin terminated")
# Register the plugin
def PLUGIN_ENTRY():
"""Return the plugin instance."""
return RemoteControlPlugin()
# For testing/debugging in the script editor
if __name__ == "__main__":
# This will only run when executed in the IDA script editor
plugin = RemoteControlPlugin()
plugin.run(0)