Skip to main content
Glama
ghidra_runner.py23.5 kB
""" Ghidra GUI Runner for Testing Manages Ghidra instances with Xvfb for automated testing """ import subprocess import time import os import signal import requests import shutil import socket from pathlib import Path from typing import Optional import logging logger = logging.getLogger(__name__) class GhidraRunner: """Manages a Ghidra GUI instance with plugin for testing""" def __init__( self, ghidra_install_dir: str, test_project_dir: str, test_binary: str, plugin_path: Optional[str] = None, http_port: Optional[int] = None, use_xvfb: bool = True, verbose: bool = False, isolated_user_dir: Optional[str] = None, additional_binaries: Optional[list] = None ): self.ghidra_dir = Path(ghidra_install_dir) self.project_dir = Path(test_project_dir) self.binary_path = Path(test_binary) self.plugin_path = Path(plugin_path) if plugin_path else None self.use_xvfb = use_xvfb self.verbose = verbose self.isolated_user_dir = Path(isolated_user_dir) if isolated_user_dir else None # Collect all binaries to import self.binaries_to_import = [self.binary_path] if additional_binaries: for binary in additional_binaries: binary_path = Path(binary) if binary_path.exists() and binary_path != self.binary_path: self.binaries_to_import.append(binary_path) self.ghidra_process = None self.xvfb_process = None self.vnc_process = None self.display_num = None self.original_user_home = None # Find a free port if not specified if http_port is None: self.http_port = self._find_free_port() logger.info(f"Using auto-selected port: {self.http_port}") else: self.http_port = http_port logger.info(f"Using specified port: {self.http_port}") # Validate paths if not self.ghidra_dir.exists(): raise FileNotFoundError(f"Ghidra directory not found: {self.ghidra_dir}") if not self.binary_path.exists(): raise FileNotFoundError(f"Test binary not found: {self.binary_path}") def _find_free_display(self): """Find a free X display number""" for i in range(99, 999): if not Path(f"/tmp/.X{i}-lock").exists(): return i raise RuntimeError("Could not find free X display") def _find_free_port(self): """Find a free port for HTTP server""" # Use OS to find a free port with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('127.0.0.1', 0)) s.listen(1) port = s.getsockname()[1] return port def _start_xvfb(self): """Start virtual X server""" if not self.use_xvfb: return self.display_num = self._find_free_display() logger.info(f"Starting Xvfb on display :{self.display_num}") self.xvfb_process = subprocess.Popen( [ 'Xvfb', f':{self.display_num}', '-screen', '0', '1920x1080x24', '-ac', '+extension', 'GLX', '+render', '-noreset' ], stdout=subprocess.DEVNULL if not self.verbose else None, stderr=subprocess.DEVNULL if not self.verbose else None ) os.environ['DISPLAY'] = f':{self.display_num}' time.sleep(2) logger.info(f"Xvfb started on display :{self.display_num}") # Start x11vnc if enabled if os.environ.get('ENABLE_VNC', '').lower() == 'true': self._start_vnc() def _start_vnc(self): """Start VNC server for viewing Xvfb display""" logger.info("Starting x11vnc server on port 5900") self.vnc_process = subprocess.Popen( [ 'x11vnc', '-display', f':{self.display_num}', '-forever', '-nopw', '-shared', '-rfbport', '5900' ], stdout=subprocess.DEVNULL if not self.verbose else None, stderr=subprocess.DEVNULL if not self.verbose else None ) time.sleep(1) logger.info("x11vnc server started - connect with VNC viewer to localhost:5900") def _stop_xvfb(self): """Stop virtual X server and VNC""" # Stop VNC first if self.vnc_process: logger.info("Stopping x11vnc") self.vnc_process.terminate() try: self.vnc_process.wait(timeout=5) except subprocess.TimeoutExpired: self.vnc_process.kill() self.vnc_process.wait() self.vnc_process = None # Then stop Xvfb if self.xvfb_process: logger.info("Stopping Xvfb") self.xvfb_process.terminate() try: self.xvfb_process.wait(timeout=5) except subprocess.TimeoutExpired: self.xvfb_process.kill() self.xvfb_process.wait() self.xvfb_process = None def _detect_ghidra_version(self): """Detect Ghidra version from application.properties""" # Try to read version from application.properties app_props = self.ghidra_dir / "Ghidra" / "application.properties" if app_props.exists(): with open(app_props, 'r') as f: for line in f: if line.startswith('application.version='): version = line.split('=')[1].strip() logger.info(f"Detected Ghidra version from application.properties: {version}") return version # Fallback: use directory name dir_name = self.ghidra_dir.name if "_" in dir_name: version = dir_name.replace("ghidra_", "").replace("_PUBLIC", "") logger.info(f"Detected Ghidra version from directory name: {version}") return version # Last resort: use a default logger.warning(f"Could not detect Ghidra version, using default") return "11.4.2_PUBLIC" def _install_plugin(self): """Install plugin to user Extensions directory""" if not self.plugin_path: logger.info("No plugin path provided, skipping plugin installation") return # Detect Ghidra version ghidra_version = self._detect_ghidra_version() # Get current username for Ghidra's directory naming convention # Ghidra uses $USER-ghidra format (e.g., root-ghidra, john-ghidra) import getpass username = getpass.getuser() # Use isolated directory if specified, otherwise use user home if self.isolated_user_dir: # Modern Ghidra uses XDG paths: .config/$USER-ghidra/ghidra_VERSION_PUBLIC/Extensions config_dir = self.isolated_user_dir / ".config" logger.info(f"Using isolated Ghidra user directory: {self.isolated_user_dir}") extensions_dir = config_dir / f"{username}-ghidra" / f"ghidra_{ghidra_version}_PUBLIC" / "Extensions" else: # For non-isolated mode, use standard XDG location xdg_config = os.environ.get('XDG_CONFIG_HOME', str(Path.home() / '.config')) logger.info("Using default user config directory") extensions_dir = Path(xdg_config) / f"{username}-ghidra" / f"ghidra_{ghidra_version}_PUBLIC" / "Extensions" extensions_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Installing plugin to {extensions_dir}") if self.plugin_path.suffix == '.zip': import zipfile with zipfile.ZipFile(self.plugin_path, 'r') as zip_ref: zip_ref.extractall(extensions_dir) logger.info(f"Extracted plugin from {self.plugin_path}") elif self.plugin_path.is_dir(): dest = extensions_dir / self.plugin_path.name if dest.exists(): shutil.rmtree(dest) shutil.copytree(self.plugin_path, dest) logger.info(f"Copied plugin directory to {dest}") else: raise ValueError(f"Unsupported plugin format: {self.plugin_path}") def _accept_user_agreement(self): """Pre-accept Ghidra user agreement to avoid blocking dialog""" # Detect Ghidra version ghidra_version = self._detect_ghidra_version() # Get current username for Ghidra's directory naming convention import getpass username = getpass.getuser() # Use isolated directory if specified if self.isolated_user_dir: config_dir = self.isolated_user_dir / ".config" ghidra_config_dir = config_dir / f"{username}-ghidra" / f"ghidra_{ghidra_version}_PUBLIC" else: xdg_config = os.environ.get('XDG_CONFIG_HOME', str(Path.home() / '.config')) ghidra_config_dir = Path(xdg_config) / f"{username}-ghidra" / f"ghidra_{ghidra_version}_PUBLIC" ghidra_config_dir.mkdir(parents=True, exist_ok=True) # Create or update preferences file to mark agreement as accepted preferences_file = ghidra_config_dir / "preferences" # Read existing preferences or create new preferences = {} if preferences_file.exists(): with open(preferences_file, 'r') as f: for line in f: line = line.strip() if '=' in line and not line.startswith('#'): key, value = line.split('=', 1) preferences[key] = value # Set user agreement accepted preferences['USER_AGREEMENT'] = 'ACCEPT' # Auto-restore workspace with CodeBrowser tool preferences['AUTOMATICALLY_SAVE_TOOLS'] = 'true' preferences['RESTORE_WORKSPACE_ON_STARTUP'] = 'true' preferences['SHOW_TIPS'] = 'false' preferences['GhidraShowWhatsNew'] = 'false' # Write back preferences with open(preferences_file, 'w') as f: for key, value in preferences.items(): f.write(f"{key}={value}\n") logger.info(f"Pre-accepted user agreement and configured auto-restore in {preferences_file}") def _import_binary(self): """Import all binaries into Ghidra project using analyzeHeadless""" analyze_headless = self.ghidra_dir / "support" / "analyzeHeadless" if not analyze_headless.exists(): raise FileNotFoundError(f"analyzeHeadless not found: {analyze_headless}") self.project_dir.mkdir(parents=True, exist_ok=True) project_name = "TestProject" binary_names = [b.name for b in self.binaries_to_import] logger.info(f"Importing {len(self.binaries_to_import)} binary(ies) into Ghidra project: {binary_names}") # Build command with all binaries cmd = [ str(analyze_headless), str(self.project_dir), project_name, ] # Add each binary with -import flag for binary in self.binaries_to_import: cmd.extend(["-import", str(binary)]) cmd.extend([ "-analysisTimeoutPerFile", "120", "-max-cpu", "2" ]) result = subprocess.run( cmd, capture_output=True, text=True, timeout=300 # Increased timeout for multiple binaries ) if result.returncode != 0: logger.error(f"Binary import failed: {result.stderr}") raise RuntimeError(f"Failed to import binary: {result.stderr}") logger.info(f"Binary import completed for {len(self.binaries_to_import)} binary(ies)") # Return both project name and program name (first/primary binary filename) program_name = self.binary_path.name return project_name, program_name def _start_ghidra_gui(self, project_name: str, program_name: str = None): """Start Ghidra GUI in foreground mode using LaunchCodeBrowser Args: project_name: Name of the Ghidra project program_name: Name of program to open automatically (required) """ # Use launch.sh directly in foreground mode with LaunchCodeBrowser launch_script = self.ghidra_dir / "support" / "launch.sh" if not launch_script.exists(): raise FileNotFoundError(f"launch.sh script not found: {launch_script}") if not program_name: raise ValueError("program_name is required for LaunchCodeBrowser") project_path = self.project_dir / f"{project_name}.gpr" logger.info(f"Starting Ghidra CodeBrowser with project: {project_path}, program: {program_name}") # launch.sh arguments: # <mode> <java-type> <name> <max-memory> "<vmarg-list>" <app-classname> <app-args>... # mode: fg (foreground) instead of bg (background) so we keep the process handle # LaunchCodeBrowser will automatically open CodeBrowser with the specified program cmd = [ str(launch_script), "fg", # Run in foreground mode "jdk", # Requires JDK "Ghidra", # Application name "4G", # Max memory (increased for better performance) "", # VM arguments list (empty) "ghidra.LaunchCodeBrowser", # Custom launcher main class str(project_path), # Project file argument program_name # Program name to open ] env = os.environ.copy() # Set isolated HOME directory if specified if self.isolated_user_dir: self.original_user_home = env.get('HOME') env['HOME'] = str(self.isolated_user_dir) # Ghidra on Linux uses XDG Base Directory specification # Force it to use our isolated directory for config, data, cache, and state env['XDG_CONFIG_HOME'] = str(self.isolated_user_dir / '.config') env['XDG_DATA_HOME'] = str(self.isolated_user_dir / '.local' / 'share') env['XDG_CACHE_HOME'] = str(self.isolated_user_dir / '.cache') env['XDG_STATE_HOME'] = str(self.isolated_user_dir / '.local' / 'state') logger.info(f"Setting isolated environment:") logger.info(f" HOME={env['HOME']}") logger.info(f" XDG_CONFIG_HOME={env['XDG_CONFIG_HOME']}") logger.info(f" XDG_DATA_HOME={env['XDG_DATA_HOME']}") logger.info(f" XDG_CACHE_HOME={env['XDG_CACHE_HOME']}") # Create log files for Ghidra output self.ghidra_stdout_log = self.project_dir / "ghidra_stdout.log" self.ghidra_stderr_log = self.project_dir / "ghidra_stderr.log" self.stdout_file = open(self.ghidra_stdout_log, 'w') self.stderr_file = open(self.ghidra_stderr_log, 'w') logger.info(f"Launching Ghidra in foreground mode with command: {' '.join(cmd)}") self.ghidra_process = subprocess.Popen( cmd, stdout=self.stdout_file, stderr=self.stderr_file, env=env ) logger.info(f"Ghidra GUI process started (PID: {self.ghidra_process.pid}, logs: {self.ghidra_stdout_log}, {self.ghidra_stderr_log})") def _read_log_file(self, log_path: Path, max_lines: int = 50): """Read last N lines from log file""" if not log_path.exists(): return "<log file not found>" try: with open(log_path, 'r') as f: lines = f.readlines() return ''.join(lines[-max_lines:]) if lines else "<empty>" except Exception as e: return f"<error reading log: {e}>" def _get_ghidra_application_log(self): """Get path to Ghidra's application.log""" ghidra_version = self._detect_ghidra_version() # Get current username for Ghidra's directory naming convention import getpass username = getpass.getuser() if self.isolated_user_dir: # Modern Ghidra uses XDG paths: .config/$USER-ghidra/ghidra_VERSION_PUBLIC/application.log config_dir = self.isolated_user_dir / ".config" log_file = config_dir / f"{username}-ghidra" / f"ghidra_{ghidra_version}_PUBLIC" / "application.log" else: # Use XDG_CONFIG_HOME or fallback to ~/.config xdg_config = os.environ.get('XDG_CONFIG_HOME', str(Path.home() / '.config')) log_file = Path(xdg_config) / f"{username}-ghidra" / f"ghidra_{ghidra_version}_PUBLIC" / "application.log" return log_file def _get_log_contents(self, max_lines: int = 50): """Get contents of all relevant logs for debugging""" log_info = [] # Process stdout/stderr logs stdout_log = self.project_dir / "ghidra_stdout.log" stderr_log = self.project_dir / "ghidra_stderr.log" log_info.append("=== Ghidra STDOUT (last 50 lines) ===") log_info.append(self._read_log_file(stdout_log, max_lines)) log_info.append("\n=== Ghidra STDERR (last 50 lines) ===") log_info.append(self._read_log_file(stderr_log, max_lines)) # Ghidra application log app_log = self._get_ghidra_application_log() log_info.append("\n=== Ghidra Application Log (last 50 lines) ===") log_info.append(self._read_log_file(app_log, max_lines)) return '\n'.join(log_info) def _wait_for_http_server(self, timeout: int = 60): """Wait for HTTP server to become available and program to be loaded""" logger.info(f"Waiting for HTTP server on port {self.http_port} and program to load") start_time = time.time() last_error = None server_ready = False program_loaded = False while time.time() - start_time < timeout: if self.ghidra_process.poll() is not None: # Close log files to ensure all output is flushed if hasattr(self, 'stdout_file'): self.stdout_file.close() if hasattr(self, 'stderr_file'): self.stderr_file.close() # Read logs stdout_content = self._read_log_file(self.ghidra_stdout_log) stderr_content = self._read_log_file(self.ghidra_stderr_log) app_log_content = self._read_log_file(self._get_ghidra_application_log()) logger.error(f"Ghidra process exited unexpectedly (exit code: {self.ghidra_process.returncode})") logger.error(f"STDOUT (last 50 lines from {self.ghidra_stdout_log}):\n{stdout_content}") logger.error(f"STDERR (last 50 lines from {self.ghidra_stderr_log}):\n{stderr_content}") logger.error(f"Ghidra application.log (last 50 lines):\n{app_log_content}") raise RuntimeError( f"Ghidra process exited before server started (exit code: {self.ghidra_process.returncode}). " f"Check logs at:\n" f" - stdout: {self.ghidra_stdout_log}\n" f" - stderr: {self.ghidra_stderr_log}\n" f" - application.log: {self._get_ghidra_application_log()}" ) try: response = requests.get( f"http://127.0.0.1:{self.http_port}/ping", timeout=2 ) if response.ok: if not server_ready: logger.info(f"HTTP server is ready on port {self.http_port}") server_ready = True # Check if program is loaded try: data = response.json() if data.get('program_loaded', False): program_name = data.get('program_name', 'unknown') logger.info(f"Program '{program_name}' is loaded and ready") return True else: if not program_loaded: logger.info("Server ready, waiting for program to load...") program_loaded = True # Flag to avoid spamming logs except Exception as json_error: # If we can't parse JSON, just check if server responds logger.warning(f"Could not parse ping response as JSON: {json_error}") if server_ready: return True except requests.exceptions.RequestException as e: last_error = e pass time.sleep(1) # Capture logs for debugging log_info = self._get_log_contents() raise TimeoutError( f"HTTP server or program did not load within {timeout} seconds. " f"Server ready: {server_ready}, Program loaded: {program_loaded}. " f"Last error: {last_error}\n\n{log_info}" ) def start(self, timeout: int = 30): """Start Ghidra with plugin""" logger.info("=" * 60) logger.info("Starting Ghidra Runner") logger.info("=" * 60) try: if self.use_xvfb: self._start_xvfb() self._install_plugin() self._accept_user_agreement() project_name, program_name = self._import_binary() self._start_ghidra_gui(project_name, program_name) self._wait_for_http_server(timeout=timeout) logger.info("Ghidra Runner started successfully") logger.info("=" * 60) return True except Exception as e: logger.error(f"Failed to start Ghidra: {e}") self.stop() raise def stop(self): """Stop Ghidra and cleanup""" logger.info("Stopping Ghidra Runner") if self.ghidra_process: logger.info("Terminating Ghidra process") self.ghidra_process.terminate() try: self.ghidra_process.wait(timeout=10) except subprocess.TimeoutExpired: logger.warning("Ghidra did not terminate, killing") self.ghidra_process.kill() self.ghidra_process.wait() self.ghidra_process = None # Close log files if hasattr(self, 'stdout_file') and self.stdout_file: try: self.stdout_file.close() except: pass if hasattr(self, 'stderr_file') and self.stderr_file: try: self.stderr_file.close() except: pass if self.use_xvfb: self._stop_xvfb() logger.info("Ghidra Runner stopped") def cleanup_project(self): """Remove test project files""" if self.project_dir.exists(): logger.info(f"Cleaning up project directory: {self.project_dir}") shutil.rmtree(self.project_dir) def __enter__(self): self.start() return self def __exit__(self, exc_type, exc_val, exc_tb): self.stop() self.cleanup_project()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/HK47196/GhidraMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server