#!/usr/bin/env python3
"""
专门为MaxKB设计的MCP服务器
实现完整的MCP协议握手和初始化流程
"""
import asyncio
import json
import logging
from typing import Dict, Any, List, Optional, Union

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from mcp.types import Tool, TextContent
from pydantic import BaseModel

from config.settings import settings
from tools.api_proxy import EmployeeManagementTools
# Configure logging.
# The configured level name is upper-cased before the lookup so that a
# lower-case value such as "info" resolves to logging.INFO rather than to the
# logging.info() function, which basicConfig() would reject.
logging.basicConfig(
    level=getattr(logging, settings.log_level.upper()),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
# Create the FastAPI application.
app = FastAPI(
    title="员工管理MCP服务器 (MaxKB专用)",
    description="专门为MaxKB设计的MCP协议服务器",
    version="1.0.0",
)

# Add the CORS middleware.
# Every origin, method and header is accepted. Credentialed requests are
# turned off because wildcard origins must not be combined with
# allow_credentials=True; to support cookies or Authorization headers from a
# browser, list the trusted origins explicitly and re-enable credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Shared tool registry used by the /mcp endpoint to list and execute tools.
employee_tools = EmployeeManagementTools()
class MCPRequest(BaseModel):
    """JSON-RPC 2.0 request envelope for MCP messages.

    Only ``method`` is required; the other fields fall back to the defaults
    declared below when the client omits them.
    """

    # Protocol version tag.
    jsonrpc: str = "2.0"
    # Name of the MCP method to invoke, e.g. "tools/list".
    method: str
    # Method arguments; defaults to an empty object.
    params: Optional[Dict[str, Any]] = {}
    # Request identifier. JSON-RPC allows string or numeric ids, so both are
    # accepted here instead of strings only.
    id: Optional[Union[str, int]] = "1"
@app.get("/")
async def root():
    """Return static information about this server for the root path."""
    # Ordered (key, value) pairs; dict() preserves this order in the response.
    server_fields = (
        ("name", "员工管理MCP服务器 (MaxKB专用)"),
        ("version", "1.0.0"),
        ("description", "专门为MaxKB设计的MCP协议服务器"),
        ("mcp_version", "2024-11-05"),
    )
    return dict(server_fields)
@app.get("/mcp")
async def mcp_info():
    """Return a fixed JSON-RPC envelope describing the server (GET /mcp).

    The payload carries the protocol version, the advertised tool capability
    and the server name/version, under the constant id ``"init"``.
    """
    tool_capability = {"listChanged": True}
    server_info = {"name": "employee-management-mcp", "version": "1.0.0"}
    handshake = {
        "protocolVersion": "2024-11-05",
        "capabilities": {"tools": tool_capability},
        "serverInfo": server_info,
    }
    return {"jsonrpc": "2.0", "id": "init", "result": handshake}
def _rpc_result(request_id, result):
    """Build a JSON-RPC 2.0 success envelope for *request_id*."""
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


def _rpc_error(request_id, code, message):
    """Build a JSON-RPC 2.0 error envelope for *request_id*."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message},
    }


def _initialize_result():
    """Return the payload sent in reply to an ``initialize`` request."""
    return {
        "protocolVersion": "2024-11-05",
        "capabilities": {"tools": {"listChanged": True}},
        "serverInfo": {"name": "employee-management-mcp", "version": "1.0.0"},
    }


def _format_tool_result(result):
    """Render the dict returned by a tool execution as display text."""
    if not result.get("success"):
        return f"❌ {result.get('error', '操作失败')}"

    parts = [f"✅ {result.get('message', '操作成功')}\n\n"]
    data = result.get("data")
    if data:
        # Long lists are truncated to the first 10 entries to keep the
        # response small.
        if isinstance(data, list) and len(data) > 10:
            parts.append(f"数据: 共{len(data)}条记录,显示前10条:\n")
            parts.extend(
                f"{index}. {item}\n"
                for index, item in enumerate(data[:10], start=1)
            )
            parts.append(f"... 还有{len(data)-10}条记录")
        else:
            parts.append(f"数据: {data}")
    return "".join(parts)


@app.post("/mcp")
async def handle_mcp_request(request: dict):
    """Dispatch one MCP JSON-RPC message posted to ``/mcp``.

    Supported methods are ``initialize``, ``tools/list``, ``tools/call`` and
    ``ping``. Messages whose method starts with ``notifications/`` (for
    example ``notifications/initialized`` during the handshake) are
    acknowledged with an empty HTTP 202 response, because JSON-RPC
    notifications must not receive a reply.

    Args:
        request: Decoded JSON body. ``method`` selects the operation, ``id``
            is echoed back (``"1"`` when absent) and ``params`` carries the
            method arguments; a missing or ``null`` ``params`` is treated as
            an empty object.

    Returns:
        A JSON-RPC 2.0 envelope containing ``result`` or ``error``
        (-32602 for invalid parameters, -32601 for an unknown method,
        -32603 when handling raises), or an empty 202 ``Response`` for
        notifications.
    """
    # Parsed before the try block so the error handler can always echo the id.
    method = request.get("method")
    request_id = request.get("id", "1")
    params = request.get("params")
    if params is None:
        params = {}

    try:
        logger.info(f"收到MCP请求: {method}, ID: {request_id}")

        # Notifications are one-way: acknowledge without a JSON-RPC body.
        if isinstance(method, str) and method.startswith("notifications/"):
            from fastapi import Response

            return Response(status_code=202)

        if method == "initialize":
            return _rpc_result(request_id, _initialize_result())

        if method == "tools/list":
            tools = employee_tools.get_all_tools()
            return _rpc_result(
                request_id,
                {"tools": [tool.model_dump() for tool in tools]},
            )

        if method == "tools/call":
            if not isinstance(params, dict):
                return _rpc_error(request_id, -32602, "params 必须是对象")

            tool_name = params.get("name")
            if not tool_name:
                return _rpc_error(request_id, -32602, "工具名称不能为空")

            arguments = params.get("arguments")
            if arguments is None:
                arguments = {}

            result = await employee_tools.execute(tool_name, arguments)
            return _rpc_result(
                request_id,
                {
                    "content": [
                        {"type": "text", "text": _format_tool_result(result)}
                    ],
                    "isError": not result.get("success", False),
                },
            )

        if method == "ping":
            # NOTE(review): the MCP specification describes an empty result
            # object for ping; the existing payload is kept for compatibility.
            return _rpc_result(request_id, {"message": "pong"})

        return _rpc_error(request_id, -32601, f"不支持的方法: {method}")
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"处理MCP请求时出错: {str(e)}")
        return _rpc_error(request_id, -32603, f"内部错误: {str(e)}")
@app.get("/health")
async def health_check():
    """Report a fixed health status together with the service name and MCP revision."""
    status_fields = (
        ("status", "healthy"),
        ("service", "employee-management-mcp"),
        ("mcp_version", "2024-11-05"),
    )
    return dict(status_fields)
@app.get("/capabilities")
async def get_capabilities():
    """Return the static capability map: tools (with listChanged), resources and prompts."""
    tool_capability = {"listChanged": True}
    capabilities = {"tools": tool_capability}
    # Resources and prompts are reported as empty objects.
    capabilities["resources"] = {}
    capabilities["prompts"] = {}
    return capabilities
def main():
    """Start the server with uvicorn, listening on all network interfaces.

    The port and log level come from ``settings``; the bind address is fixed.
    """
    import uvicorn

    # NOTE(review): settings.mcp_host is not used for binding. The startup
    # message reports the address actually bound so the log is not misleading;
    # confirm whether binding should follow settings.mcp_host instead.
    bind_host = "0.0.0.0"
    logger.info(f"启动MaxKB专用MCP服务器: {bind_host}:{settings.mcp_port}")
    uvicorn.run(
        app,
        host=bind_host,
        port=settings.mcp_port,
        log_level=settings.log_level.lower(),
    )


if __name__ == "__main__":
    main()