Skip to main content
Glama
mcp_client.py5.9 kB
""" MCP Client for OCR-MCP WebApp Communicates with the OCR-MCP server via stdio protocol """ import asyncio import json import sys import subprocess import threading from typing import Dict, Any, Optional import logging from pathlib import Path logger = logging.getLogger(__name__) class MCPClient: """Client for communicating with OCR-MCP server""" def __init__(self): self.process: Optional[subprocess.Popen] = None self.connected = False self._response_futures: Dict[str, asyncio.Future] = {} self._next_id = 1 async def initialize(self): """Initialize connection to MCP server""" try: # Start the MCP server process server_path = Path(__file__).parent.parent / "src" / "ocr_mcp" / "__main__.py" python_path = sys.executable env = { **os.environ, "PYTHONPATH": str(Path(__file__).parent.parent / "src"), "PYTHONUNBUFFERED": "1" } self.process = subprocess.Popen( [python_path, str(server_path)], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env ) # Start reading responses in background threading.Thread(target=self._read_responses, daemon=True).start() # Wait for server to be ready await asyncio.sleep(2) # Test connection await self._initialize_connection() self.connected = True logger.info("MCP client connected successfully") except Exception as e: logger.error(f"Failed to initialize MCP client: {e}") raise async def _initialize_connection(self): """Initialize MCP protocol connection""" # Send initialize request result = await self._send_request("initialize", { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": { "name": "ocr-mcp-webapp", "version": "0.1.0" } }) # Send initialized notification await self._send_notification("notifications/initialized", {}) async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Call an MCP tool""" if not self.connected: raise Exception("MCP client not connected") return await self._send_request("tools/call", { "name": tool_name, "arguments": arguments }) async def _send_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]: """Send a JSON-RPC request""" if not self.process or not self.process.stdin: raise Exception("MCP process not available") request_id = self._next_id self._next_id += 1 request = { "jsonrpc": "2.0", "id": request_id, "method": method, "params": params } # Create future for response future = asyncio.Future() self._response_futures[str(request_id)] = future # Send request request_json = json.dumps(request) + "\n" self.process.stdin.write(request_json) self.process.stdin.flush() # Wait for response try: result = await asyncio.wait_for(future, timeout=300.0) # 5 minute timeout return result except asyncio.TimeoutError: raise Exception(f"Request timeout for method {method}") async def _send_notification(self, method: str, params: Dict[str, Any]): """Send a JSON-RPC notification""" if not self.process or not self.process.stdin: return notification = { "jsonrpc": "2.0", "method": method, "params": params } notification_json = json.dumps(notification) + "\n" self.process.stdin.write(notification_json) self.process.stdin.flush() def _read_responses(self): """Read responses from MCP server in background thread""" if not self.process or not self.process.stdout: return try: for line in iter(self.process.stdout.readline, ''): if line.strip(): try: response = json.loads(line.strip()) # Handle response if "id" in response: request_id = str(response["id"]) if request_id in self._response_futures: future = self._response_futures.pop(request_id) if "result" in response: future.set_result(response["result"]) elif "error" in response: future.set_exception(Exception(response["error"].get("message", "Unknown error"))) else: future.set_exception(Exception("Invalid response format")) except json.JSONDecodeError as e: logger.error(f"Failed to parse MCP response: {e}") except Exception as e: logger.error(f"Error processing MCP response: {e}") except Exception as e: logger.error(f"Error reading MCP responses: {e}") async def cleanup(self): """Cleanup resources""" if self.process: try: self.process.terminate() await asyncio.sleep(0.1) if self.process.poll() is None: self.process.kill() except: pass self.connected = False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sandraschi/ocr-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server