We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/xzq-xu/jvm-mcp-server'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
from mcp.server.fastmcp import FastMCP
import subprocess
import json
import time
import os
import telnetlib
import socket
from typing import List, Dict, Optional
class ArthasClient:
"""Arthas客户端封装类"""
def __init__(self, telnet_port: int = 3658):
self.arthas_boot_path = "arthas-boot.jar"
self.telnet_port = telnet_port
self.telnet = None
self.attached_pid = None
self._download_arthas()
def _download_arthas(self):
"""下载Arthas启动器"""
if not os.path.exists(self.arthas_boot_path):
subprocess.run(
["curl", "-o", self.arthas_boot_path, "https://arthas.aliyun.com/arthas-boot.jar"],
check=True
)
def _attach_to_process(self, pid: int):
"""连接到指定的Java进程"""
if self.attached_pid == pid and self.telnet and self._check_connection():
return
# 如果已经连接到其他进程,先断开
self._disconnect()
# 启动Arthas并连接到目标进程
subprocess.Popen(
["java", "-jar", self.arthas_boot_path,
"--target-ip", "127.0.0.1",
"--telnet-port", str(self.telnet_port),
"--arthas-port", str(self.telnet_port + 1),
str(pid)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 等待Arthas启动
time.sleep(5)
# 建立telnet连接
try:
self.telnet = telnetlib.Telnet("127.0.0.1", self.telnet_port, timeout=10)
self.attached_pid = pid
# 等待提示符
self.telnet.read_until(b"$", timeout=10)
except (socket.error, EOFError) as e:
print(f"连接Arthas失败: {e}")
self._disconnect()
raise
def _check_connection(self) -> bool:
"""检查telnet连接是否有效"""
if not self.telnet:
return False
try:
self.telnet.write(b"\n")
self.telnet.read_until(b"$", timeout=1)
return True
except (socket.error, EOFError):
return False
def _disconnect(self):
"""断开与Arthas的连接"""
if self.telnet:
try:
self.telnet.write(b"stop\n")
self.telnet.close()
except (socket.error, EOFError):
pass
finally:
self.telnet = None
self.attached_pid = None
def _execute_command(self, pid: int, command: str) -> str:
"""执行Arthas命令"""
try:
self._attach_to_process(pid)
# 发送命令
self.telnet.write(command.encode() + b"\n")
# 读取响应直到下一个提示符
response = self.telnet.read_until(b"$", timeout=10)
# 处理响应
result = response.decode('utf-8')
# 移除命令回显和提示符
lines = result.split('\n')
return '\n'.join(lines[1:-1]) # 去掉第一行(命令回显)和最后一行(提示符)
except (socket.error, EOFError) as e:
print(f"执行命令失败: {e}")
self._disconnect()
raise
def __del__(self):
"""析构函数,确保断开连接"""
self._disconnect()
def get_thread_info(self, pid: int) -> str:
"""获取线程信息"""
return self._execute_command(pid, "thread")
def get_jvm_info(self, pid: int) -> str:
"""获取JVM信息"""
return self._execute_command(pid, "jvm")
def get_memory_info(self, pid: int) -> str:
"""获取内存信息"""
return self._execute_command(pid, "memory")
def get_stack_trace(self, pid: int, thread_name: str) -> str:
"""获取线程堆栈"""
return self._execute_command(pid, f"thread {thread_name}")
def get_class_info(self, pid: int, class_pattern: str) -> str:
"""获取类信息"""
return self._execute_command(pid, f"sc {class_pattern}")
def list_java_processes(self) -> str:
"""列出Java进程"""
result = subprocess.run(["jps", "-l", "-v"], capture_output=True, text=True)
return result.stdout
# 创建MCP服务器实例
MCP_SERVER_NAME = "arthas-jvm-monitor"
mcp = FastMCP(MCP_SERVER_NAME)
arthas_client = ArthasClient()
@mcp.tool()
def list_java_processes() -> List[Dict[str, str]]:
"""列出所有Java进程"""
result = subprocess.run(["jps", "-l", "-v"], capture_output=True, text=True)
processes = []
for line in result.stdout.splitlines():
if line.strip():
parts = line.split()
if len(parts) >= 2:
processes.append({
"pid": parts[0],
"name": parts[1],
"args": " ".join(parts[2:]) if len(parts) > 2 else ""
})
return processes
@mcp.tool()
def get_thread_info(pid: int) -> Dict:
"""获取指定进程的线程信息"""
output = arthas_client._execute_command(pid, "thread")
# 解析输出并返回结构化数据
return {
"raw_output": output,
"timestamp": time.time()
}
@mcp.tool()
def get_jvm_info(pid: int) -> Dict:
"""获取JVM基础信息"""
output = arthas_client._execute_command(pid, "jvm")
return {
"raw_output": output,
"timestamp": time.time()
}
@mcp.tool()
def get_memory_info(pid: int) -> Dict:
"""获取内存使用情况"""
output = arthas_client._execute_command(pid, "memory")
return {
"raw_output": output,
"timestamp": time.time()
}
@mcp.tool()
def get_stack_trace(pid: int, thread_name: str) -> Dict:
"""获取指定线程的堆栈信息"""
command = f"thread {thread_name}"
output = arthas_client._execute_command(pid, command)
return {
"raw_output": output,
"timestamp": time.time()
}
@mcp.tool()
def get_class_info(pid: int, class_pattern: str) -> Dict:
"""获取类信息"""
command = f"sc {class_pattern}"
output = arthas_client._execute_command(pid, command)
return {
"raw_output": output,
"timestamp": time.time()
}
@mcp.tool()
def get_jvm_status(pid: Optional[int] = None) -> Dict:
"""获取JVM整体状态报告
Args:
pid: 可选的进程ID,如果不指定则自动选择第一个非arthas的Java进程
Returns:
包含JVM状态信息的字典
"""
if pid is None:
# 如果没有指定PID,获取第一个非arthas的Java进程
processes = list_java_processes()
for process in processes:
if "arthas" not in process["name"].lower():
pid = int(process["pid"])
break
if pid is None:
return {"error": "No valid Java process found"}
thread_info = get_thread_info(pid)
jvm_info = get_jvm_info(pid)
memory_info = get_memory_info(pid)
return {
"pid": pid,
"thread_info": thread_info,
"jvm_info": jvm_info,
"memory_info": memory_info,
"timestamp": time.time()
}
# @mcp.prompt()
def jvm_analysis_prompt() -> str:
"""创建JVM分析提示"""
# 获取默认JVM进程的状态
status = get_jvm_status()
status_json = json.dumps(status, indent=2)
return f"""你是一位经验丰富的Java性能调优专家,请基于以下JVM状态数据进行分析:
{status_json}
请考虑以下方面:
1. JVM整体健康状况
2. 内存使用情况和潜在的内存问题
3. 线程状态和可能的死锁
4. 性能优化建议
5. 需要关注的警告信息
请提供详细的分析报告和具体的优化建议。
"""
if __name__ == "__main__":
print(f"Starting {MCP_SERVER_NAME}...")
mcp.run()