Skip to main content
Glama

Feifei Proxy MCP

by chenm1xuexi
10
  • Apple
  • Linux
server.py (9.27 kB)
import json import os import traceback import anyio from mcp import types from mcp.server import Server from .enums import McpTransportType from .logger import McpLogger from .mcp_client_manager import McpClientManager from .mcp_exception import McpException logger = McpLogger.get_logger() # 代理的mcp传输类型 默认为 streamable_http transport_type: str = McpTransportType.STREAMABLE_HTTP.value # 需要进行代理的 mcp server 配置 proxy_mcp_server_config: dict = {} # 代理的mcp名称 proxy_mcp_name: str = "" # 代理的mcp server 实例 proxy_mcp_server: Server # 定义代理版本 mcp_proxy_version: str = "0.0.1" # mcp 客户端管理服务 McpClientManager 对应着连接/会话/工具调用等服务 mcp_client_manager: McpClientManager = None def startup(): """ web应用启动入口 """ global transport_type, proxy_mcp_name, proxy_mcp_server_config, proxy_mcp_server logger.info("====== Start mcp proxy server application begin =======") # 1. 获取环境变量相关信息 proxy_mcp_name = os.getenv("PROXY_MCP_NAME", "") transport_type = os.getenv("TRANSPORT_TYPE", McpTransportType.STREAMABLE_HTTP.value) proxy_mcp_server_config_str = os.getenv("PROXY_MCP_SERVER_CONFIG", "") if proxy_mcp_server_config_str: proxy_mcp_server_config = json.loads(proxy_mcp_server_config_str) # TODO mcp配置 参数 合法性校验,确保mcp配置参数正确 后续完善 else: raise ValueError("PROXY_MCP_SERVER_CONFIG is empty, please check!") # 初始化代理mcp server实例 proxy_mcp_server = Server("feifei-proxy-mcp") logger.info(f"init proxy server," f"transport_type: {transport_type}, " f"proxy_mcp_name: {proxy_mcp_name}, " f"proxy_mcp_server_config: {proxy_mcp_server_config}, " f"version: {mcp_proxy_version}") # 构建代理mcp server create_proxy_mcp_server() # 启动代理mcp server start_proxy_mcp_server() logger.info("====== Start mcp proxy server application end =======") def create_proxy_mcp_server(): """ 创建 代理mcp server 服务, 目前仅提供 tool能力,prompt, resource暂不进行支持 """ @proxy_mcp_server.call_tool() async def call_tool(name: str, arguments: dict ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: """ 工具调用 """ logger.info(f"calling tool: {name}, 
arguments: {arguments}") if mcp_client_manager: if not await init_proxy_mcp(): raise NameError(f"failed to init proxy mcp: {proxy_mcp_name}") result = await mcp_client_manager.execute_tool(tool_name=name, arguments=arguments) return result.content @proxy_mcp_server.list_tools() async def list_tools() -> list[types.Tool]: """ 返回所有可用工具 """ return await proxy_mcp_tools() return proxy_mcp_server def start_proxy_mcp_server(): """ 启动代理mcp服务器 """ match transport_type: case McpTransportType.STDIO.value: # 本地进程mcp server from mcp.server.stdio import stdio_server async def arun(): async with stdio_server() as streams: await proxy_mcp_server.run( streams[0], streams[1], proxy_mcp_server.create_initialization_options() ) # 异步调度 anyio.run(arun) logger.info("======= STDIO Proxy mcp Application Started ======") case McpTransportType.SSE.value: # sse mcp server from mcp.server.sse import SseServerTransport from contextlib import asynccontextmanager from collections.abc import AsyncIterator from starlette.routing import Route, Mount from fastapi import FastAPI sse_transport = SseServerTransport("/messages/") async def handle_sse(request): async with sse_transport.connect_sse(request.scope, request.receive, request._send) as streams: await proxy_mcp_server.run(streams[0], streams[1], proxy_mcp_server.create_initialization_options()) @asynccontextmanager async def sse_lifespan(app: FastAPI) -> AsyncIterator[None]: """ 上下文会话管理 """ try: if not await init_proxy_mcp(): raise McpException("failed to init mcp server") yield await mcp_client_manager.cleanup() finally: logger.info("Application shutting down...") # 构建fastapi web程序 sse_app = FastAPI( debug=True, routes=[ Route("/sse", endpoint=handle_sse, methods=["GET"]), Mount("/messages/", app=sse_transport.handle_post_message), ], lifespan=sse_lifespan, ) import uvicorn uvicorn.run(sse_app, host="0.0.0.0", port=int(os.getenv("PROXY_MCP_PORT", "8000"))) logger.info("======= SSE Proxy mcp Application Started ======") return case 
McpTransportType.STREAMABLE_HTTP.value: from mcp.server.streamable_http_manager import StreamableHTTPSessionManager from starlette.types import Scope from starlette.types import Receive from starlette.types import Send from mcp.server.sse import SseServerTransport from fastapi import FastAPI from starlette.routing import Mount from contextlib import asynccontextmanager from collections.abc import AsyncIterator session_manager = StreamableHTTPSessionManager( app=proxy_mcp_server, event_store=None, json_response=False, stateless=True, # 设置为无状态 ) sse_transport = SseServerTransport("/messages/") async def handle_streamable_http( scope: Scope, receive: Receive, send: Send ) -> None: await session_manager.handle_request(scope, receive, send) @asynccontextmanager async def streamable_lifespan(app: FastAPI) -> AsyncIterator[None]: """ 上下文会话管理 """ logger.info("Starting session manager...") async with session_manager.run(): logger.info("Session manager started successfully") try: if not await init_proxy_mcp(): raise McpException("failed to init_proxy_mcp") yield except Exception as e: logger.error(f"Error during MCP initialization: {e}") logger.error(f"Traceback: {traceback.format_exc()}") raise McpException(f"failed to init_proxy_mcp: {e}") finally: logger.info("Application shutting down...") await mcp_client_manager.cleanup() # 启动web应用 streamable_app = FastAPI( debug=True, routes=[ Mount("/mcp", app=handle_streamable_http), Mount("/messages/", app=sse_transport.handle_post_message), # 兼容sse模式 ], lifespan=streamable_lifespan, ) import uvicorn uvicorn.run(streamable_app, host="0.0.0.0", port=int(os.getenv("PROXY_MCP_PORT", "8000"))) case _: raise ValueError("Invalid MCP_TRANSPORT_TYPE") async def init_proxy_mcp() -> bool: global proxy_mcp_server_config, mcp_client_manager # 如果代理mcp已经在运行中,则不需要再次构建连接 if mcp_client_manager: return True # 创建mcp客户端连接 tmp_mcp_client_manager = McpClientManager(mcp_server_config=proxy_mcp_server_config) # 等待 mcp客户端连接 + 初始化完成 await 
tmp_mcp_client_manager.wait_for_initialization() # 健康检查 if await tmp_mcp_client_manager.healthy(): mcp_client_manager = tmp_mcp_client_manager # 获取mcp server相关服务版本信息 init_result = mcp_client_manager.get_initialized_response() version = getattr(getattr(init_result, 'serverInfo', None), 'version', "1.0.0") proxy_mcp_server.version = version return True else: return False async def proxy_mcp_tools() -> list[types.Tool]: if await init_proxy_mcp(): try: return await mcp_client_manager.list_tools() except (KeyError, Exception) as e: logger.warning("failed to list tools for proxy mcp server: " + proxy_mcp_name, exc_info=e) return [] else: raise McpException(msg=f"failed to initialize proxy MCP server {proxy_mcp_name}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/chenm1xuexi/feifei-proxy-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.