# main.py (20.5 kB)
"""
MCP SSH工具 - 直接配置版本 (修复版)
"""
from mcp.server.fastmcp import FastMCP
import paramiko
import os
from typing import Dict, Any, Optional
import logging
from dataclasses import dataclass
import stat
# Configure logging: INFO level, with timestamp, logger name and level in every record.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
# Create the MCP server instance that the @mcp.tool() decorators register against.
# NOTE(review): host="0.0.0.0" presumably only matters for network transports;
# this file starts the server with transport="stdio" - confirm it is still needed.
mcp = FastMCP("MCP-SSH-Server",host="0.0.0.0")
@dataclass
class ServerConfig:
    """Connection settings for a single SSH server."""
    # Host name or IP address; passed to paramiko as ``hostname``.
    ssh_ip: str
    # Login user name.
    ssh_user: str
    # Login password (password authentication is the only method used in this file).
    ssh_password: str
    # TCP port of the SSH service.
    ssh_port: int = 22
    # Name under which this configuration is registered in the manager.
    name: str = ""
class MCPManager:
    """Keep SSH server configurations and a cache of open SSH connections.

    Attributes:
        server_configs: Maps a server name to its ``ServerConfig``.
        active_connections: Maps a server name to the SSH client most recently
            opened for it. Entries are re-validated on every
            ``get_connection`` call and closed/dropped once their transport
            is no longer active.
    """

    def __init__(self):
        """Create empty registries and load the initial configuration."""
        self.server_configs: Dict[str, ServerConfig] = {}
        self.active_connections: Dict[str, paramiko.SSHClient] = {}
        self.load_config()

    def load_config(self):
        """Load server configuration from environment variables.

        A server named ``default`` is registered when ``SSH_IP``, ``SSH_USER``
        and ``SSH_PASSWORD`` are all set; ``SSH_PORT`` is optional and
        defaults to 22. Any failure (for example a non-numeric ``SSH_PORT``)
        is logged and leaves the registry as it was.
        """
        try:
            # Option 1: a single server described by environment variables.
            ssh_ip = os.getenv("SSH_IP")
            ssh_user = os.getenv("SSH_USER")
            ssh_password = os.getenv("SSH_PASSWORD")
            ssh_port = int(os.getenv("SSH_PORT", "22"))
            if ssh_ip and ssh_user and ssh_password:
                self.server_configs["default"] = ServerConfig(
                    name="default",
                    ssh_ip=ssh_ip,
                    ssh_user=ssh_user,
                    ssh_password=ssh_password,
                    ssh_port=ssh_port
                )
                logger.info(f"从环境变量加载服务器配置: {ssh_ip}:{ssh_port}")
            # Option 2: register more servers directly in code (edit as needed), e.g.
            #   self.add_server("demo", "192.168.1.100", "username", "password", 22)
            #   self.add_server("prod", "10.0.0.1", "admin", "secret", 22)
            logger.info(f"已加载 {len(self.server_configs)} 个服务器配置")
        except Exception as e:
            # Configuration loading runs during construction; a bad value must
            # not prevent the manager (and the server) from starting.
            logger.error(f"加载配置失败: {str(e)}")

    def add_server(self, name: str, ip: str, user: str, password: str, port: int = 22):
        """Register (or replace) the configuration stored under ``name``.

        Any connection cached under the same name is closed and dropped so a
        later ``get_connection`` cannot return a client that still points at
        the previous host or credentials.
        """
        self._drop_connection(name)
        self.server_configs[name] = ServerConfig(
            name=name,
            ssh_ip=ip,
            ssh_user=user,
            ssh_password=password,
            ssh_port=port
        )
        logger.info(f"添加服务器配置: {name} ({ip}:{port})")

    @staticmethod
    def _is_alive(client) -> bool:
        """Return True when ``client`` has a transport that reports itself active."""
        try:
            transport = client.get_transport()
            return bool(transport and transport.is_active())
        except Exception:
            # A client in an unexpected state is treated as dead.
            return False

    def _drop_connection(self, server_name: str) -> None:
        """Remove the cached client for ``server_name`` (if any) and close it."""
        client = self.active_connections.pop(server_name, None)
        if client is None:
            return
        try:
            client.close()
        except Exception as e:
            # Best effort: the entry is already removed from the cache.
            logger.warning(f"关闭连接失败[{server_name}]: {str(e)}")

    def get_connection(self, server_name: str) -> "Optional[paramiko.SSHClient]":
        """Return a live SSH client for ``server_name``, connecting if needed.

        Returns:
            The cached client when its transport is still active, otherwise a
            newly connected client. ``None`` when the server is unknown or the
            connection attempt fails (the reason is logged).
        """
        if server_name not in self.server_configs:
            logger.error(f"未找到服务器配置: {server_name}")
            return None

        cached = self.active_connections.get(server_name)
        if cached is not None:
            if self._is_alive(cached):
                return cached
            # The connection is gone: close it instead of merely forgetting it,
            # otherwise its socket and transport thread would leak.
            self._drop_connection(server_name)

        config = self.server_configs[server_name]
        client = None
        try:
            client = paramiko.SSHClient()
            # NOTE(security): AutoAddPolicy accepts unknown host keys without
            # verification, which permits man-in-the-middle attacks. Kept for
            # backward compatibility; consider loading known_hosts instead.
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(
                hostname=config.ssh_ip,
                port=config.ssh_port,
                username=config.ssh_user,
                password=config.ssh_password,
                timeout=10
            )
        except Exception as e:
            logger.error(f"SSH连接失败[{server_name}]: {str(e)}")
            if client is not None:
                # Release whatever a partially established connection acquired.
                client.close()
            return None

        self.active_connections[server_name] = client
        logger.info(f"SSH连接成功: {server_name} ({config.ssh_ip}:{config.ssh_port})")
        return client

    def close_all_connections(self):
        """Close every cached connection and empty the cache.

        Failures to close an individual client are logged and do not stop the
        remaining clients from being closed.
        """
        for server_name, client in self.active_connections.items():
            try:
                client.close()
                logger.info(f"关闭连接: {server_name}")
            except Exception as e:
                logger.warning(f"关闭连接失败[{server_name}]: {str(e)}")
        self.active_connections.clear()
# Global manager instance shared by the MCP tools below.
# Constructing it runs MCPManager.load_config(), i.e. reads the SSH_* environment variables.
mcp_manager = MCPManager()
# =================== 辅助函数 ===================
def download_directory_recursive(sftp, remote_path: str, local_path: str) -> Dict[str, Any]:
    """Recursively download a remote directory through an open SFTP session.

    Args:
        sftp: Object providing ``listdir_attr(path)`` and ``get(remote, local)``
            (an SFTP client as returned by ``SSHClient.open_sftp()``).
        remote_path: Remote directory to copy.
        local_path: Local directory to copy into; created when missing.

    Returns:
        ``{"success": True, "downloaded_files": [...], "failed_files": [...]}``
        when ``remote_path`` itself could be listed. Each downloaded entry has
        ``remote``, ``local`` and ``size``; each failed entry has ``remote``,
        ``local`` and ``error``. A sub-directory that cannot be listed is
        reported as one failed entry instead of being silently dropped.
        ``{"success": False, "error": "..."}`` when the top-level directory
        cannot be created locally or listed remotely.
    """
    try:
        # Create the local target directory.
        os.makedirs(local_path, exist_ok=True)
        # Fetch the remote directory listing (with attributes).
        entries = sftp.listdir_attr(remote_path)
        # Drop trailing slashes so children become "/dir/name" rather than
        # "/dir//name"; the root "/" becomes "" and yields "/name".
        remote_base = remote_path.rstrip("/")
        downloaded_files = []
        failed_files = []
        for entry in entries:
            remote_file_path = f"{remote_base}/{entry.filename}"
            local_file_path = os.path.join(local_path, entry.filename)
            try:
                if stat.S_ISDIR(entry.st_mode):
                    # Directory: descend and merge the child's results.
                    sub_result = download_directory_recursive(sftp, remote_file_path, local_file_path)
                    if not sub_result["success"]:
                        # Route the failure through the handler below so the
                        # directory shows up in failed_files and in the log.
                        raise OSError(sub_result["error"])
                    downloaded_files.extend(sub_result["downloaded_files"])
                    failed_files.extend(sub_result["failed_files"])
                else:
                    # Regular entry: download it directly.
                    sftp.get(remote_file_path, local_file_path)
                    downloaded_files.append({
                        "remote": remote_file_path,
                        "local": local_file_path,
                        "size": entry.st_size
                    })
                    logger.info(f"已下载文件: {remote_file_path} -> {local_file_path}")
            except Exception as e:
                # One bad entry must not abort the rest of the directory.
                failed_files.append({
                    "remote": remote_file_path,
                    "local": local_file_path,
                    "error": str(e)
                })
                logger.error(f"下载失败: {remote_file_path} - {str(e)}")
        return {
            "success": True,
            "downloaded_files": downloaded_files,
            "failed_files": failed_files
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
def upload_directory_recursive(sftp, local_path: str, remote_path: str) -> Dict[str, Any]:
    """Upload a local directory tree to a remote directory over SFTP.

    Args:
        sftp: Object providing ``mkdir(path)`` and ``put(local, remote)``.
        local_path: Local directory to walk.
        remote_path: Remote directory that receives the tree.

    Returns:
        ``{"success": True, "uploaded_files": [...], "failed_files": [...]}``
        after the walk finishes; per-file upload errors are collected in
        ``failed_files``. ``{"success": False, "error": "..."}`` when an
        unexpected error aborts the walk.
    """

    def ensure_remote_dir(path):
        # mkdir errors are ignored on purpose: the directory may already exist.
        try:
            sftp.mkdir(path)
        except IOError:
            pass

    try:
        ensure_remote_dir(remote_path)
        uploaded_files = []
        failed_files = []
        for current_dir, subdir_names, file_names in os.walk(local_path):
            # Map the local directory being visited onto its remote counterpart.
            relative = os.path.relpath(current_dir, local_path)
            if relative != '.':
                remote_root = f"{remote_path}/{relative.replace(os.sep, '/')}"
            else:
                remote_root = remote_path

            # Create the remote sub-directories before os.walk descends into them.
            for subdir_name in subdir_names:
                ensure_remote_dir(f"{remote_root}/{subdir_name}")

            # Upload the files that live directly in this directory.
            for file_name in file_names:
                local_file_path = os.path.join(current_dir, file_name)
                remote_file_path = f"{remote_root}/{file_name}"
                try:
                    sftp.put(local_file_path, remote_file_path)
                    uploaded_files.append({
                        "local": local_file_path,
                        "remote": remote_file_path,
                        "size": os.path.getsize(local_file_path)
                    })
                    logger.info(f"已上传文件: {local_file_path} -> {remote_file_path}")
                except Exception as e:
                    failed_files.append({
                        "local": local_file_path,
                        "remote": remote_file_path,
                        "error": str(e)
                    })
                    logger.error(f"上传失败: {local_file_path} - {str(e)}")
        return {
            "success": True,
            "uploaded_files": uploaded_files,
            "failed_files": failed_files
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
# =================== MCP工具 ===================
@mcp.tool()
def execute(server_name: str, command: str) -> Dict[str, Any]:
    """
    执行远程命令
    参数:
    - server_name: 服务器名称 (如果只有一个服务器配置,可以使用 'default')
    - command: 要执行的命令
    """
    # Run `command` on the named server and return its exit code and output.
    #
    # Returns {"success": True, "server", "ip", "exit_code", "stdout", "stderr"}
    # when the command ran (whatever its exit code), or
    # {"success": False, "error", "server"} when no connection could be
    # obtained or the SSH layer raised.
    #
    # NOTE: the docstring above is left untranslated because FastMCP is
    # expected to publish it as the tool description - confirm before editing.
    client = mcp_manager.get_connection(server_name)
    if not client:
        return {
            "success": False,
            "error": "SSH连接失败",
            "server": server_name
        }
    try:
        _stdin, stdout, stderr = client.exec_command(command)
        # Drain the output BEFORE asking for the exit status: paramiko documents
        # that recv_exit_status() can block forever when the remote side has
        # produced more output than the channel window and nobody is reading.
        stdout_content = stdout.read().decode('utf-8', 'replace')
        stderr_content = stderr.read().decode('utf-8', 'replace')
        exit_code = stdout.channel.recv_exit_status()
        return {
            "success": True,
            "server": server_name,
            "ip": mcp_manager.server_configs[server_name].ssh_ip,
            "exit_code": exit_code,
            "stdout": stdout_content,
            "stderr": stderr_content
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "server": server_name
        }
@mcp.tool()
def download_file(server_name: str, remote_path: str, local_src: str) -> Dict[str, Any]:
    """
    下载远程服务器的文件或者目录,保存到指定目录下,如果是目录,则递归下载
    参数:
    - server_name: 服务器名称
    - remote_path: 远程文件/目录路径
    - local_src: 本地文件/目录路径
    """
    # Download a remote file, or a whole remote directory recursively.
    #
    # For a single file, `local_src` may be an existing directory (the remote
    # base name is appended) or a target file path whose parent is created.
    # Returns a dict with "success": True plus details, or "success": False
    # with an "error" message. The SFTP session is closed on every path.
    #
    # NOTE: the docstring above is left untranslated because FastMCP is
    # expected to publish it as the tool description - confirm before editing.
    client = mcp_manager.get_connection(server_name)
    if not client:
        return {
            "success": False,
            "error": "SSH连接失败",
            "server": server_name
        }
    sftp = None
    try:
        sftp = client.open_sftp()
        # Make sure the remote path exists (and learn whether it is a directory).
        try:
            file_attr = sftp.stat(remote_path)
        except IOError:
            return {
                "success": False,
                "error": f"远程路径不存在: {remote_path}",
                "server": server_name
            }

        if stat.S_ISDIR(file_attr.st_mode):
            # Directory: download recursively.
            result = download_directory_recursive(sftp, remote_path, local_src)
            if not result["success"]:
                return {
                    "success": False,
                    "error": result["error"],
                    "server": server_name
                }
            return {
                "success": True,
                "server": server_name,
                "ip": mcp_manager.server_configs[server_name].ssh_ip,
                "type": "directory",
                "message": f"目录下载完成: {remote_path} -> {local_src}",
                "downloaded_files": result["downloaded_files"],
                "failed_files": result["failed_files"],
                "summary": {
                    "total_downloaded": len(result["downloaded_files"]),
                    "total_failed": len(result["failed_files"])
                }
            }

        # Single file. When the local path is an existing directory, store the
        # file inside it under the remote base name.
        if os.path.isdir(local_src):
            local_file_path = os.path.join(local_src, os.path.basename(remote_path))
        else:
            local_file_path = local_src
        # Create the parent directory. A bare file name has an empty dirname,
        # and os.makedirs("") raises, so only create when there is a parent.
        parent_dir = os.path.dirname(local_file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        sftp.get(remote_path, local_file_path)
        return {
            "success": True,
            "server": server_name,
            "ip": mcp_manager.server_configs[server_name].ssh_ip,
            "type": "file",
            "message": f"文件下载成功: {remote_path} -> {local_file_path}",
            "remote_path": remote_path,
            "local_path": local_file_path,
            "size": file_attr.st_size
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "server": server_name
        }
    finally:
        # Close the SFTP session on success, early return and error alike.
        if sftp is not None:
            try:
                sftp.close()
            except Exception as e:
                logger.warning(f"关闭SFTP会话失败[{server_name}]: {str(e)}")
@mcp.tool()
def upload_file(server_name: str, remote_path: str, local_src: str) -> Dict[str, Any]:
    """
    将本地的文件或者目录上传到远程服务器
    参数:
    - server_name: 服务器名称
    - remote_path: 远程文件/目录路径
    - local_src: 本地文件/目录路径
    """
    # Upload a local file, or a whole local directory recursively.
    #
    # Returns a dict with "success": True plus details, or "success": False
    # with an "error" message. The SFTP session is closed on every path.
    #
    # NOTE: the docstring above is left untranslated because FastMCP is
    # expected to publish it as the tool description - confirm before editing.
    import posixpath  # remote paths use "/" regardless of the local OS

    client = mcp_manager.get_connection(server_name)
    if not client:
        return {
            "success": False,
            "error": "SSH连接失败",
            "server": server_name
        }
    sftp = None
    try:
        # Refuse early when there is nothing to upload.
        if not os.path.exists(local_src):
            return {
                "success": False,
                "error": f"本地路径不存在: {local_src}",
                "server": server_name
            }
        sftp = client.open_sftp()

        if os.path.isdir(local_src):
            # Directory: upload recursively.
            result = upload_directory_recursive(sftp, local_src, remote_path)
            if not result["success"]:
                return {
                    "success": False,
                    "error": result["error"],
                    "server": server_name
                }
            return {
                "success": True,
                "server": server_name,
                "ip": mcp_manager.server_configs[server_name].ssh_ip,
                "type": "directory",
                "message": f"目录上传完成: {local_src} -> {remote_path}",
                "uploaded_files": result["uploaded_files"],
                "failed_files": result["failed_files"],
                "summary": {
                    "total_uploaded": len(result["uploaded_files"]),
                    "total_failed": len(result["failed_files"])
                }
            }

        # Single file: create the remote parent directory (one level only),
        # then upload.
        remote_dir = posixpath.dirname(remote_path)
        if remote_dir:
            try:
                sftp.mkdir(remote_dir)
            except IOError:
                # The directory may already exist; a real problem will
                # surface in the put() call below.
                pass
        sftp.put(local_src, remote_path)
        return {
            "success": True,
            "server": server_name,
            "ip": mcp_manager.server_configs[server_name].ssh_ip,
            "type": "file",
            "message": f"文件上传成功: {local_src} -> {remote_path}",
            "local_path": local_src,
            "remote_path": remote_path,
            "size": os.path.getsize(local_src)
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "server": server_name
        }
    finally:
        # Close the SFTP session on success, early return and error alike.
        if sftp is not None:
            try:
                sftp.close()
            except Exception as e:
                logger.warning(f"关闭SFTP会话失败[{server_name}]: {str(e)}")
@mcp.tool()
def list_directory(server_name: str, path: str = "/") -> Dict[str, Any]:
    """
    列出远程目录内容
    参数:
    - server_name: 服务器名称
    - path: 目录路径,默认为根目录
    """
    # List a remote directory by running `ls -la` through execute(); the
    # return value is whatever execute() returns.
    #
    # The path is shell-quoted so names containing spaces or shell
    # metacharacters are listed literally instead of being split or
    # interpreted, and `--` stops a path starting with "-" from being read as
    # an option. Consequence: the remote shell no longer expands `~` or glob
    # patterns in `path`.
    #
    # NOTE: the docstring above is left untranslated because FastMCP is
    # expected to publish it as the tool description - confirm before editing.
    import shlex

    return execute(server_name, f"ls -la -- {shlex.quote(path)}")
@mcp.tool()
def list_servers() -> Dict[str, Any]:
    """列出所有配置的服务器"""
    # Return every configured server with its current connection state:
    # {"servers": [{"name", "ip", "user", "port", "connected"}, ...],
    #  "total_servers": <count>}. Passwords are never included.
    #
    # NOTE: the docstring above is left untranslated because FastMCP is
    # expected to publish it as the tool description - confirm before editing.
    servers = []
    for name, config in mcp_manager.server_configs.items():
        # A server counts as connected only when a cached client exists and
        # its transport reports itself active.
        is_connected = False
        client = mcp_manager.active_connections.get(name)
        if client is not None:
            try:
                transport = client.get_transport()
                # bool(): a missing transport would otherwise leak None into
                # the result instead of False.
                is_connected = bool(transport and transport.is_active())
            except Exception:
                is_connected = False
        servers.append({
            "name": name,
            "ip": config.ssh_ip,
            "user": config.ssh_user,
            "port": config.ssh_port,
            "connected": is_connected
        })
    return {
        "servers": servers,
        "total_servers": len(servers)
    }
@mcp.tool()
def test_connection(server_name: str) -> Dict[str, Any]:
    """
    测试服务器连接
    参数:
    - server_name: 服务器名称
    """
    # Open a throw-away SSH connection to check that the stored settings work.
    # The probe client is independent of the manager's connection cache and is
    # always closed, whether the attempt succeeds or fails.
    #
    # NOTE: the docstring above is left untranslated because FastMCP is
    # expected to publish it as the tool description - confirm before editing.
    if server_name not in mcp_manager.server_configs:
        return {
            "success": False,
            "error": f"未找到服务器配置: {server_name}"
        }
    config = mcp_manager.server_configs[server_name]
    client = None
    try:
        client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts unknown host keys unverified.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(
            hostname=config.ssh_ip,
            port=config.ssh_port,
            username=config.ssh_user,
            password=config.ssh_password,
            timeout=10
        )
        return {
            "success": True,
            "server": server_name,
            "ip": config.ssh_ip,
            "port": config.ssh_port,
            "message": "连接测试成功"
        }
    except Exception as e:
        return {
            "success": False,
            "server": server_name,
            "ip": config.ssh_ip,
            "port": config.ssh_port,
            "error": str(e)
        }
    finally:
        # Close the probe even when connect() failed part-way, so no socket
        # or transport thread is left behind.
        if client is not None:
            client.close()
@mcp.tool()
def add_server_config(name: str, ip: str, user: str, password: str, port: int = 22) -> Dict[str, Any]:
    """
    动态添加服务器配置
    参数:
    - name: 服务器名称
    - ip: 服务器IP地址
    - user: SSH用户名
    - password: SSH密码
    - port: SSH端口,默认22
    """
    # Register a server configuration at runtime through the global manager.
    # The result echoes name/ip/user/port but never the password.
    #
    # NOTE: the docstring above is left untranslated because FastMCP is
    # expected to publish it as the tool description - confirm before editing.
    try:
        mcp_manager.add_server(name, ip, user, password, port)
        server_info = {
            "name": name,
            "ip": ip,
            "user": user,
            "port": port
        }
        outcome = {
            "success": True,
            "message": f"服务器配置添加成功: {name}",
            "server": server_info
        }
    except Exception as e:
        outcome = {
            "success": False,
            "error": str(e)
        }
    return outcome
# =================== 主程序 ===================
if __name__ == "__main__":
    # Script entry point: show a usage banner, then serve MCP over stdio.
    #
    # With the stdio transport, stdout carries the protocol messages, so every
    # human-readable message here is written to stderr; printing the banner to
    # stdout would corrupt the stream the client is parsing.
    import sys

    try:
        print(f"""
MCP SSH服务器已启动 (修复版)
配置方式:
1. 环境变量: SSH_IP, SSH_USER, SSH_PASSWORD, SSH_PORT
2. 代码中直接配置: 修改 load_config() 方法
3. 运行时动态添加: 使用 add_server_config 工具
已加载服务器: {list(mcp_manager.server_configs.keys())}
可用工具:
- execute: 执行远程命令
- download_file: 下载远程文件/目录 (支持递归)
- upload_file: 上传本地文件/目录 (支持递归)
- list_directory: 列出目录内容
- list_servers: 列出所有服务器
- test_connection: 测试连接
- add_server_config: 动态添加服务器配置
修复内容:
- 支持目录的递归下载和上传
- 改进错误处理和日志记录
- 添加详细的操作反馈
使用示例:
{{
"tool": "download_file",
"params": {{
"server_name": "server2",
"remote_path": "/etc/yum.repos.d",
"local_src": "f:\\项目仓库\\产品\\mcp-server\\yum.repos.d"
}}
}}
""", file=sys.stderr)
        # Run the MCP server (blocks until the client disconnects).
        mcp.run(transport="stdio")
    except KeyboardInterrupt:
        print("\n正在关闭服务器...", file=sys.stderr)
        mcp_manager.close_all_connections()
        print("服务器已关闭", file=sys.stderr)
    except Exception as e:
        print(f"服务器启动失败: {str(e)}", file=sys.stderr)
        logger.error(f"服务器启动失败: {str(e)}")
    finally:
        # Safety net for every exit path; closing an already-empty cache is a no-op.
        mcp_manager.close_all_connections()