"""Directory management operations for LocalFS MCP."""
import errno
import logging
import shutil
from pathlib import Path

from mcp.server.fastmcp import Context

from .safety import (
    DEFAULT_ROOT,
    get_relative_path,
    validate_path,
)
from .schemas.common import OperationResult
from .schemas.directory import DirectoryEntry, DirectoryListResult, DirectoryMetadata
def _session_root(ctx):
    """
    Resolve the root directory a mutating tool should operate in.

    Args:
        ctx: MCP context, or None when the tool was called without one

    Returns:
        Path built from ``ctx.session_config.root_directory``, or
        DEFAULT_ROOT when no context was supplied
    """
    if ctx is None:
        # The tools declare ``ctx=None`` as their default, so fall back to the
        # root the read-only tools use instead of failing on ``None``.
        return DEFAULT_ROOT
    return Path(ctx.session_config.root_directory)


def _creation_time(stat_result):
    """
    Return the best available creation timestamp from an ``os.stat_result``.

    Args:
        stat_result: Result of a ``stat()`` call

    Returns:
        ``st_birthtime`` when the platform exposes it, otherwise ``st_ctime``
    """
    # On Unix ``st_ctime`` is the last metadata change, not the creation time,
    # so prefer ``st_birthtime`` wherever it is available.
    try:
        return stat_result.st_birthtime
    except AttributeError:
        return stat_result.st_ctime


def _is_root(target_path, root):
    """Return True when ``target_path`` resolves to the root directory itself."""
    return Path(target_path).resolve() == Path(root).resolve()


def _operation_result(success, message, target, root):
    """
    Build the dictionary form of an OperationResult.

    Args:
        success: Whether the operation succeeded
        message: Human-readable outcome
        target: Path the result refers to; reported relative to ``root``
        root: Root directory used to compute the relative path

    Returns:
        Dictionary produced by ``OperationResult.model_dump()``
    """
    return OperationResult(
        success=success,
        message=message,
        path=get_relative_path(target, root),
    ).model_dump()


def register_directory_tools(server):
    """
    Register all directory management tools with the MCP server.

    Args:
        server: Object exposing a ``tool()`` decorator factory; each nested
            function below is registered through it
    """
    logger = logging.getLogger(__name__)

    @server.tool()
    def list_directories(path: str = ".", ctx: Context = None) -> dict:
        """
        List all subdirectories in a given path.

        Args:
            path: Relative path from root directory (default: current directory)
            ctx: MCP context with session config

        Returns:
            Dictionary with list of directories and metadata

        Raises:
            FileNotFoundError: If the path does not exist
            NotADirectoryError: If the path exists but is not a directory
        """
        # NOTE(review): this tool always uses DEFAULT_ROOT while the mutating
        # tools use the session root from ``ctx`` -- confirm which is intended.
        root = DEFAULT_ROOT
        logger.debug("Listing subdirectories of %s (context: %s)", path, ctx)

        # Validate and resolve path
        target_path = validate_path(path, root)
        if not target_path.exists():
            raise FileNotFoundError(f"Directory not found: {path}")
        if not target_path.is_dir():
            raise NotADirectoryError(f"Not a directory: {path}")

        directories = []
        for item in target_path.iterdir():
            if not item.is_dir():
                continue
            stat = item.stat()
            # An unreadable subdirectory is still listed, with a count of 0.
            try:
                item_count = sum(1 for _ in item.iterdir())
            except OSError:
                item_count = 0
            directories.append(
                DirectoryEntry(
                    name=item.name,
                    path=get_relative_path(item, root),
                    item_count=item_count,
                    created=_creation_time(stat),
                    modified=stat.st_mtime,
                )
            )

        # Sort by name, case-insensitively
        directories.sort(key=lambda d: d.name.lower())
        return DirectoryListResult(
            directories=directories,
            total_count=len(directories),
        ).model_dump()

    @server.tool()
    def create_directory(path: str, create_parents: bool = False, ctx: Context = None) -> dict:
        """
        Create a new directory.

        Args:
            path: Relative path for the new directory
            create_parents: If True, create parent directories as needed (default: False)
            ctx: MCP context with session config

        Returns:
            Dictionary with operation result
        """
        root = _session_root(ctx)
        # Validate the target path (the directory itself does not exist yet)
        target_path = validate_path(path, root)

        if target_path.exists():
            return _operation_result(
                False, f"Directory already exists: {path}", target_path, root
            )

        try:
            target_path.mkdir(parents=create_parents, exist_ok=False)
        except FileExistsError:
            # Something created the path between the check above and mkdir().
            return _operation_result(
                False, f"Directory already exists: {path}", target_path, root
            )
        except FileNotFoundError:
            return _operation_result(
                False,
                "Parent directory does not exist. Use create_parents=True to create it.",
                target_path,
                root,
            )
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            return _operation_result(
                False, f"Failed to create directory: {str(e)}", target_path, root
            )
        return _operation_result(
            True, f"Directory created successfully: {path}", target_path, root
        )

    @server.tool()
    def delete_directory(path: str, recursive: bool = False, ctx: Context = None) -> dict:
        """
        Delete a directory.

        Args:
            path: Relative path to the directory to delete
            recursive: If True, delete directory and all contents (default: False)
            ctx: MCP context with session config

        Returns:
            Dictionary with operation result
        """
        root = _session_root(ctx)
        # Validate and resolve path
        target_path = validate_path(path, root)

        if not target_path.exists():
            return _operation_result(
                False, f"Directory not found: {path}", target_path, root
            )
        if not target_path.is_dir():
            return _operation_result(
                False, f"Not a directory: {path}", target_path, root
            )
        if _is_root(target_path, root):
            # Never remove the root itself, e.g. path="." with recursive=True.
            return _operation_result(
                False, "Cannot delete the root directory.", target_path, root
            )

        try:
            if recursive:
                shutil.rmtree(target_path)
            else:
                target_path.rmdir()
        except OSError as e:
            # Check errno rather than the message text, which varies by
            # platform and locale. rmdir() may report a non-empty directory
            # as either ENOTEMPTY or EEXIST.
            if not recursive and e.errno in (errno.ENOTEMPTY, errno.EEXIST):
                message = "Directory not empty. Use recursive=True to delete with contents."
            else:
                message = f"Failed to delete directory: {str(e)}"
            return _operation_result(False, message, target_path, root)
        return _operation_result(
            True, f"Directory deleted successfully: {path}", target_path, root
        )

    @server.tool()
    def move_directory(source_path: str, destination_path: str, ctx: Context = None) -> dict:
        """
        Move or rename a directory.

        Args:
            source_path: Relative path to the source directory
            destination_path: Relative path to the destination
            ctx: MCP context with session config

        Returns:
            Dictionary with operation result
        """
        root = _session_root(ctx)
        # Validate both paths
        source = validate_path(source_path, root)
        destination = validate_path(destination_path, root)

        if not source.exists():
            return _operation_result(
                False, f"Source directory not found: {source_path}", source, root
            )
        if not source.is_dir():
            return _operation_result(
                False, f"Source is not a directory: {source_path}", source, root
            )
        if destination.exists():
            return _operation_result(
                False, f"Destination already exists: {destination_path}", destination, root
            )

        try:
            shutil.move(str(source), str(destination))
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            return _operation_result(
                False, f"Failed to move directory: {str(e)}", source, root
            )
        return _operation_result(
            True,
            f"Directory moved from '{source_path}' to '{destination_path}'",
            destination,
            root,
        )

    @server.tool()
    def get_directory_metadata(path: str = ".") -> dict:
        """
        Get detailed metadata about a directory.

        Args:
            path: Relative path to the directory (default: current directory)

        Returns:
            Dictionary with detailed directory metadata; for a path that does
            not exist, ``exists`` is False and all counts and timestamps are 0

        Raises:
            NotADirectoryError: If the path exists but is not a directory
        """
        # NOTE(review): this tool always uses DEFAULT_ROOT while the mutating
        # tools use the session root from ``ctx`` -- confirm which is intended.
        root = DEFAULT_ROOT
        # Validate and resolve path
        target_path = validate_path(path, root)

        if not target_path.exists():
            return DirectoryMetadata(
                path=get_relative_path(target_path, root),
                exists=False,
                item_count=0,
                file_count=0,
                directory_count=0,
                created=0,
                modified=0,
                accessed=0,
            ).model_dump()
        if not target_path.is_dir():
            raise NotADirectoryError(f"Not a directory: {path}")

        stat = target_path.stat()
        # Count items; an unreadable directory reports zero for every count.
        try:
            items = list(target_path.iterdir())
            file_count = sum(1 for item in items if item.is_file())
            directory_count = sum(1 for item in items if item.is_dir())
            item_count = len(items)
        except OSError:
            file_count = 0
            directory_count = 0
            item_count = 0

        return DirectoryMetadata(
            path=get_relative_path(target_path, root),
            exists=True,
            item_count=item_count,
            file_count=file_count,
            directory_count=directory_count,
            created=_creation_time(stat),
            modified=stat.st_mtime,
            accessed=stat.st_atime,
        ).model_dump()