"""
批量处理工具
提供批量生成文档、批量更新等操作。
"""
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime
from data_access import FileSystemInterface
from services import DocumentService, AnalysisService
def register_batch_tools(mcp_server, document_service: DocumentService,
                         analysis_service: AnalysisService, file_system: FileSystemInterface):
    """
    Register batch-processing tools on the MCP server.

    Args:
        mcp_server: FastMCP server instance the tools are attached to
        document_service: service used to generate README documents
        analysis_service: analysis service instance.
            NOTE(review): not referenced anywhere inside this function —
            presumably kept for signature parity with other register_* helpers; confirm.
        file_system: file-system interface; only its security_validator is used here
    """
    @mcp_server.tool()
    async def folder_docs_batch_generate_readme(
        root_path: str,
        recursive: bool = True,
        max_depth: int = 5,
        template_name: str = "readme_simple.md.j2",
        overwrite: bool = False,
        dry_run: bool = False,
        exclude_patterns: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Batch-generate folder README files.

        Generates a README document for every folder found under the given
        root directory.

        Args:
            root_path: root directory path
            recursive: whether to recurse into sub-directories
            max_depth: maximum recursion depth
            template_name: name of the template to use
            overwrite: whether to overwrite existing README files
            dry_run: preview mode — nothing is written to disk
            exclude_patterns: folder-name patterns (fnmatch style) to skip

        Returns:
            Batch result dict with success/failure/skip counts and
            per-folder details.
        """
        try:
            if exclude_patterns is None:
                exclude_patterns = []
            # Validate/normalize the path before touching the file system.
            root_path = file_system.security_validator.validate_path(root_path)
            # Collect every folder to process (the root itself is excluded
            # by collect_folders).
            folders_to_process = await collect_folders(
                root_path, recursive, max_depth, exclude_patterns, file_system
            )
            if not folders_to_process:
                return {
                    "success": True,
                    "message": "没有找到符合条件的文件夹",
                    "folders_processed": 0,
                    "results": []
                }
            # Process the folders concurrently (see batch_process_folders).
            results = await batch_process_folders(
                folders_to_process, document_service, template_name,
                overwrite, dry_run, file_system
            )
            # Aggregate per-folder outcomes. A result can be both successful
            # and skipped, so the counters may overlap.
            successful = len([r for r in results if r["success"]])
            failed = len([r for r in results if not r["success"]])
            skipped = len([r for r in results if r.get("skipped", False)])
            return {
                "success": True,
                "root_path": root_path,
                "total_folders": len(folders_to_process),
                "successful": successful,
                "failed": failed,
                "skipped": skipped,
                "dry_run": dry_run,
                "results": results,
                "processing_time": datetime.now().isoformat()
            }
        except (RuntimeError, ValueError) as e:
            # Expected failure modes (e.g. path validation) are reported as a
            # structured error payload rather than raised to the caller.
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "root_path": root_path
            }

    @mcp_server.tool()
    async def folder_docs_batch_update_documentation(
        root_path: str,
        update_readme: bool = True,
        update_mindmap: bool = False,
        mindmap_format: str = "mermaid",
        recursive: bool = True,
        max_depth: int = 3,
        force_update: bool = False,
        dry_run: bool = False
    ) -> Dict[str, Any]:
        """
        Batch-update project documentation.

        Updates README and/or mind-map documents for every folder under the
        given root directory.

        Args:
            root_path: root directory path
            update_readme: whether to update README files
            update_mindmap: whether to update mind-map files
            mindmap_format: mind-map output format (e.g. "mermaid")
            recursive: whether to recurse into sub-directories
            max_depth: maximum recursion depth
            force_update: force regeneration regardless of modification time
            dry_run: preview mode — nothing is written to disk

        Returns:
            Batch update result dict with per-folder details.
        """
        try:
            # Validate/normalize the path before touching the file system.
            root_path = file_system.security_validator.validate_path(root_path)
            # No exclude patterns here — every folder is considered.
            folders_to_process = await collect_folders(
                root_path, recursive, max_depth, [], file_system
            )
            if not folders_to_process:
                return {
                    "success": True,
                    "message": "没有找到符合条件的文件夹",
                    "folders_processed": 0,
                    "results": []
                }
            # Update documents folder by folder (sequentially — unlike the
            # concurrent README batch tool above).
            results = []
            for folder_path in folders_to_process:
                folder_result = {
                    "folder_path": folder_path,
                    "readme_updated": False,
                    "mindmap_updated": False,
                    "success": True,
                    "errors": []
                }
                try:
                    # Regenerate the README when requested.
                    if update_readme:
                        readme_result = await document_service.generate_readme(
                            folder_path, force_update=force_update, dry_run=dry_run
                        )
                        folder_result["readme_updated"] = readme_result.get("success", False)
                        if not readme_result.get("success", False):
                            folder_result["errors"].append(f"README更新失败: {readme_result.get('error')}")
                    # Regenerate the mind map when requested.
                    if update_mindmap:
                        # Imported lazily — presumably to avoid a circular
                        # import at module load time; confirm.
                        from .mindmap_tools import folder_docs_generate_mindmap
                        mindmap_result = await folder_docs_generate_mindmap(
                            folder_path, mindmap_format, 5, True, ""
                        )
                        folder_result["mindmap_updated"] = mindmap_result.get("success", False)
                        if not mindmap_result.get("success", False):
                            folder_result["errors"].append(f"思维导图更新失败: {mindmap_result.get('error')}")
                except (RuntimeError, ValueError) as e:
                    # A failure in one folder is recorded but does not stop
                    # the rest of the batch.
                    folder_result["success"] = False
                    folder_result["errors"].append(str(e))
                results.append(folder_result)
            # Aggregate outcomes across all folders.
            successful = len([r for r in results if r["success"]])
            readme_updated = len([r for r in results if r["readme_updated"]])
            mindmap_updated = len([r for r in results if r["mindmap_updated"]])
            return {
                "success": True,
                "root_path": root_path,
                "total_folders": len(folders_to_process),
                "successful": successful,
                "readme_updated": readme_updated,
                "mindmap_updated": mindmap_updated,
                "dry_run": dry_run,
                "results": results,
                "processing_time": datetime.now().isoformat()
            }
        except (RuntimeError, ValueError) as e:
            # Same structured error payload convention as the tool above.
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "root_path": root_path
            }
async def collect_folders(root_path: str, recursive: bool, max_depth: int,
                          exclude_patterns: List[str],
                          file_system: "FileSystemInterface") -> List[str]:
    """
    Collect the folders to process under ``root_path``.

    Returns the sorted string paths of sub-folders (the root itself is never
    included). Folders whose *name* matches any fnmatch-style pattern in
    ``exclude_patterns`` are skipped together with their subtrees.

    Bug fix: previously ``recursive=False`` never descended at all, so the
    result was always empty (the root is excluded at depth 0). Non-recursive
    mode now collects the immediate sub-folders of the root only. The old
    error fallback that returned ``[root_path]`` was also inconsistent with
    the normal path (which never includes the root); on an unexpected error
    we now return whatever was collected before the failure.

    Args:
        root_path: directory to scan; a missing/non-directory path yields []
        recursive: descend beyond the first level of sub-folders
        max_depth: maximum depth to collect (root is depth 0)
        exclude_patterns: fnmatch patterns matched against folder names
        file_system: unused here; kept for interface compatibility with callers

    Returns:
        Sorted list of folder paths to process.
    """
    from pathlib import Path
    import fnmatch

    folders: List[str] = []
    root_dir = Path(root_path)
    if not root_dir.exists() or not root_dir.is_dir():
        return folders

    def _excluded(folder: Path) -> bool:
        """True if the folder's name matches any exclude pattern."""
        return any(fnmatch.fnmatch(folder.name, pattern)
                   for pattern in exclude_patterns)

    async def _scan(folder: Path, depth: int) -> None:
        """Recursively gather folders, pruning excluded subtrees."""
        if depth > max_depth or _excluded(folder):
            return
        if depth > 0:  # never include the root itself
            folders.append(str(folder))
        # Descend when allowed: always from the root (so non-recursive mode
        # still sees the first level), deeper only when recursive.
        if depth < max_depth and (recursive or depth == 0):
            try:
                subdirs = [item for item in folder.iterdir() if item.is_dir()]
            except (PermissionError, OSError):
                # Unreadable directories are silently skipped (best effort).
                return
            # Scan siblings concurrently; exceptions are contained per task.
            await asyncio.gather(*(_scan(sub, depth + 1) for sub in subdirs),
                                 return_exceptions=True)

    try:
        await _scan(root_dir, 0)
    except (RuntimeError, TypeError, ValueError):
        # Best effort: keep whatever was collected before the failure.
        pass
    folders.sort()
    return folders
async def batch_process_folders(folders: List[str], document_service: "DocumentService",
                                template_name: str, overwrite: bool, dry_run: bool,
                                file_system: "FileSystemInterface",
                                max_concurrency: int = 5) -> List[Dict[str, Any]]:
    """
    Generate READMEs for ``folders`` concurrently.

    Generalization: the previously hard-coded concurrency limit (5) is now
    the keyword parameter ``max_concurrency`` (default unchanged, so existing
    callers behave identically).

    Args:
        folders: folder paths to process
        document_service: service whose ``generate_readme`` does the work
        template_name: template passed through to the service
        overwrite: overwrite flag passed through to the service
        dry_run: preview mode flag, echoed into every result dict
        file_system: unused here; kept for interface compatibility with callers
        max_concurrency: maximum number of folders processed at once

    Returns:
        One result dict per folder, in the same order as ``folders``.
    """
    # Semaphore caps how many generate_readme calls run simultaneously.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def _process_one(folder_path: str) -> Dict[str, Any]:
        """Generate a single README; expected service errors become a result dict."""
        async with semaphore:
            try:
                result = await document_service.generate_readme(
                    folder_path, template_name, overwrite, dry_run
                )
                return {
                    "folder_path": folder_path,
                    "success": result.get("success", False),
                    "message": result.get("message", ""),
                    "error": result.get("error"),
                    "skipped": result.get("skipped", False),
                    "dry_run": dry_run
                }
            except (RuntimeError, ValueError) as e:
                return {
                    "folder_path": folder_path,
                    "success": False,
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "dry_run": dry_run
                }

    # Defensive: return_exceptions ensures one unexpected failure (outside
    # the narrow except above) cannot abort the whole batch.
    raw_results = await asyncio.gather(
        *(_process_one(folder) for folder in folders), return_exceptions=True
    )
    processed: List[Dict[str, Any]] = []
    for folder_path, outcome in zip(folders, raw_results):
        if isinstance(outcome, Exception):
            processed.append({
                "folder_path": folder_path,
                "success": False,
                "error": str(outcome),
                "error_type": type(outcome).__name__,
                "dry_run": dry_run
            })
        else:
            processed.append(outcome)
    return processed