Skip to main content
Glama
jmap.py8.7 kB
"""Jmap命令实现""" import os import subprocess import re from enum import Enum from typing import Dict, Any, List, Optional from ..base import BaseCommand, CommandResult, OutputFormatter class JmapOperation(Enum): """Jmap操作类型""" HEAP = "heap" # 堆内存概要 HISTO = "histo" # 堆内存直方图 DUMP = "dump" # 堆内存转储 class JmapCommand(BaseCommand): """Jmap命令实现""" def __init__(self, executor, formatter): super().__init__(executor, formatter) self.timeout = 60 # 设置默认超时时间为60秒 self._jdk_version = None # 缓存 JDK 版本 def _get_jdk_version(self) -> int: """获取 JDK 主版本号""" if self._jdk_version is not None: return self._jdk_version try: # 检查是否为远程执行器 from ..base import NativeCommandExecutor if isinstance(self.executor, NativeCommandExecutor) and self.executor.ssh_host: # 远程执行 java -version result = self.executor.run('java -version', timeout=10) version_output = result.error if result.error else result.output else: # 本地执行 result = subprocess.run(['java', '-version'], capture_output=True, text=True, timeout=10) version_output = result.stderr # java -version 输出到 stderr # 解析版本号,支持多种格式 # 格式1: "openjdk version "11.0.12" 2021-07-20" # 格式2: "java version "1.8.0_291"" # 格式3: "openjdk version "17.0.15" 2025-04-15 LTS" version_patterns = [ r'version "1\.(\d+)', # 匹配 "1.8.0_291",优先放前面 r'version "(\d+)', # 匹配 "11.0.12" 或 "17.0.15" ] for pattern in version_patterns: version_match = re.search(pattern, version_output) if version_match: version_str = version_match.group(1) if pattern == r'version "1\.(\d+)': # 对于 "1.8" 格式,返回 8 self._jdk_version = int(version_str) else: # 对于 "11" 或 "17" 格式,直接返回 self._jdk_version = int(version_str) break else: # 如果无法解析,假设是低版本 self._jdk_version = 8 except Exception as e: # 如果无法获取版本,假设是低版本 self._jdk_version = 8 return self._jdk_version def _is_modern_jdk(self) -> bool: """判断是否为现代 JDK (9+)""" return self._get_jdk_version() >= 9 def _test_jhsdb_availability(self) -> bool: """测试 jhsdb 命令是否可用""" try: from ..base import NativeCommandExecutor if isinstance(self.executor, NativeCommandExecutor) and self.executor.ssh_host: # 远程测试 result = self.executor.run('jhsdb --help', timeout=5) return result.success else: # 本地测试 result = subprocess.run(['jhsdb', '--help'], capture_output=True, text=True, timeout=5) return result.returncode == 0 except Exception: return False def get_command(self, pid: str, operation: JmapOperation = JmapOperation.HEAP, dump_file: Optional[str] = None, live_only: bool = False, *args, **kwargs) -> str: """获取jmap命令 Args: pid: 进程ID operation: 操作类型 dump_file: 转储文件路径(仅在dump操作时需要) live_only: 是否只统计存活对象 Returns: str: jmap命令字符串 """ # 验证 pid 参数 if not pid or not pid.strip(): raise ValueError("Process ID is required") try: int(pid) # 验证 pid 是否为有效数字 except ValueError: raise ValueError(f"Invalid process ID: {pid}") if operation == JmapOperation.HEAP: # 对于现代 JDK,优先尝试 jhsdb,如果不可用则回退到传统 jmap if self._is_modern_jdk() and self._test_jhsdb_availability(): return f'jhsdb jmap --heap --pid {pid}' else: return f'jmap -heap {pid}' elif operation == JmapOperation.HISTO: live_flag = " -live" if live_only else "" return f'jmap -histo{live_flag} {pid}' elif operation == JmapOperation.DUMP: if not dump_file: raise ValueError("dump_file is required for dump operation") live_flag = ":live" if live_only else "" return f'jmap -dump:format=b{live_flag},file={dump_file} {pid}' else: raise ValueError(f"Unsupported operation: {operation}") class JmapHeapFormatter(OutputFormatter): """Jmap堆内存概要格式化器(仅文本输出)""" def format(self, result: CommandResult) -> Dict[str, Any]: if not result.success: return { "success": False, "error": result.error, "timestamp": result.timestamp.isoformat() } return { "success": True, "output": result.output, "execution_time": result.execution_time, "timestamp": result.timestamp.isoformat() } class JmapHistoFormatter(OutputFormatter): """Jmap堆内存直方图格式化器""" def format(self, result: CommandResult) -> Dict[str, Any]: """格式化堆内存直方图输出 Args: result: 命令执行结果 Returns: Dict[str, Any]: 格式化后的结果 """ if not result.success: return { "success": False, "error": result.error, "timestamp": result.timestamp.isoformat() } histogram: List[Dict[str, Any]] = [] total = {"instances": 0, "bytes": 0} for line in result.output.splitlines(): line = line.strip() if not line or line.startswith('Total') or line.startswith('Num'): continue # 解析直方图行 # 格式:序号 实例数 字节数 类名 parts = line.split() if len(parts) >= 4: try: instances = int(parts[1]) bytes_used = int(parts[2]) class_name = ' '.join(parts[3:]) histogram.append({ "instances": instances, "bytes": bytes_used, "class_name": class_name }) total["instances"] += instances total["bytes"] += bytes_used except (ValueError, IndexError): continue return { "success": True, "histogram": histogram, "total": total, "execution_time": result.execution_time, "timestamp": result.timestamp.isoformat() } class JmapDumpFormatter(OutputFormatter): """Jmap堆内存转储格式化器""" def format(self, result: CommandResult) -> Dict[str, Any]: """格式化堆内存转储输出 Args: result: 命令执行结果 Returns: Dict[str, Any]: 格式化后的结果 """ if not result.success: return { "success": False, "error": result.error, "timestamp": result.timestamp.isoformat() } # 检查转储文件是否成功创建 dump_file = None for line in result.output.splitlines(): if "Dumping heap to" in line: dump_file = line.split("Dumping heap to")[-1].strip().split()[0] # 获取第一个词作为文件路径 break return { "success": True, "dump_file": dump_file, "file_size": os.path.getsize(dump_file) if dump_file and os.path.exists(dump_file) else None, "execution_time": result.execution_time, "timestamp": result.timestamp.isoformat() }

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/xzq-xu/jvm-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server