file_tools.py.newā¢6.89 kB
"""
File operations tools for Windows Operations MCP.
Provides safe file and folder manipulation with Windows-specific handling.
Note: This module is being refactored. New implementations are in the file_operations package.
"""
import os
import shutil
import pathlib
import time
from typing import Dict, Any, Optional, List
import logging
from .file_operations import (
create_directory_safe,
delete_directory_safe,
move_directory_safe,
copy_directory_safe,
list_directory_contents
)
logger = logging.getLogger(__name__)
def register_file_operations(mcp):
"""Register file and folder operation tools with FastMCP."""
# FILE OPERATIONS
@mcp.tool()
def read_file_safe(file_path: str, encoding: str = "utf-8", max_size_mb: int = 10) -> Dict[str, Any]:
"""Safely read a file with encoding detection and size limits."""
start_time = time.time()
try:
if not file_path or not file_path.strip():
return {"success": False, "error": "File path cannot be empty", "execution_time": 0.0}
file_path = pathlib.Path(file_path).resolve()
if not file_path.exists():
return {"success": False,
"error": f"File does not exist: {file_path}",
"execution_time": time.time() - start_time}
if not file_path.is_file():
return {"success": False,
"error": f"Path is not a file: {file_path}",
"execution_time": time.time() - start_time}
file_size = file_path.stat().st_size
max_size_bytes = max_size_mb * 1024 * 1024
if file_size > max_size_bytes:
return {"success": False,
"error": f"File too large: {file_size:,} bytes (max: {max_size_bytes:,} bytes)",
"execution_time": time.time() - start_time}
with open(file_path, 'r', encoding=encoding, errors='replace') as f:
content = f.read()
stat = file_path.stat()
file_info = {
"name": file_path.name,
"path": str(file_path),
"size_bytes": file_size,
"created": time.ctime(stat.st_ctime),
"modified": time.ctime(stat.st_mtime),
"encoding_used": encoding,
"line_count": content.count('\n') + 1 if content else 0
}
return {
"success": True,
"content": content,
"file_info": file_info,
"execution_time": time.time() - start_time
}
except Exception as e:
return {
"success": False,
"error": f"Failed to read file: {str(e)}",
"execution_time": time.time() - start_time
}
@mcp.tool()
def write_file_safe(file_path: str, content: str, encoding: str = "utf-8",
create_backup: bool = True, create_dirs: bool = False) -> Dict[str, Any]:
"""Safely write content to a file with backup and atomic operations."""
start_time = time.time()
try:
if not file_path or not file_path.strip():
return {"success": False,
"error": "File path cannot be empty",
"execution_time": 0.0}
file_path = pathlib.Path(file_path).resolve()
if create_dirs:
file_path.parent.mkdir(parents=True, exist_ok=True)
backup_path = None
if file_path.exists() and create_backup:
timestamp = time.strftime("%Y%m%d_%H%M%S")
backup_path = file_path.with_suffix(f"{file_path.suffix}.backup_{timestamp}")
shutil.copy2(file_path, backup_path)
temp_path = file_path.with_suffix(f"{file_path.suffix}.tmp_{os.getpid()}")
with open(temp_path, 'w', encoding=encoding, errors='replace') as f:
f.write(content)
shutil.move(str(temp_path), str(file_path))
stat = file_path.stat()
return {
"success": True,
"file_info": {
"name": file_path.name,
"path": str(file_path),
"size_bytes": stat.st_size
},
"backup_path": str(backup_path) if backup_path else None,
"execution_time": time.time() - start_time
}
except Exception as e:
return {
"success": False,
"error": f"Failed to write file: {str(e)}",
"execution_time": time.time() - start_time
}
@mcp.tool()
def copy_file_safe(source_path: str, destination_path: str,
overwrite: bool = False) -> Dict[str, Any]:
"""Safely copy a file with overwrite protection."""
start_time = time.time()
try:
source_path = pathlib.Path(source_path).resolve()
destination_path = pathlib.Path(destination_path).resolve()
if not source_path.exists():
return {
"success": False,
"error": f"Source file does not exist: {source_path}",
"execution_time": time.time() - start_time
}
if destination_path.exists() and not overwrite:
return {
"success": False,
"error": f"Destination already exists: {destination_path}",
"execution_time": time.time() - start_time
}
destination_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source_path, destination_path)
return {
"success": True,
"source": str(source_path),
"destination": str(destination_path),
"execution_time": time.time() - start_time
}
except Exception as e:
return {
"success": False,
"error": f"Failed to copy file: {str(e)}",
"execution_time": time.time() - start_time
}
# Register the folder operations from file_operations package
mcp.tool()(list_directory_contents)
mcp.tool()(create_directory_safe)
mcp.tool()(delete_directory_safe)
mcp.tool()(move_directory_safe)
mcp.tool()(copy_directory_safe)
logger.info("File and folder operations tools registered successfully - 8 tools total")