#!/usr/bin/env python3
"""
HTTP版本的MCP服务器
用于与MaxKB等支持HTTP传输的MCP客户端集成
"""
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from mcp.types import Tool, TextContent
from pydantic import BaseModel

from config.settings import settings
from tools.api_proxy import EmployeeManagementTools
# Configure logging
logging.basicConfig(
    # Level constants in the logging module are upper-case (logging.INFO, ...).
    # Normalising the configured name lets a value such as "info" resolve to
    # the constant instead of to the logging.info() function, which
    # basicConfig would reject.
    level=getattr(logging, settings.log_level.upper()),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Create the FastAPI application
app = FastAPI(
    version="1.0.0",
    title="员工管理MCP服务器",
    description="基于MCP协议的员工管理系统HTTP服务器",
)

# Register the CORS middleware.
# NOTE(review): wildcard origins combined with allow_credentials=True is a very
# permissive policy - confirm it is intended before exposing this server
# outside a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Instantiate the tool collection shared by all request handlers
employee_tools = EmployeeManagementTools()
# Pydantic models
class MCPRequest(BaseModel):
    """Request envelope: a method name plus optional parameters."""

    # Name of the method being invoked (required).
    method: str
    # Parameters for the method; an empty mapping when omitted.
    params: Dict[str, Any] = {}
class MCPResponse(BaseModel):
    """Response envelope carrying either a result or an error object."""

    # Payload of a successful call; None when absent.
    result: Any = None
    # Error details; None when absent. Declared Optional so that an explicit
    # ``error=None`` validates, matching the None default.
    error: Optional[Dict[str, Any]] = None
class ToolCallRequest(BaseModel):
    """Body of the direct (non-MCP) tool invocation endpoint."""

    # Name of the tool to run (required).
    name: str
    # Arguments handed to the tool; an empty mapping when omitted.
    arguments: Dict[str, Any] = {}
@app.get("/")
async def root():
    """Root path: return static identification data for this server."""
    server_summary = {
        "name": "员工管理MCP服务器",
        "version": "1.0.0",
        "description": "基于MCP协议的员工管理系统",
        "mcp_version": "2024-11-05",
    }
    return server_summary
@app.get("/mcp")
async def mcp_info():
    """Return the MCP protocol version, capabilities and server identity."""
    tool_capabilities = {"listChanged": True}
    server_identity = {
        "name": "employee-management-mcp",
        "version": "1.0.0",
    }
    return {
        "protocolVersion": "2024-11-05",
        "capabilities": {"tools": tool_capabilities},
        "serverInfo": server_identity,
    }
def _jsonrpc_result(request_id: Any, result: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap *result* in a JSON-RPC 2.0 success envelope."""
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


def _jsonrpc_error(request_id: Any, code: int, message: str) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error envelope with the given code and message."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message},
    }


def _tool_result_text(result: Dict[str, Any]) -> str:
    """Render a tool result dict as the text returned to the client."""
    if result.get("success"):
        text = f"✅ {result.get('message', '操作成功')}\n\n"
        if result.get("data"):
            text += f"数据: {result['data']}"
        return text
    return f"❌ {result.get('error', '操作失败')}"


@app.post("/mcp")
async def handle_mcp_request(request: dict):
    """Handle an MCP request expressed as a JSON-RPC 2.0 object.

    Only ``tools/list`` and ``tools/call`` are implemented; every other
    method is answered with a -32601 error.

    Args:
        request: Decoded JSON body. The keys ``method``, ``params`` and
            ``id`` are read from it.

    Returns:
        A JSON-RPC 2.0 response dict echoing the request ``id`` and holding
        either ``result`` or ``error``. Error codes used: -32601 (unknown
        method), -32602 (missing tool name, or ``params``/``arguments`` that
        are not objects) and -32603 (any exception raised while handling).
    """
    request_id = request.get("id")
    try:
        method = request.get("method")
        logger.info(f"收到MCP请求: {method}, ID: {request_id}")

        if method == "tools/list":
            tools = employee_tools.get_all_tools()
            return _jsonrpc_result(
                request_id, {"tools": [tool.dict() for tool in tools]}
            )

        if method == "tools/call":
            # An explicit ``"params": null`` is treated like an omitted field;
            # any other non-object is a caller error, not an internal one.
            params = request.get("params") or {}
            if not isinstance(params, dict):
                return _jsonrpc_error(request_id, -32602, "params必须是对象")

            tool_name = params.get("name")
            if not tool_name:
                return _jsonrpc_error(request_id, -32602, "工具名称不能为空")

            arguments = params.get("arguments") or {}
            if not isinstance(arguments, dict):
                return _jsonrpc_error(request_id, -32602, "工具参数必须是对象")

            # Run the tool and convert its result dict into MCP text content.
            result = await employee_tools.execute(tool_name, arguments)
            return _jsonrpc_result(
                request_id,
                {
                    "content": [
                        {"type": "text", "text": _tool_result_text(result)}
                    ],
                    "isError": not result.get("success", False),
                },
            )

        return _jsonrpc_error(request_id, -32601, f"不支持的方法: {method}")
    except Exception as e:
        # Boundary handler: keep the traceback in the log and still answer
        # with a well-formed JSON-RPC error.
        logger.exception(f"处理MCP请求时出错: {str(e)}")
        return _jsonrpc_error(request_id, -32603, f"内部错误: {str(e)}")
@app.get("/tools")
async def list_tools():
    """Return the tool list directly (outside the MCP protocol)."""
    serialized = []
    for tool in employee_tools.get_all_tools():
        serialized.append(tool.dict())
    return {"tools": serialized}
@app.post("/tools/call")
async def call_tool(request: ToolCallRequest):
    """Invoke a tool directly (outside the MCP protocol).

    Args:
        request: Tool name and arguments to pass to ``employee_tools.execute``.

    Returns:
        On completion, a dict with ``success``, a formatted ``content``
        string and the raw ``data`` from the tool result. If anything raises,
        a dict with ``success`` set to False and ``error`` holding the
        exception text.
    """
    try:
        logger.info(f"调用工具: {request.name}, 参数: {request.arguments}")

        # Run the tool
        result = await employee_tools.execute(request.name, request.arguments)

        # Build the human-readable summary of the outcome
        if result.get("success"):
            content = f"✅ {result.get('message', '操作成功')}\n\n"
            if result.get("data"):
                content += f"数据: {result['data']}"
        else:
            content = f"❌ {result.get('error', '操作失败')}"

        return {
            "success": result.get("success", False),
            "content": content,
            "data": result.get("data"),
        }
    except Exception as e:
        # logger.exception records the traceback as well as the message, so
        # the failure can be diagnosed from the log.
        logger.exception(f"调用工具时出错: {str(e)}")
        return {
            "success": False,
            "error": str(e),
        }
@app.get("/health")
async def health_check():
    """Health check: return a fixed status payload."""
    status_payload = {
        "status": "healthy",
        "service": "employee-management-mcp",
    }
    return status_payload
if __name__ == "__main__":
    # uvicorn is only needed when this module is executed as a script.
    import uvicorn

    logger.info(f"启动HTTP MCP服务器: {settings.mcp_host}:{settings.mcp_port}")

    # Host, port and log level all come from the project settings; the level
    # is lower-cased before being passed to uvicorn.
    server_options = {
        "host": settings.mcp_host,
        "port": settings.mcp_port,
        "log_level": settings.log_level.lower(),
    }
    uvicorn.run(app, **server_options)