Skip to main content
Glama

Apktool MCP Server

apktool_mcp_server.py64 kB
# /// script # requires-python = ">=3.10" # dependencies = [ "fastmcp", "logging", "argparse" ] # /// """ Copyright (c) 2025 apktool mcp server developer(s) (https://github.com/zinja-coder/apktool-mcp-server) See the file 'LICENSE' for copying permission """ import logging import subprocess import os import shutil import argparse import json import time import xml.etree.ElementTree as ET from typing import List, Union, Dict, Optional, Callable, Any from fastmcp import FastMCP # Set up logging configuration logger = logging.getLogger() logger.setLevel(logging.INFO) # Console handler for logging to the console console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(console_handler) # Parse arguments parser = argparse.ArgumentParser("APKTool MCP Server") parser.add_argument("--http", help="Serve MCP Server over HTTP stream.", action="store_true", default=False) parser.add_argument("--port", help="Specify the port number for --http to serve on. (default:8652)", default=8652, type=int) parser.add_argument("--workspace", help="Specify workspace directory for APK projects", default="apktool_mcp_server_workspace", type=str) parser.add_argument("--timeout", help="Default timeout for APKTool commands in seconds", default=300, type=int) args = parser.parse_args() # Initialize the MCP server mcp = FastMCP("APKTool-MCP Server with Advanced Features") # Current workspace for decoded APK projects WORKSPACE_DIR = os.environ.get("APKTOOL_WORKSPACE", args.workspace) DEFAULT_TIMEOUT = args.timeout # Ensure workspace directory exists os.makedirs(WORKSPACE_DIR, exist_ok=True) class PaginationUtils: """Utility class for handling pagination across different MCP tools""" # Configuration constants DEFAULT_PAGE_SIZE = 100 MAX_PAGE_SIZE = 10000 MAX_OFFSET = 1000000 @staticmethod def validate_pagination_params(offset: int, count: int) -> tuple[int, int]: """Validate and normalize pagination parameters""" offset = max(0, min(offset, PaginationUtils.MAX_OFFSET)) count = max(0, min(count, PaginationUtils.MAX_PAGE_SIZE)) return offset, count @staticmethod def handle_pagination( items: List[Any], offset: int = 0, count: int = 0, data_type: str = "paginated-list", items_key: str = "items", item_transformer: Optional[Callable[[Any], Any]] = None ) -> Dict[str, Any]: """ Generic pagination handler for list data Args: items: List of items to paginate offset: Starting offset count: Number of items to return (0 means use default) data_type: Type identifier for the response items_key: Key name for items in response item_transformer: Optional function to transform items Returns: Paginated response dictionary """ if items is None: items = [] total_items = len(items) # Validate parameters offset, count = PaginationUtils.validate_pagination_params(offset, count) # Determine effective limit if count == 0: effective_limit = min(PaginationUtils.DEFAULT_PAGE_SIZE, max(0, total_items - offset)) else: effective_limit = min(count, max(0, total_items - offset)) # Calculate bounds start_index = min(offset, total_items) end_index = min(start_index + effective_limit, total_items) has_more = end_index < total_items # Extract and transform paginated subset paginated_items = items[start_index:end_index] if item_transformer: paginated_items = [item_transformer(item) for item in paginated_items] # Build response result = { "type": data_type, items_key: paginated_items, "pagination": { "total": total_items, "offset": offset, "limit": effective_limit, "count": len(paginated_items), "has_more": has_more } } # Add navigation helpers if has_more: result["pagination"]["next_offset"] = end_index if offset > 0: prev_offset = max(0, offset - effective_limit) result["pagination"]["prev_offset"] = prev_offset # Page calculations if effective_limit > 0: current_page = (offset // effective_limit) + 1 total_pages = (total_items + effective_limit - 1) // effective_limit result["pagination"]["current_page"] = current_page result["pagination"]["total_pages"] = total_pages result["pagination"]["page_size"] = effective_limit return result class ValidationUtils: """Utility class for input validation""" @staticmethod def validate_path(path: str, must_exist: bool = True) -> Dict[str, Union[bool, str]]: """Validate file/directory path""" if not path or not isinstance(path, str): return {"valid": False, "error": "Path cannot be empty"} if must_exist and not os.path.exists(path): return {"valid": False, "error": f"Path does not exist: {path}"} return {"valid": True} @staticmethod def validate_class_name(class_name: str) -> Dict[str, Union[bool, str]]: """Validate Java class name format""" if not class_name or not isinstance(class_name, str): return {"valid": False, "error": "Class name cannot be empty"} if not class_name.replace('.', '').replace('_', '').replace('$', '').replace('/', '').isalnum(): return {"valid": False, "error": "Invalid class name format"} return {"valid": True} @staticmethod def validate_search_pattern(pattern: str) -> Dict[str, Union[bool, str]]: """Validate search pattern""" if not pattern or not isinstance(pattern, str): return {"valid": False, "error": "Search pattern cannot be empty"} if len(pattern) > 1000: return {"valid": False, "error": "Search pattern too long (max 1000 characters)"} return {"valid": True} # Enhanced command runner with better error handling def run_command(command: List[str], timeout: int = DEFAULT_TIMEOUT, cwd: Optional[str] = None) -> Dict[str, Union[str, int, bool]]: """Enhanced command runner with comprehensive error handling""" try: logger.info(f"Running command: {' '.join(command)}") # Input validation if not command or not all(isinstance(arg, str) for arg in command): return { "success": False, "error": "Invalid command format" } result = subprocess.run( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True, timeout=timeout, cwd=cwd ) logger.info(f"Command completed successfully with return code {result.returncode}") return { "success": True, "stdout": result.stdout, "stderr": result.stderr, "returncode": result.returncode, "command": " ".join(command) } except subprocess.CalledProcessError as e: logger.error(f"Command failed with return code {e.returncode}: {e.stderr}") return { "success": False, "stdout": e.stdout or "", "stderr": e.stderr or "", "returncode": e.returncode, "error": f"Command failed with return code {e.returncode}", "command": " ".join(command) } except subprocess.TimeoutExpired as e: logger.error(f"Command timed out after {timeout} seconds") return { "success": False, "error": f"Command timed out after {timeout} seconds", "command": " ".join(command) } except FileNotFoundError: return { "success": False, "error": "APKTool not found. Please ensure APKTool is installed and in PATH" } except Exception as e: logger.error(f"Unexpected error running command: {str(e)}") return { "success": False, "error": f"Unexpected error: {str(e)}", "command": " ".join(command) } # Health check functionality @mcp.tool() async def health_check() -> Dict: """ Check the health status of the APKTool MCP server and APKTool installation. Returns: Dictionary containing server status and APKTool availability """ try: # Check APKTool installation apktool_result = run_command(["apktool", "--version"], timeout=10) result = { "server_status": "running", "workspace_dir": WORKSPACE_DIR, "workspace_exists": os.path.exists(WORKSPACE_DIR), "apktool_available": apktool_result["success"], "timestamp": time.time() } if apktool_result["success"]: result["apktool_version"] = apktool_result["stdout"].strip() else: result["apktool_error"] = apktool_result["error"] logger.info("APKTool MCP Server: Health check completed") return result except Exception as e: logger.error(f"Health check error: {str(e)}") return { "server_status": "error", "error": str(e), "timestamp": time.time() } # Enhanced MCP Tools with validation and better error handling @mcp.tool() async def decode_apk( apk_path: str, force: bool = True, no_res: bool = False, no_src: bool = False, output_dir: Optional[str] = None, timeout: int = DEFAULT_TIMEOUT ) -> Dict: """ Decode an APK file using APKTool with comprehensive validation and error handling. Args: apk_path: Path to the APK file to decode force: Force delete destination directory if it exists no_res: Do not decode resources no_src: Do not decode sources output_dir: Custom output directory (optional) timeout: Command timeout in seconds Returns: Dictionary with operation results including validation details """ # Input validation path_validation = ValidationUtils.validate_path(apk_path, must_exist=True) if not path_validation["valid"]: return {"success": False, "error": path_validation["error"]} if not apk_path.lower().endswith(('.apk', '.xapk')): return {"success": False, "error": "File must have .apk or .xapk extension"} # Determine output directory if output_dir is None: apk_name = os.path.basename(apk_path).rsplit('.', 1)[0] output_dir = os.path.join(WORKSPACE_DIR, apk_name) # Build command command = ["apktool", "d", apk_path, "-o", output_dir] if force: command.append("-f") if no_res: command.append("-r") if no_src: command.append("-s") result = run_command(command, timeout=timeout) if result["success"]: # Additional validation - check if output directory was created if os.path.exists(output_dir): result["output_dir"] = output_dir result["workspace"] = WORKSPACE_DIR # Get basic project info manifest_path = os.path.join(output_dir, "AndroidManifest.xml") apktool_yml_path = os.path.join(output_dir, "apktool.yml") result["has_manifest"] = os.path.exists(manifest_path) result["has_apktool_yml"] = os.path.exists(apktool_yml_path) else: result["warning"] = "Decode reported success but output directory not found" return result @mcp.tool() async def build_apk( project_dir: str, output_apk: Optional[str] = None, debug: bool = True, force_all: bool = False, timeout: int = DEFAULT_TIMEOUT ) -> Dict: """ Build an APK file from a decoded APKTool project with enhanced validation. Args: project_dir: Path to the APKTool project directory output_apk: Optional output APK path debug: Build with debugging info force_all: Force rebuild all files timeout: Command timeout in seconds Returns: Dictionary with operation results and build information """ # Input validation path_validation = ValidationUtils.validate_path(project_dir, must_exist=True) if not path_validation["valid"]: return {"success": False, "error": path_validation["error"]} if not os.path.isdir(project_dir): return {"success": False, "error": f"Project path is not a directory: {project_dir}"} # Check for required files apktool_yml = os.path.join(project_dir, "apktool.yml") manifest_xml = os.path.join(project_dir, "AndroidManifest.xml") if not os.path.exists(apktool_yml): return {"success": False, "error": "apktool.yml not found. Is this a valid APKTool project?"} if not os.path.exists(manifest_xml): return {"success": False, "error": "AndroidManifest.xml not found. Is this a valid APKTool project?"} # Build command command = ["apktool", "b", project_dir] if debug: command.append("-d") if force_all: command.append("-f") if output_apk: command.extend(["-o", output_apk]) result = run_command(command, timeout=timeout) if result["success"]: # Determine built APK path if not output_apk: output_apk = os.path.join(project_dir, "dist", os.path.basename(project_dir) + ".apk") if os.path.exists(output_apk): result["apk_path"] = output_apk result["apk_size"] = os.path.getsize(output_apk) else: result["warning"] = f"Build succeeded but APK not found at expected path: {output_apk}" return result @mcp.tool() async def get_manifest(project_dir: str) -> Dict: """ Get the AndroidManifest.xml content from a decoded APK project with validation. Args: project_dir: Path to the APKTool project directory Returns: Dictionary with manifest content, metadata, and validation results """ # Input validation path_validation = ValidationUtils.validate_path(project_dir, must_exist=True) if not path_validation["valid"]: return {"success": False, "error": path_validation["error"]} manifest_path = os.path.join(project_dir, "AndroidManifest.xml") if not os.path.exists(manifest_path): return { "success": False, "error": f"AndroidManifest.xml not found in {project_dir}", "expected_path": manifest_path } try: with open(manifest_path, 'r', encoding="utf-8") as f: content = f.read() result = { "success": True, "manifest": content, "path": manifest_path, "size": os.path.getsize(manifest_path), "encoding": "utf-8" } return result except Exception as e: logger.error(f"Error reading manifest: {str(e)}") return { "success": False, "error": f"Failed to read AndroidManifest.xml: {str(e)}", "path": manifest_path } @mcp.tool() async def get_apktool_yml(project_dir: str) -> Dict: """ Get apktool.yml information from a decoded APK project with validation. Args: project_dir: Path to APKTool project directory Returns: Dictionary with apktool.yml content, metadata, and validation results """ # Input validation path_validation = ValidationUtils.validate_path(project_dir, must_exist=True) if not path_validation["valid"]: return {"success": False, "error": path_validation["error"]} yml_path = os.path.join(project_dir, "apktool.yml") if not os.path.exists(yml_path): return { "success": False, "error": f"apktool.yml not found in {project_dir}", "expected_path": yml_path } try: with open(yml_path, 'r', encoding="utf-8") as f: content = f.read() result = { "success": True, "content": content, "path": yml_path, "size": os.path.getsize(yml_path), "encoding": "utf-8" } return result except Exception as e: logger.error(f"Error reading apktool.yml: {str(e)}") return { "success": False, "error": f"Failed to read apktool.yml: {str(e)}", "path": yml_path } @mcp.tool() async def list_smali_directories(project_dir: str) -> Dict: """ List all smali directories in a project with enhanced metadata. Args: project_dir: Path to the APKTool project directory Returns: Dictionary with list of smali directories and metadata """ # Input validation path_validation = ValidationUtils.validate_path(project_dir, must_exist=True) if not path_validation["valid"]: return {"success": False, "error": path_validation["error"]} try: smali_dirs = [] for d in os.listdir(project_dir): dir_path = os.path.join(project_dir, d) if d.startswith("smali") and os.path.isdir(dir_path): # Count files in smali directory file_count = 0 try: for root, _, files in os.walk(dir_path): file_count += len([f for f in files if f.endswith('.smali')]) except Exception as e: logger.warning(f"Error counting files in {dir_path}: {e}") file_count = 0 smali_dirs.append({ "name": d, "path": dir_path, "smali_file_count": file_count }) return { "success": True, "smali_dirs": smali_dirs, "count": len(smali_dirs) } except Exception as e: logger.error(f"Error listing smali directories: {str(e)}") return { "success": False, "error": f"Failed to list smali directories: {str(e)}" } @mcp.tool() async def list_smali_files( project_dir: str, smali_dir: str = "smali", package_prefix: Optional[str] = None, offset: int = 0, count: int = 0 ) -> Dict: """ List smali files with pagination support and enhanced filtering. Args: project_dir: Path to the APKTool project directory smali_dir: Which smali directory to use (smali, smali_classes2, etc.) package_prefix: Optional package prefix to filter by (e.g., "com.example") offset: Starting offset for pagination count: Number of items to return (0 means use default) Returns: Paginated dictionary with list of smali files and metadata """ # Input validation path_validation = ValidationUtils.validate_path(project_dir, must_exist=True) if not path_validation["valid"]: return {"success": False, "error": path_validation["error"]} smali_path = os.path.join(project_dir, smali_dir) if not os.path.exists(smali_path): # Get available smali directories try: smali_dirs = [d for d in os.listdir(project_dir) if d.startswith("smali") and os.path.isdir(os.path.join(project_dir, d))] except Exception: smali_dirs = [] return { "success": False, "error": f"Smali directory not found: {smali_path}", "available_dirs": smali_dirs } try: smali_files = [] search_root = smali_path # Handle package filtering if package_prefix: # Validate package name format if not package_prefix.replace('.', '').replace('_', '').replace('$', '').isalnum(): return {"success": False, "error": "Invalid package prefix format"} package_path = os.path.join(smali_path, package_prefix.replace('.', os.path.sep)) if not os.path.exists(package_path): return { "success": False, "error": f"Package not found: {package_prefix}", "expected_path": package_path } search_root = package_path # Recursively find all .smali files for root, _, files in os.walk(search_root): for file in files: if file.endswith(".smali"): file_path = os.path.join(root, file) rel_path = os.path.relpath(file_path, smali_path) class_name = rel_path.replace(os.path.sep, '.').replace('.smali', '') smali_files.append({ "class_name": class_name, "file_path": file_path, "rel_path": rel_path, "size": os.path.getsize(file_path) }) # Sort by class name for consistent results smali_files.sort(key=lambda x: x["class_name"]) # Apply pagination paginated_result = PaginationUtils.handle_pagination( items=smali_files, offset=offset, count=count, data_type="smali-files", items_key="smali_files" ) # Add metadata paginated_result["success"] = True paginated_result["smali_dir"] = smali_dir paginated_result["package_prefix"] = package_prefix paginated_result["search_root"] = search_root return paginated_result except Exception as e: logger.error(f"Error listing smali files: {str(e)}") return { "success": False, "error": f"Failed to list smali files: {str(e)}" } @mcp.tool() async def get_smali_file(project_dir: str, class_name: str) -> Dict: """ Get content of a specific smali file by class name with validation. Args: project_dir: Path to the APKTool project directory class_name: Full class name (e.g., com.example.MyClass) Returns: Dictionary with smali file content and metadata """ # Input validation path_validation = ValidationUtils.validate_path(project_dir, must_exist=True) if not path_validation["valid"]: return {"success": False, "error": path_validation["error"]} class_validation = ValidationUtils.validate_class_name(class_name) if not class_validation["valid"]: return {"success": False, "error": class_validation["error"]} try: # Look for the class in all smali directories smali_dirs = [d for d in os.listdir(project_dir) if d.startswith("smali") and os.path.isdir(os.path.join(project_dir, d))] for smali_dir in smali_dirs: file_path = os.path.join( project_dir, smali_dir, class_name.replace('.', os.path.sep) + '.smali' ) if os.path.exists(file_path): with open(file_path, 'r', encoding="utf-8") as f: content = f.read() return { "success": True, "content": content, "file_path": file_path, "smali_dir": smali_dir, "size": os.path.getsize(file_path), "class_name": class_name, "encoding": "utf-8" } return { "success": False, "error": f"Smali file not found for class: {class_name}", "searched_dirs": smali_dirs } except Exception as e: logger.error(f"Error getting smali file: {str(e)}") return { "success": False, "error": f"Failed to get smali file: {str(e)}" } @mcp.tool() async def modify_smali_file( project_dir: str, class_name: str, new_content: str, create_backup: bool = True ) -> Dict: """ Modify smali file content with validation and backup support. Args: project_dir: Path to the APKTool project directory class_name: Full class name (e.g., com.example.MyClass) new_content: New content for the smali file create_backup: Whether to create a backup of the original file Returns: Dictionary with operation results and metadata """ # Input validation path_validation = ValidationUtils.validate_path(project_dir, must_exist=True) if not path_validation["valid"]: return {"success": False, "error": path_validation["error"]} class_validation = ValidationUtils.validate_class_name(class_name) if not class_validation["valid"]: return {"success": False, "error": class_validation["error"]} try: # Find the smali file smali_dirs = [d for d in os.listdir(project_dir) if d.startswith("smali") and os.path.isdir(os.path.join(project_dir, d))] file_path = None for smali_dir in smali_dirs: test_path = os.path.join( project_dir, smali_dir, class_name.replace('.', os.path.sep) + '.smali' ) if os.path.exists(test_path): file_path = test_path break if not file_path: return { "success": False, "error": f"Smali file not found for class: {class_name}", "searched_dirs": smali_dirs } # Get original content size original_size = os.path.getsize(file_path) # Create backup if requested backup_path = None if create_backup: backup_path = f"{file_path}.bak.{int(time.time())}" shutil.copy2(file_path, backup_path) # Write new content with open(file_path, 'w', encoding='utf-8') as f: f.write(new_content) return { "success": True, "message": f"Successfully modified {file_path}", "file_path": file_path, "backup_path": backup_path, "class_name": class_name, "original_size": original_size, "new_size": len(new_content), "backup_created": backup_path is not None } except Exception as e: logger.error(f"Error modifying smali file: {str(e)}") return { "success": False, "error": f"Failed to modify smali file: {str(e)}" } @mcp.tool() async def list_resources( project_dir: str, resource_type: Optional[str] = None, offset: int = 0, count: int = 0 ) -> Dict: """ List resources with pagination support and enhanced metadata. Args: project_dir: Path to the APKTool project directory resource_type: Optional resource type to filter by (e.g., "layout", "drawable") offset: Starting offset for pagination count: Number of items to return (0 means use default) Returns: Paginated dictionary with list of resources and metadata """ # Input validation path_validation = ValidationUtils.validate_path(project_dir, must_exist=True) if not path_validation["valid"]: return {"success": False, "error": path_validation["error"]} res_path = os.path.join(project_dir, "res") if not os.path.exists(res_path): return { "success": False, "error": f"Resources directory not found: {res_path}" } try: if resource_type: # List resources of specific type type_path = os.path.join(res_path, resource_type) if not os.path.exists(type_path): # Get available resource types resource_types = [ d for d in os.listdir(res_path) if os.path.isdir(os.path.join(res_path, d)) ] return { "success": False, "error": f"Resource type directory not found: {resource_type}", "available_types": resource_types } resources = [] for item in os.listdir(type_path): item_path = os.path.join(type_path, item) if os.path.isfile(item_path): resources.append({ "name": item, "path": item_path, "size": os.path.getsize(item_path), "type": resource_type, "extension": os.path.splitext(item)[1], "modified_time": os.path.getmtime(item_path) }) # Sort by name resources.sort(key=lambda x: x["name"]) # Apply pagination paginated_result = PaginationUtils.handle_pagination( items=resources, offset=offset, count=count, data_type="resources", items_key="resources" ) paginated_result["success"] = True paginated_result["resource_type"] = resource_type paginated_result["resource_path"] = type_path return paginated_result else: # List all resource types with counts resource_types = [] for item in os.listdir(res_path): type_path = os.path.join(res_path, item) if os.path.isdir(type_path): try: files = [f for f in os.listdir(type_path) if os.path.isfile(os.path.join(type_path, f))] resource_count = len(files) # Calculate total size total_size = 0 for f in files: try: total_size += os.path.getsize(os.path.join(type_path, f)) except: pass resource_types.append({ "type": item, "path": type_path, "count": resource_count, "total_size": total_size }) except Exception as e: logger.warning(f"Error processing resource type {item}: {e}") resource_types.append({ "type": item, "path": type_path, "count": 0, "total_size": 0, "error": str(e) }) # Sort by type name resource_types.sort(key=lambda x: x["type"]) # Apply pagination paginated_result = PaginationUtils.handle_pagination( items=resource_types, offset=offset, count=count, data_type="resource-types", items_key="resource_types" ) paginated_result["success"] = True return paginated_result except Exception as e: logger.error(f"Error listing resources: {str(e)}") return { "success": False, "error": f"Failed to list resources: {str(e)}" } @mcp.tool() async def get_resource_file(project_dir: str, resource_type: str, resource_name: str) -> Dict: """ Get content of a specific resource file with validation and metadata. Args: project_dir: Path to the APKTool project directory resource_type: Resource type (e.g., "layout", "drawable") resource_name: Name of the resource file Returns: Dictionary with resource file content and metadata """ # Input validation path_validation = ValidationUtils.validate_path(project_dir, must_exist=True) if not path_validation["valid"]: return {"success": False, "error": path_validation["error"]} if not resource_type or not resource_name: return {"success": False, "error": "Resource type and name are required"} resource_path = os.path.join(project_dir, "res", resource_type, resource_name) if not os.path.exists(resource_path): return { "success": False, "error": f"Resource file not found: {resource_path}", "expected_path": resource_path } try: file_size = os.path.getsize(resource_path) is_text_file = True # Try to read as text first try: with open(resource_path, 'r', encoding="utf-8") as f: content = f.read() encoding = "utf-8" except UnicodeDecodeError: is_text_file = False content = None encoding = None if is_text_file and content is not None: result = { "success": True, "content": content, "path": resource_path, "size": file_size, "resource_type": resource_type, "resource_name": resource_name, "encoding": encoding } return result else: # Binary file return { "success": False, "error": "This appears to be a binary resource file and cannot be read as text", "path": resource_path, "size": file_size, "resource_type": resource_type, "resource_name": resource_name, "is_binary": True, "is_text": False } except Exception as e: logger.error(f"Error getting resource file: {str(e)}") return { "success": False, "error": f"Failed to get resource file: {str(e)}" } @mcp.tool() async def modify_resource_file( project_dir: str, resource_type: str, resource_name: str, new_content: str, create_backup: bool = True ) -> Dict: """ Modify the content of a specific resource file with validation and backup support. Args: project_dir: Path to the APKTool project directory resource_type: Resource type (e.g., "layout", "values") resource_name: Name of the resource file new_content: New content for the resource file create_backup: Whether to create a backup of the original file Returns: Dictionary with operation results and metadata """ # Input validation path_validation = ValidationUtils.validate_path(project_dir, must_exist=True) if not path_validation["valid"]: return {"success": False, "error": path_validation["error"]} if not resource_type or not resource_name: return {"success": False, "error": "Resource type and name are required"} resource_path = os.path.join(project_dir, "res", resource_type, resource_name) if not os.path.exists(resource_path): return { "success": False, "error": f"Resource file not found: {resource_path}", "expected_path": resource_path } try: # Get original file info original_size = os.path.getsize(resource_path) # Create backup if requested backup_path = None if create_backup: backup_path = f"{resource_path}.bak.{int(time.time())}" shutil.copy2(resource_path, backup_path) # Write new content with open(resource_path, 'w', encoding="utf-8") as f: f.write(new_content) result = { "success": True, "message": f"Successfully modified {resource_path}", "path": resource_path, "backup_path": backup_path, "resource_type": resource_type, "resource_name": resource_name, "original_size": original_size, "new_size": len(new_content), "backup_created": backup_path is not None } return result except Exception as e: logger.error(f"Error modifying resource file: {str(e)}") return { "success": False, "error": f"Failed to modify resource file: {str(e)}" } @mcp.tool() async def search_in_files( project_dir: str, search_pattern: str, file_extensions: List[str] = [".smali", ".xml"], max_results: int = 100, offset: int = 0, count: int = 0, case_sensitive: bool = False ) -> Dict: """ Search for patterns in files with pagination and enhanced filtering. Args: project_dir: Path to the APKTool project directory search_pattern: Text pattern to search for file_extensions: List of file extensions to search in max_results: Maximum total results to collect before pagination offset: Starting offset for pagination count: Number of items to return (0 means use default) case_sensitive: Whether search should be case sensitive Returns: Paginated dictionary with search results and metadata """ # Input validation path_validation = ValidationUtils.validate_path(project_dir, must_exist=True) if not path_validation["valid"]: return {"success": False, "error": path_validation["error"]} pattern_validation = ValidationUtils.validate_search_pattern(search_pattern) if not pattern_validation["valid"]: return {"success": False, "error": pattern_validation["error"]} if not file_extensions or not isinstance(file_extensions, list): return {"success": False, "error": "File extensions must be a non-empty list"} try: results = [] search_stats = { "files_searched": 0, "files_matched": 0, "total_matches": 0, "search_truncated": False, "directories_searched": 0, "start_time": time.time() } # Prepare search pattern pattern = search_pattern if case_sensitive else search_pattern.lower() # Walk through project directory for root, dirs, files in os.walk(project_dir): search_stats["directories_searched"] += 1 for file in files: if len(results) >= max_results: search_stats["search_truncated"] = True break if any(file.endswith(ext) for ext in file_extensions): file_path = os.path.join(root, file) search_stats["files_searched"] += 1 try: with open(file_path, 'r', encoding="utf-8") as f: content = f.read() # Perform search search_content = content if case_sensitive else content.lower() if pattern in search_content: search_stats["files_matched"] += 1 # Count matches in this file and find line numbers matches_in_file = search_content.count(pattern) search_stats["total_matches"] += matches_in_file # Find line numbers of matches lines = content.splitlines() line_matches = [] for i, line in enumerate(lines, 1): search_line = line if case_sensitive else line.lower() if pattern in search_line: line_matches.append({ "line_number": i, "line_content": line.strip()[:200], # Truncate long lines "matches_in_line": search_line.count(pattern) }) rel_path = os.path.relpath(file_path, project_dir) results.append({ "file": rel_path, "path": file_path, "size": os.path.getsize(file_path), "matches": matches_in_file, "extension": os.path.splitext(file)[1], "line_matches": line_matches[:10], # Limit to first 10 line matches "total_line_matches": len(line_matches) }) except UnicodeDecodeError: # Skip binary files continue except Exception as e: logger.warning(f"Error reading file {file_path}: {str(e)}") continue if search_stats["search_truncated"]: break search_stats["end_time"] = time.time() search_stats["duration"] = search_stats["end_time"] - search_stats["start_time"] # Sort by number of matches (descending) then by file name results.sort(key=lambda x: (-x["matches"], x["file"])) # Apply pagination paginated_result = PaginationUtils.handle_pagination( items=results, offset=offset, count=count, data_type="search-results", items_key="results" ) # Add search metadata paginated_result["success"] = True paginated_result["search_pattern"] = search_pattern paginated_result["case_sensitive"] = case_sensitive paginated_result["file_extensions"] = file_extensions paginated_result["search_stats"] = search_stats return paginated_result except Exception as e: logger.error(f"Error searching in files: {str(e)}") return { "success": False, "error": f"Failed to search in files: {str(e)}" } @mcp.tool() async def clean_project(project_dir: str, backup: bool = True) -> Dict: """ Clean a project directory to prepare for rebuilding with enhanced backup support. Args: project_dir: Path to the APKTool project directory backup: Whether to create a backup of build directories before cleaning Returns: Dictionary with operation results and cleanup details """ # Input validation path_validation = ValidationUtils.validate_path(project_dir, must_exist=True) if not path_validation["valid"]: return {"success": False, "error": path_validation["error"]} try: dirs_to_clean = ["build", "dist", "temp"] files_to_clean = ["*.tmp", "*.log"] cleaned_dirs = [] cleaned_files = [] backed_up = [] # Clean directories for dir_name in dirs_to_clean: dir_path = os.path.join(project_dir, dir_name) if os.path.exists(dir_path): if backup: # Create backup backup_path = f"{dir_path}_backup_{int(time.time())}" shutil.copytree(dir_path, backup_path) backed_up.append({ "original": dir_path, "backup": backup_path, "type": "directory" }) # Calculate size before removal dir_size = 0 file_count = 0 for root, dirs, files in os.walk(dir_path): for file in files: file_path = os.path.join(root, file) try: dir_size += os.path.getsize(file_path) file_count += 1 except: pass # Remove directory shutil.rmtree(dir_path) cleaned_dirs.append({ "path": dir_path, "size_freed": dir_size, "files_removed": file_count }) # Clean specific files import glob for pattern in files_to_clean: pattern_path = os.path.join(project_dir, pattern) for file_path in glob.glob(pattern_path): if os.path.isfile(file_path): file_size = os.path.getsize(file_path) if backup: backup_path = f"{file_path}.bak.{int(time.time())}" shutil.copy2(file_path, backup_path) backed_up.append({ "original": file_path, "backup": backup_path, "type": "file" }) os.remove(file_path) cleaned_files.append({ "path": file_path, "size": file_size }) total_size_freed = sum(d["size_freed"] for d in cleaned_dirs) + sum(f["size"] for f in cleaned_files) total_files_removed = sum(d["files_removed"] for d in cleaned_dirs) + len(cleaned_files) return { "success": True, "cleaned_directories": cleaned_dirs, "cleaned_files": cleaned_files, "backed_up_items": backed_up, "total_size_freed": total_size_freed, "total_files_removed": total_files_removed, "backup_created": len(backed_up) > 0 } except Exception as e: logger.error(f"Error cleaning project: {str(e)}") return { "success": False, "error": f"Failed to clean project: {str(e)}" } @mcp.tool() async def analyze_project_structure(project_dir: str) -> Dict: """ Analyze the structure of a decoded APK project and provide comprehensive metadata. Args: project_dir: Path to the APKTool project directory Returns: Dictionary with detailed project analysis """ # Input validation path_validation = ValidationUtils.validate_path(project_dir, must_exist=True) if not path_validation["valid"]: return {"success": False, "error": path_validation["error"]} try: analysis = { "project_path": project_dir, "analysis_time": time.time(), "is_valid_project": False, "project_size": 0, "file_counts": {}, "directory_structure": {}, "smali_analysis": {}, "resource_analysis": {}, "manifest_analysis": {}, "errors": [] } # Check if it's a valid APKTool project required_files = ["AndroidManifest.xml", "apktool.yml"] missing_files = [] for file in required_files: if not os.path.exists(os.path.join(project_dir, file)): missing_files.append(file) analysis["is_valid_project"] = len(missing_files) == 0 if missing_files: analysis["errors"].append(f"Missing required files: {', '.join(missing_files)}") # Calculate total project size and file counts total_size = 0 file_counts = {} for root, dirs, files in os.walk(project_dir): for file in files: file_path = os.path.join(root, file) try: size = os.path.getsize(file_path) total_size += size ext = os.path.splitext(file)[1].lower() if not ext: ext = "(no extension)" if ext not in file_counts: file_counts[ext] = {"count": 0, "size": 0} file_counts[ext]["count"] += 1 file_counts[ext]["size"] += size except Exception as e: analysis["errors"].append(f"Error processing file {file_path}: {str(e)}") analysis["project_size"] = total_size analysis["file_counts"] = file_counts # Directory structure analysis directories = {} for item in os.listdir(project_dir): item_path = os.path.join(project_dir, item) if os.path.isdir(item_path): try: # Count files in directory file_count = 0 dir_size = 0 for root, _, files in os.walk(item_path): file_count += len(files) for file in files: try: dir_size += os.path.getsize(os.path.join(root, file)) except: pass directories[item] = { "path": item_path, "file_count": file_count, "size": dir_size } except Exception as e: analysis["errors"].append(f"Error analyzing directory {item}: {str(e)}") analysis["directory_structure"] = directories # Smali analysis smali_dirs = [d for d in directories.keys() if d.startswith("smali")] smali_analysis = { "smali_directories": smali_dirs, "total_smali_files": 0, "package_distribution": {} } for smali_dir in smali_dirs: smali_path = os.path.join(project_dir, smali_dir) for root, _, files in os.walk(smali_path): smali_files = [f for f in files if f.endswith('.smali')] smali_analysis["total_smali_files"] += len(smali_files) # Analyze package distribution for file in smali_files: rel_path = os.path.relpath(root, smali_path) if rel_path != ".": package = rel_path.replace(os.path.sep, ".") top_level_package = package.split(".")[0] if "." in package else package if top_level_package not in smali_analysis["package_distribution"]: smali_analysis["package_distribution"][top_level_package] = 0 smali_analysis["package_distribution"][top_level_package] += 1 analysis["smali_analysis"] = smali_analysis # Resource analysis res_path = os.path.join(project_dir, "res") resource_analysis = { "has_resources": os.path.exists(res_path), "resource_types": {}, "total_resource_files": 0 } if os.path.exists(res_path): try: for item in os.listdir(res_path): type_path = os.path.join(res_path, item) if os.path.isdir(type_path): files = [f for f in os.listdir(type_path) if os.path.isfile(os.path.join(type_path, f))] total_size = sum(os.path.getsize(os.path.join(type_path, f)) for f in files) resource_analysis["resource_types"][item] = { "file_count": len(files), "total_size": total_size } resource_analysis["total_resource_files"] += len(files) except Exception as e: analysis["errors"].append(f"Error analyzing resources: {str(e)}") analysis["resource_analysis"] = resource_analysis # Manifest analysis manifest_path = os.path.join(project_dir, "AndroidManifest.xml") manifest_analysis = { "exists": os.path.exists(manifest_path), "size": 0, "package_name": None, "activities": [], "permissions": [], "services": [] } if os.path.exists(manifest_path): try: manifest_analysis["size"] = os.path.getsize(manifest_path) with open(manifest_path, 'r', encoding='utf-8') as f: content = f.read() try: root = ET.fromstring(content) # Extract package name manifest_analysis["package_name"] = root.get("package") # Extract activities for activity in root.findall(".//activity"): name = activity.get("{http://schemas.android.com/apk/res/android}name") if name: manifest_analysis["activities"].append(name) # Extract permissions for perm in root.findall(".//uses-permission"): name = perm.get("{http://schemas.android.com/apk/res/android}name") if name: manifest_analysis["permissions"].append(name) # Extract services for service in root.findall(".//service"): name = service.get("{http://schemas.android.com/apk/res/android}name") if name: manifest_analysis["services"].append(name) except ET.ParseError as e: analysis["errors"].append(f"Manifest XML parsing error: {str(e)}") except Exception as e: analysis["errors"].append(f"Error analyzing manifest: {str(e)}") analysis["manifest_analysis"] = manifest_analysis return { "success": True, "analysis": analysis } except Exception as e: logger.error(f"Error analyzing project structure: {str(e)}") return { "success": False, "error": f"Failed to analyze project structure: {str(e)}" } @mcp.tool() async def get_workspace_info() -> Dict: """ Get information about the APKTool MCP server workspace and current projects. Returns: Dictionary with workspace information and project list """ try: workspace_info = { "workspace_path": WORKSPACE_DIR, "workspace_exists": os.path.exists(WORKSPACE_DIR), "projects": [], "total_projects": 0, "total_workspace_size": 0, "free_space": 0 } if not os.path.exists(WORKSPACE_DIR): return { "success": False, "error": f"Workspace directory does not exist: {WORKSPACE_DIR}", "workspace_info": workspace_info } # Get disk usage information try: import shutil total, used, free = shutil.disk_usage(WORKSPACE_DIR) workspace_info["free_space"] = free workspace_info["total_disk_space"] = total workspace_info["used_disk_space"] = used except Exception as e: logger.warning(f"Could not get disk usage info: {e}") # Scan for projects projects = [] total_size = 0 for item in os.listdir(WORKSPACE_DIR): item_path = os.path.join(WORKSPACE_DIR, item) if os.path.isdir(item_path): # Check if it looks like an APKTool project has_manifest = os.path.exists(os.path.join(item_path, "AndroidManifest.xml")) has_apktool_yml = os.path.exists(os.path.join(item_path, "apktool.yml")) is_apktool_project = has_manifest and has_apktool_yml # Calculate project size project_size = 0 file_count = 0 try: for root, _, files in os.walk(item_path): for file in files: file_path = os.path.join(root, file) try: project_size += os.path.getsize(file_path) file_count += 1 except: pass except Exception as e: logger.warning(f"Error calculating size for {item_path}: {e}") total_size += project_size project_info = { "name": item, "path": item_path, "is_apktool_project": is_apktool_project, "has_manifest": has_manifest, "has_apktool_yml": has_apktool_yml, "size": project_size, "file_count": file_count, "modified_time": os.path.getmtime(item_path) } # Get additional info if it's a valid project if is_apktool_project: try: # Read package name from manifest manifest_path = os.path.join(item_path, "AndroidManifest.xml") with open(manifest_path, 'r', encoding='utf-8') as f: content = f.read() root = ET.fromstring(content) project_info["package_name"] = root.get("package") # Count smali directories smali_dirs = [d for d in os.listdir(item_path) if d.startswith("smali") and os.path.isdir(os.path.join(item_path, d))] project_info["smali_directories"] = len(smali_dirs) except Exception as e: logger.warning(f"Error getting additional info for {item}: {e}") projects.append(project_info) # Sort projects by modification time (newest first) projects.sort(key=lambda x: x["modified_time"], reverse=True) workspace_info["projects"] = projects workspace_info["total_projects"] = len(projects) workspace_info["total_workspace_size"] = total_size workspace_info["apktool_projects"] = len([p for p in projects if p["is_apktool_project"]]) return { "success": True, "workspace_info": workspace_info } except Exception as e: logger.error(f"Error getting workspace info: {str(e)}") return { "success": False, "error": f"Failed to get workspace info: {str(e)}" } def main(): print("=" * 80) print("APKTool MCP Server") print("By ZinjaCoder (https://github.com/zinja-coder)") print("To Report Issues: https://github.com/zinja-coder/apktool-mcp-server/issues") print("=" * 80) print() # Display configuration print("Configuration:") print(f" Workspace Directory: {WORKSPACE_DIR}") print(f" Default Timeout: {DEFAULT_TIMEOUT}s") print(f" HTTP Mode: {'Enabled' if args.http else 'Disabled'}") if args.http: print(f" HTTP Port: {args.port}") print() # Perform initial health check print("Performing initial health check...") try: import asyncio health_result = asyncio.run(health_check()) if health_result.get("server_status") == "running": print("Server Status: Running") else: print("Server Status: Error") if health_result.get("apktool_available"): print(f"APKTool Available: {health_result.get('apktool_version', 'Version unknown')}") else: print("APKTool Not Available") print(f"Error: {health_result.get('apktool_error', 'Unknown error')}") if health_result.get("workspace_exists"): print(f"Workspace Directory: {WORKSPACE_DIR}") else: print(f"Workspace Directory: {WORKSPACE_DIR} (will be created)") os.makedirs(WORKSPACE_DIR, exist_ok=True) # Get workspace info workspace_result = asyncio.run(get_workspace_info()) if workspace_result.get("success"): info = workspace_result["workspace_info"] print(f"Workspace Projects: {info.get('total_projects', 0)} total") print(f"APKTool Projects: {info.get('apktool_projects', 0)}") if info.get("free_space"): free_gb = info["free_space"] / (1024**3) print(f" Free Space: {free_gb:.1f} GB") except Exception as e: print(f"Health check failed: {e}") print() print("Available MCP Tools:") tools = [ "health_check", "decode_apk", "build_apk", "get_manifest", "get_apktool_yml", "list_smali_directories", "list_smali_files", "get_smali_file", "modify_smali_file", "list_resources", "get_resource_file", "modify_resource_file", "search_in_files", "clean_project", "analyze_project_structure", "get_workspace_info" ] for i, tool in enumerate(tools, 1): print(f" {i:2d}. {tool}") print() print("Starting MCP server...") if args.http: print(f"Server will be available at: http://127.0.0.1:{args.port}") mcp.run(transport="streamable-http", port=args.port) else: print("Server running in stdio mode") mcp.run() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/zinja-coder/apktool-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server