Skip to main content
Glama

OPS MCP Server

by Heht571
ssh_manager.py (3.15 kB)
import logging

import paramiko

# Configure logging
logger = logging.getLogger('network_tools.ssh_manager')


class SSHManager:
    """Context manager that opens (and optionally caches) paramiko SSH clients.

    Usage::

        with SSHManager("host", "user", "secret") as client:
            client.exec_command("uptime")

    When ``use_cache`` is true, connected clients are kept in a class-level
    cache keyed by ``"username@hostname:port"`` and reused by later
    ``SSHManager`` instances with the same key; such clients are NOT closed
    on ``__exit__`` and stay open until :meth:`clear_cache` is called.
    When ``use_cache`` is false, the client is closed on ``__exit__``.
    """

    # Class-level connection cache shared by every SSHManager instance.
    # NOTE: the key does not include the password, so two managers with the
    # same user/host/port share one connection regardless of credentials.
    _connection_cache = {}

    def __init__(
        self,
        hostname: str,
        username: str,
        password: str = "",
        port: int = 22,
        timeout: int = 30,
        use_cache: bool = True
    ):
        """Prepare (but do not open) an SSH connection.

        Args:
            hostname: Host to connect to.
            username: Login user name.
            password: Login password, passed straight through to paramiko.
            port: SSH port.
            timeout: Connect timeout, passed straight through to paramiko.
            use_cache: Reuse / store the connection in the class-level cache.
        """
        self.client = paramiko.SSHClient()
        # Unknown host keys are accepted automatically.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.connect_params = {
            "hostname": hostname,
            "username": username,
            "password": password,
            "port": port,
            "timeout": timeout
        }
        self.connection_key = f"{username}@{hostname}:{port}"
        self.use_cache = use_cache
        self.is_new_connection = False

    def __enter__(self) -> paramiko.SSHClient:
        """Return a connected client, reusing a live cached one if allowed.

        Raises:
            paramiko.AuthenticationException: Authentication failed.
            paramiko.SSHException: Any other SSH-level connection error.
            Exception: Any other error raised while connecting; every
                failure is logged and then re-raised unchanged.
        """
        try:
            # Try to reuse a cached connection first.
            if self.use_cache and self.connection_key in self._connection_cache:
                cached_client = self._connection_cache[self.connection_key]
                # Probe the connection with a trivial command to check that
                # it is still usable.
                try:
                    _, probe_stdout, _ = cached_client.exec_command("echo 1", timeout=5)
                    # Release the probe's channel instead of leaving it open.
                    probe_stdout.channel.close()
                    logger.debug(f"Using cached SSH connection for {self.connection_key}")
                    self.client = cached_client
                    return self.client
                except Exception:
                    # The cached connection is dead: evict it and release its
                    # resources before falling through to a fresh connect.
                    logger.debug(f"Cached connection invalid for {self.connection_key}, creating new one")
                    stale_client = self._connection_cache.pop(self.connection_key, None)
                    if stale_client is not None:
                        try:
                            stale_client.close()
                        except Exception:
                            # Best effort only; the connection is already broken.
                            pass

            # Open a new connection.
            logger.debug(f"Creating new SSH connection to {self.connection_key}")
            self.client.connect(**self.connect_params)
            self.is_new_connection = True

            # Store it for reuse.
            if self.use_cache:
                self._connection_cache[self.connection_key] = self.client

            return self.client
        except paramiko.AuthenticationException as e:
            logger.error(f"SSH authentication failed for {self.connection_key}: {str(e)}")
            raise
        except paramiko.SSHException as e:
            logger.error(f"SSH connection error for {self.connection_key}: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error connecting to {self.connection_key}: {str(e)}")
            raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection only if it is new and not cached.

        Cached connections are deliberately left open for reuse. Returns
        ``None``, so exceptions raised inside the ``with`` body propagate.
        """
        if not self.use_cache and self.is_new_connection:
            logger.debug(f"Closing SSH connection to {self.connection_key}")
            self.client.close()

    @classmethod
    def clear_cache(cls):
        """Close every cached connection and empty the cache.

        Closing is best effort: a failure to close one client is logged at
        debug level and does not prevent the remaining clients from being
        closed or the cache from being cleared.
        """
        for key, client in list(cls._connection_cache.items()):
            try:
                client.close()
            except Exception as e:
                # Narrowed from a bare ``except`` so KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                logger.debug("Failed to close cached SSH connection %s: %s", key, e)
        cls._connection_cache.clear()
        logger.info("SSH connection cache cleared")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Heht571/ops-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.