Skip to main content
Glama
decompile_utils.py21.1 kB
"""反编译工具 - Ghidra Headless 集成 + 简单伪代码生成""" import os import subprocess import tempfile import shutil import struct from typing import Optional, Dict, List from pathlib import Path try: import lief LIEF_AVAILABLE = True except ImportError: LIEF_AVAILABLE = False try: from capstone import Cs, CS_ARCH_ARM64, CS_MODE_LITTLE_ENDIAN CAPSTONE_AVAILABLE = True except ImportError: CAPSTONE_AVAILABLE = False # Ghidra 安装路径(可通过环境变量配置) GHIDRA_HOME = os.environ.get("GHIDRA_HOME", "") # Radare2 路径配置 RADARE2_PATH = os.environ.get("RADARE2_PATH", r"K:\Cherry\androidmtmangebg\radare2\radare2-5.9.6-w64\bin\radare2.exe") # Ghidra 反编译脚本 GHIDRA_DECOMPILE_SCRIPT = ''' # Ghidra Headless 反编译脚本 # @category: Analysis # @author: SO-Analyzer from ghidra.app.decompiler import DecompInterface from ghidra.util.task import ConsoleTaskMonitor import json def decompile_function(program, address): """反编译指定地址的函数""" decompiler = DecompInterface() decompiler.openProgram(program) func_manager = program.getFunctionManager() addr = program.getAddressFactory().getAddress(hex(address)) func = func_manager.getFunctionContaining(addr) if func is None: # 如果没有函数,尝试创建一个 from ghidra.program.model.symbol import SourceType func = createFunction(addr, None) if func is None: return {"error": "Cannot find or create function at address"} monitor = ConsoleTaskMonitor() result = decompiler.decompileFunction(func, 60, monitor) if result.decompileCompleted(): decomp_func = result.getDecompiledFunction() if decomp_func: return { "success": True, "function_name": func.getName(), "address": hex(func.getEntryPoint().getOffset()), "code": decomp_func.getC(), "signature": func.getSignature().getPrototypeString() } return {"success": False, "error": result.getErrorMessage()} # 主逻辑 if __name__ == "__main__": import sys if len(sys.argv) > 1: addr = int(sys.argv[1], 16) if sys.argv[1].startswith("0x") else int(sys.argv[1]) result = decompile_function(currentProgram, addr) print("===DECOMPILE_RESULT===") print(json.dumps(result, indent=2)) print("===END_RESULT===") ''' def check_ghidra() -> dict: """ 检查 Ghidra 环境是否可用 Returns: dict: {"available": bool, "ghidra_home": str, "message": str} """ global GHIDRA_HOME # 检查环境变量 if not GHIDRA_HOME: # 尝试常见路径 common_paths = [ r"C:\ghidra", r"C:\Program Files\ghidra", r"D:\ghidra", "/opt/ghidra", "/usr/local/ghidra", os.path.expanduser("~/ghidra"), ] for path in common_paths: if os.path.exists(path): # 查找 analyzeHeadless for root, dirs, files in os.walk(path): if "analyzeHeadless.bat" in files or "analyzeHeadless" in files: GHIDRA_HOME = path break if GHIDRA_HOME: break if not GHIDRA_HOME or not os.path.exists(GHIDRA_HOME): return { "available": False, "ghidra_home": "", "message": "Ghidra not found. Set GHIDRA_HOME environment variable.", "install_hint": "Download from https://ghidra-sre.org/" } # 查找 analyzeHeadless analyze_headless = None for root, dirs, files in os.walk(GHIDRA_HOME): if os.name == 'nt': if "analyzeHeadless.bat" in files: analyze_headless = os.path.join(root, "analyzeHeadless.bat") break else: if "analyzeHeadless" in files: analyze_headless = os.path.join(root, "analyzeHeadless") break if not analyze_headless: return { "available": False, "ghidra_home": GHIDRA_HOME, "message": "analyzeHeadless not found in GHIDRA_HOME" } return { "available": True, "ghidra_home": GHIDRA_HOME, "analyze_headless": analyze_headless, "message": "Ghidra is available" } def decompile_with_ghidra(so_path: str, address: int, timeout: int = 120) -> dict: """ 使用 Ghidra Headless 反编译函数 Args: so_path: SO文件路径 address: 函数虚拟地址 timeout: 超时时间(秒) Returns: dict: {"success": bool, "code": str, "error": str} """ ghidra_info = check_ghidra() if not ghidra_info["available"]: return {"success": False, "code": "", "error": ghidra_info["message"]} if not os.path.exists(so_path): return {"success": False, "code": "", "error": f"File not found: {so_path}"} temp_dir = tempfile.mkdtemp(prefix="ghidra_decompile_") try: # 创建 Ghidra 脚本 script_path = os.path.join(temp_dir, "DecompileFunction.py") with open(script_path, 'w') as f: f.write(GHIDRA_DECOMPILE_SCRIPT) # 准备项目目录 project_dir = os.path.join(temp_dir, "project") os.makedirs(project_dir, exist_ok=True) analyze_headless = ghidra_info["analyze_headless"] # 构建命令 cmd = [ analyze_headless, project_dir, "TempProject", "-import", so_path, "-postScript", script_path, hex(address), "-deleteProject", "-noanalysis" # 跳过完整分析,加快速度 ] # 执行 result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, cwd=temp_dir ) # 解析输出 output = result.stdout + result.stderr if "===DECOMPILE_RESULT===" in output: start = output.find("===DECOMPILE_RESULT===") + len("===DECOMPILE_RESULT===") end = output.find("===END_RESULT===") if end > start: import json result_json = output[start:end].strip() try: decompile_result = json.loads(result_json) return decompile_result except: pass return { "success": False, "code": "", "error": f"Ghidra output parsing failed. Return code: {result.returncode}", "stdout": output[:2000] } except subprocess.TimeoutExpired: return {"success": False, "code": "", "error": f"Ghidra timeout after {timeout}s"} except Exception as e: import traceback return {"success": False, "code": "", "error": f"{str(e)}\n{traceback.format_exc()}"} finally: shutil.rmtree(temp_dir, ignore_errors=True) def decompile_simple(so_path: str, address: int, size: int = 256) -> dict: """ 简单伪代码生成(基于模式匹配,无需Ghidra) 这是一个轻量级的反编译器,通过识别常见的ARM64指令模式 生成类C的伪代码。精度不如Ghidra,但速度快。 Args: so_path: SO文件路径 address: 函数虚拟地址 size: 分析字节数 Returns: dict: {"success": bool, "code": str, "instructions": list} """ if not LIEF_AVAILABLE or not CAPSTONE_AVAILABLE: return {"success": False, "code": "", "error": "lief and capstone required"} if not os.path.exists(so_path): return {"success": False, "code": "", "error": f"File not found: {so_path}"} try: with open(so_path, 'rb') as f: data = f.read() binary = lief.parse(so_path) if binary is None: return {"success": False, "code": "", "error": "Failed to parse SO file"} # 获取 .text 段 text_section = None for section in binary.sections: if section.name == ".text": text_section = section break if text_section is None: return {"success": False, "code": "", "error": ".text section not found"} text_file_start = text_section.file_offset text_vaddr_base = text_section.virtual_address # 计算文件偏移 file_offset = text_file_start + (address - text_vaddr_base) # 获取导出函数名 exported_funcs = {} for func in binary.exported_functions: if hasattr(func, 'address') and hasattr(func, 'name'): exported_funcs[func.address] = func.name func_name = exported_funcs.get(address, f"sub_{address:x}") # 读取代码 code_bytes = bytes(data[file_offset:file_offset + size]) # 反汇编 md = Cs(CS_ARCH_ARM64, CS_MODE_LITTLE_ENDIAN) md.detail = True instructions = list(md.disasm(code_bytes, address)) # 生成伪代码 pseudo_lines = [] pseudo_lines.append(f"// Function: {func_name}") pseudo_lines.append(f"// Address: 0x{address:x}") pseudo_lines.append(f"// Size: {size} bytes") pseudo_lines.append("") pseudo_lines.append(f"int {func_name}() {{") # 跟踪寄存器状态 reg_state = {} indent = " " # 分支目标 branch_targets = set() for insn in instructions: if insn.mnemonic.startswith('b') and insn.operands: for op in insn.operands: if hasattr(op, 'imm'): branch_targets.add(op.imm) for insn in instructions: mnemonic = insn.mnemonic.lower() ops = insn.op_str addr = insn.address # 检查是否是分支目标 if addr in branch_targets: pseudo_lines.append(f"\nlabel_{addr:x}:") comment = f"// 0x{addr:x}: {insn.mnemonic} {ops}" # 转换指令到伪代码 if mnemonic == "ret": pseudo_lines.append(f"{indent}return; {comment}") elif mnemonic == "mov": parts = ops.split(',') if len(parts) >= 2: dst = parts[0].strip() src = parts[1].strip() pseudo_lines.append(f"{indent}{dst} = {src}; {comment}") elif mnemonic in ["movz", "movk"]: parts = ops.split(',') if len(parts) >= 2: dst = parts[0].strip() src = ','.join(parts[1:]).strip() pseudo_lines.append(f"{indent}{dst} = {src}; {comment}") elif mnemonic == "add": parts = ops.split(',') if len(parts) >= 3: dst = parts[0].strip() src1 = parts[1].strip() src2 = parts[2].strip() pseudo_lines.append(f"{indent}{dst} = {src1} + {src2}; {comment}") elif mnemonic == "sub": parts = ops.split(',') if len(parts) >= 3: dst = parts[0].strip() src1 = parts[1].strip() src2 = parts[2].strip() pseudo_lines.append(f"{indent}{dst} = {src1} - {src2}; {comment}") elif mnemonic == "mul": parts = ops.split(',') if len(parts) >= 3: dst = parts[0].strip() src1 = parts[1].strip() src2 = parts[2].strip() pseudo_lines.append(f"{indent}{dst} = {src1} * {src2}; {comment}") elif mnemonic == "ldr": parts = ops.split(',', 1) if len(parts) >= 2: dst = parts[0].strip() src = parts[1].strip() pseudo_lines.append(f"{indent}{dst} = *({src}); {comment}") elif mnemonic == "str": parts = ops.split(',', 1) if len(parts) >= 2: src = parts[0].strip() dst = parts[1].strip() pseudo_lines.append(f"{indent}*({dst}) = {src}; {comment}") elif mnemonic == "stp": pseudo_lines.append(f"{indent}// store pair {comment}") elif mnemonic == "ldp": pseudo_lines.append(f"{indent}// load pair {comment}") elif mnemonic == "bl": target = ops.strip() # 查找函数名 if insn.operands: target_addr = insn.operands[0].imm target_name = exported_funcs.get(target_addr, f"sub_{target_addr:x}") pseudo_lines.append(f"{indent}{target_name}(); {comment}") else: pseudo_lines.append(f"{indent}call({target}); {comment}") elif mnemonic == "blr": pseudo_lines.append(f"{indent}(*{ops})(); {comment}") elif mnemonic == "cmp": parts = ops.split(',') if len(parts) >= 2: pseudo_lines.append(f"{indent}// compare {parts[0].strip()} with {parts[1].strip()} {comment}") elif mnemonic.startswith("b.") or mnemonic in ["cbz", "cbnz", "tbz", "tbnz"]: cond = mnemonic[2:] if mnemonic.startswith("b.") else mnemonic target = ops.split(',')[-1].strip() if insn.operands: target_addr = insn.operands[-1].imm if hasattr(insn.operands[-1], 'imm') else 0 pseudo_lines.append(f"{indent}if ({cond}) goto label_{target_addr:x}; {comment}") else: pseudo_lines.append(f"{indent}if ({cond}) goto {target}; {comment}") elif mnemonic == "b": if insn.operands: target_addr = insn.operands[0].imm pseudo_lines.append(f"{indent}goto label_{target_addr:x}; {comment}") else: pseudo_lines.append(f"{indent}goto {ops}; {comment}") elif mnemonic in ["adrp", "adr"]: parts = ops.split(',') if len(parts) >= 2: pseudo_lines.append(f"{indent}{parts[0].strip()} = &page({parts[1].strip()}); {comment}") elif mnemonic == "nop": pass # 忽略 NOP elif mnemonic in ["pacibsp", "autibsp", "paciasp", "autiasp"]: pseudo_lines.append(f"{indent}// PAC: {mnemonic} {comment}") else: pseudo_lines.append(f"{indent}// {insn.mnemonic} {ops} {comment}") pseudo_lines.append("}") return { "success": True, "function_name": func_name, "address": hex(address), "code": "\n".join(pseudo_lines), "instruction_count": len(instructions), "note": "Simple pattern-based decompilation. Use Ghidra for better results.", "error": "" } except Exception as e: import traceback return {"success": False, "code": "", "error": f"{str(e)}\n{traceback.format_exc()}"} def check_radare2() -> dict: """ 检查 Radare2 环境是否可用 Returns: dict: {"available": bool, "path": str, "version": str} """ global RADARE2_PATH # 检查配置的路径 if os.path.exists(RADARE2_PATH): try: result = subprocess.run( [RADARE2_PATH, "-v"], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: version = result.stdout.split('\n')[0] if result.stdout else "unknown" return { "available": True, "path": RADARE2_PATH, "version": version } except: pass # 尝试系统PATH try: result = subprocess.run( ["radare2", "-v"], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: RADARE2_PATH = "radare2" version = result.stdout.split('\n')[0] if result.stdout else "unknown" return { "available": True, "path": "radare2 (system)", "version": version } except: pass return { "available": False, "path": "", "version": "", "message": "Radare2 not found. Set RADARE2_PATH or install radare2." } def decompile_with_radare2(so_path: str, address: int, timeout: int = 60) -> dict: """ 使用 Radare2 + r2dec/pdc 反编译函数 Args: so_path: SO文件路径 address: 函数虚拟地址 timeout: 超时时间(秒) Returns: dict: {"success": bool, "code": str, "error": str} """ r2_info = check_radare2() if not r2_info["available"]: return {"success": False, "code": "", "error": r2_info.get("message", "Radare2 not available")} if not os.path.exists(so_path): return {"success": False, "code": "", "error": f"File not found: {so_path}"} try: # 构建radare2命令 # 使用pdc(简单反编译)或pdg(如果r2ghidra可用) r2_commands = f"aaa;s {hex(address)};pdc" result = subprocess.run( [RADARE2_PATH, "-q", "-c", r2_commands, so_path], capture_output=True, text=True, timeout=timeout ) if result.returncode == 0 and result.stdout: code = result.stdout.strip() # 检查是否有有效输出 if code and len(code) > 50: return { "success": True, "code": code, "address": hex(address), "error": "" } else: # 尝试使用pdf(反汇编+注释)作为后备 r2_commands_fallback = f"aaa;s {hex(address)};pdf" result2 = subprocess.run( [RADARE2_PATH, "-q", "-c", r2_commands_fallback, so_path], capture_output=True, text=True, timeout=timeout ) if result2.returncode == 0 and result2.stdout: return { "success": True, "code": result2.stdout.strip(), "address": hex(address), "note": "Using disassembly (pdf) as fallback", "error": "" } return { "success": False, "code": "", "error": f"Radare2 failed: {result.stderr or 'No output'}", "stdout": result.stdout[:500] if result.stdout else "" } except subprocess.TimeoutExpired: return {"success": False, "code": "", "error": f"Radare2 timeout after {timeout}s"} except Exception as e: import traceback return {"success": False, "code": "", "error": f"{str(e)}\n{traceback.format_exc()}"} def decompile(so_path: str, address: int, method: str = "radare2", size: int = 256, timeout: int = 120) -> dict: """ 反编译函数(统一接口) Args: so_path: SO文件路径 address: 函数虚拟地址 method: 反编译方法 ("radare2", "ghidra", "simple") size: 简单反编译时的字节数 timeout: 超时时间 Returns: dict: {"success": bool, "code": str, "method": str} """ if method == "ghidra": result = decompile_with_ghidra(so_path, address, timeout) result["method"] = "ghidra" elif method == "radare2": result = decompile_with_radare2(so_path, address, timeout) result["method"] = "radare2" else: result = decompile_simple(so_path, address, size) result["method"] = "simple" return result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/1600822305/so-analyzer-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server