test_mcp_server.py•9.64 kB
#!/usr/bin/env python3
"""文件管理器MCP服务器测试脚本
这个脚本测试所有工具的基本功能和错误处理。
"""
import asyncio
import os
import tempfile
import shutil
from pathlib import Path
from file_manager_mcp.tools import (
list_directory,
read_file,
create_file,
delete_file,
search_files,
)
from file_manager_mcp.validators import validate_tool_arguments, ValidationError
class TestResults:
"""测试结果记录"""
def __init__(self):
self.passed = 0
self.failed = 0
self.errors = []
def add_pass(self, test_name):
self.passed += 1
print(f"✅ {test_name}")
def add_fail(self, test_name, error):
self.failed += 1
self.errors.append(f"{test_name}: {error}")
print(f"❌ {test_name}: {error}")
def summary(self):
total = self.passed + self.failed
print(f"\n测试总结: {self.passed}/{total} 通过")
if self.errors:
print("\n失败的测试:")
for error in self.errors:
print(f" - {error}")
return self.failed == 0
async def test_list_directory(test_dir: Path, results: TestResults):
"""测试列出目录功能"""
try:
# 测试正常情况
result = await list_directory(str(test_dir), False)
if "目录" in result and "内容" in result:
results.add_pass("list_directory - 正常情况")
else:
results.add_fail("list_directory - 正常情况", "输出格式不正确")
# 测试显示隐藏文件
result = await list_directory(str(test_dir), True)
if "目录" in result:
results.add_pass("list_directory - 显示隐藏文件")
else:
results.add_fail("list_directory - 显示隐藏文件", "输出格式不正确")
# 测试不存在的目录
result = await list_directory("/nonexistent/path", False)
if "错误" in result and "不存在" in result:
results.add_pass("list_directory - 错误处理")
else:
results.add_fail("list_directory - 错误处理", "错误处理不正确")
except Exception as e:
results.add_fail("list_directory", str(e))
async def test_create_and_read_file(test_dir: Path, results: TestResults):
"""测试创建和读取文件功能"""
test_file = test_dir / "test.txt"
test_content = "Hello, MCP World!\n这是一个测试文件。"
try:
# 测试创建文件
result = await create_file(str(test_file), test_content)
if "成功创建" in result:
results.add_pass("create_file - 正常情况")
else:
results.add_fail("create_file - 正常情况", "创建失败")
# 测试读取文件
result = await read_file(str(test_file))
if test_content in result:
results.add_pass("read_file - 正常情况")
else:
results.add_fail("read_file - 正常情况", "读取内容不匹配")
# 测试重复创建文件(应该失败)
result = await create_file(str(test_file), "new content")
if "错误" in result and "已存在" in result:
results.add_pass("create_file - 文件已存在")
else:
results.add_fail("create_file - 文件已存在", "应该报错但没有")
# 测试读取不存在的文件
result = await read_file(str(test_dir / "nonexistent.txt"))
if "错误" in result and "不存在" in result:
results.add_pass("read_file - 文件不存在")
else:
results.add_fail("read_file - 文件不存在", "错误处理不正确")
except Exception as e:
results.add_fail("create_and_read_file", str(e))
async def test_search_files(test_dir: Path, results: TestResults):
"""测试搜索文件功能"""
try:
# 创建一些测试文件
(test_dir / "file1.txt").write_text("content1")
(test_dir / "file2.py").write_text("print('hello')")
(test_dir / "subdir").mkdir()
(test_dir / "subdir" / "file3.txt").write_text("content3")
# 测试搜索所有txt文件
result = await search_files(str(test_dir), "*.txt", True)
if "file1.txt" in result and "file3.txt" in result:
results.add_pass("search_files - 递归搜索")
else:
results.add_fail("search_files - 递归搜索", "搜索结果不完整")
# 测试非递归搜索
result = await search_files(str(test_dir), "*.txt", False)
if "file1.txt" in result and "file3.txt" not in result:
results.add_pass("search_files - 非递归搜索")
else:
results.add_fail("search_files - 非递归搜索", "搜索结果不正确")
# 测试搜索不存在的模式
result = await search_files(str(test_dir), "*.xyz", True)
if "没有找到" in result:
results.add_pass("search_files - 无匹配结果")
else:
results.add_fail("search_files - 无匹配结果", "应该返回无匹配")
except Exception as e:
results.add_fail("search_files", str(e))
async def test_delete_file(test_dir: Path, results: TestResults):
"""测试删除文件功能"""
try:
# 创建测试文件和目录
test_file = test_dir / "to_delete.txt"
test_file.write_text("delete me")
test_subdir = test_dir / "to_delete_dir"
test_subdir.mkdir()
(test_subdir / "nested_file.txt").write_text("nested")
# 测试删除文件
result = await delete_file(str(test_file), False)
if "成功删除文件" in result and not test_file.exists():
results.add_pass("delete_file - 删除文件")
else:
results.add_fail("delete_file - 删除文件", "删除失败")
# 测试删除非空目录(应该失败)
result = await delete_file(str(test_subdir), False)
if "错误" in result and "不为空" in result:
results.add_pass("delete_file - 非空目录")
else:
results.add_fail("delete_file - 非空目录", "应该报错但没有")
# 测试递归删除目录
result = await delete_file(str(test_subdir), True)
if "成功递归删除" in result and not test_subdir.exists():
results.add_pass("delete_file - 递归删除")
else:
results.add_fail("delete_file - 递归删除", "递归删除失败")
# 测试删除不存在的文件
result = await delete_file(str(test_dir / "nonexistent.txt"), False)
if "错误" in result and "不存在" in result:
results.add_pass("delete_file - 文件不存在")
else:
results.add_fail("delete_file - 文件不存在", "错误处理不正确")
except Exception as e:
results.add_fail("delete_file", str(e))
async def test_parameter_validation(results: TestResults):
"""测试参数验证功能"""
try:
# 测试缺少必需参数
try:
validate_tool_arguments("list_directory", {})
results.add_fail("参数验证 - 缺少必需参数", "应该抛出异常")
except ValidationError:
results.add_pass("参数验证 - 缺少必需参数")
# 测试无效路径
try:
validate_tool_arguments("list_directory", {"path": "../../../etc/passwd"})
results.add_fail("参数验证 - 路径遍历攻击", "应该抛出异常")
except ValidationError:
results.add_pass("参数验证 - 路径遍历攻击")
# 测试无效工具名
try:
validate_tool_arguments("invalid_tool", {"path": "/tmp"})
results.add_fail("参数验证 - 无效工具名", "应该抛出异常")
except ValidationError:
results.add_pass("参数验证 - 无效工具名")
# 测试有效参数
try:
args = validate_tool_arguments("list_directory", {"path": "/tmp", "show_hidden": True})
if args["show_hidden"] is True:
results.add_pass("参数验证 - 有效参数")
else:
results.add_fail("参数验证 - 有效参数", "参数处理不正确")
except Exception as e:
results.add_fail("参数验证 - 有效参数", str(e))
except Exception as e:
results.add_fail("parameter_validation", str(e))
async def main():
"""主测试函数"""
print("🚀 开始测试文件管理器MCP服务器...\n")
results = TestResults()
# 创建临时测试目录
with tempfile.TemporaryDirectory() as temp_dir:
test_dir = Path(temp_dir)
print(f"使用测试目录: {test_dir}\n")
# 运行所有测试
await test_list_directory(test_dir, results)
await test_create_and_read_file(test_dir, results)
await test_search_files(test_dir, results)
await test_delete_file(test_dir, results)
await test_parameter_validation(results)
# 显示测试结果
success = results.summary()
if success:
print("\n🎉 所有测试通过!MCP服务器功能正常。")
else:
print("\n⚠️ 部分测试失败,请检查上述错误信息。")
return success
if __name__ == "__main__":
success = asyncio.run(main())
exit(0 if success else 1)