JVM MCP Server

  • demo
from mcp.server.fastmcp import FastMCP import subprocess import json import time import os import telnetlib import socket from typing import List, Dict, Optional class ArthasClient: """Arthas客户端封装类""" def __init__(self, telnet_port: int = 3658): self.arthas_boot_path = "arthas-boot.jar" self.telnet_port = telnet_port self.telnet = None self.attached_pid = None self._download_arthas() def _download_arthas(self): """下载Arthas启动器""" if not os.path.exists(self.arthas_boot_path): subprocess.run( ["curl", "-o", self.arthas_boot_path, "https://arthas.aliyun.com/arthas-boot.jar"], check=True ) def _attach_to_process(self, pid: int): """连接到指定的Java进程""" if self.attached_pid == pid and self.telnet and self._check_connection(): return # 如果已经连接到其他进程,先断开 self._disconnect() # 启动Arthas并连接到目标进程 subprocess.Popen( ["java", "-jar", self.arthas_boot_path, "--target-ip", "127.0.0.1", "--telnet-port", str(self.telnet_port), "--arthas-port", str(self.telnet_port + 1), str(pid)], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # 等待Arthas启动 time.sleep(5) # 建立telnet连接 try: self.telnet = telnetlib.Telnet("127.0.0.1", self.telnet_port, timeout=10) self.attached_pid = pid # 等待提示符 self.telnet.read_until(b"$", timeout=10) except (socket.error, EOFError) as e: print(f"连接Arthas失败: {e}") self._disconnect() raise def _check_connection(self) -> bool: """检查telnet连接是否有效""" if not self.telnet: return False try: self.telnet.write(b"\n") self.telnet.read_until(b"$", timeout=1) return True except (socket.error, EOFError): return False def _disconnect(self): """断开与Arthas的连接""" if self.telnet: try: self.telnet.write(b"stop\n") self.telnet.close() except (socket.error, EOFError): pass finally: self.telnet = None self.attached_pid = None def _execute_command(self, pid: int, command: str) -> str: """执行Arthas命令""" try: self._attach_to_process(pid) # 发送命令 self.telnet.write(command.encode() + b"\n") # 读取响应直到下一个提示符 response = self.telnet.read_until(b"$", timeout=10) # 处理响应 result = response.decode('utf-8') # 移除命令回显和提示符 lines = result.split('\n') return '\n'.join(lines[1:-1]) # 去掉第一行(命令回显)和最后一行(提示符) except (socket.error, EOFError) as e: print(f"执行命令失败: {e}") self._disconnect() raise def __del__(self): """析构函数,确保断开连接""" self._disconnect() def get_thread_info(self, pid: int) -> str: """获取线程信息""" return self._execute_command(pid, "thread") def get_jvm_info(self, pid: int) -> str: """获取JVM信息""" return self._execute_command(pid, "jvm") def get_memory_info(self, pid: int) -> str: """获取内存信息""" return self._execute_command(pid, "memory") def get_stack_trace(self, pid: int, thread_name: str) -> str: """获取线程堆栈""" return self._execute_command(pid, f"thread {thread_name}") def get_class_info(self, pid: int, class_pattern: str) -> str: """获取类信息""" return self._execute_command(pid, f"sc {class_pattern}") def list_java_processes(self) -> str: """列出Java进程""" result = subprocess.run(["jps", "-l", "-v"], capture_output=True, text=True) return result.stdout # 创建MCP服务器实例 MCP_SERVER_NAME = "arthas-jvm-monitor" mcp = FastMCP(MCP_SERVER_NAME) arthas_client = ArthasClient() @mcp.tool() def list_java_processes() -> List[Dict[str, str]]: """列出所有Java进程""" result = subprocess.run(["jps", "-l", "-v"], capture_output=True, text=True) processes = [] for line in result.stdout.splitlines(): if line.strip(): parts = line.split() if len(parts) >= 2: processes.append({ "pid": parts[0], "name": parts[1], "args": " ".join(parts[2:]) if len(parts) > 2 else "" }) return processes @mcp.tool() def get_thread_info(pid: int) -> Dict: """获取指定进程的线程信息""" output = arthas_client._execute_command(pid, "thread") # 解析输出并返回结构化数据 return { "raw_output": output, "timestamp": time.time() } @mcp.tool() def get_jvm_info(pid: int) -> Dict: """获取JVM基础信息""" output = arthas_client._execute_command(pid, "jvm") return { "raw_output": output, "timestamp": time.time() } @mcp.tool() def get_memory_info(pid: int) -> Dict: """获取内存使用情况""" output = arthas_client._execute_command(pid, "memory") return { "raw_output": output, "timestamp": time.time() } @mcp.tool() def get_stack_trace(pid: int, thread_name: str) -> Dict: """获取指定线程的堆栈信息""" command = f"thread {thread_name}" output = arthas_client._execute_command(pid, command) return { "raw_output": output, "timestamp": time.time() } @mcp.tool() def get_class_info(pid: int, class_pattern: str) -> Dict: """获取类信息""" command = f"sc {class_pattern}" output = arthas_client._execute_command(pid, command) return { "raw_output": output, "timestamp": time.time() } @mcp.tool() def get_jvm_status(pid: Optional[int] = None) -> Dict: """获取JVM整体状态报告 Args: pid: 可选的进程ID,如果不指定则自动选择第一个非arthas的Java进程 Returns: 包含JVM状态信息的字典 """ if pid is None: # 如果没有指定PID,获取第一个非arthas的Java进程 processes = list_java_processes() for process in processes: if "arthas" not in process["name"].lower(): pid = int(process["pid"]) break if pid is None: return {"error": "No valid Java process found"} thread_info = get_thread_info(pid) jvm_info = get_jvm_info(pid) memory_info = get_memory_info(pid) return { "pid": pid, "thread_info": thread_info, "jvm_info": jvm_info, "memory_info": memory_info, "timestamp": time.time() } # @mcp.prompt() def jvm_analysis_prompt() -> str: """创建JVM分析提示""" # 获取默认JVM进程的状态 status = get_jvm_status() status_json = json.dumps(status, indent=2) return f"""你是一位经验丰富的Java性能调优专家,请基于以下JVM状态数据进行分析: {status_json} 请考虑以下方面: 1. JVM整体健康状况 2. 内存使用情况和潜在的内存问题 3. 线程状态和可能的死锁 4. 性能优化建议 5. 需要关注的警告信息 请提供详细的分析报告和具体的优化建议。 """ if __name__ == "__main__": print(f"Starting {MCP_SERVER_NAME}...") mcp.run()