"""File operations for MCP Python REPL server."""
import logging
import os
from pathlib import Path
from typing import Optional, Dict, Any
import chardet
from .models import FileInfo
logger = logging.getLogger(__name__)
class FileHandler:
    """Handles file operations within project directory.

    Every path passed to ``read_file`` / ``write_file`` is resolved and
    checked against ``project_dir`` before the file system is touched.
    """

    def __init__(self, project_dir: Path | None = None):
        """Initialize FileHandler with project directory.

        Args:
            project_dir: Project directory path. If None, uses current working directory.
        """
        self.project_dir = (project_dir or Path.cwd()).resolve()

    def _validate_path(self, file_path: str) -> Path:
        """Validate that file path is within project directory.

        Relative paths are interpreted relative to the project directory,
        not the process working directory.

        Args:
            file_path: File path to validate (absolute, or relative to the
                project directory)

        Returns:
            Resolved Path object

        Raises:
            ValueError: If path is outside project directory
        """
        candidate = Path(file_path)
        # Anchor relative paths at the project directory *before* resolving:
        # resolve() on a bare relative path anchors it at the process CWD.
        if not candidate.is_absolute():
            candidate = self.project_dir / candidate
        resolved = candidate.resolve()

        # The check runs on the resolved path, so ".." segments and symlinks
        # have already been collapsed at this point.
        if not resolved.is_relative_to(self.project_dir):
            raise ValueError(
                f"Path '{file_path}' is outside project directory '{self.project_dir}'"
            )
        return resolved

    def _detect_encoding(self, file_path: Path) -> str:
        """Detect file encoding.

        Args:
            file_path: Path to file

        Returns:
            Detected encoding, or 'utf-8' when detection fails, is uncertain
            (confidence below 0.7), or reports plain ASCII
        """
        try:
            with open(file_path, 'rb') as f:
                sample = f.read(10000)  # Read first 10KB for detection
            result = chardet.detect(sample)
            encoding = result.get('encoding')
            confidence = result.get('confidence') or 0
            # Fallback to utf-8 if detection is uncertain
            if not encoding or confidence < 0.7:
                return 'utf-8'
            # Only a prefix of the file was sampled, so an ASCII verdict says
            # nothing about the remainder. UTF-8 decodes pure ASCII
            # identically and also copes with later non-ASCII bytes.
            if encoding.lower() == 'ascii':
                return 'utf-8'
            return encoding
        except Exception as e:
            logger.warning("Encoding detection failed for %s: %s", file_path, e)
            return 'utf-8'

    async def read_file(self, file_path: str, encoding: Optional[str] = None) -> FileInfo:
        """Read a file from the project directory.

        Failures are reported through the ``error`` field of the returned
        FileInfo rather than raised.

        Args:
            file_path: Path to file relative to project directory
            encoding: Optional encoding override; detected from the file
                content when omitted

        Returns:
            FileInfo object with file details and content. If the file cannot
            be decoded with the chosen encoding (or that encoding name is
            unknown), the content is re-read as utf-8 with replacement
            characters and ``error`` describes the encoding issue.
        """
        try:
            validated_path = self._validate_path(file_path)
            relative_path = str(validated_path.relative_to(self.project_dir))

            if not validated_path.exists():
                return FileInfo(
                    path=relative_path,
                    exists=False,
                    error="File does not exist"
                )
            if not validated_path.is_file():
                return FileInfo(
                    path=relative_path,
                    exists=True,
                    error="Path is not a file"
                )

            # Detect encoding if not provided
            file_encoding = encoding or self._detect_encoding(validated_path)
            file_size = validated_path.stat().st_size

            try:
                content = validated_path.read_text(encoding=file_encoding)
            except (UnicodeDecodeError, LookupError) as e:
                # LookupError: the requested/detected codec name is unknown
                # to Python. Either way the file exists, so fall back to a
                # lossy utf-8 read instead of reporting it as missing.
                try:
                    content = validated_path.read_text(encoding='utf-8', errors='replace')
                except Exception as inner_e:
                    return FileInfo(
                        path=relative_path,
                        exists=True,
                        size=file_size,
                        error=f"Failed to read file: {inner_e}"
                    )
                return FileInfo(
                    path=relative_path,
                    exists=True,
                    size=file_size,
                    encoding='utf-8',
                    content=content,
                    error=f"Encoding issue (used utf-8 with replacement): {e}"
                )

            return FileInfo(
                path=relative_path,
                exists=True,
                size=file_size,
                encoding=file_encoding,
                content=content
            )
        except Exception as e:
            logger.error("File read error for %s: %s", file_path, e)
            return FileInfo(
                path=file_path,
                exists=False,
                error=str(e)
            )

    async def write_file(self, file_path: str, content: str, encoding: Optional[str] = None) -> FileInfo:
        """Write content to a file in the project directory.

        Missing parent directories are created. If the target already exists,
        its bytes are first copied to a sibling named ``<name>.backup``; a
        failed backup is logged as a warning and does not prevent the write.

        Args:
            file_path: Path to file relative to project directory
            content: Content to write
            encoding: Optional encoding (defaults to utf-8)

        Returns:
            FileInfo object with operation result; ``content`` is not echoed
            back. On failure ``error`` holds the exception text.
        """
        try:
            validated_path = self._validate_path(file_path)
            file_encoding = encoding or 'utf-8'

            # Create parent directories if they don't exist
            validated_path.parent.mkdir(parents=True, exist_ok=True)

            # Backup existing file if it exists (best effort)
            if validated_path.exists():
                backup_path = validated_path.with_name(validated_path.name + '.backup')
                try:
                    backup_path.write_bytes(validated_path.read_bytes())
                    logger.info("Created backup: %s", backup_path)
                except Exception as e:
                    logger.warning("Failed to create backup for %s: %s", validated_path, e)

            validated_path.write_text(content, encoding=file_encoding)

            return FileInfo(
                path=str(validated_path.relative_to(self.project_dir)),
                exists=True,
                size=validated_path.stat().st_size,  # size on disk after writing
                encoding=file_encoding,
                content=None,  # Don't include content in write response
                error=None
            )
        except Exception as e:
            logger.error("File write error for %s: %s", file_path, e)
            return FileInfo(
                path=file_path,
                exists=False,
                error=str(e)
            )

    def get_project_info(self) -> Dict[str, Any]:
        """Get information about the project directory.

        Returns:
            Dictionary with project information. ``files_count`` is the number
            of entries found by a recursive glob of the project directory
            (directories are counted as well as files). If gathering the
            information fails, the dictionary holds only ``project_dir`` and
            ``error``.
        """
        try:
            root = self.project_dir
            # Count lazily instead of building a list of every entry.
            entry_count = sum(1 for _ in root.rglob("*")) if root.exists() else 0
            return {
                "project_dir": str(root),
                "exists": root.exists(),
                "is_directory": root.is_dir(),
                "files_count": entry_count,
                "has_pyproject_toml": (root / "pyproject.toml").exists(),
                "has_requirements_txt": (root / "requirements.txt").exists(),
                "has_env_file": (root / ".env").exists(),
                "has_venv": (root / ".venv").exists()
            }
        except Exception as e:
            logger.error("Failed to get project info: %s", e)
            return {
                "project_dir": str(self.project_dir),
                "error": str(e)
            }