Skip to main content
Glama

Blockscout MCP Server

Official
web3_pool.py (6.79 kB)
"""Async Web3 connection pool optimized for Blockscout RPC. The custom provider in this module normalizes JSON-RPC parameters and enforces non-zero request IDs. Blockscout rejects requests with ``id=0`` and parameters that are not JSON arrays. By reusing a shared ``aiohttp.ClientSession`` across calls we avoid repeated TCP handshakes and control concurrency via environment variables: * ``BLOCKSCOUT_RPC_REQUEST_TIMEOUT`` – seconds before an RPC call times out * ``BLOCKSCOUT_RPC_POOL_PER_HOST`` – maximum open HTTP connections Increase this limit for high-throughput deployments or relax it to conserve resources on constrained hosts. Extend the timeout if the remote Blockscout instance is slow or under heavy load. The ``BLOCKSCOUT_MCP_USER_AGENT`` variable customizes the leading part of the ``User-Agent`` header; the server version is appended automatically. """ from __future__ import annotations from itertools import count from typing import Any import aiohttp from web3 import AsyncWeb3 from web3.providers.rpc import AsyncHTTPProvider from blockscout_mcp_server.config import config from blockscout_mcp_server.constants import SERVER_VERSION from blockscout_mcp_server.tools.common import get_blockscout_base_url DEFAULT_HEADERS = { "User-Agent": f"{config.mcp_user_agent}/{SERVER_VERSION} (+pool)", } class AsyncHTTPProviderBlockscout(AsyncHTTPProvider): """Custom provider with Blockscout-specific adaptations. ``web3.py``'s stock provider doesn't cooperate well with Blockscout's JSON-RPC implementation. Blockscout rejects requests with ``id=0`` and expects ``params`` to be JSON arrays. The standard provider also manages its own ``aiohttp`` sessions, which can reset request IDs when reused. This subclass keeps its own ``request_counter`` starting at ``1`` and overrides :meth:`make_request` to normalize parameters and inject the sequential ID. 
The :meth:`set_pooled_session` method allows an externally managed ``aiohttp.ClientSession`` to be reused for all requests, enabling connection pooling and fine-grained timeout control. """ def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) # Start IDs at 1 because Blockscout rejects JSON-RPC requests with id=0 self.request_counter = count(1) # Will be populated by Web3Pool to enable connection reuse self.pooled_session: aiohttp.ClientSession | None = None def set_pooled_session(self, session: aiohttp.ClientSession) -> None: self.pooled_session = session async def _make_http_request(self, session: aiohttp.ClientSession, rpc_dict: dict[str, Any]) -> dict[str, Any]: """Perform the HTTP request using the given session. A dedicated helper lets us share the implementation between pooled and fallback sessions while keeping tight control over timeouts. """ headers = dict(self._request_kwargs.get("headers", {})) headers.setdefault("Content-Type", "application/json") headers.setdefault("Accept", "application/json") timeout = aiohttp.ClientTimeout(total=self._request_kwargs.get("timeout", config.rpc_request_timeout)) async with session.post( self.endpoint_uri, json=rpc_dict, headers=headers, timeout=timeout, ) as response: response.raise_for_status() return await response.json() async def make_request(self, method: str, params: Any) -> dict[str, Any]: # type: ignore[override] # Blockscout strictly requires ``params`` to be JSON arrays, so normalize # iterables or single values into a list. if not isinstance(params, list): if hasattr(params, "__iter__") and not isinstance( # noqa: UP038 params, (str, bytes, dict) ): params = list(params) else: params = [params] if params is not None else [] rpc_dict = { "jsonrpc": "2.0", "method": method, "params": params, "id": next(self.request_counter), } # Prefer the shared session for connection pooling. Fallback to a new # session only if the pooled one is unavailable. 
if self.pooled_session and not self.pooled_session.closed: return await self._make_http_request(self.pooled_session, rpc_dict) async with aiohttp.ClientSession() as session: return await self._make_http_request(session, rpc_dict) class Web3Pool: """Manage pooled ``AsyncWeb3`` instances with shared sessions. Each unique combination of chain and headers gets its own ``AsyncWeb3`` instance backed by a shared ``aiohttp.ClientSession``. Reusing these connections avoids the overhead of establishing new TCP handshakes for every contract call. """ def __init__(self) -> None: self._pool: dict[tuple[str, tuple[tuple[str, str], ...]], AsyncWeb3] = {} self._sessions: dict[tuple[str, tuple[tuple[str, str], ...]], aiohttp.ClientSession] = {} async def get(self, chain_id: str, headers: dict[str, str] | None = None) -> AsyncWeb3: combined_headers = dict(DEFAULT_HEADERS) if headers: combined_headers.update(headers) hdr_items = tuple(sorted(combined_headers.items())) key = (chain_id, hdr_items) if key in self._pool: return self._pool[key] base_url = await get_blockscout_base_url(chain_id) endpoint = f"{base_url}/api/eth-rpc" provider = AsyncHTTPProviderBlockscout( endpoint_uri=endpoint, request_kwargs={ "headers": dict(hdr_items), "timeout": config.rpc_request_timeout, }, ) w3 = AsyncWeb3(provider) session = aiohttp.ClientSession( # The connector speaks to a single host so ``limit`` matches ``limit_per_host``. connector=aiohttp.TCPConnector( limit=config.rpc_pool_per_host, limit_per_host=config.rpc_pool_per_host, ) ) provider.set_pooled_session(session) self._pool[key] = w3 self._sessions[key] = session return w3 async def close(self) -> None: for w3 in list(self._pool.values()): try: await w3.provider.disconnect() except Exception: pass for sess in list(self._sessions.values()): if not sess.closed: try: await sess.close() except Exception: pass self._pool.clear() self._sessions.clear() WEB3_POOL = Web3Pool()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/blockscout/mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.