Skip to main content
Glama

ChatPPT-MCP

by pingcy
mcp_interactive_test.py8.58 kB
#!/usr/bin/env python3 """ 简化的交互式PPT RAG MCP服务器测试客户端 专门用于手动交互式测试 """ import asyncio import json from pathlib import Path from mcp.client.session import ClientSession from mcp.client.sse import sse_client async def interactive_test(): """交互式测试函数""" print("=" * 80) print("🚀 PPT RAG MCP服务器 - 交互式测试客户端") print("=" * 80) print("📋 功能: 手动选择和测试MCP工具") print("🔧 操作: 根据提示输入选择和参数") print("=" * 80) # 连接到服务器 port = 5053 print(f"📡 连接到服务器 (localhost:{port})") try: async with sse_client( url=f"http://localhost:{port}/sse", headers={"Content-Type": "application/json"}, ) as (read, write): async with ClientSession(read, write) as session: await session.initialize() print("✅ 连接成功!") # 获取可用工具 tools_result = await session.list_tools() available_tools = {tool.name: tool for tool in tools_result.tools} print(f"\n🔧 发现 {len(available_tools)} 个可用工具:") tools_list = list(available_tools.keys()) for i, tool_name in enumerate(tools_list, 1): tool = available_tools[tool_name] description = tool.description.split('\n')[0] if tool.description else "无描述" print(f" {i}. {tool_name} - {description}") # 主循环 while True: print("\n" + "="*60) print("📋 主菜单:") print(" 1. 获取索引状态 (index_status)") print(" 2. 添加PPT文档 (add_ppt)") print(" 3. 查询文档内容 (chat_with_ppt)") print(" 4. 删除PPT文档 (delete_ppt)") print(" 0. 退出程序") print("="*60) try: choice = input("📝 请选择操作 (0-4): ").strip() if choice == "0": print("👋 再见!") break elif choice == "1": # 获取索引状态 print("\n🔍 正在获取索引状态...") result = await session.call_tool("index_status", {}) print_result(result, "索引状态") elif choice == "2": # 添加PPT文档 print("\n📄 添加PPT文档") file_path = input("📝 请输入PPT文件路径: ").strip() force_str = input("📝 是否强制重新处理? (y/n): ").strip().lower() force_reprocess = force_str in ['y', 'yes', '1', 'true'] print(f"🔄 正在添加文档: {file_path}") result = await session.call_tool("add_ppt", { "file_path": file_path, "force_reprocess": force_reprocess }) print_result(result, "添加PPT结果") elif choice == "3": # 查询文档内容 print("\n💬 查询文档内容") query = input("📝 请输入查询问题: ").strip() print("\n📋 查询选项:") print(" 1. 查询所有文档") print(" 2. 查询特定文件的文档") query_choice = input("📝 请选择查询方式 (1-2): ").strip() params = {"query": query} if query_choice == "2": file_path = input("📝 请输入文件路径: ").strip() if file_path: params["file_path"] = file_path print(f"🔍 正在查询: {query}") result = await session.call_tool("chat_with_ppt", params) print_result(result, "查询结果") elif choice == "4": # 删除PPT文档 print("\n🗑️ 删除PPT文档") # 先显示当前索引 print("📋 当前索引状态:") status_result = await session.call_tool("index_status", {}) print_result(status_result, "当前索引") file_path = input("📝 请输入要删除的PPT文件路径: ").strip() confirm = input(f"📝 确认删除文档 '{file_path}'? (y/n): ").strip().lower() if confirm in ['y', 'yes', '1', 'true']: print(f"🗑️ 正在删除文档: {file_path}") result = await session.call_tool("delete_ppt", { "file_path": file_path }) print_result(result, "删除PPT结果") else: print("🚫 取消删除操作") else: print("❌ 无效的选择,请重新输入") continue except KeyboardInterrupt: print("\n\n🛑 用户中断,正在退出...") break except Exception as e: print(f"❌ 错误: {e}") continue except Exception as e: print(f"❌ 连接失败: {e}") print("💡 请确保服务器正在运行:") print(f" python src/mcp_ppt_server.py --transport sse --port {port}") def print_result(result, title: str = "结果"): """格式化并打印结果""" print(f"\n{'-'*60}") print(f"📋 {title}") print('-'*60) if hasattr(result, 'content') and result.content: for content_item in result.content: if hasattr(content_item, 'text'): try: data = json.loads(content_item.text) # 简化输出,只显示关键信息 if "status" in data: print(f"🔹 状态: {data['status']}") if "total_documents" in data: print(f"🔹 文档总数: {data['total_documents']}") if "documents" in data: print(f"🔹 文档列表:") for doc in data["documents"]: print(f" • {Path(doc).name}") if "answer" in data: print(f"🔹 回答: {data['answer'][:2000]}..." if len(data['answer']) > 2000 else f"🔹 回答: {data['answer']}") if "message" in data: print(f"🔹 消息: {data['message']}") # 显示完整JSON结果 print("\n� 完整JSON结果:") print(json.dumps(data, indent=2, ensure_ascii=False)) except json.JSONDecodeError: print(content_item.text) else: print(result) print('-'*60) if __name__ == "__main__": print("🚀 启动交互式测试客户端...") print("💡 使用方法:") print(" 1. 确保服务器正在运行: python src/mcp_ppt_server.py --transport sse --port 5053") print(" 2. 根据菜单提示选择操作") print(" 3. 输入相应的参数") print(" 4. 查看结果") print() asyncio.run(interactive_test())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pingcy/app_chatppt'

If you have feedback or need assistance with the MCP directory API, please join our Discord server