#!/usr/bin/env python3
"""
MCP服务器功能测试脚本
测试基础版本和优化版本的核心功能
"""
import sys
import os
import asyncio
import tempfile
import shutil
from pathlib import Path
from datetime import datetime
import traceback
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
class MCPFunctionalityTester:
def __init__(self):
self.test_results = []
self.temp_dir = None
def setup_test_environment(self):
"""设置测试环境"""
print("🔧 设置测试环境...")
self.temp_dir = tempfile.mkdtemp(prefix="mcp_test_")
print(f"✅ 测试目录创建: {self.temp_dir}")
# 创建测试目录结构
test_structure = {
"项目根目录": {
"src": {
"main.py": "# 主程序文件",
"utils.py": "# 工具函数"
},
"docs": {
"README.md": "# 项目说明"
},
"tests": {
"test_main.py": "# 测试文件"
}
}
}
self._create_directory_structure(test_structure, self.temp_dir)
def _create_directory_structure(self, structure, base_path):
"""递归创建目录结构"""
for name, content in structure.items():
path = os.path.join(base_path, name)
if isinstance(content, dict):
os.makedirs(path, exist_ok=True)
self._create_directory_structure(content, path)
else:
with open(path, 'w', encoding='utf-8') as f:
f.write(content)
async def test_basic_version(self):
"""测试基础版本功能"""
print("\n📦 测试基础版本功能...")
try:
# 导入基础版本模块
from folder_documentation_mcp import (
GenerateReadmeInput,
GenerateMindmapInput,
generate_readme_files,
generate_mindmap
)
# 测试README生成
print(" 📄 测试README文件生成...")
try:
params = GenerateReadmeInput(
root_dir=str(self.temp_dir),
exclude_dirs=[],
force_update=True
)
result = await generate_readme_files(params)
if result:
self._record_test_result("README文件生成", True, f"成功生成README文件: {result}")
else:
self._record_test_result("README文件生成", False, "README文件生成失败")
except Exception as e:
self._record_test_result("README文件生成", False, f"生成异常: {str(e)}")
# 测试思维导图生成
print(" 🧠 测试思维导图生成...")
try:
output_path = os.path.join(self.temp_dir, "mindmap.md")
params = GenerateMindmapInput(
root_dir=str(self.temp_dir),
output_file=output_path,
exclude_dirs=[]
)
result = await generate_mindmap(params)
if result and os.path.exists(output_path):
self._record_test_result("思维导图生成", True, f"成功生成思维导图: {output_path}")
else:
self._record_test_result("思维导图生成", False, "思维导图生成失败")
except Exception as e:
self._record_test_result("思维导图生成", False, f"生成异常: {str(e)}")
except ImportError as e:
self._record_test_result("基础版本导入", False, f"无法导入基础版本模块: {str(e)}")
except Exception as e:
self._record_test_result("基础版本测试", False, f"测试异常: {str(e)}")
def test_optimized_version(self):
"""测试优化版本功能"""
print("\n🚀 测试优化版本功能...")
try:
# 导入优化版本模块
optimized_path = os.path.join(os.path.dirname(__file__), "优化版本", "optimized_folder_docs_mcp")
if os.path.exists(optimized_path):
sys.path.insert(0, optimized_path)
sys.path.insert(0, os.path.join(optimized_path, "src"))
from server import OptimizedFolderDocMCPServer
# 测试服务器初始化
print(" 🔧 测试服务器初始化...")
try:
server = OptimizedFolderDocMCPServer()
self._record_test_result("服务器初始化", True, "优化版本服务器初始化成功")
except Exception as e:
self._record_test_result("服务器初始化", False, f"服务器初始化失败: {str(e)}")
# 测试服务器工具
print(" 🛠️ 测试服务器工具...")
try:
if hasattr(server, 'mcp') and hasattr(server.mcp, 'tools'):
tool_count = len(server.mcp.tools)
self._record_test_result("服务器工具", True, f"服务器包含 {tool_count} 个工具")
else:
self._record_test_result("服务器工具", False, "无法获取服务器工具列表")
except Exception as e:
self._record_test_result("服务器工具", False, f"工具测试异常: {str(e)}")
else:
self._record_test_result("优化版本路径", False, "优化版本目录不存在")
except ImportError as e:
self._record_test_result("优化版本导入", False, f"无法导入优化版本模块: {str(e)}")
except Exception as e:
self._record_test_result("优化版本测试", False, f"测试异常: {str(e)}")
def test_security_features(self):
"""测试安全特性"""
print("\n🔒 测试安全特性...")
# 测试路径遍历攻击防护
print(" 🛡️ 测试路径遍历攻击防护...")
malicious_paths = [
"../../../etc/passwd",
"..\\..\\windows\\system32\\config\\sam",
"/etc/shadow",
"C:\\Windows\\System32\\config\\SAM"
]
for path in malicious_paths:
try:
# 这里应该调用路径验证函数
is_safe = self._validate_path_safety(path)
if not is_safe:
self._record_test_result(f"路径安全验证-{path}", True, "正确识别恶意路径")
else:
self._record_test_result(f"路径安全验证-{path}", False, "未能识别恶意路径")
except Exception as e:
self._record_test_result(f"路径安全验证-{path}", False, f"验证异常: {str(e)}")
def _validate_path_safety(self, path):
"""简单的路径安全验证"""
# 基础安全检查
if ".." in path or path.startswith("/") or ":" in path:
return False
return True
def _record_test_result(self, test_name, success, details):
"""记录测试结果"""
result = {
"测试名称": test_name,
"状态": "✅ 通过" if success else "❌ 失败",
"详情": details,
"时间": datetime.now().strftime("%H:%M:%S")
}
self.test_results.append(result)
status_icon = "✅" if success else "❌"
print(f" {status_icon} {test_name}: {details}")
def generate_test_report(self):
"""生成测试报告"""
print("\n" + "="*60)
print("📊 测试报告总结")
print("="*60)
total_tests = len(self.test_results)
passed_tests = sum(1 for result in self.test_results if "✅" in result["状态"])
failed_tests = total_tests - passed_tests
print(f"总测试数: {total_tests}")
print(f"通过: {passed_tests}")
print(f"失败: {failed_tests}")
print(f"成功率: {(passed_tests/total_tests*100):.1f}%" if total_tests > 0 else "0%")
print("\n详细结果:")
for result in self.test_results:
print(f"{result['状态']} {result['测试名称']}: {result['详情']}")
# 保存报告到文件
self._save_report_to_file()
return passed_tests, failed_tests
def _save_report_to_file(self):
"""保存测试报告到文件"""
report_path = os.path.join(os.path.dirname(__file__), f"功能测试报告_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md")
with open(report_path, 'w', encoding='utf-8') as f:
f.write("# MCP服务器功能测试报告\n\n")
f.write(f"**测试时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write("## 测试结果\n\n")
for result in self.test_results:
f.write(f"### {result['测试名称']}\n")
f.write(f"**状态**: {result['状态']}\n")
f.write(f"**详情**: {result['详情']}\n")
f.write(f"**时间**: {result['时间']}\n\n")
print(f"\n📄 详细报告已保存至: {report_path}")
def cleanup(self):
"""清理测试环境"""
if self.temp_dir and os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
print(f"🧹 清理测试目录: {self.temp_dir}")
async def main():
"""主函数"""
print("🧪 MCP服务器功能测试开始")
print("="*60)
tester = MCPFunctionalityTester()
try:
# 设置测试环境
tester.setup_test_environment()
# 执行各项测试
await tester.test_basic_version()
tester.test_optimized_version()
tester.test_security_features()
# 生成测试报告
passed, failed = tester.generate_test_report()
# 返回测试结果
if failed == 0:
print("\n🎉 所有测试通过!")
return 0
else:
print(f"\n⚠️ 有 {failed} 个测试失败")
return 1
except Exception as e:
print(f"\n💥 测试执行异常: {str(e)}")
traceback.print_exc()
return 1
finally:
# 清理测试环境
tester.cleanup()
if __name__ == "__main__":
sys.exit(asyncio.run(main()))