#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
智文助手增强版:文件夹中文说明文件生成器
集成安全验证、配置管理、性能监控等优化特性
版本: 2.0.0
作者: 智文团队
"""
import os
import sys
import datetime
import json
import asyncio
import logging
from typing import Optional, List, Dict, Any, Union
from pathlib import Path
from contextlib import asynccontextmanager
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# MCP相关导入
try:
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
except ImportError as e:
print(f"错误: 缺少必要的MCP依赖: {e}")
print("请安装: pip install mcp fastmcp pydantic")
sys.exit(1)
# 配置管理导入
try:
from config import get_config_manager
config_manager = get_config_manager()
logging.info("配置管理模块加载成功")
except ImportError as e:
print(f"警告: 配置管理模块导入失败: {e}")
# 创建最小配置管理器
class MinimalConfigManager:
def __init__(self):
self.config = {
'options': {'exclude_dirs': [], 'force_update': False, 'output_file': 'folder_structure_mindmap.md'},
'performance': {'cache_ttl': 3600},
'server': {'transport': 'stdio', 'port': 8080}
}
def get(self, key: str, default=None):
keys = key.split('.')
value = self.config
for k in keys:
if isinstance(value, dict) and k in value:
value = value[k]
else:
return default
return value
def validate_config(self):
return []
def set(self, key: str, value):
keys = key.split('.')
config = self.config
for k in keys[:-1]:
if k not in config:
config[k] = {}
config = config[k]
config[keys[-1]] = value
config_manager = MinimalConfigManager()
logger.warning("使用最小配置管理器")
# 安全验证导入
try:
from security import get_security_validator
security_validator = get_security_validator(config_manager.get('security', {}))
logger.info("安全验证模块加载成功")
except ImportError as e:
print(f"警告: 安全验证模块导入失败: {e}")
# 创建最小安全验证器
class MinimalSecurityValidator:
def validate_path(self, path: str, base_dir: str = None) -> bool:
if '..' in path:
return False
if any(dangerous in path.lower() for dangerous in ['/etc/', '/sys/', '/proc/', 'windows/system32']):
return False
return True
def detect_sensitive_info(self, text: str) -> List[str]:
return []
def validate_file_type(self, filename: str) -> bool:
return True
def sanitize_path(self, path: str) -> str:
return path.replace('\\', '/')
security_validator = MinimalSecurityValidator()
logger.warning("使用最小安全验证器")
# 全局安全参数验证函数(保证所有工具可用)
def validate_security_params(params: dict) -> bool:
"""验证参数字典的基本安全性"""
return isinstance(params, dict) and all(isinstance(k, str) for k in params.keys())
# 日志已在前面初始化,无需重复设置
# 服务器生命周期管理
@asynccontextmanager
async def lifespan(server: FastMCP):
"""服务器生命周期管理"""
logger.info("启动增强版文件夹文档MCP服务器")
# 加载配置
config_errors = config_manager.validate_config() if hasattr(config_manager, 'validate_config') else []
if config_errors:
logger.warning(f"配置验证警告: {config_errors}")
yield
logger.info("关闭增强版文件夹文档MCP服务器")
# 初始化MCP服务器
mcp = FastMCP(
"enhanced_folder_documentation_mcp",
instructions="增强版文件夹中文说明文件生成器,支持安全验证、配置管理、性能监控等功能",
lifespan=lifespan
)
# 定义工具输入模型
class GenerateReadmeInput(BaseModel):
"""生成README.md文件输入模型"""
root_dir: str = Field(..., description="项目根目录路径")
exclude_dirs: List[str] = Field(default_factory=lambda: config_manager.get("options.exclude_dirs", []), description="排除的目录列表")
force_update: bool = Field(default=config_manager.get("options.force_update", False), description="是否强制更新现有README.md文件")
class GenerateMindmapInput(BaseModel):
"""生成思维导图输入模型"""
root_dir: str = Field(..., description="项目根目录路径")
output_file: str = Field(default=config_manager.get("options.output_file", "folder_structure_mindmap.md"), description="思维导图输出文件路径")
exclude_dirs: List[str] = Field(default_factory=lambda: config_manager.get("options.exclude_dirs", []), description="排除的目录列表")
class UpdateDocumentationInput(BaseModel):
"""更新文档输入模型"""
root_dir: str = Field(..., description="项目根目录路径")
exclude_dirs: List[str] = Field(default_factory=lambda: config_manager.get("options.exclude_dirs", []), description="排除的目录列表")
update_mindmap: bool = Field(default=True, description="是否更新思维导图")
class SecurityCheckInput(BaseModel):
"""安全检查输入模型"""
path: str = Field(..., description="要检查的路径")
base_dir: str = Field(default=".", description="基础目录")
class ConfigInput(BaseModel):
"""配置管理输入模型"""
action: str = Field(..., description="操作类型: get, set, validate")
key: Optional[str] = Field(None, description="配置键")
value: Optional[str] = Field(None, description="配置值")
# 缓存管理
class SimpleCache:
"""简单的内存缓存实现"""
def __init__(self, ttl: int = 3600, max_size: int = 1000):
self.cache = {}
self.ttl = ttl
self.max_size = max_size
self._access_count = {}
def get(self, key: str) -> Optional[Any]:
try:
if key in self.cache:
data, timestamp = self.cache[key]
current_time = datetime.datetime.now().timestamp()
if current_time - timestamp < self.ttl:
self._access_count[key] = current_time
return data
else:
del self.cache[key]
if key in self._access_count:
del self._access_count[key]
return None
except Exception as e:
logger.error(f"缓存获取失败: {e}")
return None
def set(self, key: str, value: Any) -> bool:
try:
if len(self.cache) >= self.max_size and key not in self.cache:
self._evict_lru()
self.cache[key] = (value, datetime.datetime.now().timestamp())
self._access_count[key] = datetime.datetime.now().timestamp()
return True
except Exception as e:
logger.error(f"缓存设置失败: {e}")
return False
def clear(self) -> bool:
try:
self.cache.clear()
self._access_count.clear()
logger.info("缓存已清空")
return True
except Exception as e:
logger.error(f"缓存清空失败: {e}")
return False
def _evict_lru(self):
if not self._access_count:
return
lru_key = min(self._access_count.keys(), key=lambda k: self._access_count[k])
del self.cache[lru_key]
del self._access_count[lru_key]
logger.debug(f"删除LRU缓存条目: {lru_key}")
def get_stats(self) -> Dict[str, Any]:
return {
'size': len(self.cache),
'max_size': self.max_size,
'ttl': self.ttl,
'access_count': len(self._access_count)
}
# 全局缓存实例
cache = SimpleCache(
ttl=config_manager.get("performance.cache_ttl", 3600),
max_size=config_manager.get("performance.cache_memory_size", 1000)
)
# 辅助函数:获取文件夹描述
def get_folder_description(folder_path: str) -> str:
folder_descriptions = {
"skills": "技能文件夹", "skills-translated": "翻译后的技能文件夹", "技能": "中文技能文件夹",
"算法艺术": "算法艺术相关技能", "品牌指南": "品牌指南相关技能", "画布设计": "画布设计相关技能",
"文档协作": "文档协作相关技能", "Word文档": "Word文档处理技能", "PDF文档": "PDF文档处理技能",
"PPT演示文稿": "PPT演示文稿处理技能", "Excel表格": "Excel表格处理技能",
"Web应用测试": "Web应用测试技能", "Web构件构建器": "Web构件构建器技能", "前端设计": "前端设计技能",
"内部通信": "内部通信技能", "主题工厂": "主题工厂技能", "MCP构建器": "MCP构建器技能",
"技能创建器": "技能创建器技能", "参考": "参考文档", "脚本": "脚本文件", "示例": "示例文件",
"模板": "模板文件", "规范": "规范文件", "Office开放XML": "Office开放XML相关文件",
"画布字体": "画布设计使用的字体文件",
}
folder_name = os.path.basename(folder_path)
return folder_descriptions.get(folder_name, folder_name)
# 辅助函数:生成README.md内容
def generate_readme_content(folder_path: str) -> str:
folder_name = os.path.basename(folder_path)
description = get_folder_description(folder_path)
files = []
subfolders = []
try:
for item in os.listdir(folder_path):
item_path = os.path.join(folder_path, item)
if os.path.isfile(item_path):
files.append(item)
elif os.path.isdir(item_path):
subfolders.append(item)
except PermissionError:
logger.warning(f"无权限访问目录: {folder_path}")
return f"# {description}\n\n注意:该目录访问受限。\n"
content = f"# {description}\n\n## 文件夹用途\n\n该文件夹包含{description.lower()}相关的文件和子文件夹。\n\n"
if subfolders:
content += "## 子文件夹\n\n"
for subfolder in subfolders:
subfolder_desc = get_folder_description(os.path.join(folder_path, subfolder))
content += f"- **{subfolder}**: {subfolder_desc}\n"
content += "\n"
if files:
content += "## 文件说明\n\n"
for file in files:
file_ext = os.path.splitext(file)[1].lower()
if file_ext in ['.py']:
file_type = "Python脚本文件"
elif file_ext in ['.md']:
file_type = "Markdown文档"
elif file_ext in ['.txt']:
file_type = "文本文件"
elif file_ext in ['.json']:
file_type = "JSON配置文件"
elif file_ext in ['.yaml', '.yml']:
file_type = "YAML配置文件"
elif file_ext in ['.xsd']:
file_type = "XML Schema文件"
elif file_ext in ['.ttf', '.otf']:
file_type = "字体文件"
elif file_ext in ['.html']:
file_type = "HTML文件"
elif file_ext in ['.js']:
file_type = "JavaScript文件"
elif file_ext in ['.pdf']:
file_type = "PDF文档"
else:
file_type = "其他文件"
content += f"- **{file}**: {file_type}\n"
content += f"\n## 更新时间\n\n该文件生成于:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
return content
# 辅助函数:生成文件夹结构树
def generate_folder_tree(root_dir: str, exclude_dirs: List[str], prefix: str = "", is_last: bool = True) -> str:
tree = ""
try:
items = os.listdir(root_dir)
items.sort()
items = [item for item in items if item not in exclude_dirs and not item.startswith('.')]
for i, item in enumerate(items):
is_last_item = i == len(items) - 1
item_path = os.path.join(root_dir, item)
try:
if os.path.isdir(item_path):
connector = "└── " if is_last_item else "├── "
tree += f"{prefix}{connector}{item}/\n"
new_prefix = prefix + (" " if is_last_item else "│ ")
tree += generate_folder_tree(item_path, exclude_dirs, new_prefix, is_last_item)
else:
connector = "└── " if is_last_item else "├── "
tree += f"{prefix}{connector}{item}\n"
except PermissionError:
connector = "└── " if is_last_item else "├── "
tree += f"{prefix}{connector}[无权限访问: {item}]\n"
except PermissionError:
tree = f"{prefix}[无权限访问: {os.path.basename(root_dir)}]\n"
return tree
# 辅助函数:生成思维导图
def generate_mindmap_content(root_dir: str, exclude_dirs: List[str]) -> str:
mindmap = f"# 项目文件夹结构思维导图\n\n生成时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n```mermaid\nflowchart TD\n subgraph 项目结构\n"
def add_nodes(path: str, parent_node: str, level: int):
if level > 4:
return ""
result = ""
try:
items = os.listdir(path)
items.sort()
items = [item for item in items if item not in exclude_dirs and not item.startswith('.')]
for item in items:
item_path = os.path.join(path, item)
node_id = item_path.replace("\\", "_").replace(":", "_").replace(".", "_").replace(" ", "_")
node_label = item
if os.path.isdir(item_path):
node_label += "/"
result += f" {node_id}[\"{node_label}\"]\n"
if parent_node:
result += f" {parent_node} --> {node_id}\n"
if os.path.isdir(item_path):
result += add_nodes(item_path, node_id, level + 1)
except PermissionError:
pass
return result
mindmap += add_nodes(root_dir, "", 0)
mindmap += " end\n```\n\n## 更新日志\n\n- **" + f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: 生成初始文件夹结构思维导图\n"
return mindmap
# 注册工具:安全检查
@mcp.tool()
async def security_check(params: SecurityCheckInput) -> str:
path = params.path
base_dir = params.base_dir
if not security_validator.validate_path(path, base_dir):
return f"❌ 路径验证失败: '{path}' 包含不安全的路径组件"
full_path = os.path.join(base_dir, path) if not os.path.isabs(path) else path
if os.path.isfile(full_path):
try:
with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read(1024)
sensitive_info = security_validator.detect_sensitive_info(content)
if sensitive_info:
return f"⚠️ 检测到敏感信息: {', '.join(sensitive_info)}"
except Exception as e:
logger.warning(f"读取文件失败: {e}")
return f"✅ 安全检查通过: '{path}'"
# 注册工具:配置管理
@mcp.tool()
async def config_manager_tool(params: ConfigInput) -> str:
action = params.action
key = params.key
value = params.value
if action == "get":
if not key:
return json.dumps(config_manager.config, indent=2, ensure_ascii=False)
else:
value = config_manager.get(key)
return f"{key}: {value}"
elif action == "set":
if not key or value is None:
return "❌ 设置配置需要提供key和value"
try:
parsed_value = json.loads(value)
except:
parsed_value = value
config_manager.set(key, parsed_value)
return f"✅ 已设置 {key} = {parsed_value}"
elif action == "validate":
if hasattr(config_manager, 'validate_config'):
errors = config_manager.validate_config()
if errors:
return f"❌ 配置验证失败:\n" + "\n".join(f"- {error}" for error in errors)
else:
return "✅ 配置验证通过"
else:
return "⚠️ 配置验证功能不可用"
else:
return f"❌ 不支持的操作: {action}"
# 注册工具:性能监控
@mcp.tool()
async def performance_stats() -> str:
stats = {
"cache_info": {"size": len(cache.cache), "ttl": cache.ttl},
"server_info": {"name": "enhanced_folder_documentation_mcp", "uptime": "unknown", "version": "1.0.0"},
"system_info": {"python_version": sys.version, "platform": sys.platform}
}
return json.dumps(stats, indent=2, ensure_ascii=False)
# 注册工具:生成README.md文件
@mcp.tool()
async def generate_readme_files(params: GenerateReadmeInput) -> str:
if not validate_security_params(params.__dict__):
return "❌ 输入参数安全验证失败"
root_dir = params.root_dir
exclude_dirs = params.exclude_dirs + [".git", ".trae", ".claude-plugin", ".claude插件"]
force_update = params.force_update
if not security_validator.validate_path(root_dir):
return f"❌ 根目录路径验证失败: {root_dir}"
if not os.path.exists(root_dir):
return f"❌ 根目录不存在: {root_dir}"
generated_count = 0
skipped_count = 0
for root, dirs, files in os.walk(root_dir):
dirs[:] = [d for d in dirs if d not in exclude_dirs and not d.startswith('.')]
readme_path = os.path.join(root, "README.md")
if os.path.exists(readme_path) and not force_update:
skipped_count += 1
continue
readme_content = generate_readme_content(root)
try:
with open(readme_path, "w", encoding="utf-8") as f:
f.write(readme_content)
generated_count += 1
logger.info(f"生成README文件: {readme_path}")
except Exception as e:
logger.error(f"生成README文件失败: {readme_path}, 错误: {e}")
result = f"已生成 {generated_count} 个README.md文件,跳过 {skipped_count} 个已存在的文件。"
logger.info(result)
return result
# 注册工具:生成思维导图
@mcp.tool()
async def generate_mindmap(params: GenerateMindmapInput) -> str:
if not validate_security_params(params.__dict__):
return "❌ 输入参数安全验证失败"
root_dir = params.root_dir
output_file = params.output_file
exclude_dirs = params.exclude_dirs + [".git", ".trae", ".claude-plugin", ".claude插件"]
if not security_validator.validate_path(root_dir):
return f"❌ 根目录路径验证失败: {root_dir}"
if not os.path.exists(root_dir):
return f"❌ 根目录不存在: {root_dir}"
output_dir = os.path.dirname(output_file)
if output_dir and not security_validator.validate_path(output_dir):
return f"❌ 输出目录路径验证失败: {output_dir}"
cache_key = f"mindmap_{root_dir}_{hash(tuple(exclude_dirs))}"
cached_content = cache.get(cache_key)
if cached_content:
return "📋 使用缓存的内容"
mindmap_content = generate_mindmap_content(root_dir, exclude_dirs)
try:
with open(output_file, "w", encoding="utf-8") as f:
f.write(mindmap_content)
cache.set(cache_key, mindmap_content)
result = f"已生成思维导图文件:{output_file}"
logger.info(result)
return result
except Exception as e:
logger.error(f"生成思维导图失败: {e}")
return f"❌ 生成思维导图失败: {e}"
# 注册工具:更新文档
@mcp.tool()
async def update_documentation(params: UpdateDocumentationInput) -> str:
if not validate_security_params(params.__dict__):
return "❌ 输入参数安全验证失败"
root_dir = params.root_dir
exclude_dirs = params.exclude_dirs + [".git", ".trae", ".claude-plugin", ".claude插件"]
update_mindmap_flag = params.update_mindmap
if not security_validator.validate_path(root_dir):
return f"❌ 根目录路径验证失败: {root_dir}"
if not os.path.exists(root_dir):
return f"❌ 根目录不存在: {root_dir}"
generated_count = 0
updated_count = 0
for root, dirs, files in os.walk(root_dir):
dirs[:] = [d for d in dirs if d not in exclude_dirs and not d.startswith('.')]
readme_path = os.path.join(root, "README.md")
readme_content = generate_readme_content(root)
try:
if os.path.exists(readme_path):
with open(readme_path, "w", encoding="utf-8") as f:
f.write(readme_content)
updated_count += 1
else:
with open(readme_path, "w", encoding="utf-8") as f:
f.write(readme_content)
generated_count += 1
except Exception as e:
logger.error(f"更新README文件失败: {readme_path}, 错误: {e}")
result = f"已更新 {updated_count} 个README.md文件,生成 {generated_count} 个新的README.md文件。\n"
if update_mindmap_flag:
mindmap_path = os.path.join(root_dir, "folder_structure_mindmap.md")
if not security_validator.validate_path(mindmap_path):
result += f"❌ 思维导图输出路径验证失败: {mindmap_path}"
else:
mindmap_content = generate_mindmap_content(root_dir, exclude_dirs)
try:
with open(mindmap_path, "w", encoding="utf-8") as f:
f.write(mindmap_content)
result += f"已更新思维导图文件:{mindmap_path}"
except Exception as e:
logger.error(f"更新思维导图失败: {e}")
result += f"❌ 更新思维导图失败: {e}"
logger.info(result)
return result
# 注册工具:获取文件夹结构
@mcp.tool()
async def get_folder_structure(params: GenerateReadmeInput) -> str:
if not validate_security_params(params.__dict__):
return "❌ 输入参数安全验证失败"
root_dir = params.root_dir
exclude_dirs = params.exclude_dirs + [".git", ".trae", ".claude-plugin", ".claude插件"]
if not security_validator.validate_path(root_dir):
return f"❌ 根目录路径验证失败: {root_dir}"
if not os.path.exists(root_dir):
return f"❌ 根目录不存在: {root_dir}"
cache_key = f"structure_{root_dir}_{hash(tuple(exclude_dirs))}"
cached_content = cache.get(cache_key)
if cached_content:
return "📋 使用缓存的内容:\n\n" + cached_content
folder_tree = generate_folder_tree(root_dir, exclude_dirs)
cache.set(cache_key, folder_tree)
result = f"# 项目文件夹结构\n\n```\n{folder_tree}```"
return result
# ===== 兼容 JSON-RPC list_tools / call_tool 协议 =====
async def _list_tools_impl() -> list:
"""返回所有已注册的工具列表(兼容 JSON-RPC 发现协议)"""
tools = []
# 适配不同 FastMCP 版本的工具注册表
registry = getattr(mcp, '_tool_registry', None) or getattr(mcp, 'tools', {})
for name, func in registry.items():
tools.append({
"name": name,
"description": func.__doc__ or ""
})
return tools
async def _call_tool_impl(name: str, arguments: dict) -> list:
"""根据工具名称调用对应函数(兼容 JSON-RPC 调用协议)"""
registry = getattr(mcp, '_tool_registry', None) or getattr(mcp, 'tools', {})
if name in registry:
func = registry[name]
try:
if asyncio.iscoroutinefunction(func):
result = await func(**arguments)
else:
result = func(**arguments)
return [{"type": "text", "text": str(result)}]
except Exception as e:
return [{"type": "text", "text": f"调用工具 {name} 出错: {str(e)}"}]
else:
return [{"type": "text", "text": f"未知工具: {name}"}]
# 挂载到 MCP 实例(兼容不同版本)
mcp.list_tools = _list_tools_impl
mcp.call_tool = _call_tool_impl
# 启动服务器
if __name__ == "__main__":
transport = config_manager.get("server.transport", "stdio")
port = config_manager.get("server.port", 8000)
logger.info(f"启动增强版文件夹文档MCP服务器,传输协议: {transport}")
if transport == "streamable-http":
mcp.run(transport="streamable-http", port=port)
else:
mcp.run(transport="stdio")