MCP JinaAI Reader Server
by spences10
Verified
- mcp_deepseek_client
import subprocess
import os
from typing import Dict, Any
import signal
import atexit
import logging
import shlex
import shutil
import time
import asyncio
import threading
from queue import Queue, Empty
import json
logger = logging.getLogger(__name__)


def log_stream(stream, prefix: str, queue: Queue):
    """Forward every non-blank line of ``stream`` to the log and to ``queue``.

    Intended to run in a background thread that owns ``stream`` (for example
    one end of a subprocess pipe). The function returns when the stream
    reaches end-of-file or is closed underneath it, and closes the stream
    before returning.

    Args:
        stream: Readable line-oriented file object.
        prefix: Label put in front of each logged line.
        queue: Queue that receives each stripped, non-blank line.
    """
    try:
        # Iterating the file object stops at EOF for both text and binary
        # streams, unlike a readline loop with a '' sentinel.
        for raw_line in stream:
            line = raw_line.strip()
            if line:
                logger.info("%s: %s", prefix, line)
                queue.put(line)
    except (ValueError, OSError) as exc:
        # The pipe was closed while we were blocked on it (e.g. the process
        # was stopped); this is a normal way for the reader to end.
        logger.debug("%s: stream closed while reading: %s", prefix, exc)
    finally:
        try:
            stream.close()
        except (ValueError, OSError):
            # Nothing useful can be done if the close itself fails.
            pass
class ServerManager:
    """Start, talk to, and stop MCP server subprocesses.

    All bookkeeping is keyed by the server name given to ``start_server``:

    * ``processes``     -- name -> ``subprocess.Popen`` handle
    * ``output_queues`` -- name -> ``Queue`` of stripped stdout/stderr lines
    * ``stdio_servers`` -- names of servers that are spoken to over stdin/stdout
    """

    # Location that used to be hard-coded for npx. It is still preferred when
    # it exists so existing setups behave as before; elsewhere PATH is used.
    _LEGACY_NPX_PATH = "/Users/freebeiro/.asdf/shims/npx"
    # Seconds to wait for a JSON-RPC reply from a stdio server.
    _STDIO_RESPONSE_TIMEOUT = 10.0
    # Seconds between polls of the output queue while waiting for a reply.
    _STDIO_POLL_INTERVAL = 0.1

    def __init__(self):
        self.processes = {}
        self.output_queues = {}
        self.stdio_servers = set()

    def _find_executable(self, command: str) -> str:
        """Return the executable to launch for ``command``.

        Falls back to the bare command name when nothing is found on PATH so
        that the eventual ``Popen`` call reports the failure.
        """
        if command == "uvx":
            # NOTE(review): "uvx" is deliberately left mapped to uvicorn, as in
            # the original code -- confirm this is intended and not uv's `uvx`.
            return shutil.which("uvicorn") or "uvicorn"
        if command == "npx":
            legacy = self._LEGACY_NPX_PATH
            if os.path.isfile(legacy) and os.access(legacy, os.X_OK):
                return legacy
            return shutil.which("npx") or "npx"
        return shutil.which(command) or command

    def _is_stdio_server(self, name: str, config: Dict[str, Any]) -> bool:
        """Return True when ``config`` mentions the filesystem server.

        The check is a substring match on the stringified config, which also
        covers the scoped ``@modelcontextprotocol/server-filesystem`` name.
        """
        return "server-filesystem" in str(config)

    @staticmethod
    def _drain_queue(output_queue: Queue) -> None:
        """Discard everything currently waiting in ``output_queue``."""
        try:
            while True:
                output_queue.get_nowait()
        except Empty:
            pass

    async def _read_stdio_response(self, output_queue: Queue, timeout: float):
        """Wait for a JSON object carrying ``result`` or ``error``.

        The queue is polled without blocking so the event loop keeps running
        while we wait.

        Returns:
            ``(raw_line, parsed_dict)`` for the first matching line, or
            ``None`` if ``timeout`` seconds pass without one.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                line = output_queue.get_nowait()
            except Empty:
                await asyncio.sleep(self._STDIO_POLL_INTERVAL)
                continue
            logger.debug(f"Got line: {line}")
            if not line.startswith("{"):
                # Ordinary log output from the server, not a JSON-RPC frame.
                continue
            try:
                candidate = json.loads(line)
            except json.JSONDecodeError:
                continue
            if "result" in candidate or "error" in candidate:
                return line, candidate
        return None

    async def _communicate_stdio(self, name: str, message: Dict[str, Any]) -> Dict[str, Any]:
        """Send one ``tools/call`` request to a stdio server and return its reply.

        Args:
            name: Key of a server previously registered in ``processes``.
            message: Dict with ``content`` (sent as the ``path`` argument) and
                an optional ``name`` (tool name; ``"list"`` is sent as
                ``"list_directory"``).

        Returns:
            ``{"content": <text>, "role": "assistant"}``. When the result holds
            a list under ``content`` the text items are joined with newlines;
            otherwise the stringified result is returned.

        Raises:
            KeyError: ``name`` is not a registered server.
            TimeoutError: No reply arrived within the response timeout.
            RuntimeError: The server replied with a JSON-RPC ``error``.
        """
        process = self.processes[name]
        try:
            # Map tool alias to filesystem server expected tool name
            tool_name = message.get("name", "list")
            if tool_name == "list":
                tool_name = "list_directory"

            request = {
                "jsonrpc": "2.0",
                "method": "tools/call",
                "params": {
                    "name": tool_name,
                    "arguments": {
                        # For list_directory the directory goes in "path".
                        "path": message["content"]
                    }
                },
                "id": 1
            }
            message_str = json.dumps(request) + "\n"
            logger.debug(f"Sending to {name}: {message_str.strip()}")

            output_queue = self.output_queues[name]
            # Drop stale output so an old line is not mistaken for the reply.
            self._drain_queue(output_queue)

            process.stdin.write(message_str)
            process.stdin.flush()

            reply = await self._read_stdio_response(
                output_queue, self._STDIO_RESPONSE_TIMEOUT
            )
            if reply is None:
                raise TimeoutError("No response from server")
            response_str, response = reply
            logger.debug(f"Received from {name}: {response_str.strip()}")

            if "error" in response:
                logger.error(f"Server error: {response['error']}")
                raise RuntimeError(f"Server error: {response['error']}")

            result = response.get("result", {})
            content = result.get("content") if isinstance(result, dict) else None
            if isinstance(content, list):
                # Tolerate malformed items instead of failing on a missing key.
                text_parts = [
                    str(item.get("text", ""))
                    for item in content
                    if isinstance(item, dict) and item.get("type") == "text"
                ]
                return {
                    "content": "\n".join(text_parts),
                    "role": "assistant"
                }
            return {
                "content": str(result),
                "role": "assistant"
            }
        except Exception as e:
            logger.error(f"Error communicating with {name}: {e}")
            raise

    def _server_reported_error(self, name: str) -> bool:
        """Drain queued output for ``name``; return True if a line has 'error:'."""
        output_queue = self.output_queues.get(name)
        if output_queue is None:
            return False
        try:
            while True:
                line = output_queue.get_nowait()
                if "error:" in line.lower():
                    logger.error(f"Server {name} error: {line}")
                    return True
        except Empty:
            return False

    async def _wait_for_server(self, name: str, port: str = "8080", max_retries: int = 30):
        """Wait for server to become available.

        A stdio server counts as ready as soon as its process is alive. Any
        other server is polled at ``http://localhost:<port>/healthz`` about
        once a second, up to ``max_retries`` times.

        Returns:
            True when the server is ready; False if it exited, printed an
            error line, or never answered with HTTP 200.
        """
        if name in self.stdio_servers:
            process = self.processes[name]
            if process.poll() is None:
                logger.info(f"Stdio server {name} is ready")
                return True
            return False

        # Third-party; imported lazily so stdio-only use does not require it.
        import httpx

        url = f"http://localhost:{port}/healthz"
        process = self.processes.get(name)
        async with httpx.AsyncClient() as client:
            for attempt in range(1, max_retries + 1):
                try:
                    response = await client.get(url, timeout=1.0)
                    if response.status_code == 200:
                        logger.info(f"Server {name} is ready")
                        return True
                except Exception as exc:
                    # Best-effort probe: connection refused / timeouts are
                    # expected while the server boots. Cancellation is a
                    # BaseException and deliberately not caught here.
                    logger.debug("Health check for %s failed: %s", name, exc)

                if process is not None and process.poll() is not None:
                    logger.error(f"Server {name} exited while starting")
                    return False
                if self._server_reported_error(name):
                    return False

                logger.info(f"Waiting for {name} to start (attempt {attempt}/{max_retries})")
                if attempt < max_retries:
                    # Pause after every failed attempt, including non-200
                    # replies, so the retries are actually spread over time.
                    await asyncio.sleep(1)
        return False

    def start_server(self, name: str, config: Dict[str, Any]):
        """Start a server using its configuration.

        Args:
            name: Key under which the server is registered.
            config: Dict with ``command`` and optional ``args`` (list) and
                ``env`` (mapping merged over the current environment; its
                ``MCP_PORT`` entry selects the health-check port).

        Returns:
            True if the server started and became ready, False otherwise.
            Failures are logged rather than raised, and leave no
            registration behind for ``name``.
        """
        process = None
        try:
            env = os.environ.copy()
            # Popen requires string keys and values.
            extra_env = config.get("env") or {}
            env.update({str(key): str(value) for key, value in extra_env.items()})
            port = env.get("MCP_PORT", "8080")

            command = self._find_executable(config["command"])
            args = [str(arg) for arg in (config.get("args") or [])]
            is_stdio = self._is_stdio_server(name, config)

            command_list = [command, *args]
            command_str = ' '.join(shlex.quote(part) for part in command_list)
            logger.info(f"Starting {name} with command: {command_str}")

            if name in self.processes:
                # Replacing the entry without stopping it would orphan the
                # old process.
                logger.warning("Server %s is already registered; restarting it", name)
                self.stop_server(name)

            process = subprocess.Popen(
                command_list,
                env=env,
                stdin=subprocess.PIPE if is_stdio else None,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                # Own session/process group so stop_server can signal the
                # whole group.
                start_new_session=True,
                text=True,
                bufsize=1,
            )

            output_queue = Queue()
            self.processes[name] = process
            self.output_queues[name] = output_queue
            if is_stdio:
                self.stdio_servers.add(name)

            for stream, label in ((process.stdout, "stdout"), (process.stderr, "stderr")):
                threading.Thread(
                    target=log_stream,
                    args=(stream, f"{name} {label}", output_queue),
                    daemon=True,
                ).start()

            # Give the process a moment to fail fast (bad command, bad args).
            time.sleep(1)
            returncode = process.poll()
            if returncode is not None:
                logger.error(f"Server {name} failed to start. Exit code: {returncode}")
                self.stop_server(name)
                return False

            # NOTE(review): asyncio.run() raises if this method is called from
            # a running event loop; that case ends up in the handler below.
            if not asyncio.run(self._wait_for_server(name, port)):
                logger.error(f"Server {name} failed to start")
                self.stop_server(name)
                return False
            return True
        except Exception as e:
            logger.error(f"Failed to start {name}: {e}", exc_info=True)
            if process is not None:
                # Do not leave a half-started process or its bookkeeping behind.
                self.stop_server(name)
            return False

    @staticmethod
    def _signal_group(process, sig) -> None:
        """Send ``sig`` to the process group of ``process``, if it still exists."""
        try:
            os.killpg(os.getpgid(process.pid), sig)
        except ProcessLookupError:
            # The process went away between the liveness check and the signal.
            pass
        except OSError as exc:
            logger.warning("Could not signal process %s: %s", process.pid, exc)

    @staticmethod
    def _close_stdin(process) -> None:
        """Close the stdin pipe of ``process`` if one was opened."""
        if process.stdin is None:
            return
        try:
            process.stdin.close()
        except (OSError, ValueError):
            # A broken or already-closed pipe is fine at this point.
            pass

    def stop_server(self, name: str):
        """Stop a running server and forget everything registered for it.

        Sends SIGTERM to the server's process group, waits up to five
        seconds, then escalates to SIGKILL. Unknown names are ignored.
        """
        process = self.processes.pop(name, None)
        self.stdio_servers.discard(name)
        self.output_queues.pop(name, None)
        if process is None:
            return

        try:
            if process.poll() is not None:
                # Already exited; nothing to signal.
                return
            self._signal_group(process, signal.SIGTERM)
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                logger.warning("Server %s ignored SIGTERM; sending SIGKILL", name)
                self._signal_group(process, signal.SIGKILL)
                try:
                    # Reap the process so it does not linger as a zombie.
                    process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    logger.error("Server %s did not exit after SIGKILL", name)
        finally:
            self._close_stdin(process)

    def stop_all(self):
        """Stop all running servers."""
        for name in list(self.processes):
            self.stop_server(name)

    def __del__(self):
        # __init__ may not have completed, and during interpreter shutdown the
        # modules used by stop_all may already be torn down; a finalizer must
        # never raise.
        try:
            if getattr(self, "processes", None):
                self.stop_all()
        except Exception:
            pass