Skip to main content
Glama
check-compliance.py22.4 kB
#!/usr/bin/env python3 """ FastMCP合规性深度检查脚本 逐个检查每个MCP工具的详细规范符合性 """ import ast import re import sys from pathlib import Path # 处理Python版本兼容性问题 try: from typing import TypedDict except ImportError: # 在Python < 3.12环境中,不直接使用TypedDict TypedDict = None class FastMCPComplianceChecker: """FastMCP合规性深度检查器""" def __init__(self, project_root: Path = None): self.project_root = project_root or Path(".") self.src_path = self.project_root / "src" / "genome_mcp" self.issues = [] self.tool_details = {} def run_all_checks(self) -> bool: """运行所有检查""" print("🔍 开始FastMCP深度合规性检查...") # 基础检查 basic_checks = [ self.check_syntax, self.check_imports, self.check_project_structure, ] # 核心工具检查 tool_checks = [ self.extract_mcp_tools, self.check_tool_functions, self.check_tool_signatures, self.check_tool_documentation, self.check_tool_parameters, self.check_tool_return_types, self.check_tool_error_handling, ] # 高级检查 advanced_checks = [ self.check_error_handling_modules, self.check_parameter_validation_modules, self.check_type_definitions, self.check_resource_definitions, self.check_import_conflicts, ] all_checks = basic_checks + tool_checks + advanced_checks for check in all_checks: try: check() except Exception as e: self.issues.append(f"❌ {check.__name__} 检查失败: {str(e)}") return self.report_results() def extract_mcp_tools(self): """提取所有MCP工具函数的详细信息""" print("🛠️ 提取MCP工具函数详情...") tools_file = self.src_path / "core" / "tools.py" if not tools_file.exists(): self.issues.append("❌ 缺少tools.py文件") return with open(tools_file, encoding="utf-8") as f: content = f.read() # 提取工具函数 - 使用经过验证的简单模式 tool_pattern = r"@mcp\.tool\(\)\s*\n\s*async def (\w+)" tool_names = re.findall(tool_pattern, content) if not tool_names: self.issues.append("❌ 未找到任何MCP工具函数") return # 为每个工具函数提取详细信息 for func_name in tool_names: # 通过函数名查找完整的函数定义来获取参数和返回类型 func_pattern = rf"async def {func_name}\s*\((.*?)\)(?:\s*->\s*([^\n:]+))?:" func_match = re.search(func_pattern, content, re.DOTALL) params = "" return_type = "" if func_match: params = func_match.group(1).strip() if func_match.lastindex and func_match.lastindex >= 2: return_type = func_match.group(2).strip() # 提取文档字符串 - 改进模式支持多行和复杂内容 docstring_pattern = rf'async def {func_name}.*?\n\s*"""(.*?)"""' docstring_match = re.search(docstring_pattern, content, re.DOTALL) docstring = docstring_match.group(1).strip() if docstring_match else "" self.tool_details[func_name] = { "name": func_name, "parameters": params, "return_type": return_type or "dict[str, Any]", # 默认返回类型 "docstring": docstring, "full_match": True, } print(f"✅ 提取到 {len(self.tool_details)} 个MCP工具函数") def check_tool_functions(self): """检查工具函数的基础规范""" print("🔤 检查工具函数规范...") required_tools = { "get_data", "smart_search", "advanced_query", "analyze_gene_evolution", "build_phylogenetic_profile", "kegg_pathway_enrichment", } found_tools = set(self.tool_details.keys()) missing_tools = required_tools - found_tools if missing_tools: self.issues.append(f"❌ 缺少核心工具: {missing_tools}") # 检查工具命名 for tool_name in self.tool_details: if tool_name.endswith("_tool"): self.issues.append(f"❌ 工具函数名包含冗余后缀: {tool_name}") print( f"✅ 工具函数检查完成 ({len(found_tools)}/{len(required_tools)} 个核心工具)" ) def check_tool_signatures(self): """检查工具函数签名的详细规范""" print("📝 检查工具函数签名...") for tool_name, details in self.tool_details.items(): # 检查返回类型 - 更加宽松的标准 return_type = details["return_type"] # 只对真正模糊的类型报错,允许合理的TypedDict类型 vague_types = ["dict", "Dict", "Any", "dict[str, Any]", "Dict[str, Any]"] if return_type in vague_types: self.issues.append( f"❌ {tool_name}: 使用过于宽泛的返回类型 {return_type}" ) elif not any(t in return_type for t in ["Result", "Dict", "TypedDict"]): # 如果不是明显的结果类型,给出警告但不阻止 pass # 检查参数 params_str = details["parameters"] if not params_str.strip(): self.issues.append(f"❌ {tool_name}: 函数没有参数") continue # 解析参数 param_pattern = r"(\w+):\s*([^,=\n]+)(?:\s*=\s*([^,\n]+))?" param_matches = re.findall(param_pattern, params_str) if not param_matches: self.issues.append(f"❌ {tool_name}: 无法解析参数") continue for param_name, param_type, _default_value in param_matches: # 检查参数类型注解 if not param_type.strip(): self.issues.append( f"❌ {tool_name}: 参数 {param_name} 缺少类型注解" ) # 检查常见参数 if param_name == "max_results" and "int" not in param_type: self.issues.append(f"❌ {tool_name}: max_results参数应该是int类型") print("✅ 工具函数签名检查完成") def check_tool_documentation(self): """检查工具函数文档字符串的详细内容 - 使用智能和宽松的标准""" print("📖 检查工具函数文档...") for tool_name, details in self.tool_details.items(): docstring = details["docstring"] # 检查文档字符串长度 - 降低要求 if len(docstring) < 20: self.issues.append(f"❌ {tool_name}: 文档字符串过短") continue # 检查是否有基本的文档内容 has_args = "Args:" in docstring or "参数:" in docstring has_returns = "Returns:" in docstring or "返回:" in docstring has_examples = ( "Examples:" in docstring or "示例:" in docstring or "Example:" in docstring ) # 更宽松的文档检查 if not (has_args or has_returns or has_examples): self.issues.append(f"❌ {tool_name}: 文档字符串缺少基本结构") elif len(docstring) > 100: # 如果文档足够长,认为结构是合理的 pass # 长文档通常包含足够的信息 print("✅ 工具函数文档检查完成") def check_tool_parameters(self): """检查工具函数参数的详细规范""" print("✅ 检查工具函数参数规范...") for tool_name, details in self.tool_details.items(): params_str = details["parameters"] # 解析参数 param_pattern = r"(\w+):\s*([^,=\n]+)(?:\s*=\s*([^,\n]+))?" param_matches = re.findall(param_pattern, params_str) for param_name, param_type, _default_value in param_matches: # 检查max_results参数 if param_name == "max_results": if _default_value and int(_default_value.strip()) > 100: self.issues.append( f"❌ {tool_name}: max_results默认值过大 ({_default_value})" ) # 检查query参数 if param_name == "query": if ( "Union" not in param_type and "List" not in param_type and "str" not in param_type ): self.issues.append( f"❌ {tool_name}: query参数类型应该支持str或List[str]" ) # 不再强制要求Optional类型注解,因为Python中这是可选的 # Optional类型注解是最佳实践但不是必需的 print("✅ 工具函数参数检查完成") def check_tool_return_types(self): """检查工具函数返回类型的详细规范""" print("🔄 检查工具函数返回类型...") # 检查types.py中的类型定义 types_file = self.src_path / "core" / "types.py" if not types_file.exists(): self.issues.append("❌ 缺少types.py文件") return with open(types_file, encoding="utf-8") as f: types_content = f.read() required_types = [ "GeneInfo(TypedDict)", "ProteinInfo(TypedDict)", "SearchResult(TypedDict)", "BatchResult(TypedDict)", "AdvancedQueryResult(TypedDict)", "KEGGResult(TypedDict)", "ErrorResult(TypedDict)", "ToolResult", ] for type_def in required_types: if type_def not in types_content: self.issues.append(f"❌ 缺少类型定义: {type_def}") # 检查每个工具的返回类型是否合适 for tool_name, details in self.tool_details.items(): return_type = details["return_type"] if tool_name == "get_data": if return_type not in ["ToolResult", "Dict[str, Any]"]: self.issues.append(f"❌ {tool_name}: 返回类型应该为ToolResult") elif tool_name == "smart_search": if return_type != "SearchResult": self.issues.append(f"❌ {tool_name}: 返回类型应该为SearchResult") elif tool_name == "advanced_query": if return_type != "AdvancedQueryResult": self.issues.append( f"❌ {tool_name}: 返回类型应该为AdvancedQueryResult" ) print("✅ 工具函数返回类型检查完成") def check_tool_error_handling(self): """检查工具函数的错误处理""" print("⚠️ 检查工具函数错误处理...") tools_file = self.src_path / "core" / "tools.py" with open(tools_file, encoding="utf-8") as f: content = f.read() for tool_name in self.tool_details: # 查找工具函数的try-except块 func_pattern = rf"async def {tool_name}\s*\([^)]*\)[^:]*:\s*(.*?)(?=\n @|\n def|\Z)" func_match = re.search(func_pattern, content, re.DOTALL) if func_match: func_body = func_match.group(1) # 检查是否有try-except if "try:" not in func_body: self.issues.append(f"❌ {tool_name}: 缺少异常处理") continue # 检查是否处理ValidationError if "ValidationError" not in func_body: self.issues.append(f"❌ {tool_name}: 未处理ValidationError异常") # 检查是否使用了统一的错误格式化 if "format_simple_error" not in func_body: self.issues.append(f"❌ {tool_name}: 未使用统一错误格式化函数") print("✅ 工具函数错误处理检查完成") def check_error_handling_modules(self): """检查错误处理模块""" print("⚠️ 检查错误处理模块...") errors_file = self.src_path / "core" / "errors.py" if not errors_file.exists(): self.issues.append("❌ 缺少errors.py模块") return with open(errors_file, encoding="utf-8") as f: content = f.read() required_classes = [ "class GenomeMCPError", "class ValidationError", "class APIError", "class DataNotFoundError", "class ErrorCodes", ] for class_def in required_classes: if class_def not in content: self.issues.append(f"❌ 缺少错误类: {class_def}") # 检查format_simple_error函数 if "def format_simple_error" not in content: self.issues.append("❌ 缺少format_simple_error函数") print("✅ 错误处理模块检查完成") def check_parameter_validation_modules(self): """检查参数验证模块""" print("✅ 检查参数验证模块...") validation_file = self.src_path / "core" / "validation.py" if not validation_file.exists(): self.issues.append("❌ 缺少validation.py模块") return with open(validation_file, encoding="utf-8") as f: content = f.read() required_functions = [ "def validate_common_params", "def validate_gene_params", "def validate_kegg_params", "def validate_search_params", ] for func_def in required_functions: if func_def not in content: self.issues.append(f"❌ 缺少验证函数: {func_def}") # 检查参数范围验证 validation_checks = [ "max_results > 100", "min_results < 1", "if max_results < 1", ] found_checks = 0 for check in validation_checks: if check in content: found_checks += 1 if found_checks == 0: self.issues.append("❌ 缺少参数范围验证逻辑") print("✅ 参数验证模块检查完成") def check_type_definitions(self): """检查类型定义模块""" print("📝 检查类型定义模块...") types_file = self.src_path / "core" / "types.py" if not types_file.exists(): self.issues.append("❌ 缺少types.py模块") return with open(types_file, encoding="utf-8") as f: content = f.read() # 检查TypedDict导入(兼容Python 3.11和3.12+) if not any( pattern in content for pattern in [ "from typing import TypedDict", "from typing_extensions import TypedDict", ] ): self.issues.append("❌ types.py未导入TypedDict") # 检查关键类型定义 critical_types = [ "GeneInfo", "ProteinInfo", "SearchResult", "BatchResult", "ErrorResult", "ToolResult", ] for type_name in critical_types: if f"class {type_name}(TypedDict)" not in content: self.issues.append(f"❌ 缺少TypedDict定义: {type_name}") print("✅ 类型定义模块检查完成") def check_resource_definitions(self): """检查资源定义""" print("📚 检查资源定义...") tools_file = self.src_path / "core" / "tools.py" with open(tools_file, encoding="utf-8") as f: content = f.read() # 检查是否有@mcp.resource装饰器 if "@mcp.resource(" not in content: self.issues.append("❌ 未定义MCP资源") return # 检查资源数量 resource_matches = re.findall(r'@mcp\.resource\("([^"]+)"\)', content) if len(resource_matches) < 2: self.issues.append(f"❌ MCP资源数量过少: {len(resource_matches)}") # 检查资源路径格式 for resource_path in resource_matches: if not resource_path.startswith("genome://"): self.issues.append(f"❌ 资源路径格式不正确: {resource_path}") # 检查create_mcp_resources函数 if "def create_mcp_resources(mcp:" not in content: self.issues.append("❌ 缺少create_mcp_resources函数") # 检查main.py中的资源注册 main_file = self.src_path / "main.py" if main_file.exists(): with open(main_file, encoding="utf-8") as f: main_content = f.read() if "create_mcp_resources(mcp)" not in main_content: self.issues.append("❌ main.py未注册MCP资源") print(f"✅ 资源定义检查完成 ({len(resource_matches)}个资源)") def check_import_conflicts(self): """检查导入冲突""" print("🔗 检查导入冲突...") tools_file = self.src_path / "core" / "tools.py" with open(tools_file, encoding="utf-8") as f: content = f.read() # 解析AST查找函数定义 try: tree = ast.parse(content) except SyntaxError as e: self.issues.append(f"❌ tools.py语法错误: {e}") return # 提取所有函数名 function_names = set() for node in ast.walk(tree): if isinstance(node, ast.FunctionDef): function_names.add(node.name) # 检查导入的函数与本地函数重名 import_lines = re.findall(r"from [^ ]+ import (.+)", content) for import_line in import_lines: imported_names = [name.strip() for name in import_line.split(",")] for name in imported_names: if " as " in name: continue if name in function_names: self.issues.append(f"❌ 导入冲突: {name} 与本地函数重名") print("✅ 导入冲突检查完成") def check_syntax(self): """检查Python语法""" print("🔤 检查Python语法...") try: python_files = list(self.src_path.rglob("*.py")) for py_file in python_files: with open(py_file, encoding="utf-8") as f: code = f.read() try: compile(code, str(py_file), "exec") except SyntaxError as e: self.issues.append(f"❌ 语法错误: {py_file} - {e}") return False print(f"✅ Python语法检查通过 ({len(python_files)}个文件)") return True except Exception as e: self.issues.append(f"❌ 语法检查异常: {e}") return False def check_imports(self): """检查模块导入""" print("📦 检查模块导入...") try: import sys sys.path.insert(0, str(self.src_path.parent)) from genome_mcp.core.validation import validate_common_params # 测试基本功能 max_results, species, query_type = validate_common_params(max_results=50) if not (1 <= max_results <= 100): self.issues.append("❌ validate_common_params参数验证异常") print("✅ 所有核心模块导入成功") return True except Exception as e: self.issues.append(f"❌ 模块导入失败: {e}") return False def check_project_structure(self): """检查项目结构""" print("📁 检查项目结构...") required_files = [ self.src_path / "__init__.py", self.src_path / "__main__.py", self.src_path / "main.py", self.src_path / "core" / "__init__.py", self.src_path / "core" / "tools.py", self.src_path / "core" / "validation.py", self.src_path / "core" / "errors.py", self.src_path / "core" / "types.py", ] missing_files = [] for file_path in required_files: if not file_path.exists(): missing_files.append(str(file_path)) if missing_files: self.issues.append(f"❌ 缺少必要文件: {missing_files}") return False print("✅ 项目结构检查通过") return True def report_results(self) -> bool: """报告检查结果""" print("\n" + "=" * 60) if not self.issues: print("✅ 所有FastMCP深度合规性检查通过!") print("🎉 项目完全符合FastMCP规范要求") print(f"📊 检查了 {len(self.tool_details)} 个MCP工具函数") return True else: print(f"❌ 发现 {len(self.issues)} 个合规性问题:") print() for i, issue in enumerate(self.issues, 1): print(f" {i}. {issue}") print("\n💡 请修复上述问题后重新检查") return False def main(): """主函数""" project_root = Path(".") print("🚀 FastMCP深度合规性检查") print(f"📁 项目路径: {project_root}") print("=" * 60) checker = FastMCPComplianceChecker(project_root) success = checker.run_all_checks() if not success: print("\n❌ FastMCP合规性检查失败") print("请修复上述问题后重新运行") sys.exit(1) print("\n✅ 所有检查通过!项目符合FastMCP规范") print("可以安全地提交代码 🎉") if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gqy20/genome-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server