# MCP HTTP with SSE transport Tools
# by junjiem
import asyncio
import logging
from contextlib import AsyncExitStack
from typing import Any
import mcp.types as types
from mcp import ClientSession
from mcp.client.sse import sse_client
class McpSseClient:
    """Manage one MCP server connection over HTTP/SSE and run its tools.

    Attributes:
        name: Label for this server, used in error and log messages.
        config: Connection settings. ``initialize`` reads ``"url"``
            (required) and the optional ``"headers"``, ``"timeout"`` and
            ``"sse_read_timeout"`` keys.
        session: The active ``ClientSession``; ``None`` until ``initialize``
            succeeds and again after ``cleanup``.
        exit_stack: Owns the SSE transport and session contexts so that they
            are closed together by ``cleanup``.
    """

    def __init__(self, name: str, config: dict[str, Any]) -> None:
        """Store the settings; no connection is opened until ``initialize``.

        Args:
            name: Label for this server.
            config: Connection settings (see the class docstring).
        """
        self.name: str = name
        self.config: dict[str, Any] = config
        self.session: ClientSession | None = None
        # Serializes cleanup() so the exit stack is not closed concurrently.
        self._cleanup_lock: asyncio.Lock = asyncio.Lock()
        self.exit_stack: AsyncExitStack = AsyncExitStack()

    async def initialize(self) -> None:
        """Open the SSE transport and start a client session.

        On success the session is stored in ``self.session``.

        Raises:
            KeyError: If ``config`` has no ``"url"`` entry.
            Exception: Any error raised while connecting or initializing the
                session; it is logged, ``cleanup`` is run, and the error is
                re-raised.
        """
        try:
            # Missing optional keys fall back to the defaults given here;
            # the values are passed straight through to sse_client.
            sse_transport = await self.exit_stack.enter_async_context(
                sse_client(
                    url=self.config["url"],
                    headers=self.config.get("headers", None),
                    timeout=self.config.get("timeout", 5),
                    sse_read_timeout=self.config.get("sse_read_timeout", 300),
                )
            )
            read, write = sse_transport
            session = await self.exit_stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()
            # Publish the session only once it is fully initialized.
            self.session = session
        except Exception as e:
            logging.error(f"Error initializing session: {e}")
            # Release whatever was entered on the stack before the failure.
            await self.cleanup()
            raise

    async def list_tools(self) -> list[types.Tool]:
        """List available tools from the server.

        Returns:
            A list of available tools.

        Raises:
            RuntimeError: If the server is not initialized.
        """
        if not self.session:
            raise RuntimeError(f"Server '{self.name}' session not initialized")
        result = await self.session.list_tools()
        return result.tools

    async def execute_tool(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        retries: int = 2,
        delay: float = 1.0,
    ) -> types.CallToolResult:
        """Execute a tool with retry mechanism.

        Args:
            tool_name: Name of the tool to execute.
            arguments: Tool arguments.
            retries: Maximum number of attempts in total. Values below 1
                are treated as 1 so the tool is always called at least once.
            delay: Delay between attempts in seconds.

        Returns:
            Tool execution result.

        Raises:
            RuntimeError: If server is not initialized.
            Exception: If tool execution fails on the final attempt; the
                error from that attempt is re-raised.
        """
        if not self.session:
            raise RuntimeError(f"Server '{self.name}' session not initialized")

        # Guard against retries <= 0, which would otherwise skip the call
        # entirely and fall through returning None.
        max_attempts = max(1, retries)
        attempt = 0
        while True:
            attempt += 1
            try:
                logging.info(f"Executing {tool_name}...")
                return await self.session.call_tool(tool_name, arguments)
            except Exception as e:
                logging.warning(
                    f"Error executing tool: {e}. "
                    f"Attempt {attempt} of {max_attempts}."
                )
                if attempt >= max_attempts:
                    logging.error("Max retries reached. Failing.")
                    raise
                logging.info(f"Retrying in {delay} seconds...")
                await asyncio.sleep(delay)

    async def cleanup(self) -> None:
        """Clean up server resources.

        Closes everything registered on ``exit_stack`` and clears
        ``session``. Errors raised while closing are logged, not propagated.
        """
        async with self._cleanup_lock:
            try:
                await self.exit_stack.aclose()
            except Exception as e:
                logging.error(
                    f"Error during cleanup of server '{self.name}' session: {e}"
                )
            finally:
                # Drop the session even when closing failed, so later calls
                # report "not initialized" instead of using a dead session.
                self.session = None