"""MCP HTTP with SSE transport tools."""

import asyncio
import logging
from contextlib import AsyncExitStack
from typing import Any

import mcp.types as types
from mcp import ClientSession
from mcp.client.sse import sse_client


class McpSseClient:
    """Manage one MCP server connection over SSE and run tools on it.

    The connection is opened by :meth:`initialize` and torn down by
    :meth:`cleanup`; both the SSE transport and the ``ClientSession`` are
    registered on ``exit_stack`` so a single ``aclose()`` releases them.

    Configuration keys read from ``config``:
        url: Server endpoint (required).
        headers: Optional headers passed to ``sse_client`` (default ``None``).
        timeout: Passed to ``sse_client`` (default ``5``).
        sse_read_timeout: Passed to ``sse_client`` (default ``300``).
    """

    def __init__(self, name: str, config: dict[str, Any]) -> None:
        """Store the server name and configuration; no I/O happens here.

        Args:
            name: Label used in error and log messages.
            config: Connection settings (see the class docstring for keys).
        """
        self.name: str = name
        self.config: dict[str, Any] = config
        # Set by initialize(); None means "not connected".
        self.session: ClientSession | None = None
        # Serializes concurrent cleanup() calls.
        self._cleanup_lock: asyncio.Lock = asyncio.Lock()
        # Owns the transport and session context managers.
        self.exit_stack: AsyncExitStack = AsyncExitStack()

    async def initialize(self) -> None:
        """Initialize the server connection.

        Opens the SSE transport, wraps it in a ``ClientSession``, and runs
        the session's ``initialize()``. ``self.session`` is only assigned
        once all of that has succeeded.

        Raises:
            Exception: Any error raised while connecting (including a
                missing ``"url"`` key). The error is logged, ``cleanup()``
                is run, and the original exception is re-raised.
        """
        try:
            read, write = await self.exit_stack.enter_async_context(
                sse_client(
                    url=self.config["url"],
                    headers=self.config.get("headers", None),
                    timeout=self.config.get("timeout", 5),
                    sse_read_timeout=self.config.get("sse_read_timeout", 300),
                )
            )
            session = await self.exit_stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()
            self.session = session
        except Exception as e:
            logging.error("Error initializing session: %s", e)
            # Release whatever was already entered on the exit stack.
            await self.cleanup()
            raise

    async def list_tools(self) -> list[types.Tool]:
        """List available tools from the server.

        Returns:
            A list of available tools.

        Raises:
            RuntimeError: If the server is not initialized.
        """
        if not self.session:
            raise RuntimeError(f"Server '{self.name}' session not initialized")

        result = await self.session.list_tools()
        return result.tools

    async def execute_tool(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        retries: int = 2,
        delay: float = 1.0,
    ) -> types.CallToolResult:
        """Execute a tool with retry mechanism.

        Args:
            tool_name: Name of the tool to execute.
            arguments: Tool arguments.
            retries: Maximum number of attempts in total (the first call
                counts as attempt 1). Values below 1 are treated as 1 so
                the tool is always called at least once.
            delay: Delay between attempts in seconds.

        Returns:
            Tool execution result.

        Raises:
            RuntimeError: If server is not initialized.
            Exception: If tool execution fails on the final attempt; the
                exception from that attempt is re-raised.
        """
        if not self.session:
            raise RuntimeError(f"Server '{self.name}' session not initialized")

        # Guard against retries <= 0, which previously skipped the call
        # entirely and returned None.
        max_attempts = max(1, retries)
        attempt = 0
        while True:
            attempt += 1
            try:
                logging.info("Executing %s...", tool_name)
                return await self.session.call_tool(tool_name, arguments)
            except Exception as e:
                logging.warning(
                    "Error executing tool: %s. Attempt %s of %s.",
                    e,
                    attempt,
                    max_attempts,
                )
                if attempt >= max_attempts:
                    logging.error("Max retries reached. Failing.")
                    raise
            logging.info("Retrying in %s seconds...", delay)
            await asyncio.sleep(delay)

    async def cleanup(self) -> None:
        """Clean up server resources.

        Closes everything registered on ``exit_stack`` and resets
        ``self.session`` to ``None``. Errors raised while closing are
        logged and suppressed so cleanup is best-effort.
        """
        async with self._cleanup_lock:
            try:
                await self.exit_stack.aclose()
            except Exception as e:
                logging.error(
                    "Error during cleanup of server '%s' session: %s",
                    self.name,
                    e,
                )
            finally:
                # Reset even when aclose() fails, so later calls get the
                # "not initialized" error instead of using a session whose
                # teardown was already attempted.
                self.session = None