"""
Java Decompiler MCP Server
使用 CFR 反编译器对 Java .class 和 .jar 文件进行反编译
"""
import os
import subprocess
import tempfile
import shutil
import urllib.request
from pathlib import Path
from typing import Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from mcp.server.fastmcp import FastMCP
# CFR 下载地址
CFR_DOWNLOAD_URL = "https://xget.xi-xu.me/gh/leibnitz27/cfr/releases/download/0.152/cfr-0.152.jar"
CFR_JAR_NAME = "cfr-0.152.jar"
mcp = FastMCP("Java Decompiler")
def get_cfr_path() -> Optional[str]:
"""
获取 CFR jar 路径,优先级:
1. 环境变量 CFR_PATH
2. 当前工作目录下的 cfr-*.jar
3. 返回 None(需要下载)
"""
# 检查环境变量
cfr_env = os.environ.get("CFR_PATH")
if cfr_env and os.path.isfile(cfr_env):
return cfr_env
# 检查当前目录
cwd = Path.cwd()
cfr_files = list(cwd.glob("cfr-*.jar"))
if cfr_files:
# 确保文件真实存在且可读
cfr_path = cfr_files[0]
if cfr_path.is_file() and os.access(cfr_path, os.R_OK):
return str(cfr_path)
return None
def download_cfr(target_dir: Optional[str] = None) -> str:
"""下载 CFR 到指定目录或当前目录"""
if target_dir is None:
target_dir = str(Path.cwd())
target_path = os.path.join(target_dir, CFR_JAR_NAME)
if os.path.exists(target_path):
return target_path
print(f"正在下载 CFR 到 {target_path}...")
urllib.request.urlretrieve(CFR_DOWNLOAD_URL, target_path)
print("CFR 下载完成")
return target_path
def ensure_cfr() -> str:
"""确保 CFR 可用,如果不存在则下载"""
cfr_path = get_cfr_path()
if cfr_path:
return cfr_path
return download_cfr()
def run_cfr(cfr_path: str, input_path: str, output_dir: str) -> tuple[bool, str]:
"""运行 CFR 反编译"""
cmd = [
"java", "-jar", cfr_path,
input_path,
"--outputdir", output_dir
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=300
)
if result.returncode == 0:
return True, result.stdout or "反编译成功"
else:
return False, result.stderr or result.stdout or "反编译失败"
except subprocess.TimeoutExpired:
return False, "反编译超时(超过5分钟)"
except FileNotFoundError:
return False, "未找到 Java 运行时,请确保已安装 Java 并添加到 PATH"
except Exception as e:
return False, f"反编译出错: {str(e)}"
def collect_files(path: str, extensions: tuple = (".class", ".jar")) -> list[str]:
"""收集指定路径下的所有目标文件"""
path_obj = Path(path)
files = []
if path_obj.is_file():
if path_obj.suffix.lower() in extensions:
files.append(str(path_obj))
elif path_obj.is_dir():
for ext in extensions:
files.extend(str(f) for f in path_obj.rglob(f"*{ext}"))
return files
@mcp.tool()
def decompile_file(
file_path: str,
output_dir: Optional[str] = None,
save_to_file: bool = True
) -> str:
"""
反编译单个 .class 或 .jar 文件
Args:
file_path: 要反编译的文件路径(.class 或 .jar)
output_dir: 输出目录,默认为文件所在目录下的 decompiled 文件夹
save_to_file: 是否直接保存到文件系统(推荐),默认为 True。设为 False 时会返回反编译内容
Returns:
反编译结果信息或内容
"""
file_path = os.path.abspath(file_path)
if not os.path.isfile(file_path):
return f"错误:文件不存在 - {file_path}"
ext = os.path.splitext(file_path)[1].lower()
if ext not in (".class", ".jar"):
return f"错误:不支持的文件类型 - {ext},仅支持 .class 和 .jar"
# 确定输出目录
if output_dir is None:
output_dir = os.path.join(os.path.dirname(file_path), "decompiled")
output_dir = os.path.abspath(output_dir)
os.makedirs(output_dir, exist_ok=True)
# 获取 CFR
try:
cfr_path = ensure_cfr()
except Exception as e:
return f"错误:无法获取 CFR 反编译器 - {str(e)}"
# 执行反编译
success, message = run_cfr(cfr_path, file_path, output_dir)
if success:
# 统计生成的文件
java_files = list(Path(output_dir).rglob("*.java"))
result = {
"success": True,
"message": "反编译完成",
"file_count": len(java_files),
"output_dir": output_dir,
"source_file": file_path
}
if save_to_file:
return (
f"✅ 反编译成功\n"
f"源文件: {file_path}\n"
f"输出目录: {output_dir}\n"
f"生成文件数: {len(java_files)}\n"
f"提示: 反编译结果已保存到文件系统"
)
else:
# 返回反编译内容(仅用于小文件)
content_parts = [f"✅ 反编译成功\n源文件: {file_path}\n输出目录: {output_dir}\n"]
for java_file in java_files[:10]: # 最多返回前10个文件
try:
with open(java_file, 'r', encoding='utf-8') as f:
content = f.read()
content_parts.append(f"\n{'='*60}\n文件: {java_file.name}\n{'='*60}\n{content}")
except Exception as e:
content_parts.append(f"\n读取文件失败 {java_file.name}: {str(e)}")
if len(java_files) > 10:
content_parts.append(f"\n\n⚠️ 共 {len(java_files)} 个文件,仅显示前 10 个。建议使用 save_to_file=True 直接保存到文件系统。")
return "\n".join(content_parts)
else:
return f"❌ 反编译失败\n文件: {file_path}\n错误: {message}"
@mcp.tool()
def decompile_files(
file_paths: list[str],
output_dir: Optional[str] = None,
save_to_file: bool = True,
show_progress: bool = True,
max_workers: int = 4
) -> str:
"""
反编译多个 .class 或 .jar 文件(支持多线程)
Args:
file_paths: 要反编译的文件路径列表
output_dir: 输出目录,默认为当前目录下的 decompiled 文件夹
save_to_file: 是否直接保存到文件系统(推荐),默认为 True
show_progress: 是否显示详细进度信息,默认为 True
max_workers: 最大并发线程数,默认为 4(设为 1 则单线程处理)
Returns:
反编译结果信息
"""
if not file_paths:
return "错误:未提供任何文件"
# 确定输出目录
if output_dir is None:
output_dir = os.path.join(os.getcwd(), "decompiled")
output_dir = os.path.abspath(output_dir)
os.makedirs(output_dir, exist_ok=True)
# 获取 CFR
try:
cfr_path = ensure_cfr()
except Exception as e:
return f"错误:无法获取 CFR 反编译器 - {str(e)}"
results = []
success_count = 0
fail_count = 0
skip_count = 0
total = len(file_paths)
# 预处理:过滤有效文件
valid_files = []
for file_path in file_paths:
file_path = os.path.abspath(file_path)
if not os.path.isfile(file_path):
if show_progress:
results.append(f"❌ 跳过(文件不存在): {file_path}")
skip_count += 1
continue
ext = os.path.splitext(file_path)[1].lower()
if ext not in (".class", ".jar"):
if show_progress:
results.append(f"❌ 跳过(不支持的类型): {file_path}")
skip_count += 1
continue
valid_files.append(file_path)
# 单线程处理
if max_workers == 1:
for idx, file_path in enumerate(valid_files, 1):
if show_progress:
progress_msg = f"[{idx}/{len(valid_files)}] 处理中: {os.path.basename(file_path)}"
print(progress_msg, flush=True)
success, message = run_cfr(cfr_path, file_path, output_dir)
if success:
if show_progress:
results.append(f"✅ [{idx}/{len(valid_files)}] 成功: {os.path.basename(file_path)}")
success_count += 1
else:
results.append(f"❌ [{idx}/{len(valid_files)}] 失败: {os.path.basename(file_path)} - {message}")
fail_count += 1
# 多线程处理
else:
def process_file(file_path: str, idx: int) -> tuple[int, bool, str, str]:
"""处理单个文件,返回 (索引, 成功标志, 文件名, 消息)"""
success, message = run_cfr(cfr_path, file_path, output_dir)
return idx, success, os.path.basename(file_path), message
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_file = {
executor.submit(process_file, file_path, idx): (idx, file_path)
for idx, file_path in enumerate(valid_files, 1)
}
# 按完成顺序处理结果
for future in as_completed(future_to_file):
idx, file_path = future_to_file[future]
try:
task_idx, success, filename, message = future.result()
if show_progress:
progress_msg = f"[{task_idx}/{len(valid_files)}] 完成: {filename}"
print(progress_msg, flush=True)
if success:
if show_progress:
results.append(f"✅ [{task_idx}/{len(valid_files)}] 成功: {filename}")
success_count += 1
else:
results.append(f"❌ [{task_idx}/{len(valid_files)}] 失败: {filename} - {message}")
fail_count += 1
except Exception as e:
results.append(f"❌ [{idx}/{len(valid_files)}] 异常: {os.path.basename(file_path)} - {str(e)}")
fail_count += 1
# 统计生成的 Java 文件
java_files = list(Path(output_dir).rglob("*.java"))
summary = [
"\n" + "="*60,
"📊 反编译完成统计",
"="*60,
f"✅ 成功: {success_count}",
f"❌ 失败: {fail_count}",
f"⏭️ 跳过: {skip_count}",
f"📁 总计: {total} 个文件",
f"📄 生成: {len(java_files)} 个 .java 文件",
f"📂 输出目录: {output_dir}",
f"🔧 并发线程: {max_workers}",
"="*60
]
if save_to_file:
summary.append("💾 反编译结果已保存到文件系统")
# 如果显示进度,包含详细结果
if show_progress and results:
return "\n".join(results) + "\n" + "\n".join(summary)
else:
return "\n".join(summary)
@mcp.tool()
def decompile_directory(
directory_path: str,
output_dir: Optional[str] = None,
recursive: bool = True,
save_to_file: bool = True,
show_progress: bool = True,
max_workers: int = 4
) -> str:
"""
反编译指定目录下的所有 .class 和 .jar 文件(支持多线程)
Args:
directory_path: 要扫描的目录路径
output_dir: 输出目录,默认为目标目录下的 decompiled 文件夹
recursive: 是否递归扫描子目录,默认为 True
save_to_file: 是否直接保存到文件系统(推荐),默认为 True
show_progress: 是否显示详细进度信息,默认为 True
max_workers: 最大并发线程数,默认为 4(设为 1 则单线程处理)
Returns:
反编译结果信息
"""
directory_path = os.path.abspath(directory_path)
if not os.path.isdir(directory_path):
return f"错误:目录不存在 - {directory_path}"
# 收集文件
files = collect_files(directory_path)
if not files:
return f"未在目录中找到 .class 或 .jar 文件: {directory_path}"
# 确定输出目录
if output_dir is None:
output_dir = os.path.join(directory_path, "decompiled")
# 显示扫描结果
scan_info = [
f"� 扫描目录: {directory_path}",
f"🔍 找到 {len(files)} 个文件待反编译",
f"�📤 输出目录: {output_dir}",
f"🔧 并发线程: {max_workers}",
""
]
result = decompile_files(files, output_dir, save_to_file, show_progress, max_workers)
return "\n".join(scan_info) + result
@mcp.tool()
def download_cfr_tool(target_dir: Optional[str] = None) -> str:
"""
下载 CFR 反编译器到指定目录
Args:
target_dir: 下载目标目录,默认为当前工作目录
Returns:
下载结果信息
"""
try:
if target_dir:
target_dir = os.path.abspath(target_dir)
os.makedirs(target_dir, exist_ok=True)
cfr_path = download_cfr(target_dir)
return f"✅ CFR 下载成功\n路径: {cfr_path}"
except Exception as e:
return f"❌ CFR 下载失败: {str(e)}"
@mcp.tool()
def check_cfr_status() -> str:
"""
检查 CFR 反编译器状态
Returns:
CFR 状态信息
"""
cfr_path = get_cfr_path()
if cfr_path:
# 检查文件是否真实存在
if os.path.isfile(cfr_path):
file_size = os.path.getsize(cfr_path) / 1024 / 1024 # MB
return (
f"✅ CFR 已就绪\n"
f"路径: {cfr_path}\n"
f"大小: {file_size:.2f} MB"
)
else:
return (
f"❌ CFR 路径配置错误\n"
f"配置的路径: {cfr_path}\n"
f"文件不存在,请检查路径或重新下载"
)
else:
return (
"⚠️ CFR 未找到\n"
"当前状态: 未配置 CFR 反编译器\n\n"
"配置方法:\n"
"在 MCP 配置文件中添加环境变量,指定 CFR jar 文件路径:\n\n"
'{\n'
' "mcpServers": {\n'
' "java-decompiler": {\n'
' "command": "uvx",\n'
' "args": ["java-decompile-mcp"],\n'
' "env": {\n'
' "CFR_PATH": "/你的路径/cfr-0.152.jar"\n'
' },\n'
' "disabled": false\n'
' }\n'
' }\n'
'}\n\n'
"或者:\n"
f"1. 将 cfr-*.jar 放在当前工作目录: {os.getcwd()}\n"
"2. 调用 download_cfr_tool 自动下载到当前目录"
)
@mcp.tool()
def get_java_version() -> str:
"""
获取当前系统的 Java 版本信息
Returns:
Java 版本信息
"""
try:
result = subprocess.run(
["java", "-version"],
capture_output=True,
text=True,
timeout=10
)
# java -version 输出到 stderr
output = result.stderr or result.stdout
return f"✅ Java 环境\n{output}"
except FileNotFoundError:
return "❌ 未找到 Java 运行时,请安装 Java 并添加到 PATH"
except Exception as e:
return f"❌ 检查 Java 版本失败: {str(e)}"
if __name__ == "__main__":
mcp.run()