"""
MCP Client Service - Proxy for MCP server tool calls
Handles communication with various MCP servers (Unity3D, VRChat, Avatar, OSC, etc.)
"""
import asyncio
import logging
import os
from datetime import datetime
from typing import Any
import aiohttp
logger = logging.getLogger(__name__)
# MCP Server configurations
MCP_SERVERS = {
"unity3d": {"base_url": os.getenv("UNITY3D_MCP_URL", "http://localhost:8001"), "enabled": True},
"vrchat": {"base_url": os.getenv("VRCHAT_MCP_URL", "http://localhost:8002"), "enabled": True},
"avatar": {"base_url": os.getenv("AVATAR_MCP_URL", "http://localhost:8003"), "enabled": True},
"osc": {"base_url": os.getenv("OSC_MCP_URL", "http://localhost:8004"), "enabled": True},
"vroid": {"base_url": os.getenv("VROID_MCP_URL", "http://localhost:8005"), "enabled": True},
"resonite": {
"base_url": os.getenv("RESONITE_MCP_URL", "http://localhost:8006"),
"enabled": True,
},
"local_llm": {
"base_url": os.getenv("LOCAL_LLM_MCP_URL", "http://localhost:8007"),
"enabled": True,
},
"robotics": {
"base_url": os.getenv("ROBOTICS_MCP_URL", "http://localhost:12230"),
"enabled": True,
},
}
class MCPClient:
"""Client for communicating with MCP servers via HTTP"""
def __init__(self):
self.servers = MCP_SERVERS
self.session: aiohttp.ClientSession | None = None
self.health_status: dict[str, bool] = {}
async def initialize(self):
"""Initialize HTTP session"""
if not self.session:
timeout = aiohttp.ClientTimeout(total=30)
self.session = aiohttp.ClientSession(timeout=timeout)
await self.check_health_all()
async def close(self):
"""Close HTTP session"""
if self.session:
await self.session.close()
self.session = None
async def check_health(self, server_name: str) -> bool:
"""Check if an MCP server is healthy"""
if server_name not in self.servers:
return False
server_config = self.servers[server_name]
if not server_config["enabled"]:
return False
try:
if not self.session:
await self.initialize()
async with self.session.get(f"{server_config['base_url']}/health") as resp:
is_healthy = resp.status == 200
self.health_status[server_name] = is_healthy
return is_healthy
except Exception as e:
logger.warning(f"Health check failed for {server_name}: {e}")
self.health_status[server_name] = False
return False
async def check_health_all(self):
"""Check health of all MCP servers"""
tasks = [self.check_health(name) for name in self.servers.keys()]
await asyncio.gather(*tasks, return_exceptions=True)
async def call_tool(
self, server_name: str, tool_name: str, arguments: dict[str, Any] | None = None
) -> dict[str, Any]:
"""
Call an MCP tool on a specific server
Args:
server_name: Name of the MCP server (unity3d, vrchat, avatar, etc.)
tool_name: Name of the tool to call
arguments: Tool arguments
Returns:
Tool response dictionary
"""
if server_name not in self.servers:
raise ValueError(f"Unknown MCP server: {server_name}")
server_config = self.servers[server_name]
if not server_config["enabled"]:
raise ValueError(f"MCP server {server_name} is disabled")
# Check health first
if not await self.check_health(server_name):
raise ConnectionError(f"MCP server {server_name} is not available")
if not self.session:
await self.initialize()
try:
# MCP servers typically expose tools via POST /api/v1/tools/{tool_name}
url = f"{server_config['base_url']}/api/v1/tools/{tool_name}"
payload = {"arguments": arguments or {}}
async with self.session.post(url, json=payload) as resp:
if resp.status == 200:
result = await resp.json()
return {
"success": True,
"data": result,
"server": server_name,
"tool": tool_name,
"timestamp": datetime.now().isoformat(),
}
else:
error_text = await resp.text()
return {
"success": False,
"error": f"HTTP {resp.status}: {error_text}",
"server": server_name,
"tool": tool_name,
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"Error calling tool {tool_name} on {server_name}: {e}")
return {
"success": False,
"error": str(e),
"server": server_name,
"tool": tool_name,
"timestamp": datetime.now().isoformat(),
}
async def list_tools(self, server_name: str) -> list[dict[str, Any]]:
"""List available tools on an MCP server"""
if server_name not in self.servers:
return []
server_config = self.servers[server_name]
if not server_config["enabled"]:
return []
if not await self.check_health(server_name):
return []
if not self.session:
await self.initialize()
try:
# Try to get tools list from OpenAPI schema or tools endpoint
url = f"{server_config['base_url']}/api/v1/tools"
async with self.session.get(url) as resp:
if resp.status == 200:
return await resp.json()
else:
# Fallback: try OpenAPI schema
schema_url = f"{server_config['base_url']}/api/docs"
async with self.session.get(schema_url) as schema_resp:
if schema_resp.status == 200:
# Parse OpenAPI schema for tools
return []
return []
except Exception as e:
logger.error(f"Error listing tools for {server_name}: {e}")
return []
def get_server_status(self) -> dict[str, dict[str, Any]]:
"""Get status of all MCP servers"""
return {
name: {
"enabled": config["enabled"],
"base_url": config["base_url"],
"healthy": self.health_status.get(name, False),
}
for name, config in self.servers.items()
}
# Global MCP client instance
mcp_client = MCPClient()