MCP Toolbox

"""File operations tools for MCP-Toolbox.""" import re import stat from datetime import datetime from pathlib import Path from typing import Any from mcp_toolbox.app import mcp @mcp.tool( description="Read file content. Args: path (required, Path to the file to read), encoding (optional, File encoding), chunk_size (optional, Size of each chunk in bytes, default: 1MB), chunk_index (optional, Index of the chunk to retrieve, 0-based)" ) async def read_file_content( path: str, encoding: str = "utf-8", chunk_size: int = 1000000, chunk_index: int = 0 ) -> dict[str, Any]: """Read content from a file, with support for chunked reading for large files. Args: path: Path to the file to read encoding: Optional. File encoding (default: utf-8) chunk_size: Optional. Size of each chunk in bytes (default: 1000000, which about 1MB) chunk_index: Optional. Index of the chunk to retrieve, 0-based (default: 0) Returns: Dictionary containing content and metadata, including chunking information """ try: file_path = Path(path).expanduser() if not file_path.exists(): return { "error": f"File not found: {path}", "content": "", "success": False, } if not file_path.is_file(): return { "error": f"Path is not a file: {path}", "content": "", "success": False, } # Get file stats stats = file_path.stat() file_size = stats.st_size # Calculate total chunks total_chunks = (file_size + chunk_size - 1) // chunk_size # Ceiling division # Validate chunk_index if chunk_index < 0 or (file_size > 0 and chunk_index >= total_chunks): return { "error": f"Invalid chunk index: {chunk_index}. Valid range is 0 to {total_chunks - 1}", "content": "", "success": False, "total_chunks": total_chunks, "file_size": file_size, } # Calculate start and end positions for the chunk start_pos = chunk_index * chunk_size end_pos = min(start_pos + chunk_size, file_size) chunk_actual_size = end_pos - start_pos # Read the specified chunk content = "" with open(file_path, "rb") as f: f.seek(start_pos) chunk_bytes = f.read(chunk_actual_size) try: # Try to decode as text content = chunk_bytes.decode(encoding, errors="replace") except UnicodeDecodeError: # If decoding fails, return base64 encoded binary data import base64 content = base64.b64encode(chunk_bytes).decode("ascii") encoding = f"base64 (original: {encoding})" return { "content": content, "size": file_size, "chunk_size": chunk_size, "chunk_index": chunk_index, "chunk_actual_size": chunk_actual_size, "total_chunks": total_chunks, "is_last_chunk": chunk_index == total_chunks - 1, "encoding": encoding, "last_modified": datetime.fromtimestamp(stats.st_mtime).isoformat(), "success": True, } except UnicodeDecodeError: return { "error": f"Failed to decode file with encoding {encoding}. Try a different encoding.", "content": "", "success": False, } except Exception as e: return { "error": f"Failed to read file: {e!s}", "content": "", "success": False, } @mcp.tool( description="Write content to a file. Args: path (required, Path to the file to write), content (required, Content to write), encoding (optional, File encoding), append (optional, Whether to append to the file)" ) async def write_file_content(path: str, content: str, encoding: str = "utf-8", append: bool = False) -> dict[str, Any]: """Write content to a file. Args: path: Path to the file to write content: Content to write to the file encoding: Optional. File encoding (default: utf-8) append: Optional. Whether to append to the file (default: False) Returns: Dictionary containing success status and metadata """ try: file_path = Path(path).expanduser() # Create parent directories if they don't exist file_path.parent.mkdir(parents=True, exist_ok=True) # Write content to file mode = "a" if append else "w" with open(file_path, mode, encoding=encoding) as f: f.write(content) # Get file stats stats = file_path.stat() return { "path": str(file_path), "size": stats.st_size, "last_modified": datetime.fromtimestamp(stats.st_mtime).isoformat(), "success": True, } except Exception as e: return { "error": f"Failed to write file: {e!s}", "path": path, "success": False, } @mcp.tool( description="Replace content in a file using regular expressions. Args: path (required, Path to the file), pattern (required, Regular expression pattern), replacement (required, Replacement string), encoding (optional, File encoding), count (optional, Maximum number of replacements)" ) async def replace_in_file( path: str, pattern: str, replacement: str, encoding: str = "utf-8", count: int = 0 ) -> dict[str, Any]: """Replace content in a file using regular expressions. Args: path: Path to the file pattern: Regular expression pattern replacement: Replacement string encoding: Optional. File encoding (default: utf-8) count: Optional. Maximum number of replacements (default: 0, which means all occurrences) Returns: Dictionary containing success status and replacement information """ try: file_path = Path(path).expanduser() if not file_path.exists(): return { "error": f"File not found: {path}", "success": False, "replacements": 0, } if not file_path.is_file(): return { "error": f"Path is not a file: {path}", "success": False, "replacements": 0, } # Read file content with open(file_path, encoding=encoding) as f: content = f.read() # Compile regex pattern try: regex = re.compile(pattern) except re.error as e: return { "error": f"Invalid regular expression: {e!s}", "success": False, "replacements": 0, } # Replace content new_content, replacements = regex.subn(replacement, content, count=count) if replacements > 0: # Write updated content back to file with open(file_path, "w", encoding=encoding) as f: f.write(new_content) return { "path": str(file_path), "replacements": replacements, "success": True, } except UnicodeDecodeError: return { "error": f"Failed to decode file with encoding {encoding}. Try a different encoding.", "success": False, "replacements": 0, } except Exception as e: return { "error": f"Failed to replace content: {e!s}", "success": False, "replacements": 0, } def _format_mode(mode: int) -> str: """Format file mode into a string representation. Args: mode: File mode as an integer Returns: String representation of file permissions """ result = "" # File type if stat.S_ISDIR(mode): result += "d" elif stat.S_ISLNK(mode): result += "l" else: result += "-" # User permissions result += "r" if mode & stat.S_IRUSR else "-" result += "w" if mode & stat.S_IWUSR else "-" result += "x" if mode & stat.S_IXUSR else "-" # Group permissions result += "r" if mode & stat.S_IRGRP else "-" result += "w" if mode & stat.S_IWGRP else "-" result += "x" if mode & stat.S_IXGRP else "-" # Other permissions result += "r" if mode & stat.S_IROTH else "-" result += "w" if mode & stat.S_IWOTH else "-" result += "x" if mode & stat.S_IXOTH else "-" return result def _get_file_info(path: Path) -> dict[str, Any]: """Get detailed information about a file or directory. Args: path: Path to the file or directory Returns: Dictionary containing file information """ stats = path.stat() # Format timestamps mtime = datetime.fromtimestamp(stats.st_mtime).isoformat() ctime = datetime.fromtimestamp(stats.st_ctime).isoformat() atime = datetime.fromtimestamp(stats.st_atime).isoformat() # Get file type if path.is_dir(): file_type = "directory" elif path.is_symlink(): file_type = "symlink" else: file_type = "file" # Format size size = stats.st_size size_str = f"{size} bytes" if size >= 1024: size_str = f"{size / 1024:.2f} KB" if size >= 1024 * 1024: size_str = f"{size / (1024 * 1024):.2f} MB" if size >= 1024 * 1024 * 1024: size_str = f"{size / (1024 * 1024 * 1024):.2f} GB" return { "name": path.name, "path": str(path), "type": file_type, "size": size, "size_formatted": size_str, "permissions": _format_mode(stats.st_mode), "mode": stats.st_mode, "owner": stats.st_uid, "group": stats.st_gid, "created": ctime, "modified": mtime, "accessed": atime, } @mcp.tool( description="List directory contents with detailed information. Args: path (required, Directory path), recursive (optional, Whether to list recursively), max_depth (optional, Maximum recursion depth), include_hidden (optional, Whether to include hidden files)" ) async def list_directory( path: str, recursive: bool = False, max_depth: int = -1, include_hidden: bool = False ) -> dict[str, Any]: """List directory contents with detailed information. Args: path: Directory path recursive: Optional. Whether to list recursively (default: False) max_depth: Optional. Maximum recursion depth (default: -1, which means no limit) include_hidden: Optional. Whether to include hidden files (default: False) Returns: Dictionary containing directory contents and metadata """ try: dir_path = Path(path).expanduser() if not dir_path.exists(): return { "error": f"Directory not found: {path}", "entries": [], "success": False, } if not dir_path.is_dir(): return { "error": f"Path is not a directory: {path}", "entries": [], "success": False, } entries = [] def process_directory(current_path: Path, current_depth: int = 0) -> None: """Process a directory and its contents recursively. Args: current_path: Path to the current directory current_depth: Current recursion depth """ nonlocal entries # Check if we've reached the maximum depth if max_depth >= 0 and current_depth > max_depth: return try: # List directory contents for item in current_path.iterdir(): # Skip hidden files if not included if not include_hidden and item.name.startswith("."): continue # Get file information file_info = _get_file_info(item) file_info["depth"] = current_depth entries.append(file_info) # Recursively process subdirectories if recursive and item.is_dir(): process_directory(item, current_depth + 1) except PermissionError: # Add an entry indicating permission denied entries.append({ "name": current_path.name, "path": str(current_path), "type": "directory", "error": "Permission denied", "depth": current_depth, }) # Start processing from the root directory process_directory(dir_path) return { "path": str(dir_path), "entries": entries, "count": len(entries), "success": True, } except Exception as e: return { "error": f"Failed to list directory: {e!s}", "entries": [], "success": False, }