"""File management operations for LocalFS MCP."""
import base64
import mimetypes
import stat as stat_module
from pathlib import Path
from mcp.server.fastmcp import Context
from .schemas import LocalFSConfig
from .schemas.file import FileEntry, FileListResult, FileContentResult, FileMetadata
from .schemas.common import OperationResult
from .safety import (
validate_path,
check_file_size,
is_binary_file,
get_relative_path,
DEFAULT_ROOT,
)
def _session_root(ctx):
    """Return the root directory a tool invocation should operate on.

    Uses ``ctx.session_config.root_directory`` when a context was injected.
    When ``ctx`` is None (the declared default of the tools' ``ctx``
    parameter) it falls back to ``DEFAULT_ROOT`` instead of failing with an
    AttributeError on ``None``.
    """
    if ctx is None:
        return DEFAULT_ROOT
    return Path(ctx.session_config.root_directory)


def _max_file_size_bytes():
    """Return the configured maximum readable file size in bytes."""
    limit_mb = getattr(LocalFSConfig, "max_file_size_mb", None)
    if limit_mb is None:
        # NOTE(review): assumes LocalFSConfig is a pydantic v2 model, where
        # fields are not readable as class attributes and the declared
        # default lives in ``model_fields`` -- confirm against .schemas.
        limit_mb = LocalFSConfig.model_fields["max_file_size_mb"].default
    return limit_mb * 1024 * 1024


def _pattern_leaves_directory(pattern):
    """Return True if a glob pattern is absolute or contains a ``..`` part."""
    candidate = Path(pattern)
    return candidate.is_absolute() or ".." in candidate.parts


def _read_window(handle, offset, limit):
    """Read ``limit`` bytes from ``handle`` starting at byte ``offset``.

    ``limit=None`` reads to the end of the stream; ``limit=0`` reads nothing.

    Raises:
        ValueError: If ``offset`` or ``limit`` is negative.
    """
    if offset < 0:
        raise ValueError(f"offset must be non-negative, got {offset}")
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit}")
    handle.seek(offset)
    # Compare against None explicitly: a truthiness test would turn an
    # explicit limit of 0 into "read the whole file".
    return handle.read() if limit is None else handle.read(limit)


def _probe_binary(path, sample_size):
    """Classify a file as binary from its first ``sample_size`` bytes.

    Best effort by design: any failure while sampling (for example an
    unreadable file) reports the file as binary rather than aborting the
    calling tool.
    """
    try:
        with open(path, "rb") as handle:
            return is_binary_file(handle.read(sample_size))
    except Exception:
        return True


def register_file_tools(server):
    """Register all file management tools with the MCP server.

    Every tool is a closure decorated with ``server.tool()``.

    Args:
        server: Object exposing a ``tool()`` decorator factory.
    """
    # NOTE(review): list_files, read_file and get_file_metadata always use
    # DEFAULT_ROOT, while the mutating tools use the session's
    # root_directory. If the two differ, written files are not visible to the
    # read tools -- confirm whether the read tools should take ``ctx`` too.

    @server.tool()
    def list_files(path: str = ".", pattern: str = "*") -> dict:
        """
        List all files in a given directory.

        Args:
            path: Relative path from root directory (default: current directory)
            pattern: Glob pattern to filter files (default: "*" for all files)

        Returns:
            Dictionary with list of files and metadata

        Raises:
            FileNotFoundError: If the directory does not exist.
            NotADirectoryError: If the path is not a directory.
            ValueError: If the pattern is absolute or contains "..".
        """
        root = DEFAULT_ROOT
        # Validate and resolve path
        target_path = validate_path(path, root)
        if not target_path.exists():
            raise FileNotFoundError(f"Directory not found: {path}")
        if not target_path.is_dir():
            raise NotADirectoryError(f"Not a directory: {path}")
        # Only `path` went through validate_path; a pattern such as "../*"
        # would otherwise let glob walk out of the validated directory.
        if _pattern_leaves_directory(pattern):
            raise ValueError(f"Pattern must stay inside the directory: {pattern}")

        # List files matching pattern (directories are skipped)
        files = []
        for item in target_path.glob(pattern):
            if not item.is_file():
                continue
            info = item.stat()
            files.append(
                FileEntry(
                    name=item.name,
                    path=get_relative_path(item, root),
                    size_bytes=info.st_size,
                    created=info.st_ctime,
                    modified=info.st_mtime,
                    # Detect if binary by reading the first 512 bytes
                    is_binary=_probe_binary(item, 512),
                )
            )

        # Sort by name, case-insensitively
        files.sort(key=lambda entry: entry.name.lower())
        return FileListResult(
            files=files,
            total_count=len(files),
        ).model_dump()

    @server.tool()
    def read_file(path: str, offset: int = 0, limit: int | None = None) -> dict:
        """
        Read file content with optional chunking support.

        For text files, content is returned as UTF-8 text.
        For binary files, content is base64 encoded.

        Args:
            path: Relative path to the file
            offset: Byte offset to start reading from (default: 0)
            limit: Maximum number of bytes to read (default: None for entire file)

        Returns:
            Dictionary with file content and metadata

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If the path is not a file, or offset/limit is negative.
        """
        root = DEFAULT_ROOT
        # Validate and resolve path
        target_path = validate_path(path, root)
        if not target_path.exists():
            raise FileNotFoundError(f"File not found: {path}")
        if not target_path.is_file():
            raise ValueError(f"Not a file: {path}")

        # Check file size limit (enforcement is delegated to check_file_size)
        check_file_size(target_path, _max_file_size_bytes())

        # Read the requested byte window
        with open(target_path, "rb") as f:
            data = _read_window(f, offset, limit)
        total_size = target_path.stat().st_size

        # Detect binary vs text on the bytes actually read
        binary = is_binary_file(data)
        if binary:
            content = base64.b64encode(data).decode("ascii")
            encoding = None
        else:
            # errors="replace": a window may start or end inside a multi-byte
            # UTF-8 sequence, which must not abort the read.
            content = data.decode("utf-8", errors="replace")
            encoding = "utf-8"
        return FileContentResult(
            content=content,
            is_binary=binary,
            encoding=encoding,
            total_size=total_size,
            bytes_read=len(data),
            offset=offset,
        ).model_dump()

    @server.tool()
    def write_file(path: str, content: str, is_base64: bool = False, ctx: Context = None) -> dict:
        """
        Write or overwrite a file.

        Args:
            path: Relative path to the file
            content: File content (text or base64 encoded binary)
            is_base64: If True, content is base64 encoded binary data (default: False)
            ctx: MCP context with session config

        Returns:
            Dictionary with operation result
        """
        root = _session_root(ctx)
        # Validate path
        target_path = validate_path(path, root)
        try:
            # Ensure parent directory exists
            target_path.parent.mkdir(parents=True, exist_ok=True)
            if is_base64:
                # Decode base64 and write binary
                data = base64.b64decode(content)
                with open(target_path, "wb") as f:
                    f.write(data)
            else:
                # Write text
                with open(target_path, "w", encoding="utf-8") as f:
                    f.write(content)
            size = target_path.stat().st_size
            return OperationResult(
                success=True,
                message=f"File written successfully: {path} ({size} bytes)",
                path=get_relative_path(target_path, root),
            ).model_dump()
        except Exception as e:
            # Failures are reported in the result rather than raised.
            return OperationResult(
                success=False,
                message=f"Failed to write file: {str(e)}",
                path=get_relative_path(target_path, root),
            ).model_dump()

    @server.tool()
    def append_file(path: str, content: str, ctx: Context = None) -> dict:
        """
        Append content to an existing file.

        Args:
            path: Relative path to the file
            content: Text content to append
            ctx: MCP context with session config

        Returns:
            Dictionary with operation result
        """
        root = _session_root(ctx)
        # Validate path
        target_path = validate_path(path, root)
        if not target_path.exists():
            return OperationResult(
                success=False,
                message=f"File not found: {path}",
                path=get_relative_path(target_path, root),
            ).model_dump()
        # Same guard as delete_file, so a directory gets a clear message
        # instead of a raw OS error from open().
        if not target_path.is_file():
            return OperationResult(
                success=False,
                message=f"Not a file: {path}",
                path=get_relative_path(target_path, root),
            ).model_dump()
        try:
            with open(target_path, "a", encoding="utf-8") as f:
                f.write(content)
            size = target_path.stat().st_size
            return OperationResult(
                success=True,
                message=f"Content appended successfully: {path} (new size: {size} bytes)",
                path=get_relative_path(target_path, root),
            ).model_dump()
        except Exception as e:
            return OperationResult(
                success=False,
                message=f"Failed to append to file: {str(e)}",
                path=get_relative_path(target_path, root),
            ).model_dump()

    @server.tool()
    def delete_file(path: str, ctx: Context = None) -> dict:
        """
        Delete a file.

        Args:
            path: Relative path to the file
            ctx: MCP context with session config

        Returns:
            Dictionary with operation result
        """
        root = _session_root(ctx)
        # Validate path
        target_path = validate_path(path, root)
        if not target_path.exists():
            return OperationResult(
                success=False,
                message=f"File not found: {path}",
                path=get_relative_path(target_path, root),
            ).model_dump()
        if not target_path.is_file():
            return OperationResult(
                success=False,
                message=f"Not a file: {path}",
                path=get_relative_path(target_path, root),
            ).model_dump()
        try:
            target_path.unlink()
            return OperationResult(
                success=True,
                message=f"File deleted successfully: {path}",
                path=get_relative_path(target_path, root),
            ).model_dump()
        except Exception as e:
            return OperationResult(
                success=False,
                message=f"Failed to delete file: {str(e)}",
                path=get_relative_path(target_path, root),
            ).model_dump()

    @server.tool()
    def move_file(source_path: str, destination_path: str, ctx: Context = None) -> dict:
        """
        Move or rename a file.

        Args:
            source_path: Relative path to the source file
            destination_path: Relative path to the destination
            ctx: MCP context with session config

        Returns:
            Dictionary with operation result
        """
        root = _session_root(ctx)
        # Validate both paths
        source = validate_path(source_path, root)
        destination = validate_path(destination_path, root)
        if not source.exists():
            return OperationResult(
                success=False,
                message=f"Source file not found: {source_path}",
                path=get_relative_path(source, root),
            ).model_dump()
        if not source.is_file():
            return OperationResult(
                success=False,
                message=f"Source is not a file: {source_path}",
                path=get_relative_path(source, root),
            ).model_dump()
        # Refuse to overwrite an existing destination.
        if destination.exists():
            return OperationResult(
                success=False,
                message=f"Destination already exists: {destination_path}",
                path=get_relative_path(destination, root),
            ).model_dump()
        try:
            # Ensure destination parent directory exists
            destination.parent.mkdir(parents=True, exist_ok=True)
            # Move file
            source.rename(destination)
            return OperationResult(
                success=True,
                message=f"File moved from '{source_path}' to '{destination_path}'",
                path=get_relative_path(destination, root),
            ).model_dump()
        except Exception as e:
            return OperationResult(
                success=False,
                message=f"Failed to move file: {str(e)}",
                path=get_relative_path(source, root),
            ).model_dump()

    @server.tool()
    def get_file_metadata(path: str) -> dict:
        """
        Get detailed metadata about a file.

        Args:
            path: Relative path to the file

        Returns:
            Dictionary with detailed file metadata

        Raises:
            ValueError: If the path exists but is not a file.
        """
        root = DEFAULT_ROOT
        # Validate path
        target_path = validate_path(path, root)
        if not target_path.exists():
            # A missing file is reported as a placeholder record with
            # exists=False rather than as an error.
            return FileMetadata(
                path=get_relative_path(target_path, root),
                exists=False,
                size_bytes=0,
                is_binary=False,
                mime_type=None,
                created=0,
                modified=0,
                accessed=0,
                permissions="",
            ).model_dump()
        if not target_path.is_file():
            raise ValueError(f"Not a file: {path}")
        info = target_path.stat()

        # Guess MIME type from the file name only; content is not inspected
        mime_type, _ = mimetypes.guess_type(str(target_path))

        # Permissions as an `ls -l` style string, e.g. "-rw-r--r--"
        # (file-type character followed by rwx triplets)
        perms = stat_module.filemode(info.st_mode)
        return FileMetadata(
            path=get_relative_path(target_path, root),
            exists=True,
            size_bytes=info.st_size,
            # Detect if binary by sampling the first 8192 bytes
            is_binary=_probe_binary(target_path, 8192),
            mime_type=mime_type,
            # NOTE: st_ctime is the metadata-change time on Unix, not the
            # creation time.
            created=info.st_ctime,
            modified=info.st_mtime,
            accessed=info.st_atime,
            permissions=perms,
        ).model_dump()