mcp_client.py•3.35 kB
"""MCP client for testing."""
import json
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
SSE_EVENT_PREFIX = "event:"
SSE_DATA_PREFIX = "data: "
DATA_PREFIX_LENGTH = 6
HTTP_TIMEOUT_SECONDS = 30
class MCPClientError(Exception):
"""Base exception for MCP client errors."""
class MCPClient:
"""Client for making MCP JSON-RPC requests."""
def __init__(self, url: str):
self.url = url
def request(
self,
method: str,
params: dict[str, Any] | None = None,
request_id: int = 1
) -> dict[str, Any]:
"""Send an MCP JSON-RPC request.
Args:
method: JSON-RPC method name
params: Optional method parameters
request_id: Request identifier
Returns:
JSON-RPC response as dictionary
Raises:
MCPClientError: On HTTP or parsing errors
"""
payload = self._build_payload(method, params, request_id)
http_request = self._create_http_request(payload)
try:
response_data = self._send_request(http_request)
return self._parse_response(response_data)
except HTTPError as e:
error_body = e.read().decode("utf-8")
raise MCPClientError(f"HTTP Error {e.code}: {error_body}")
except URLError as e:
raise MCPClientError(f"URL Error: {e.reason}")
def _build_payload(
self,
method: str,
params: dict[str, Any] | None,
request_id: int
) -> bytes:
"""Build JSON-RPC request payload."""
payload = {
"jsonrpc": "2.0",
"method": method,
"id": request_id,
}
if params:
payload["params"] = params
return json.dumps(payload).encode("utf-8")
def _create_http_request(self, payload: bytes) -> Request:
"""Create HTTP request with proper headers."""
headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
}
return Request(
self.url,
data=payload,
headers=headers,
method="POST"
)
def _send_request(self, http_request: Request) -> str:
"""Send HTTP request and return response body."""
with urlopen(http_request, timeout=HTTP_TIMEOUT_SECONDS) as response:
return response.read().decode("utf-8")
def _parse_response(self, response_data: str) -> dict[str, Any]:
"""Parse response, handling both SSE and JSON formats."""
if self._is_sse_response(response_data):
return self._parse_sse_response(response_data)
return json.loads(response_data)
def _is_sse_response(self, response_data: str) -> bool:
"""Check if response is in SSE format."""
return response_data.startswith(SSE_EVENT_PREFIX)
def _parse_sse_response(self, response_data: str) -> dict[str, Any]:
"""Extract JSON data from SSE response."""
for line in response_data.split('\n'):
if line.startswith(SSE_DATA_PREFIX):
json_data = line[DATA_PREFIX_LENGTH:]
return json.loads(json_data)
raise MCPClientError("No data line found in SSE response")