Skip to main content
Glama

mcp-server-llmling

MIT License
5
  • Linux
  • Apple
http_client.py7.01 kB
"""HTTP-based MCP protocol client implementation.""" from __future__ import annotations import asyncio from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Self from mcp.client.session import ClientSession from mcp.client.streamable_http import streamablehttp_client from pydantic import AnyUrl from mcp_server_llmling.log import get_logger if TYPE_CHECKING: from collections.abc import Sequence from mcp.types import Prompt, Resource, Tool logger = get_logger(__name__) @dataclass class HTTPClientConfig: """Configuration for HTTP-based MCP client.""" server_url: str protocol_version: str = "0.1" client_name: str = "mcp-client" client_version: str = "1.0" timeout: float = 30.0 headers: dict[str, str] | None = None class MCPClientError(Exception): """Base exception for MCP client errors.""" class McpConnectionError(MCPClientError): """Raised when connection to server fails.""" class ToolError(MCPClientError): """Raised when tool execution fails.""" class HTTPMCPClient: """High-level MCP protocol client for connecting to external servers via HTTP.""" def __init__(self, config: HTTPClientConfig) -> None: """Initialize client with configuration. Args: config: Client configuration including server URL """ self.config = config self._session: ClientSession | None = None self._context_manager: Any = None async def start(self) -> None: """Connect to server and perform handshake.""" try: # Connect via streamablehttp_client and get streams self._context_manager = streamablehttp_client( url=self.config.server_url, headers=self.config.headers, timeout=self.config.timeout, ) streams_and_callback = await self._context_manager.__aenter__() read_stream, write_stream, _get_session_id = streams_and_callback # Create session self._session = ClientSession(read_stream, write_stream) await self._session.__aenter__() # Initialize session result = await self._session.initialize() msg = "Connected to MCP server at %s (protocol version %s)" logger.info(msg, self.config.server_url, result.protocolVersion) except Exception as exc: # Clean up on failure if self._context_manager: try: await self._context_manager.__aexit__( type(exc), exc, exc.__traceback__ ) except Exception: logger.exception("Error during cleanup after connection failure") self._context_manager = None msg = "Failed to connect to server" raise McpConnectionError(msg) from exc async def close(self) -> None: """Close connection to server.""" if self._session: try: await self._session.__aexit__(None, None, None) except Exception: logger.exception("Error closing session") finally: self._session = None if self._context_manager: try: await self._context_manager.__aexit__(None, None, None) except Exception: logger.exception("Error closing context manager") finally: self._context_manager = None @property def session(self) -> ClientSession: """Get active session.""" if not self._session: msg = "Not connected to server" raise RuntimeError(msg) return self._session async def list_tools(self) -> Sequence[Tool]: """List available tools.""" try: result = await self.session.list_tools() except Exception as exc: msg = "Failed to list tools" raise ToolError(msg) from exc else: return result.tools async def call_tool( self, name: str, arguments: dict[str, Any] | None = None, *, timeout: float | None = None, ) -> Any: """Call a tool with given arguments.""" timeout = timeout or self.config.timeout try: async with asyncio.timeout(timeout): result = await self.session.call_tool(name, arguments) return result.content except TimeoutError as exc: msg = f"Tool execution timed out after {timeout}s" raise ToolError(msg) from exc except Exception as exc: msg = f"Tool execution failed: {exc}" raise ToolError(msg) from exc async def list_resources(self) -> Sequence[Resource]: """List available resources.""" try: result = await self.session.list_resources() except Exception as exc: msg = "Failed to list resources" raise MCPClientError(msg) from exc else: return result.resources async def list_prompts(self) -> Sequence[Prompt]: """List available prompts.""" try: result = await self.session.list_prompts() except Exception as exc: msg = "Failed to list prompts" raise MCPClientError(msg) from exc else: return result.prompts async def subscribe_resource(self, uri: str | AnyUrl) -> None: """Subscribe to resource updates.""" try: if isinstance(uri, str): uri = AnyUrl(uri) await self.session.subscribe_resource(uri) except Exception as exc: msg = "Failed to subscribe to resource" raise MCPClientError(msg) from exc async def unsubscribe_resource(self, uri: str | AnyUrl) -> None: """Unsubscribe from resource updates.""" try: if isinstance(uri, str): uri = AnyUrl(uri) await self.session.unsubscribe_resource(uri) except Exception as exc: msg = "Failed to unsubscribe from resource" raise MCPClientError(msg) from exc async def __aenter__(self) -> Self: """Async context manager entry.""" await self.start() return self async def __aexit__(self, *exc: object) -> None: """Async context manager exit.""" await self.close() if __name__ == "__main__": async def main() -> None: """Example usage of HTTPMCPClient.""" config = HTTPClientConfig(server_url="http://localhost:3001") async with HTTPMCPClient(config) as client: # List available tools tools = await client.list_tools() print("Available tools:", tools) # Call a tool result = await client.call_tool("example_tool", {"arg": "value"}) print("Tool result:", result) asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/phil65/mcp-server-llmling'

If you have feedback or need assistance with the MCP directory API, please join our Discord server