# MCP JinaAI Reader Server
# by spences10
# Verified
# - mcp_deepseek_client
import httpx
import json
import os
from typing import Dict, Any, List
import asyncio
from pathlib import Path
import logging
from .server_manager import ServerManager
import atexit
# Module-level logger, named after this module, used by MCPClient below.
logger = logging.getLogger(__name__)
class MCPClient:
    """Client for interacting with existing MCP servers.

    On construction the client loads a configuration, records every entry
    found under its ``mcpServers`` key, starts each of them through a
    ``ServerManager`` and registers :meth:`cleanup` with ``atexit``.
    """

    def __init__(self, config_path: str = None):
        """Load the configuration and start every configured server.

        Args:
            config_path: Optional path to a JSON config file. When omitted
                (or empty), the default locations described in
                :meth:`load_config` are probed instead.
        """
        self.config = self.load_config(config_path)
        # Maps server name -> {"url", "config", "is_stdio"}; see init_servers.
        self.servers = {}
        self.server_manager = ServerManager()
        self.init_servers()
        # Make sure started servers are stopped when the interpreter exits.
        atexit.register(self.cleanup)

    def load_config(self, config_path: str = None) -> Dict[str, Any]:
        """Load MCP configuration.

        Without ``config_path`` the lookup order is ``test_config.json`` in
        the current working directory, then the Claude desktop config under
        the user's home directory. With ``config_path`` only that file is
        considered.

        Args:
            config_path: Optional path to a JSON config file.

        Returns:
            The parsed JSON document, or ``{"mcpServers": {}}`` when no
            candidate file exists.

        Raises:
            json.JSONDecodeError: If the selected file is not valid JSON.
        """
        if not config_path:
            # Try local test config first
            test_config = Path("test_config.json")
            if test_config.exists():
                logger.info(f"Loading test config from {test_config}")
                return json.loads(test_config.read_text(encoding="utf-8"))

            # Then try Claude config
            claude_config = (
                Path.home()
                / "Library/Application Support/Claude/claude_desktop_config.json"
            )
            if claude_config.exists():
                logger.info(f"Loading Claude config from {claude_config}")
                return json.loads(claude_config.read_text(encoding="utf-8"))
        else:
            config_path = Path(config_path)
            if config_path.exists():
                logger.info(f"Loading config from {config_path}")
                return json.loads(config_path.read_text(encoding="utf-8"))

        logger.warning("No config found, using empty config")
        return {"mcpServers": {}}

    def init_servers(self):
        """Initialize and start server configurations.

        For every entry under ``mcpServers`` this records the server's
        localhost URL (port taken from ``env.MCP_PORT``, default ``"8080"``),
        its raw config and whether the server manager classifies it as a
        stdio server, then asks the server manager to start it.
        """
        for name, config in self.config.get("mcpServers", {}).items():
            logger.info(f"Found server: {name}")
            # "env" may be absent or explicitly null in the JSON config.
            env = config.get("env") or {}
            port = env.get("MCP_PORT", "8080")
            self.servers[name] = {
                "url": f"http://localhost:{port}",
                "config": config,
                "is_stdio": self.server_manager._is_stdio_server(name, config),
            }
            # Start the server
            self.server_manager.start_server(name, config)

    async def check_server(self, server: str) -> bool:
        """Check if server is running.

        Issues ``GET <url>/healthz`` with a 5 second timeout.

        Args:
            server: Name of a server registered in ``self.servers``.

        Returns:
            ``True`` if the health endpoint answered with status 200,
            ``False`` if it answered otherwise or the request failed.

        Raises:
            KeyError: If ``server`` is not a registered server name.
        """
        server_config = self.servers[server]
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{server_config['url']}/healthz",
                    timeout=5.0,
                )
                return response.status_code == 200
        except Exception as exc:
            # Best-effort probe: any request failure means "not running".
            # ``Exception`` (not a bare except) lets task cancellation and
            # KeyboardInterrupt propagate instead of being swallowed.
            logger.debug(f"Health check failed for {server}: {exc}")
            return False

    async def send_message(self, server: str, message: str, tool: str = None) -> str:
        """Send message to specified MCP server.

        Args:
            server: Name of a server registered in ``self.servers``.
            message: Text placed in the ``content`` field of the request.
            tool: Optional tool name, sent as the ``name`` field. When given
                it must appear in the server config's ``autoApprove`` list.

        Returns:
            The ``content`` field of the server's response, or ``""`` when
            the response has no such field.

        Raises:
            ValueError: If ``server`` is unknown or ``tool`` is not
                auto-approved for it.
            ConnectionError: If the HTTP request times out.
            httpx.HTTPError: For any other HTTP failure, including non-2xx
                status codes.
        """
        if server not in self.servers:
            raise ValueError(f"Unknown server: {server}")

        server_config = self.servers[server]

        # Add debug logging for tool permissions
        if tool:
            auto_approve = server_config["config"].get("autoApprove", [])
            logger.debug(f"Tool: {tool}")
            logger.debug(f"Auto-approve list: {auto_approve}")
            if tool not in auto_approve:
                logger.error(f"Tool '{tool}' not in auto-approve list: {auto_approve}")
                raise ValueError(f"Tool '{tool}' not approved for server '{server}'")

        # Prepare message object
        message_obj = {
            "role": "user",
            "content": message,
            "name": tool,
        }

        # Handle stdio servers differently
        if server_config["is_stdio"]:
            logger.info(f"Sending stdio message to {server}")
            response = await self.server_manager._communicate_stdio(server, message_obj)
            return response.get("content", "")

        # HTTP servers
        logger.info(f"Sending HTTP message to {server} at {server_config['url']}")
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    f"{server_config['url']}/v1/messages",
                    json=message_obj,
                    headers={"X-MCP-Version": "1.0"},
                    timeout=30.0,
                )
                response.raise_for_status()
                result = response.json()
                return result.get("content", "")
            except httpx.TimeoutException as exc:
                # TimeoutException is a subclass of HTTPError, so this
                # handler must come before the generic one below.
                logger.error(f"Timeout connecting to {server}")
                raise ConnectionError(f"Server {server} timed out") from exc
            except httpx.HTTPError as e:
                logger.error(f"Error communicating with {server}: {e}")
                raise

    def list_servers(self) -> List[str]:
        """List available MCP servers.

        Returns:
            The names of all registered servers.
        """
        servers = list(self.servers.keys())
        logger.info(f"Available servers: {servers}")
        return servers

    def get_server_config(self, server: str) -> Dict[str, Any]:
        """Get server configuration.

        Args:
            server: Name of a server registered in ``self.servers``.

        Returns:
            The raw config entry that was loaded for ``server``.

        Raises:
            ValueError: If ``server`` is not a registered server name.
        """
        if server not in self.servers:
            raise ValueError(f"Unknown server: {server}")
        return self.servers[server]["config"]

    def cleanup(self):
        """Stop all servers on exit."""
        self.server_manager.stop_all()