connect_to_server
Establish a connection to an MCP server for testing by providing its URL or file path, enabling discovery and validation of server tools and resources.
Instructions
Connect to an MCP server for testing.
Establishes a connection to a target MCP server using the appropriate transport protocol (stdio for file paths, streamable-http for URLs). Only one connection can be active at a time.
Returns: Dictionary with connection details including: - success: Always True on successful connection - connection: Full ConnectionState with server info and statistics - message: Human-readable success message - metadata: Request timing information
On failure: No exception is raised; an error dict is returned instead with: - success: False - error: Error details (error_type, message, details, suggestion) - connection: None - metadata: Request timing information
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| url | Yes | Server URL (http://..., https://...) or file path for stdio transport | |
Implementation Reference
# The core handler function for the 'connect_to_server' MCP tool. It uses
# ConnectionManager to establish a connection to the target MCP server, handles
# success and various error cases, provides progress updates via ctx, logs
# details, and returns structured JSON responses with metadata.


def _suggest_connection_fix(url: str, error_message: str) -> str:
    """Pick a remediation hint for a failed connection attempt."""
    if "timed out" in error_message.lower():
        return "The connection timed out. Check server availability and network connectivity"
    # Only targets that are not HTTP(S) URLs get the file-path hint. A plain
    # substring test for "file" would also match HTTP URLs such as
    # https://files.example.com/mcp and give misleading advice.
    if not url.lower().startswith(("http://", "https://")):
        return "For file paths, ensure the path is valid and the server executable has correct permissions"
    return "Check that the server URL is correct and the server is running"


def _connection_failure(
    url: str,
    message: str,
    suggestion: str,
    elapsed_ms: float,
    extra_details: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the structured failure payload shared by both error paths."""
    details: dict[str, Any] = {"url": url}
    if extra_details:
        details.update(extra_details)
    return {
        "success": False,
        "error": {
            "error_type": "connection_failed",
            "message": message,
            "details": details,
            "suggestion": suggestion,
        },
        "connection": None,
        "metadata": {
            "request_time_ms": round(elapsed_ms, 2),
            "attempted_url": url,
        },
    }


@mcp.tool
async def connect_to_server(
    url: Annotated[str, "Server URL (http://..., https://...) or file path for stdio transport"],
    ctx: Context,
) -> dict[str, Any]:
    """Connect to an MCP server for testing.

    Establishes a connection to a target MCP server using the appropriate
    transport protocol (stdio for file paths, streamable-http for URLs).
    Only one connection can be active at a time.

    This function does not raise: every exception from the connection
    attempt is converted into an error dictionary.

    Returns:
        On success, a dictionary with:
            - success: True
            - connection: Full ConnectionState with server info and statistics
            - message: Human-readable success message
            - metadata: Request timing, transport and server URL

        On failure, a dictionary with:
            - success: False
            - error: Error details (error_type, message, details, suggestion)
            - connection: None
            - metadata: Request timing and the attempted URL
    """
    start_time = time.perf_counter()

    try:
        # User-facing progress update
        await ctx.info(f"Connecting to MCP server at {url}")
        # Detailed technical log
        logger.info(f"Connecting to MCP server at: {url}")

        state: ConnectionState = await ConnectionManager.connect(url)
        elapsed_ms = (time.perf_counter() - start_time) * 1000

        # User-facing success update
        await ctx.info(f"Successfully connected to {url}")
        # Detailed technical log
        logger.info(
            f"Successfully connected to {url}",
            extra={
                "url": url,
                "transport": state.transport,
                "duration_ms": elapsed_ms,
            },
        )

        return {
            "success": True,
            "connection": state.model_dump(mode="json"),
            "message": f"Successfully connected to {url}",
            "metadata": {
                "request_time_ms": round(elapsed_ms, 2),
                "transport": state.transport,
                "server_url": state.server_url,
            },
        }

    except ConnectionError as e:
        elapsed_ms = (time.perf_counter() - start_time) * 1000
        reason = str(e)

        # User-facing error update
        await ctx.error(f"Failed to connect to {url}: {reason}")
        # Detailed technical log
        logger.error(
            f"Failed to connect to {url}: {reason}",
            extra={"url": url, "error": reason, "duration_ms": elapsed_ms},
        )

        return _connection_failure(
            url,
            message=reason,
            suggestion=_suggest_connection_fix(url, reason),
            elapsed_ms=elapsed_ms,
        )

    except Exception as e:
        # Boundary handler: the tool contract is to return an error dict
        # rather than propagate, so anything unexpected is logged with its
        # traceback and reported to the caller.
        elapsed_ms = (time.perf_counter() - start_time) * 1000

        # User-facing error update
        await ctx.error(f"Unexpected error connecting to {url}: {str(e)}")
        # Detailed technical log
        logger.exception(
            f"Unexpected error connecting to {url}",
            extra={"url": url, "duration_ms": elapsed_ms},
        )

        return _connection_failure(
            url,
            message=f"Unexpected error: {str(e)}",
            suggestion="This is an unexpected error. Check server logs for more details",
            elapsed_ms=elapsed_ms,
            extra_details={"exception_type": type(e).__name__},
        )
# The ConnectionManager.connect class method provides the underlying connection
# logic invoked by the connect_to_server tool handler. It handles client
# creation, transport inference, server info retrieval, state management, and
# error raising.
async def connect(cls, url: str) -> ConnectionState:
    """Connect to an MCP server.

    Creates a FastMCP Client instance, establishes connection, and stores
    the connection state globally. If a connection already exists, it will
    be closed before establishing the new connection.

    If any step fails after the client session has been opened, the session
    is closed again before the error is reported, so a failed attempt does
    not leave an untracked open client behind.

    Args:
        url: Server URL or file path
            - HTTP/HTTPS URLs use streamable-http transport
            - File paths use stdio transport

    Returns:
        ConnectionState with connection details and initial statistics

    Raises:
        ConnectionError: If connection fails or times out
    """
    async with _connection.lock:
        # Close existing connection if any
        if _connection.client is not None:
            await cls._disconnect_internal()

        # Get timeout configuration
        # NOTE(review): presumably read from the MCP_TEST_CONNECT_TIMEOUT
        # environment variable with a 30 s fallback -- confirm in _get_timeout.
        connect_timeout = cls._get_timeout("MCP_TEST_CONNECT_TIMEOUT", 30.0)

        try:
            # Create client - let FastMCP auto-detect transport
            client = Client(url, timeout=connect_timeout)

            # Establish connection
            await asyncio.wait_for(client.__aenter__(), timeout=connect_timeout)

            try:
                # Infer transport type
                transport = cls._infer_transport(url)

                # Get server information
                server_info: dict[str, Any] = {}
                try:
                    # Try to get server info via initialization result
                    session = getattr(client, "_session", None)
                    if session and hasattr(session, "server_info"):
                        info = session.server_info
                        server_info = {
                            "name": getattr(info, "name", None),
                            "version": getattr(info, "version", None),
                        }

                        # Add capabilities if available
                        # NOTE(review): indentation was lost in this excerpt;
                        # capabilities are attached only when server_info is
                        # present -- confirm the nesting against the real file.
                        if hasattr(session, "server_capabilities"):
                            caps: ServerCapabilities = session.server_capabilities
                            server_info["capabilities"] = {
                                "tools": bool(getattr(caps, "tools", None)),
                                "resources": bool(getattr(caps, "resources", None)),
                                "prompts": bool(getattr(caps, "prompts", None)),
                            }
                except Exception:
                    # Deliberate best-effort: server info is optional metadata
                    # read from a private attribute, so a failure here must not
                    # abort an otherwise successful connection.
                    pass

                # Create connection state
                state = ConnectionState(
                    server_url=url,
                    transport=transport,  # type: ignore
                    connected_at=datetime.now(),
                    server_info=server_info if server_info else None,
                    statistics={
                        "tools_called": 0,
                        "resources_accessed": 0,
                        "prompts_executed": 0,
                        "errors": 0,
                    },
                )
            except BaseException:
                # The session is open but will never be stored globally, so
                # nothing else could close it later. Close it here, without
                # letting a cleanup failure mask the original error.
                try:
                    await client.__aexit__(None, None, None)
                except Exception:
                    pass
                raise

            # Store globally
            _connection.client = client
            _connection.state = state

            return state

        except asyncio.TimeoutError as e:
            raise ConnectionError(
                f"Connection to {url} timed out after {connect_timeout}s"
            ) from e
        except Exception as e:
            raise ConnectionError(f"Failed to connect to {url}: {str(e)}") from e


# @classmethod  -- decorator of the next method; its definition lies outside this excerpt
# node-wrapper/python-src/src/mcp_test_mcp/server.py:96-182 (registration)
# Imports the tools.connection module (line 98), which triggers the @mcp.tool
# decorator on connect_to_server to register it with the FastMCP server
# instance. Also logs the list of registered tools including connect_to_server.

# Tool modules register themselves through their decorators at import time.
# This import MUST run after the mcp instance is imported and before the
# server starts.
from .tools import connection, tools, resources, prompts, llm

# Import-time logging can interfere with the stdio transport, so stay at DEBUG
# level to avoid corrupting the JSON-RPC protocol on stdout.
logger.debug("FastMCP server instance created", extra={"server_name": "mcp-test-mcp"})


@mcp.tool()
async def health_check(ctx: Context) -> Dict[str, Any]:
    """
    Report that the server is up and running.

    Returns:
        Dictionary holding the status plus static server information
    """
    await ctx.info("Health check requested")

    report = {
        "status": "healthy",
        "server": "mcp-test-mcp",
        "version": "0.1.6",
        "transport": "stdio",
    }
    return report


@mcp.tool()
async def ping(ctx: Context) -> str:
    """
    Answer a ping with 'pong'.

    Handy for checking basic connectivity and server responsiveness.

    Returns:
        The string 'pong'
    """
    await ctx.debug("Ping received")
    reply = "pong"
    return reply


@mcp.tool()
def echo(message: str) -> str:
    """
    Return the given message unchanged.

    Args:
        message: Text to send back to the caller

    Returns:
        Exactly the message that was passed in
    """
    logger.debug(f"Echo tool called with message: {message}")
    return message


@mcp.tool()
def add(a: int, b: int) -> int:
    """
    Compute the sum of two numbers.

    Args:
        a: First operand
        b: Second operand

    Returns:
        The result of a + b
    """
    logger.debug(f"Add tool called: {a} + {b}")
    total = a + b
    return total


# The decorators in the imported modules perform tool registration on their
# own, so no manual mcp.tool() calls are needed here.

# Import-time logging can interfere with the stdio transport, so stay at DEBUG
# level to avoid corrupting the JSON-RPC protocol on stdout.
logger.debug(
    "MCP tools registered",
    extra={
        "tools": [
            "health_check",
            "ping",
            "echo",
            "add",
            "connect_to_server",
            "disconnect",
            "get_connection_status",
            "list_tools",
            "call_tool",
            "list_resources",
            "read_resource",
            "list_prompts",
            "get_prompt",
            "execute_prompt_with_llm",
        ]
    },
)
- Creates the shared FastMCP server instance named 'mcp-test-mcp', which the @mcp.tool decorators in the tool modules use to register tools such as connect_to_server.

    # Create the shared FastMCP server instance
    mcp = FastMCP(name="mcp-test-mcp")