# meetting_room.py•5.54 kB
import random
from mcp.server.sse import SseServerTransport
from mcp import types
from mcp.server.lowlevel import Server
import logging
from starlette.applications import Starlette
from starlette.routing import Mount, Route
import uvicorn
# Low-level MCP server instance named "meeting_room"; the tool handlers in
# this file register themselves on it through its decorators.
app = Server("meeting_room")
async def list_idle_meeting_rooms(start_time: str, end_time: str) -> list[str]:
    """Return the names of the meeting rooms that are free in a time window.

    This is a stub: the same three rooms are reported for every request and
    the time window is currently not consulted.

    Args:
        start_time: Start of the requested window (accepted but unused).
        end_time: End of the requested window (accepted but unused).

    Returns:
        A fresh list of room names as plain strings. The caller is
        responsible for wrapping them into MCP content objects.
    """
    return ["会议室1", "会议室2", "会议室3"]
async def book_meeting_room(room_id: str, start_time: str, end_time: str) -> str:
    """Simulate booking a meeting room and return a human-readable outcome.

    For a known room the booking randomly succeeds or fails (about 50/50);
    an unknown room always fails with a "does not exist" message.

    Args:
        room_id: Name of the room to book.
        start_time: Start of the booking window (only echoed in the log line).
        end_time: End of the booking window (only echoed in the log line).

    Returns:
        A status message string describing success or failure.
    """
    # Local import keeps this function self-contained; the module only
    # imports ``sys`` inside its ``__main__`` section.
    import sys

    # Diagnostics must go to stderr: under the stdio transport, stdout carries
    # the JSON-RPC protocol stream and any stray text there corrupts it.
    print(f"预定会议室{room_id},开始时间{start_time},结束时间{end_time}", file=sys.stderr)

    if room_id not in ("会议室1", "会议室2", "会议室3"):
        return f"预定失败,{room_id}不存在,请重新选择会议室"

    # Randomly report success or failure to mimic a contended resource.
    if random.random() < 0.5:
        return f"{room_id}预定成功"
    return f"{room_id}预定失败,请重新选择会议室"
@app.list_tools()
async def list_tools() -> list[types.Tool]:
    """Advertise the tools served by this server and their argument schemas.

    Returns:
        Two tool descriptors: ``list_idle_meeting_rooms`` (requires
        ``start_time`` and ``end_time``) and ``book_meeting_room`` (requires
        ``room_id``, ``start_time`` and ``end_time``). Every argument is
        declared as a JSON string.
    """

    def string_field(description: str) -> dict:
        # All tool arguments are plain strings; only the description varies.
        return {"type": "string", "description": description}

    idle_rooms_tool = types.Tool(
        name="list_idle_meeting_rooms",
        description="list idle meeting rooms",
        inputSchema={
            "type": "object",
            "properties": {
                "start_time": string_field("开始时间"),
                "end_time": string_field("结束时间"),
            },
            "required": ["start_time", "end_time"],
        },
    )

    booking_tool = types.Tool(
        name="book_meeting_room",
        description="book meeting room",
        inputSchema={
            "type": "object",
            "properties": {
                "room_id": string_field("会议室ID"),
                "start_time": string_field("开始时间"),
                "end_time": string_field("结束时间"),
            },
            "required": ["room_id", "start_time", "end_time"],
        },
    )

    return [idle_rooms_tool, booking_tool]
@app.call_tool()
async def call_tool(
    name: str,
    arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Dispatch an MCP tool invocation to the matching implementation.

    Args:
        name: Tool name; ``list_idle_meeting_rooms`` or ``book_meeting_room``.
        arguments: Tool arguments keyed by parameter name.

    Returns:
        A single-element list holding a text content item whose text is the
        ``str()`` of the tool's result.

    Raises:
        KeyError: If a required argument is absent from ``arguments``.
        ValueError: If ``name`` does not match a known tool.
    """
    if name == "list_idle_meeting_rooms":
        result = await list_idle_meeting_rooms(
            arguments["start_time"],
            arguments["end_time"],
        )
    elif name == "book_meeting_room":
        # Local import: the module only imports ``sys`` in its ``__main__`` section.
        import sys

        # Debug output goes to stderr; stdout is the JSON-RPC channel when the
        # server runs over the stdio transport and must stay protocol-only.
        print(arguments, file=sys.stderr)
        result = await book_meeting_room(
            arguments["room_id"],
            arguments["start_time"],
            arguments["end_time"],
        )
    else:
        raise ValueError(f"Tool not found: {name}")

    return [types.TextContent(type="text", text=str(result))]
if __name__ == "__main__":
    # Script entry point.
    #
    # Usage: python <this file> [sse [port]]
    #   - no arguments (or a first argument other than "sse"): serve MCP over
    #     standard input/output;
    #   - "sse": serve MCP over HTTP + Server-Sent Events with Starlette and
    #     uvicorn, on the given port (default 8000).
    import sys

    # Default to the stdio transport.
    transport = "sse" if len(sys.argv) > 1 and sys.argv[1] == "sse" else "stdio"
    port = 8000

    # The optional port argument is only meaningful for the SSE transport.
    if transport == "sse" and len(sys.argv) > 2:
        try:
            port = int(sys.argv[2])
        except ValueError:
            # Keep the best-effort fallback, but tell the operator about it
            # instead of silently ignoring the bad argument.
            print(f"无效的端口参数 {sys.argv[2]!r},使用默认端口 {port}", file=sys.stderr)

    if transport == "sse":
        from starlette.responses import Response

        # Configure logging.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        )
        logger = logging.getLogger("meeting_room")

        # SSE transport; "/message/" is the path mounted below for
        # sse.handle_post_message.
        sse = SseServerTransport(
            "/message/",
        )

        async def handle_sse(request):
            """Serve one SSE client: open the stream pair and run the MCP server on it."""
            try:
                logger.info(f"建立新的SSE连接: {request.client}")
                async with sse.connect_sse(request.scope, request.receive, request._send) as streams:
                    try:
                        await app.run(streams[0], streams[1], app.create_initialization_options())
                    except Exception as e:
                        # logger.exception records the traceback with the message.
                        logger.exception(f"处理SSE连接时出错: {str(e)}")
            except Exception as e:
                logger.exception(f"建立SSE连接时出错: {str(e)}")
            # Starlette calls whatever the endpoint returns as a response;
            # returning None would raise "'NoneType' object is not callable"
            # once the SSE session ends.
            return Response()

        # NOTE(review): debug=True together with host="0.0.0.0" exposes debug
        # tracebacks on every network interface — confirm this is intended
        # outside local development.
        starlette_app = Starlette(
            debug=True,
            routes=[
                Route("/sse", endpoint=handle_sse),
                Mount("/message/", app=sse.handle_post_message),
            ]
        )

        print(f"在端口{port}上启动MCP服务器,使用SSE传输")
        print(f"连接URL: http://127.0.0.1:{port}/sse")
        uvicorn.run(starlette_app, host="0.0.0.0", port=port, log_level="info")
    else:
        from mcp.server.stdio import stdio_server
        import anyio

        async def run_stdio():
            """Run the MCP server over the process's standard input/output."""
            async with stdio_server() as streams:
                await app.run(streams[0], streams[1], app.create_initialization_options())

        # stdout is reserved for protocol messages in this mode, so the
        # startup banner goes to stderr.
        print("使用标准输入输出传输启动MCP服务器", file=sys.stderr)
        anyio.run(run_stdio)