#!/usr/bin/env python3
"""
基于MCP Python SDK 1.9.4(底层Server接口,stdio传输)实现的智能表单收集服务器
参照MCP官方文档与该版本SDK实现
"""
import asyncio
import json
import logging
import os
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
# Run from this file's own directory and make its sibling modules
# (storage, llm_service, form_collector, config) importable regardless of
# the caller's working directory.
# The directory is made absolute *before* the chdir: if __file__ were
# relative, re-evaluating it after os.chdir() would resolve against the new
# working directory and put a wrong path on sys.path.
_BASE_DIR = Path(__file__).absolute().parent
os.chdir(_BASE_DIR)
sys.path.insert(0, str(_BASE_DIR))
# Read configuration from environment variables, with built-in fallbacks.
def load_config():
    """Build the runtime configuration from environment variables.

    Each setting comes from the environment when present, otherwise from the
    fallback listed below. ``DEEPSEEK_API_KEY`` has no usable fallback: if it
    is unset or empty, an error is printed to stderr and the process exits
    with status 1. On success every setting is written back into
    ``os.environ`` (so modules imported later can read it) and the resulting
    dict is returned.
    """
    defaults = (
        ('DATABASE_URL', 'mysql+aiomysql://user:password@localhost:3306/database'),
        ('DEEPSEEK_API_KEY', ''),
        ('LLM_PROVIDER', 'deepseek'),
        ('LLM_MODEL', 'deepseek-chat'),
        ('LLM_STREAM', 'true'),
        ('LOG_LEVEL', 'INFO'),
    )
    settings = {name: os.getenv(name, fallback) for name, fallback in defaults}

    api_key_missing = not settings['DEEPSEEK_API_KEY']
    if api_key_missing:
        print("错误: DEEPSEEK_API_KEY环境变量是必需的", file=sys.stderr)
        sys.exit(1)

    # Publish the resolved values so other modules see them via os.environ.
    os.environ.update(settings)
    return settings
# Load configuration now, before the MCP and business-module imports below,
# so the values load_config() writes into os.environ are visible to them.
APP_CONFIG = load_config()
# 导入MCP相关模块
try:
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
CallToolResult,
TextContent,
Tool
)
except ImportError as e:
print(f"MCP导入错误: {e}", file=sys.stderr)
sys.exit(1)
# 导入业务模块
try:
from storage import StorageManager
from llm_service import LLMService
from form_collector import FormCollector
from config import FORM_TEMPLATES
except ImportError as e:
print(f"业务模块导入错误: {e}", file=sys.stderr)
sys.exit(1)
# Configure logging. Output goes to stderr because stdout carries the MCP
# stdio transport used by main().
def _resolve_log_level(name: Optional[str] = None) -> int:
    """Map a level name such as ``"debug"`` to a numeric :mod:`logging` level.

    Args:
        name: level name; when None, the ``LOG_LEVEL`` environment variable
            is read (default ``"INFO"``). Case and surrounding whitespace
            are ignored.

    Returns:
        The numeric level. Unknown or empty names fall back to
        ``logging.INFO``, so a typo in the configuration cannot stop startup.
    """
    if name is None:
        name = os.getenv('LOG_LEVEL', 'INFO')
    level = logging.getLevelName(name.strip().upper())
    # getLevelName returns the string "Level <name>" for unknown names.
    return level if isinstance(level, int) else logging.INFO


logging.basicConfig(
    level=_resolve_log_level(),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    stream=sys.stderr
)
logger = logging.getLogger(__name__)
# Process-wide service singletons; both stay None until init_services() runs.
llm_service: Optional[LLMService] = None
storage_manager: Optional[StorageManager] = None
# In-memory cache of FormCollector objects, keyed by session id.
form_collectors: Dict[str, FormCollector] = dict()
async def init_services():
    """Create and initialise the shared LLM service and storage manager.

    The module-level ``llm_service`` and ``storage_manager`` globals are
    assigned only after every step has succeeded. A failing
    ``StorageManager.initialize()`` therefore cannot leave a non-None but
    uninitialised manager behind. That matters because ``call_tool`` only
    re-runs initialisation while either global is still ``None``.

    Raises:
        Exception: anything raised by ``LLMService()``, ``StorageManager()``
            or ``StorageManager.initialize()`` is logged (with traceback)
            and re-raised.
    """
    global storage_manager, llm_service
    try:
        logger.info("🔧 初始化服务中...")
        # LLM service first.
        new_llm_service = LLMService()
        logger.info("✅ LLM服务初始化完成")
        # Then the storage manager, which needs an explicit async initialize().
        new_storage_manager = StorageManager()
        await new_storage_manager.initialize()
        logger.info("✅ 存储管理器初始化完成")
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception("❌ 服务初始化失败: %s", e)
        raise
    # Publish both services together, only once everything succeeded.
    llm_service = new_llm_service
    storage_manager = new_storage_manager
    logger.info("✅ 所有服务初始化完成")
# Name handed to the MCP SDK's Server (presumably reported to clients during
# initialisation).
SERVER_NAME = "智能表单收集MCP服务器"
server = Server(SERVER_NAME)
def _session_id_only_schema() -> Dict[str, Any]:
    """Build the input schema for tools whose sole argument is ``session_id``.

    A new dict is returned on each call, so no two ``Tool`` objects share
    (and could mutate) the same schema object.
    """
    return {
        "type": "object",
        "properties": {
            "session_id": {
                "type": "string",
                "description": "会话ID (必填)"
            }
        },
        "required": ["session_id"]
    }


@server.list_tools()
async def list_tools() -> List[Tool]:
    """List the tools exposed by this server.

    Returns a plain ``List[Tool]`` rather than ``ListToolsResult``; the SDK
    decorator does the wrapping.

    ``template_name`` of ``start_form_collection`` is optional, because
    ``call_tool`` falls back to ``"mediation"`` when it is omitted. The
    schema advertises that fallback (and the ``stream`` default) via JSON
    Schema ``"default"`` annotations.
    """
    logger.info("处理list_tools请求")
    return [
        Tool(
            name="start_form_collection",
            description="开始新的表单收集会话",
            inputSchema={
                "type": "object",
                "properties": {
                    "template_name": {
                        "type": "string",
                        "description": "表单模板名称 (默认: mediation)",
                        "default": "mediation"
                    },
                    "stream": {
                        "type": "boolean",
                        "description": "是否启用流式输出 (默认: true)",
                        "default": True
                    }
                },
                # Nothing is required: call_tool supplies both defaults.
                "required": []
            }
        ),
        Tool(
            name="collect_field_info",
            description="收集用户输入的字段信息",
            inputSchema={
                "type": "object",
                "properties": {
                    "session_id": {
                        "type": "string",
                        "description": "会话ID (必填)"
                    },
                    "user_input": {
                        "type": "string",
                        "description": "用户输入内容 (必填)"
                    },
                    "stream": {
                        "type": "boolean",
                        "description": "是否启用流式输出 (默认: true)",
                        "default": True
                    }
                },
                "required": ["session_id", "user_input"]
            }
        ),
        Tool(
            name="get_collection_status",
            description="获取当前收集状态",
            inputSchema=_session_id_only_schema()
        ),
        Tool(
            name="validate_form_data",
            description="验证表单数据完整性",
            inputSchema=_session_id_only_schema()
        ),
        Tool(
            name="submit_form",
            description="提交完整表单",
            inputSchema=_session_id_only_schema()
        ),
        Tool(
            name="get_session_statistics",
            description="获取系统统计信息",
            inputSchema={
                "type": "object",
                "properties": {},
                "required": []
            }
        )
    ]
def _json_text(payload: Dict[str, Any], indent: Optional[int] = 2) -> List[TextContent]:
    """Serialise *payload* as JSON and wrap it in a one-element TextContent list.

    ``ensure_ascii=False`` keeps non-ASCII (Chinese) messages readable.
    """
    return [
        TextContent(
            type="text",
            text=json.dumps(payload, ensure_ascii=False, indent=indent)
        )
    ]


async def _get_form_collector(session_id: str) -> Optional[FormCollector]:
    """Return the FormCollector for a stored session, or None if storage has no such session.

    Collectors are cached in ``form_collectors``. A session that exists in
    storage but has no cached collector gets a new one built from the
    session's stored ``template_name``.
    """
    session_data = await storage_manager.get_session(session_id)
    if not session_data:
        return None
    collector = form_collectors.get(session_id)
    if collector is None:
        template = FORM_TEMPLATES[session_data.template_name]
        collector = FormCollector(template, llm_service, storage_manager)
        form_collectors[session_id] = collector
    return collector


async def _start_form_collection(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Handle ``start_form_collection``: create a collector and start a new session."""
    template_name = arguments.get("template_name", "mediation")
    stream = arguments.get("stream", True)
    if template_name not in FORM_TEMPLATES:
        return {
            "success": False,
            "message": f"模板 '{template_name}' 不存在",
            "available_templates": list(FORM_TEMPLATES.keys())
        }
    form_collector = FormCollector(FORM_TEMPLATES[template_name], llm_service, storage_manager)
    result = await form_collector.start_collection(stream=stream)
    # Cache the collector so follow-up calls for this session reuse it.
    session_id = result.get("session_id")
    if session_id:
        form_collectors[session_id] = form_collector
    return result


async def _collect_field_info(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Handle ``collect_field_info``: pass one user message to an existing session."""
    session_id = arguments.get("session_id")
    user_input = arguments.get("user_input")
    stream = arguments.get("stream", True)
    if not session_id or not user_input:
        return {"success": False, "message": "session_id和user_input是必填参数"}
    form_collector = await _get_form_collector(session_id)
    if form_collector is None:
        return {"success": False, "message": f"会话 {session_id} 不存在"}
    return await form_collector.process_user_input(session_id, user_input, stream=stream)


# Tools whose only argument is session_id and which call the FormCollector
# method of the same name with that session id.
_SESSION_TOOLS = ("get_collection_status", "validate_form_data", "submit_form")


async def _run_session_tool(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Handle a tool from ``_SESSION_TOOLS`` by awaiting ``FormCollector.<name>(session_id)``.

    After ``submit_form`` the cached collector for the session is dropped.
    """
    session_id = arguments.get("session_id")
    if not session_id:
        return {"success": False, "message": "session_id是必填参数"}
    form_collector = await _get_form_collector(session_id)
    if form_collector is None:
        return {"success": False, "message": f"会话 {session_id} 不存在"}
    result = await getattr(form_collector, name)(session_id)
    if name == "submit_form":
        # Clean up the in-memory collector once the form has been submitted.
        form_collectors.pop(session_id, None)
    return result


async def _get_session_statistics() -> Dict[str, Any]:
    """Handle ``get_session_statistics``: storage stats plus in-memory counters."""
    stats = await storage_manager.get_session_statistics()
    return {
        "success": True,
        "statistics": stats,
        "available_templates": list(FORM_TEMPLATES.keys()),
        "active_sessions": len(form_collectors)
    }


@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
    """Dispatch an MCP tool call and return its JSON result as text content.

    In the MCP Python SDK 1.9.x low-level ``Server``, the ``call_tool``
    decorator expects the handler to return an iterable of content items and
    builds ``CallToolResult(content=list(results))`` itself. Returning a
    ``CallToolResult`` from here would make the SDK iterate the model's
    fields and turn every call into an error, so a plain list is returned.

    Args:
        name: tool name, as advertised by ``list_tools``.
        arguments: tool arguments sent by the client (None is treated as ``{}``).

    Returns:
        A one-element list holding a TextContent with the JSON-encoded result.
        Failures, including unknown tools and unexpected exceptions, are
        reported in-band as ``{"success": false, "message": ...}`` rather
        than raised.
    """
    try:
        # Initialise services lazily if main() has not done so yet.
        if storage_manager is None or llm_service is None:
            await init_services()
        arguments = arguments or {}
        # NOTE(review): this logs raw user_input at INFO level; consider
        # redacting it if form sessions contain personal data.
        logger.info("调用工具: %s, 参数: %s", name, arguments)

        if name == "start_form_collection":
            result = await _start_form_collection(arguments)
        elif name == "collect_field_info":
            result = await _collect_field_info(arguments)
        elif name in _SESSION_TOOLS:
            result = await _run_session_tool(name, arguments)
        elif name == "get_session_statistics":
            result = await _get_session_statistics()
        else:
            return _json_text({"success": False, "message": f"未知工具: {name}"}, indent=None)
        return _json_text(result)
    except Exception as e:
        # logger.exception records the traceback as well as the message.
        logger.exception("工具执行错误 %s: %s", name, e)
        return _json_text({"success": False, "message": f"工具执行失败: {str(e)}"}, indent=None)
async def main():
    """Program entry point: initialise services, then serve MCP over stdio.

    ``init_services()`` runs before the stdio transport is opened, so any
    exception it raises aborts startup instead of surfacing on the first
    tool call.
    """
    startup_messages = (
        "🚀 启动智能表单收集MCP服务器",
        f"📁 工作目录: {os.getcwd()}",
        f"🐍 Python版本: {sys.version}",
    )
    for message in startup_messages:
        logger.info(message)

    await init_services()

    # Serve requests over stdin/stdout until the transport closes.
    async with stdio_server() as streams:
        reader, writer = streams
        logger.info("MCP服务器启动成功,等待连接...")
        options = server.create_initialization_options()
        await server.run(reader, writer, options)


if __name__ == "__main__":
    asyncio.run(main())