"""File operations business logic."""
import base64
import logging
import mimetypes
import stat as stat_module
from pathlib import Path
from ..schemas.file import FileEntry, FileListResult, FileContentResult, FileMetadata
from ..schemas.common import OperationResult
from ..core.safety import validate_path, check_file_size, is_binary_file, get_relative_path
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class FileService:
    """Handles all file operations.

    Every public method receives paths relative to ``root`` and hands them to
    ``validate_path`` before touching the filesystem. ``list_files``,
    ``read_file`` and ``get_file_metadata`` raise on bad input; the mutating
    methods report filesystem failures through an ``OperationResult`` with
    ``success=False`` (anything raised by ``validate_path`` itself still
    propagates, because it is called outside their ``try`` blocks).
    """

    def __init__(self, root: Path, max_file_size_mb: int = 100):
        """
        Initialize file service.

        Args:
            root: Root directory for all operations
            max_file_size_mb: Maximum file size in MB for operations
        """
        self.root = root
        # Stored in bytes so it can be handed straight to check_file_size.
        self.max_file_size = max_file_size_mb * 1024 * 1024

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _result(self, success: bool, message: str, target: Path) -> OperationResult:
        """Build an OperationResult whose path is ``target`` relative to root."""
        return OperationResult(
            success=success,
            message=message,
            path=get_relative_path(target, self.root),
        )

    @staticmethod
    def _sniff_binary(target: Path, sample_size: int) -> bool:
        """Classify ``target`` as binary from its first ``sample_size`` bytes.

        Best effort: if the sample cannot be read or classified, the file is
        reported as binary. The failure is logged instead of being swallowed
        silently.
        """
        try:
            with open(target, "rb") as f:
                sample = f.read(sample_size)
            return is_binary_file(sample)
        except Exception:
            logger.debug(
                "Could not sample %s; treating it as binary", target, exc_info=True
            )
            return True

    @staticmethod
    def _creation_time(file_stat) -> float:
        """Return the best available creation timestamp from a stat result.

        ``st_ctime`` is the metadata-change time on Unix, not the creation
        time, so ``st_birthtime`` is preferred on platforms that expose it.
        Where it is missing, fall back to ``st_ctime``.
        """
        try:
            return file_stat.st_birthtime
        except AttributeError:
            return file_stat.st_ctime

    # ------------------------------------------------------------------
    # Read-style operations
    # ------------------------------------------------------------------

    def list_files(self, path: str, pattern: str = "*") -> FileListResult:
        """
        List all files in a given directory.

        Args:
            path: Relative path from root directory
            pattern: Glob pattern to filter files; must not contain ``..``

        Returns:
            FileListResult with the matching regular files, sorted by
            lower-cased name

        Raises:
            ValueError: If pattern contains a ``..`` segment
            FileNotFoundError: If directory not found
            NotADirectoryError: If path is not a directory
        """
        # pathlib's glob follows literal ".." segments, so a pattern such as
        # "../*" would enumerate entries outside the validated directory.
        if ".." in Path(pattern).parts:
            raise ValueError(f"Pattern must not contain '..': {pattern}")

        # Validate and resolve path
        target_path = validate_path(path, self.root)
        if not target_path.exists():
            raise FileNotFoundError(f"Directory not found: {path}")
        if not target_path.is_dir():
            raise NotADirectoryError(f"Not a directory: {path}")

        files = []
        for item in target_path.glob(pattern):
            try:
                if not item.is_file():
                    continue
                file_stat = item.stat()
            except OSError:
                # The entry vanished or became unreadable between glob() and
                # stat(); skip it rather than failing the whole listing.
                logger.debug("Skipping unreadable entry: %s", item, exc_info=True)
                continue

            files.append(
                FileEntry(
                    name=item.name,
                    path=get_relative_path(item, self.root),
                    size_bytes=file_stat.st_size,
                    created=self._creation_time(file_stat),
                    modified=file_stat.st_mtime,
                    # Only a small prefix is sampled to keep listings cheap.
                    is_binary=self._sniff_binary(item, 512),
                )
            )

        # Sort by name, case-insensitively
        files.sort(key=lambda entry: entry.name.lower())
        return FileListResult(files=files, total_count=len(files))

    def read_file(self, path: str, offset: int = 0, limit: int | None = None) -> FileContentResult:
        """
        Read file content with optional chunking support.

        Args:
            path: Relative path to the file
            offset: Byte offset to start reading from (must be >= 0)
            limit: Maximum number of bytes to read. ``None`` reads to the end
                of the file; ``0`` reads nothing. A negative value is passed
                through to ``read`` and therefore also reads to the end.

        Returns:
            FileContentResult with file content and metadata. Binary data is
            returned base64-encoded with ``encoding=None``; text is decoded as
            UTF-8 with undecodable bytes replaced.

        Raises:
            FileNotFoundError: If file not found
            ValueError: If path is not a file, or offset is negative
        """
        # Fail early with a clear error instead of an OSError from seek().
        if offset < 0:
            raise ValueError(f"Offset must be non-negative: {offset}")

        logger.debug("read_file called: path=%s, offset=%s, limit=%s", path, offset, limit)

        # Validate and resolve path
        target_path = validate_path(path, self.root)
        logger.debug("Resolved path: %s", target_path)
        if not target_path.exists():
            raise FileNotFoundError(f"File not found: {path}")
        if not target_path.is_file():
            raise ValueError(f"Not a file: {path}")

        # Check file size limit
        check_file_size(target_path, self.max_file_size)

        with open(target_path, "rb") as f:
            f.seek(offset)
            # Compare against None explicitly: limit=0 means "zero bytes",
            # not "no limit".
            data = f.read() if limit is None else f.read(limit)
        total_size = target_path.stat().st_size

        # Detect binary vs text from the bytes actually read
        binary = is_binary_file(data)
        if binary:
            content = base64.b64encode(data).decode("ascii")
            encoding = None
        else:
            content = data.decode("utf-8", errors="replace")
            encoding = "utf-8"

        return FileContentResult(
            content=content,
            is_binary=binary,
            encoding=encoding,
            total_size=total_size,
            bytes_read=len(data),
            offset=offset,
        )

    # ------------------------------------------------------------------
    # Mutating operations
    # ------------------------------------------------------------------

    def write_file(self, path: str, content: str, is_base64: bool = False) -> OperationResult:
        """
        Write or overwrite a file, creating parent directories as needed.

        Args:
            path: Relative path to the file
            content: File content (text or base64 encoded binary)
            is_base64: If True, content is base64 encoded binary data

        Returns:
            OperationResult indicating success or failure
        """
        logger.debug(
            "write_file called: path=%s, is_base64=%s, content_length=%s",
            path,
            is_base64,
            len(content),
        )

        # Validate path
        target_path = validate_path(path, self.root)
        logger.debug("Resolved path: %s", target_path)

        try:
            # Ensure parent directory exists
            target_path.parent.mkdir(parents=True, exist_ok=True)
            if is_base64:
                # Decode base64 and write binary
                target_path.write_bytes(base64.b64decode(content))
            else:
                target_path.write_text(content, encoding="utf-8")
            size = target_path.stat().st_size
        except Exception as e:
            logger.exception("Failed to write file: %s", target_path)
            return self._result(False, f"Failed to write file: {str(e)}", target_path)

        logger.info("File written successfully: %s (%s bytes)", target_path, size)
        return self._result(
            True, f"File written successfully: {path} ({size} bytes)", target_path
        )

    def append_file(self, path: str, content: str) -> OperationResult:
        """
        Append content to an existing file.

        Args:
            path: Relative path to the file
            content: Text content to append (written as UTF-8)

        Returns:
            OperationResult indicating success or failure; the file must
            already exist and be a regular file
        """
        # Validate path
        target_path = validate_path(path, self.root)
        if not target_path.exists():
            return self._result(False, f"File not found: {path}", target_path)
        # Same guard as delete_file/move_file: refuse directories up front.
        if not target_path.is_file():
            return self._result(False, f"Not a file: {path}", target_path)

        try:
            with open(target_path, "a", encoding="utf-8") as f:
                f.write(content)
            size = target_path.stat().st_size
        except Exception as e:
            logger.exception("Failed to append to file: %s", target_path)
            return self._result(False, f"Failed to append to file: {str(e)}", target_path)

        return self._result(
            True,
            f"Content appended successfully: {path} (new size: {size} bytes)",
            target_path,
        )

    def delete_file(self, path: str) -> OperationResult:
        """
        Delete a file.

        Args:
            path: Relative path to the file

        Returns:
            OperationResult indicating success or failure
        """
        logger.debug("delete_file called: path=%s", path)

        # Validate path
        target_path = validate_path(path, self.root)
        logger.debug("Resolved path: %s", target_path)
        if not target_path.exists():
            return self._result(False, f"File not found: {path}", target_path)
        if not target_path.is_file():
            return self._result(False, f"Not a file: {path}", target_path)

        try:
            logger.warning("Deleting file: %s", target_path)
            target_path.unlink()
        except Exception as e:
            logger.exception("Failed to delete file: %s", target_path)
            return self._result(False, f"Failed to delete file: {str(e)}", target_path)

        logger.info("File deleted successfully: %s", target_path)
        return self._result(True, f"File deleted successfully: {path}", target_path)

    def move_file(self, source_path: str, destination_path: str) -> OperationResult:
        """
        Move or rename a file.

        Args:
            source_path: Relative path to the source file
            destination_path: Relative path to the destination

        Returns:
            OperationResult indicating success or failure. The move is refused
            when the destination already exists.
        """
        # Validate both paths
        source = validate_path(source_path, self.root)
        destination = validate_path(destination_path, self.root)

        if not source.exists():
            return self._result(False, f"Source file not found: {source_path}", source)
        if not source.is_file():
            return self._result(False, f"Source is not a file: {source_path}", source)
        if destination.exists():
            return self._result(
                False, f"Destination already exists: {destination_path}", destination
            )

        try:
            # Ensure destination parent directory exists
            destination.parent.mkdir(parents=True, exist_ok=True)
            # NOTE: the exists() check above and this rename are not atomic;
            # on Unix, rename replaces a destination file created in between.
            source.rename(destination)
        except Exception as e:
            logger.exception("Failed to move file: %s -> %s", source, destination)
            return self._result(False, f"Failed to move file: {str(e)}", source)

        return self._result(
            True,
            f"File moved from '{source_path}' to '{destination_path}'",
            destination,
        )

    # ------------------------------------------------------------------
    # Metadata
    # ------------------------------------------------------------------

    def get_file_metadata(self, path: str) -> FileMetadata:
        """
        Get detailed metadata about a file.

        Args:
            path: Relative path to the file

        Returns:
            FileMetadata with detailed information. For a path that does not
            exist, a placeholder record with ``exists=False`` and zeroed
            fields is returned instead of raising.

        Raises:
            ValueError: If the path exists but is not a regular file
        """
        # Validate path
        target_path = validate_path(path, self.root)
        relative = get_relative_path(target_path, self.root)

        if not target_path.exists():
            return FileMetadata(
                path=relative,
                exists=False,
                size_bytes=0,
                is_binary=False,
                mime_type=None,
                created=0,
                modified=0,
                accessed=0,
                permissions="",
            )
        if not target_path.is_file():
            raise ValueError(f"Not a file: {path}")

        file_stat = target_path.stat()

        # The MIME type is guessed from the file name only, not the content.
        mime_type, _ = mimetypes.guess_type(str(target_path))

        return FileMetadata(
            path=relative,
            exists=True,
            size_bytes=file_stat.st_size,
            is_binary=self._sniff_binary(target_path, 8192),
            mime_type=mime_type,
            created=self._creation_time(file_stat),
            modified=file_stat.st_mtime,
            accessed=file_stat.st_atime,
            # filemode() yields an ls-style string such as "-rw-r--r--":
            # a file-type character followed by the nine permission bits.
            permissions=stat_module.filemode(file_stat.st_mode),
        )