Skip to main content
Glama
cnYui

File Manager MCP Server

by cnYui
tools.py10.8 kB
"""文件管理器MCP工具实现 这个模块包含了所有文件系统操作工具的具体实现。 """ import os import shutil import glob from pathlib import Path from typing import List import logging logger = logging.getLogger(__name__) async def list_directory(path: str, show_hidden: bool = False) -> str: """列出目录内容 Args: path: 目录路径 show_hidden: 是否显示隐藏文件 Returns: 目录内容的字符串表示 """ try: # 验证路径 dir_path = Path(path) if not dir_path.exists(): return f"错误: 路径 '{path}' 不存在" if not dir_path.is_dir(): return f"错误: '{path}' 不是一个目录" # 获取目录内容 items = [] for item in dir_path.iterdir(): # 跳过隐藏文件(如果不显示隐藏文件) if not show_hidden and item.name.startswith('.'): continue item_type = "目录" if item.is_dir() else "文件" try: size = item.stat().st_size if item.is_file() else "-" items.append(f"{item_type}: {item.name} ({size} 字节)" if size != "-" else f"{item_type}: {item.name}") except (OSError, PermissionError): items.append(f"{item_type}: {item.name} (无法访问)") if not items: return f"目录 '{path}' 为空,没有内容" result = f"目录 '{path}' 的内容:\n" result += "\n".join(sorted(items)) return result except PermissionError: return f"错误: 没有权限访问目录 '{path}'" except Exception as e: logger.error(f"列出目录失败: {e}") return f"错误: 列出目录时发生异常: {str(e)}" async def read_file(path: str, encoding: str = "utf-8") -> str: """读取文件内容 Args: path: 文件路径 encoding: 文件编码 Returns: 文件内容或错误信息 """ try: file_path = Path(path) if not file_path.exists(): return f"错误: 文件 '{path}' 不存在" if not file_path.is_file(): return f"错误: '{path}' 不是一个文件" # 检查文件大小(限制为10MB) file_size = file_path.stat().st_size if file_size > 10 * 1024 * 1024: # 10MB return f"错误: 文件 '{path}' 太大 ({file_size} 字节),超过10MB限制" with open(file_path, 'r', encoding=encoding) as f: content = f.read() return f"文件 '{path}' 的内容:\n{content}" except UnicodeDecodeError: return f"错误: 无法使用 {encoding} 编码读取文件 '{path}',可能是二进制文件" except PermissionError: return f"错误: 没有权限读取文件 '{path}'" except Exception as e: logger.error(f"读取文件失败: {e}") return f"错误: 读取文件时发生异常: {str(e)}" async def create_file(path: str, content: str = "", encoding: str = "utf-8") -> str: """创建文件 Args: path: 文件路径 content: 文件内容 encoding: 文件编码 Returns: 操作结果信息 """ try: file_path = Path(path) # 创建父目录(如果不存在) file_path.parent.mkdir(parents=True, exist_ok=True) # 检查文件是否已存在 if file_path.exists(): return f"错误: 文件 '{path}' 已存在" # 创建文件 with open(file_path, 'w', encoding=encoding) as f: f.write(content) file_size = file_path.stat().st_size return f"成功创建文件 '{path}' ({file_size} 字节)" except PermissionError: return f"错误: 没有权限在 '{path}' 创建文件" except Exception as e: logger.error(f"创建文件失败: {e}") return f"错误: 创建文件时发生异常: {str(e)}" async def delete_file(path: str, recursive: bool = False) -> str: """删除文件或目录 Args: path: 文件或目录路径 recursive: 是否递归删除目录 Returns: 操作结果信息 """ try: target_path = Path(path) if not target_path.exists(): return f"错误: 路径 '{path}' 不存在" if target_path.is_file(): target_path.unlink() return f"成功删除文件 '{path}'" elif target_path.is_dir(): if recursive: shutil.rmtree(target_path) return f"成功递归删除目录 '{path}' 及其所有内容" else: # 检查目录是否为空 if any(target_path.iterdir()): return f"错误: 目录 '{path}' 不为空,请使用 recursive=true 参数" target_path.rmdir() return f"成功删除空目录 '{path}'" else: return f"错误: '{path}' 既不是文件也不是目录" except PermissionError: return f"错误: 没有权限删除 '{path}'" except Exception as e: logger.error(f"删除失败: {e}") return f"错误: 删除时发生异常: {str(e)}" async def search_files(directory: str, pattern: str, recursive: bool = True) -> str: """搜索文件 Args: directory: 搜索目录 pattern: 搜索模式(支持通配符) recursive: 是否递归搜索 Returns: 搜索结果 """ try: search_path = Path(directory) if not search_path.exists(): return f"错误: 目录 '{directory}' 不存在" if not search_path.is_dir(): return f"错误: '{directory}' 不是一个目录" # 构建搜索模式 if recursive: search_pattern = str(search_path / "**" / pattern) matches = glob.glob(search_pattern, recursive=True) else: search_pattern = str(search_path / pattern) matches = glob.glob(search_pattern) if not matches: return f"在目录 '{directory}' 中没有找到匹配 '{pattern}' 的文件" # 整理结果 results = [] for match in sorted(matches): match_path = Path(match) relative_path = match_path.relative_to(search_path) item_type = "目录" if match_path.is_dir() else "文件" try: size = match_path.stat().st_size if match_path.is_file() else "-" size_info = f" ({size} 字节)" if size != "-" else "" results.append(f"{item_type}: {relative_path}{size_info}") except (OSError, PermissionError): results.append(f"{item_type}: {relative_path} (无法访问)") result = f"在目录 '{directory}' 中搜索 '{pattern}' 的结果 ({len(results)} 个匹配):\n" result += "\n".join(results) return result except PermissionError: return f"错误: 没有权限搜索目录 '{directory}'" except Exception as e: logger.error(f"搜索文件失败: {e}") return f"错误: 搜索文件时发生异常: {str(e)}" async def copy_files(source_paths: List[str], destination_folder: str, overwrite: bool = False) -> str: """复制文件到目标文件夹 Args: source_paths: 源文件路径列表 destination_folder: 目标文件夹路径 overwrite: 是否覆盖已存在的文件 Returns: 复制操作结果信息 """ try: # 验证目标文件夹 dest_path = Path(destination_folder) if not dest_path.exists(): # 创建目标文件夹 dest_path.mkdir(parents=True, exist_ok=True) logger.info(f"创建目标文件夹: {destination_folder}") elif not dest_path.is_dir(): return f"错误: 目标路径 '{destination_folder}' 不是一个目录" # 验证源文件路径 if not source_paths: return "错误: 没有提供源文件路径" copied_files = [] skipped_files = [] failed_files = [] for source_path in source_paths: try: src_path = Path(source_path) # 检查源文件是否存在 if not src_path.exists(): failed_files.append(f"{source_path} (文件不存在)") continue # 只处理文件,不处理目录 if not src_path.is_file(): failed_files.append(f"{source_path} (不是文件)") continue # 构建目标文件路径 dest_file_path = dest_path / src_path.name # 检查目标文件是否已存在 if dest_file_path.exists() and not overwrite: skipped_files.append(f"{src_path.name} (文件已存在)") continue # 复制文件 shutil.copy2(src_path, dest_file_path) file_size = dest_file_path.stat().st_size copied_files.append(f"{src_path.name} ({file_size} 字节)") except PermissionError: failed_files.append(f"{source_path} (权限不足)") except Exception as e: failed_files.append(f"{source_path} ({str(e)})") # 构建结果信息 result_parts = [] if copied_files: result_parts.append(f"成功复制 {len(copied_files)} 个文件到 '{destination_folder}':") result_parts.extend([f" - {file}" for file in copied_files]) if skipped_files: result_parts.append(f"\n跳过 {len(skipped_files)} 个文件:") result_parts.extend([f" - {file}" for file in skipped_files]) if failed_files: result_parts.append(f"\n失败 {len(failed_files)} 个文件:") result_parts.extend([f" - {file}" for file in failed_files]) if not copied_files and not skipped_files and not failed_files: return "没有文件被处理" return "\n".join(result_parts) except PermissionError: return f"错误: 没有权限访问目标文件夹 '{destination_folder}'" except Exception as e: logger.error(f"复制文件失败: {e}") return f"错误: 复制文件时发生异常: {str(e)}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cnYui/mcpDevelop'

If you have feedback or need assistance with the MCP directory API, please join our Discord server