MCP JinaAI Reader Server

by spences10
Verified
  • mcp_deepseek_client
import subprocess import os from typing import Dict, Any import signal import atexit import logging import shlex import shutil import time import asyncio import threading from queue import Queue, Empty import json logger = logging.getLogger(__name__) def log_stream(stream, prefix: str, queue: Queue): """Log output from a stream.""" for line in iter(stream.readline, ''): line = line.strip() if line: logger.info(f"{prefix}: {line}") queue.put(line) class ServerManager: def __init__(self): self.processes = {} self.output_queues = {} self.stdio_servers = set() def _find_executable(self, command: str) -> str: """Find the correct executable path.""" if command == "uvx": return shutil.which("uvicorn") or "uvicorn" elif command == "npx": return "/Users/freebeiro/.asdf/shims/npx" return shutil.which(command) or command def _is_stdio_server(self, name: str, config: Dict[str, Any]) -> bool: """Check if server uses stdio protocol.""" return any([ "@modelcontextprotocol/server-filesystem" in str(config), "server-filesystem" in str(config) ]) async def _communicate_stdio(self, name: str, message: Dict[str, Any]) -> Dict[str, Any]: """Communicate with stdio-based server using JSON-RPC 2.0.""" process = self.processes[name] try: # Map tool alias to filesystem server expected tool name tool_name = message.get("name", "list") if tool_name == "list": tool_name = "list_directory" # Build the JSON-RPC request using tools/call method request = { "jsonrpc": "2.0", "method": "tools/call", # Use tools/call method (per mcp-go and Neon blog) "params": { "name": tool_name, # Use mapped tool name here "arguments": { "path": message["content"] # For list_directory, pass the directory path using "path" } }, "id": 1 } message_str = json.dumps(request) + "\n" logger.debug(f"Sending to {name}: {message_str.strip()}") # Clear any pending output while not self.output_queues[name].empty(): self.output_queues[name].get_nowait() # Send request to the server via its stdin process.stdin.write(message_str) process.stdin.flush() # Wait for response with timeout start_time = time.time() response_str = None while (time.time() - start_time) < 10: # 10-second timeout try: line = self.output_queues[name].get(timeout=1) logger.debug(f"Got line: {line}") try: if line.startswith("{"): response = json.loads(line) if "result" in response or "error" in response: response_str = line break except json.JSONDecodeError: continue except Empty: await asyncio.sleep(0.1) continue if not response_str: raise TimeoutError("No response from server") logger.debug(f"Received from {name}: {response_str.strip()}") response = json.loads(response_str) if "error" in response: logger.error(f"Server error: {response['error']}") raise Exception(f"Server error: {response['error']}") # Format the result as a message: if content is array, join text parts result = response.get("result", {}) if isinstance(result, dict) and "content" in result: content = result["content"] if isinstance(content, list): text_parts = [item["text"] for item in content if item.get("type") == "text"] return { "content": "\n".join(text_parts), "role": "assistant" } return { "content": str(result), "role": "assistant" } except Exception as e: logger.error(f"Error communicating with {name}: {e}") raise async def _wait_for_server(self, name: str, port: str = "8080", max_retries: int = 30): """Wait for server to become available.""" if name in self.stdio_servers: process = self.processes[name] if process.poll() is None: logger.info(f"Stdio server {name} is ready") return True return False import httpx url = f"http://localhost:{port}/healthz" for i in range(max_retries): try: async with httpx.AsyncClient() as client: response = await client.get(url, timeout=1.0) if response.status_code == 200: logger.info(f"Server {name} is ready") return True except: await asyncio.sleep(1) logger.info(f"Waiting for {name} to start (attempt {i+1}/{max_retries})") try: while True: line = self.output_queues[name].get_nowait() if "Error:" in line or "error:" in line.lower(): logger.error(f"Server {name} error: {line}") return False except Empty: pass return False def start_server(self, name: str, config: Dict[str, Any]): """Start a server using its configuration.""" try: env = os.environ.copy() env.update(config.get("env", {})) port = env.get("MCP_PORT", "8080") command = self._find_executable(config["command"]) args = config.get("args", []) if self._is_stdio_server(name, config): self.stdio_servers.add(name) command_list = [command] + args command_str = ' '.join(shlex.quote(str(x)) for x in command_list) logger.info(f"Starting {name} with command: {command_str}") self.output_queues[name] = Queue() process = subprocess.Popen( command_list, env=env, stdin=subprocess.PIPE if name in self.stdio_servers else None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=True, text=True, bufsize=1, universal_newlines=True ) threading.Thread(target=log_stream, args=(process.stdout, f"{name} stdout", self.output_queues[name]), daemon=True).start() threading.Thread(target=log_stream, args=(process.stderr, f"{name} stderr", self.output_queues[name]), daemon=True).start() time.sleep(1) returncode = process.poll() if returncode is not None: logger.error(f"Server {name} failed to start. Exit code: {returncode}") return False self.processes[name] = process if not asyncio.run(self._wait_for_server(name, port)): logger.error(f"Server {name} failed to start") self.stop_server(name) return False return True except Exception as e: logger.error(f"Failed to start {name}: {e}", exc_info=True) return False def stop_server(self, name: str): """Stop a running server.""" if name in self.processes: try: process = self.processes[name] os.killpg(os.getpgid(process.pid), signal.SIGTERM) process.wait(timeout=5) del self.processes[name] if name in self.stdio_servers: self.stdio_servers.remove(name) except: try: os.killpg(os.getpgid(process.pid), signal.SIGKILL) except: pass def stop_all(self): """Stop all running servers.""" for name in list(self.processes.keys()): self.stop_server(name) def __del__(self): self.stop_all()