Skip to main content
Glama
xref_utils.py20 kB
"""交叉引用分析工具""" import os import struct from typing import Optional, List, Dict, Tuple try: import lief LIEF_AVAILABLE = True except ImportError: LIEF_AVAILABLE = False try: from capstone import Cs, CS_ARCH_ARM64, CS_MODE_ARM, CS_MODE_LITTLE_ENDIAN CAPSTONE_AVAILABLE = True except ImportError: CAPSTONE_AVAILABLE = False def get_code_sections(so_path: str) -> dict: """ 获取所有代码段信息 Args: so_path: SO文件路径 Returns: dict: {"success": bool, "sections": list, "error": str} """ if not LIEF_AVAILABLE: return {"success": False, "sections": [], "error": "lief not available"} if not os.path.exists(so_path): return {"success": False, "sections": [], "error": f"File not found: {so_path}"} try: binary = lief.parse(so_path) if binary is None: return {"success": False, "sections": [], "error": "Failed to parse SO file"} sections = [] for section in binary.sections: # 检查是否可执行 - 多种方法判断 is_exec = False # 方法1: 通过flags判断 try: # EXECINSTR = 0x4 if hasattr(section, 'flags'): flags_value = int(section.flags) is_exec = bool(flags_value & 0x4) # SHF_EXECINSTR = 0x4 except: pass # 方法2: 通过段名判断(备用) if not is_exec and section.name in [".text", ".plt", ".init", ".fini"]: is_exec = True sections.append({ "name": section.name, "start": hex(section.virtual_address), "end": hex(section.virtual_address + section.size), "file_offset": hex(section.file_offset), "size": section.size, "is_executable": is_exec, "flags": hex(int(section.flags)) if hasattr(section, 'flags') else "0x0" }) # 找出可执行段 exec_sections = [s for s in sections if s["is_executable"]] return { "success": True, "sections": sections, "executable_sections": exec_sections, "error": "" } except Exception as e: import traceback return {"success": False, "sections": [], "error": f"{str(e)}\n{traceback.format_exc()}"} def find_string_offset(so_path: str, search_string: str) -> dict: """ 查找字符串在文件中的偏移 Args: so_path: SO文件路径 search_string: 要搜索的字符串 Returns: dict: {"success": bool, "offsets": list, "error": str} """ if not os.path.exists(so_path): return {"success": False, "offsets": [], "error": f"File not found: {so_path}"} try: with open(so_path, 'rb') as f: data = f.read() search_bytes = search_string.encode('utf-8') offsets = [] start = 0 while True: pos = data.find(search_bytes, start) if pos == -1: break offsets.append({ "offset": pos, "hex_offset": hex(pos), "context": data[max(0, pos-10):pos+len(search_bytes)+10] }) start = pos + 1 if len(offsets) >= 10: # 限制数量 break return { "success": True, "string": search_string, "offsets": offsets, "count": len(offsets), "error": "" } except Exception as e: return {"success": False, "offsets": [], "error": str(e)} def xref_string(so_path: str, search_string: str, max_xrefs: int = 20) -> dict: """ 查找字符串的交叉引用(哪些代码引用了这个字符串) 这是定位SSL验证函数的核心工具! Args: so_path: SO文件路径 search_string: 要搜索的字符串 max_xrefs: 最多返回的交叉引用数量 Returns: dict: {"success": bool, "xrefs": list, "error": str} """ if not LIEF_AVAILABLE: return {"success": False, "xrefs": [], "error": "lief not available"} if not os.path.exists(so_path): return {"success": False, "xrefs": [], "error": f"File not found: {so_path}"} # 使用新的搜索算法 return xref_string_v2(so_path, search_string, max_xrefs) def xref_string_v2(so_path: str, search_string: str, max_xrefs: int = 20) -> dict: """ 改进版交叉引用搜索 - 考虑虚拟地址映射 """ if not LIEF_AVAILABLE: return {"success": False, "xrefs": [], "error": "lief not available"} try: with open(so_path, 'rb') as f: data = f.read() binary = lief.parse(so_path) if binary is None: return {"success": False, "xrefs": [], "error": "Failed to parse SO file"} debug_info = { "file_size": len(data), "sections": [], "search_stats": {} } # 1. 找到字符串的文件偏移 search_bytes = search_string.encode('utf-8') string_file_offset = data.find(search_bytes) if string_file_offset == -1: return {"success": False, "xrefs": [], "error": f"String not found: {search_string}"} # 2. 建立文件偏移到虚拟地址的映射 # 找到字符串所在的段,获取其虚拟地址 string_vaddr = string_file_offset # 默认 for section in binary.sections: sec_start = section.file_offset sec_end = section.file_offset + section.size if sec_start <= string_file_offset < sec_end: # 计算虚拟地址 offset_in_section = string_file_offset - sec_start string_vaddr = section.virtual_address + offset_in_section debug_info["string_section"] = { "name": section.name, "file_offset": hex(sec_start), "virtual_address": hex(section.virtual_address) } break debug_info["string_file_offset"] = hex(string_file_offset) debug_info["string_vaddr"] = hex(string_vaddr) # 3. 获取.text段信息 text_section = None for section in binary.sections: debug_info["sections"].append({ "name": section.name, "file_offset": hex(section.file_offset), "vaddr": hex(section.virtual_address), "size": section.size }) if section.name == ".text": text_section = section if text_section is None: return {"success": False, "xrefs": [], "debug": debug_info, "error": ".text section not found"} text_file_start = text_section.file_offset text_file_end = text_section.file_offset + text_section.size text_vaddr_base = text_section.virtual_address debug_info["text_section"] = { "file_start": hex(text_file_start), "file_end": hex(text_file_end), "vaddr_base": hex(text_vaddr_base), "size": text_section.size } # 4. 计算目标页地址 target_page = string_vaddr & ~0xFFF target_page_offset = string_vaddr & 0xFFF debug_info["target_page"] = hex(target_page) debug_info["target_page_offset"] = hex(target_page_offset) xrefs = [] adrp_found = 0 adrp_page_match = 0 adrp_samples = [] # 采样前20个ADRP指令的解析结果 unique_target_pages = set() # 收集所有目标页面 # 5. 在.text段中搜索ADRP指令 for file_offset in range(text_file_start, min(text_file_end, len(data) - 8), 4): if len(xrefs) >= max_xrefs: break insn = struct.unpack('<I', data[file_offset:file_offset+4])[0] # ADRP指令: [1] [immlo:2] [10000] [immhi:19] [Rd:5] # 检查是否是ADRP (op=1, 识别码 10010000) if (insn & 0x9F000000) == 0x90000000: adrp_found += 1 # 提取立即数 immlo = (insn >> 29) & 0x3 immhi = (insn >> 5) & 0x7FFFF imm = (immhi << 2) | immlo # 符号扩展 (21位有符号数) if imm & 0x100000: imm = imm - 0x200000 # 正确的符号扩展 # 计算当前指令的虚拟地址 offset_in_text = file_offset - text_file_start current_vaddr = text_vaddr_base + offset_in_text # 计算ADRP的目标页 (PC & ~0xFFF + imm << 12) pc_page = current_vaddr & ~0xFFF adrp_target_page = (pc_page + (imm << 12)) & 0xFFFFFFFFFFFFFFFF # 收集目标页面用于调试 if len(unique_target_pages) < 1000: unique_target_pages.add(adrp_target_page) # 采样前20个ADRP指令 if len(adrp_samples) < 20: adrp_samples.append({ "file_offset": hex(file_offset), "vaddr": hex(current_vaddr), "instruction": hex(insn), "imm_raw": hex(imm & 0x1FFFFF), "pc_page": hex(pc_page), "target_page": hex(adrp_target_page) }) # 检查是否指向目标页 if adrp_target_page == target_page: adrp_page_match += 1 # 检查后续ADD指令 next_insn = struct.unpack('<I', data[file_offset+4:file_offset+8])[0] # ADD immediate: [sf:1] [00] [100010] [sh:1] [imm12:12] [Rn:5] [Rd:5] # 32位: 0x11000000, 64位: 0x91000000 if (next_insn & 0x7F800000) == 0x11000000 or (next_insn & 0xFF800000) == 0x91000000: add_imm = (next_insn >> 10) & 0xFFF # 检查ADD的目标是否匹配(允许一定误差) if abs(add_imm - target_page_offset) <= 0x10: xrefs.append({ "file_offset": hex(file_offset), "virtual_address": hex(current_vaddr), "type": "adrp+add", "adrp_target_page": hex(adrp_target_page), "add_offset": hex(add_imm), "full_target": hex(adrp_target_page + add_imm), "instruction_bytes": data[file_offset:file_offset+8].hex() }) # 检查目标页面是否在唯一页面集合中 target_in_pages = target_page in unique_target_pages # 找到最接近目标页面的页面 closest_pages = [] if unique_target_pages: sorted_pages = sorted(unique_target_pages, key=lambda p: abs(p - target_page)) closest_pages = [hex(p) for p in sorted_pages[:10]] debug_info["search_stats"] = { "adrp_instructions_found": adrp_found, "adrp_page_matches": adrp_page_match, "xrefs_found": len(xrefs), "unique_target_pages_count": len(unique_target_pages), "target_page_found_in_adrps": target_in_pages } debug_info["adrp_samples"] = adrp_samples debug_info["closest_pages_to_target"] = closest_pages # 6. 如果没找到,也尝试搜索LDR模式 if len(xrefs) == 0: # 搜索包含目标地址的数据引用 # 有些编译器会在.got或.data段放置地址 string_vaddr_bytes = struct.pack('<Q', string_vaddr) pos = 0 while len(xrefs) < max_xrefs: pos = data.find(string_vaddr_bytes[:4], pos) # 只搜索低32位 if pos == -1: break # 检查是否是完整的64位地址 if data[pos:pos+8] == string_vaddr_bytes or data[pos:pos+4] == string_vaddr_bytes[:4]: xrefs.append({ "file_offset": hex(pos), "type": "direct_pointer", "value": data[pos:pos+8].hex() }) pos += 1 return { "success": True, "string": search_string, "string_file_offset": hex(string_file_offset), "string_vaddr": hex(string_vaddr), "xrefs": xrefs, "count": len(xrefs), "debug": debug_info, "error": "" } except Exception as e: import traceback return {"success": False, "xrefs": [], "error": f"{str(e)}\n{traceback.format_exc()}"} def find_function_containing_address(binary, data: bytes, address: int) -> Optional[dict]: """ 根据地址找到所属的函数 通过向前搜索函数开头特征来找函数边界 """ if not LIEF_AVAILABLE: return None try: # 首先检查是否在已知的导出函数中 for func in binary.exported_functions: func_addr = func.address if hasattr(func, 'address') else 0 # 假设函数大小最大4096字节 if func_addr <= address < func_addr + 4096: return { "name": func.name if hasattr(func, 'name') else "unknown", "start": hex(func_addr) } # 向前搜索函数开头特征 # ARM64函数常见开头: stp x29, x30, [sp, #-N]! 或 sub sp, sp, #N search_start = max(0, address - 4096) for offset in range(address, search_start, -4): if offset < 4: break insn = struct.unpack('<I', data[offset:offset+4])[0] # 检查是否是stp x29, x30 (常见的函数开头) # stp x29, x30, [sp, #imm]! 的编码特征 if (insn & 0xFFC003FF) == 0xA98003E0: return { "name": "sub_" + hex(offset)[2:], "start": hex(offset) } # 检查 pacibsp (ARM64 PAC) if insn == 0xD503237F: return { "name": "sub_" + hex(offset)[2:], "start": hex(offset) } return None except: return None def find_function_by_address(so_path: str, address: int) -> dict: """ 根据地址查找函数信息 Args: so_path: SO文件路径 address: 地址 Returns: dict: {"success": bool, "function": dict, "error": str} """ if not LIEF_AVAILABLE: return {"success": False, "function": {}, "error": "lief not available"} if not os.path.exists(so_path): return {"success": False, "function": {}, "error": f"File not found: {so_path}"} try: with open(so_path, 'rb') as f: data = f.read() binary = lief.parse(so_path) if binary is None: return {"success": False, "function": {}, "error": "Failed to parse SO file"} func_info = find_function_containing_address(binary, data, address) if func_info: return { "success": True, "function": func_info, "query_address": hex(address), "error": "" } else: return { "success": False, "function": {}, "query_address": hex(address), "error": "Could not determine function boundaries" } except Exception as e: return {"success": False, "function": {}, "error": str(e)} def analyze_function(so_path: str, function_address: int, size: int = 256) -> dict: """ 分析函数特征,判断是否是SSL验证函数 Args: so_path: SO文件路径 function_address: 函数地址 size: 分析的字节数 Returns: dict: {"success": bool, "analysis": dict, "error": str} """ if not os.path.exists(so_path): return {"success": False, "analysis": {}, "error": f"File not found: {so_path}"} try: with open(so_path, 'rb') as f: data = f.read() if function_address < 0 or function_address + size > len(data): return {"success": False, "analysis": {}, "error": "Invalid address range"} func_data = data[function_address:function_address + size] analysis = { "address": hex(function_address), "size_analyzed": size, "strings_nearby": [], "call_instructions": [], "return_instructions": [], "is_ssl_verify": False, "ssl_confidence": 0.0 } # 搜索附近的字符串引用 ssl_keywords = [ b"ssl", b"SSL", b"cert", b"CERT", b"verify", b"VERIFY", b"x509", b"X509", b"certificate", b"CERTIFICATE" ] ssl_score = 0 for keyword in ssl_keywords: # 在函数附近搜索 search_start = max(0, function_address - 10000) search_end = min(len(data), function_address + 10000) search_range = data[search_start:search_end] if keyword in search_range: ssl_score += 1 analysis["strings_nearby"].append(keyword.decode('utf-8', errors='ignore')) # 检查是否有CERTIFICATE_VERIFY_FAILED字符串引用 if b"CERTIFICATE_VERIFY_FAILED" in data: cert_offset = data.find(b"CERTIFICATE_VERIFY_FAILED") # 检查函数是否引用了这个字符串 cert_page = cert_offset & ~0xFFF func_page = function_address & ~0xFFF page_diff = abs(cert_page - func_page) if page_diff < 0x100000: # 1MB范围内 ssl_score += 5 analysis["references_cert_verify_failed"] = True # 计算置信度 analysis["ssl_confidence"] = min(ssl_score / 10.0, 1.0) analysis["is_ssl_verify"] = analysis["ssl_confidence"] > 0.5 # 反汇编前几条指令 if CAPSTONE_AVAILABLE: try: md = Cs(CS_ARCH_ARM64, CS_MODE_LITTLE_ENDIAN) instructions = [] for insn in md.disasm(func_data[:64], function_address): instructions.append({ "address": hex(insn.address), "mnemonic": insn.mnemonic, "operands": insn.op_str, "bytes": insn.bytes.hex() }) if len(instructions) >= 10: break analysis["first_instructions"] = instructions except: pass return { "success": True, "analysis": analysis, "error": "" } except Exception as e: return {"success": False, "analysis": {}, "error": str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/1600822305/so-analyzer-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server