ida-mcp-server
- ida-mcp-server
- plugin
- ida_mcp_server_plugin
import idaapi
import idautils
import ida_funcs
import ida_hexrays
import ida_bytes
import ida_name
import ida_segment
import ida_lines
import idc
import json
import traceback
import functools
import queue
from typing import Any, Callable, TypeVar, Optional, Dict, List, Union, Tuple, Type
# Type variable for function return type
T = TypeVar('T')
class IDASyncError(Exception):
"""Exception raised for IDA synchronization errors"""
pass
# Global call stack to track synchronization calls
call_stack: queue.LifoQueue[str] = queue.LifoQueue()
def sync_wrapper(func: Callable[..., T], sync_type: int) -> T:
"""
Wrapper function to execute a function in IDA's main thread
Args:
func: The function to execute
sync_type: Synchronization type (MFF_READ or MFF_WRITE)
Returns:
The result of the function execution
"""
if sync_type not in [idaapi.MFF_READ, idaapi.MFF_WRITE]:
error_str = f'Invalid sync type {sync_type} for function {func.__name__}'
print(error_str)
raise IDASyncError(error_str)
# Container for the result
result_container: queue.Queue[Any] = queue.Queue()
def execute_in_main_thread() -> int:
# Check if we're already inside a sync_wrapper call
if not call_stack.empty():
last_func = call_stack.get()
error_str = f'Nested sync call detected: function {func.__name__} called from {last_func}'
print(error_str)
call_stack.put(last_func) # Put it back
raise IDASyncError(error_str)
# Add function to call stack
call_stack.put(func.__name__)
try:
# Execute function and store result
result_container.put(func())
except Exception as e:
print(f"Error in {func.__name__}: {str(e)}")
traceback.print_exc()
result_container.put(None)
finally:
# Always remove function from call stack
call_stack.get()
return 1 # Required by execute_sync
# Execute in IDA's main thread
idaapi.execute_sync(execute_in_main_thread, sync_type)
# Return the result
return result_container.get()
def idaread(func: Callable[..., T]) -> Callable[..., T]:
"""
Decorator for functions that read from the IDA database
Args:
func: The function to decorate
Returns:
Decorated function that executes in IDA's main thread with read access
"""
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> T:
# Create a partial function with the arguments
partial_func = functools.partial(func, *args, **kwargs)
# Preserve the original function name
partial_func.__name__ = func.__name__
# Execute with sync_wrapper
return sync_wrapper(partial_func, idaapi.MFF_READ)
return wrapper
def idawrite(func: Callable[..., T]) -> Callable[..., T]:
"""
Decorator for functions that write to the IDA database
Args:
func: The function to decorate
Returns:
Decorated function that executes in IDA's main thread with write access
"""
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> T:
# Create a partial function with the arguments
partial_func = functools.partial(func, *args, **kwargs)
# Preserve the original function name
partial_func.__name__ = func.__name__
# Execute with sync_wrapper
return sync_wrapper(partial_func, idaapi.MFF_WRITE)
return wrapper
class IDAMCPCore:
"""Core functionality implementation class for IDA MCP"""
@idaread
def get_function_assembly_by_name(self, function_name: str) -> Dict[str, Any]:
"""Get assembly code for a function by its name"""
try:
# Get function address from name
func = idaapi.get_func(idaapi.get_name_ea(0, function_name))
if not func:
return {"error": f"Function '{function_name}' not found"}
# Call address-based implementation
result = self._get_function_assembly_by_address_internal(func.start_ea)
# If successful, add function name to result
if "error" not in result:
result["function_name"] = function_name
return result
except Exception as e:
traceback.print_exc()
return {"error": str(e)}
@idaread
def get_function_assembly_by_address(self, address: int) -> Dict[str, Any]:
"""Get assembly code for a function by its address"""
return self._get_function_assembly_by_address_internal(address)
def _get_function_assembly_by_address_internal(self, address: int) -> Dict[str, Any]:
"""Internal implementation for get_function_assembly_by_address without sync wrapper"""
try:
# Get function object
func = ida_funcs.get_func(address)
if not func:
return {"error": f"Invalid function at {hex(address)}"}
# Collect all assembly instructions
assembly_lines = []
for instr_addr in idautils.FuncItems(address):
disasm = idc.GetDisasm(instr_addr)
assembly_lines.append(f"{hex(instr_addr)}: {disasm}")
if not assembly_lines:
return {"error": "No assembly instructions found"}
return {"assembly": "\n".join(assembly_lines)}
except Exception as e:
print(f"Error getting function assembly: {str(e)}")
traceback.print_exc()
return {"error": str(e)}
@idaread
def get_function_decompiled_by_name(self, function_name: str) -> Dict[str, Any]:
"""Get decompiled code for a function by its name"""
try:
# Get function address from name
func_addr = idaapi.get_name_ea(0, function_name)
if func_addr == idaapi.BADADDR:
return {"error": f"Function '{function_name}' not found"}
# Call internal implementation without decorator
result = self._get_function_decompiled_by_address_internal(func_addr)
# If successful, add function name to result
if "error" not in result:
result["function_name"] = function_name
return result
except Exception as e:
traceback.print_exc()
return {"error": str(e)}
@idaread
def get_function_decompiled_by_address(self, address: int) -> Dict[str, Any]:
"""Get decompiled code for a function by its address"""
return self._get_function_decompiled_by_address_internal(address)
def _get_function_decompiled_by_address_internal(self, address: int) -> Dict[str, Any]:
"""Internal implementation for get_function_decompiled_by_address without sync wrapper"""
try:
# Get function from address
func = idaapi.get_func(address)
if not func:
return {"error": f"No function found at address 0x{address:X}"}
# Get function name
func_name = idaapi.get_func_name(func.start_ea)
# Try to import decompiler module
try:
import ida_hexrays
except ImportError:
return {"error": "Hex-Rays decompiler is not available"}
# Check if decompiler is available
if not ida_hexrays.init_hexrays_plugin():
return {"error": "Unable to initialize Hex-Rays decompiler"}
# Get decompiled function
cfunc = None
try:
cfunc = ida_hexrays.decompile(func.start_ea)
except Exception as e:
return {"error": f"Unable to decompile function: {str(e)}"}
if not cfunc:
return {"error": "Decompilation failed"}
# Get pseudocode as string
decompiled_code = str(cfunc)
return {"decompiled_code": decompiled_code, "function_name": func_name}
except Exception as e:
traceback.print_exc()
return {"error": str(e)}
@idaread
def get_current_function_assembly(self) -> Dict[str, Any]:
"""Get assembly code for the function at the current cursor position"""
try:
# Get current address
curr_addr = idaapi.get_screen_ea()
if curr_addr == idaapi.BADADDR:
return {"error": "No valid cursor position"}
# Use the internal implementation without decorator
return self._get_function_assembly_by_address_internal(curr_addr)
except Exception as e:
traceback.print_exc()
return {"error": str(e)}
@idaread
def get_current_function_decompiled(self) -> Dict[str, Any]:
"""Get decompiled code for the function at the current cursor position"""
try:
# Get current address
curr_addr = idaapi.get_screen_ea()
if curr_addr == idaapi.BADADDR:
return {"error": "No valid cursor position"}
# Use the internal implementation without decorator
return self._get_function_decompiled_by_address_internal(curr_addr)
except Exception as e:
traceback.print_exc()
return {"error": str(e)}
@idaread
def get_global_variable_by_name(self, variable_name: str) -> Dict[str, Any]:
"""Get global variable information by its name"""
try:
# Get variable address
var_addr: int = ida_name.get_name_ea(0, variable_name)
if var_addr == idaapi.BADADDR:
return {"error": f"Global variable '{variable_name}' not found"}
# Call internal implementation
result = self._get_global_variable_by_address_internal(var_addr)
# If successful, add variable name to result
if "error" not in result and "variable_info" in result:
# Parse the JSON string back to dict to modify it
var_info = json.loads(result["variable_info"])
var_info["name"] = variable_name
# Convert back to JSON string
result["variable_info"] = json.dumps(var_info, indent=2)
return result
except Exception as e:
print(f"Error getting global variable by name: {str(e)}")
traceback.print_exc()
return {"error": str(e)}
@idaread
def get_global_variable_by_address(self, address: int) -> Dict[str, Any]:
"""Get global variable information by its address"""
return self._get_global_variable_by_address_internal(address)
def _get_global_variable_by_address_internal(self, address: int) -> Dict[str, Any]:
"""Internal implementation for get_global_variable_by_address without sync wrapper"""
try:
# Verify address is valid
if address == idaapi.BADADDR:
return {"error": f"Invalid address: {hex(address)}"}
# Get variable name if available
variable_name = ida_name.get_name(address)
if not variable_name:
variable_name = f"unnamed_{hex(address)}"
# Get variable segment
segment: Optional[ida_segment.segment_t] = ida_segment.getseg(address)
if not segment:
return {"error": f"No segment found for address {hex(address)}"}
segment_name: str = ida_segment.get_segm_name(segment)
segment_class: str = ida_segment.get_segm_class(segment)
# Get variable type
tinfo = idaapi.tinfo_t()
guess_type: bool = idaapi.guess_tinfo(tinfo, address)
type_str: str = tinfo.get_type_name() if guess_type else "unknown"
# Try to get variable value
size: int = ida_bytes.get_item_size(address)
if size <= 0:
size = 8 # Default to 8 bytes
# Read data based on size
value: Optional[int] = None
if size == 1:
value = ida_bytes.get_byte(address)
elif size == 2:
value = ida_bytes.get_word(address)
elif size == 4:
value = ida_bytes.get_dword(address)
elif size == 8:
value = ida_bytes.get_qword(address)
# Build variable info
var_info: Dict[str, Any] = {
"name": variable_name,
"address": hex(address),
"segment": segment_name,
"segment_class": segment_class,
"type": type_str,
"size": size,
"value": hex(value) if value is not None else "N/A"
}
# If it's a string, try to read string content
if ida_bytes.is_strlit(ida_bytes.get_flags(address)):
str_value = idc.get_strlit_contents(address, -1, 0)
if str_value:
try:
var_info["string_value"] = str_value.decode('utf-8', errors='replace')
except:
var_info["string_value"] = str(str_value)
return {"variable_info": json.dumps(var_info, indent=2)}
except Exception as e:
print(f"Error getting global variable by address: {str(e)}")
traceback.print_exc()
return {"error": str(e)}
@idawrite
def rename_global_variable(self, old_name: str, new_name: str) -> Dict[str, Any]:
"""Rename a global variable"""
return self._rename_global_variable_internal(old_name, new_name)
def _rename_global_variable_internal(self, old_name: str, new_name: str) -> Dict[str, Any]:
"""Internal implementation for rename_global_variable without sync wrapper"""
try:
# Get variable address
var_addr: int = ida_name.get_name_ea(0, old_name)
if var_addr == idaapi.BADADDR:
return {"success": False, "message": f"Variable '{old_name}' not found"}
# Check if new name is already in use
if ida_name.get_name_ea(0, new_name) != idaapi.BADADDR:
return {"success": False, "message": f"Name '{new_name}' is already in use"}
# Try to rename
if not ida_name.set_name(var_addr, new_name):
return {"success": False, "message": f"Failed to rename variable, possibly due to invalid name format or other IDA restrictions"}
# Refresh view
self._refresh_view_internal()
return {"success": True, "message": f"Variable renamed from '{old_name}' to '{new_name}' at address {hex(var_addr)}"}
except Exception as e:
print(f"Error renaming variable: {str(e)}")
traceback.print_exc()
return {"success": False, "message": str(e)}
@idawrite
def rename_function(self, old_name: str, new_name: str) -> Dict[str, Any]:
"""Rename a function"""
return self._rename_function_internal(old_name, new_name)
def _rename_function_internal(self, old_name: str, new_name: str) -> Dict[str, Any]:
"""Internal implementation for rename_function without sync wrapper"""
try:
# Get function address
func_addr: int = ida_name.get_name_ea(0, old_name)
if func_addr == idaapi.BADADDR:
return {"success": False, "message": f"Function '{old_name}' not found"}
# Check if it's a function
func: Optional[ida_funcs.func_t] = ida_funcs.get_func(func_addr)
if not func:
return {"success": False, "message": f"'{old_name}' is not a function"}
# Check if new name is already in use
if ida_name.get_name_ea(0, new_name) != idaapi.BADADDR:
return {"success": False, "message": f"Name '{new_name}' is already in use"}
# Try to rename
if not ida_name.set_name(func_addr, new_name):
return {"success": False, "message": f"Failed to rename function, possibly due to invalid name format or other IDA restrictions"}
# Refresh view
self._refresh_view_internal()
return {"success": True, "message": f"Function renamed from '{old_name}' to '{new_name}' at address {hex(func_addr)}"}
except Exception as e:
print(f"Error renaming function: {str(e)}")
traceback.print_exc()
return {"success": False, "message": str(e)}
@idawrite
def add_assembly_comment(self, address: str, comment: str, is_repeatable: bool) -> Dict[str, Any]:
"""Add an assembly comment"""
return self._add_assembly_comment_internal(address, comment, is_repeatable)
def _add_assembly_comment_internal(self, address: str, comment: str, is_repeatable: bool) -> Dict[str, Any]:
"""Internal implementation for add_assembly_comment without sync wrapper"""
try:
# Convert address string to integer
addr: int
if isinstance(address, str):
if address.startswith("0x"):
addr = int(address, 16)
else:
try:
addr = int(address, 16) # Try parsing as hex
except ValueError:
try:
addr = int(address) # Try parsing as decimal
except ValueError:
return {"success": False, "message": f"Invalid address format: {address}"}
else:
addr = address
# Check if address is valid
if addr == idaapi.BADADDR or not ida_bytes.is_loaded(addr):
return {"success": False, "message": f"Invalid or unloaded address: {hex(addr)}"}
# Add comment
result: bool = idc.set_cmt(addr, comment, is_repeatable)
if result:
# Refresh view
self._refresh_view_internal()
comment_type: str = "repeatable" if is_repeatable else "regular"
return {"success": True, "message": f"Added {comment_type} assembly comment at address {hex(addr)}"}
else:
return {"success": False, "message": f"Failed to add assembly comment at address {hex(addr)}"}
except Exception as e:
print(f"Error adding assembly comment: {str(e)}")
traceback.print_exc()
return {"success": False, "message": str(e)}
@idawrite
def rename_local_variable(self, function_name: str, old_name: str, new_name: str) -> Dict[str, Any]:
"""Rename a local variable within a function"""
return self._rename_local_variable_internal(function_name, old_name, new_name)
def _rename_local_variable_internal(self, function_name: str, old_name: str, new_name: str) -> Dict[str, Any]:
"""Internal implementation for rename_local_variable without sync wrapper"""
try:
# Parameter validation
if not function_name:
return {"success": False, "message": "Function name cannot be empty"}
if not old_name:
return {"success": False, "message": "Old variable name cannot be empty"}
if not new_name:
return {"success": False, "message": "New variable name cannot be empty"}
# Get function address
func_addr: int = ida_name.get_name_ea(0, function_name)
if func_addr == idaapi.BADADDR:
return {"success": False, "message": f"Function '{function_name}' not found"}
# Check if it's a function
func: Optional[ida_funcs.func_t] = ida_funcs.get_func(func_addr)
if not func:
return {"success": False, "message": f"'{function_name}' is not a function"}
# Check if decompiler is available
if not ida_hexrays.init_hexrays_plugin():
return {"success": False, "message": "Hex-Rays decompiler is not available"}
# Get decompilation result
cfunc: Optional[ida_hexrays.cfunc_t] = ida_hexrays.decompile(func_addr)
if not cfunc:
return {"success": False, "message": f"Failed to decompile function '{function_name}'"}
ida_hexrays.open_pseudocode(func_addr, 0)
# Find local variable to rename
found: bool = False
renamed: bool = False
lvar: Optional[ida_hexrays.lvar_t] = None
# Iterate through all local variables
lvars = cfunc.get_lvars()
for i in range(lvars.size()):
v = lvars[i]
if v.name == old_name:
lvar = v
found = True
break
if not found:
return {"success": False, "message": f"Local variable '{old_name}' not found in function '{function_name}'"}
# Rename local variable
if ida_hexrays.rename_lvar(cfunc.entry_ea, lvar.name, new_name):
renamed = True
if renamed:
# Refresh view
self._refresh_view_internal()
return {"success": True, "message": f"Local variable renamed from '{old_name}' to '{new_name}' in function '{function_name}'"}
else:
return {"success": False, "message": f"Failed to rename local variable from '{old_name}' to '{new_name}', possibly due to invalid name format or other IDA restrictions"}
except Exception as e:
print(f"Error renaming local variable: {str(e)}")
traceback.print_exc()
return {"success": False, "message": str(e)}
@idawrite
def rename_multi_local_variables(self, function_name: str, rename_pairs_old2new: List[Dict[str, str]]) -> Dict[str, Any]:
"""Rename multiple local variables within a function at once"""
try:
success_count: int = 0
failed_pairs: List[Dict[str, str]] = []
for pair in rename_pairs_old2new:
old_name = next(iter(pair.keys()))
new_name = pair[old_name]
# Call existing rename_local_variable_internal for each pair
result = self._rename_local_variable_internal(function_name, old_name, new_name)
if result.get("success", False):
success_count += 1
else:
failed_pairs.append({
"old_name": old_name,
"new_name": new_name,
"error": result.get("message", "Unknown error")
})
return {
"success": True,
"message": f"Renamed {success_count} out of {len(rename_pairs_old2new)} local variables",
"success_count": success_count,
"failed_pairs": failed_pairs
}
except Exception as e:
print(f"Error in rename_multi_local_variables: {str(e)}")
traceback.print_exc()
return {
"success": False,
"message": str(e),
"success_count": 0,
"failed_pairs": rename_pairs_old2new
}
@idawrite
def rename_multi_global_variables(self, rename_pairs_old2new: List[Dict[str, str]]) -> Dict[str, Any]:
"""Rename multiple global variables at once"""
try:
success_count: int = 0
failed_pairs: List[Dict[str, str]] = []
for pair in rename_pairs_old2new:
old_name = next(iter(pair.keys()))
new_name = pair[old_name]
# Call existing rename_global_variable_internal for each pair
result = self._rename_global_variable_internal(old_name, new_name)
if result.get("success", False):
success_count += 1
else:
failed_pairs.append({
"old_name": old_name,
"new_name": new_name,
"error": result.get("message", "Unknown error")
})
return {
"success": True,
"message": f"Renamed {success_count} out of {len(rename_pairs_old2new)} global variables",
"success_count": success_count,
"failed_pairs": failed_pairs
}
except Exception as e:
print(f"Error in rename_multi_global_variables: {str(e)}")
traceback.print_exc()
return {
"success": False,
"message": str(e),
"success_count": 0,
"failed_pairs": rename_pairs_old2new
}
@idawrite
def rename_multi_functions(self, rename_pairs_old2new: List[Dict[str, str]]) -> Dict[str, Any]:
"""Rename multiple functions at once"""
try:
success_count: int = 0
failed_pairs: List[Dict[str, str]] = []
for pair in rename_pairs_old2new:
old_name = next(iter(pair.keys()))
new_name = pair[old_name]
# Call existing rename_function_internal for each pair
result = self._rename_function_internal(old_name, new_name)
if result.get("success", False):
success_count += 1
else:
failed_pairs.append({
"old_name": old_name,
"new_name": new_name,
"error": result.get("message", "Unknown error")
})
return {
"success": True,
"message": f"Renamed {success_count} out of {len(rename_pairs_old2new)} functions",
"success_count": success_count,
"failed_pairs": failed_pairs
}
except Exception as e:
print(f"Error in rename_multi_functions: {str(e)}")
traceback.print_exc()
return {
"success": False,
"message": str(e),
"success_count": 0,
"failed_pairs": rename_pairs_old2new
}
@idawrite
def add_function_comment(self, function_name: str, comment: str, is_repeatable: bool) -> Dict[str, Any]:
"""Add a comment to a function"""
return self._add_function_comment_internal(function_name, comment, is_repeatable)
def _add_function_comment_internal(self, function_name: str, comment: str, is_repeatable: bool) -> Dict[str, Any]:
"""Internal implementation for add_function_comment without sync wrapper"""
try:
# Parameter validation
if not function_name:
return {"success": False, "message": "Function name cannot be empty"}
if not comment:
# Allow empty comment to clear the comment
comment = ""
# Get function address
func_addr: int = ida_name.get_name_ea(0, function_name)
if func_addr == idaapi.BADADDR:
return {"success": False, "message": f"Function '{function_name}' not found"}
# Check if it's a function
func: Optional[ida_funcs.func_t] = ida_funcs.get_func(func_addr)
if not func:
return {"success": False, "message": f"'{function_name}' is not a function"}
# Open pseudocode view
ida_hexrays.open_pseudocode(func_addr, 0)
# Add function comment
# is_repeatable=True means show comment at all references to this function
# is_repeatable=False means show comment only at function definition
result: bool = idc.set_func_cmt(func_addr, comment, is_repeatable)
if result:
# Refresh view
self._refresh_view_internal()
comment_type: str = "repeatable" if is_repeatable else "regular"
return {"success": True, "message": f"Added {comment_type} comment to function '{function_name}'"}
else:
return {"success": False, "message": f"Failed to add comment to function '{function_name}'"}
except Exception as e:
print(f"Error adding function comment: {str(e)}")
traceback.print_exc()
return {"success": False, "message": str(e)}
@idawrite
def add_pseudocode_comment(self, function_name: str, address: str, comment: str, is_repeatable: bool) -> Dict[str, Any]:
"""Add a comment to a specific address in the function's decompiled pseudocode"""
return self._add_pseudocode_comment_internal(function_name, address, comment, is_repeatable)
def _add_pseudocode_comment_internal(self, function_name: str, address: str, comment: str, is_repeatable: bool) -> Dict[str, Any]:
"""Internal implementation for add_pseudocode_comment without sync wrapper"""
try:
# Parameter validation
if not function_name:
return {"success": False, "message": "Function name cannot be empty"}
if not address:
return {"success": False, "message": "Address cannot be empty"}
if not comment:
# Allow empty comment to clear the comment
comment = ""
# Get function address
func_addr: int = ida_name.get_name_ea(0, function_name)
if func_addr == idaapi.BADADDR:
return {"success": False, "message": f"Function '{function_name}' not found"}
# Check if it's a function
func: Optional[ida_funcs.func_t] = ida_funcs.get_func(func_addr)
if not func:
return {"success": False, "message": f"'{function_name}' is not a function"}
# Check if decompiler is available
if not ida_hexrays.init_hexrays_plugin():
return {"success": False, "message": "Hex-Rays decompiler is not available"}
# Get decompilation result
cfunc: Optional[ida_hexrays.cfunc_t] = ida_hexrays.decompile(func_addr)
if not cfunc:
return {"success": False, "message": f"Failed to decompile function '{function_name}'"}
# Open pseudocode view
ida_hexrays.open_pseudocode(func_addr, 0)
# Convert address string to integer
addr: int
if isinstance(address, str):
if address.startswith("0x"):
addr = int(address, 16)
else:
try:
addr = int(address, 16) # Try parsing as hex
except ValueError:
try:
addr = int(address) # Try parsing as decimal
except ValueError:
return {"success": False, "message": f"Invalid address format: {address}"}
else:
addr = address
# Check if address is valid
if addr == idaapi.BADADDR or not ida_bytes.is_loaded(addr):
return {"success": False, "message": f"Invalid or unloaded address: {hex(addr)}"}
# Check if address is within function
if not (func.start_ea <= addr < func.end_ea):
return {"success": False, "message": f"Address {hex(addr)} is not within function '{function_name}'"}
# Create treeloc_t object for comment location
loc = ida_hexrays.treeloc_t()
loc.ea = addr
loc.itp = ida_hexrays.ITP_BLOCK1 # Comment location
# Set comment
cfunc.set_user_cmt(loc, comment)
cfunc.save_user_cmts()
# Refresh view
self._refresh_view_internal()
comment_type: str = "repeatable" if is_repeatable else "regular"
return {
"success": True,
"message": f"Added {comment_type} comment at address {hex(addr)} in function '{function_name}'"
}
except Exception as e:
print(f"Error adding pseudocode comment: {str(e)}")
traceback.print_exc()
return {"success": False, "message": str(e)}
@idawrite
def refresh_view(self) -> Dict[str, Any]:
"""Refresh IDA Pro view"""
return self._refresh_view_internal()
def _refresh_view_internal(self) -> Dict[str, Any]:
"""Implementation of refreshing view in IDA main thread"""
try:
# Refresh disassembly view
idaapi.refresh_idaview_anyway()
# Refresh decompilation view
current_widget = idaapi.get_current_widget()
if current_widget:
widget_type: int = idaapi.get_widget_type(current_widget)
if widget_type == idaapi.BWN_PSEUDOCODE:
# If current view is pseudocode, refresh it
vu = idaapi.get_widget_vdui(current_widget)
if vu:
vu.refresh_view(True)
# Try to find and refresh all open pseudocode windows
for i in range(5): # Check multiple possible pseudocode windows
widget_name: str = f"Pseudocode-{chr(65+i)}" # Pseudocode-A, Pseudocode-B, ...
widget = idaapi.find_widget(widget_name)
if widget:
vu = idaapi.get_widget_vdui(widget)
if vu:
vu.refresh_view(True)
return {"success": True, "message": "Views refreshed successfully"}
except Exception as e:
print(f"Error refreshing views: {str(e)}")
traceback.print_exc()
return {"success": False, "message": str(e)}
@idawrite
def execute_script(self, script: str) -> Dict[str, Any]:
"""Execute a Python script in IDA context"""
return self._execute_script_internal(script)
def _execute_script_internal(self, script: str) -> Dict[str, Any]:
"""Internal implementation for execute_script without sync wrapper"""
try:
print(f"Executing script, length: {len(script) if script else 0}")
# Check for empty script
if not script or not script.strip():
print("Error: Empty script provided")
return {
"success": False,
"error": "Empty script provided",
"stdout": "",
"stderr": "",
"traceback": ""
}
# Create a local namespace for script execution
script_globals = {
'__builtins__': __builtins__,
'idaapi': idaapi,
'idautils': idautils,
'idc': idc,
'ida_funcs': ida_funcs,
'ida_bytes': ida_bytes,
'ida_name': ida_name,
'ida_segment': ida_segment,
'ida_lines': ida_lines,
'ida_hexrays': ida_hexrays
}
script_locals = {}
# Save original stdin/stdout/stderr
import sys
import io
original_stdout = sys.stdout
original_stderr = sys.stderr
original_stdin = sys.stdin
# Create string IO objects to capture output
stdout_capture = io.StringIO()
stderr_capture = io.StringIO()
# Redirect stdout/stderr to capture output
sys.stdout = stdout_capture
sys.stderr = stderr_capture
# Prevent script from trying to read from stdin
sys.stdin = io.StringIO()
try:
# Create UI hooks
print("Setting up UI hooks")
hooks = self._create_ui_hooks()
hooks.hook()
# Install auto-continue handlers for common dialogs - but first, redirect stderr
temp_stderr = sys.stderr
auto_handler_stderr = io.StringIO()
sys.stderr = auto_handler_stderr
print("Installing auto handlers")
self._install_auto_handlers()
# Restore stderr and save auto-handler errors separately
sys.stderr = stderr_capture
auto_handler_errors = auto_handler_stderr.getvalue()
# Only log auto-handler errors, don't include in script output
if auto_handler_errors:
print(f"Auto-handler setup errors (not shown to user): {auto_handler_errors}")
# Execute the script
print("Executing script...")
exec(script, script_globals, script_locals)
print("Script execution completed")
# Get captured output
stdout = stdout_capture.getvalue()
stderr = stderr_capture.getvalue()
# Filter out auto-handler messages from stdout
stdout_lines = stdout.splitlines()
filtered_stdout_lines = []
for line in stdout_lines:
skip_line = False
auto_handler_messages = [
"Setting up UI hooks",
"Installing auto handlers",
"Error installing auto handlers",
"Found and saved",
"Could not access user_cancelled",
"Installed auto_",
"Auto handlers installed",
"Note: Could not",
"Restoring IO streams",
"Unhooking UI hooks",
"Restoring original handlers",
"Refreshing view",
"Original handlers restored",
"No original handlers"
]
for msg in auto_handler_messages:
if msg in line:
skip_line = True
break
if not skip_line:
filtered_stdout_lines.append(line)
filtered_stdout = "\n".join(filtered_stdout_lines)
# Compile script results - ensure all fields are present
result = {
"stdout": filtered_stdout.strip() if filtered_stdout else "",
"stderr": stderr.strip() if stderr else "",
"success": True,
"traceback": ""
}
# Check for return value
if "result" in script_locals:
try:
print(f"Script returned value of type: {type(script_locals['result']).__name__}")
result["return_value"] = str(script_locals["result"])
except Exception as rv_err:
print(f"Error converting return value: {str(rv_err)}")
result["stderr"] += f"\nError converting return value: {str(rv_err)}"
result["return_value"] = "Error: Could not convert return value to string"
print(f"Returning script result with keys: {', '.join(result.keys())}")
return result
except Exception as e:
import traceback
error_msg = str(e)
tb = traceback.format_exc()
print(f"Script execution error: {error_msg}")
print(tb)
return {
"success": False,
"stdout": stdout_capture.getvalue().strip() if stdout_capture else "",
"stderr": stderr_capture.getvalue().strip() if stderr_capture else "",
"error": error_msg,
"traceback": tb
}
finally:
# Restore original stdin/stdout/stderr
print("Restoring IO streams")
sys.stdout = original_stdout
sys.stderr = original_stderr
sys.stdin = original_stdin
# Unhook UI hooks
print("Unhooking UI hooks")
hooks.unhook()
# Restore original handlers
print("Restoring original handlers")
self._restore_original_handlers()
# Refresh view to show any changes made by script
print("Refreshing view")
self._refresh_view_internal()
except Exception as e:
print(f"Error in execute_script outer scope: {str(e)}")
traceback.print_exc()
return {
"success": False,
"stdout": "",
"stderr": "",
"error": str(e),
"traceback": traceback.format_exc()
}
@idawrite
def execute_script_from_file(self, file_path: str) -> Dict[str, Any]:
"""Execute a Python script from a file in IDA context"""
return self._execute_script_from_file_internal(file_path)
def _execute_script_from_file_internal(self, file_path: str) -> Dict[str, Any]:
"""Internal implementation for execute_script_from_file without sync wrapper"""
try:
# Check if file path is provided
if not file_path or not file_path.strip():
return {
"success": False,
"error": "No file path provided",
"stdout": "",
"stderr": "",
"traceback": ""
}
# Check if file exists
import os
if not os.path.exists(file_path):
return {
"success": False,
"error": f"Script file not found: {file_path}",
"stdout": "",
"stderr": "",
"traceback": ""
}
try:
# Read script content
with open(file_path, 'r') as f:
script = f.read()
# Execute script using internal method
return self._execute_script_internal(script)
except Exception as file_error:
print(f"Error reading or executing script file: {str(file_error)}")
traceback.print_exc()
return {
"success": False,
"stdout": "",
"stderr": "",
"error": f"Error with script file: {str(file_error)}",
"traceback": traceback.format_exc()
}
except Exception as e:
print(f"Error executing script from file: {str(e)}")
traceback.print_exc()
return {
"success": False,
"stdout": "",
"stderr": "",
"error": str(e),
"traceback": traceback.format_exc()
}
def _create_ui_hooks(self) -> idaapi.UI_Hooks:
"""Create UI hooks to suppress dialogs during script execution"""
try:
class DialogHook(idaapi.UI_Hooks):
def populating_widget_popup(self, widget, popup):
# Just suppress all popups
return 1
def finish_populating_widget_popup(self, widget, popup):
# Also suppress here
return 1
def ready_to_run(self):
# Always continue
return 1
def updating_actions(self, ctx):
# Always continue
return 1
def updated_actions(self):
# Always continue
return 1
def ui_refresh(self, cnd):
# Suppress UI refreshes
return 1
hooks = DialogHook()
return hooks
except Exception as e:
print(f"Error creating UI hooks: {str(e)}")
traceback.print_exc()
# Create minimal dummy hooks that won't cause errors
class DummyHook:
def hook(self):
print("Using dummy hook (hook)")
pass
def unhook(self):
print("Using dummy hook (unhook)")
pass
return DummyHook()
def _install_auto_handlers(self) -> None:
"""Install auto-continue handlers for common dialogs"""
try:
import ida_kernwin
# Save original handlers - with safer access to cvar.user_cancelled
self._original_handlers = {}
# Try to access user_cancelled more safely
try:
if hasattr(ida_kernwin, 'cvar') and hasattr(ida_kernwin.cvar, 'user_cancelled'):
self._original_handlers["yn"] = ida_kernwin.cvar.user_cancelled
print("Found and saved user_cancelled handler")
except Exception as yn_err:
print(f"Note: Could not access user_cancelled: {str(yn_err)}")
# Save other dialog handlers
if hasattr(ida_kernwin, 'ask_buttons'):
self._original_handlers["buttons"] = ida_kernwin.ask_buttons
if hasattr(ida_kernwin, 'ask_text'):
self._original_handlers["text"] = ida_kernwin.ask_text
if hasattr(ida_kernwin, 'ask_file'):
self._original_handlers["file"] = ida_kernwin.ask_file
# Define auto handlers
def auto_yes_no(*args, **kwargs):
return 1 # Return "Yes"
def auto_buttons(*args, **kwargs):
return 1 # Return first button
def auto_text(*args, **kwargs):
return "" # Return empty text
def auto_file(*args, **kwargs):
return "" # Return empty filename
# Install auto handlers only for what we successfully saved
if "yn" in self._original_handlers:
try:
ida_kernwin.cvar.user_cancelled = auto_yes_no
print("Installed auto_yes_no handler")
except Exception as e:
print(f"Could not install auto_yes_no handler: {str(e)}")
if "buttons" in self._original_handlers:
ida_kernwin.ask_buttons = auto_buttons
print("Installed auto_buttons handler")
if "text" in self._original_handlers:
ida_kernwin.ask_text = auto_text
print("Installed auto_text handler")
if "file" in self._original_handlers:
ida_kernwin.ask_file = auto_file
print("Installed auto_file handler")
print(f"Auto handlers installed successfully. Installed handlers: {', '.join(self._original_handlers.keys())}")
except Exception as e:
print(f"Error installing auto handlers: {str(e)}")
traceback.print_exc()
# Ensure _original_handlers exists even on failure
if not hasattr(self, "_original_handlers"):
self._original_handlers = {}
def _restore_original_handlers(self) -> None:
"""Restore original dialog handlers"""
try:
if hasattr(self, "_original_handlers"):
import ida_kernwin
# Restore original handlers (only what was successfully saved)
if "yn" in self._original_handlers:
try:
ida_kernwin.cvar.user_cancelled = self._original_handlers["yn"]
print("Restored user_cancelled handler")
except Exception as e:
print(f"Could not restore user_cancelled handler: {str(e)}")
if "buttons" in self._original_handlers:
ida_kernwin.ask_buttons = self._original_handlers["buttons"]
print("Restored ask_buttons handler")
if "text" in self._original_handlers:
ida_kernwin.ask_text = self._original_handlers["text"]
print("Restored ask_text handler")
if "file" in self._original_handlers:
ida_kernwin.ask_file = self._original_handlers["file"]
print("Restored ask_file handler")
saved_keys = list(self._original_handlers.keys())
if saved_keys:
print(f"Original handlers restored: {', '.join(saved_keys)}")
else:
print("No original handlers were saved, nothing to restore")
else:
print("No original handlers dictionary to restore")
except Exception as e:
print(f"Error restoring original handlers: {str(e)}")
traceback.print_exc()