Skip to main content
Glama

Mingli MCP Server

by spyfree
MIT License
  • Apple
  • Linux
http_transport.py4.81 kB
""" HTTP传输层实现 基于FastAPI实现HTTP方式的MCP服务 适用于远程调用、云端部署等场景 """ import logging from typing import Dict, Any, Callable, Optional from fastapi import FastAPI, Request, HTTPException from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware import uvicorn from .base_transport import BaseTransport logger = logging.getLogger(__name__) class HttpTransport(BaseTransport): """HTTP传输实现""" def __init__(self, host: str = "0.0.0.0", port: int = 8080, api_key: Optional[str] = None): """ 初始化HTTP传输 Args: host: 监听地址 port: 监听端口 api_key: API密钥(可选,用于认证) """ self.host = host self.port = port self.api_key = api_key self.app = FastAPI( title="Mingli MCP Server", description="命理MCP服务 - HTTP API", version="1.0.0" ) # 添加CORS支持 self.app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) self._setup_routes() logger.info(f"HTTP transport initialized on {host}:{port}") def _setup_routes(self): """设置路由""" @self.app.get("/") async def root(): """根路径""" return { "name": "Mingli MCP Server", "version": "1.0.0", "protocol": "MCP", "transport": "HTTP", "endpoints": { "mcp": "/mcp", "health": "/health", "docs": "/docs" } } @self.app.get("/health") async def health(): """健康检查""" return { "status": "healthy", "transport": "http", "systems": ["ziwei", "bazi"] } @self.app.post("/mcp") async def handle_mcp(request: Request): """处理MCP请求""" # API密钥验证(如果配置了) if self.api_key: auth_header = request.headers.get("Authorization") if not auth_header or auth_header != f"Bearer {self.api_key}": raise HTTPException(status_code=401, detail="Invalid API key") try: # 获取请求数据 data = await request.json() logger.debug(f"Received MCP request: {data.get('method')}") # 调用消息处理器 if not self.message_handler: raise HTTPException( status_code=500, detail="Message handler not set" ) response = self.message_handler(data) # 对于通知消息(无需响应),返回空 if response is None: return JSONResponse(content={}, status_code=204) return JSONResponse(content=response) except Exception as e: logger.exception("Error handling MCP request") return JSONResponse( content={ "jsonrpc": "2.0", "error": { "code": -32603, "message": f"Internal error: {str(e)}" }, "id": data.get("id") }, status_code=500 ) def start(self): """启动HTTP服务器""" logger.info(f"Starting HTTP server on {self.host}:{self.port}") uvicorn.run( self.app, host=self.host, port=self.port, log_level="info" ) def stop(self): """停止HTTP服务器""" logger.info("HTTP server stopped") def send_message(self, message: Dict[str, Any]): """ HTTP模式不需要主动发送消息 响应由FastAPI自动处理 """ pass def receive_message(self) -> Optional[Dict[str, Any]]: """ HTTP模式由FastAPI处理接收 此方法不使用 """ pass def set_message_handler(self, handler: Callable[[Dict[str, Any]], Dict[str, Any]]): """设置消息处理器""" self.message_handler = handler logger.debug("Message handler set for HTTP transport")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/spyfree/mingli-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server