Skip to main content
Glama
file_operations.py (26.9 kB)
"""File operations manager for safe file system operations.""" import asyncio import logging import mimetypes import os from pathlib import Path from typing import Optional, List import fnmatch from .config import Config from .safety import SafetyManager from .models import ( FileReadResult, FileWriteResult, FileUpdateResult, DirectoryListResult, FileDeleteResult, FileInfoResult, FileMetadata, ErrorResponse, DirectoryEntry ) logger = logging.getLogger(__name__) class FileOperationsManager: """Manages file operations with safety checks and error handling.""" def __init__(self, config: Config, safety_manager: SafetyManager): """Initialize FileOperationsManager.""" self.config = config self.safety = safety_manager # Initialize magic for MIME type detection try: import magic self.magic = magic.Magic(mime=True) except Exception as e: logger.warning(f"Failed to initialize python-magic: {e}") self.magic = None async def read_file(self, path: str) -> FileReadResult: """Read file contents with metadata.""" try: # Validate path and safety if not self.safety.validate_path(path): return FileReadResult( content=None, metadata=None, success=False, error=ErrorResponse( error_type="SecurityError", message="Access to this path is not allowed", recovery_suggestions=["Check if path is in protected directories"] ) ) if not self.safety.is_safe_operation("read", path): return FileReadResult( content=None, metadata=None, success=False, error=ErrorResponse( error_type="SecurityError", message="Read operation not allowed for this file", recovery_suggestions=["Check file size limits and permissions"] ) ) file_path = Path(path).resolve() # Check if file exists if not file_path.exists(): return FileReadResult( content=None, metadata=None, success=False, error=ErrorResponse( error_type="FileNotFound", message=f"File does not exist: {path}", recovery_suggestions=["Check the file path", "Ensure the file exists"] ) ) # Check if it's a directory if file_path.is_dir(): return FileReadResult( 
content=None, metadata=None, success=False, error=ErrorResponse( error_type="IsDirectory", message=f"Path is a directory, not a file: {path}", recovery_suggestions=["Use list_directory to view directory contents"] ) ) # Detect MIME type mime_type = self._detect_mime_type(file_path) # Check if file is binary is_binary = self._is_binary_file(file_path, mime_type) # Get file metadata metadata = FileMetadata.from_path(str(file_path), mime_type) if is_binary: return FileReadResult( content=None, metadata=metadata, success=True, is_binary=True ) # Read text file content, encoding = await self._read_text_file(file_path) return FileReadResult( content=content, metadata=metadata, success=True, is_binary=False, encoding=encoding ) except Exception as e: logger.error(f"Failed to read file {path}: {e}") return FileReadResult( content=None, metadata=None, success=False, error=ErrorResponse( error_type="ReadError", message=f"Failed to read file: {str(e)}", recovery_suggestions=["Check file permissions", "Ensure file is not locked"] ) ) async def write_file(self, path: str, content: str, overwrite: bool = False) -> FileWriteResult: """Create new file with content.""" try: # Validate path and safety if not self.safety.validate_path(path): return FileWriteResult( path=path, metadata=None, success=False, error=ErrorResponse( error_type="SecurityError", message="Access to this path is not allowed" ) ) if not self.safety.is_safe_operation("write", path): return FileWriteResult( path=path, metadata=None, success=False, error=ErrorResponse( error_type="SecurityError", message="Write operation not allowed for this path" ) ) # Validate file extension if not self.safety.validate_file_extension(path): return FileWriteResult( path=path, metadata=None, success=False, error=ErrorResponse( error_type="ExtensionError", message="File extension not allowed", details={"allowed_extensions": self.config.allowed_extensions} ) ) file_path = Path(path).resolve() # Check if file already exists if 
file_path.exists() and not overwrite: return FileWriteResult( path=path, metadata=None, success=False, error=ErrorResponse( error_type="FileExists", message="File already exists and overwrite is not enabled", recovery_suggestions=["Set overwrite=True to replace existing file"] ) ) # Create parent directories if needed file_path.parent.mkdir(parents=True, exist_ok=True) # Create backup if file exists backup_path = None if file_path.exists(): backup_path = self.safety.create_backup(str(file_path)) # Write file bytes_written = await self._write_text_file(file_path, content) # Get metadata mime_type = self._detect_mime_type(file_path) metadata = FileMetadata.from_path(str(file_path), mime_type) return FileWriteResult( path=str(file_path), metadata=metadata, success=True, backup_created=backup_path, bytes_written=bytes_written ) except Exception as e: logger.error(f"Failed to write file {path}: {e}") return FileWriteResult( path=path, metadata=None, success=False, error=ErrorResponse( error_type="WriteError", message=f"Failed to write file: {str(e)}", recovery_suggestions=["Check directory permissions", "Ensure sufficient disk space"] ) ) async def update_file(self, path: str, content: str) -> FileUpdateResult: """Update existing file with backup.""" try: # Validate path and safety if not self.safety.validate_path(path): return FileUpdateResult( path=path, metadata=None, success=False, error=ErrorResponse( error_type="SecurityError", message="Access to this path is not allowed" ) ) if not self.safety.is_safe_operation("update", path): return FileUpdateResult( path=path, metadata=None, success=False, error=ErrorResponse( error_type="SecurityError", message="Update operation not allowed for this path" ) ) file_path = Path(path).resolve() # Check if file exists if not file_path.exists(): return FileUpdateResult( path=path, metadata=None, success=False, error=ErrorResponse( error_type="FileNotFound", message="File does not exist", recovery_suggestions=["Use write_file to 
create new file"] ) ) # Check if it's a file (not directory) if not file_path.is_file(): return FileUpdateResult( path=path, metadata=None, success=False, error=ErrorResponse( error_type="NotAFile", message="Path is not a regular file" ) ) # Create backup backup_path = self.safety.create_backup(str(file_path)) try: # Write updated content bytes_written = await self._write_text_file(file_path, content) # Get metadata mime_type = self._detect_mime_type(file_path) metadata = FileMetadata.from_path(str(file_path), mime_type) return FileUpdateResult( path=str(file_path), metadata=metadata, success=True, backup_created=backup_path, bytes_written=bytes_written ) except Exception as write_error: # Restore from backup if write failed restored = False if backup_path: restored = self.safety.restore_from_backup(backup_path, str(file_path)) return FileUpdateResult( path=path, metadata=None, success=False, error=ErrorResponse( error_type="UpdateError", message=f"Failed to update file: {str(write_error)}", details={"backup_restored": restored} ), restored_from_backup=restored ) except Exception as e: logger.error(f"Failed to update file {path}: {e}") return FileUpdateResult( path=path, metadata=None, success=False, error=ErrorResponse( error_type="UpdateError", message=f"Failed to update file: {str(e)}" ) ) def _detect_mime_type(self, file_path: Path) -> str: """Detect MIME type of file.""" try: if self.magic: return self.magic.from_file(str(file_path)) else: # Fallback to mimetypes module mime_type, _ = mimetypes.guess_type(str(file_path)) return mime_type or "application/octet-stream" except Exception: return "application/octet-stream" def _is_binary_file(self, file_path: Path, mime_type: str) -> bool: """Check if file is binary.""" # Check MIME type first if mime_type.startswith("text/"): return False # Common text MIME types that don't start with "text/" text_types = [ "application/json", "application/xml", "application/javascript", "application/x-yaml", "application/x-sh" ] 
if mime_type in text_types: return False # Check file content for binary markers try: with open(file_path, 'rb') as f: chunk = f.read(1024) return b'\x00' in chunk except Exception: return True async def _read_text_file(self, file_path: Path) -> tuple[str, str]: """Read text file with encoding detection.""" encodings = ['utf-8', 'utf-16', 'latin-1', 'cp1252'] for encoding in encodings: try: with open(file_path, 'r', encoding=encoding) as f: content = f.read() return content, encoding except UnicodeDecodeError: continue # If all encodings fail, read as binary and decode with errors='replace' with open(file_path, 'rb') as f: raw_content = f.read() content = raw_content.decode('utf-8', errors='replace') return content, 'utf-8-with-errors' async def _write_text_file(self, file_path: Path, content: str) -> int: """Write text file with UTF-8 encoding.""" encoded_content = content.encode('utf-8') with open(file_path, 'wb') as f: f.write(encoded_content) return len(encoded_content) async def list_directory(self, path: str, pattern: Optional[str] = None, recursive: bool = False) -> DirectoryListResult: """List directory contents with filtering.""" try: # Validate path and safety if not self.safety.validate_path(path): return DirectoryListResult( path=path, entries=[], success=False, total_entries=0, filtered_entries=0, error=ErrorResponse( error_type="SecurityError", message="Access to this path is not allowed" ) ) dir_path = Path(path).resolve() # Check if directory exists if not dir_path.exists(): return DirectoryListResult( path=path, entries=[], success=False, total_entries=0, filtered_entries=0, error=ErrorResponse( error_type="DirectoryNotFound", message=f"Directory does not exist: {path}", recovery_suggestions=["Check the directory path"] ) ) # Check if it's a directory if not dir_path.is_dir(): return DirectoryListResult( path=path, entries=[], success=False, total_entries=0, filtered_entries=0, error=ErrorResponse( error_type="NotADirectory", message=f"Path is not 
a directory: {path}", recovery_suggestions=["Use read_file for file operations"] ) ) entries = [] total_entries = 0 max_depth_reached = False # List directory contents if recursive: entries, total_entries, max_depth_reached = await self._list_recursive( dir_path, pattern, 0, self.config.max_recursion_depth ) else: entries, total_entries = await self._list_single_directory(dir_path, pattern) return DirectoryListResult( path=str(dir_path), entries=entries, success=True, total_entries=total_entries, filtered_entries=len(entries), pattern_used=pattern, recursive=recursive, max_depth_reached=max_depth_reached ) except Exception as e: logger.error(f"Failed to list directory {path}: {e}") return DirectoryListResult( path=path, entries=[], success=False, total_entries=0, filtered_entries=0, error=ErrorResponse( error_type="ListError", message=f"Failed to list directory: {str(e)}", recovery_suggestions=["Check directory permissions"] ) ) async def _list_single_directory(self, dir_path: Path, pattern: Optional[str]) -> tuple[List[DirectoryEntry], int]: """List contents of a single directory.""" entries = [] total_entries = 0 try: for item in dir_path.iterdir(): total_entries += 1 # Apply pattern filter if pattern and not fnmatch.fnmatch(item.name, pattern): continue try: mime_type = self._detect_mime_type(item) if item.is_file() else "directory" metadata = FileMetadata.from_path(str(item), mime_type) entry = DirectoryEntry( name=item.name, path=str(item), metadata=metadata ) entries.append(entry) except Exception as e: logger.warning(f"Failed to get metadata for {item}: {e}") continue except PermissionError: logger.warning(f"Permission denied accessing directory: {dir_path}") return entries, total_entries async def _list_recursive(self, dir_path: Path, pattern: Optional[str], current_depth: int, max_depth: int) -> tuple[List[DirectoryEntry], int, bool]: """List directory contents recursively.""" entries = [] total_entries = 0 max_depth_reached = False if current_depth >= 
max_depth: return entries, total_entries, True try: for item in dir_path.iterdir(): total_entries += 1 # Apply pattern filter if pattern and not fnmatch.fnmatch(item.name, pattern): # Still recurse into directories even if they don't match pattern if item.is_dir() and current_depth < max_depth - 1: sub_entries, sub_total, sub_max_reached = await self._list_recursive( item, pattern, current_depth + 1, max_depth ) entries.extend(sub_entries) total_entries += sub_total if sub_max_reached: max_depth_reached = True continue try: mime_type = self._detect_mime_type(item) if item.is_file() else "directory" metadata = FileMetadata.from_path(str(item), mime_type) entry = DirectoryEntry( name=item.name, path=str(item), metadata=metadata ) entries.append(entry) # Recurse into subdirectories if item.is_dir() and current_depth < max_depth - 1: sub_entries, sub_total, sub_max_reached = await self._list_recursive( item, pattern, current_depth + 1, max_depth ) entries.extend(sub_entries) total_entries += sub_total if sub_max_reached: max_depth_reached = True except Exception as e: logger.warning(f"Failed to process {item}: {e}") continue except PermissionError: logger.warning(f"Permission denied accessing directory: {dir_path}") return entries, total_entries, max_depth_reached async def delete_file(self, path: str) -> FileDeleteResult: """Delete file safely with backup.""" try: # Validate path and safety if not self.safety.validate_path(path): return FileDeleteResult( path=path, success=False, error=ErrorResponse( error_type="SecurityError", message="Access to this path is not allowed" ) ) if not self.safety.is_safe_operation("delete", path): return FileDeleteResult( path=path, success=False, error=ErrorResponse( error_type="SecurityError", message="Delete operation not allowed for this path" ) ) file_path = Path(path).resolve() # Check if file exists if not file_path.exists(): return FileDeleteResult( path=path, success=False, error=ErrorResponse( error_type="FileNotFound", 
message="File or directory does not exist" ) ) was_directory = file_path.is_dir() files_deleted = 1 # Count files if directory if was_directory: try: files_deleted = sum(1 for _ in file_path.rglob('*') if _.is_file()) except Exception: files_deleted = 1 # Fallback # Create backup (move to backup location instead of permanent deletion) backup_location = self.safety.create_backup(str(file_path)) if backup_location: # Remove original after successful backup if file_path.is_file(): file_path.unlink() elif file_path.is_dir(): import shutil shutil.rmtree(file_path) return FileDeleteResult( path=str(file_path), success=True, backup_location=backup_location, was_directory=was_directory, files_deleted=files_deleted ) else: return FileDeleteResult( path=path, success=False, error=ErrorResponse( error_type="BackupError", message="Failed to create backup before deletion", recovery_suggestions=["Check backup directory permissions"] ) ) except Exception as e: logger.error(f"Failed to delete {path}: {e}") return FileDeleteResult( path=path, success=False, error=ErrorResponse( error_type="DeleteError", message=f"Failed to delete: {str(e)}", recovery_suggestions=["Check file permissions", "Ensure file is not in use"] ) ) async def get_file_info(self, path: str) -> FileInfoResult: """Get detailed file information.""" try: # Validate path and safety if not self.safety.validate_path(path): return FileInfoResult( path=path, metadata=None, success=False, exists=False, error=ErrorResponse( error_type="SecurityError", message="Access to this path is not allowed" ) ) file_path = Path(path).resolve() # Check if file exists if not file_path.exists(): return FileInfoResult( path=str(file_path), metadata=None, success=True, exists=False ) # Get metadata mime_type = self._detect_mime_type(file_path) if file_path.is_file() else "directory" metadata = FileMetadata.from_path(str(file_path), mime_type) return FileInfoResult( path=str(file_path), metadata=metadata, success=True, exists=True ) except 
Exception as e: logger.error(f"Failed to get file info for {path}: {e}") return FileInfoResult( path=path, metadata=None, success=False, exists=False, error=ErrorResponse( error_type="InfoError", message=f"Failed to get file info: {str(e)}" ) )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/chidvilas1234/Project-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.