#!/usr/bin/env python3
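"""
SSE MCP server dedicated to MaxKB.

Serves the employee-management tools over MCP-style JSON-RPC (the
``initialize``, ``tools/list`` and ``tools/call`` methods) and provides an
SSE event stream as the transport.
"""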
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional, Union

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from mcp.types import Tool, TextContent
from pydantic import BaseModel

from config.settings import settings
from tools.api_proxy import EmployeeManagementTools
# Configure logging once at import time.
# The configured level name is upper-cased before the lookup so that values
# such as "info" resolve to logging.INFO instead of raising AttributeError;
# the same setting is already treated case-insensitively when it is handed
# to uvicorn in the __main__ section of this file.
logging.basicConfig(
    level=getattr(logging, settings.log_level.upper()),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
# FastAPI application object that all routes in this module are attached to.
app = FastAPI(
    version="1.0.0",
    title="员工管理MCP服务器 (MaxKB SSE)",
    description="专门为MaxKB设计的SSE MCP协议服务器",
)

# Permissive CORS policy: any origin, method and header is accepted.
# NOTE(review): wildcard origins are combined with allow_credentials=True
# here — confirm that credentialed cross-origin access is really intended.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)

# Single shared tools object; the handlers in this module call its
# get_all_tools() and execute() methods.
employee_tools = EmployeeManagementTools()
class MCPRequest(BaseModel):
    """JSON-RPC 2.0 request envelope accepted by ``POST /mcp``.

    Only ``method`` is required; every other field has a default.
    """

    # JSON-RPC protocol version marker.
    jsonrpc: str = "2.0"
    # Name of the method to invoke, e.g. "initialize" or "tools/call".
    method: str
    # Method parameters. Pydantic copies field defaults for each instance,
    # so this literal dict is not shared between requests.
    params: Dict[str, Any] = {}
    # Request id echoed back in the response. JSON-RPC allows string,
    # integer and null ids; a plain ``str`` annotation makes Pydantic v2
    # reject integer ids (it does not coerce int -> str), so the accepted
    # types are widened while the historical default of "1" is kept.
    id: Optional[Union[str, int]] = "1"
@app.get("/")
async def root():
    """Describe this server at the root path.

    Returns:
        A dict with the server name, version, description, MCP protocol
        version and transport.
    """
    server_info = dict(
        name="员工管理MCP服务器 (MaxKB SSE)",
        version="1.0.0",
        description="专门为MaxKB设计的SSE MCP协议服务器",
        mcp_version="2024-11-05",
        transport="sse",
    )
    return server_info
@app.get("/mcp")
async def mcp_info():
    """Report the MCP protocol version, capabilities and server identity.

    Returns:
        A dict with ``protocolVersion``, ``capabilities``, ``serverInfo``
        and ``transport`` entries.
    """
    tool_capability = {"listChanged": True}
    server_identity = {"name": "employee-management-mcp", "version": "1.0.0"}
    return {
        "protocolVersion": "2024-11-05",
        "capabilities": {"tools": tool_capability},
        "serverInfo": server_identity,
        "transport": "sse",
    }
@app.get("/events")
async def stream_events():
    """Open the SSE event stream used as the MaxKB MCP connection point.

    The stream emits, in order:

    1. a JSON-RPC result with id ``"init"`` carrying the protocol version,
       capabilities and server info;
    2. a JSON-RPC result with id ``"tools_list"`` carrying every tool
       returned by ``employee_tools.get_all_tools()``;
    3. a ``connected`` notice;
    4. a ``heartbeat`` message every 30 seconds, indefinitely.

    If an ``Exception`` is raised while producing events, it is logged with
    its traceback and a final ``error`` message is sent to the client.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """

    def sse_frame(payload: Dict[str, Any]) -> str:
        """Serialize *payload* as a single SSE ``data:`` event."""
        return f"data: {json.dumps(payload)}\n\n"

    async def event_generator():
        try:
            # MCP initialization info.
            yield sse_frame({
                "jsonrpc": "2.0",
                "id": "init",
                "result": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {
                        "tools": {
                            "listChanged": True
                        }
                    },
                    "serverInfo": {
                        "name": "employee-management-mcp",
                        "version": "1.0.0"
                    }
                }
            })

            # Tool list.
            tools = employee_tools.get_all_tools()
            yield sse_frame({
                "jsonrpc": "2.0",
                "id": "tools_list",
                "result": {
                    "tools": [tool.model_dump() for tool in tools]
                }
            })

            # Connection confirmation.
            yield sse_frame({'type': 'connected', 'message': 'MCP服务器已连接'})

            # Keep the connection alive. get_running_loop() is the
            # recommended way to obtain the loop from inside a coroutine, and
            # it only needs to be looked up once. Note that loop.time() is
            # the loop's monotonic clock, not wall-clock time.
            loop = asyncio.get_running_loop()
            while True:
                yield sse_frame({
                    "type": "heartbeat",
                    "timestamp": loop.time()
                })
                await asyncio.sleep(30)
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error drops.
            logger.exception("SSE流错误: %s", e)
            yield sse_frame({
                "type": "error",
                "message": str(e)
            })

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "Cache-Control"
        }
    )
def _jsonrpc_result(request_id: Any, result: Dict[str, Any]) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 success envelope for *request_id*."""
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


def _jsonrpc_error(request_id: Any, code: int, message: str) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error envelope for *request_id*."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message},
    }


def _format_tool_result(result: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a tool result dict into the ``tools/call`` result payload."""
    if result.get("success"):
        content = f"✅ {result.get('message', '操作成功')}\n\n"
        if result.get("data"):
            content += f"数据: {result['data']}"
    else:
        content = f"❌ {result.get('error', '操作失败')}"
    return {
        "content": [{"type": "text", "text": content}],
        "isError": not result.get("success", False),
    }


@app.post("/mcp")
async def handle_mcp_request(request: MCPRequest):
    """Handle an MCP request sent as a JSON-RPC envelope.

    Supported methods:

    * ``initialize`` -- returns protocol version, capabilities, server info.
    * ``tools/list`` -- returns every tool from ``employee_tools``.
    * ``tools/call`` -- runs ``params["name"]`` with ``params["arguments"]``
      and returns the outcome as a single text content item.

    Args:
        request: The parsed JSON-RPC request.

    Returns:
        A JSON-RPC response dict. Failures are reported in the ``error``
        member rather than raised: ``-32602`` for a missing tool name,
        ``-32601`` for an unsupported method and ``-32603`` for any
        exception raised while handling the request.
    """
    try:
        logger.info(f"收到MCP请求: {request.method}, ID: {request.id}")

        if request.method == "initialize":
            return _jsonrpc_result(request.id, {
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "tools": {
                        "listChanged": True
                    }
                },
                "serverInfo": {
                    "name": "employee-management-mcp",
                    "version": "1.0.0"
                }
            })

        if request.method == "tools/list":
            tools = employee_tools.get_all_tools()
            return _jsonrpc_result(
                request.id,
                {"tools": [tool.model_dump() for tool in tools]},
            )

        if request.method == "tools/call":
            tool_name = request.params.get("name")
            # `or {}` also covers an explicit `"arguments": null`, which a
            # plain .get(..., {}) would pass through to the tool as None.
            arguments = request.params.get("arguments") or {}
            if not tool_name:
                return _jsonrpc_error(request.id, -32602, "工具名称不能为空")

            result = await employee_tools.execute(tool_name, arguments)
            return _jsonrpc_result(request.id, _format_tool_result(result))

        return _jsonrpc_error(
            request.id, -32601, f"不支持的方法: {request.method}"
        )
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error drops.
        logger.exception("处理MCP请求时出错: %s", e)
        return _jsonrpc_error(request.id, -32603, f"内部错误: {str(e)}")
@app.get("/health")
async def health_check():
    """Return a static health payload for this service.

    Returns:
        A dict with ``status``, ``service``, ``mcp_version`` and
        ``transport`` entries.
    """
    health_payload = dict(
        status="healthy",
        service="employee-management-mcp-maxkb-sse",
        mcp_version="2024-11-05",
        transport="sse",
    )
    return health_payload
if __name__ == "__main__":
    # uvicorn is only used when this module is executed as a script.
    import uvicorn

    bind_host = settings.mcp_host
    bind_port = settings.mcp_port
    logger.info(f"启动MaxKB SSE MCP服务器: {bind_host}:{bind_port}")

    # uvicorn is given the log level name in lower case.
    uvicorn.run(
        app,
        host=bind_host,
        port=bind_port,
        log_level=settings.log_level.lower(),
    )