"""File operations manager for safe file system operations."""
import asyncio
import logging
import mimetypes
import os
from pathlib import Path
from typing import Optional, List
import fnmatch
from .config import Config
from .safety import SafetyManager
from .models import (
FileReadResult, FileWriteResult, FileUpdateResult,
DirectoryListResult, FileDeleteResult, FileInfoResult,
FileMetadata, ErrorResponse, DirectoryEntry
)
# Module-level logger named after this module (``__name__``).
logger = logging.getLogger(__name__)
class FileOperationsManager:
    """Manages file operations with safety checks and error handling.

    Every public coroutine validates the path through the injected
    ``SafetyManager`` first and reports problems as a result object with
    ``success=False`` and an ``ErrorResponse`` rather than letting an
    ``Exception`` propagate to the caller.
    """

    def __init__(self, config: Config, safety_manager: SafetyManager):
        """Initialize FileOperationsManager.

        Args:
            config: Settings object; this class reads ``allowed_extensions``
                and ``max_recursion_depth`` from it.
            safety_manager: Collaborator used for path validation, operation
                checks, extension checks and backup/restore.
        """
        self.config = config
        self.safety = safety_manager
        # python-magic is optional: when it cannot be imported or initialised,
        # _detect_mime_type falls back to the stdlib ``mimetypes`` module.
        try:
            import magic
            self.magic = magic.Magic(mime=True)
        except Exception as e:
            logger.warning(f"Failed to initialize python-magic: {e}")
            self.magic = None

    async def read_file(self, path: str) -> FileReadResult:
        """Read file contents with metadata.

        Args:
            path: Path of the file to read.

        Returns:
            A ``FileReadResult``. For text files ``content`` and ``encoding``
            are set; for files classified as binary ``content`` is ``None``
            and ``is_binary`` is ``True``. On any failure ``success`` is
            ``False`` and ``error`` describes the problem.
        """
        try:
            # Validate path and safety
            if not self.safety.validate_path(path):
                return FileReadResult(
                    content=None,
                    metadata=None,
                    success=False,
                    error=ErrorResponse(
                        error_type="SecurityError",
                        message="Access to this path is not allowed",
                        recovery_suggestions=["Check if path is in protected directories"]
                    )
                )
            if not self.safety.is_safe_operation("read", path):
                return FileReadResult(
                    content=None,
                    metadata=None,
                    success=False,
                    error=ErrorResponse(
                        error_type="SecurityError",
                        message="Read operation not allowed for this file",
                        recovery_suggestions=["Check file size limits and permissions"]
                    )
                )

            file_path = Path(path).resolve()

            # Check if file exists
            if not file_path.exists():
                return FileReadResult(
                    content=None,
                    metadata=None,
                    success=False,
                    error=ErrorResponse(
                        error_type="FileNotFound",
                        message=f"File does not exist: {path}",
                        recovery_suggestions=["Check the file path", "Ensure the file exists"]
                    )
                )

            # Check if it's a directory
            if file_path.is_dir():
                return FileReadResult(
                    content=None,
                    metadata=None,
                    success=False,
                    error=ErrorResponse(
                        error_type="IsDirectory",
                        message=f"Path is a directory, not a file: {path}",
                        recovery_suggestions=["Use list_directory to view directory contents"]
                    )
                )

            mime_type = self._detect_mime_type(file_path)
            is_binary = self._is_binary_file(file_path, mime_type)
            metadata = FileMetadata.from_path(str(file_path), mime_type)

            # Binary files are reported with metadata only; no content is read.
            if is_binary:
                return FileReadResult(
                    content=None,
                    metadata=metadata,
                    success=True,
                    is_binary=True
                )

            content, encoding = await self._read_text_file(file_path)
            return FileReadResult(
                content=content,
                metadata=metadata,
                success=True,
                is_binary=False,
                encoding=encoding
            )
        except Exception as e:
            logger.error(f"Failed to read file {path}: {e}")
            return FileReadResult(
                content=None,
                metadata=None,
                success=False,
                error=ErrorResponse(
                    error_type="ReadError",
                    message=f"Failed to read file: {str(e)}",
                    recovery_suggestions=["Check file permissions", "Ensure file is not locked"]
                )
            )

    async def write_file(self, path: str, content: str, overwrite: bool = False) -> FileWriteResult:
        """Create new file with content.

        Args:
            path: Destination path; missing parent directories are created.
            content: Text to write (encoded as UTF-8).
            overwrite: When ``False`` (default) an existing path is rejected
                with a ``FileExists`` error. When ``True`` an existing file is
                passed to ``safety.create_backup`` before being replaced.

        Returns:
            A ``FileWriteResult`` carrying the resolved path, fresh metadata,
            the value returned by ``create_backup`` (if any) and the number of
            bytes written, or ``success=False`` with an ``ErrorResponse``.
        """
        try:
            # Validate path and safety
            if not self.safety.validate_path(path):
                return FileWriteResult(
                    path=path,
                    metadata=None,
                    success=False,
                    error=ErrorResponse(
                        error_type="SecurityError",
                        message="Access to this path is not allowed"
                    )
                )
            if not self.safety.is_safe_operation("write", path):
                return FileWriteResult(
                    path=path,
                    metadata=None,
                    success=False,
                    error=ErrorResponse(
                        error_type="SecurityError",
                        message="Write operation not allowed for this path"
                    )
                )

            # Validate file extension
            if not self.safety.validate_file_extension(path):
                return FileWriteResult(
                    path=path,
                    metadata=None,
                    success=False,
                    error=ErrorResponse(
                        error_type="ExtensionError",
                        message="File extension not allowed",
                        details={"allowed_extensions": self.config.allowed_extensions}
                    )
                )

            file_path = Path(path).resolve()

            # Check if file already exists
            if file_path.exists() and not overwrite:
                return FileWriteResult(
                    path=path,
                    metadata=None,
                    success=False,
                    error=ErrorResponse(
                        error_type="FileExists",
                        message="File already exists and overwrite is not enabled",
                        recovery_suggestions=["Set overwrite=True to replace existing file"]
                    )
                )

            # A directory can never be overwritten with file content. Reject it
            # here so no backup of the whole directory is taken just to fail
            # at open() a moment later.
            if file_path.is_dir():
                return FileWriteResult(
                    path=path,
                    metadata=None,
                    success=False,
                    error=ErrorResponse(
                        error_type="IsDirectory",
                        message=f"Path is a directory, not a file: {path}"
                    )
                )

            # Create parent directories if needed
            file_path.parent.mkdir(parents=True, exist_ok=True)

            # Create backup if file exists (only reachable with overwrite=True)
            backup_path = None
            if file_path.exists():
                backup_path = self.safety.create_backup(str(file_path))

            bytes_written = await self._write_text_file(file_path, content)

            mime_type = self._detect_mime_type(file_path)
            metadata = FileMetadata.from_path(str(file_path), mime_type)
            return FileWriteResult(
                path=str(file_path),
                metadata=metadata,
                success=True,
                backup_created=backup_path,
                bytes_written=bytes_written
            )
        except Exception as e:
            logger.error(f"Failed to write file {path}: {e}")
            return FileWriteResult(
                path=path,
                metadata=None,
                success=False,
                error=ErrorResponse(
                    error_type="WriteError",
                    message=f"Failed to write file: {str(e)}",
                    recovery_suggestions=["Check directory permissions", "Ensure sufficient disk space"]
                )
            )

    async def update_file(self, path: str, content: str) -> FileUpdateResult:
        """Update existing file with backup.

        Args:
            path: Path of an existing regular file.
            content: Replacement text (encoded as UTF-8).

        Returns:
            A ``FileUpdateResult``. If writing or metadata collection fails
            after the backup was taken, ``safety.restore_from_backup`` is
            called and its result is reported in ``restored_from_backup``.
        """
        try:
            # Validate path and safety
            if not self.safety.validate_path(path):
                return FileUpdateResult(
                    path=path,
                    metadata=None,
                    success=False,
                    error=ErrorResponse(
                        error_type="SecurityError",
                        message="Access to this path is not allowed"
                    )
                )
            if not self.safety.is_safe_operation("update", path):
                return FileUpdateResult(
                    path=path,
                    metadata=None,
                    success=False,
                    error=ErrorResponse(
                        error_type="SecurityError",
                        message="Update operation not allowed for this path"
                    )
                )

            file_path = Path(path).resolve()

            # Check if file exists
            if not file_path.exists():
                return FileUpdateResult(
                    path=path,
                    metadata=None,
                    success=False,
                    error=ErrorResponse(
                        error_type="FileNotFound",
                        message="File does not exist",
                        recovery_suggestions=["Use write_file to create new file"]
                    )
                )

            # Check if it's a file (not directory)
            if not file_path.is_file():
                return FileUpdateResult(
                    path=path,
                    metadata=None,
                    success=False,
                    error=ErrorResponse(
                        error_type="NotAFile",
                        message="Path is not a regular file"
                    )
                )

            # Create backup
            backup_path = self.safety.create_backup(str(file_path))
            try:
                bytes_written = await self._write_text_file(file_path, content)
                mime_type = self._detect_mime_type(file_path)
                metadata = FileMetadata.from_path(str(file_path), mime_type)
                return FileUpdateResult(
                    path=str(file_path),
                    metadata=metadata,
                    success=True,
                    backup_created=backup_path,
                    bytes_written=bytes_written
                )
            except Exception as write_error:
                # Record the failure before rolling back so it is not lost.
                logger.error(f"Failed to update file {path}: {write_error}")
                # Restore from backup if write failed
                restored = False
                if backup_path:
                    restored = self.safety.restore_from_backup(backup_path, str(file_path))
                return FileUpdateResult(
                    path=path,
                    metadata=None,
                    success=False,
                    error=ErrorResponse(
                        error_type="UpdateError",
                        message=f"Failed to update file: {str(write_error)}",
                        details={"backup_restored": restored}
                    ),
                    restored_from_backup=restored
                )
        except Exception as e:
            logger.error(f"Failed to update file {path}: {e}")
            return FileUpdateResult(
                path=path,
                metadata=None,
                success=False,
                error=ErrorResponse(
                    error_type="UpdateError",
                    message=f"Failed to update file: {str(e)}"
                )
            )

    def _detect_mime_type(self, file_path: Path) -> str:
        """Detect MIME type of file.

        Uses python-magic when it was initialised, otherwise guesses from the
        file name via ``mimetypes``. Returns ``"application/octet-stream"``
        when nothing can be determined or detection raises.
        """
        try:
            if self.magic:
                return self.magic.from_file(str(file_path))
            # Fallback to mimetypes module
            mime_type, _ = mimetypes.guess_type(str(file_path))
            return mime_type or "application/octet-stream"
        except Exception:
            return "application/octet-stream"

    def _is_binary_file(self, file_path: Path, mime_type: str) -> bool:
        """Check if file is binary.

        Args:
            file_path: File to inspect.
            mime_type: MIME type already detected for the file.

        Returns:
            ``False`` for ``text/*`` and a short list of textual application
            types. Otherwise ``True`` if the first 1024 bytes contain a NUL
            byte, or if the file cannot be opened.
        """
        # Check MIME type first
        if mime_type.startswith("text/"):
            return False

        # Common text MIME types that don't start with "text/"
        text_types = [
            "application/json",
            "application/xml",
            "application/javascript",
            "application/x-yaml",
            "application/x-sh"
        ]
        if mime_type in text_types:
            return False

        # Check file content for binary markers
        try:
            with open(file_path, 'rb') as f:
                chunk = f.read(1024)
            return b'\x00' in chunk
        except Exception:
            # Unreadable files are treated as binary so no decode is attempted.
            return True

    async def _read_text_file(self, file_path: Path) -> tuple[str, str]:
        """Read text file with encoding detection.

        Encodings are tried in order: UTF-8, UTF-16 (only when the file starts
        with a UTF-16 byte-order mark), latin-1, cp1252.

        Returns:
            ``(content, encoding)`` where ``encoding`` names the codec that
            succeeded, or ``'utf-8-with-errors'`` if every candidate failed
            and undecodable bytes were replaced.
        """
        # Without a BOM the text-mode UTF-16 decoder either raises plain
        # UnicodeError ("UTF-16 stream does not start with BOM") or would
        # decode arbitrary bytes as garbage, so UTF-16 is only attempted when
        # a BOM is actually present.
        with open(file_path, 'rb') as f:
            has_utf16_bom = f.read(2) in (b'\xff\xfe', b'\xfe\xff')

        encodings = ['utf-8', 'utf-16', 'latin-1', 'cp1252']
        for encoding in encodings:
            if encoding == 'utf-16' and not has_utf16_bom:
                continue
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
                return content, encoding
            except UnicodeError:
                # UnicodeError (not just UnicodeDecodeError) so every
                # codec-level failure moves on to the next candidate.
                continue

        # latin-1 maps every byte value, so this is a defensive last resort:
        # read as binary and decode with errors='replace'.
        with open(file_path, 'rb') as f:
            raw_content = f.read()
        content = raw_content.decode('utf-8', errors='replace')
        return content, 'utf-8-with-errors'

    async def _write_text_file(self, file_path: Path, content: str) -> int:
        """Write text file with UTF-8 encoding.

        Returns:
            The number of bytes written (length of the UTF-8 encoding).
        """
        encoded_content = content.encode('utf-8')
        # Binary mode: the bytes on disk are exactly the UTF-8 encoding,
        # with no newline translation.
        with open(file_path, 'wb') as f:
            f.write(encoded_content)
        return len(encoded_content)

    async def list_directory(self, path: str, pattern: Optional[str] = None, recursive: bool = False) -> DirectoryListResult:
        """List directory contents with filtering.

        Args:
            path: Directory to list.
            pattern: Optional ``fnmatch`` pattern applied to entry names.
            recursive: When ``True`` descend into subdirectories, limited by
                ``config.max_recursion_depth``.

        Returns:
            A ``DirectoryListResult``. ``total_entries`` counts every item
            seen, ``filtered_entries`` counts the entries actually returned.
        """
        try:
            # Validate path and safety
            if not self.safety.validate_path(path):
                return DirectoryListResult(
                    path=path,
                    entries=[],
                    success=False,
                    total_entries=0,
                    filtered_entries=0,
                    error=ErrorResponse(
                        error_type="SecurityError",
                        message="Access to this path is not allowed"
                    )
                )

            dir_path = Path(path).resolve()

            # Check if directory exists
            if not dir_path.exists():
                return DirectoryListResult(
                    path=path,
                    entries=[],
                    success=False,
                    total_entries=0,
                    filtered_entries=0,
                    error=ErrorResponse(
                        error_type="DirectoryNotFound",
                        message=f"Directory does not exist: {path}",
                        recovery_suggestions=["Check the directory path"]
                    )
                )

            # Check if it's a directory
            if not dir_path.is_dir():
                return DirectoryListResult(
                    path=path,
                    entries=[],
                    success=False,
                    total_entries=0,
                    filtered_entries=0,
                    error=ErrorResponse(
                        error_type="NotADirectory",
                        message=f"Path is not a directory: {path}",
                        recovery_suggestions=["Use read_file for file operations"]
                    )
                )

            max_depth_reached = False
            if recursive:
                entries, total_entries, max_depth_reached = await self._list_recursive(
                    dir_path, pattern, 0, self.config.max_recursion_depth
                )
            else:
                entries, total_entries = await self._list_single_directory(dir_path, pattern)

            return DirectoryListResult(
                path=str(dir_path),
                entries=entries,
                success=True,
                total_entries=total_entries,
                filtered_entries=len(entries),
                pattern_used=pattern,
                recursive=recursive,
                max_depth_reached=max_depth_reached
            )
        except Exception as e:
            logger.error(f"Failed to list directory {path}: {e}")
            return DirectoryListResult(
                path=path,
                entries=[],
                success=False,
                total_entries=0,
                filtered_entries=0,
                error=ErrorResponse(
                    error_type="ListError",
                    message=f"Failed to list directory: {str(e)}",
                    recovery_suggestions=["Check directory permissions"]
                )
            )

    def _build_entry(self, item: Path) -> DirectoryEntry:
        """Build the ``DirectoryEntry`` (name, path, metadata) for one path."""
        mime_type = self._detect_mime_type(item) if item.is_file() else "directory"
        metadata = FileMetadata.from_path(str(item), mime_type)
        return DirectoryEntry(
            name=item.name,
            path=str(item),
            metadata=metadata
        )

    async def _list_single_directory(self, dir_path: Path, pattern: Optional[str]) -> tuple[List[DirectoryEntry], int]:
        """List contents of a single directory.

        Returns:
            ``(entries, total_entries)``: the entries whose name matches
            ``pattern`` (all of them when ``pattern`` is falsy) and the number
            of items seen before filtering. Items whose metadata cannot be
            built are logged and skipped.
        """
        entries = []
        total_entries = 0
        try:
            for item in dir_path.iterdir():
                total_entries += 1
                # Apply pattern filter
                if pattern and not fnmatch.fnmatch(item.name, pattern):
                    continue
                try:
                    entries.append(self._build_entry(item))
                except Exception as e:
                    logger.warning(f"Failed to get metadata for {item}: {e}")
                    continue
        except PermissionError:
            logger.warning(f"Permission denied accessing directory: {dir_path}")
        return entries, total_entries

    async def _list_recursive(self, dir_path: Path, pattern: Optional[str], current_depth: int, max_depth: int) -> tuple[List[DirectoryEntry], int, bool]:
        """List directory contents recursively.

        Args:
            dir_path: Directory to walk.
            pattern: Optional ``fnmatch`` pattern applied to entry names.
                Directories are descended into whether or not they match.
            current_depth: Depth of ``dir_path`` (0 for the starting point).
            max_depth: Number of directory levels that may be listed.

        Returns:
            ``(entries, total_entries, max_depth_reached)``. The flag is
            ``True`` whenever at least one directory was not listed because
            of the depth limit.
        """
        entries = []
        total_entries = 0
        max_depth_reached = False

        if current_depth >= max_depth:
            return entries, total_entries, True

        try:
            for item in dir_path.iterdir():
                total_entries += 1

                # Apply pattern filter to the entry itself only.
                if not pattern or fnmatch.fnmatch(item.name, pattern):
                    try:
                        entries.append(self._build_entry(item))
                    except Exception as e:
                        logger.warning(f"Failed to process {item}: {e}")

                # Descend into subdirectories even if they don't match pattern.
                try:
                    if not item.is_dir():
                        continue
                    if current_depth >= max_depth - 1:
                        # Listing this directory would exceed the limit:
                        # report the truncation instead of skipping silently.
                        max_depth_reached = True
                        continue
                    sub_entries, sub_total, sub_max_reached = await self._list_recursive(
                        item, pattern, current_depth + 1, max_depth
                    )
                except Exception as e:
                    # Best effort: one unreadable subtree must not abort
                    # the rest of the listing.
                    logger.warning(f"Failed to process {item}: {e}")
                    continue

                entries.extend(sub_entries)
                total_entries += sub_total
                if sub_max_reached:
                    max_depth_reached = True
        except PermissionError:
            logger.warning(f"Permission denied accessing directory: {dir_path}")
        return entries, total_entries, max_depth_reached

    async def delete_file(self, path: str) -> FileDeleteResult:
        """Delete file safely with backup.

        The path is first handed to ``safety.create_backup``; the original is
        only removed when that call returns a truthy value.

        Args:
            path: File or directory to delete.

        Returns:
            A ``FileDeleteResult``. For directories ``files_deleted`` is the
            number of regular files found beneath the path before deletion.
        """
        try:
            # Validate path and safety
            if not self.safety.validate_path(path):
                return FileDeleteResult(
                    path=path,
                    success=False,
                    error=ErrorResponse(
                        error_type="SecurityError",
                        message="Access to this path is not allowed"
                    )
                )
            if not self.safety.is_safe_operation("delete", path):
                return FileDeleteResult(
                    path=path,
                    success=False,
                    error=ErrorResponse(
                        error_type="SecurityError",
                        message="Delete operation not allowed for this path"
                    )
                )

            file_path = Path(path).resolve()

            # Check if file exists
            if not file_path.exists():
                return FileDeleteResult(
                    path=path,
                    success=False,
                    error=ErrorResponse(
                        error_type="FileNotFound",
                        message="File or directory does not exist"
                    )
                )

            was_directory = file_path.is_dir()
            files_deleted = 1

            # Count files if directory
            if was_directory:
                try:
                    files_deleted = sum(1 for child in file_path.rglob('*') if child.is_file())
                except Exception:
                    files_deleted = 1  # Fallback

            # Create backup (move to backup location instead of permanent deletion)
            backup_location = self.safety.create_backup(str(file_path))
            if backup_location:
                # Remove original after successful backup. The type is
                # re-checked here rather than reusing was_directory, so
                # nothing is removed if the path no longer exists.
                if file_path.is_file():
                    file_path.unlink()
                elif file_path.is_dir():
                    import shutil
                    shutil.rmtree(file_path)
                return FileDeleteResult(
                    path=str(file_path),
                    success=True,
                    backup_location=backup_location,
                    was_directory=was_directory,
                    files_deleted=files_deleted
                )
            else:
                return FileDeleteResult(
                    path=path,
                    success=False,
                    error=ErrorResponse(
                        error_type="BackupError",
                        message="Failed to create backup before deletion",
                        recovery_suggestions=["Check backup directory permissions"]
                    )
                )
        except Exception as e:
            logger.error(f"Failed to delete {path}: {e}")
            return FileDeleteResult(
                path=path,
                success=False,
                error=ErrorResponse(
                    error_type="DeleteError",
                    message=f"Failed to delete: {str(e)}",
                    recovery_suggestions=["Check file permissions", "Ensure file is not in use"]
                )
            )

    async def get_file_info(self, path: str) -> FileInfoResult:
        """Get detailed file information.

        Args:
            path: File or directory to inspect.

        Returns:
            A ``FileInfoResult``. A missing path is not an error: it yields
            ``success=True`` with ``exists=False`` and no metadata.
        """
        try:
            # Validate path and safety
            if not self.safety.validate_path(path):
                return FileInfoResult(
                    path=path,
                    metadata=None,
                    success=False,
                    exists=False,
                    error=ErrorResponse(
                        error_type="SecurityError",
                        message="Access to this path is not allowed"
                    )
                )

            file_path = Path(path).resolve()

            # Check if file exists
            if not file_path.exists():
                return FileInfoResult(
                    path=str(file_path),
                    metadata=None,
                    success=True,
                    exists=False
                )

            # Directories get the pseudo MIME type "directory".
            mime_type = self._detect_mime_type(file_path) if file_path.is_file() else "directory"
            metadata = FileMetadata.from_path(str(file_path), mime_type)
            return FileInfoResult(
                path=str(file_path),
                metadata=metadata,
                success=True,
                exists=True
            )
        except Exception as e:
            logger.error(f"Failed to get file info for {path}: {e}")
            return FileInfoResult(
                path=path,
                metadata=None,
                success=False,
                exists=False,
                error=ErrorResponse(
                    error_type="InfoError",
                    message=f"Failed to get file info: {str(e)}"
                )
            )