Skip to main content
Glama
h-lu
by h-lu
test_cnki_mcp.py7.65 kB
""" CNKI MCP Server 测试脚本 使用 FastMCP 的 in-memory transport 直接测试 MCP 服务器功能。 无需启动服务器,直接在进程内调用工具。 运行方式: python test_cnki_mcp.py """ import asyncio import json from fastmcp import Client # 导入我们的 MCP 服务器实例 from cnki_mcp_server import mcp async def test_list_tools(): """测试: 列出所有可用工具""" print("\n" + "=" * 60) print("测试 1: 列出所有可用工具") print("=" * 60) async with Client(mcp) as client: tools = await client.list_tools() print(f"发现 {len(tools)} 个工具:") for tool in tools: print(f" - {tool.name}: {tool.description[:50]}...") return True async def test_list_resources(): """测试: 列出所有可用资源""" print("\n" + "=" * 60) print("测试 2: 列出所有可用资源") print("=" * 60) async with Client(mcp) as client: resources = await client.list_resources() print(f"发现 {len(resources)} 个资源:") for resource in resources: print(f" - {resource.uri}") return True async def test_read_resource(): """测试: 读取资源""" print("\n" + "=" * 60) print("测试 3: 读取服务器状态资源") print("=" * 60) async with Client(mcp) as client: # 读取服务器状态 result = await client.read_resource("cnki://status") if result: print(f"DEBUG: Resource result type: {type(result)}") print(f"DEBUG: Resource item type: {type(result[0])}") if hasattr(result[0], 'content'): print(f"DEBUG: Content: {result[0].content!r}") elif hasattr(result[0], 'text'): print(f"DEBUG: Text: {result[0].text!r}") # 兼容性处理 if hasattr(result[0], 'text') and result[0].text: content = result[0].text elif hasattr(result[0], 'content'): content = result[0].content else: content = str(result[0]) if isinstance(content, bytes): content = content.decode('utf-8') try: status = json.loads(content) print(f"服务器名称: {status.get('server_name')}") print(f"版本: {status.get('version')}") print(f"特性: {', '.join(status.get('features', []))}") except Exception as e: print(f"解析 JSON 失败: {e}") # 读取搜索类型 result2 = await client.read_resource("cnki://search-types") if result2: try: if hasattr(result2[0], 'text') and result2[0].text: content2 = result2[0].text elif hasattr(result2[0], 'content'): content2 = result2[0].content else: content2 = str(result2[0]) if isinstance(content2, bytes): content2 = content2.decode('utf-8') types = json.loads(content2) print(f"\n支持的搜索类型: {', '.join(types.get('chinese_types', []))}") except Exception as e: print(f"解析类型 JSON 失败: {e}") return True async def test_search_cnki(): """测试: 搜索 CNKI 论文""" print("\n" + "=" * 60) print("测试 4: 搜索 CNKI 论文 (关键词: '人工智能')") print("=" * 60) async with Client(mcp) as client: async def progress_handler(progress, total, *args): print(f" 进度: {progress}/{total}") result = await client.call_tool( "search_cnki", { "query": "人工智能", "search_type": "主题", "pages": 1 }, progress_handler=progress_handler ) # 解析结果 if hasattr(result, 'data'): data = result.data else: # 尝试从 content 获取 data = result if isinstance(data, dict): if data.get("isError"): print(f"❌ 搜索失败: {data.get('error')}") return False else: print(f"✅ 搜索成功!") print(f" 查询: {data.get('query')}") print(f" 搜索类型: {data.get('search_type')}") print(f" 总页数: {data.get('total_pages')}") print(f" 论文数量: {data.get('total_papers')}") papers = data.get("papers", []) if papers: print(f"\n 前 3 篇论文:") for i, paper in enumerate(papers[:3], 1): print(f" {i}. {paper.get('title', '无标题')[:40]}...") print(f" 作者: {', '.join(paper.get('authors', []))[:30]}") print(f" 来源: {paper.get('source', '未知')}") print(f" 引用: {paper.get('cited_count', '0')}") return True else: print(f"结果: {data}") return True async def test_find_best_match(): """测试: 查找最佳匹配""" print("\n" + "=" * 60) print("测试 5: 查找最佳匹配 (标题: '深度学习在自然语言处理中的应用')") print("=" * 60) async with Client(mcp) as client: result = await client.call_tool( "find_best_match", {"query": "深度学习在自然语言处理中的应用"} ) if hasattr(result, 'data'): data = result.data else: data = result if isinstance(data, dict): if data.get("isError"): print(f"❌ 匹配失败: {data.get('error')}") return False elif data.get("best_match"): match = data["best_match"] print(f"✅ 找到最佳匹配!") print(f" 标题: {match.get('title', '无标题')}") print(f" URL: {match.get('url', '无URL')[:60]}...") print(f" 总结果数: {data.get('total_results', 0)}") return True else: print(f"⚠️ 未找到匹配结果") return True else: print(f"结果: {data}") return True async def main(): """运行所有测试""" print("\n" + "=" * 60) print("CNKI MCP Server 测试脚本") print("=" * 60) tests = [ ("列出工具", test_list_tools), ("列出资源", test_list_resources), ("读取资源", test_read_resource), ("搜索论文", test_search_cnki), ("最佳匹配", test_find_best_match), ] results = {} for name, test_func in tests: try: results[name] = await test_func() except Exception as e: print(f"❌ 测试失败: {e}") results[name] = False # 打印总结 print("\n" + "=" * 60) print("测试总结") print("=" * 60) for name, passed in results.items(): status = "✅ 通过" if passed else "❌ 失败" print(f" {name}: {status}") passed_count = sum(1 for v in results.values() if v) print(f"\n总计: {passed_count}/{len(results)} 测试通过") if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/h-lu/cnki-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server