#!/usr/bin/env python3
"""
HTTP SSE服务器 - 支持通过HTTP方式启动MCP服务器
用于Dify和毕昇平台集成
"""
import asyncio
import json
import logging
import os
import sys
import uuid
from typing import Dict, Any, Optional
import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
# Put the directory that contains this file at the front of sys.path so that
# modules located next to it can be imported.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# Logging setup: INFO level; every record carries a timestamp, the logger
# name and the level name.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Read the configuration from environment variables, with default values.
def load_config():
    """Build the configuration dict from environment variables.

    Each setting is read with ``os.getenv`` and falls back to a built-in
    default when the variable is unset. The resolved values are written back
    to ``os.environ`` so that other modules reading the environment see them.

    Returns:
        dict mapping setting name to its string value, in declaration order.

    Raises:
        ValueError: if ``DEEPSEEK_API_KEY`` is unset or empty.
    """
    defaults = (
        ('DATABASE_URL', 'mysql+aiomysql://user:password@localhost:3306/database'),
        ('DEEPSEEK_API_KEY', ''),
        ('LLM_PROVIDER', 'deepseek'),
        ('LLM_MODEL', 'deepseek-chat'),
        ('LLM_STREAM', 'true'),
        ('LOG_LEVEL', 'INFO'),
    )
    settings = {name: os.getenv(name, fallback) for name, fallback in defaults}

    # The API key is the only setting without a usable default.
    if not settings['DEEPSEEK_API_KEY']:
        raise ValueError("DEEPSEEK_API_KEY环境变量是必需的")

    # Export the resolved values (defaults included) to the process environment.
    os.environ.update(settings)
    return settings
# Load the configuration once, at import time. load_config() raises ValueError
# when DEEPSEEK_API_KEY is unset or empty, so importing this module fails in
# that case.
HARDCODED_CONFIG = load_config()
def _mask_config_value(key: str, value: str) -> str:
    """Return *value* in a form that is safe to write to the log.

    Settings whose name contains ``KEY`` or ``PASSWORD`` are reduced to a
    short prefix followed by ``***``. Settings whose name contains ``URL``
    have the password part of their userinfo replaced by ``***``. Anything
    else is returned unchanged.
    """
    if 'KEY' in key or 'PASSWORD' in key:
        # Keep only a short prefix so the secret cannot be recovered from logs.
        return f"{value[:4]}***"
    if 'URL' in key:
        from urllib.parse import urlsplit
        try:
            parts = urlsplit(value)
            if parts.password is None:
                return value
            # netloc is "user:password@host[:port]"; keep user and host only.
            host = parts.netloc.rpartition('@')[2]
            user = parts.username or ''
            return parts._replace(netloc=f"{user}:***@{host}").geturl()
        except ValueError:
            # Unparseable URL: do not risk echoing embedded credentials.
            return "***"
    return value


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler: startup logging and database initialisation.

    On startup the effective configuration is logged with secrets masked
    (API keys and the password embedded in ``DATABASE_URL``), then the storage
    layer is initialised. A database failure is logged but deliberately not
    re-raised, so the server still starts. On shutdown a single log line is
    written.

    Args:
        app: the FastAPI application (unused).
    """
    logger.info("🚀 启动HTTP SSE MCP服务器")
    logger.info("配置信息:")
    for key, value in HARDCODED_CONFIG.items():
        logger.info(f" {key}: {_mask_config_value(key, value)}")

    # Initialise the database.
    try:
        from storage import StorageManager
        storage = StorageManager()
        await storage.initialize()
        logger.info("✅ 数据库初始化成功")
    except Exception as e:
        # Best effort: do not raise, allow the server to keep running.
        logger.error(f"❌ 数据库初始化失败: {e}")

    yield

    logger.info("🔄 关闭HTTP SSE MCP服务器")
# Create the FastAPI application; startup and shutdown work is delegated to
# the lifespan context manager.
app = FastAPI(
    lifespan=lifespan,
    title="智能表单收集MCP服务器",
    description="通过HTTP SSE方式提供MCP服务",
    version="1.0.0",
)

# Add the CORS middleware: every origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# very permissive policy — confirm it is intended before exposing the service.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
class MCPSession:
    """In-memory MCP session registry and JSON-RPC tool dispatcher.

    ``sessions`` maps a session id to a small record (``id``, ``created_at``,
    ``active``). ``process_message`` turns a JSON-RPC request into a JSON-RPC
    response, and ``_call_mcp_tool`` runs one named form-collection tool.
    """

    # Tools that only need a session id, mapped to the name of the
    # FormCollector coroutine method that serves them.
    _SESSION_TOOL_METHODS = {
        'get_collection_status': 'get_status',
        'validate_form_data': 'validate_data',
        'submit_form': 'submit_form',
    }

    def __init__(self):
        self.sessions: Dict[str, Any] = {}

    @staticmethod
    def _jsonrpc_error(message_id: Any, code: int, text: str) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 error response."""
        return {
            'jsonrpc': '2.0',
            'id': message_id,
            'error': {
                'code': code,
                'message': text
            }
        }

    async def create_session(self, session_id: str) -> str:
        """Register *session_id* if it is not known yet and return it.

        An existing record is left untouched.
        """
        if session_id not in self.sessions:
            self.sessions[session_id] = {
                'id': session_id,
                # Event-loop clock value, not a wall-clock time.
                'created_at': asyncio.get_running_loop().time(),
                'active': True
            }
        return session_id

    async def process_message(self, session_id: str, message: Dict[str, Any]) -> Dict[str, Any]:
        """Handle one JSON-RPC message and return the JSON-RPC response.

        Only ``tools/call`` is supported; every other method yields error
        -32601. A message that is not a JSON object yields -32600 and
        ``params`` that is not an object yields -32602. Any unexpected
        exception is logged and reported as -32603. This method never raises.

        Args:
            session_id: id under which the call is registered.
            message: decoded JSON-RPC request.

        Returns:
            A JSON-RPC 2.0 response dict carrying ``result`` or ``error``.
        """
        # The payload comes straight from json.loads and may be any JSON
        # value; without this guard the error handler below would itself fail
        # on ``message.get``.
        if not isinstance(message, dict):
            return self._jsonrpc_error(
                None, -32600, "Invalid Request: message must be a JSON object")

        message_id = message.get('id')
        try:
            # Make sure the session exists.
            await self.create_session(session_id)

            method = message.get('method')
            if method != 'tools/call':
                return self._jsonrpc_error(
                    message_id, -32601, f"Method not found: {method}")

            # "params": null is treated like a missing params member.
            params = message.get('params') or {}
            if not isinstance(params, dict):
                return self._jsonrpc_error(
                    message_id, -32602, "Invalid params: params must be an object")

            tool_name = params.get('name')
            arguments = params.get('arguments', {})
            logger.info(f"调用工具: {tool_name}, 参数: {arguments}")

            result = await self._call_mcp_tool(tool_name, arguments)
            return {
                'jsonrpc': '2.0',
                'id': message_id,
                'result': result
            }
        except Exception as e:
            logger.error(f"处理消息失败: {e}")
            return self._jsonrpc_error(message_id, -32603, f"Internal error: {str(e)}")

    async def _call_mcp_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Run the MCP tool *tool_name* with *arguments*.

        Every failure — invalid arguments, unknown session, unknown tool or an
        exception raised by a collaborator — is reported as a dict with
        ``"success": False`` and a ``"message"``; this method never raises.

        Args:
            tool_name: one of ``start_form_collection``, ``collect_field_info``,
                ``get_collection_status``, ``validate_form_data``,
                ``submit_form`` or ``get_session_statistics``.
            arguments: tool arguments; ``None`` is treated as an empty dict.

        Returns:
            The tool's result as returned by the storage/collector layer, or
            an error dict as described above.
        """
        try:
            # Project modules are imported lazily, inside the try block, so an
            # import failure is reported as a tool failure.
            from storage import StorageManager
            from llm_service import LLMService
            from form_collector import FormCollector, FORM_TEMPLATES

            if arguments is None:
                arguments = {}

            # NOTE(review): a new StorageManager is created and initialised
            # on every tool call — confirm initialize() is cheap/idempotent.
            storage_manager = StorageManager()
            await storage_manager.initialize()
            llm_service = LLMService()

            if tool_name == 'start_form_collection':
                template_name = arguments.get('template_name', 'mediation')
                if template_name not in FORM_TEMPLATES:
                    return {
                        "success": False,
                        "message": f"模板 '{template_name}' 不存在",
                        "available_templates": list(FORM_TEMPLATES.keys())
                    }
                # Create a new session.
                session_id = str(uuid.uuid4())
                session_data = await storage_manager.create_session(session_id, template_name)
                template = FORM_TEMPLATES[template_name]
                return {
                    "success": True,
                    "session_id": session_id,
                    "template_name": template_name,
                    "message": f"表单收集会话已启动 (会话ID: {session_id})",
                    "fields_to_collect": [field.name for field in template.fields if field.required],
                    "template_description": template.description,
                    "expires_at": session_data.expires_at.isoformat(),
                    "next_step": "请使用 collect_field_info 工具收集用户信息"
                }

            if tool_name == 'get_session_statistics':
                return await storage_manager.get_statistics()

            is_collect = tool_name == 'collect_field_info'
            collector_method = (
                self._SESSION_TOOL_METHODS.get(tool_name)
                if isinstance(tool_name, str) else None
            )
            if not is_collect and collector_method is None:
                return {"success": False, "message": f"未知工具: {tool_name}"}

            # Everything below is shared by the session-bound tools.
            session_id = arguments.get('session_id')
            user_input = arguments.get('user_input')
            if is_collect:
                if not session_id or not user_input:
                    return {"success": False, "message": "session_id和user_input是必填参数"}
            elif not session_id:
                return {"success": False, "message": "session_id是必填参数"}

            session_data = await storage_manager.get_session(session_id)
            if not session_data:
                return {"success": False, "message": f"会话 {session_id} 不存在"}

            # A stored session may name a template that no longer exists;
            # report that explicitly instead of surfacing a bare KeyError.
            if session_data.template_name not in FORM_TEMPLATES:
                return {
                    "success": False,
                    "message": f"模板 '{session_data.template_name}' 不存在",
                    "available_templates": list(FORM_TEMPLATES.keys())
                }
            template = FORM_TEMPLATES[session_data.template_name]
            collector = FormCollector(template, llm_service, storage_manager)

            if is_collect:
                return await collector.collect_info(
                    session_id, user_input, use_stream=arguments.get('stream', True))
            return await getattr(collector, collector_method)(session_id)
        except Exception as e:
            logger.error(f"调用工具失败: {e}")
            import traceback
            logger.error(f"错误详情: {traceback.format_exc()}")
            return {"success": False, "message": f"工具执行失败: {str(e)}"}
# Module-wide session manager instance used by the HTTP endpoints below.
session_manager = MCPSession()
@app.get("/")
async def root():
    """Root path: report service identity, endpoint paths and basic config.

    The ``database`` entry only reflects whether the ``DATABASE_URL``
    environment variable is set; no connection is attempted here.
    """
    database_state = "MySQL (已连接)" if os.getenv('DATABASE_URL') else "未配置"
    endpoint_paths = {
        "sse": "/sse",
        "health": "/health",
        "tools": "/tools",
    }
    runtime_config = {
        "llm_provider": os.getenv('LLM_PROVIDER'),
        "llm_model": os.getenv('LLM_MODEL'),
        "database": database_state,
    }
    return {
        "service": "智能表单收集MCP服务器",
        "version": "1.0.0",
        "status": "running",
        "endpoints": endpoint_paths,
        "config": runtime_config,
    }
@app.get("/health")
async def health_check():
    """Health check: probe the storage layer and report service status.

    Returns:
        On success, a dict with ``status: "healthy"``, a wall-clock
        ``timestamp`` (seconds since the epoch), per-service states and the
        storage statistics (HTTP 200).
        On failure, a ``JSONResponse`` with HTTP 503 whose body has
        ``status: "unhealthy"`` and the error text, so that probes relying on
        the status code can detect the outage.
    """
    # Local import: the event loop's clock is monotonic and has no meaning as
    # a timestamp for clients, so wall-clock time is used instead.
    import time

    try:
        # Check the database connection.
        from storage import StorageManager
        # NOTE(review): the other call sites in this file await
        # storage.initialize() before using a StorageManager — confirm
        # get_statistics() works on a fresh instance.
        storage = StorageManager()
        stats = await storage.get_statistics()
    except Exception as e:
        logger.error(f"健康检查失败: {e}")
        return JSONResponse(
            status_code=503,
            content={
                "status": "unhealthy",
                "error": str(e),
                "timestamp": time.time(),
                "services": {
                    "database": "error",
                    "llm": "configured",
                    "mcp": "running"
                }
            }
        )

    return {
        "status": "healthy",
        "timestamp": time.time(),
        "services": {
            "database": "connected",
            "llm": "configured",
            "mcp": "running"
        },
        "stats": stats
    }
@app.get("/tools")
async def list_tools():
    """List the available tools with a human-readable parameter summary."""
    stream_hint = "是否启用流式输出 (默认: true)"
    session_hint = "会话ID (必填)"

    # (name, description, parameters) triples, in the order they are reported.
    catalog = (
        (
            "start_form_collection",
            "开始新的表单收集会话",
            {
                "template_name": "表单模板名称 (默认: mediation)",
                "stream": stream_hint,
            },
        ),
        (
            "collect_field_info",
            "收集用户输入的字段信息",
            {
                "session_id": session_hint,
                "user_input": "用户输入内容 (必填)",
                "stream": stream_hint,
            },
        ),
        ("get_collection_status", "获取当前收集状态", {"session_id": session_hint}),
        ("validate_form_data", "验证表单数据完整性", {"session_id": session_hint}),
        ("submit_form", "提交完整表单", {"session_id": session_hint}),
        ("get_session_statistics", "获取系统统计信息", {}),
    )
    return {
        "tools": [
            {"name": name, "description": summary, "parameters": parameters}
            for name, summary, parameters in catalog
        ]
    }
def _sse_tool_definitions():
    """Return the tool descriptors advertised by the SSE ``tools/list`` reply."""

    def session_only_schema():
        # Fresh dict per tool so the entries never share mutable state.
        return {
            "type": "object",
            "properties": {
                "session_id": {"type": "string"}
            },
            "required": ["session_id"]
        }

    return [
        {
            "name": "start_form_collection",
            "description": "开始新的表单收集会话",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "template_name": {"type": "string", "default": "mediation"},
                    "stream": {"type": "boolean", "default": True}
                }
            }
        },
        {
            "name": "collect_field_info",
            "description": "收集用户输入的字段信息",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "session_id": {"type": "string"},
                    "user_input": {"type": "string"},
                    "stream": {"type": "boolean", "default": True}
                },
                "required": ["session_id", "user_input"]
            }
        },
        {
            "name": "get_collection_status",
            "description": "获取当前收集状态",
            "inputSchema": session_only_schema()
        },
        {
            "name": "validate_form_data",
            "description": "验证表单数据完整性",
            "inputSchema": session_only_schema()
        },
        {
            "name": "submit_form",
            "description": "提交完整表单",
            "inputSchema": session_only_schema()
        },
        {
            "name": "get_session_statistics",
            "description": "获取系统统计信息",
            "inputSchema": {
                "type": "object",
                "properties": {}
            }
        }
    ]


@app.get("/sse")
@app.post("/sse")
async def sse_endpoint(request: Request):
    """SSE endpoint: answer one MCP message as a server-sent event stream.

    The stream always starts with a ``connected`` event. After that:

    * no request body (any GET, or an empty POST): a ``server_info`` event;
    * body that is not valid UTF-8 JSON: an ``error`` event, code -32700;
    * JSON that is not an object: an ``error`` event, code -32600;
    * ``tools/list``: a ``tools_list`` event with the tool schemas;
    * ``tools/call``: a ``tool_result`` event with the JSON-RPC response;
    * any other method: a ``message`` event with the JSON-RPC response.

    Unexpected failures are logged and reported as an ``error`` event with
    code -32603.

    Args:
        request: the incoming HTTP request.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    # Local import: the event loop's clock is monotonic and has no meaning as
    # a timestamp for clients, so wall-clock time is used instead.
    import time

    def frame(event: str, payload: Dict[str, Any]) -> str:
        """Serialise one SSE frame (event name plus JSON data line)."""
        return f"event: {event}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"

    async def event_stream():
        """Generate the SSE frames for this request."""
        try:
            yield frame("connected", {'status': 'connected', 'timestamp': time.time()})

            # Read the request body (only POST requests carry one).
            body = await request.body() if request.method == "POST" else b""
            if not body:
                # Without a body, just describe the server.
                yield frame("server_info", {
                    "jsonrpc": "2.0",
                    "method": "server/info",
                    "params": {
                        "name": "智能表单收集MCP服务器",
                        "version": "1.0.0",
                        "capabilities": {
                            "tools": True,
                            "prompts": False,
                            "resources": False
                        },
                        "available_tools": [
                            "start_form_collection",
                            "collect_field_info",
                            "get_collection_status",
                            "validate_form_data",
                            "submit_form",
                            "get_session_statistics"
                        ]
                    }
                })
                return

            # Parse the request message; undecodable bytes are a parse error too.
            try:
                message = json.loads(body.decode('utf-8'))
            except (UnicodeDecodeError, json.JSONDecodeError) as e:
                yield frame("error", {
                    "jsonrpc": "2.0",
                    "id": None,
                    "error": {
                        "code": -32700,
                        "message": f"Parse error: {str(e)}"
                    }
                })
                return
            logger.info(f"收到MCP消息: {message}")

            # Valid JSON that is not an object (list, string, number, ...)
            # cannot be a JSON-RPC request.
            if not isinstance(message, dict):
                yield frame("error", {
                    "jsonrpc": "2.0",
                    "id": None,
                    "error": {
                        "code": -32600,
                        "message": "Invalid Request: message must be a JSON object"
                    }
                })
                return

            method = message.get('method')
            if method == 'tools/list':
                yield frame("tools_list", {
                    "jsonrpc": "2.0",
                    "id": message.get('id'),
                    "result": {
                        "tools": _sse_tool_definitions()
                    }
                })
            else:
                # tools/call and every other method go through the session
                # manager; only the event name differs.
                event_name = "tool_result" if method == 'tools/call' else "message"
                # NOTE(review): the id has one-second resolution, so requests
                # arriving within the same second share a session record.
                session_id = f"session_{int(asyncio.get_event_loop().time())}"
                response = await session_manager.process_message(session_id, message)
                yield frame(event_name, response)
        except Exception as e:
            logger.error(f"SSE处理错误: {e}")
            import traceback
            logger.error(f"错误详情: {traceback.format_exc()}")
            yield frame("error", {
                "jsonrpc": "2.0",
                "error": {
                    "code": -32603,
                    "message": f"Internal error: {str(e)}"
                }
            })

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "*",
            "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
        }
    )
def _mcp_tool_definitions():
    """Return the tool descriptors advertised by the ``/mcp`` ``tools/list`` reply."""
    stream_description = "是否启用流式输出 (默认: true)"

    def session_id_property():
        return {
            "type": "string",
            "description": "会话ID (必填)"
        }

    def session_only_schema():
        # Fresh dict per tool so the entries never share mutable state.
        return {
            "type": "object",
            "properties": {
                "session_id": session_id_property()
            },
            "required": ["session_id"]
        }

    return [
        {
            "name": "start_form_collection",
            "description": "开始新的表单收集会话",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "template_name": {
                        "type": "string",
                        "description": "表单模板名称 (默认: mediation)"
                    },
                    "stream": {
                        "type": "boolean",
                        "description": stream_description
                    }
                },
                # template_name has a default ('mediation') in the tool
                # implementation, so it must not be advertised as required.
                "required": []
            }
        },
        {
            "name": "collect_field_info",
            "description": "收集用户输入的字段信息",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "session_id": session_id_property(),
                    "user_input": {
                        "type": "string",
                        "description": "用户输入内容 (必填)"
                    },
                    "stream": {
                        "type": "boolean",
                        "description": stream_description
                    }
                },
                "required": ["session_id", "user_input"]
            }
        },
        {
            "name": "get_collection_status",
            "description": "获取当前收集状态",
            "inputSchema": session_only_schema()
        },
        {
            "name": "validate_form_data",
            "description": "验证表单数据完整性",
            "inputSchema": session_only_schema()
        },
        {
            "name": "submit_form",
            "description": "提交完整表单",
            "inputSchema": session_only_schema()
        },
        {
            "name": "get_session_statistics",
            "description": "获取系统统计信息",
            "inputSchema": {
                "type": "object",
                "properties": {},
                "required": []
            }
        }
    ]


@app.post("/mcp")
async def mcp_endpoint(request: Request):
    """MCP endpoint: handle one JSON-RPC message and reply with plain JSON.

    Supported methods are ``tools/list`` and ``tools/call``; anything else is
    answered with error -32601.

    HTTP status codes:
        400: empty body (-32600), body that is not valid UTF-8 JSON (-32700),
            or JSON that is not an object (-32600).
        500: unexpected failure (-32603).
        200: every other reply, including JSON-RPC errors such as invalid
            params (-32602) and tool execution errors (-32603).

    Args:
        request: the incoming HTTP request.

    Returns:
        A ``JSONResponse`` carrying the JSON-RPC response.
    """

    def error_body(code: int, text: str, message_id: Any = None) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 error response body."""
        return {
            "jsonrpc": "2.0",
            "id": message_id,
            "error": {
                "code": code,
                "message": text
            }
        }

    try:
        # Read the request body.
        body = await request.body()
        if not body:
            return JSONResponse(
                status_code=400,
                content=error_body(-32600, "Invalid Request: Empty body")
            )

        # Parse the JSON message; undecodable bytes are a parse error too.
        try:
            message = json.loads(body.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            return JSONResponse(
                status_code=400,
                content=error_body(-32700, f"Parse error: {str(e)}")
            )
        logger.info(f"收到MCP请求: {message}")

        # Valid JSON that is not an object cannot be a JSON-RPC request.
        if not isinstance(message, dict):
            return JSONResponse(
                status_code=400,
                content=error_body(-32600, "Invalid Request: message must be a JSON object")
            )

        method = message.get('method')
        message_id = message.get('id')
        # "params": null is treated like a missing params member.
        params = message.get('params') or {}
        logger.info(f"处理MCP消息: {method}")

        if method == 'tools/list':
            response = {
                "jsonrpc": "2.0",
                "id": message_id,
                "result": {
                    "tools": _mcp_tool_definitions()
                }
            }
        elif method == 'tools/call':
            if not isinstance(params, dict):
                response = error_body(
                    -32602, "Invalid params: params must be an object", message_id)
            elif not params.get('name'):
                response = error_body(
                    -32602, "Invalid params: missing tool name", message_id)
            else:
                try:
                    # Reuse the session manager's tool dispatch.
                    tool_result = await session_manager._call_mcp_tool(
                        params.get('name'), params.get('arguments', {}))
                    response = {
                        "jsonrpc": "2.0",
                        "id": message_id,
                        "result": {
                            "content": [
                                {
                                    "type": "text",
                                    "text": json.dumps(tool_result, ensure_ascii=False, indent=2)
                                }
                            ]
                        }
                    }
                except Exception as e:
                    logger.error(f"工具调用失败: {e}")
                    response = error_body(
                        -32603, f"Tool execution error: {str(e)}", message_id)
        else:
            # Unknown method.
            response = error_body(-32601, f"Method not found: {method}", message_id)

        logger.info(f"MCP响应: {response}")
        return JSONResponse(content=response)
    except Exception as e:
        logger.error(f"MCP端点处理错误: {e}")
        import traceback
        logger.error(f"错误详情: {traceback.format_exc()}")
        return JSONResponse(
            status_code=500,
            content={
                "jsonrpc": "2.0",
                "error": {
                    "code": -32603,
                    "message": f"Internal error: {str(e)}"
                }
            }
        )
if __name__ == "__main__":
    # Script entry point: announce the listen address, then start uvicorn.
    for banner_line in (
        "🚀 启动智能表单收集HTTP SSE MCP服务器",
        "📝 服务器将在 http://0.0.0.0:7000 上运行",
        "🔧 SSE端点: http://0.0.0.0:7000/sse",
    ):
        logger.info(banner_line)

    # The app is referenced by import string, so this expects the module to
    # be importable as "http_server".
    server_options = {
        "host": "0.0.0.0",
        "port": 7000,
        "reload": False,
        "log_level": "info",
    }
    uvicorn.run("http_server:app", **server_options)