import asyncio
import json
import sys

import aiohttp
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# URL of the remote FastMCP server; every JSON-RPC request in this module is POSTed here.
REMOTE_SERVER_URL = "https://chronic-scarlet-urial.fastmcp.app/mcp"
# Local MCP server instance on which the list_tools/call_tool handlers are registered.
server = Server("bhakti-server-proxy")
# Tool list from the last successful remote fetch; stays None until a fetch succeeds.
_remote_tools_cache = None
async def fetch_remote_tools() -> list[Tool]:
    """Fetch the available tools from the remote FastMCP server.

    Sends a JSON-RPC ``tools/list`` request to ``REMOTE_SERVER_URL`` and
    converts every returned entry into a ``Tool``. A successful result is
    stored in ``_remote_tools_cache`` and reused by later calls; failures are
    never cached, so the next call retries.

    Diagnostics are written to stderr, never stdout: when this module runs as
    a stdio MCP server, stdout carries the protocol stream.

    Returns:
        The remote tools, or an empty list when the request fails, the server
        answers with a non-200 status or a JSON-RPC error, or no JSON-RPC
        response can be found in the reply.
    """
    global _remote_tools_cache
    if _remote_tools_cache is not None:
        return _remote_tools_cache

    request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/list",
        "params": {},
    }
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                REMOTE_SERVER_URL, json=request, headers=headers
            ) as response:
                if response.status != 200:
                    print(
                        f"Error fetching remote tools: HTTP {response.status}",
                        file=sys.stderr,
                    )
                    return []

                data = None
                content_type = response.headers.get("Content-Type", "")
                if "text/event-stream" in content_type:
                    # The stream may carry notifications before the actual
                    # response, so keep reading until a message with a
                    # "result" or "error" member shows up.
                    async for raw_line in response.content:
                        line = raw_line.decode("utf-8").strip()
                        if not line.startswith("data:"):
                            continue
                        # SSE allows "data:" with or without a following space.
                        payload = line.removeprefix("data:").strip()
                        if not payload:
                            continue
                        message = json.loads(payload)
                        if isinstance(message, dict) and (
                            "result" in message or "error" in message
                        ):
                            data = message
                            break
                else:
                    data = await response.json()

        if not isinstance(data, dict):
            print(
                "Error fetching remote tools: no JSON-RPC response received",
                file=sys.stderr,
            )
            return []

        result = data.get("result")
        if isinstance(result, dict) and "tools" in result:
            tools = [
                Tool(
                    name=tool_data["name"],
                    description=tool_data.get("description", ""),
                    inputSchema=tool_data.get("inputSchema", {}),
                )
                for tool_data in result["tools"]
            ]
            _remote_tools_cache = tools
            return tools

        if "error" in data:
            print(
                f"Error fetching remote tools: {data['error']}",
                file=sys.stderr,
            )
        else:
            print(
                "Error fetching remote tools: response contains no tool list",
                file=sys.stderr,
            )
    except Exception as e:
        # Best-effort boundary: any transport, decoding or validation failure
        # degrades to "no tools" instead of crashing the proxy.
        print(f"Error fetching remote tools: {e}", file=sys.stderr)
    return []
async def call_remote_tool(name: str, arguments: dict) -> list[TextContent]:
    """Forward a tool call to the remote FastMCP server.

    Sends a JSON-RPC ``tools/call`` request to ``REMOTE_SERVER_URL`` and
    converts the text items of the result into ``TextContent`` objects.
    Content items whose ``type`` is not ``"text"`` are dropped.

    This function never raises: every failure is reported as a single
    ``TextContent`` describing the problem.

    Args:
        name: Name of the remote tool to invoke.
        arguments: Arguments passed through unchanged to the remote tool.

    Returns:
        The text content returned by the tool, or a one-element list holding
        an error description (HTTP error, JSON-RPC error, local exception, or
        "No response from remote server").
    """
    request = {
        "jsonrpc": "2.0",
        "id": 2,
        "method": "tools/call",
        "params": {
            "name": name,
            "arguments": arguments,
        },
    }
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                REMOTE_SERVER_URL, json=request, headers=headers
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    return [TextContent(
                        type="text",
                        text=f"HTTP Error {response.status}: {error_text}",
                    )]

                data = None
                content_type = response.headers.get("Content-Type", "")
                if "text/event-stream" in content_type:
                    # A tool may emit progress/log notifications before its
                    # result, so skip every message that is not a JSON-RPC
                    # response (i.e. has neither "result" nor "error").
                    async for raw_line in response.content:
                        line = raw_line.decode("utf-8").strip()
                        if not line.startswith("data:"):
                            continue
                        # SSE allows "data:" with or without a following space.
                        payload = line.removeprefix("data:").strip()
                        if not payload:
                            continue
                        message = json.loads(payload)
                        if isinstance(message, dict) and (
                            "result" in message or "error" in message
                        ):
                            data = message
                            break
                else:
                    data = await response.json()

        if isinstance(data, dict):
            result = data.get("result")
            if isinstance(result, dict) and "content" in result:
                return [
                    TextContent(type="text", text=item.get("text", ""))
                    for item in result["content"]
                    if item.get("type") == "text"
                ]
            if "error" in data:
                error_msg = data["error"].get("message", "Unknown error")
                return [TextContent(type="text", text=f"Error: {error_msg}")]
    except Exception as e:
        # Best-effort boundary: report the failure to the caller as text
        # rather than letting it propagate.
        return [TextContent(type="text", text=f"Error calling remote tool: {e}")]
    return [TextContent(type="text", text="No response from remote server")]
@server.list_tools()
async def list_tools() -> list[Tool]:
    """Return the tools exposed by the remote server.

    Registered on ``server`` as the list-tools handler; the actual work is
    delegated to ``fetch_remote_tools``.
    """
    remote_tools = await fetch_remote_tools()
    return remote_tools
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Relay a tool invocation to the remote server.

    Registered on ``server`` as the call-tool handler; ``name`` and
    ``arguments`` are handed unchanged to ``call_remote_tool``.
    """
    remote_result = await call_remote_tool(name, arguments)
    return remote_result
async def main() -> None:
    """Run the MCP proxy server over the stdio transport.

    Opens the stdio streams and runs ``server`` on them until the streams
    close.
    """
    # With the stdio transport, stdout is the JSON-RPC channel to the client;
    # any human-readable status output has to go to stderr or it corrupts the
    # protocol stream.
    print("Starting Bhakti Server Proxy...", file=sys.stderr)
    print(f"Proxying to: {REMOTE_SERVER_URL}", file=sys.stderr)
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options(),
        )
# Script entry point: run the proxy's event loop only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())