Skip to main content
Glama
test_edge_cases.py13 kB
#!/usr/bin/env python3 """ MCPlanManager 边界情况测试 测试错误处理和边界条件 使用方法: python test/test_edge_cases.py [--mode uvx|sse] """ import asyncio import argparse import json import sys from typing import Dict, Any from fastmcp import Client from fastmcp.client.transports import UvxStdioTransport class EdgeCaseTestSuite: def __init__(self, mode: str = "sse"): self.mode = mode self.client = None self.test_results = [] self.failed_tests = [] async def setup_client(self): """设置客户端连接""" print(f"🔧 设置 {self.mode.upper()} 模式客户端...") if self.mode == "uvx": transport = UvxStdioTransport(tool_name="mcplanmanager") self.client = Client(transport) elif self.mode == "sse": sse_url = "http://localhost:8080/sse" self.client = Client(sse_url) else: raise ValueError(f"不支持的模式: {self.mode}") async def run_test(self, test_name: str, test_func, *args, **kwargs): """运行单个测试并记录结果""" print(f"\n🧪 测试: {test_name}") try: result = await test_func(*args, **kwargs) self.test_results.append({ "name": test_name, "status": "PASS", "result": result }) print(f"✅ {test_name} - 通过") return result except Exception as e: self.test_results.append({ "name": test_name, "status": "FAIL", "error": str(e) }) self.failed_tests.append(test_name) print(f"❌ {test_name} - 失败: {e}") return None def extract_data(self, response): """从响应中提取数据""" if isinstance(response, list) and len(response) > 0: content = response[0] if hasattr(content, 'text'): try: return json.loads(content.text) except: return content.text return response async def test_invalid_task_id(self): """测试无效的任务ID""" invalid_ids = [999, -1, "invalid", None] for invalid_id in invalid_ids: try: response = await self.client.call_tool("completeTask", { "task_id": invalid_id, "result": "测试无效ID" }) data = self.extract_data(response) # 应该返回错误或失败状态 if isinstance(data, dict): assert not data.get("success", True), f"无效ID {invalid_id} 应该失败" print(f" ✅ 无效ID {invalid_id} 正确处理") except Exception as e: print(f" ✅ 无效ID {invalid_id} 抛出异常: {e}") async def test_empty_plan_initialization(self): """测试空计划初始化""" # 测试完全空的计划 response = await self.client.call_tool("initializePlan", {"goal": "空任务计划", "tasks": []}) data = self.extract_data(response) assert not data.get("success", True), "包含空任务列表的计划应该失败" print(f" ✅ 空任务列表的计划被正确拒绝: {data.get('message')}") # 测试 goal 为 None try: await self.client.call_tool("initializePlan", {"goal": None, "tasks": [{"name": "a", "dependencies": [], "reasoning": "a"}]}) except Exception as e: assert "Input should be a valid string" in str(e), "goal 为 None 应该引发 Pydantic 验证错误" print(" ✅ goal 为 None 时引发了正确的验证错误") async def test_circular_dependencies(self): """测试循环依赖""" circular_plan = { "goal": "循环依赖测试", "tasks": [ { "name": "任务A", "dependencies": [1], # 依赖任务B "reasoning": "测试循环依赖" }, { "name": "任务B", "dependencies": [0], # 依赖任务A "reasoning": "测试循环依赖" } ] } response = await self.client.call_tool("initializePlan", circular_plan) data = self.extract_data(response) if isinstance(data, dict): if data.get("success", False): print(" ⚠️ 循环依赖计划被接受 - 需要检查依赖验证") else: print(f" ✅ 循环依赖正确拒绝: {data.get('message', 'Unknown')}") async def test_invalid_dependencies(self): """测试无效依赖""" invalid_plan = { "goal": "无效依赖测试", "tasks": [ { "name": "任务1", "dependencies": [999], # 不存在的依赖 "reasoning": "测试无效依赖" }, { "name": "任务2", "dependencies": [-1], # 负数依赖 "reasoning": "测试无效依赖" } ] } response = await self.client.call_tool("initializePlan", invalid_plan) data = self.extract_data(response) if isinstance(data, dict): if data.get("success", False): print(" ⚠️ 无效依赖计划被接受 - 需要检查依赖验证") else: print(f" ✅ 无效依赖正确拒绝: {data.get('message', 'Unknown')}") async def test_large_task_name(self): """测试超长任务名称""" long_name = "超长任务名称" * 100 # 创建很长的名称 plan = { "goal": "超长名称测试", "tasks": [ { "name": long_name, "dependencies": [], "reasoning": "测试超长名称处理" } ] } response = await self.client.call_tool("initializePlan", plan) data = self.extract_data(response) if isinstance(data, dict): if data.get("success", False): print(f" ✅ 超长名称任务创建成功 (长度: {len(long_name)})") else: print(f" ℹ️ 超长名称被拒绝: {data.get('message', 'Unknown')}") async def test_special_characters(self): """测试特殊字符处理""" special_chars_plan = { "goal": "特殊字符测试 🚀 #@$%^&*()[]{}", "tasks": [ { "name": "任务 with émojis 🎯 and symbols @#$%", "dependencies": [], "reasoning": "测试特殊字符和emoji处理 ✨" } ] } response = await self.client.call_tool("initializePlan", special_chars_plan) data = self.extract_data(response) if isinstance(data, dict): if data.get("success", False): print(" ✅ 特殊字符和emoji处理成功") else: print(f" ℹ️ 特殊字符被拒绝: {data.get('message', 'Unknown')}") async def test_nonexistent_operations(self): """测试不存在的操作""" # 测试在没有当前任务时完成任务 response = await self.client.call_tool("getCurrentTask") data = self.extract_data(response) if isinstance(data, dict) and not data.get("success", False): # 尝试完成不存在的当前任务 try: complete_response = await self.client.call_tool("completeTask", { "task_id": 999, "result": "尝试完成不存在的任务" }) complete_data = self.extract_data(complete_response) if isinstance(complete_data, dict): assert not complete_data.get("success", True), "完成不存在的任务应该失败" print(" ✅ 完成不存在任务正确失败") except Exception as e: print(f" ✅ 完成不存在任务抛出异常: {e}") async def test_state_consistency(self): """测试状态一致性""" # 创建一个简单计划 plan = { "goal": "状态一致性测试", "tasks": [ { "name": "测试任务", "dependencies": [], "reasoning": "测试状态一致性" } ] } response = await self.client.call_tool("initializePlan", plan) data = self.extract_data(response) if isinstance(data, dict) and data.get("success", False): # 启动任务 start_response = await self.client.call_tool("startNextTask") start_data = self.extract_data(start_response) if isinstance(start_data, dict) and start_data.get("success", False): task_id = start_data["data"]["id"] # 尝试重复启动同一个任务 repeat_response = await self.client.call_tool("startNextTask") repeat_data = self.extract_data(repeat_response) if isinstance(repeat_data, dict): if repeat_data.get("success", False): # 检查是否是不同的任务 if repeat_data["data"]["id"] != task_id: print(" ✅ 状态一致性正确 - 启动了不同任务") else: print(" ⚠️ 状态不一致 - 重复启动了同一任务") else: print(" ✅ 状态一致性正确 - 没有重复启动任务") async def run_all_edge_case_tests(self): """运行所有边界情况测试""" print("🚀 开始 MCPlanManager 边界情况测试") print(f"📋 测试模式: {self.mode.upper()}") print("=" * 60) try: async with self.client: print("✅ 客户端连接成功") # 边界情况测试 await self.run_test("无效任务ID处理", self.test_invalid_task_id) await self.run_test("空计划初始化", self.test_empty_plan_initialization) await self.run_test("循环依赖检测", self.test_circular_dependencies) await self.run_test("无效依赖处理", self.test_invalid_dependencies) await self.run_test("超长任务名称", self.test_large_task_name) await self.run_test("特殊字符处理", self.test_special_characters) await self.run_test("不存在操作处理", self.test_nonexistent_operations) await self.run_test("状态一致性", self.test_state_consistency) except Exception as e: print(f"❌ 客户端连接失败: {e}") return False return True def print_summary(self): """打印测试总结""" print("\n" + "=" * 60) print("📊 边界情况测试结果总结") print("=" * 60) total_tests = len(self.test_results) passed_tests = len([t for t in self.test_results if t["status"] == "PASS"]) failed_tests = len(self.failed_tests) print(f"总测试数: {total_tests}") print(f"通过测试: {passed_tests} ✅") print(f"失败测试: {failed_tests} ❌") print(f"成功率: {(passed_tests/total_tests*100):.1f}%") if self.failed_tests: print(f"\n❌ 失败的测试:") for test_name in self.failed_tests: print(f" - {test_name}") print("\n🎯 边界情况测试完成!") return failed_tests == 0 async def main(): parser = argparse.ArgumentParser(description="MCPlanManager 边界情况测试") parser.add_argument("--mode", choices=["uvx", "sse"], default="sse", help="测试模式: uvx (本地) 或 sse (Docker)") args = parser.parse_args() test_suite = EdgeCaseTestSuite(mode=args.mode) await test_suite.setup_client() success = await test_suite.run_all_edge_case_tests() success = test_suite.print_summary() sys.exit(0 if success else 1) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/donway19/MCPlanManager'

If you have feedback or need assistance with the MCP directory API, please join our Discord server