mcp_client.pyā¢18.7 kB
"""
MCP client implementation following official SDK patterns.
Supports configurable connection timeout and retry behavior through YAML
configuration. Connection parameters include:
- max_reconnect_attempts: Maximum number of reconnection attempts
- initial_reconnect_delay: Initial delay between reconnection attempts
- max_reconnect_delay: Maximum delay (with exponential backoff)
- connection_timeout: Timeout for server initialization
- ping_timeout: Timeout for connection health checks
"""
from __future__ import annotations
import asyncio
import logging
import os
import shutil
import sys
from contextlib import AsyncExitStack
from typing import Any
from mcp import ClientSession, McpError, StdioServerParameters, types
from mcp.client.stdio import stdio_client
class MCPClient:
"""
Official MCP Client implementation following SDK patterns.
Supports configurable connection timeout and retry behavior through YAML
configuration. Connection parameters include:
- max_reconnect_attempts: Maximum number of reconnection attempts
- initial_reconnect_delay: Initial delay between reconnection attempts
- max_reconnect_delay: Maximum delay (with exponential backoff)
- connection_timeout: Timeout for server initialization
- ping_timeout: Timeout for connection health checks
"""
def __init__(
self,
name: str,
config: dict[str, Any],
connection_config: dict[str, Any] | None = None,
) -> None:
"""
Initialize MCP client with configurable connection parameters.
Args:
name: Client name for logging and identification
config: Server-specific configuration (command, args, etc.)
connection_config: Connection and retry configuration parameters
"""
self.name: str = name
self.config: dict[str, Any] = config
self.session: ClientSession | None = None
self.exit_stack: AsyncExitStack = AsyncExitStack()
self._cleanup_lock: asyncio.Lock = asyncio.Lock()
self._reconnect_attempts: int = 0
self._is_connected: bool = False
self.client_version = "0.1.0"
# Configure connection parameters from config or use defaults
conn_config = connection_config or {}
self._max_reconnect_attempts: int = conn_config.get("max_reconnect_attempts", 5)
self._initial_reconnect_delay: float = conn_config.get("initial_reconnect_delay", 1.0)
self._max_reconnect_delay: float = conn_config.get("max_reconnect_delay", 30.0)
self._connection_timeout: float = conn_config.get("connection_timeout", 30.0)
self._ping_timeout: float = conn_config.get("ping_timeout", 10.0)
# Initialize reconnect delay to the configured initial value
self._reconnect_delay: float = self._initial_reconnect_delay
# Store logging configuration
self._tool_results_enabled: bool = conn_config.get("tool_results", False)
self._tool_results_truncate: int = conn_config.get("tool_results_truncate", 200)
# Log configuration for debugging
logging.info(
f"MCP client '{name}' configured with: "
f"max_attempts={self._max_reconnect_attempts}, "
f"initial_delay={self._initial_reconnect_delay}s, "
f"max_delay={self._max_reconnect_delay}s, "
f"connection_timeout={self._connection_timeout}s, "
f"ping_timeout={self._ping_timeout}s"
)
# Log MCP client initialization
logging.info(f"š MCP client '{name}' initialized with connection settings")
def _resolve_command(self) -> str | None:
"""
Resolve command configuration to absolute executable path with validation.
This internal method handles the complex task of finding executable commands
across different operating systems and environments. It supports both absolute
paths and commands that need to be resolved through the system PATH.
The method implements several resolution strategies:
1. Direct return for absolute paths (if they exist)
2. PATH resolution using shutil.which for relative commands
3. Windows-specific workaround for npx/node compatibility issues
This is critical for MCP server startup since many servers are distributed
as npm packages or require specific runtime environments.
Returns:
str | None: Absolute path to executable if found, None if resolution fails
Implementation Notes:
- Windows compatibility: Falls back to 'node' if 'npx' is not found
- Only returns paths to files that actually exist on the filesystem
- Does not validate that the file is executable (left to OS)
Example:
config = {"command": "python"} # -> "/usr/bin/python"
config = {"command": "/opt/custom/tool"} # -> "/opt/custom/tool"
config = {"command": "nonexistent"} # -> None
"""
command = self.config.get("command")
if not command:
return None
# Handle absolute paths first - validate they exist
if os.path.isabs(command):
return command if os.path.exists(command) else None
# Resolve through system PATH
resolved = shutil.which(command)
# Windows-specific npx compatibility workaround
if not resolved and command == "npx" and sys.platform == "win32":
node_path = shutil.which("node")
if node_path:
logging.warning("Using node instead of npx on Windows")
return node_path
return resolved
async def connect(self) -> None:
"""
Connect to MCP server using official transport patterns with configurable
retry logic.
Implements exponential backoff retry strategy using configuration parameters:
- Retries up to max_reconnect_attempts times
- Uses initial_reconnect_delay as starting delay, doubling each attempt
- Caps delay at max_reconnect_delay to prevent excessive wait times
- Resets delay and attempt count on successful connection
Raises:
Exception: If all connection attempts fail
"""
while self._reconnect_attempts < self._max_reconnect_attempts:
try:
logging.info(
f"š Attempting to connect to MCP server '{self.name}' "
f"(attempt {self._reconnect_attempts + 1}/{self._max_reconnect_attempts})"
)
await self._attempt_connection()
self._is_connected = True
self._reconnect_attempts = 0
# Reset delay for future reconnection attempts
self._reconnect_delay = self._initial_reconnect_delay
logging.info(f"š Successfully connected to MCP server '{self.name}'")
return
except Exception as e:
self._reconnect_attempts += 1
self._is_connected = False
if self._reconnect_attempts >= self._max_reconnect_attempts:
# Reset delay for future connection attempts
self._reconnect_delay = self._initial_reconnect_delay
logging.error(
f"š Failed to connect to {self.name} after {self._max_reconnect_attempts} attempts: {e}"
)
raise
logging.warning(
f"š Connection attempt {self._reconnect_attempts} failed for "
f"{self.name}: {e}. Retrying in {self._reconnect_delay}s..."
)
await asyncio.sleep(self._reconnect_delay)
# Apply exponential backoff with maximum delay limit
self._reconnect_delay = min(self._reconnect_delay * 2, self._max_reconnect_delay)
async def _attempt_connection(self) -> None:
"""
Attempt a single connection to the MCP server with comprehensive error handling.
This internal method performs the low-level work of establishing an MCP client
connection using the official SDK patterns. It handles command resolution,
server parameter setup, transport initialization, and session establishment.
The method implements the standard MCP connection flow:
1. Resolve command to executable path (may fail if command not found)
2. Configure server parameters with command, args, and environment
3. Initialize stdio transport for communication with the server process
4. Create and initialize the ClientSession with proper client info
5. Wait for server initialization with timeout protection
Failure at any step will raise an exception that is caught by the retry
logic in the connect() method.
Raises:
ValueError: If the configured command cannot be resolved to an executable
asyncio.TimeoutError: If server initialization takes longer than 30 seconds
Exception: Any other error during transport or session initialization
Side Effects:
- Creates server process through stdio transport
- Modifies self.session to store the active ClientSession
- Registers transport and session with exit_stack for cleanup
- Logs successful connection
"""
command = self._resolve_command()
if not command:
raise ValueError(f"Command '{self.config.get('command')}' not found in PATH")
# Configure server parameters with environment inheritance
server_params = StdioServerParameters(
command=command,
args=self.config.get("args", []),
env={**os.environ, **self.config.get("env", {})} if self.config.get("env") else None,
)
# Initialize stdio transport for server communication
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
read_stream, write_stream = stdio_transport
# Create client info for MCP handshake
client_info = types.Implementation(name=self.name, version=self.client_version)
# Initialize client session with transport streams
self.session = await self.exit_stack.enter_async_context(
ClientSession(read_stream, write_stream, client_info=client_info)
)
# Complete MCP initialization handshake with configurable timeout
await asyncio.wait_for(self.session.initialize(), timeout=self._connection_timeout)
logging.info(f"MCP client '{self.name}' connected successfully")
async def ping(self) -> bool:
"""Send ping to verify connection is alive."""
if not self.session or not self._is_connected:
return False
try:
# Use list_tools() as a lightweight ping with configurable timeout
await asyncio.wait_for(self.session.list_tools(), timeout=self._ping_timeout)
return True
except Exception as e:
logging.warning(f"Ping failed for {self.name}: {e}")
self._is_connected = False
return False
async def list_tools(self) -> list[types.Tool]:
"""List available tools using official SDK patterns."""
if not self.session:
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Client {self.name} not connected",
)
)
try:
result = await self.session.list_tools()
return result.tools
except McpError as e:
logging.error(f"MCP error listing tools from {self.name}: {e.error.message}")
raise
except Exception as e:
logging.error(f"Error listing tools from {self.name}: {e}")
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Failed to list tools: {e!s}",
)
) from e
async def list_prompts(self) -> list[types.Prompt]:
"""List available prompts using official SDK patterns."""
if not self.session:
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Client {self.name} not connected",
)
)
try:
result = await self.session.list_prompts()
return result.prompts
except McpError as e:
logging.error(f"MCP error listing prompts from {self.name}: {e.error.message}")
raise
except Exception as e:
logging.error(f"Error listing prompts from {self.name}: {e}")
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Failed to list prompts: {e!s}",
)
) from e
async def get_prompt(self, name: str, arguments: dict[str, Any] | None = None) -> types.GetPromptResult:
"""Get a prompt by name using official SDK patterns."""
if not self.session:
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Client {self.name} not connected",
)
)
try:
return await self.session.get_prompt(name, arguments)
except McpError as e:
logging.error(f"MCP error getting prompt '{name}' from {self.name}: {e.error.message}")
raise
except Exception as e:
logging.error(f"Error getting prompt '{name}' from {self.name}: {e}")
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Failed to get prompt '{name}': {e!s}",
)
) from e
async def list_resources(self) -> list[types.Resource]:
"""List available resources using official SDK patterns."""
if not self.session:
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Client {self.name} not connected",
)
)
try:
result = await self.session.list_resources()
return result.resources
except McpError as e:
logging.error(f"MCP error listing resources from {self.name}: {e.error.message}")
raise
except Exception as e:
logging.error(f"Error listing resources from {self.name}: {e}")
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Failed to list resources: {e!s}",
)
) from e
async def read_resource(self, uri: str) -> types.ReadResourceResult:
"""Read a resource by URI using official SDK patterns."""
if not self.session:
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Client {self.name} not connected",
)
)
try:
return await self.session.read_resource(uri)
except McpError as e:
logging.error(f"MCP error reading resource '{uri}' from {self.name}: {e.error.message}")
raise
except Exception as e:
logging.error(f"Error reading resource '{uri}' from {self.name}: {e}")
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Failed to read resource '{uri}': {e!s}",
)
) from e
async def call_tool(self, name: str, arguments: dict[str, Any] | None = None) -> types.CallToolResult:
"""Call a tool using official SDK patterns."""
if not self.session:
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Client {self.name} not connected",
)
)
try:
logging.info(f"Calling tool '{name}' on client '{self.name}'")
result = await self.session.call_tool(name, arguments)
logging.info(f"Tool '{name}' executed successfully")
# Log tool results if enabled
if self._tool_results_enabled:
results_str = str(result)
if len(results_str) > self._tool_results_truncate:
results_str = results_str[: self._tool_results_truncate] + "..."
logging.info(f"ā MCP[{name}]: results ({self.name}): {results_str}")
return result
except McpError as e:
logging.error(f"MCP error calling tool '{name}': {e.error.message}")
raise
except Exception as e:
logging.error(f"Error calling tool '{name}': {e}")
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Tool call failed: {e!s}",
)
) from e
async def get_tool_schemas(self) -> list[str]:
"""Get tool schemas as JSON strings using SDK's native methods."""
if not self.session:
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Client {self.name} not connected",
)
)
try:
tools = await self.list_tools()
return [tool.model_dump_json() for tool in tools]
except Exception as e:
logging.error(f"Error getting tool schemas from {self.name}: {e}")
raise
async def close(self) -> None:
"""Close the client connection and clean up resources."""
async with self._cleanup_lock:
try:
self._is_connected = False
await self.exit_stack.aclose()
self.session = None
logging.info(f"MCP client '{self.name}' disconnected")
except Exception as e:
logging.error(f"Error during cleanup of client {self.name}: {e}")
@property
def is_connected(self) -> bool:
"""Check if client is currently connected."""
return self._is_connected