check_server.py•5.51 kB
#!/usr/bin/env python3
"""
MCP服务器状态检查工具
用于验证MCP服务器是否正常启动和运行
"""
import subprocess
import sys
import time
import json
import os
from pathlib import Path
def check_mcp_server():
"""检查MCP服务器状态"""
print("🔍 检查MCP反馈收集器服务器状态...")
print("=" * 50)
# 1. 检查依赖项
print("1️⃣ 检查依赖项...")
try:
import tkinter as tk
print(" ✅ tkinter - GUI框架已加载")
from PIL import Image, ImageTk
print(" ✅ Pillow - 图片处理库已加载")
from mcp.server.fastmcp import FastMCP
print(" ✅ FastMCP - MCP框架已加载")
except ImportError as e:
print(f" ❌ 依赖项缺失:{e}")
return False
# 2. 检查模块导入
print("\n2️⃣ 检查模块导入...")
try:
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from mcp_feedforward.server import FeedbackDialog, collect_feedback, pick_image, get_image_info
print(" ✅ 所有工具函数已加载")
print(" - collect_feedback (收集反馈)")
print(" - pick_image (选择图片)")
print(" - get_image_info (图片信息)")
except ImportError as e:
print(f" ❌ 模块导入失败:{e}")
return False
# 3. 检查配置文件
print("\n3️⃣ 检查项目配置...")
try:
import tomllib
with open('pyproject.toml', 'rb') as f:
config = tomllib.load(f)
python_version = config['project']['requires-python']
print(f" ✅ Python版本要求:{python_version}")
dependencies = config['project']['dependencies']
print(f" ✅ 依赖项:{', '.join(dependencies)}")
except Exception as e:
print(f" ⚠️ 配置检查警告:{e}")
# 4. 测试MCP服务器功能
print("\n4️⃣ 测试MCP服务器功能...")
try:
# 创建临时MCP服务器实例
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from mcp_feedforward.server import mcp
print(f" ✅ MCP服务器实例已创建")
print(f" ✅ 服务器名称:{mcp.name}")
# 验证工具函数可以正常导入和调用
from mcp_feedforward.server import collect_feedback, pick_image, get_image_info
print(f" ✅ 已注册3个MCP工具:")
print(f" - collect_feedback (收集反馈)")
print(f" - pick_image (选择图片)")
print(f" - get_image_info (图片信息)")
except Exception as e:
print(f" ❌ MCP服务器测试失败:{e}")
return False
# 5. 启动服务器进程检查
print("\n5️⃣ 启动服务器进程检查...")
print(" 启动MCP服务器(3秒后自动停止)...")
try:
# 启动服务器进程
process = subprocess.Popen(
[sys.executable, "src/mcp_feedforward/server.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# 等待3秒
time.sleep(3)
# 检查进程是否在运行
if process.poll() is None:
print(" ✅ MCP服务器进程正在运行")
process.terminate()
process.wait(timeout=5)
else:
stdout, stderr = process.communicate()
print(" ❌ MCP服务器进程意外退出")
if stdout:
print(f" 输出:{stdout}")
if stderr:
print(f" 错误:{stderr}")
return False
except Exception as e:
print(f" ❌ 启动测试失败:{e}")
return False
print("\n" + "=" * 50)
print("🎉 MCP服务器状态检查完成!")
print("✅ 所有检查项目都通过了")
print()
print("📡 启动MCP服务器的方法:")
print(" python src/mcp_feedforward/server.py")
print()
print("🧪 测试GUI功能的方法:")
print(" python test_gui.py")
print()
print("🔗 在Claude Desktop中使用:")
print(" 配置 claude_desktop_config.json 并重启Claude Desktop")
return True
def show_connection_info():
"""显示连接信息"""
print("\n📋 MCP服务器连接信息:")
print("-" * 30)
print("• 协议:stdio (标准输入输出)")
print("• 连接方式:进程通信")
print("• 端口:不适用(MCP使用stdio,不是网络端口)")
print("• 状态检查:通过进程存在性判断")
print()
print("💡 MCP (Model Context Protocol) 特点:")
print("• 不像HTTP服务器那样监听端口")
print("• 通过stdin/stdout与客户端通信")
print("• 客户端直接启动服务器进程")
print("• 一对一的进程间通信")
if __name__ == "__main__":
try:
success = check_mcp_server()
show_connection_info()
if success:
sys.exit(0)
else:
print("\n❌ 发现问题,请检查上述错误信息")
sys.exit(1)
except KeyboardInterrupt:
print("\n\n⏹️ 检查被用户中断")
sys.exit(1)
except Exception as e:
print(f"\n❌ 检查过程中发生错误:{e}")
sys.exit(1)