Skip to main content
Glama

MCP SSH Tools Server

by nwnusun-cool
main.py (20.5 kB)
"""MCP SSH tools - direct-configuration version (fixed edition).

Registers a set of MCP tools on a FastMCP server: remote command execution,
recursive SFTP download/upload, remote directory listing, connection testing
and runtime registration of servers. SSH connections are made with paramiko
and cached per server name by ``MCPManager``.
"""
from mcp.server.fastmcp import FastMCP
import paramiko
import os
import posixpath
import shlex
import sys
from typing import Dict, Any, Optional
import logging
from dataclasses import dataclass, field
import stat

# Logging configuration (logging.basicConfig writes to stderr by default).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# MCP server instance that the @mcp.tool() functions below register with.
mcp = FastMCP("MCP-SSH-Server",host="0.0.0.0")


@dataclass
class ServerConfig:
    """Connection settings for one SSH server."""
    ssh_ip: str
    ssh_user: str
    # Excluded from repr() so the password is not leaked by logging/printing
    # a ServerConfig instance.
    ssh_password: str = field(repr=False)
    ssh_port: int = 22
    name: str = ""


class MCPManager:
    """Holds server configurations and a cache of live SSH connections."""

    def __init__(self):
        self.server_configs: Dict[str, ServerConfig] = {}
        self.active_connections: Dict[str, paramiko.SSHClient] = {}
        self.load_config()

    def load_config(self):
        """Load configuration from environment variables.

        When SSH_IP, SSH_USER and SSH_PASSWORD are all set, a server named
        "default" is registered (SSH_PORT is optional and defaults to 22).
        Any error, including a non-numeric SSH_PORT, is logged and swallowed
        so that the module can still be imported.
        """
        try:
            # Option 1: a single server taken from environment variables.
            ssh_ip = os.getenv("SSH_IP")
            ssh_user = os.getenv("SSH_USER")
            ssh_password = os.getenv("SSH_PASSWORD")
            ssh_port = int(os.getenv("SSH_PORT", "22"))

            if ssh_ip and ssh_user and ssh_password:
                self.server_configs["default"] = ServerConfig(
                    name="default",
                    ssh_ip=ssh_ip,
                    ssh_user=ssh_user,
                    ssh_password=ssh_password,
                    ssh_port=ssh_port
                )
                logger.info(f"从环境变量加载服务器配置: {ssh_ip}:{ssh_port}")

            # Option 2: register servers here through self.add_server(...)
            # (edit as needed).

            logger.info(f"已加载 {len(self.server_configs)} 个服务器配置")
        except Exception as e:
            logger.error(f"加载配置失败: {str(e)}")

    def add_server(self, name: str, ip: str, user: str, password: str, port: int = 22):
        """Register (or replace) the configuration stored under ``name``."""
        self.server_configs[name] = ServerConfig(
            name=name,
            ssh_ip=ip,
            ssh_user=user,
            ssh_password=password,
            ssh_port=port
        )
        # A connection cached under this name was opened with the previous
        # settings; drop it so the next get_connection() uses the new ones.
        self._discard_connection(name)
        logger.info(f"添加服务器配置: {name} ({ip}:{port})")

    def _discard_connection(self, server_name: str) -> None:
        """Remove ``server_name`` from the cache and close its client, if any."""
        client = self.active_connections.pop(server_name, None)
        if client is None:
            return
        try:
            client.close()
        except Exception as e:
            # Best effort: the connection is being thrown away regardless.
            logger.debug(f"关闭连接失败[{server_name}]: {str(e)}")

    def get_connection(self, server_name: str) -> Optional[paramiko.SSHClient]:
        """Return a live SSH client for ``server_name``.

        A cached client is reused while its transport is active; otherwise a
        new connection is opened and cached.

        Returns:
            The connected client, or None when the server is not configured
            or the connection attempt fails (the reason is logged).
        """
        if server_name not in self.server_configs:
            logger.error(f"未找到服务器配置: {server_name}")
            return None

        cached = self.active_connections.get(server_name)
        if cached is not None:
            # Reuse the cached client only while its transport is still up.
            try:
                transport = cached.get_transport()
                if transport and transport.is_active():
                    return cached
            except Exception:
                pass
            # Connection dropped: forget it and reconnect below.
            self._discard_connection(server_name)

        config = self.server_configs[server_name]
        client = paramiko.SSHClient()
        try:
            # NOTE(review): AutoAddPolicy accepts unknown host keys without
            # verification; kept to preserve existing behaviour.
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(
                hostname=config.ssh_ip,
                port=config.ssh_port,
                username=config.ssh_user,
                password=config.ssh_password,
                timeout=10
            )
        except Exception as e:
            # Release the half-open client instead of leaking it.
            client.close()
            logger.error(f"SSH连接失败[{server_name}]: {str(e)}")
            return None

        self.active_connections[server_name] = client
        logger.info(f"SSH连接成功: {server_name} ({config.ssh_ip}:{config.ssh_port})")
        return client

    def close_all_connections(self):
        """Close every cached connection and empty the cache."""
        for server_name, client in self.active_connections.items():
            try:
                client.close()
                logger.info(f"关闭连接: {server_name}")
            except Exception as e:
                # Best effort: keep closing the remaining connections.
                logger.debug(f"关闭连接失败[{server_name}]: {str(e)}")
        self.active_connections.clear()


# Global manager shared by all tools.
mcp_manager = MCPManager()


# =================== Helper functions ===================

def _sftp_makedirs(sftp, remote_dir: str) -> None:
    """Create ``remote_dir`` and any missing parents on the remote side.

    Best effort: mkdir failures are ignored (the directory may already
    exist); a real problem surfaces when the subsequent transfer fails.
    """
    if len(remote_dir) > 1:
        remote_dir = remote_dir.rstrip("/") or "/"
    if remote_dir in ("", "/", "."):
        return
    try:
        sftp.stat(remote_dir)
        return  # Already present.
    except IOError:
        pass
    parent = posixpath.dirname(remote_dir)
    if parent != remote_dir:  # Guard against paths dirname cannot shorten.
        _sftp_makedirs(sftp, parent)
    try:
        sftp.mkdir(remote_dir)
    except IOError:
        pass


def _quote_remote_path(path: str) -> str:
    """Quote ``path`` for a POSIX shell, keeping a leading ``~`` expandable."""
    if path == "~":
        return "~"
    if path.startswith("~/"):
        return "~/" + shlex.quote(path[2:])
    return shlex.quote(path)


def download_directory_recursive(sftp, remote_path: str, local_path: str) -> Dict[str, Any]:
    """Recursively download ``remote_path`` into ``local_path``.

    Returns:
        {"success": True, "downloaded_files": [...], "failed_files": [...]}
        when the directory could be listed (individual failures are recorded
        in "failed_files"), or {"success": False, "error": str} when the
        local directory cannot be created or the remote one cannot be listed.
    """
    try:
        # Create the local directory.
        os.makedirs(local_path, exist_ok=True)

        # Fetch the remote directory entries.
        files = sftp.listdir_attr(remote_path)

        downloaded_files = []
        failed_files = []

        for file_attr in files:
            remote_file_path = f"{remote_path}/{file_attr.filename}"
            local_file_path = os.path.join(local_path, file_attr.filename)

            try:
                if stat.S_ISDIR(file_attr.st_mode):
                    # Directory: recurse and merge the child's results.
                    result = download_directory_recursive(sftp, remote_file_path, local_file_path)
                    downloaded_files.extend(result.get("downloaded_files", []))
                    failed_files.extend(result.get("failed_files", []))
                else:
                    # Regular entry: download it directly.
                    sftp.get(remote_file_path, local_file_path)
                    downloaded_files.append({
                        "remote": remote_file_path,
                        "local": local_file_path,
                        "size": file_attr.st_size
                    })
                    logger.info(f"已下载文件: {remote_file_path} -> {local_file_path}")
            except Exception as e:
                failed_files.append({
                    "remote": remote_file_path,
                    "local": local_file_path,
                    "error": str(e)
                })
                logger.error(f"下载失败: {remote_file_path} - {str(e)}")

        return {
            "success": True,
            "downloaded_files": downloaded_files,
            "failed_files": failed_files
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


def upload_directory_recursive(sftp, local_path: str, remote_path: str) -> Dict[str, Any]:
    """Recursively upload ``local_path`` to ``remote_path``.

    Returns:
        {"success": True, "uploaded_files": [...], "failed_files": [...]}
        (individual failures are recorded in "failed_files"), or
        {"success": False, "error": str} on an unexpected error.
    """
    try:
        # Create the remote root, including any missing parents.
        _sftp_makedirs(sftp, remote_path)

        uploaded_files = []
        failed_files = []

        for root, dirs, files in os.walk(local_path):
            # Map the local directory onto its remote counterpart.
            rel_path = os.path.relpath(root, local_path)
            if rel_path == '.':
                remote_root = remote_path
            else:
                remote_root = f"{remote_path}/{rel_path.replace(os.sep, '/')}"

            # Create the remote sub-directories.
            for dir_name in dirs:
                remote_dir = f"{remote_root}/{dir_name}"
                try:
                    sftp.mkdir(remote_dir)
                except IOError:
                    # The directory may already exist; ignore.
                    pass

            # Upload the files of this level.
            for file_name in files:
                local_file_path = os.path.join(root, file_name)
                remote_file_path = f"{remote_root}/{file_name}"

                try:
                    sftp.put(local_file_path, remote_file_path)
                    uploaded_files.append({
                        "local": local_file_path,
                        "remote": remote_file_path,
                        "size": os.path.getsize(local_file_path)
                    })
                    logger.info(f"已上传文件: {local_file_path} -> {remote_file_path}")
                except Exception as e:
                    failed_files.append({
                        "local": local_file_path,
                        "remote": remote_file_path,
                        "error": str(e)
                    })
                    logger.error(f"上传失败: {local_file_path} - {str(e)}")

        return {
            "success": True,
            "uploaded_files": uploaded_files,
            "failed_files": failed_files
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


# =================== MCP tools ===================

@mcp.tool()
def execute(server_name: str, command: str) -> Dict[str, Any]:
    """
    Execute a command on a remote server.

    Args:
    - server_name: server name (use 'default' when only the environment
      variable configuration is present)
    - command: command line to run
    """
    client = mcp_manager.get_connection(server_name)
    if not client:
        return {
            "success": False,
            "error": "SSH连接失败",
            "server": server_name
        }

    try:
        _stdin, stdout, stderr = client.exec_command(command)
        # Drain the output BEFORE waiting for the exit status: paramiko
        # documents that recv_exit_status() can hang when the remote output
        # exceeds the channel window and nobody is reading it.
        stdout_content = stdout.read().decode('utf-8', 'replace')
        stderr_content = stderr.read().decode('utf-8', 'replace')
        exit_code = stdout.channel.recv_exit_status()

        return {
            "success": True,
            "server": server_name,
            "ip": mcp_manager.server_configs[server_name].ssh_ip,
            "exit_code": exit_code,
            "stdout": stdout_content,
            "stderr": stderr_content
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "server": server_name
        }


@mcp.tool()
def download_file(server_name: str, remote_path: str, local_src: str) -> Dict[str, Any]:
    """
    Download a file or directory from a remote server; directories are
    downloaded recursively.

    Args:
    - server_name: server name
    - remote_path: remote file/directory path
    - local_src: local file/directory path
    """
    client = mcp_manager.get_connection(server_name)
    if not client:
        return {
            "success": False,
            "error": "SSH连接失败",
            "server": server_name
        }

    try:
        sftp = client.open_sftp()
        try:
            # Check that the remote path exists.
            try:
                file_attr = sftp.stat(remote_path)
            except IOError:
                return {
                    "success": False,
                    "error": f"远程路径不存在: {remote_path}",
                    "server": server_name
                }

            if stat.S_ISDIR(file_attr.st_mode):
                # Directory: download recursively.
                result = download_directory_recursive(sftp, remote_path, local_src)
                if not result["success"]:
                    return {
                        "success": False,
                        "error": result["error"],
                        "server": server_name
                    }
                return {
                    "success": True,
                    "server": server_name,
                    "ip": mcp_manager.server_configs[server_name].ssh_ip,
                    "type": "directory",
                    "message": f"目录下载完成: {remote_path} -> {local_src}",
                    "downloaded_files": result["downloaded_files"],
                    "failed_files": result["failed_files"],
                    "summary": {
                        "total_downloaded": len(result["downloaded_files"]),
                        "total_failed": len(result["failed_files"])
                    }
                }

            # Single file. When the local path is an existing directory,
            # store the file inside it under the remote file's name.
            if os.path.isdir(local_src):
                local_file_path = os.path.join(local_src, posixpath.basename(remote_path))
            else:
                local_file_path = local_src

            # Create the local parent directory; a bare file name has no
            # directory part and os.makedirs('') would raise.
            local_dir = os.path.dirname(local_file_path)
            if local_dir:
                os.makedirs(local_dir, exist_ok=True)

            sftp.get(remote_path, local_file_path)

            return {
                "success": True,
                "server": server_name,
                "ip": mcp_manager.server_configs[server_name].ssh_ip,
                "type": "file",
                "message": f"文件下载成功: {remote_path} -> {local_file_path}",
                "remote_path": remote_path,
                "local_path": local_file_path,
                "size": file_attr.st_size
            }
        finally:
            # Always release the SFTP session, including on errors.
            sftp.close()
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "server": server_name
        }


@mcp.tool()
def upload_file(server_name: str, remote_path: str, local_src: str) -> Dict[str, Any]:
    """
    Upload a local file or directory to a remote server; directories are
    uploaded recursively.

    Args:
    - server_name: server name
    - remote_path: remote file/directory path
    - local_src: local file/directory path
    """
    client = mcp_manager.get_connection(server_name)
    if not client:
        return {
            "success": False,
            "error": "SSH连接失败",
            "server": server_name
        }

    try:
        # Check that the local path exists.
        if not os.path.exists(local_src):
            return {
                "success": False,
                "error": f"本地路径不存在: {local_src}",
                "server": server_name
            }

        sftp = client.open_sftp()
        try:
            if os.path.isdir(local_src):
                # Directory: upload recursively.
                result = upload_directory_recursive(sftp, local_src, remote_path)
                if not result["success"]:
                    return {
                        "success": False,
                        "error": result["error"],
                        "server": server_name
                    }
                return {
                    "success": True,
                    "server": server_name,
                    "ip": mcp_manager.server_configs[server_name].ssh_ip,
                    "type": "directory",
                    "message": f"目录上传完成: {local_src} -> {remote_path}",
                    "uploaded_files": result["uploaded_files"],
                    "failed_files": result["failed_files"],
                    "summary": {
                        "total_uploaded": len(result["uploaded_files"]),
                        "total_failed": len(result["failed_files"])
                    }
                }

            # Single file. Remote paths are POSIX paths, so split them with
            # posixpath and create every missing parent directory.
            _sftp_makedirs(sftp, posixpath.dirname(remote_path))

            sftp.put(local_src, remote_path)

            return {
                "success": True,
                "server": server_name,
                "ip": mcp_manager.server_configs[server_name].ssh_ip,
                "type": "file",
                "message": f"文件上传成功: {local_src} -> {remote_path}",
                "local_path": local_src,
                "remote_path": remote_path,
                "size": os.path.getsize(local_src)
            }
        finally:
            # Always release the SFTP session, including on errors.
            sftp.close()
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "server": server_name
        }


@mcp.tool()
def list_directory(server_name: str, path: str = "/") -> Dict[str, Any]:
    """
    List the contents of a remote directory.

    Args:
    - server_name: server name
    - path: directory path, defaults to the root directory
    """
    if not path:
        # Same as before for an empty path: list the remote working directory.
        return execute(server_name, "ls -la")
    # Quote the path so spaces and shell metacharacters are taken literally;
    # "--" stops a path starting with "-" from being parsed as an option.
    return execute(server_name, f"ls -la -- {_quote_remote_path(path)}")


@mcp.tool()
def list_servers() -> Dict[str, Any]:
    """List all configured servers together with their connection state."""
    servers = []
    for name, config in mcp_manager.server_configs.items():
        # Check the state of the cached connection, if any.
        is_connected = False
        client = mcp_manager.active_connections.get(name)
        if client is not None:
            try:
                transport = client.get_transport()
                # bool(): get_transport() may return None, which must not
                # leak into the result as "connected": None.
                is_connected = bool(transport and transport.is_active())
            except Exception:
                is_connected = False

        servers.append({
            "name": name,
            "ip": config.ssh_ip,
            "user": config.ssh_user,
            "port": config.ssh_port,
            "connected": is_connected
        })

    return {
        "servers": servers,
        "total_servers": len(servers)
    }


@mcp.tool()
def test_connection(server_name: str) -> Dict[str, Any]:
    """
    Test the connection to a server with a fresh, uncached connection.

    Args:
    - server_name: server name
    """
    if server_name not in mcp_manager.server_configs:
        return {
            "success": False,
            "error": f"未找到服务器配置: {server_name}"
        }

    config = mcp_manager.server_configs[server_name]
    client = paramiko.SSHClient()
    try:
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; kept to preserve existing behaviour.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(
            hostname=config.ssh_ip,
            port=config.ssh_port,
            username=config.ssh_user,
            password=config.ssh_password,
            timeout=10
        )
        return {
            "success": True,
            "server": server_name,
            "ip": config.ssh_ip,
            "port": config.ssh_port,
            "message": "连接测试成功"
        }
    except Exception as e:
        return {
            "success": False,
            "server": server_name,
            "ip": config.ssh_ip,
            "port": config.ssh_port,
            "error": str(e)
        }
    finally:
        # Close the probe connection on success and on failure alike.
        client.close()


@mcp.tool()
def add_server_config(name: str, ip: str, user: str, password: str, port: int = 22) -> Dict[str, Any]:
    """
    Add a server configuration at runtime.

    Args:
    - name: server name
    - ip: server IP address
    - user: SSH user name
    - password: SSH password
    - port: SSH port, defaults to 22
    """
    try:
        mcp_manager.add_server(name, ip, user, password, port)
        return {
            "success": True,
            "message": f"服务器配置添加成功: {name}",
            "server": {
                "name": name,
                "ip": ip,
                "user": user,
                "port": port
            }
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


# =================== Entry point ===================

if __name__ == "__main__":
    try:
        # The server runs on the stdio transport, where stdout carries the
        # protocol messages; human-readable output therefore goes to stderr.
        print(f"""
MCP SSH服务器已启动 (修复版)

配置方式:
1. 环境变量: SSH_IP, SSH_USER, SSH_PASSWORD, SSH_PORT
2. 代码中直接配置: 修改 load_config() 方法
3. 运行时动态添加: 使用 add_server_config 工具

已加载服务器: {list(mcp_manager.server_configs.keys())}

可用工具:
- execute: 执行远程命令
- download_file: 下载远程文件/目录 (支持递归)
- upload_file: 上传本地文件/目录 (支持递归)
- list_directory: 列出目录内容
- list_servers: 列出所有服务器
- test_connection: 测试连接
- add_server_config: 动态添加服务器配置

修复内容:
- 支持目录的递归下载和上传
- 改进错误处理和日志记录
- 添加详细的操作反馈

使用示例:
{{
    "tool": "download_file",
    "params": {{
        "server_name": "server2",
        "remote_path": "/etc/yum.repos.d",
        "local_src": "f:\\项目仓库\\产品\\mcp-server\\yum.repos.d"
    }}
}}
""", file=sys.stderr)

        # Run the MCP server.
        mcp.run(transport="stdio")

    except KeyboardInterrupt:
        print("\n正在关闭服务器...", file=sys.stderr)
        mcp_manager.close_all_connections()
        print("服务器已关闭", file=sys.stderr)
    except Exception as e:
        print(f"服务器启动失败: {str(e)}", file=sys.stderr)
        logger.error(f"服务器启动失败: {str(e)}")
    finally:
        # Safe to call again: the cache is already empty after a clean close.
        mcp_manager.close_all_connections()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nwnusun-cool/mcp-server-ssh-tools'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.