Skip to main content
Glama
enhanced_folder_docs_mcp.py25.1 kB
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 智文助手增强版:文件夹中文说明文件生成器 集成安全验证、配置管理、性能监控等优化特性 版本: 2.0.0 作者: 智文团队 """ import os import sys import datetime import json import asyncio import logging from typing import Optional, List, Dict, Any, Union from pathlib import Path from contextlib import asynccontextmanager # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # MCP相关导入 try: from mcp.server.fastmcp import FastMCP from pydantic import BaseModel, Field except ImportError as e: print(f"错误: 缺少必要的MCP依赖: {e}") print("请安装: pip install mcp fastmcp pydantic") sys.exit(1) # 配置管理导入 try: from config import get_config_manager config_manager = get_config_manager() logging.info("配置管理模块加载成功") except ImportError as e: print(f"警告: 配置管理模块导入失败: {e}") # 创建最小配置管理器 class MinimalConfigManager: def __init__(self): self.config = { 'options': {'exclude_dirs': [], 'force_update': False, 'output_file': 'folder_structure_mindmap.md'}, 'performance': {'cache_ttl': 3600}, 'server': {'transport': 'stdio', 'port': 8080} } def get(self, key: str, default=None): keys = key.split('.') value = self.config for k in keys: if isinstance(value, dict) and k in value: value = value[k] else: return default return value def validate_config(self): return [] def set(self, key: str, value): keys = key.split('.') config = self.config for k in keys[:-1]: if k not in config: config[k] = {} config = config[k] config[keys[-1]] = value config_manager = MinimalConfigManager() logger.warning("使用最小配置管理器") # 安全验证导入 try: from security import get_security_validator security_validator = get_security_validator(config_manager.get('security', {})) logger.info("安全验证模块加载成功") except ImportError as e: print(f"警告: 安全验证模块导入失败: {e}") # 创建最小安全验证器 class MinimalSecurityValidator: def validate_path(self, path: str, base_dir: str = None) -> bool: if '..' in path: return False if any(dangerous in path.lower() for dangerous in ['/etc/', '/sys/', '/proc/', 'windows/system32']): return False return True def detect_sensitive_info(self, text: str) -> List[str]: return [] def validate_file_type(self, filename: str) -> bool: return True def sanitize_path(self, path: str) -> str: return path.replace('\\', '/') security_validator = MinimalSecurityValidator() logger.warning("使用最小安全验证器") # 全局安全参数验证函数(保证所有工具可用) def validate_security_params(params: dict) -> bool: """验证参数字典的基本安全性""" return isinstance(params, dict) and all(isinstance(k, str) for k in params.keys()) # 日志已在前面初始化,无需重复设置 # 服务器生命周期管理 @asynccontextmanager async def lifespan(server: FastMCP): """服务器生命周期管理""" logger.info("启动增强版文件夹文档MCP服务器") # 加载配置 config_errors = config_manager.validate_config() if hasattr(config_manager, 'validate_config') else [] if config_errors: logger.warning(f"配置验证警告: {config_errors}") yield logger.info("关闭增强版文件夹文档MCP服务器") # 初始化MCP服务器 mcp = FastMCP( "enhanced_folder_documentation_mcp", instructions="增强版文件夹中文说明文件生成器,支持安全验证、配置管理、性能监控等功能", lifespan=lifespan ) # 定义工具输入模型 class GenerateReadmeInput(BaseModel): """生成README.md文件输入模型""" root_dir: str = Field(..., description="项目根目录路径") exclude_dirs: List[str] = Field(default_factory=lambda: config_manager.get("options.exclude_dirs", []), description="排除的目录列表") force_update: bool = Field(default=config_manager.get("options.force_update", False), description="是否强制更新现有README.md文件") class GenerateMindmapInput(BaseModel): """生成思维导图输入模型""" root_dir: str = Field(..., description="项目根目录路径") output_file: str = Field(default=config_manager.get("options.output_file", "folder_structure_mindmap.md"), description="思维导图输出文件路径") exclude_dirs: List[str] = Field(default_factory=lambda: config_manager.get("options.exclude_dirs", []), description="排除的目录列表") class UpdateDocumentationInput(BaseModel): """更新文档输入模型""" root_dir: str = Field(..., description="项目根目录路径") exclude_dirs: List[str] = Field(default_factory=lambda: config_manager.get("options.exclude_dirs", []), description="排除的目录列表") update_mindmap: bool = Field(default=True, description="是否更新思维导图") class SecurityCheckInput(BaseModel): """安全检查输入模型""" path: str = Field(..., description="要检查的路径") base_dir: str = Field(default=".", description="基础目录") class ConfigInput(BaseModel): """配置管理输入模型""" action: str = Field(..., description="操作类型: get, set, validate") key: Optional[str] = Field(None, description="配置键") value: Optional[str] = Field(None, description="配置值") # 缓存管理 class SimpleCache: """简单的内存缓存实现""" def __init__(self, ttl: int = 3600, max_size: int = 1000): self.cache = {} self.ttl = ttl self.max_size = max_size self._access_count = {} def get(self, key: str) -> Optional[Any]: try: if key in self.cache: data, timestamp = self.cache[key] current_time = datetime.datetime.now().timestamp() if current_time - timestamp < self.ttl: self._access_count[key] = current_time return data else: del self.cache[key] if key in self._access_count: del self._access_count[key] return None except Exception as e: logger.error(f"缓存获取失败: {e}") return None def set(self, key: str, value: Any) -> bool: try: if len(self.cache) >= self.max_size and key not in self.cache: self._evict_lru() self.cache[key] = (value, datetime.datetime.now().timestamp()) self._access_count[key] = datetime.datetime.now().timestamp() return True except Exception as e: logger.error(f"缓存设置失败: {e}") return False def clear(self) -> bool: try: self.cache.clear() self._access_count.clear() logger.info("缓存已清空") return True except Exception as e: logger.error(f"缓存清空失败: {e}") return False def _evict_lru(self): if not self._access_count: return lru_key = min(self._access_count.keys(), key=lambda k: self._access_count[k]) del self.cache[lru_key] del self._access_count[lru_key] logger.debug(f"删除LRU缓存条目: {lru_key}") def get_stats(self) -> Dict[str, Any]: return { 'size': len(self.cache), 'max_size': self.max_size, 'ttl': self.ttl, 'access_count': len(self._access_count) } # 全局缓存实例 cache = SimpleCache( ttl=config_manager.get("performance.cache_ttl", 3600), max_size=config_manager.get("performance.cache_memory_size", 1000) ) # 辅助函数:获取文件夹描述 def get_folder_description(folder_path: str) -> str: folder_descriptions = { "skills": "技能文件夹", "skills-translated": "翻译后的技能文件夹", "技能": "中文技能文件夹", "算法艺术": "算法艺术相关技能", "品牌指南": "品牌指南相关技能", "画布设计": "画布设计相关技能", "文档协作": "文档协作相关技能", "Word文档": "Word文档处理技能", "PDF文档": "PDF文档处理技能", "PPT演示文稿": "PPT演示文稿处理技能", "Excel表格": "Excel表格处理技能", "Web应用测试": "Web应用测试技能", "Web构件构建器": "Web构件构建器技能", "前端设计": "前端设计技能", "内部通信": "内部通信技能", "主题工厂": "主题工厂技能", "MCP构建器": "MCP构建器技能", "技能创建器": "技能创建器技能", "参考": "参考文档", "脚本": "脚本文件", "示例": "示例文件", "模板": "模板文件", "规范": "规范文件", "Office开放XML": "Office开放XML相关文件", "画布字体": "画布设计使用的字体文件", } folder_name = os.path.basename(folder_path) return folder_descriptions.get(folder_name, folder_name) # 辅助函数:生成README.md内容 def generate_readme_content(folder_path: str) -> str: folder_name = os.path.basename(folder_path) description = get_folder_description(folder_path) files = [] subfolders = [] try: for item in os.listdir(folder_path): item_path = os.path.join(folder_path, item) if os.path.isfile(item_path): files.append(item) elif os.path.isdir(item_path): subfolders.append(item) except PermissionError: logger.warning(f"无权限访问目录: {folder_path}") return f"# {description}\n\n注意:该目录访问受限。\n" content = f"# {description}\n\n## 文件夹用途\n\n该文件夹包含{description.lower()}相关的文件和子文件夹。\n\n" if subfolders: content += "## 子文件夹\n\n" for subfolder in subfolders: subfolder_desc = get_folder_description(os.path.join(folder_path, subfolder)) content += f"- **{subfolder}**: {subfolder_desc}\n" content += "\n" if files: content += "## 文件说明\n\n" for file in files: file_ext = os.path.splitext(file)[1].lower() if file_ext in ['.py']: file_type = "Python脚本文件" elif file_ext in ['.md']: file_type = "Markdown文档" elif file_ext in ['.txt']: file_type = "文本文件" elif file_ext in ['.json']: file_type = "JSON配置文件" elif file_ext in ['.yaml', '.yml']: file_type = "YAML配置文件" elif file_ext in ['.xsd']: file_type = "XML Schema文件" elif file_ext in ['.ttf', '.otf']: file_type = "字体文件" elif file_ext in ['.html']: file_type = "HTML文件" elif file_ext in ['.js']: file_type = "JavaScript文件" elif file_ext in ['.pdf']: file_type = "PDF文档" else: file_type = "其他文件" content += f"- **{file}**: {file_type}\n" content += f"\n## 更新时间\n\n该文件生成于:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" return content # 辅助函数:生成文件夹结构树 def generate_folder_tree(root_dir: str, exclude_dirs: List[str], prefix: str = "", is_last: bool = True) -> str: tree = "" try: items = os.listdir(root_dir) items.sort() items = [item for item in items if item not in exclude_dirs and not item.startswith('.')] for i, item in enumerate(items): is_last_item = i == len(items) - 1 item_path = os.path.join(root_dir, item) try: if os.path.isdir(item_path): connector = "└── " if is_last_item else "├── " tree += f"{prefix}{connector}{item}/\n" new_prefix = prefix + (" " if is_last_item else "│ ") tree += generate_folder_tree(item_path, exclude_dirs, new_prefix, is_last_item) else: connector = "└── " if is_last_item else "├── " tree += f"{prefix}{connector}{item}\n" except PermissionError: connector = "└── " if is_last_item else "├── " tree += f"{prefix}{connector}[无权限访问: {item}]\n" except PermissionError: tree = f"{prefix}[无权限访问: {os.path.basename(root_dir)}]\n" return tree # 辅助函数:生成思维导图 def generate_mindmap_content(root_dir: str, exclude_dirs: List[str]) -> str: mindmap = f"# 项目文件夹结构思维导图\n\n生成时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n```mermaid\nflowchart TD\n subgraph 项目结构\n" def add_nodes(path: str, parent_node: str, level: int): if level > 4: return "" result = "" try: items = os.listdir(path) items.sort() items = [item for item in items if item not in exclude_dirs and not item.startswith('.')] for item in items: item_path = os.path.join(path, item) node_id = item_path.replace("\\", "_").replace(":", "_").replace(".", "_").replace(" ", "_") node_label = item if os.path.isdir(item_path): node_label += "/" result += f" {node_id}[\"{node_label}\"]\n" if parent_node: result += f" {parent_node} --> {node_id}\n" if os.path.isdir(item_path): result += add_nodes(item_path, node_id, level + 1) except PermissionError: pass return result mindmap += add_nodes(root_dir, "", 0) mindmap += " end\n```\n\n## 更新日志\n\n- **" + f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: 生成初始文件夹结构思维导图\n" return mindmap # 注册工具:安全检查 @mcp.tool() async def security_check(params: SecurityCheckInput) -> str: path = params.path base_dir = params.base_dir if not security_validator.validate_path(path, base_dir): return f"❌ 路径验证失败: '{path}' 包含不安全的路径组件" full_path = os.path.join(base_dir, path) if not os.path.isabs(path) else path if os.path.isfile(full_path): try: with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read(1024) sensitive_info = security_validator.detect_sensitive_info(content) if sensitive_info: return f"⚠️ 检测到敏感信息: {', '.join(sensitive_info)}" except Exception as e: logger.warning(f"读取文件失败: {e}") return f"✅ 安全检查通过: '{path}'" # 注册工具:配置管理 @mcp.tool() async def config_manager_tool(params: ConfigInput) -> str: action = params.action key = params.key value = params.value if action == "get": if not key: return json.dumps(config_manager.config, indent=2, ensure_ascii=False) else: value = config_manager.get(key) return f"{key}: {value}" elif action == "set": if not key or value is None: return "❌ 设置配置需要提供key和value" try: parsed_value = json.loads(value) except: parsed_value = value config_manager.set(key, parsed_value) return f"✅ 已设置 {key} = {parsed_value}" elif action == "validate": if hasattr(config_manager, 'validate_config'): errors = config_manager.validate_config() if errors: return f"❌ 配置验证失败:\n" + "\n".join(f"- {error}" for error in errors) else: return "✅ 配置验证通过" else: return "⚠️ 配置验证功能不可用" else: return f"❌ 不支持的操作: {action}" # 注册工具:性能监控 @mcp.tool() async def performance_stats() -> str: stats = { "cache_info": {"size": len(cache.cache), "ttl": cache.ttl}, "server_info": {"name": "enhanced_folder_documentation_mcp", "uptime": "unknown", "version": "1.0.0"}, "system_info": {"python_version": sys.version, "platform": sys.platform} } return json.dumps(stats, indent=2, ensure_ascii=False) # 注册工具:生成README.md文件 @mcp.tool() async def generate_readme_files(params: GenerateReadmeInput) -> str: if not validate_security_params(params.__dict__): return "❌ 输入参数安全验证失败" root_dir = params.root_dir exclude_dirs = params.exclude_dirs + [".git", ".trae", ".claude-plugin", ".claude插件"] force_update = params.force_update if not security_validator.validate_path(root_dir): return f"❌ 根目录路径验证失败: {root_dir}" if not os.path.exists(root_dir): return f"❌ 根目录不存在: {root_dir}" generated_count = 0 skipped_count = 0 for root, dirs, files in os.walk(root_dir): dirs[:] = [d for d in dirs if d not in exclude_dirs and not d.startswith('.')] readme_path = os.path.join(root, "README.md") if os.path.exists(readme_path) and not force_update: skipped_count += 1 continue readme_content = generate_readme_content(root) try: with open(readme_path, "w", encoding="utf-8") as f: f.write(readme_content) generated_count += 1 logger.info(f"生成README文件: {readme_path}") except Exception as e: logger.error(f"生成README文件失败: {readme_path}, 错误: {e}") result = f"已生成 {generated_count} 个README.md文件,跳过 {skipped_count} 个已存在的文件。" logger.info(result) return result # 注册工具:生成思维导图 @mcp.tool() async def generate_mindmap(params: GenerateMindmapInput) -> str: if not validate_security_params(params.__dict__): return "❌ 输入参数安全验证失败" root_dir = params.root_dir output_file = params.output_file exclude_dirs = params.exclude_dirs + [".git", ".trae", ".claude-plugin", ".claude插件"] if not security_validator.validate_path(root_dir): return f"❌ 根目录路径验证失败: {root_dir}" if not os.path.exists(root_dir): return f"❌ 根目录不存在: {root_dir}" output_dir = os.path.dirname(output_file) if output_dir and not security_validator.validate_path(output_dir): return f"❌ 输出目录路径验证失败: {output_dir}" cache_key = f"mindmap_{root_dir}_{hash(tuple(exclude_dirs))}" cached_content = cache.get(cache_key) if cached_content: return "📋 使用缓存的内容" mindmap_content = generate_mindmap_content(root_dir, exclude_dirs) try: with open(output_file, "w", encoding="utf-8") as f: f.write(mindmap_content) cache.set(cache_key, mindmap_content) result = f"已生成思维导图文件:{output_file}" logger.info(result) return result except Exception as e: logger.error(f"生成思维导图失败: {e}") return f"❌ 生成思维导图失败: {e}" # 注册工具:更新文档 @mcp.tool() async def update_documentation(params: UpdateDocumentationInput) -> str: if not validate_security_params(params.__dict__): return "❌ 输入参数安全验证失败" root_dir = params.root_dir exclude_dirs = params.exclude_dirs + [".git", ".trae", ".claude-plugin", ".claude插件"] update_mindmap_flag = params.update_mindmap if not security_validator.validate_path(root_dir): return f"❌ 根目录路径验证失败: {root_dir}" if not os.path.exists(root_dir): return f"❌ 根目录不存在: {root_dir}" generated_count = 0 updated_count = 0 for root, dirs, files in os.walk(root_dir): dirs[:] = [d for d in dirs if d not in exclude_dirs and not d.startswith('.')] readme_path = os.path.join(root, "README.md") readme_content = generate_readme_content(root) try: if os.path.exists(readme_path): with open(readme_path, "w", encoding="utf-8") as f: f.write(readme_content) updated_count += 1 else: with open(readme_path, "w", encoding="utf-8") as f: f.write(readme_content) generated_count += 1 except Exception as e: logger.error(f"更新README文件失败: {readme_path}, 错误: {e}") result = f"已更新 {updated_count} 个README.md文件,生成 {generated_count} 个新的README.md文件。\n" if update_mindmap_flag: mindmap_path = os.path.join(root_dir, "folder_structure_mindmap.md") if not security_validator.validate_path(mindmap_path): result += f"❌ 思维导图输出路径验证失败: {mindmap_path}" else: mindmap_content = generate_mindmap_content(root_dir, exclude_dirs) try: with open(mindmap_path, "w", encoding="utf-8") as f: f.write(mindmap_content) result += f"已更新思维导图文件:{mindmap_path}" except Exception as e: logger.error(f"更新思维导图失败: {e}") result += f"❌ 更新思维导图失败: {e}" logger.info(result) return result # 注册工具:获取文件夹结构 @mcp.tool() async def get_folder_structure(params: GenerateReadmeInput) -> str: if not validate_security_params(params.__dict__): return "❌ 输入参数安全验证失败" root_dir = params.root_dir exclude_dirs = params.exclude_dirs + [".git", ".trae", ".claude-plugin", ".claude插件"] if not security_validator.validate_path(root_dir): return f"❌ 根目录路径验证失败: {root_dir}" if not os.path.exists(root_dir): return f"❌ 根目录不存在: {root_dir}" cache_key = f"structure_{root_dir}_{hash(tuple(exclude_dirs))}" cached_content = cache.get(cache_key) if cached_content: return "📋 使用缓存的内容:\n\n" + cached_content folder_tree = generate_folder_tree(root_dir, exclude_dirs) cache.set(cache_key, folder_tree) result = f"# 项目文件夹结构\n\n```\n{folder_tree}```" return result # ===== 兼容 JSON-RPC list_tools / call_tool 协议 ===== async def _list_tools_impl() -> list: """返回所有已注册的工具列表(兼容 JSON-RPC 发现协议)""" tools = [] # 适配不同 FastMCP 版本的工具注册表 registry = getattr(mcp, '_tool_registry', None) or getattr(mcp, 'tools', {}) for name, func in registry.items(): tools.append({ "name": name, "description": func.__doc__ or "" }) return tools async def _call_tool_impl(name: str, arguments: dict) -> list: """根据工具名称调用对应函数(兼容 JSON-RPC 调用协议)""" registry = getattr(mcp, '_tool_registry', None) or getattr(mcp, 'tools', {}) if name in registry: func = registry[name] try: if asyncio.iscoroutinefunction(func): result = await func(**arguments) else: result = func(**arguments) return [{"type": "text", "text": str(result)}] except Exception as e: return [{"type": "text", "text": f"调用工具 {name} 出错: {str(e)}"}] else: return [{"type": "text", "text": f"未知工具: {name}"}] # 挂载到 MCP 实例(兼容不同版本) mcp.list_tools = _list_tools_impl mcp.call_tool = _call_tool_impl # 启动服务器 if __name__ == "__main__": transport = config_manager.get("server.transport", "stdio") port = config_manager.get("server.port", 8000) logger.info(f"启动增强版文件夹文档MCP服务器,传输协议: {transport}") if transport == "streamable-http": mcp.run(transport="streamable-http", port=port) else: mcp.run(transport="stdio")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kscz0000/Zhiwen-Assistant-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server