Skip to main content
Glama

APK Security Guard MCP Suite

by il-il1
apktool_mcp_server.py30.4 kB
# /// script
# requires-python = ">=3.10"
# dependencies = [ "fastmcp", "logging" ]
# ///
"""MCP server exposing APKTool operations on decoded APK projects.

Each tool returns a plain dictionary with a ``success`` flag; failures carry
an ``error`` message instead of raising, so an MCP client always receives a
structured result.
"""

import logging
import os
import shutil
import subprocess
import time
import xml.etree.ElementTree as ET
from typing import Dict, Iterator, List, Optional, Tuple, Union

from mcp.server.fastmcp import FastMCP

# Set up logging configuration on the root logger.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Console handler for logging to the console.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(console_handler)

# Initialize the MCP object.
mcp = FastMCP("APKTool-MCP Server")

# Current workspace for decoded APK projects.
WORKSPACE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "apktool_mcp_server_workspace"))

# Ensure workspace directory exists.
os.makedirs(WORKSPACE_DIR, exist_ok=True)

# XML namespace used by the android:* attributes in AndroidManifest.xml.
ANDROID_NS = "http://schemas.android.com/apk/res/android"

# Manifest component tags inspected by the leak-finding tools.
_COMPONENT_TAGS = ("activity", "service", "receiver")

# File extensions treated as component source code.
_SOURCE_EXTENSIONS = (".smali", ".java", ".kt")

# Extensions searched by search_in_files when the caller gives none.
_DEFAULT_SEARCH_EXTENSIONS = (".smali", ".xml")


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _android_attr(name: str) -> str:
    """Return the namespace-qualified ElementTree key for ``android:<name>``."""
    return f"{{{ANDROID_NS}}}{name}"


def _list_smali_dirs(project_dir: str) -> List[str]:
    """Return the names of the ``smali*`` sub-directories of ``project_dir``."""
    return [
        entry for entry in os.listdir(project_dir)
        if entry.startswith("smali") and os.path.isdir(os.path.join(project_dir, entry))
    ]


def _is_within(base_dir: str, target: str) -> bool:
    """Return True when ``target`` resolves to ``base_dir`` or a path below it.

    Used to reject caller-supplied path components (``..`` or absolute paths)
    that would make a tool read or write outside the intended directory.
    """
    base_real = os.path.realpath(base_dir)
    target_real = os.path.realpath(target)
    try:
        return os.path.commonpath([base_real, target_real]) == base_real
    except ValueError:
        # Raised e.g. for paths on different drives; they cannot be nested.
        return False


def _locate_smali_file(project_dir: str, class_name: str) -> Tuple[Optional[str], Optional[str], List[str]]:
    """Find the smali file for ``class_name`` in any smali directory.

    Returns:
        ``(file_path, smali_dir, searched_dirs)``; the first two are ``None``
        when no smali directory contains the class.
    """
    smali_dirs = _list_smali_dirs(project_dir)
    relative = class_name.replace('.', os.path.sep) + '.smali'
    for smali_dir in smali_dirs:
        base = os.path.join(project_dir, smali_dir)
        candidate = os.path.join(base, relative)
        # Ignore candidates that escape the smali directory.
        if _is_within(base, candidate) and os.path.exists(candidate):
            return candidate, smali_dir, smali_dirs
    return None, None, smali_dirs


def _iter_exported_components(root: ET.Element) -> Iterator[Tuple[str, ET.Element]]:
    """Yield ``(tag, element)`` for each component with android:exported="true"."""
    for tag in _COMPONENT_TAGS:
        for comp in root.iter(tag):
            if comp.get(_android_attr("exported")) == "true":
                yield tag, comp


def _resolve_class_name(package: str, name: str) -> str:
    """Expand a manifest ``android:name`` value to a fully qualified class name.

    A leading dot means "relative to the manifest package"; a name without
    any dot is also placed in the manifest package; anything else is taken
    as already fully qualified.
    """
    if name.startswith("."):
        return f"{package}{name}"
    if "." not in name:
        return f"{package}.{name}" if package else name
    return name


def _is_class_source(file_path: str, class_path: str) -> bool:
    """Return True when ``file_path`` is the source file of ``class_path``.

    ``class_path`` is the class name with dots replaced by the path
    separator. Files named ``Outer$Something`` next to the class file are
    accepted as well, so nested and anonymous classes are reported with
    their outer class.
    """
    # Anchor both sides on a separator so "Foo" cannot match "BarFoo".
    stem = os.sep + os.path.splitext(file_path)[0]
    needle = os.sep + class_path
    if stem.endswith(needle):
        return True
    _, found, tail = stem.rpartition(needle + "$")
    return bool(found) and os.sep not in tail


# Helper function to run APKTool commands
def run_command(command: List[str], timeout: int = 300) -> Dict[str, Union[str, int, bool]]:
    """Run an external command and report the outcome as a dictionary.

    Args:
        command: Program and arguments, passed to ``subprocess.run`` as a list.
        timeout: Seconds to wait before giving up.

    Returns:
        On success: ``success`` (True), ``stdout``, ``stderr``, ``returncode``.
        On a non-zero exit: the same keys with ``success`` False plus ``error``.
        On timeout or any other failure: ``success`` False and ``error``.
    """
    try:
        logger.info("Running command: %s", ' '.join(command))
        result = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True,
            timeout=timeout
        )
        logger.info("Command completed with return code %s", result.returncode)
        return {
            "success": True,
            "stdout": result.stdout,
            "stderr": result.stderr,
            "returncode": result.returncode
        }
    except subprocess.CalledProcessError as e:
        logger.error("Command failed with return code %s: %s", e.returncode, e.stderr)
        return {
            "success": False,
            "stdout": e.stdout,
            "stderr": e.stderr,
            "returncode": e.returncode,
            "error": f"Command failed with return code {e.returncode}"
        }
    except subprocess.TimeoutExpired:
        logger.error("Command timed out after %s seconds", timeout)
        return {
            "success": False,
            "error": f"Command timed out after {timeout} seconds"
        }
    except Exception as e:
        # Tool boundary: e.g. the executable is not installed.
        logger.error("Error running command: %s", e)
        return {
            "success": False,
            "error": str(e)
        }


# ---------------------------------------------------------------------------
# MCP Tools
# ---------------------------------------------------------------------------

@mcp.tool(name="decode_apk", description="Decode an APK file using APKTool")
async def decode_apk(apk_path: str, force: bool = True, no_res: bool = False, no_src: bool = False) -> Dict:
    """
    Decode an APK file using APKTool, extracting resources and smali code.

    The output directory is ``WORKSPACE_DIR/<apk file name without extension>``.

    Args:
        apk_path: Path to the APK file to decode
        force: Force delete destination directory if it exists
        no_res: Do not decode resources
        no_src: Do not decode sources

    Returns:
        Dictionary with operation results
    """
    logger.info("Received apk_path: %s", apk_path)  # Log the incoming path

    if not os.path.exists(apk_path):
        return {"success": False, "error": f"APK file not found: {apk_path}"}

    # Use the APK filename (without extension) as the project name in the workspace.
    apk_name = os.path.basename(apk_path).rsplit('.', 1)[0]
    output_dir = os.path.join(WORKSPACE_DIR, apk_name)

    command = ["apktool", "d", apk_path, "-o", output_dir]
    # Only overwrite an existing output directory when the caller asked for it.
    if force:
        command.append("-f")
    if no_res:
        command.append("-r")
    if no_src:
        command.append("-s")

    result = run_command(command)
    if result["success"]:
        return {
            "success": True,
            "output_dir": output_dir
        }
    return result


@mcp.tool(name="build_apk", description="Build an APK file from a decoded APKTool project.")
async def build_apk(project_dir: str, output_apk: Optional[str] = None, debug: bool = True, force_all: bool = False) -> Dict:
    """
    Build an APK file from a decoded APKTool project.

    Args:
        project_dir: Path to the APKTool project directory
        output_apk: Optional output APK path
        debug: Build with debugging info
        force_all: Force rebuild all files

    Returns:
        Dictionary with operation results. On success it carries ``apk_path``
        when the built APK exists at the expected location, otherwise a
        ``warning``.
    """
    if not os.path.exists(project_dir):
        return {
            "success": False,
            "error": f"Project directory not found: {project_dir}"
        }

    command = ["apktool", "b", project_dir]
    if debug:
        command.append("-d")
    if force_all:
        command.append("-f")
    if output_apk:
        command.extend(["-o", output_apk])

    result = run_command(command)
    if result["success"]:
        # Determine built APK path if not specified.
        if not output_apk:
            # abspath drops any trailing separator so basename is never empty.
            project_name = os.path.basename(os.path.abspath(project_dir))
            output_apk = os.path.join(project_dir, "dist", project_name + ".apk")

        if os.path.exists(output_apk):
            result["apk_path"] = output_apk
        else:
            result["warning"] = f"Build succeeded but APK not found at expected path: {output_apk}"

    return result


@mcp.tool(name="get_manifest", description="Get the AndroidManifest.xml content from a decoded APK project.")
async def get_manifest(project_dir: str) -> Dict:
    """
    Get the AndroidManifest.xml content from a decoded APK project.

    Args:
        project_dir: Path to the APKTool project directory

    Returns:
        Dictionary with manifest content or error
    """
    manifest_path = os.path.join(project_dir, "AndroidManifest.xml")
    if not os.path.exists(manifest_path):
        return {
            "success": False,
            "error": f"AndroidManifest.xml not found in {project_dir}"
        }

    try:
        with open(manifest_path, 'r', encoding="utf-8") as f:
            content = f.read()
        return {
            "success": True,
            "manifest": content,
            "path": manifest_path
        }
    except Exception as e:
        logger.error("Error reading manifest: %s", e)
        return {
            "success": False,
            "error": f"Failed to read AndroidManifest.xml: {str(e)}"
        }


@mcp.tool(name="find_leak_manifest", description="Find exported components without permission restrictions in AndroidManifest.xml")
async def find_leak_manifest(project_dir: str) -> Dict:
    """
    Find components with exported=true and no permission restrictions in AndroidManifest.xml.

    Args:
        project_dir: Path to the decoded APK project directory

    Returns:
        Dictionary with success flag, component list with manifest content, or error message.
        When no matching component exists, ``success`` is False and ``error`` says so.
    """
    manifest_path = os.path.join(project_dir, "AndroidManifest.xml")
    if not os.path.exists(manifest_path):
        return {
            "success": False,
            "error": f"AndroidManifest.xml not found in {project_dir}"
        }

    try:
        # Parse AndroidManifest.xml.
        root = ET.parse(manifest_path).getroot()

        # Collect exported components that declare no android:permission.
        components = [
            {
                "type": tag,
                "name": comp.get(_android_attr("name")),
                "manifest_code": ET.tostring(comp, encoding="unicode")
            }
            for tag, comp in _iter_exported_components(root)
            if comp.get(_android_attr("permission")) is None
        ]

        if components:
            return {
                "success": True,
                "components": components,
                "path": manifest_path
            }
        return {
            "success": False,
            "error": "No components found with exported=true and no permission",
            "path": manifest_path
        }
    except Exception as e:
        logger.error("Error processing AndroidManifest.xml: %s", e)
        return {
            "success": False,
            "error": f"Error processing AndroidManifest.xml: {str(e)}"
        }


@mcp.tool(name="find_leak_components_source", description="Find exported components without permissions and get their source code path.")
async def find_leak_components_source(project_dir: str, source_dirs: Optional[List[str]] = None) -> Dict:
    """
    Find exported components (activities, services, receivers) without permissions and locate their source code.

    Args:
        project_dir: Path to the APKTool project directory
        source_dirs: List of directories where the source code resides (e.g., smali, java, kotlin).
            Defaults to every ``smali*`` directory of the project.

    Returns:
        Dictionary with list of vulnerable components and their source code paths.
    """
    manifest_path = os.path.join(project_dir, "AndroidManifest.xml")
    if not os.path.exists(manifest_path):
        return {"success": False, "error": f"AndroidManifest.xml not found in {project_dir}"}

    try:
        # Default search locations: all smali directories, so classes placed in
        # smali_classes2 and later directories are found as well.
        if source_dirs is None:
            source_dirs = [os.path.join(project_dir, d) for d in _list_smali_dirs(project_dir)]

        root = ET.parse(manifest_path).getroot()
        pkg_name = root.get("package") or ""

        vulnerable_components = []
        # Inspect Activity, Service and Receiver declarations.
        for comp_type, comp in _iter_exported_components(root):
            if comp.get(_android_attr("permission")):
                continue
            name = comp.get(_android_attr("name"))
            if not name:
                # Without a class name there is nothing to locate.
                continue

            # Convert a short or relative class name to the full class name.
            full_name = _resolve_class_name(pkg_name, name)
            class_path = full_name.replace('.', os.sep).lstrip(os.sep)

            # Search the smali or source code directories.
            found_paths = []
            for src_dir in source_dirs:
                for root_dir, _, files in os.walk(src_dir):
                    for file in files:
                        if not file.endswith(_SOURCE_EXTENSIONS):
                            continue
                        candidate = os.path.join(root_dir, file)
                        if _is_class_source(candidate, class_path):
                            found_paths.append(candidate)

            vulnerable_components.append({
                "type": comp_type,
                "name": full_name,
                "source_paths": found_paths
            })

        return {"success": True, "components": vulnerable_components}
    except Exception as e:
        logger.error("Error parsing manifest or finding components: %s", e)
        return {"success": False, "error": f"Error: {str(e)}"}


@mcp.tool(name="get_apktool_yml", description="Get apktool.yml information from a decoded APK project.")
async def get_apktool_yml(project_dir: str) -> Dict:
    """
    Get apktool.yml information from a decoded APK project.

    Args:
        project_dir: Path to APKTool project directory

    Returns:
        Dictionary with apktool.yml content or error
    """
    yml_path = os.path.join(project_dir, "apktool.yml")
    if not os.path.exists(yml_path):
        return {
            "success": False,
            "error": f"apktool.yml not found in {project_dir}"
        }

    try:
        with open(yml_path, 'r', encoding="utf-8") as f:
            content = f.read()
        return {
            "success": True,
            "content": content,
            "path": yml_path
        }
    except Exception as e:
        logger.error("Error reading apktool.yml: %s", e)
        return {
            "success": False,
            "error": f"Failed to read apktool.yml: {str(e)}"
        }


@mcp.tool(name="list_smali_directories", description="List all smali directories in a project")
async def list_smali_directories(project_dir: str) -> Dict:
    """
    List all smali directories in a project.

    Args:
        project_dir: Path to the APKTool project directory

    Returns:
        Dictionary with list of smali directories
    """
    if not os.path.exists(project_dir):
        return {
            "success": False,
            "error": f"Project directory not found: {project_dir}"
        }

    try:
        smali_dirs = _list_smali_dirs(project_dir)
        return {
            "success": True,
            "smali_dirs": smali_dirs,
            "count": len(smali_dirs)
        }
    except Exception as e:
        logger.error("Error listing smali directories: %s", e)
        return {
            "success": False,
            "error": f"Failed to list smali directories: {str(e)}"
        }


@mcp.tool(name="list_smali_files", description="List smali files in a specific smali directory, optionally filtered by package prefix.")
async def list_smali_files(project_dir: str, smali_dir: str = "smali", package_prefix: Optional[str] = None) -> Dict:
    """
    List smali files in a specific smali directory, optionally filtered by package prefix.

    Args:
        project_dir: Path to the APKTool project directory
        smali_dir: Which smali directory to use (smali, smali_classes2, etc.)
        package_prefix: Optional package prefix to filter by (e.g., "com.example")

    Returns:
        Dictionary with list of smali files
    """
    # Checked first so the directory listing below cannot raise for a missing project.
    if not os.path.isdir(project_dir):
        return {
            "success": False,
            "error": f"Project directory not found: {project_dir}"
        }

    try:
        smali_path = os.path.join(project_dir, smali_dir)
        if not os.path.exists(smali_path):
            return {
                "success": False,
                "error": f"Smali directory not found: {smali_path}",
                "available_dirs": _list_smali_dirs(project_dir)
            }

        if package_prefix:
            # If package prefix is given, convert it to directory path.
            walk_root = os.path.join(smali_path, package_prefix.replace('.', os.path.sep))
            if not os.path.exists(walk_root):
                return {
                    "success": False,
                    "error": f"Package not found: {package_prefix}",
                    "expected_path": walk_root
                }
        else:
            walk_root = smali_path

        # Recursively find all .smali files.
        smali_files = []
        for current_dir, _, files in os.walk(walk_root):
            for file in files:
                if not file.endswith(".smali"):
                    continue
                file_path = os.path.join(current_dir, file)
                rel_path = os.path.relpath(file_path, smali_path)
                # Strip only the trailing extension: a package segment may
                # itself be called "smali" (e.g. org.jf.smali.Main).
                class_name = rel_path.removesuffix(".smali").replace(os.path.sep, '.')
                smali_files.append({
                    "class_name": class_name,
                    "file_path": file_path,
                    "rel_path": rel_path
                })

        # Sort by class name.
        smali_files.sort(key=lambda entry: entry["class_name"])

        return {
            "success": True,
            "smali_files": smali_files,
            "count": len(smali_files),
            "smali_dir": smali_dir,
            "package_prefix": package_prefix
        }
    except Exception as e:
        logger.error("Error listing smali files: %s", e)
        return {
            "success": False,
            "error": f"Failed to list smali files: {str(e)}"
        }


@mcp.tool(name="get_smali_file", description="Get content of a specific smali file by class name.")
async def get_smali_file(project_dir: str, class_name: str) -> Dict:
    """
    Get content of a specific smali file by class name.

    Args:
        project_dir: Path to the APKTool project directory
        class_name: Full class name (e.g., com.example.MyClass)

    Returns:
        Dictionary with smali file content.
    """
    if not os.path.exists(project_dir):
        return {
            "success": False,
            "error": f"Project directory not found: {project_dir}"
        }

    try:
        # Look for the class in all smali directories.
        file_path, smali_dir, smali_dirs = _locate_smali_file(project_dir, class_name)
        if file_path is None:
            return {
                "success": False,
                "error": f"Smali file not found for class: {class_name}",
                "searched_dirs": smali_dirs
            }

        with open(file_path, 'r', encoding="utf-8") as f:
            content = f.read()
        return {
            "success": True,
            "content": content,
            "file_path": file_path,
            "smali_dir": smali_dir
        }
    except Exception as e:
        logger.error("Error getting smali file: %s", e)
        return {
            "success": False,
            "error": f"Failed to get smali file: {str(e)}"
        }


@mcp.tool(name="modify_smali_file", description="Modify the content of a specific smali file.")
async def modify_smali_file(project_dir: str, class_name: str, new_content: str, create_backup: bool = True) -> Dict:
    """
    Modify the content of a specific smali file.

    Args:
        project_dir: Path to the APKTool project directory
        class_name: Full class name (e.g., com.example.MyClass)
        new_content: New content for the smali file
        create_backup: Whether to create a backup of the original file
            (written next to it with a ``.bak`` suffix)

    Returns:
        Dictionary with operation results.
    """
    if not os.path.exists(project_dir):
        return {
            "success": False,
            "error": f"Project directory not found: {project_dir}"
        }

    try:
        # Look for the class in all smali directories.
        file_path, _, smali_dirs = _locate_smali_file(project_dir, class_name)
        if not file_path:
            return {
                "success": False,
                "error": f"Smali file not found for class: {class_name}",
                "searched_dirs": smali_dirs
            }

        # Create backup if requested.
        backup_path = None
        if create_backup:
            backup_path = file_path + ".bak"
            shutil.copy2(file_path, backup_path)

        # Write new content.
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(new_content)

        return {
            "success": True,
            "message": f"Successfully modified {file_path}",
            "file_path": file_path,
            "backup_path": backup_path
        }
    except Exception as e:
        logger.error("Error modifying smali file: %s", e)
        return {
            "success": False,
            "error": f"Failed to modify smali file: {str(e)}"
        }


@mcp.tool(name="list_resources", description="List resources in a project, optionally filtered by resource type.")
async def list_resources(project_dir: str, resource_type: Optional[str] = None) -> Dict:
    """
    List resources in a project, optionally filtered by resource type.

    Args:
        project_dir: Path to the APKTool project directory
        resource_type: Optional resource type to filter by (e.g., "layout", "drawable")

    Returns:
        Dictionary with list of resources (when ``resource_type`` is given)
        or list of resource types with their file counts (otherwise)
    """
    res_path = os.path.join(project_dir, "res")
    if not os.path.exists(res_path):
        return {
            "success": False,
            "error": f"Resources directory not found: {res_path}"
        }

    try:
        if resource_type:
            # List resources of specific type.
            type_path = os.path.join(res_path, resource_type)
            if not os.path.exists(type_path):
                resource_types = [
                    d for d in os.listdir(res_path)
                    if os.path.isdir(os.path.join(res_path, d))
                ]
                return {
                    "success": False,
                    "error": f"Resource type directory not found: {resource_type}",
                    "available_types": resource_types
                }

            resources = []
            for item in os.listdir(type_path):
                item_path = os.path.join(type_path, item)
                if os.path.isfile(item_path):
                    resources.append({
                        "name": item,
                        "path": item_path,
                        "size": os.path.getsize(item_path)
                    })

            return {
                "success": True,
                "resource_type": resource_type,
                "resources": resources,
                "count": len(resources)
            }

        # List all resource types.
        resource_types = []
        for item in os.listdir(res_path):
            type_path = os.path.join(res_path, item)
            if os.path.isdir(type_path):
                resource_count = sum(
                    1 for f in os.listdir(type_path)
                    if os.path.isfile(os.path.join(type_path, f))
                )
                resource_types.append({
                    "type": item,
                    "path": type_path,
                    "count": resource_count
                })

        return {
            "success": True,
            "resource_types": resource_types,
            "count": len(resource_types)
        }
    except Exception as e:
        logger.error("Error listing resources: %s", e)
        return {
            "success": False,
            "error": f"Failed to list resources: {str(e)}"
        }


@mcp.tool(name="get_resource_file", description="Get content of a specific resource file.")
async def get_resource_file(project_dir: str, resource_type: str, resource_name: str) -> Dict:
    """
    Get content of a specific resource file.

    Args:
        project_dir: Path to the APKTool project directory
        resource_type: Resource type (e.g., "layout", "drawable")
        resource_name: Name of the resource file

    Returns:
        Dictionary with resource file content. Files that are not valid
        UTF-8 text are reported with ``is_binary`` set to True.
    """
    res_dir = os.path.join(project_dir, "res")
    resource_path = os.path.join(res_dir, resource_type, resource_name)

    # Refuse names such as "../../x" that point outside the res directory.
    if not _is_within(res_dir, resource_path):
        return {
            "success": False,
            "error": f"Resource path is outside the project's res directory: {resource_path}"
        }

    if not os.path.exists(resource_path):
        return {
            "success": False,
            "error": f"Resource file not found: {resource_path}"
        }

    try:
        with open(resource_path, 'r', encoding="utf-8") as f:
            content = f.read()
        return {
            "success": True,
            "content": content,
            "path": resource_path,
            "size": os.path.getsize(resource_path)
        }
    except UnicodeDecodeError:
        # This might be a binary resource.
        return {
            "success": False,
            "error": "This appears to be a binary resource file and cannot be read as text",
            "path": resource_path,
            "size": os.path.getsize(resource_path),
            "is_binary": True
        }
    except Exception as e:
        logger.error("Error getting resource file: %s", e)
        return {
            "success": False,
            "error": f"Failed to get resource file: {str(e)}"
        }


@mcp.tool(name="modify_resource_file", description="Modify the content of a specific resource file.")
async def modify_resource_file(project_dir: str, resource_type: str, resource_name: str, new_content: str, create_backup: bool = True) -> Dict:
    """
    Modify the content of a specific resource file.

    Args:
        project_dir: Path to the APKTool project directory
        resource_type: Resource type (e.g., "layout", "values")
        resource_name: Name of the resource file
        new_content: New content for the resource file
        create_backup: Whether to create a backup of the original file
            (written next to it with a ``.bak`` suffix)

    Returns:
        Dictionary with operation results.
    """
    res_dir = os.path.join(project_dir, "res")
    resource_path = os.path.join(res_dir, resource_type, resource_name)

    # Refuse names such as "../../x" so files outside res are never overwritten.
    if not _is_within(res_dir, resource_path):
        return {
            "success": False,
            "error": f"Resource path is outside the project's res directory: {resource_path}"
        }

    if not os.path.exists(resource_path):
        return {
            "success": False,
            "error": f"Resource file not found: {resource_path}"
        }

    try:
        # Create backup if requested.
        backup_path = None
        if create_backup:
            backup_path = resource_path + ".bak"
            shutil.copy2(resource_path, backup_path)

        # Write new content.
        with open(resource_path, 'w', encoding="utf-8") as f:
            f.write(new_content)

        return {
            "success": True,
            "message": f"Successfully modified {resource_path}",
            "path": resource_path,
            "backup_path": backup_path
        }
    except Exception as e:
        logger.error("Error modifying resource file: %s", e)
        return {
            "success": False,
            "error": f"Failed to modify resource file: {str(e)}"
        }


@mcp.tool(name="search_in_files", description="Search for a pattern in files specified extensions.")
async def search_in_files(project_dir: str, search_pattern: str, file_extensions: Optional[List[str]] = None, max_results: int = 100) -> Dict:
    """
    Search for a pattern in files with specified extensions.

    The match is a plain substring test on the file content; files that are
    not valid UTF-8 are skipped.

    Args:
        project_dir: Path to the APKTool project directory
        search_pattern: Text pattern to search for
        file_extensions: List of file extensions to search in
            (defaults to ``[".smali", ".xml"]``)
        max_results: Maximum number of results to return

    Returns:
        Dictionary with search results
    """
    if not os.path.exists(project_dir):
        return {
            "success": False,
            "error": f"Project directory not found: {project_dir}"
        }

    try:
        extensions = _DEFAULT_SEARCH_EXTENSIONS if file_extensions is None else tuple(file_extensions)

        results = []
        for current_dir, _, files in os.walk(project_dir):
            # Stop walking the tree entirely once enough matches were collected.
            if len(results) >= max_results:
                break
            for file in files:
                if len(results) >= max_results:
                    break
                if not file.endswith(extensions):
                    continue

                file_path = os.path.join(current_dir, file)
                try:
                    with open(file_path, 'r', encoding="utf-8") as f:
                        content = f.read()
                except UnicodeDecodeError:
                    # Skip binary files.
                    continue
                except Exception as e:
                    logger.error("Error reading file %s: %s", file_path, e)
                    continue

                if search_pattern in content:
                    results.append({
                        "file": os.path.relpath(file_path, project_dir),
                        "path": file_path
                    })

        return {
            "success": True,
            "results": results,
            "count": len(results),
            "max_reached": len(results) >= max_results,
            "search_pattern": search_pattern,
            "file_extensions": list(extensions)
        }
    except Exception as e:
        logger.error("Error searching in files: %s", e)
        return {
            "success": False,
            "error": f"Failed to search in files: {str(e)}"
        }


@mcp.tool(name="clean_project", description="Clean a project directory to prepare for rebuilding.")
async def clean_project(project_dir: str, backup: bool = True) -> Dict:
    """
    Clean a project directory to prepare for rebuilding.

    Removes the ``build`` and ``dist`` directories of the project.

    Args:
        project_dir: Path to the APKTool project directory
        backup: Whether to create a backup of build directories before cleaning
            (copied to ``<dir>_backup_<unix timestamp>``)

    Returns:
        Dictionary with operation results
    """
    if not os.path.exists(project_dir):
        return {
            "success": False,
            "error": f"Project directory not found: {project_dir}"
        }

    try:
        cleaned = []
        backed_up = []

        for dir_name in ("build", "dist"):
            dir_path = os.path.join(project_dir, dir_name)
            if not os.path.exists(dir_path):
                continue

            if backup:
                # Create backup.
                backup_path = f"{dir_path}_backup_{int(time.time())}"
                shutil.copytree(dir_path, backup_path)
                backed_up.append({
                    "original": dir_path,
                    "backup": backup_path
                })

            # Remove directory.
            shutil.rmtree(dir_path)
            cleaned.append(dir_path)

        return {
            "success": True,
            "cleaned_directories": cleaned,
            "backed_up_directories": backed_up
        }
    except Exception as e:
        logger.error("Error cleaning project: %s", e)
        return {
            "success": False,
            "error": f"Failed to clean project: {str(e)}"
        }


if __name__ == "__main__":
    mcp.run(transport="stdio")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/il-il1/APK-Security-Guard-MCP-Suite'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.