"""文件管理器MCP工具实现
这个模块包含了所有文件系统操作工具的具体实现。
"""
import os
import shutil
import glob
from pathlib import Path
from typing import List
import logging
logger = logging.getLogger(__name__)
async def list_directory(path: str, show_hidden: bool = False) -> str:
"""列出目录内容
Args:
path: 目录路径
show_hidden: 是否显示隐藏文件
Returns:
目录内容的字符串表示
"""
try:
# 验证路径
dir_path = Path(path)
if not dir_path.exists():
return f"错误: 路径 '{path}' 不存在"
if not dir_path.is_dir():
return f"错误: '{path}' 不是一个目录"
# 获取目录内容
items = []
for item in dir_path.iterdir():
# 跳过隐藏文件(如果不显示隐藏文件)
if not show_hidden and item.name.startswith('.'):
continue
item_type = "目录" if item.is_dir() else "文件"
try:
size = item.stat().st_size if item.is_file() else "-"
items.append(f"{item_type}: {item.name} ({size} 字节)" if size != "-" else f"{item_type}: {item.name}")
except (OSError, PermissionError):
items.append(f"{item_type}: {item.name} (无法访问)")
if not items:
return f"目录 '{path}' 为空,没有内容"
result = f"目录 '{path}' 的内容:\n"
result += "\n".join(sorted(items))
return result
except PermissionError:
return f"错误: 没有权限访问目录 '{path}'"
except Exception as e:
logger.error(f"列出目录失败: {e}")
return f"错误: 列出目录时发生异常: {str(e)}"
async def read_file(path: str, encoding: str = "utf-8") -> str:
"""读取文件内容
Args:
path: 文件路径
encoding: 文件编码
Returns:
文件内容或错误信息
"""
try:
file_path = Path(path)
if not file_path.exists():
return f"错误: 文件 '{path}' 不存在"
if not file_path.is_file():
return f"错误: '{path}' 不是一个文件"
# 检查文件大小(限制为10MB)
file_size = file_path.stat().st_size
if file_size > 10 * 1024 * 1024: # 10MB
return f"错误: 文件 '{path}' 太大 ({file_size} 字节),超过10MB限制"
with open(file_path, 'r', encoding=encoding) as f:
content = f.read()
return f"文件 '{path}' 的内容:\n{content}"
except UnicodeDecodeError:
return f"错误: 无法使用 {encoding} 编码读取文件 '{path}',可能是二进制文件"
except PermissionError:
return f"错误: 没有权限读取文件 '{path}'"
except Exception as e:
logger.error(f"读取文件失败: {e}")
return f"错误: 读取文件时发生异常: {str(e)}"
async def create_file(path: str, content: str = "", encoding: str = "utf-8") -> str:
"""创建文件
Args:
path: 文件路径
content: 文件内容
encoding: 文件编码
Returns:
操作结果信息
"""
try:
file_path = Path(path)
# 创建父目录(如果不存在)
file_path.parent.mkdir(parents=True, exist_ok=True)
# 检查文件是否已存在
if file_path.exists():
return f"错误: 文件 '{path}' 已存在"
# 创建文件
with open(file_path, 'w', encoding=encoding) as f:
f.write(content)
file_size = file_path.stat().st_size
return f"成功创建文件 '{path}' ({file_size} 字节)"
except PermissionError:
return f"错误: 没有权限在 '{path}' 创建文件"
except Exception as e:
logger.error(f"创建文件失败: {e}")
return f"错误: 创建文件时发生异常: {str(e)}"
async def delete_file(path: str, recursive: bool = False) -> str:
"""删除文件或目录
Args:
path: 文件或目录路径
recursive: 是否递归删除目录
Returns:
操作结果信息
"""
try:
target_path = Path(path)
if not target_path.exists():
return f"错误: 路径 '{path}' 不存在"
if target_path.is_file():
target_path.unlink()
return f"成功删除文件 '{path}'"
elif target_path.is_dir():
if recursive:
shutil.rmtree(target_path)
return f"成功递归删除目录 '{path}' 及其所有内容"
else:
# 检查目录是否为空
if any(target_path.iterdir()):
return f"错误: 目录 '{path}' 不为空,请使用 recursive=true 参数"
target_path.rmdir()
return f"成功删除空目录 '{path}'"
else:
return f"错误: '{path}' 既不是文件也不是目录"
except PermissionError:
return f"错误: 没有权限删除 '{path}'"
except Exception as e:
logger.error(f"删除失败: {e}")
return f"错误: 删除时发生异常: {str(e)}"
async def search_files(directory: str, pattern: str, recursive: bool = True) -> str:
"""搜索文件
Args:
directory: 搜索目录
pattern: 搜索模式(支持通配符)
recursive: 是否递归搜索
Returns:
搜索结果
"""
try:
search_path = Path(directory)
if not search_path.exists():
return f"错误: 目录 '{directory}' 不存在"
if not search_path.is_dir():
return f"错误: '{directory}' 不是一个目录"
# 构建搜索模式
if recursive:
search_pattern = str(search_path / "**" / pattern)
matches = glob.glob(search_pattern, recursive=True)
else:
search_pattern = str(search_path / pattern)
matches = glob.glob(search_pattern)
if not matches:
return f"在目录 '{directory}' 中没有找到匹配 '{pattern}' 的文件"
# 整理结果
results = []
for match in sorted(matches):
match_path = Path(match)
relative_path = match_path.relative_to(search_path)
item_type = "目录" if match_path.is_dir() else "文件"
try:
size = match_path.stat().st_size if match_path.is_file() else "-"
size_info = f" ({size} 字节)" if size != "-" else ""
results.append(f"{item_type}: {relative_path}{size_info}")
except (OSError, PermissionError):
results.append(f"{item_type}: {relative_path} (无法访问)")
result = f"在目录 '{directory}' 中搜索 '{pattern}' 的结果 ({len(results)} 个匹配):\n"
result += "\n".join(results)
return result
except PermissionError:
return f"错误: 没有权限搜索目录 '{directory}'"
except Exception as e:
logger.error(f"搜索文件失败: {e}")
return f"错误: 搜索文件时发生异常: {str(e)}"
async def copy_files(source_paths: List[str], destination_folder: str, overwrite: bool = False) -> str:
"""复制文件到目标文件夹
Args:
source_paths: 源文件路径列表
destination_folder: 目标文件夹路径
overwrite: 是否覆盖已存在的文件
Returns:
复制操作结果信息
"""
try:
# 验证目标文件夹
dest_path = Path(destination_folder)
if not dest_path.exists():
# 创建目标文件夹
dest_path.mkdir(parents=True, exist_ok=True)
logger.info(f"创建目标文件夹: {destination_folder}")
elif not dest_path.is_dir():
return f"错误: 目标路径 '{destination_folder}' 不是一个目录"
# 验证源文件路径
if not source_paths:
return "错误: 没有提供源文件路径"
copied_files = []
skipped_files = []
failed_files = []
for source_path in source_paths:
try:
src_path = Path(source_path)
# 检查源文件是否存在
if not src_path.exists():
failed_files.append(f"{source_path} (文件不存在)")
continue
# 只处理文件,不处理目录
if not src_path.is_file():
failed_files.append(f"{source_path} (不是文件)")
continue
# 构建目标文件路径
dest_file_path = dest_path / src_path.name
# 检查目标文件是否已存在
if dest_file_path.exists() and not overwrite:
skipped_files.append(f"{src_path.name} (文件已存在)")
continue
# 复制文件
shutil.copy2(src_path, dest_file_path)
file_size = dest_file_path.stat().st_size
copied_files.append(f"{src_path.name} ({file_size} 字节)")
except PermissionError:
failed_files.append(f"{source_path} (权限不足)")
except Exception as e:
failed_files.append(f"{source_path} ({str(e)})")
# 构建结果信息
result_parts = []
if copied_files:
result_parts.append(f"成功复制 {len(copied_files)} 个文件到 '{destination_folder}':")
result_parts.extend([f" - {file}" for file in copied_files])
if skipped_files:
result_parts.append(f"\n跳过 {len(skipped_files)} 个文件:")
result_parts.extend([f" - {file}" for file in skipped_files])
if failed_files:
result_parts.append(f"\n失败 {len(failed_files)} 个文件:")
result_parts.extend([f" - {file}" for file in failed_files])
if not copied_files and not skipped_files and not failed_files:
return "没有文件被处理"
return "\n".join(result_parts)
except PermissionError:
return f"错误: 没有权限访问目标文件夹 '{destination_folder}'"
except Exception as e:
logger.error(f"复制文件失败: {e}")
return f"错误: 复制文件时发生异常: {str(e)}"