Skip to main content
Glama
flutter_utils.py29 kB
"""Flutter专用工具""" import os import re import zipfile import tempfile import shutil import struct from pathlib import Path from typing import Optional, List, Dict, Tuple from .flutter_utils_v2 import find_ssl_verify_function_v2, vaddr_to_file_offset try: import lief LIEF_AVAILABLE = True except ImportError: LIEF_AVAILABLE = False def find_ssl_verify_offset(so_path: str) -> dict: """ 使用正确的方法定位Flutter SSL验证函数 策略: 1. 找到JNI_OnLoad导出函数地址 2. 搜索"ssl_client"和"ssl_server"字符串 3. 通过字符串交叉引用找到ssl_crypto_x509_session_verify_cert_chain函数 4. 计算与JNI_OnLoad的偏移 Args: so_path: libflutter.so文件路径 Returns: dict: {"success": bool, "offset": int, "jni_onload": int, "ssl_verify": int, "error": str} """ if not LIEF_AVAILABLE: return {"success": False, "error": "lief not available"} if not os.path.exists(so_path): return {"success": False, "error": f"File not found: {so_path}"} try: with open(so_path, 'rb') as f: data = f.read() binary = lief.parse(so_path) if binary is None: return {"success": False, "error": "Failed to parse SO file"} result = { "success": False, "jni_onload_address": None, "ssl_verify_address": None, "offset_from_jni_onload": None, "ssl_client_offset": None, "ssl_server_offset": None, "frida_script_hint": None, "error": "" } # 1. 找JNI_OnLoad地址 jni_onload_addr = None for func in binary.exported_functions: name = func.name if hasattr(func, 'name') else "" if "JNI_OnLoad" in name: jni_onload_addr = func.address result["jni_onload_address"] = hex(jni_onload_addr) break if jni_onload_addr is None: result["error"] = "JNI_OnLoad not found in exports" return result # 2. 搜索关键字符串 ssl_client_offset = data.find(b"ssl_client\x00") ssl_server_offset = data.find(b"ssl_server\x00") result["ssl_client_offset"] = hex(ssl_client_offset) if ssl_client_offset != -1 else None result["ssl_server_offset"] = hex(ssl_server_offset) if ssl_server_offset != -1 else None if ssl_client_offset == -1 or ssl_server_offset == -1: result["error"] = "ssl_client or ssl_server string not found" return result # 3. 获取段信息用于地址计算 text_section = None rodata_section = None for section in binary.sections: if section.name == ".text": text_section = section elif section.name == ".rodata": rodata_section = section if text_section is None: result["error"] = ".text section not found" return result # 4. 计算字符串的虚拟地址 # 需要找到字符串所在段并计算虚拟地址 ssl_client_vaddr = ssl_client_offset ssl_server_vaddr = ssl_server_offset if rodata_section: # 检查字符串是否在.rodata段 if rodata_section.file_offset <= ssl_client_offset < rodata_section.file_offset + rodata_section.size: ssl_client_vaddr = rodata_section.virtual_address + (ssl_client_offset - rodata_section.file_offset) if rodata_section.file_offset <= ssl_server_offset < rodata_section.file_offset + rodata_section.size: ssl_server_vaddr = rodata_section.virtual_address + (ssl_server_offset - rodata_section.file_offset) result["ssl_client_vaddr"] = hex(ssl_client_vaddr) result["ssl_server_vaddr"] = hex(ssl_server_vaddr) # 5. 在.text段搜索同时引用这两个字符串的函数 text_start = text_section.file_offset text_end = text_section.file_offset + text_section.size text_vaddr = text_section.virtual_address # 搜索引用ssl_client的代码位置 ssl_client_refs = find_string_references(data, text_start, text_end, text_vaddr, ssl_client_vaddr) ssl_server_refs = find_string_references(data, text_start, text_end, text_vaddr, ssl_server_vaddr) result["ssl_client_refs_count"] = len(ssl_client_refs) result["ssl_server_refs_count"] = len(ssl_server_refs) # 6. 收集所有候选函数 candidates = [] seen_funcs = set() # 收集所有ssl_client引用的函数 for ref in ssl_client_refs: ref_file_offset = ref - text_vaddr + text_start func_start = find_function_start(data, text_start, ref_file_offset) if func_start and func_start not in seen_funcs: seen_funcs.add(func_start) func_vaddr = text_vaddr + (func_start - text_start) # 检查这个函数是否也引用了ssl_server refs_ssl_server = any(abs(ref - srv) < 0x1000 for srv in ssl_server_refs) candidates.append({ "address": hex(func_vaddr), "file_offset": hex(func_start), "refs_both": refs_ssl_server, "client_ref_at": hex(ref), "offset_from_jni": hex(func_vaddr - jni_onload_addr) }) # 收集所有ssl_server引用的函数 for ref in ssl_server_refs: ref_file_offset = ref - text_vaddr + text_start func_start = find_function_start(data, text_start, ref_file_offset) if func_start and func_start not in seen_funcs: seen_funcs.add(func_start) func_vaddr = text_vaddr + (func_start - text_start) refs_ssl_client = any(abs(ref - cli) < 0x1000 for cli in ssl_client_refs) candidates.append({ "address": hex(func_vaddr), "file_offset": hex(func_start), "refs_both": refs_ssl_client, "server_ref_at": hex(ref), "offset_from_jni": hex(func_vaddr - jni_onload_addr) }) # 按地址排序 candidates.sort(key=lambda x: int(x["address"], 16)) # 分析每个候选函数的特征 for cand in candidates: func_file_offset = int(cand["file_offset"], 16) func_size = estimate_function_size(data, func_file_offset, text_end) cand["estimated_size"] = func_size # 计算置信度 confidence = 0.5 if cand["refs_both"]: confidence += 0.3 # 同时引用两个字符串 # 函数大小评分 - SSL验证函数通常在300-800字节 if 300 <= func_size <= 800: confidence += 0.25 # 最佳大小范围 elif 200 < func_size < 1200: confidence += 0.15 # 可接受大小 elif func_size > 1200: confidence -= 0.1 # 函数较大,可能是封装函数 elif func_size < 200: confidence -= 0.1 # 函数太小 cand["confidence"] = round(confidence, 2) # 按置信度排序,置信度相同时优先选择大小更接近500的 ideal_size = 500 candidates.sort(key=lambda x: (-x["confidence"], abs(x["estimated_size"] - ideal_size))) result["candidates"] = candidates result["candidates_count"] = len(candidates) # 选择最佳候选 if candidates: best = candidates[0] result["recommended"] = best["address"] result["recommended_offset"] = best["offset_from_jni"] result["recommended_confidence"] = best["confidence"] # 生成Frida脚本 offset_hex = best["offset_from_jni"] result["frida_script"] = f"""// Frida SSL Bypass 脚本 // 推荐地址: {best["address"]} (置信度: {best["confidence"]}) function hook_ssl_verify() {{ var m = Process.findModuleByName("libflutter.so"); var jni_onload = m.enumerateExports().find(e => e.name === "JNI_OnLoad").address; // 推荐的偏移 var ssl_verify = ptr(jni_onload).add({offset_hex}); console.log("Hooking SSL verify at: " + ssl_verify); Interceptor.attach(ssl_verify, {{ onEnter: function(args) {{ console.log("SSL verify called"); }}, onLeave: function(retval) {{ console.log("Original return: " + retval); retval.replace(0x1); console.log("Bypassed! Return: 1"); }} }}); }} // 如果推荐地址不对,尝试其他候选: // {chr(10).join([f"// - {c['address']} (置信度: {c['confidence']}, 偏移: {c['offset_from_jni']})" for c in candidates[:5]])} setTimeout(hook_ssl_verify, 1000); """ else: result["error"] = "No candidate functions found" return result result["success"] = True return result except Exception as e: import traceback return {"success": False, "error": f"{str(e)}\n{traceback.format_exc()}"} def find_string_references(data: bytes, text_start: int, text_end: int, text_vaddr: int, string_vaddr: int) -> list: """ 在.text段中搜索引用指定虚拟地址的代码 搜索ADRP + ADD/LDR模式 """ refs = [] target_page = string_vaddr & ~0xFFF target_offset = string_vaddr & 0xFFF for file_offset in range(text_start, min(text_end, len(data) - 8), 4): insn = struct.unpack('<I', data[file_offset:file_offset+4])[0] # 检查ADRP指令 if (insn & 0x9F000000) == 0x90000000: # 提取立即数 immlo = (insn >> 29) & 0x3 immhi = (insn >> 5) & 0x7FFFF imm = (immhi << 2) | immlo # 符号扩展 if imm & 0x100000: imm = imm - 0x200000 # 计算目标页 current_vaddr = text_vaddr + (file_offset - text_start) pc_page = current_vaddr & ~0xFFF adrp_target = (pc_page + (imm << 12)) & 0xFFFFFFFFFFFFFFFF # 检查是否指向目标页 if adrp_target == target_page: # 检查下一条ADD指令 if file_offset + 4 < len(data): next_insn = struct.unpack('<I', data[file_offset+4:file_offset+8])[0] # ADD immediate (32位或64位) if (next_insn & 0x7F800000) == 0x11000000 or (next_insn & 0xFF800000) == 0x91000000: add_imm = (next_insn >> 10) & 0xFFF if add_imm == target_offset: refs.append(current_vaddr) return refs def find_function_start(data: bytes, text_start: int, ref_offset: int) -> Optional[int]: """ 从引用位置向前搜索函数开头 ARM64函数开头特征: - STP X29, X30, [SP, #-N]! (常见) - PACIBSP (PAC指令) - SUB SP, SP, #N """ # 向前搜索最多4KB search_start = max(text_start, ref_offset - 4096) # 从ref_offset向前搜索 for offset in range(ref_offset, search_start, -4): if offset < 4: break insn = struct.unpack('<I', data[offset:offset+4])[0] # PACIBSP: 0xD503237F if insn == 0xD503237F: return offset # STP X29, X30, [SP, #imm]! - 函数入口常见模式 # 编码: 101_0100_11_0_xxxxxxx_11110_11101_11111 # 简化检查: (insn & 0xFFE003FF) == 0xA98003E0 表示 STP X29, X30 if (insn & 0xFFC003FF) == 0xA98003E0: return offset # SUB SP, SP, #imm (64位) # 编码: 1101_0001_00_xxxxxxxxxxxx_11111_11111 if (insn & 0xFF0003FF) == 0xD10003FF: return offset return None def estimate_function_size(data: bytes, func_start: int, text_end: int) -> int: """ 估算函数大小 通过查找RET指令或下一个函数开头来估算 """ max_search = min(func_start + 0x10000, text_end) # 最多搜索64KB for offset in range(func_start + 4, max_search, 4): if offset + 4 > len(data): break insn = struct.unpack('<I', data[offset:offset+4])[0] # RET指令: 0xD65F03C0 if insn == 0xD65F03C0: return offset - func_start + 4 # 遇到下一个函数开头 # PACIBSP if insn == 0xD503237F: return offset - func_start # STP X29, X30 if (insn & 0xFFC003FF) == 0xA98003E0: # 检查前一条是否是RET或NOP if offset >= 4: prev_insn = struct.unpack('<I', data[offset-4:offset])[0] if prev_insn == 0xD65F03C0 or prev_insn == 0xD503201F: return offset - func_start return max_search - func_start # Flutter SSL验证函数的特征(不同版本可能不同) SSL_VERIFY_PATTERNS = [ # Flutter 3.x b"ssl_crypto_x509_session_verify_cert_chain", b"ssl_server_handshake", b"x509_verify_cert", # 通用SSL相关 b"SSL_CTX_set_verify", b"SSL_set_verify", ] # Flutter版本特征 FLUTTER_VERSION_PATTERNS = [ rb"Flutter\s+(\d+\.\d+\.\d+)", rb"flutter/(\d+\.\d+\.\d+)", rb"engine/(\d+\.\d+\.\d+)", ] def get_flutter_version(so_path: str) -> dict: """ 获取Flutter版本和信息 Args: so_path: libflutter.so路径 Returns: dict: {"success": bool, "info": dict, "error": str} """ if not os.path.exists(so_path): return {"success": False, "info": {}, "error": f"File not found: {so_path}"} try: file_size = os.path.getsize(so_path) with open(so_path, 'rb') as f: data = f.read() info = { "file_size": file_size, "file_size_mb": round(file_size / 1024 / 1024, 2), } # 通过文件大小估计版本范围 if file_size > 20 * 1024 * 1024: info["estimated_version"] = "3.x (large)" elif file_size > 10 * 1024 * 1024: info["estimated_version"] = "2.x-3.x" else: info["estimated_version"] = "2.x or earlier" # 检测架构 if LIEF_AVAILABLE: binary = lief.parse(so_path) if binary and hasattr(binary, 'header'): arch = binary.header.machine_type.name if hasattr(binary.header.machine_type, 'name') else "" info["architecture"] = arch info["is_64bit"] = "AARCH64" in arch or "64" in arch # 搜索Flutter相关字符串 flutter_strings = [] dart_strings = [] # 搜索关键标识 if b"flutter" in data.lower(): info["has_flutter_marker"] = True if b"dart" in data.lower(): info["has_dart_marker"] = True # 搜索SSL相关函数(重要!) ssl_indicators = [ b"ssl_crypto_x509", b"SSL_CTX", b"X509_verify", b"ssl_verify", b"certificate" ] ssl_found = [] for indicator in ssl_indicators: if indicator in data: ssl_found.append(indicator.decode('utf-8', errors='ignore')) info["ssl_indicators_found"] = ssl_found info["ssl_patchable"] = len(ssl_found) > 0 # 尝试提取版本字符串 version_patterns = [ rb'Flutter\s*(\d+\.\d+)', rb'flutter_(\d+\.\d+)', rb'engine.version[^\d]*(\d+\.\d+)', ] for pattern in version_patterns: match = re.search(pattern, data, re.IGNORECASE) if match: info["version_hint"] = match.group(1).decode('utf-8', errors='ignore') break return { "success": True, "info": info, "error": "" } except Exception as e: return {"success": False, "info": {}, "error": str(e)} def find_ssl_verify_function(so_path: str) -> dict: """ 查找SSL验证相关内容(函数、字符串、字节模式) Args: so_path: SO文件路径 Returns: dict: {"success": bool, "findings": dict, "error": str} """ if not os.path.exists(so_path): return {"success": False, "findings": {}, "error": f"File not found: {so_path}"} try: with open(so_path, 'rb') as f: data = f.read() findings = { "exported_functions": [], "imported_functions": [], "string_references": [], "byte_patterns": [], "patchable": False } # 1. 搜索导出/导入函数(如果lief可用) if LIEF_AVAILABLE: binary = lief.parse(so_path) if binary: ssl_keywords = ['ssl', 'verify', 'cert', 'x509', 'tls', 'handshake'] for func in binary.exported_functions: name = func.name if hasattr(func, 'name') else str(func) if any(kw in name.lower() for kw in ssl_keywords): findings["exported_functions"].append({ "name": name, "address": hex(func.address) if hasattr(func, 'address') else "0x0" }) for func in binary.imported_functions: name = func.name if hasattr(func, 'name') else str(func) if any(kw in name.lower() for kw in ssl_keywords): findings["imported_functions"].append({ "name": name, "library": func.library.name if hasattr(func, 'library') and func.library else "unknown" }) # 2. 搜索SSL相关字符串 ssl_strings = [ b"ssl_crypto_x509_session_verify_cert_chain", b"ssl_verify_cert_chain", b"x509_session_verify", b"CERTIFICATE_VERIFY_FAILED", b"certificate verify failed", b"SSL_CTX_set_verify", b"X509_verify_cert", b"handshake_failure", b"bad_certificate", b"certificate_unknown", b"ssl_x509", b"BoringSSL", ] for s in ssl_strings: pos = data.find(s) if pos != -1: findings["string_references"].append({ "string": s.decode('utf-8', errors='ignore'), "offset": hex(pos), "context": data[max(0, pos-20):pos+len(s)+20].hex() }) # 3. 搜索特定字节模式(Flutter SSL验证相关) # 这些是已知的Flutter SSL验证函数的特征模式 byte_patterns = [ # ARM64: 常见的函数开头模式 (b"\x7f\x23\x03\xd5", "ARM64 PACIBSP (function prologue)"), # 返回0的模式(patch目标) (b"\x00\x00\x80\xd2\xc0\x03\x5f\xd6", "ARM64 return 0"), # 返回1的模式 (b"\x20\x00\x80\xd2\xc0\x03\x5f\xd6", "ARM64 return 1"), ] for pattern, desc in byte_patterns: count = data.count(pattern) if count > 0: # 找到前几个位置 positions = [] start = 0 for _ in range(min(count, 5)): pos = data.find(pattern, start) if pos == -1: break positions.append(hex(pos)) start = pos + 1 findings["byte_patterns"].append({ "pattern": pattern.hex(), "description": desc, "count": count, "first_positions": positions }) # 4. 判断是否可以patch # 如果找到SSL相关字符串,通常是可以patch的 findings["patchable"] = len(findings["string_references"]) > 0 # 5. 提供patch建议 if findings["patchable"]: findings["patch_suggestion"] = { "method": "reFlutter style", "description": "使用flutter_patch_apk工具自动patch,或手动搜索ssl_verify相关函数并修改返回值", "note": "需要找到验证函数并让其返回成功(0)" } return { "success": True, "findings": findings, "summary": { "exported_ssl_funcs": len(findings["exported_functions"]), "imported_ssl_funcs": len(findings["imported_functions"]), "ssl_strings_found": len(findings["string_references"]), "patchable": findings["patchable"] }, "error": "" } except Exception as e: return {"success": False, "findings": {}, "error": str(e)} def patch_ssl_verify( so_path: str, output_path: Optional[str] = None ) -> dict: """ Patch SSL验证函数(使其始终返回成功) 使用 v2 算法定位函数,然后修改指令直接返回 1 Args: so_path: SO文件路径 output_path: 输出路径(可选) Returns: dict: {"success": bool, "output_path": str, "patches": list, "error": str} """ if not LIEF_AVAILABLE: return {"success": False, "output_path": "", "patches": [], "error": "lief not available"} if not os.path.exists(so_path): return {"success": False, "output_path": "", "patches": [], "error": f"File not found: {so_path}"} try: # 1. 使用 v2 算法查找函数 # print(f"Finding SSL verify function in {so_path}...") find_result = find_ssl_verify_function_v2(so_path) if not find_result["success"]: return { "success": False, "output_path": "", "patches": [], "error": f"Failed to find target function: {find_result.get('error')}" } func_vaddr = int(find_result["function_address"], 16) func_file_offset = int(find_result["function_file_offset"], 16) # print(f"Target function found at vaddr: 0x{func_vaddr:x} (offset: 0x{func_file_offset:x})") # 2. 准备 Patch 数据 # ARM64: MOV W0, #1; RET # 20 00 80 52 MOV W0, #1 # C0 03 5F D6 RET patch_code = bytes.fromhex("20008052C0035FD6") # 3. 读取并修改文件 with open(so_path, 'rb') as f: data = bytearray(f.read()) # 检查偏移是否有效 if func_file_offset + len(patch_code) > len(data): return {"success": False, "output_path": "", "patches": [], "error": "Offset out of bounds"} # 应用 Patch original_bytes = data[func_file_offset:func_file_offset+len(patch_code)] data[func_file_offset:func_file_offset+len(patch_code)] = patch_code patches = [{ "vaddr": hex(func_vaddr), "offset": hex(func_file_offset), "original": original_bytes.hex(), "patched": patch_code.hex(), "description": "MOV W0, #1; RET" }] # 4. 保存文件 if not output_path: base, ext = os.path.splitext(so_path) output_path = f"{base}_patched{ext}" with open(output_path, 'wb') as f: f.write(data) return { "success": True, "output_path": output_path, "patches": patches, "error": "" } except Exception as e: import traceback return {"success": False, "output_path": "", "patches": [], "error": f"{str(e)}\n{traceback.format_exc()}"} def flutter_patch_apk( apk_path: str, output_path: Optional[str] = None, arch: str = "arm64-v8a" ) -> dict: """ 自动patch Flutter APK的SSL验证 流程: 1. 提取libflutter.so 2. Patch SSL验证 3. 替换回APK 4. 重新签名 Args: apk_path: APK文件路径 output_path: 输出路径(可选) arch: 架构(默认arm64-v8a) Returns: dict: {"success": bool, "output_path": str, "error": str} """ if not os.path.exists(apk_path): return {"success": False, "output_path": "", "error": f"APK not found: {apk_path}"} temp_dir = tempfile.mkdtemp(prefix="flutter_patch_") try: # 1. 检测是否是Flutter应用 from .so_utils import detect_flutter flutter_info = detect_flutter(apk_path) if not flutter_info.get("is_flutter"): shutil.rmtree(temp_dir, ignore_errors=True) return { "success": False, "output_path": "", "error": "Not a Flutter application" } # 2. 提取libflutter.so lib_path_in_apk = f"lib/{arch}/libflutter.so" extracted_so = os.path.join(temp_dir, "libflutter.so") with zipfile.ZipFile(apk_path, 'r') as zf: if lib_path_in_apk not in zf.namelist(): # 尝试其他架构 available_archs = flutter_info.get("architectures", []) if available_archs: arch = available_archs[0] lib_path_in_apk = f"lib/{arch}/libflutter.so" if lib_path_in_apk not in zf.namelist(): shutil.rmtree(temp_dir, ignore_errors=True) return { "success": False, "output_path": "", "error": f"libflutter.so not found for arch: {arch}" } with zf.open(lib_path_in_apk) as src, open(extracted_so, 'wb') as dst: dst.write(src.read()) # 3. Patch SSL验证 patched_so = os.path.join(temp_dir, "libflutter_patched.so") patch_result = patch_ssl_verify(extracted_so, patched_so) if not patch_result["success"]: shutil.rmtree(temp_dir, ignore_errors=True) return { "success": False, "output_path": "", "error": f"Patch failed: {patch_result['error']}" } # 4. 创建新APK if output_path is None: base, ext = os.path.splitext(apk_path) output_path = f"{base}_ssl_patched{ext}" # 复制原APK并替换libflutter.so shutil.copy2(apk_path, output_path) with zipfile.ZipFile(output_path, 'a') as zf: # 删除原来的 # 注意:zipfile不支持直接删除,需要重新创建 pass # 使用更可靠的方式:解压-替换-重新打包 extract_dir = os.path.join(temp_dir, "apk_extracted") with zipfile.ZipFile(apk_path, 'r') as zf: zf.extractall(extract_dir) # 替换libflutter.so target_so = os.path.join(extract_dir, "lib", arch, "libflutter.so") if os.path.exists(target_so): shutil.copy2(patched_so, target_so) # 重新打包 with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf: for root, dirs, files in os.walk(extract_dir): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, extract_dir) zf.write(file_path, arcname) # 清理 shutil.rmtree(temp_dir, ignore_errors=True) return { "success": True, "output_path": output_path, "architecture": arch, "patches_applied": len(patch_result.get("patches", [])), "note": "APK needs to be re-signed before installation", "error": "" } except Exception as e: shutil.rmtree(temp_dir, ignore_errors=True) return {"success": False, "output_path": "", "error": str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/1600822305/so-analyzer-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server