Skip to main content
Glama

ASUS Merlin Router MCP Server

by kcsoukup
asus_merlin_mcp.py25.2 kB
#!/usr/bin/env python3 r""" ___ ____ _____ ____ _____ ____ _ __ _____ \ \/ (__)/ ___)_/ \/ _ ) | | | ____) \ | | |(_ _) () | ( () | | |___ \ \ |__|____ |\____|__|\ \____|____| ) \_/ `--' `--' \____/ P R o G R A M M i N G <========================================[KCS]=> Developer: Ken C. Soukup Project : MCP Server for ASUS Router Purpose : Use AI Agents for managing ASUS routers running Asuswrt-Merlin firmware via SSH/SCP. <=================================[10/08/2025]=> Asuswrt-Merlin Firmware Home -- https://www.asuswrt-merlin.net/ """ __project__ = "MCP Server for ASUS Router" __version__ = "1.0" __author__ = "Ken C. Soukup" __company__ = "Vigorous Programming" __minted__ = "2025" import asyncio import logging from typing import Any, Sequence, Optional import paramiko from mcp.server import Server from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource import mcp.server.stdio import os # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("asus-merlin-mcp") # Router connection configuration ROUTER_CONFIG = { "host": os.getenv("ROUTER_HOST", "192.168.1.1"), "port": int(os.getenv("ROUTER_PORT", "22")), "username": os.getenv("ROUTER_USER", "admin"), "password": os.getenv("ROUTER_PASSWORD", ""), "key_file": os.getenv("ROUTER_KEY_FILE", ""), } class RouterSSHClient: """Handles SSH connections to the ASUS router""" def __init__(self, config: dict): self.config = config self.client: Optional[paramiko.SSHClient] = None def connect(self): """Establish SSH connection to router""" try: self.client = paramiko.SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Use key-based auth if key file provided, otherwise password if self.config["key_file"] and os.path.exists(self.config["key_file"]): self.client.connect( hostname=self.config["host"], port=self.config["port"], username=self.config["username"], key_filename=self.config["key_file"], timeout=10, ) else: self.client.connect( hostname=self.config["host"], port=self.config["port"], username=self.config["username"], password=self.config["password"], timeout=10, ) logger.info(f"Connected to router at {self.config['host']}") return True except Exception as e: logger.error(f"Failed to connect to router: {e}") return False def execute_command(self, command: str) -> tuple[str, str, int]: """Execute a command on the router""" if not self.client: if not self.connect(): return "", "Failed to connect to router", 1 assert self.client is not None # Type narrowing for Pylance try: _stdin, stdout, stderr = self.client.exec_command(command, timeout=30) exit_code = stdout.channel.recv_exit_status() output = stdout.read().decode("utf-8", errors="replace") error = stderr.read().decode("utf-8", errors="replace") return output, error, exit_code except Exception as e: logger.error(f"Command execution failed: {e}") return "", str(e), 1 def upload_file(self, local_path: str, remote_path: str) -> tuple[bool, str]: """Upload file to router via SCP""" if not self.client: if not self.connect(): return False, "Failed to connect to router" assert self.client is not None # Type narrowing for Pylance try: sftp = self.client.open_sftp() sftp.put(local_path, remote_path) sftp.close() logger.info(f"Uploaded {local_path} to {remote_path}") return True, "SFTP upload successful" except Exception as e: error_msg = f"SFTP upload failed: {type(e).__name__}: {str(e)}" logger.error(error_msg) return False, error_msg def download_file(self, remote_path: str, local_path: str) -> tuple[bool, str]: """Download file from router via SCP""" if not self.client: if not self.connect(): return False, "Failed to connect to router" assert self.client is not None # Type narrowing for Pylance try: sftp = self.client.open_sftp() sftp.get(remote_path, local_path) sftp.close() logger.info(f"Downloaded {remote_path} to {local_path}") return True, "SFTP download successful" except Exception as e: error_msg = f"SFTP download failed: {type(e).__name__}: {str(e)}" logger.error(error_msg) return False, error_msg def upload_file_shell(self, local_path: str, remote_path: str) -> tuple[bool, str]: """Upload file to router using shell commands (fallback when SFTP unavailable)""" try: import hashlib # Read local file and calculate checksum with open(local_path, "rb") as f: content = f.read() local_md5 = hashlib.md5(content).hexdigest() # Convert to hex string hex_content = content.hex() # Split into chunks to avoid command line length limits (4000 chars per chunk) chunk_size = 4000 chunks = [ hex_content[i : i + chunk_size] for i in range(0, len(hex_content), chunk_size) ] # Clear/create the file first output, error, code = self.execute_command(f"> {remote_path}") if code != 0: error_msg = f"Shell upload failed to create file: {error}" logger.error(error_msg) return False, error_msg # Upload in chunks using printf with hex escape sequences for i, chunk in enumerate(chunks): # Convert hex pairs to \x escape sequences for printf escaped = "".join( f"\\x{chunk[j : j + 2]}" for j in range(0, len(chunk), 2) ) cmd = f"printf '{escaped}' >> {remote_path}" output, error, code = self.execute_command(cmd) if code != 0: error_msg = ( f"Shell upload failed at chunk {i + 1}/{len(chunks)}: {error}" ) logger.error(error_msg) return False, error_msg # Verify upload with size and checksum verify_output, _, verify_code = self.execute_command( f"test -f {remote_path} && wc -c < {remote_path} && md5sum {remote_path}" ) if verify_code == 0: lines = verify_output.strip().split("\n") remote_size = int(lines[0].strip()) remote_md5 = lines[1].split()[0] if len(lines) > 1 else "" if remote_size != len(content): error_msg = f"Shell upload size mismatch: expected {len(content)}, got {remote_size}" logger.error(error_msg) return False, error_msg if remote_md5 and remote_md5 != local_md5: error_msg = f"Shell upload checksum mismatch: expected {local_md5}, got {remote_md5}" logger.error(error_msg) return False, error_msg logger.info( f"Uploaded {local_path} to {remote_path} via shell ({len(content)} bytes, MD5: {local_md5})" ) return ( True, f"Shell-based upload successful ({len(content)} bytes, MD5: {local_md5}, verified)", ) else: error_msg = "Shell upload verification failed: file not found on router" logger.error(error_msg) return False, error_msg except Exception as e: error_msg = f"Shell upload failed: {type(e).__name__}: {str(e)}" logger.error(error_msg) return False, error_msg def download_file_shell( self, remote_path: str, local_path: str ) -> tuple[bool, str]: """Download file from router using shell commands (fallback when SFTP unavailable)""" try: import hashlib # Get remote file checksum first md5_output, _, md5_code = self.execute_command(f"md5sum {remote_path}") remote_md5 = "" if md5_code == 0: remote_md5 = md5_output.split()[0] # Use hexdump to get binary-safe output from router output, error, code = self.execute_command( f"hexdump -v -e '/1 \"%02x\"' {remote_path}" ) if code != 0: error_msg = f"Shell download failed: {error}" logger.error(error_msg) return False, error_msg # Convert hex string back to binary try: binary_data = bytes.fromhex(output.strip()) except ValueError as e: error_msg = f"Shell download failed to decode hex data: {e}" logger.error(error_msg) return False, error_msg # Calculate local checksum local_md5 = hashlib.md5(binary_data).hexdigest() # Verify checksum matches if remote_md5 and local_md5 != remote_md5: error_msg = f"Shell download checksum mismatch: expected {remote_md5}, got {local_md5}" logger.error(error_msg) return False, error_msg # Write to local file in binary mode with open(local_path, "wb") as f: f.write(binary_data) logger.info( f"Downloaded {remote_path} to {local_path} via shell ({len(binary_data)} bytes, MD5: {local_md5})" ) return ( True, f"Shell-based download successful ({len(binary_data)} bytes, MD5: {local_md5}, verified)", ) except Exception as e: error_msg = f"Shell download failed: {type(e).__name__}: {str(e)}" logger.error(error_msg) return False, error_msg def close(self): """Close SSH connection""" if self.client: self.client.close() self.client = None # Initialize MCP server app = Server("asus-merlin-router") router = RouterSSHClient(ROUTER_CONFIG) @app.list_tools() async def list_tools() -> list[Tool]: """List available MCP tools for router management""" return [ Tool( name="get_router_info", description="Get router system information (uptime, memory, CPU, firmware version)", inputSchema={"type": "object", "properties": {}, "required": []}, ), Tool( name="get_connected_devices", description="List all devices connected to the router (via DHCP)", inputSchema={"type": "object", "properties": {}, "required": []}, ), Tool( name="get_wifi_status", description="Get WiFi status for all radios (2.4GHz, 5GHz, etc.)", inputSchema={"type": "object", "properties": {}, "required": []}, ), Tool( name="restart_service", description="Restart a specific router service (e.g., wireless, vpnclient1, httpd)", inputSchema={ "type": "object", "properties": { "service_name": { "type": "string", "description": "Service to restart (wireless, vpnclient1, wan, httpd, etc.)", } }, "required": ["service_name"], }, ), Tool( name="reboot_router", description="Reboot the router. WARNING: This will disconnect all clients.", inputSchema={ "type": "object", "properties": { "confirm": { "type": "boolean", "description": "Must be true to confirm reboot", } }, "required": ["confirm"], }, ), Tool( name="get_nvram_variable", description="Get the value of a specific NVRAM variable", inputSchema={ "type": "object", "properties": { "variable_name": { "type": "string", "description": "NVRAM variable name to retrieve", } }, "required": ["variable_name"], }, ), Tool( name="set_nvram_variable", description="Set a NVRAM variable value. WARNING: Incorrect values can break router configuration.", inputSchema={ "type": "object", "properties": { "variable_name": { "type": "string", "description": "NVRAM variable name", }, "value": {"type": "string", "description": "Value to set"}, "commit": { "type": "boolean", "description": "Commit changes to permanent storage (default: false)", "default": False, }, }, "required": ["variable_name", "value"], }, ), Tool( name="execute_command", description="Execute a custom command on the router via SSH", inputSchema={ "type": "object", "properties": { "command": { "type": "string", "description": "Shell command to execute", } }, "required": ["command"], }, ), Tool( name="read_file", description="Read contents of a file on the router", inputSchema={ "type": "object", "properties": { "file_path": { "type": "string", "description": "Path to file on router", }, "max_lines": { "type": "integer", "description": "Maximum number of lines to read (default: 100)", "default": 100, }, }, "required": ["file_path"], }, ), Tool( name="upload_file", description="Upload a file to the router via SCP", inputSchema={ "type": "object", "properties": { "local_path": {"type": "string", "description": "Local file path"}, "remote_path": { "type": "string", "description": "Destination path on router (e.g., /jffs/scripts/)", }, }, "required": ["local_path", "remote_path"], }, ), Tool( name="download_file", description="Download a file from the router via SCP", inputSchema={ "type": "object", "properties": { "remote_path": { "type": "string", "description": "File path on router", }, "local_path": { "type": "string", "description": "Local destination path", }, }, "required": ["remote_path", "local_path"], }, ), Tool( name="get_vpn_status", description="Get status of VPN clients and servers", inputSchema={"type": "object", "properties": {}, "required": []}, ), Tool( name="list_processes", description="List running processes on the router", inputSchema={ "type": "object", "properties": { "filter": { "type": "string", "description": "Optional: filter processes by name", } }, "required": [], }, ), ] @app.call_tool() async def call_tool( name: str, arguments: Any ) -> Sequence[TextContent | ImageContent | EmbeddedResource]: """Handle tool calls""" try: if name == "get_router_info": output, error, code = router.execute_command( "echo '=== Uptime ==='; uptime; " "echo '=== Memory ==='; free; " "echo '=== Firmware ==='; nvram get firmver; nvram get buildno" ) return [ TextContent( type="text", text=output if code == 0 else f"Error: {error}" ) ] elif name == "get_connected_devices": output, error, code = router.execute_command( "cat /var/lib/misc/dnsmasq.leases 2>/dev/null || arp -a" ) return [ TextContent( type="text", text=output if code == 0 else f"Error: {error}" ) ] elif name == "get_wifi_status": output, error, code = router.execute_command( "wl -i eth1 status 2>/dev/null; " "wl -i eth2 status 2>/dev/null; " "nvram get wl0_ssid; nvram get wl1_ssid" ) return [ TextContent( type="text", text=output if code == 0 else f"Error: {error}" ) ] elif name == "restart_service": service = arguments.get("service_name") output, error, code = router.execute_command(f"service restart_{service}") result = f"Service '{service}' restart command executed.\n{output}" if error: result += f"\nErrors: {error}" return [TextContent(type="text", text=result)] elif name == "reboot_router": if not arguments.get("confirm"): return [ TextContent( type="text", text="Reboot not confirmed. Set 'confirm' to true." ) ] output, error, code = router.execute_command("service reboot") return [ TextContent( type="text", text="Router reboot initiated. Connection will be lost.", ) ] elif name == "get_nvram_variable": var = arguments.get("variable_name") output, error, code = router.execute_command(f"nvram get {var}") return [ TextContent( type="text", text=output.strip() if code == 0 else f"Error: {error}" ) ] elif name == "set_nvram_variable": var = arguments.get("variable_name") val = arguments.get("value") commit = arguments.get("commit", False) cmd = f"nvram set {var}='{val}'" if commit: cmd += " && nvram commit" output, error, code = router.execute_command(cmd) result = f"NVRAM variable '{var}' set to '{val}'" if commit: result += " and committed to permanent storage" if error: result += f"\nErrors: {error}" return [TextContent(type="text", text=result)] elif name == "execute_command": cmd = arguments.get("command") output, error, code = router.execute_command(cmd) result = f"Command: {cmd}\n\nOutput:\n{output}" if error: result += f"\n\nErrors:\n{error}" result += f"\n\nExit code: {code}" return [TextContent(type="text", text=result)] elif name == "read_file": path = arguments.get("file_path") max_lines = arguments.get("max_lines", 100) output, error, code = router.execute_command(f"head -n {max_lines} {path}") return [ TextContent( type="text", text=output if code == 0 else f"Error: {error}" ) ] elif name == "upload_file": local = arguments.get("local_path") remote = arguments.get("remote_path") # Try SFTP first success, message = router.upload_file(local, remote) # If SFTP fails, try shell-based fallback if not success and "SFTP" in message: logger.info("SFTP unavailable, falling back to shell-based upload") success, message = router.upload_file_shell(local, remote) if success: result = f"✓ File uploaded successfully: {local} -> {remote}\n" result += ( "Note: Used shell commands (SFTP not available on router)\n" ) result += f"Details: {message}" else: result = f"✗ File upload failed: {local} -> {remote}\n" result += f"Error: {message}" elif success: result = f"✓ File uploaded successfully: {local} -> {remote}\n" result += "Method: SFTP\n" result += f"Details: {message}" else: result = f"✗ File upload failed: {local} -> {remote}\n" result += f"Error: {message}" return [TextContent(type="text", text=result)] elif name == "download_file": remote = arguments.get("remote_path") local = arguments.get("local_path") # Try SFTP first success, message = router.download_file(remote, local) # If SFTP fails, try shell-based fallback if not success and "SFTP" in message: logger.info("SFTP unavailable, falling back to shell-based download") success, message = router.download_file_shell(remote, local) if success: result = f"✓ File downloaded successfully: {remote} -> {local}\n" result += ( "Note: Used shell commands (SFTP not available on router)\n" ) result += f"Details: {message}" else: result = f"✗ File download failed: {remote} -> {local}\n" result += f"Error: {message}" elif success: result = f"✓ File downloaded successfully: {remote} -> {local}\n" result += "Method: SFTP\n" result += f"Details: {message}" else: result = f"✗ File download failed: {remote} -> {local}\n" result += f"Error: {message}" return [TextContent(type="text", text=result)] elif name == "get_vpn_status": output, error, code = router.execute_command( "nvram get vpn_client1_state; " "nvram get vpn_client2_state; " "ps | grep vpn" ) return [ TextContent( type="text", text=output if code == 0 else f"Error: {error}" ) ] elif name == "list_processes": filter_name = arguments.get("filter", "") cmd = "ps" if not filter_name else f"ps | grep {filter_name}" output, error, code = router.execute_command(cmd) return [ TextContent( type="text", text=output if code == 0 else f"Error: {error}" ) ] else: return [TextContent(type="text", text=f"Unknown tool: {name}")] except Exception as e: logger.error(f"Tool execution error: {e}") return [TextContent(type="text", text=f"Error executing tool: {str(e)}")] async def main(): """Run the MCP server""" async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await app.run(read_stream, write_stream, app.create_initialization_options()) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kcsoukup/asus-merlin-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server