MCP JinaAI Reader Server

by spences10
Verified
  • mcp_deepseek_client
"""Client for talking to MCP servers declared in a JSON configuration file."""

import asyncio
import atexit
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx

from .server_manager import ServerManager

logger = logging.getLogger(__name__)


class MCPClient:
    """Client for interacting with existing MCP servers.

    On construction the client loads a configuration, registers every entry
    found under its ``"mcpServers"`` key, asks the ``ServerManager`` to start
    each one, and registers :meth:`cleanup` with ``atexit``.
    """

    def __init__(self, config_path: Optional[str] = None):
        """Load the configuration and start every configured server.

        Args:
            config_path: Path to a JSON config file. When omitted, the
                default locations described in :meth:`load_config` are tried.
        """
        self.config = self.load_config(config_path)
        self.servers: Dict[str, Dict[str, Any]] = {}
        self.server_manager = ServerManager()
        self.init_servers()
        # Make sure started servers are stopped when the interpreter exits.
        atexit.register(self.cleanup)

    def load_config(self, config_path: Optional[str] = None) -> Dict[str, Any]:
        """Load MCP configuration.

        Without ``config_path`` the lookup order is ``test_config.json`` in
        the current working directory, then the Claude desktop config under
        the user's home directory. With ``config_path`` only that file is
        considered.

        Args:
            config_path: Optional explicit path to a JSON config file.

        Returns:
            The parsed JSON document, or ``{"mcpServers": {}}`` when no
            candidate file exists.

        Raises:
            json.JSONDecodeError: If a candidate file exists but is not
                valid JSON.
        """
        if not config_path:
            # Try local test config first
            test_config = Path("test_config.json")
            if test_config.exists():
                logger.info(f"Loading test config from {test_config}")
                return json.loads(test_config.read_text(encoding="utf-8"))

            # Then try Claude config
            claude_config = (
                Path.home()
                / "Library/Application Support/Claude/claude_desktop_config.json"
            )
            if claude_config.exists():
                logger.info(f"Loading Claude config from {claude_config}")
                return json.loads(claude_config.read_text(encoding="utf-8"))
        else:
            explicit_config = Path(config_path)
            if explicit_config.exists():
                logger.info(f"Loading config from {explicit_config}")
                return json.loads(explicit_config.read_text(encoding="utf-8"))

        logger.warning("No config found, using empty config")
        return {"mcpServers": {}}

    def init_servers(self):
        """Initialize and start server configurations.

        For each entry under ``"mcpServers"`` this records the server's
        localhost URL (port taken from ``env.MCP_PORT``, default ``"8080"``),
        its raw config, and whether the server manager classifies it as a
        stdio server; then it asks the server manager to start it.
        """
        for name, config in self.config.get("mcpServers", {}).items():
            logger.info(f"Found server: {name}")
            port = config.get("env", {}).get("MCP_PORT", "8080")
            self.servers[name] = {
                "url": f"http://localhost:{port}",
                "config": config,
                "is_stdio": self.server_manager._is_stdio_server(name, config),
            }
            # Start the server
            self.server_manager.start_server(name, config)

    async def check_server(self, server: str) -> bool:
        """Check if server is running.

        Issues ``GET <url>/healthz`` with a 5 second timeout.

        Args:
            server: Name of a server registered in ``self.servers``.

        Returns:
            ``True`` when the endpoint answers with HTTP 200, ``False`` on
            any other status or on any error while making the request.

        Raises:
            KeyError: If ``server`` is not a registered server name.
        """
        server_config = self.servers[server]
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{server_config['url']}/healthz",
                    timeout=5.0,
                )
                return response.status_code == 200
        except Exception:
            # Best-effort probe: any failure means "not running". Unlike a
            # bare ``except:``, this lets KeyboardInterrupt, SystemExit and
            # task cancellation propagate.
            return False

    async def send_message(
        self, server: str, message: str, tool: Optional[str] = None
    ) -> str:
        """Send message to specified MCP server.

        Args:
            server: Name of a server registered in ``self.servers``.
            message: Text placed in the ``"content"`` field of the request.
            tool: Optional tool name. When given, it must appear in the
                server config's ``"autoApprove"`` list; it is sent as the
                ``"name"`` field of the request.

        Returns:
            The ``"content"`` field of the server's response, or ``""`` when
            the response has no such field.

        Raises:
            ValueError: If ``server`` is unknown or ``tool`` is not
                auto-approved for it.
            ConnectionError: If the HTTP request times out.
            httpx.HTTPError: For any other HTTP failure, including non-2xx
                response statuses.
        """
        if server not in self.servers:
            raise ValueError(f"Unknown server: {server}")

        server_config = self.servers[server]

        # Add debug logging for tool permissions
        if tool:
            auto_approve = server_config["config"].get("autoApprove", [])
            logger.debug(f"Tool: {tool}")
            logger.debug(f"Auto-approve list: {auto_approve}")
            if tool not in auto_approve:
                logger.error(
                    f"Tool '{tool}' not in auto-approve list: {auto_approve}"
                )
                raise ValueError(
                    f"Tool '{tool}' not approved for server '{server}'"
                )

        # Prepare message object
        message_obj = {
            "role": "user",
            "content": message,
            "name": tool,
        }

        # Handle stdio servers differently
        if server_config["is_stdio"]:
            logger.info(f"Sending stdio message to {server}")
            response = await self.server_manager._communicate_stdio(
                server, message_obj
            )
            return response.get("content", "")

        # HTTP servers
        logger.info(
            f"Sending HTTP message to {server} at {server_config['url']}"
        )
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    f"{server_config['url']}/v1/messages",
                    json=message_obj,
                    headers={"X-MCP-Version": "1.0"},
                    timeout=30.0,
                )
                response.raise_for_status()
                result = response.json()
                return result.get("content", "")
            # httpx has no ``TimeoutError``; ``TimeoutException`` is the base
            # class of its timeout errors. It is a subclass of ``HTTPError``,
            # so this handler must come first.
            except httpx.TimeoutException as exc:
                logger.error(f"Timeout connecting to {server}")
                raise ConnectionError(f"Server {server} timed out") from exc
            except httpx.HTTPError as e:
                logger.error(f"Error communicating with {server}: {e}")
                raise

    def list_servers(self) -> List[str]:
        """List available MCP servers.

        Returns:
            The names of all registered servers.
        """
        servers = list(self.servers.keys())
        logger.info(f"Available servers: {servers}")
        return servers

    def get_server_config(self, server: str) -> Dict[str, Any]:
        """Get server configuration.

        Args:
            server: Name of a server registered in ``self.servers``.

        Returns:
            The raw config entry stored for ``server``.

        Raises:
            ValueError: If ``server`` is not a registered server name.
        """
        if server not in self.servers:
            raise ValueError(f"Unknown server: {server}")
        return self.servers[server]["config"]

    def cleanup(self):
        """Stop all servers on exit."""
        self.server_manager.stop_all()