Skip to main content
Glama

Sumanshu Arora

mcp_client_probe.pyβ€’12.5 kB
""" MCP Client probe for discovering tools from MCP servers via stdio. """ import asyncio import json import logging import time from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) class MCPClientProbe: """Client for probing MCP servers to discover their tools.""" def __init__(self, timeout: int = 15): """ Initialize MCP client probe. Args: timeout: Timeout for MCP communication """ self.timeout = timeout async def discover_tools_from_command( self, command: List[str], working_dir: Optional[str] = None ) -> Optional[Dict[str, Any]]: """ Discover tools from MCP server via command line. Args: command: Command to run MCP server working_dir: Working directory for command Returns: Dictionary containing discovered tools and metadata, or None if failed """ try: logger.info("Starting MCP server with command: %s", " ".join(command)) # Start the MCP server process process = await asyncio.create_subprocess_exec( *command, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=working_dir, ) try: # Initialize MCP connection result = await self._initialize_mcp_session(process) if not result: return None # List tools tools = await self._list_tools(process) if not tools: return None return { "discovery_method": "mcp_client", "timestamp": time.time(), "tools": self._normalize_mcp_tools(tools), "command": command, "server_info": result.get("serverInfo", {}), } finally: # Cleanup process try: if process.returncode is None: process.terminate() await asyncio.wait_for(process.wait(), timeout=5) except (ProcessLookupError, asyncio.TimeoutError): try: process.kill() await process.wait() except ProcessLookupError: pass except Exception as e: logger.error("Failed to discover tools from MCP server: %s", e) return None async def discover_tools_from_docker_mcp( self, image_name: str, args: Optional[List[str]] = None, env_vars: Optional[Dict[str, str]] = None, ) -> Optional[Dict[str, Any]]: """ Discover tools from MCP server running in Docker. Args: image_name: Docker image name args: Additional arguments for the MCP server env_vars: Environment variables to pass to the container Returns: Dictionary containing discovered tools and metadata, or None if failed """ container_name = f"mcp-discovery-{image_name.replace('/', '-').replace(':', '-')}-{int(time.time())}" try: # Build docker command docker_cmd = [ "docker", "run", "--rm", "-i", "--name", container_name, ] # Add environment variables if env_vars: for key, value in env_vars.items(): docker_cmd.extend(["-e", f"{key}={value}"]) docker_cmd.append(image_name) # Add custom arguments if provided if args: docker_cmd.extend(args) return await self.discover_tools_from_command(docker_cmd) except Exception as e: logger.debug("MCP discovery failed for Docker server %s: %s", image_name, e) return None finally: # Cleanup container if it's still running try: process = await asyncio.create_subprocess_exec( "docker", "stop", container_name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) await asyncio.wait_for(process.wait(), timeout=10) except Exception: pass async def _initialize_mcp_session( self, process: asyncio.subprocess.Process ) -> Optional[Dict[str, Any]]: """Initialize MCP session with the server using MCP protocol version 2025-03-26.""" try: # Send initialize request with proper MCP 2025-03-26 protocol init_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2025-03-26", "capabilities": {"roots": {"listChanged": True}, "sampling": {}}, "clientInfo": {"name": "ExampleClient", "version": "1.0.0"}, }, } # Send initialize request request_line = json.dumps(init_request) + "\n" process.stdin.write(request_line.encode()) await process.stdin.drain() # Read initialization response init_response = await self._read_json_response(process, "initialization") if not init_response or "error" in init_response: if init_response and "error" in init_response: logger.error("MCP initialization error: %s", init_response["error"]) return None # Send initialized notification (no params property per MCP spec) initialized_notification = { "jsonrpc": "2.0", "method": "notifications/initialized", } notification_line = json.dumps(initialized_notification) + "\n" process.stdin.write(notification_line.encode()) await process.stdin.drain() # Small delay to ensure notification is processed await asyncio.sleep(0.1) logger.debug("MCP server initialized successfully") return init_response.get("result", {}) except asyncio.TimeoutError: logger.error("Timeout during MCP initialization") return None except Exception as e: logger.error("Error during MCP initialization: %s", e) return None async def _read_json_response( self, process: asyncio.subprocess.Process, operation: str ) -> Optional[Dict[str, Any]]: """Read JSON response from MCP server, skipping non-JSON lines.""" max_attempts = 20 # Increased attempts to handle more startup output for attempt in range(max_attempts): try: response_line = await asyncio.wait_for( process.stdout.readline(), timeout=2, # Shorter timeout per line ) if not response_line: logger.error(f"No response from MCP server during {operation}") return None line = response_line.decode().strip() logger.debug( f"Raw MCP response line {attempt + 1} for {operation}: {line}" ) # Skip non-JSON lines (like "Starting MCP server with stdio transport") if line.startswith("{") and line.endswith("}"): try: return json.loads(line) except json.JSONDecodeError: continue except asyncio.TimeoutError: logger.debug( f"Timeout on line {attempt + 1} during {operation}, continuing..." ) continue logger.error( f"No valid JSON response found from MCP server for {operation} after {max_attempts} attempts" ) return None async def _list_tools( self, process: asyncio.subprocess.Process ) -> Optional[List[Dict[str, Any]]]: """List tools from MCP server using correct protocol.""" try: # Send tools/list request with proper params format list_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}, # Empty object, not string } # Send request request_line = json.dumps(list_request) + "\n" process.stdin.write(request_line.encode()) await process.stdin.drain() # Read response using helper method response = await self._read_json_response(process, "tools/list") if not response: return None if "error" in response: logger.error("MCP tools/list error: %s", response["error"]) return None tools = response.get("result", {}).get("tools", []) # Defensive check: ensure tools is a list if not isinstance(tools, list): logger.error(f"Expected list for tools, got {type(tools)}: {tools}") return [] logger.info("Found %d tools from MCP server", len(tools)) return tools except asyncio.TimeoutError: logger.error("Timeout during MCP tools/list") return None except Exception as e: logger.error("Error during MCP tools/list: %s", e) return None def _normalize_mcp_tools( self, mcp_tools: List[Dict[str, Any]] ) -> List[Dict[str, Any]]: """Normalize MCP tools to standard format.""" normalized = [] # Defensive check: ensure mcp_tools is iterable if not isinstance(mcp_tools, (list, tuple)): logger.error( f"Expected list/tuple for mcp_tools, got {type(mcp_tools)}: {mcp_tools}" ) return [] for tool in mcp_tools: try: # Skip None or invalid tools if tool is None or not isinstance(tool, dict): continue normalized_tool = { "name": tool.get("name", "unknown"), "description": tool.get("description", "No description available"), "category": "mcp", "parameters": tool.get("inputSchema", {}), "mcp_info": { "input_schema": tool.get("inputSchema", {}), "output_schema": tool.get("outputSchema", {}), }, } normalized.append(normalized_tool) except Exception as e: logger.warning( "Failed to normalize MCP tool %s: %s", ( tool.get("name", "unknown") if tool and isinstance(tool, dict) else "unknown" ), e, ) continue return normalized def discover_tools_sync( self, command: List[str], working_dir: Optional[str] = None ) -> Optional[Dict[str, Any]]: """Synchronous wrapper for discovering tools.""" try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.run_until_complete( self.discover_tools_from_command(command, working_dir) ) finally: loop.close() def discover_tools_from_docker_sync( self, image_name: str, args: Optional[List[str]] = None, env_vars: Optional[Dict[str, str]] = None, ) -> Optional[Dict[str, Any]]: """Synchronous wrapper for discovering tools from Docker.""" try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.run_until_complete( self.discover_tools_from_docker_mcp(image_name, args, env_vars) ) finally: loop.close()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Data-Everything/mcp-server-templates'

If you have feedback or need assistance with the MCP directory API, please join our Discord server