Skip to main content
Glama
advanced_utils.py59.9 kB
"""高级分析工具 - 函数识别、调用图、控制流图""" import os import struct from typing import Optional, List, Dict, Set, Tuple from collections import defaultdict try: import lief LIEF_AVAILABLE = True except ImportError: LIEF_AVAILABLE = False try: from capstone import Cs, CS_ARCH_ARM64, CS_MODE_ARM, CS_MODE_LITTLE_ENDIAN from capstone.arm64 import ARM64_OP_IMM, ARM64_GRP_CALL, ARM64_GRP_JUMP, ARM64_GRP_RET, ARM64_GRP_BRANCH_RELATIVE CAPSTONE_AVAILABLE = True except ImportError: CAPSTONE_AVAILABLE = False # ARM64 函数开头特征 FUNCTION_PROLOGUE_PATTERNS = [ # PACIBSP - PAC指令 (常见于新版本) (0xD503237F, 0xFFFFFFFF, "pacibsp"), # STP X29, X30, [SP, #imm]! - 常见函数开头 (0xA98003E0, 0xFFC003FF, "stp x29, x30"), # SUB SP, SP, #imm (64位) (0xD10003FF, 0xFF0003FF, "sub sp, sp"), # STP with pre-index (通用) (0xA9800000, 0xFFC00000, "stp pre-index"), ] # ARM64 函数结尾特征 FUNCTION_EPILOGUE_PATTERNS = [ # RET (0xD65F03C0, 0xFFFFFFFF, "ret"), # RETAB/RETAA (PAC) (0xD65F0FFF, 0xFFFFFFFF, "retab"), (0xD65F0BFF, 0xFFFFFFFF, "retaa"), ] def list_all_functions(so_path: str, limit: int = 2000, search: str = "") -> dict: """ 识别所有函数(包括未导出的) 方法: 1. 扫描 .text 段 2. 识别函数开头模式 (STP X29,X30 / SUB SP,SP / PACIBSP) 3. 估算函数大小 4. 与导出函数合并 Args: so_path: SO文件路径 limit: 最大返回数量 search: 搜索过滤(用于导出函数名) Returns: dict: {"success": bool, "functions": list, "error": str} """ if not LIEF_AVAILABLE: return {"success": False, "functions": [], "error": "lief not available"} if not os.path.exists(so_path): return {"success": False, "functions": [], "error": f"File not found: {so_path}"} try: with open(so_path, 'rb') as f: data = f.read() binary = lief.parse(so_path) if binary is None: return {"success": False, "functions": [], "error": "Failed to parse SO file"} # 1. 
获取 .text 段信息 text_section = None for section in binary.sections: if section.name == ".text": text_section = section break if text_section is None: return {"success": False, "functions": [], "error": ".text section not found"} text_file_start = text_section.file_offset text_file_end = text_section.file_offset + text_section.size text_vaddr_base = text_section.virtual_address # 2. 收集导出函数信息 exported_funcs = {} # vaddr -> name for func in binary.exported_functions: if hasattr(func, 'address') and hasattr(func, 'name'): exported_funcs[func.address] = func.name # 3. 扫描 .text 段识别函数开头 functions = [] found_addrs = set() for file_offset in range(text_file_start, min(text_file_end, len(data) - 4), 4): insn = struct.unpack('<I', data[file_offset:file_offset+4])[0] # 检查是否匹配函数开头模式 for pattern, mask, desc in FUNCTION_PROLOGUE_PATTERNS: if (insn & mask) == pattern: # 计算虚拟地址 vaddr = text_vaddr_base + (file_offset - text_file_start) if vaddr in found_addrs: continue found_addrs.add(vaddr) # 检查是否是导出函数 is_exported = vaddr in exported_funcs name = exported_funcs.get(vaddr, f"sub_{vaddr:x}") # 搜索过滤 if search and search.lower() not in name.lower(): continue # 估算函数大小 func_size = _estimate_function_size(data, file_offset, text_file_end) functions.append({ "address": hex(vaddr), "file_offset": hex(file_offset), "size": func_size, "is_exported": is_exported, "name": name, "prologue_type": desc }) if len(functions) >= limit: break break if len(functions) >= limit: break # 4. 
def _estimate_function_size(data: bytes, func_start: int, text_end: int) -> int:
    """Estimate the byte size of the function starting at file offset func_start.

    The scan walks 32-bit words from func_start and stops at the first of:
    - a return instruction (RET / RETAB / RETAA), which is counted as part
      of the function;
    - a word matching FUNCTION_PROLOGUE_PATTERNS that directly follows a NOP,
      taken as the start of the next function after padding.

    Args:
        data: Raw file contents.
        func_start: File offset of the function's first instruction.
        text_end: File offset one past the end of the .text section.

    Returns:
        Estimated size in bytes. When no boundary is found within 64 KB the
        result defaults to at most 0x1000, clamped to the bytes actually
        available; it is never negative.
    """
    return_opcodes = (0xD65F03C0, 0xD65F0FFF, 0xD65F0BFF)  # RET, RETAB, RETAA
    nop_opcode = 0xD503201F

    if func_start < 0:
        return 0

    # Search at most 64 KB, staying inside both .text and the file.
    scan_limit = min(func_start + 0x10000, text_end, len(data))

    previous = None
    # "- 3" keeps every read a full 4-byte word while still visiting the
    # last complete word before scan_limit.
    for offset in range(func_start, scan_limit - 3, 4):
        insn = struct.unpack_from('<I', data, offset)[0]

        # The very first instruction may already be the return (leaf stub).
        if insn in return_opcodes:
            return offset - func_start + 4

        # `previous` is None for the function's own first word, so its own
        # prologue is never mistaken for the next function's start.
        if previous == nop_opcode and any(
            (insn & mask) == pattern
            for pattern, mask, _ in FUNCTION_PROLOGUE_PATTERNS
        ):
            return offset - func_start

        previous = insn

    # No boundary found: default to 4 KB, clamped to what is available.
    return max(0, min(0x1000, scan_limit - func_start))
def callgraph(so_path: str, function_addr: int, max_depth: int = 3) -> dict:
    """
    Build a call graph rooted at one function.

    Steps:
    1. Disassemble the function (size estimated, capped at 16 KB).
    2. Record BL (direct) and BLR (indirect) instructions.
    3. Follow direct call targets breadth-first up to max_depth.

    Args:
        so_path: Path to the SO file.
        function_addr: Virtual address of the root function.
        max_depth: Maximum call depth to follow from the root.

    Returns:
        dict: {"success": bool, "calls": list, "graph": str, "error": str}.
        On success it also carries "root", "root_name", "nodes_count",
        "edges_count" and "warnings" (disassembly failures that were skipped).
        "graph" is a Graphviz DOT document.
    """
    # Local import: the file header only pulls defaultdict from collections.
    from collections import deque

    failure = {"success": False, "calls": [], "graph": ""}

    if not LIEF_AVAILABLE or not CAPSTONE_AVAILABLE:
        return {**failure, "error": "lief and capstone required"}

    if not os.path.exists(so_path):
        return {**failure, "error": f"File not found: {so_path}"}

    try:
        with open(so_path, 'rb') as f:
            data = f.read()

        binary = lief.parse(so_path)
        if binary is None:
            return {**failure, "error": "Failed to parse SO file"}

        # Locate the .text section
        text_section = next((s for s in binary.sections if s.name == ".text"), None)
        if text_section is None:
            return {**failure, "error": ".text section not found"}

        text_file_start = text_section.file_offset
        text_vaddr_base = text_section.virtual_address
        text_size = text_section.size
        text_file_end = text_file_start + text_size

        # Exported function names, keyed by virtual address
        exported_funcs = {}
        for func in binary.exported_functions:
            if hasattr(func, 'address') and hasattr(func, 'name'):
                exported_funcs[func.address] = func.name

        def get_func_name(addr):
            return exported_funcs.get(addr, f"sub_{addr:x}")

        md = Cs(CS_ARCH_ARM64, CS_MODE_LITTLE_ENDIAN)
        md.detail = True

        calls = []
        edges = []
        nodes = set()
        visited = set()
        warnings = []
        queue = deque([(function_addr, 0)])  # (addr, depth)

        # Breadth-first traversal of direct calls
        while queue:
            current_addr, depth = queue.popleft()
            if current_addr in visited or depth > max_depth:
                continue
            visited.add(current_addr)
            nodes.add(current_addr)

            # Targets outside .text stay in the graph as leaf nodes.
            if not (text_vaddr_base <= current_addr < text_vaddr_base + text_size):
                continue

            file_offset = text_file_start + (current_addr - text_vaddr_base)
            func_size = _estimate_function_size(data, file_offset, text_file_end)
            func_size = min(func_size, 0x4000)  # cap at 16 KB
            if file_offset + func_size > len(data):
                continue

            code = bytes(data[file_offset:file_offset + func_size])
            caller_hex = hex(current_addr)
            caller_name = get_func_name(current_addr)

            try:
                for insn in md.disasm(code, current_addr):
                    if insn.mnemonic == "bl":
                        # Direct call: the first operand is the target address
                        if insn.operands and len(insn.operands) > 0:
                            target = insn.operands[0].imm
                            calls.append({
                                "from": caller_hex,
                                "from_name": caller_name,
                                "call_site": hex(insn.address),
                                "to": hex(target),
                                "to_name": get_func_name(target),
                                "type": "direct",
                                "depth": depth
                            })
                            edges.append((current_addr, target))
                            if target not in visited and depth < max_depth:
                                queue.append((target, depth + 1))
                    elif insn.mnemonic == "blr":
                        # Indirect call: the target is unknown statically
                        calls.append({
                            "from": caller_hex,
                            "from_name": caller_name,
                            "call_site": hex(insn.address),
                            "to": "indirect",
                            "to_name": f"[{insn.op_str}]",
                            "type": "indirect",
                            "depth": depth
                        })
            except Exception as exc:
                # Keep the traversal best-effort, but record what was skipped.
                warnings.append(f"disassembly failed at {caller_hex}: {exc}")

        # Render DOT. Nodes are sorted and duplicate edges collapsed so the
        # output is deterministic.
        dot_lines = ["digraph callgraph {", " rankdir=TB;", " node [shape=box];"]
        for addr in sorted(nodes):
            style = 'style=filled,fillcolor=lightblue' if addr == function_addr else ''
            dot_lines.append(f' "{get_func_name(addr)}" [{style}];')
        for from_addr, to_addr in dict.fromkeys(edges):
            dot_lines.append(f' "{get_func_name(from_addr)}" -> "{get_func_name(to_addr)}";')
        dot_lines.append("}")

        return {
            "success": True,
            "root": hex(function_addr),
            "root_name": get_func_name(function_addr),
            "calls": calls,
            "nodes_count": len(nodes),
            "edges_count": len(edges),
            "graph": "\n".join(dot_lines),
            "warnings": warnings,
            "error": ""
        }

    except Exception as e:
        # Top-level boundary: report the failure in the result dict.
        import traceback
        return {**failure, "error": f"{str(e)}\n{traceback.format_exc()}"}
def get_cfg(so_path: str, function_addr: int, max_size: int = 0x2000) -> dict:
    """
    Build the control-flow graph (CFG) of one function.

    A basic block starts at the function entry, at every in-function branch
    target, and after every branch, call or return. A block ends with such
    an instruction or immediately before the next block start.

    Args:
        so_path: Path to the SO file.
        function_addr: Virtual address of the function.
        max_size: Maximum number of bytes analysed.

    Returns:
        dict: {"success": bool, "basic_blocks": list, "edges": list,
        "graph": str, "error": str}; on success it also carries
        "function_address", "function_size", "block_count" and "edge_count".
        Each block lists at most its first 10 instructions. "graph" is a
        Graphviz DOT document.
    """
    failure = {"success": False, "basic_blocks": [], "edges": [], "graph": ""}

    if not LIEF_AVAILABLE or not CAPSTONE_AVAILABLE:
        return {**failure, "error": "lief and capstone required"}

    if not os.path.exists(so_path):
        return {**failure, "error": f"File not found: {so_path}"}

    try:
        with open(so_path, 'rb') as f:
            data = f.read()

        binary = lief.parse(so_path)
        if binary is None:
            return {**failure, "error": "Failed to parse SO file"}

        # Locate the .text section
        text_section = next((s for s in binary.sections if s.name == ".text"), None)
        if text_section is None:
            return {**failure, "error": ".text section not found"}

        text_file_start = text_section.file_offset
        text_vaddr_base = text_section.virtual_address
        text_size = text_section.size

        if not (text_vaddr_base <= function_addr < text_vaddr_base + text_size):
            return {**failure, "error": f"Address {hex(function_addr)} not in .text section"}

        file_offset = text_file_start + (function_addr - text_vaddr_base)

        # Estimate the function size, capped by max_size
        func_size = _estimate_function_size(data, file_offset, text_file_start + text_size)
        func_size = min(func_size, max_size)
        func_end = function_addr + func_size

        code = bytes(data[file_offset:file_offset + func_size])

        md = Cs(CS_ARCH_ARM64, CS_MODE_LITTLE_ENDIAN)
        md.detail = True

        instructions = list(md.disasm(code, function_addr))
        if not instructions:
            return {**failure, "error": "Failed to disassemble function"}

        conditional_mnemonics = {'cbz', 'cbnz', 'tbz', 'tbnz'}
        call_mnemonics = {'bl', 'blr'}
        # retaa/retab are included because the file's epilogue table already
        # treats them as returns.
        other_terminators = {'b', 'br', 'ret', 'retaa', 'retab'}

        # Pass 1: collect block start addresses and per-branch metadata
        block_starts = {function_addr}
        branch_targets = {}  # branch address -> metadata
        for insn in instructions:
            mnemonic_lower = insn.mnemonic.lower()
            is_conditional = (mnemonic_lower.startswith('b.')
                              or mnemonic_lower in conditional_mnemonics)
            is_call = mnemonic_lower in call_mnemonics
            if not (is_conditional or is_call or mnemonic_lower in other_terminators):
                continue

            # The instruction after a terminator opens a new block
            next_addr = insn.address + insn.size
            if next_addr < func_end:
                block_starts.add(next_addr)

            # The branch target is the last operand and must be an immediate.
            # Checking the operand type avoids reading a register id (or a
            # tbz bit number) as if it were an address.
            targets = []
            if insn.operands:
                dest = insn.operands[-1]
                if dest.type == ARM64_OP_IMM and function_addr <= dest.imm < func_end:
                    block_starts.add(dest.imm)
                    targets.append(dest.imm)

            branch_targets[insn.address] = {
                "mnemonic": insn.mnemonic,
                "targets": targets,
                "is_conditional": is_conditional,
                "is_call": is_call
            }

        # Pass 2: build blocks. Instructions and block starts are both in
        # ascending order, so a single cursor assigns every instruction.
        sorted_starts = sorted(block_starts)
        basic_blocks = []
        edges = []
        cursor = 0
        total = len(instructions)

        for i, block_start in enumerate(sorted_starts):
            block_end = sorted_starts[i + 1] if i + 1 < len(sorted_starts) else func_end

            members = []
            while cursor < total and instructions[cursor].address < block_end:
                members.append(instructions[cursor])
                cursor += 1
            if not members:
                continue

            block_id = f"bb_{block_start:x}"
            block_insns = [
                {"address": hex(m.address), "mnemonic": m.mnemonic, "operands": m.op_str}
                for m in members
            ]
            basic_blocks.append({
                "id": block_id,
                "start": hex(block_start),
                "end": hex(block_end),
                "size": block_end - block_start,
                "instruction_count": len(block_insns),
                "instructions": block_insns[:10],  # first 10 only
                "is_entry": block_start == function_addr
            })

            last_insn = members[-1]
            bt = branch_targets.get(last_insn.address)
            if bt is not None:
                # A call is not an intra-function jump; it only falls through.
                if not bt["is_call"]:
                    edge_type = "conditional" if bt["is_conditional"] else "unconditional"
                    for target in bt["targets"]:
                        edges.append({"from": block_id, "to": f"bb_{target:x}", "type": edge_type})

                # Conditional branches and calls both continue at the next
                # instruction.
                if bt["is_conditional"] or bt["is_call"]:
                    next_addr = last_insn.address + last_insn.size
                    if next_addr in block_starts:
                        edges.append({
                            "from": block_id,
                            "to": f"bb_{next_addr:x}",
                            "type": "fall-through"
                        })
            elif i + 1 < len(sorted_starts):
                # Block was split by a branch target: execution runs on
                edges.append({
                    "from": block_id,
                    "to": f"bb_{sorted_starts[i + 1]:x}",
                    "type": "fall-through"
                })

        # Render DOT
        dot_lines = ["digraph cfg {", " rankdir=TB;", " node [shape=box,fontname=Courier];"]
        for block in basic_blocks:
            label = f"{block['id']}\\n{block['instruction_count']} insns"
            style = 'style=filled,fillcolor=lightgreen' if block['is_entry'] else ''
            dot_lines.append(f' "{block["id"]}" [label="{label}" {style}];')
        for edge in edges:
            style = 'style=dashed' if edge['type'] == 'conditional' else ''
            color = 'color=red' if edge['type'] == 'conditional' else 'color=blue' if edge['type'] == 'fall-through' else ''
            dot_lines.append(f' "{edge["from"]}" -> "{edge["to"]}" [{style} {color}];')
        dot_lines.append("}")

        return {
            "success": True,
            "function_address": hex(function_addr),
            "function_size": func_size,
            "basic_blocks": basic_blocks,
            "block_count": len(basic_blocks),
            "edges": edges,
            "edge_count": len(edges),
            "graph": "\n".join(dot_lines),
            "error": ""
        }

    except Exception as e:
        # Top-level boundary: report the failure in the result dict.
        import traceback
        return {**failure, "error": f"{str(e)}\n{traceback.format_exc()}"}
def analyze_function_advanced(so_path: str, function_address: int, size: int = 512) -> dict:
    """
    Profile one function from its disassembly.

    Collected features:
    1. Direct and indirect calls (BL / BLR)
    2. System calls (SVC)
    3. String references (ADRP + ADD pairs)
    4. Large immediate constants (MOV / MOVZ / MOVK, value > 0xFF)
    5. A rough complexity score
    6. A likely function category (ssl_verify / crypto / system /
       string_processing / general)

    Args:
        so_path: Path to the SO file.
        function_address: Virtual address of the function.
        size: Number of bytes analysed.

    Returns:
        dict: {"success": bool, "analysis": dict, "error": str}.
    """
    if not LIEF_AVAILABLE or not CAPSTONE_AVAILABLE:
        return {"success": False, "analysis": {}, "error": "lief and capstone required"}

    if not os.path.exists(so_path):
        return {"success": False, "analysis": {}, "error": f"File not found: {so_path}"}

    try:
        with open(so_path, 'rb') as f:
            data = f.read()

        binary = lief.parse(so_path)
        if binary is None:
            return {"success": False, "analysis": {}, "error": "Failed to parse SO file"}

        sections = list(binary.sections)
        text_section = next((s for s in sections if s.name == ".text"), None)
        if text_section is None:
            return {"success": False, "analysis": {}, "error": ".text section not found"}

        text_file_start = text_section.file_offset
        text_vaddr_base = text_section.virtual_address

        # Same range check as get_cfg: an address outside .text would give a
        # meaningless (possibly negative) file offset.
        if not (text_vaddr_base <= function_address < text_vaddr_base + text_section.size):
            return {"success": False, "analysis": {},
                    "error": f"Address {hex(function_address)} not in .text section"}

        file_offset = text_file_start + (function_address - text_vaddr_base)

        def vaddr_to_file_offset(vaddr):
            """Map a virtual address to a file offset via its section, or None."""
            for sec in sections:
                base = sec.virtual_address
                if base and base <= vaddr < base + sec.size:
                    return sec.file_offset + (vaddr - base)
            return None

        def read_ascii(offset, max_len=64):
            """Read a NUL-terminated printable-ASCII run at a file offset."""
            chars = []
            for b in data[offset:offset + max_len]:
                if not 32 <= b < 127:
                    break
                chars.append(chr(b))
            return ''.join(chars)

        # Exported function names, keyed by virtual address
        exported_funcs = {}
        for func in binary.exported_functions:
            if hasattr(func, 'address') and hasattr(func, 'name'):
                exported_funcs[func.address] = func.name

        func_size = min(size, len(data) - file_offset)
        code = bytes(data[file_offset:file_offset + func_size])

        md = Cs(CS_ARCH_ARM64, CS_MODE_LITTLE_ENDIAN)
        md.detail = True
        instructions = list(md.disasm(code, function_address))

        analysis = {
            "address": hex(function_address),
            "file_offset": hex(file_offset),
            "size_analyzed": func_size,
            "instruction_count": len(instructions),
            "calls": [],
            "indirect_calls": [],
            "syscalls": [],
            "string_refs": [],
            "constants": [],
            "branches": 0,
            "loops_detected": False,
            "complexity": "low",
            "likely_type": "unknown",
            "ssl_indicators": [],
            "crypto_indicators": []
        }

        ssl_call_keywords = ['ssl', 'tls', 'cert', 'x509', 'verify']
        crypto_call_keywords = ['aes', 'sha', 'md5', 'hmac', 'encrypt', 'decrypt']
        ssl_string_keywords = ['ssl', 'cert', 'verify', 'x509', 'tls']
        compare_branches = ('cbz', 'cbnz', 'tbz', 'tbnz')

        backward_branches = 0
        adrp_targets = {}  # destination register -> page address from ADRP

        for insn in instructions:
            mnemonic = insn.mnemonic.lower()
            operands = insn.operands

            # 1. Calls
            if mnemonic == "bl":
                if operands:
                    target = operands[0].imm
                    name = exported_funcs.get(target, f"sub_{target:x}")
                    analysis["calls"].append({
                        "address": hex(insn.address),
                        "target": hex(target),
                        "name": name
                    })
                    name_lower = name.lower()
                    if any(kw in name_lower for kw in ssl_call_keywords):
                        analysis["ssl_indicators"].append(name)
                    if any(kw in name_lower for kw in crypto_call_keywords):
                        analysis["crypto_indicators"].append(name)

            elif mnemonic == "blr":
                analysis["indirect_calls"].append({
                    "address": hex(insn.address),
                    "register": insn.op_str
                })

            # 2. System calls
            elif mnemonic == "svc":
                if operands:
                    analysis["syscalls"].append({
                        "address": hex(insn.address),
                        "number": operands[0].imm
                    })

            # 3. ADRP: remember the page address loaded into the register
            elif mnemonic == "adrp":
                if operands and len(operands) >= 2:
                    reg = insn.op_str.split(',')[0].strip()
                    adrp_targets[reg] = operands[1].imm

            # 4. ADD paired with an earlier ADRP: candidate string address
            elif mnemonic == "add" and insn.op_str:
                parts = insn.op_str.split(',')
                # Only an immediate third operand forms a page+offset
                # address; a register operand carries no usable value here.
                if (len(parts) >= 3 and len(operands) >= 3
                        and operands[2].type == ARM64_OP_IMM):
                    src_reg = parts[1].strip()
                    if src_reg in adrp_targets:
                        full_addr = adrp_targets[src_reg] + operands[2].imm
                        # The address is virtual; translate it before
                        # indexing into the raw file bytes.
                        str_offset = vaddr_to_file_offset(full_addr)
                        if str_offset is not None and 0 < str_offset < len(data):
                            string = read_ascii(str_offset)
                            if len(string) >= 4:
                                analysis["string_refs"].append({
                                    "address": hex(insn.address),
                                    "string_addr": hex(full_addr),
                                    "value": string[:50]
                                })
                                str_lower = string.lower()
                                if any(kw in str_lower for kw in ssl_string_keywords):
                                    analysis["ssl_indicators"].append(string[:30])

            # 5. Branches. Mnemonics are matched explicitly: a bare
            #    startswith('b') would also count bic/bfi/brk and similar.
            if (mnemonic == 'b' or mnemonic.startswith('b.')
                    or mnemonic in compare_branches):
                analysis["branches"] += 1
                if operands:
                    dest = operands[-1]  # the target is the last operand
                    # A backward jump is a possible loop
                    if dest.type == ARM64_OP_IMM and dest.imm < insn.address:
                        backward_branches += 1

            # 6. Large immediate constants
            if mnemonic in ('mov', 'movz', 'movk'):
                if len(operands) >= 2 and operands[1].type == ARM64_OP_IMM:
                    imm = operands[1].imm
                    if imm > 0xFF:
                        analysis["constants"].append({
                            "address": hex(insn.address),
                            "value": hex(imm)
                        })

        # Complexity score
        analysis["loops_detected"] = backward_branches > 0
        total_complexity = (
            len(analysis["calls"]) * 2 +
            len(analysis["indirect_calls"]) * 3 +
            analysis["branches"] +
            backward_branches * 2
        )
        if total_complexity < 10:
            analysis["complexity"] = "low"
        elif total_complexity < 30:
            analysis["complexity"] = "medium"
        else:
            analysis["complexity"] = "high"

        # Likely category; the first rule that fires wins
        if len(analysis["ssl_indicators"]) >= 2:
            analysis["likely_type"] = "ssl_verify"
        elif len(analysis["crypto_indicators"]) >= 2:
            analysis["likely_type"] = "crypto"
        elif len(analysis["syscalls"]) >= 2:
            analysis["likely_type"] = "system"
        elif len(analysis["string_refs"]) >= 3:
            analysis["likely_type"] = "string_processing"
        else:
            analysis["likely_type"] = "general"

        return {
            "success": True,
            "analysis": analysis,
            "error": ""
        }

    except Exception as e:
        # Top-level boundary: report the failure in the result dict.
        import traceback
        return {"success": False, "analysis": {}, "error": f"{str(e)}\n{traceback.format_exc()}"}
def detect_string_encryption(so_path: str, min_length: int = 8, max_strings: int = 100) -> dict:
    """
    Look for signs that strings in the file are encrypted or obfuscated.

    Steps:
    1. Extract printable-ASCII runs and compute their entropy.
    2. Flag high-entropy runs and try simple XOR / Base64 decoding on them.
    3. Search the raw file and the exported function names for crypto-related
       identifiers.
    4. Summarise the entropy distribution.

    Args:
        so_path: Path to the SO file.
        min_length: Minimum length of an extracted string.
        max_strings: Analysis budget; up to 2 * max_strings strings are
            collected.

    Returns:
        dict: on success {"success": True, "encryption_detected": bool,
        "encrypted_candidates": list, "suspicious_strings": list,
        "decryption_hints": list, "statistics": dict, "note": str,
        "error": ""}; on failure {"success": False, "error": str}.
    """
    # Local imports: none of these are in the file header.
    import base64
    import math
    import re
    from collections import Counter
    from itertools import islice

    if not LIEF_AVAILABLE:
        return {"success": False, "error": "lief not available"}

    if not os.path.exists(so_path):
        return {"success": False, "error": f"File not found: {so_path}"}

    try:
        with open(so_path, 'rb') as f:
            data = f.read()

        binary = lief.parse(so_path)
        if binary is None:
            return {"success": False, "error": "Failed to parse SO file"}

        def calculate_entropy(s: bytes) -> float:
            """Shannon entropy of a byte sequence, in bits per byte."""
            if len(s) == 0:
                return 0
            length = len(s)
            return -sum((c / length) * math.log2(c / length) for c in Counter(s).values())

        def is_printable_ratio(s: bytes) -> float:
            """Fraction of bytes that are printable ASCII."""
            return sum(1 for b in s if 32 <= b < 127) / len(s) if len(s) > 0 else 0

        def detect_xor_pattern(s: bytes) -> Optional[int]:
            """Return the first single-byte XOR key giving >80% printable output."""
            # NOTE(review): the inputs here are printable runs already, so
            # small keys nearly always qualify; treat a hit as a weak hint.
            for key in range(1, 256):
                if is_printable_ratio(bytes(b ^ key for b in s)) > 0.8:
                    return key
            return None

        b64_chars = set(b'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=')

        def is_base64_like(s: bytes) -> bool:
            """True when s uses only the Base64 alphabet and its length is a multiple of 4."""
            return all(b in b64_chars for b in s) and len(s) % 4 == 0

        # Extract printable runs with one regex pass. This also captures a
        # run that extends to the end of the file.
        run_pattern = re.compile(rb'[\x20-\x7e]{%d,}' % max(1, min_length))
        strings = [
            {
                "offset": m.start(),
                "data": m.group(),
                "text": m.group().decode('ascii', errors='ignore')
            }
            for m in islice(run_pattern.finditer(data), max(0, max_strings * 2))
        ]

        # Classify each string
        normal_strings = []
        suspicious_strings = []
        encrypted_candidates = []
        raw_entropies = []

        for s in strings:
            raw = s["data"]
            entropy = calculate_entropy(raw)
            raw_entropies.append(entropy)

            s["entropy"] = round(entropy, 2)
            s["printable_ratio"] = round(is_printable_ratio(raw), 2)

            # High entropy: possibly encrypted or compressed data
            if entropy > 5.5 and len(raw) > 16:
                s["suspicious_reason"] = "high_entropy"

                xor_key = detect_xor_pattern(raw)
                if xor_key:
                    s["possible_xor_key"] = xor_key
                    s["xor_decoded"] = bytes(b ^ xor_key for b in raw).decode('ascii', errors='ignore')[:50]

                if is_base64_like(raw):
                    s["possible_encoding"] = "base64"
                    try:
                        s["base64_decoded"] = base64.b64decode(raw)[:50].hex()
                    except ValueError:
                        # binascii.Error is a ValueError: the text is not
                        # valid Base64 after all, so skip the decoded preview.
                        pass

                encrypted_candidates.append({
                    "offset": hex(s["offset"]),
                    "text": s["text"][:50],
                    "entropy": s["entropy"],
                    "length": len(raw),
                    "details": {k: v for k, v in s.items() if k not in ["offset", "data", "text"]}
                })

            # Looks like an encoded blob
            elif is_base64_like(raw) and len(raw) > 20:
                suspicious_strings.append({
                    "offset": hex(s["offset"]),
                    "text": s["text"][:50],
                    "reason": "base64_like"
                })
            else:
                normal_strings.append(s)

        # Hints that decryption code may be present
        decryption_hints = []

        crypto_symbols = [
            b"decrypt", b"Decrypt", b"DECRYPT",
            b"encode", b"Encode", b"decode", b"Decode",
            b"cipher", b"Cipher",
            b"xor", b"XOR",
            b"base64", b"Base64",
            b"AES", b"aes",
            b"RC4", b"rc4",
        ]
        for symbol in crypto_symbols:
            pos = data.find(symbol)  # one search gives presence and offset
            if pos != -1:
                decryption_hints.append({
                    "symbol": symbol.decode('ascii'),
                    "offset": hex(pos)
                })

        # Exported function names
        for func in binary.exported_functions:
            name = func.name if hasattr(func, 'name') else ""
            name_lower = name.lower()
            if any(kw in name_lower for kw in ['decrypt', 'encode', 'decode', 'cipher', 'xor']):
                decryption_hints.append({
                    "function": name,
                    "address": hex(func.address) if hasattr(func, 'address') else "unknown"
                })

        # Summary statistics over the first 100 strings
        sample = raw_entropies[:100]
        avg_entropy = sum(sample) / len(sample) if sample else 0

        encryption_detected = len(encrypted_candidates) > 5 or avg_entropy > 5.0

        return {
            "success": True,
            "encryption_detected": encryption_detected,
            "encrypted_candidates": encrypted_candidates[:20],
            "suspicious_strings": suspicious_strings[:10],
            "decryption_hints": decryption_hints[:10],
            "statistics": {
                "total_strings_analyzed": len(strings),
                "average_entropy": round(avg_entropy, 2),
                "high_entropy_count": len(encrypted_candidates),
                "suspicious_count": len(suspicious_strings)
            },
            "note": "High entropy (>5.5) may indicate encryption, compression, or binary data",
            "error": ""
        }

    except Exception as e:
        # Top-level boundary: report the failure in the result dict.
        import traceback
        return {"success": False, "error": f"{str(e)}\n{traceback.format_exc()}"}
def trace_register_value(so_path: str, function_addr: int, target_register: str = "x0", size: int = 512) -> dict:
    """
    Trace where a register's value comes from inside a function.

    Every instruction whose destination is the target register (or its
    w/x alias) is recorded with the operation and its source operands.
    The last x0/w0 assignment before the final return is reported as well.

    Args:
        so_path: Path to the SO file.
        function_addr: Virtual address of the function.
        target_register: Register to trace (default "x0").
        size: Number of bytes analysed.

    Returns:
        dict: on success {"success": True, "register": str,
        "register_variants": list, "data_flow": list, "sources": list,
        "source_count": int, "return_value": dict | None,
        "instruction_count": int, "error": ""}; on failure
        {"success": False, "error": str}.
    """
    if not LIEF_AVAILABLE or not CAPSTONE_AVAILABLE:
        return {"success": False, "error": "lief and capstone required"}

    if not os.path.exists(so_path):
        return {"success": False, "error": f"File not found: {so_path}"}

    try:
        with open(so_path, 'rb') as f:
            data = f.read()

        binary = lief.parse(so_path)
        if binary is None:
            return {"success": False, "error": "Failed to parse SO file"}

        text_section = next((s for s in binary.sections if s.name == ".text"), None)
        if text_section is None:
            return {"success": False, "error": ".text section not found"}

        text_file_start = text_section.file_offset
        text_vaddr_base = text_section.virtual_address

        # Same range check as get_cfg: an address outside .text would give a
        # meaningless (possibly negative) file offset.
        if not (text_vaddr_base <= function_addr < text_vaddr_base + text_section.size):
            return {"success": False,
                    "error": f"Address {hex(function_addr)} not in .text section"}

        file_offset = text_file_start + (function_addr - text_vaddr_base)
        code = bytes(data[file_offset:file_offset + size])

        md = Cs(CS_ARCH_ARM64, CS_MODE_LITTLE_ENDIAN)
        md.detail = True
        instructions = list(md.disasm(code, function_addr))

        # Normalise the register name and add its 32/64-bit alias
        target_reg = target_register.lower().strip()
        reg_variants = {target_reg}
        if target_reg.startswith('x'):
            reg_variants.add('w' + target_reg[1:])
        elif target_reg.startswith('w'):
            reg_variants.add('x' + target_reg[1:])

        # Mnemonics whose first operand is read, not written
        non_writing = {
            "str", "strb", "strh", "stur", "sturb", "sturh", "stp", "stnp",
            "stlr", "stlrb", "stlrh",
            "cmp", "cmn", "tst", "ccmp", "ccmn",
            "cbz", "cbnz", "tbz", "tbnz",
            "br", "blr", "ret",
        }
        # Pair loads write their first two operands
        pair_loads = {"ldp", "ldnp", "ldpsw", "ldxp", "ldaxp"}

        data_flow = []
        sources = []

        for insn in instructions:
            mnemonic = insn.mnemonic.lower()
            ops = insn.op_str.lower()
            addr = insn.address

            if mnemonic in non_writing:
                continue

            parts = [p.strip() for p in ops.split(',')]
            destinations = parts[:2] if mnemonic in pair_loads else parts[:1]
            # Exact comparison: a substring test would let "x1" match "x10".
            if not any(dst in reg_variants for dst in destinations):
                continue

            # Everything after the destination, kept intact so bracketed
            # memory operands such as "[x1, #8]" are not cut at the comma.
            tail = ops.split(',', 1)[1].strip() if ',' in ops else ""

            flow_entry = {
                "address": hex(addr),
                "instruction": f"{insn.mnemonic} {ops}",
                "operation": "",
                "source": ""
            }

            if mnemonic == "mov":
                # Register copy or immediate move
                if len(parts) >= 2:
                    flow_entry["operation"] = "assign"
                    flow_entry["source"] = parts[1]
                    sources.append({
                        "address": hex(addr),
                        "type": "register_copy",
                        "from": parts[1]
                    })

            elif mnemonic in ["movz", "movk"]:
                # Immediate load
                if len(parts) >= 2:
                    flow_entry["operation"] = "load_immediate"
                    flow_entry["source"] = ','.join(parts[1:])
                    sources.append({
                        "address": hex(addr),
                        "type": "immediate",
                        "value": parts[1]
                    })

            elif mnemonic == "ldr":
                # Memory load
                if len(parts) >= 2:
                    flow_entry["operation"] = "load_memory"
                    flow_entry["source"] = tail
                    sources.append({
                        "address": hex(addr),
                        "type": "memory_load",
                        "from": tail
                    })

            elif mnemonic in ["add", "sub"]:
                # Arithmetic
                if len(parts) >= 3:
                    flow_entry["operation"] = mnemonic
                    flow_entry["source"] = f"{parts[1]} {mnemonic} {parts[2]}"
                    sources.append({
                        "address": hex(addr),
                        "type": "arithmetic",
                        "operation": mnemonic,
                        "operands": parts[1:]
                    })

            elif mnemonic == "adrp":
                # Page address load
                if len(parts) >= 2:
                    flow_entry["operation"] = "load_page_address"
                    flow_entry["source"] = parts[1]
                    sources.append({
                        "address": hex(addr),
                        "type": "address_load",
                        "page": parts[1]
                    })

            else:
                # Any other writer: record the mnemonic and raw operands
                flow_entry["operation"] = mnemonic
                flow_entry["source"] = ','.join(parts[1:]) if len(parts) > 1 else ""

            data_flow.append(flow_entry)

        # Return value: the last x0/w0 assignment before the final RET
        return_value_info = None
        last_ret = next(
            (idx for idx in range(len(instructions) - 1, -1, -1)
             if instructions[idx].mnemonic.lower() == "ret"),
            None,
        )
        if last_ret is not None:
            for prev in reversed(instructions[:last_ret]):
                if prev.mnemonic.lower() in ["mov", "movz", "ldr", "add"]:
                    dest = prev.op_str.lower().split(',')[0].strip()
                    if dest in ('x0', 'w0'):
                        return_value_info = {
                            "address": hex(prev.address),
                            "instruction": f"{prev.mnemonic} {prev.op_str}",
                            "note": "Last assignment before RET"
                        }
                        break

        return {
            "success": True,
            "register": target_register,
            "register_variants": list(reg_variants),
            "data_flow": data_flow,
            "sources": sources,
            "source_count": len(sources),
            "return_value": return_value_info,
            "instruction_count": len(instructions),
            "error": ""
        }

    except Exception as e:
        # Top-level boundary: report the failure in the result dict.
        import traceback
        return {"success": False, "error": f"{str(e)}\n{traceback.format_exc()}"}
# ============ Instruction pattern search ============

# Predefined search presets. Each row is (preset name, matcher key, matcher
# value, description); "mnemonic" rows match the mnemonic exactly and
# "mnemonic_regex" rows match it by regular expression. Row order defines
# the key order of COMMON_PATTERNS.
COMMON_PATTERNS = {
    preset_name: {matcher_key: matcher_value, "description": description}
    for preset_name, matcher_key, matcher_value, description in (
        # System calls
        ("syscall", "mnemonic", "svc", "系统调用"),
        ("svc", "mnemonic", "svc", "系统调用"),
        # Function calls
        ("call", "mnemonic", "bl", "直接函数调用"),
        ("bl", "mnemonic", "bl", "直接函数调用"),
        ("blr", "mnemonic", "blr", "间接函数调用(寄存器)"),
        # Returns
        ("ret", "mnemonic", "ret", "函数返回"),
        ("return", "mnemonic", "ret", "函数返回"),
        # Memory access
        ("load", "mnemonic_regex", r"^ld[rbp]?", "加载指令"),
        ("store", "mnemonic_regex", r"^st[rbp]?", "存储指令"),
        ("ldr", "mnemonic", "ldr", "加载寄存器"),
        ("str", "mnemonic", "str", "存储寄存器"),
        # Compare and branch
        ("compare", "mnemonic_regex", r"^(cmp|cmn|tst)", "比较指令"),
        ("cmp", "mnemonic", "cmp", "比较"),
        ("branch", "mnemonic_regex", r"^b\.", "条件分支"),
        ("cbz", "mnemonic", "cbz", "为零跳转"),
        ("cbnz", "mnemonic", "cbnz", "非零跳转"),
        # Arithmetic
        ("add", "mnemonic", "add", "加法"),
        ("sub", "mnemonic", "sub", "减法"),
        ("mul", "mnemonic", "mul", "乘法"),
        # Logic
        ("and", "mnemonic", "and", "与操作"),
        ("or", "mnemonic_regex", r"^orr?", "或操作"),
        ("xor", "mnemonic", "eor", "异或操作"),
        ("eor", "mnemonic", "eor", "异或操作"),
        # Shifts
        ("shift", "mnemonic_regex", r"^(lsl|lsr|asr|ror)", "移位操作"),
        # Crypto extensions
        ("aes", "mnemonic_regex", r"^aes", "AES加密指令"),
        ("sha", "mnemonic_regex", r"^sha", "SHA哈希指令"),
        # SIMD/NEON
        ("neon", "mnemonic_regex", r"^(ld[1-4]|st[1-4]|fmla|fmul|fadd)", "NEON/SIMD指令"),
    )
}
def find_instruction_pattern(so_path: str, pattern: str, operand_filter: str = "", limit: int = 100, section: str = ".text") -> dict:
    """
    Search a section's disassembly for an instruction pattern.

    Supported pattern forms:
    1. Plain mnemonic: "bl", "svc", "ret" (exact match)
    2. Preset name from COMMON_PATTERNS: "syscall", "call", "compare", "xor"
    3. Regular expression, matched from the start of the mnemonic:
       "b\\..*" (conditional branches), "ld[rbp].*" (loads)
    4. Instruction sequence separated by semicolons: "stp;mov;bl". Each item
       is resolved by rules 1-3, the same as a single pattern.
    5. With an operand filter: pattern="bl", operand_filter="#0x"

    Args:
        so_path: Path to the SO file.
        pattern: Search pattern.
        operand_filter: Optional regex searched in the operand text; an
            invalid regex is treated as a literal string.
        limit: Maximum number of matches returned.
        section: Section to search (default ".text"). If it is missing, the
            first executable section is used instead.

    Returns:
        dict: on success {"success": True, "pattern", "pattern_info",
        "operand_filter", "matches", "count", "truncated", "stats",
        "available_presets", "error": ""}; on failure
        {"success": False, "error": str}.
    """
    import re

    if not LIEF_AVAILABLE:
        return {"success": False, "error": "lief not available"}

    if not CAPSTONE_AVAILABLE:
        return {"success": False, "error": "capstone not available"}

    if not os.path.exists(so_path):
        return {"success": False, "error": f"File not found: {so_path}"}

    try:
        binary = lief.parse(so_path)
        if binary is None:
            return {"success": False, "error": "Failed to parse binary"}

        # Find the requested section, else fall back to an executable one
        target_section = None
        for sec in binary.sections:
            if sec.name == section:
                target_section = sec
                break
        if not target_section:
            for sec in binary.sections:
                if sec.has_characteristic(lief.ELF.Section.FLAGS.EXECINSTR):
                    target_section = sec
                    break
        if not target_section:
            return {"success": False, "error": f"Section {section} not found"}

        section_data = bytes(target_section.content)
        section_vaddr = target_section.virtual_address
        section_file_base = target_section.offset

        md = Cs(CS_ARCH_ARM64, CS_MODE_LITTLE_ENDIAN)
        md.detail = True

        regex_chars = r".*+?[](){}|^$\\"

        def build_matcher(token):
            """Turn one pattern token into a predicate over a lowercase mnemonic."""
            preset = COMMON_PATTERNS.get(token.lower())
            if preset is not None and "mnemonic" in preset:
                exact = preset["mnemonic"]
                return lambda mnem: mnem == exact
            if preset is not None and "mnemonic_regex" in preset:
                compiled = re.compile(preset["mnemonic_regex"], re.IGNORECASE)
                return lambda mnem: compiled.match(mnem) is not None
            if any(c in token for c in regex_chars):
                compiled = re.compile(token, re.IGNORECASE)
                return lambda mnem: compiled.match(mnem) is not None
            exact = token.lower()
            return lambda mnem: mnem == exact

        def describe(ins):
            return {"address": hex(ins.address), "instruction": f"{ins.mnemonic} {ins.op_str}"}

        def parse_hash_immediate(op_str):
            """Parse the value after '#' in an operand string, or None."""
            pieces = op_str.split("#")
            if len(pieces) < 2:
                return None
            try:
                return int(pieces[1].strip(), 16)
            except ValueError:
                return None

        # Interpret the pattern
        is_sequence = ";" in pattern
        if is_sequence:
            seq_parts = [p.strip().lower() for p in pattern.split(";") if p.strip()]
            if not seq_parts:
                return {"success": False, "error": "Empty instruction sequence"}
            matchers = [build_matcher(p) for p in seq_parts]
            pattern_info = f"Instruction sequence: {' -> '.join(seq_parts)}"
        else:
            matchers = [build_matcher(pattern)]
            if pattern.lower() in COMMON_PATTERNS:
                pattern_info = f"Preset pattern: {COMMON_PATTERNS[pattern.lower()]['description']}"
            elif any(c in pattern for c in regex_chars):
                pattern_info = f"Regex pattern: {pattern}"
            else:
                pattern_info = f"Exact match: {pattern}"

        # Compile the operand filter
        operand_regex = None
        if operand_filter:
            try:
                operand_regex = re.compile(operand_filter, re.IGNORECASE)
            except re.error:
                operand_regex = re.compile(re.escape(operand_filter), re.IGNORECASE)

        # Disassemble the whole section
        instructions = list(md.disasm(section_data, section_vaddr))
        matches = []

        if is_sequence:
            seq_len = len(matchers)
            i = 0
            last_start = len(instructions) - seq_len
            while i <= last_start and len(matches) < limit:
                window = instructions[i:i + seq_len]
                hit = all(
                    matcher(ins.mnemonic.lower())
                    for matcher, ins in zip(matchers, window)
                )
                if hit and operand_regex:
                    hit = bool(operand_regex.search(" ".join(ins.op_str for ins in window)))
                if not hit:
                    i += 1
                    continue

                matches.append({
                    "address": hex(window[0].address),
                    "file_offset": hex(window[0].address - section_vaddr + section_file_base),
                    "sequence": [
                        {
                            "address": hex(ins.address),
                            "mnemonic": ins.mnemonic,
                            "operands": ins.op_str
                        }
                        for ins in window
                    ],
                    "context_before": [describe(c) for c in instructions[max(0, i - 2):i]],
                    "context_after": [describe(c) for c in instructions[i + seq_len:i + seq_len + 2]]
                })
                i += seq_len  # skip past the matched sequence
        else:
            matcher = matchers[0]

            # Exported symbol names by address, built once instead of being
            # rescanned for every BL match; the first symbol per address wins.
            exported_by_addr = {}
            try:
                for sym in binary.exported_symbols:
                    exported_by_addr.setdefault(sym.value, sym.name)
            except AttributeError:
                # Symbol lookup is optional; matches just carry no names.
                exported_by_addr = {}

            # Common ARM64 Linux system calls
            syscall_names = {
                0: "io_setup", 56: "openat", 57: "close", 63: "read",
                64: "write", 78: "readlinkat", 93: "exit", 94: "exit_group",
                172: "getpid", 220: "clone", 221: "execve", 226: "mprotect",
                278: "getrandom"
            }

            for i, insn in enumerate(instructions):
                if len(matches) >= limit:
                    break

                mnem = insn.mnemonic.lower()
                if not matcher(mnem):
                    continue
                if operand_regex and not operand_regex.search(insn.op_str):
                    continue

                extra_info = {}

                # BL: resolve the target address and its exported name
                if mnem == "bl":
                    target_addr = parse_hash_immediate(insn.op_str)
                    if target_addr is not None:
                        extra_info["target_address"] = hex(target_addr)
                        target_name = exported_by_addr.get(target_addr)
                        if target_name is not None:
                            extra_info["target_name"] = target_name

                # SVC: resolve the system call number and name
                if mnem == "svc":
                    syscall_num = parse_hash_immediate(insn.op_str)
                    if syscall_num is not None:
                        extra_info["syscall_number"] = syscall_num
                        if syscall_num in syscall_names:
                            extra_info["syscall_name"] = syscall_names[syscall_num]

                matches.append({
                    "address": hex(insn.address),
                    "file_offset": hex(insn.address - section_vaddr + section_file_base),
                    "mnemonic": insn.mnemonic,
                    "operands": insn.op_str,
                    "bytes": insn.bytes.hex(),
                    "context_before": [describe(c) for c in instructions[max(0, i - 3):i]],
                    "context_after": [describe(c) for c in instructions[i + 1:i + 4]],
                    **extra_info
                })

        stats = {
            "total_instructions": len(instructions),
            "section_size": len(section_data),
            "section_start": hex(section_vaddr)
        }

        return {
            "success": True,
            "pattern": pattern,
            "pattern_info": pattern_info,
            "operand_filter": operand_filter if operand_filter else None,
            "matches": matches,
            "count": len(matches),
            "truncated": len(matches) >= limit,
            "stats": stats,
            "available_presets": list(COMMON_PATTERNS.keys()),
            "error": ""
        }

    except Exception as e:
        # Top-level boundary: report the failure in the result dict.
        import traceback
        return {"success": False, "error": f"{str(e)}\n{traceback.format_exc()}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/1600822305/so-analyzer-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.