# file_operations.py (3.7 kB)
"""
Core file operations for IntelliDiff MCP Server
"""
import os
import zlib
import unicodedata
from pathlib import Path
from fastmcp.tools.tool import ToolResult
from mcp.types import TextContent
# Tunable limits, each overridable through an INTELLIDIFF_* environment
# variable. The value is passed through int(), so an override must be an
# integer literal.

# Largest file accepted for text comparison (default: 10 MiB).
MAX_TEXT_SIZE = int(os.environ.get("INTELLIDIFF_MAX_TEXT_SIZE", 10 * 1024 ** 2))

# Largest file accepted for binary/CRC32 processing (default: 1 GiB).
MAX_BINARY_SIZE = int(os.environ.get("INTELLIDIFF_MAX_BINARY_SIZE", 1024 ** 3))

# Number of bytes read per chunk when streaming a file (default: 64 KiB).
CHUNK_SIZE = int(os.environ.get("INTELLIDIFF_CHUNK_SIZE", 64 * 1024))
def text_response(text: str) -> ToolResult:
    """Wrap plain text in a ToolResult made of a single text content item.

    Args:
        text: The text to return to the caller verbatim.

    Returns:
        A ToolResult whose ``content`` is one ``TextContent`` of type
        ``"text"`` and whose ``structured_content`` is ``None``.
    """
    payload = TextContent(type="text", text=text)
    # structured_content is passed as None on purpose so that only the raw
    # text is carried in the result.
    return ToolResult(content=[payload], structured_content=None)
def is_text_file(file_path: Path, chunk_size: int = 1024) -> bool:
    """Heuristically decide whether a file holds text.

    Only the first ``chunk_size`` bytes are inspected: the file counts as
    text when that prefix contains no NUL byte.

    Args:
        file_path: File to inspect.
        chunk_size: Number of leading bytes to examine.

    Returns:
        True when no NUL byte is found in the inspected prefix; False when
        one is found or when the file cannot be opened or read.
    """
    try:
        with open(file_path, 'rb') as handle:
            head = handle.read(chunk_size)
    except OSError:  # IOError is an alias of OSError in Python 3
        return False
    return head.count(b'\0') == 0
def calculate_crc32(file_path: Path, chunk_size: int = None) -> str:
    """Compute the CRC32 checksum of a file, streaming it in chunks.

    Args:
        file_path: File to checksum.
        chunk_size: Bytes to read per iteration. ``None`` selects the
            module-level ``CHUNK_SIZE``. Must be a positive integer.

    Returns:
        The checksum as eight lowercase hexadecimal digits.

    Raises:
        ValueError: If ``chunk_size`` is not positive, if the file is larger
            than ``MAX_BINARY_SIZE``, or if the file cannot be accessed or
            read (the underlying ``OSError`` is chained as the cause).
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    # A zero-sized read returns b'' immediately, which would end the loop
    # before any data is hashed and report "00000000" for every file; a
    # negative size would read the whole file at once. Reject both.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    # Check file size limits
    try:
        size = file_path.stat().st_size
    except OSError as e:
        raise ValueError(f"Cannot access file '{file_path}': {e}") from e
    if size > MAX_BINARY_SIZE:
        raise ValueError(f"File too large for CRC32 calculation: {size} bytes (max: {MAX_BINARY_SIZE})")

    crc = 0
    try:
        with open(file_path, 'rb') as f:
            # iter() with a sentinel stops at the first empty read (EOF).
            for chunk in iter(lambda: f.read(chunk_size), b''):
                crc = zlib.crc32(chunk, crc)
    except OSError as e:
        raise ValueError(f"Cannot read file '{file_path}': {e}") from e
    # Mask to 32 bits so the value is always rendered as unsigned.
    return f"{crc & 0xffffffff:08x}"
def normalize_text(text: str,
                   ignore_blank_lines: bool = False,
                   ignore_newline_differences: bool = False,
                   ignore_whitespace: bool = False,
                   ignore_case: bool = False,
                   normalize_tabs: bool = False,
                   unicode_normalize: bool = False) -> str:
    """Normalize text according to the selected options.

    The enabled steps run in a fixed order: Unicode normalization, tab
    expansion, newline unification, lower-casing, per-line whitespace
    stripping, and finally blank-line removal.

    Args:
        text: The text to normalize.
        ignore_blank_lines: Drop lines that are empty or whitespace-only.
        ignore_newline_differences: Convert ``\\r\\n`` and ``\\r`` to ``\\n``.
        ignore_whitespace: Strip leading and trailing whitespace from every
            line; a line that had a terminator ends with ``\\n`` afterwards.
        ignore_case: Lower-case the text.
        normalize_tabs: Expand tabs to spaces with ``str.expandtabs()``.
        unicode_normalize: Apply NFKC Unicode normalization.

    Returns:
        The normalized text.
    """
    if unicode_normalize:
        text = unicodedata.normalize('NFKC', text)
    if normalize_tabs:
        text = text.expandtabs()
    if ignore_newline_differences:
        text = text.replace('\r\n', '\n').replace('\r', '\n')
    if ignore_case:
        text = text.lower()

    lines = text.splitlines(keepends=True)
    if ignore_whitespace:
        stripped_lines = []
        for line in lines:
            # splitlines() also breaks on form feed, vertical tab, U+2028 and
            # similar boundaries, and strip() removes those characters. Any
            # terminated line must therefore get '\n' back, otherwise it
            # would be merged with the line that follows it.
            had_terminator = line.splitlines()[0] != line
            content = line.strip()
            stripped_lines.append(content + '\n' if had_terminator else content)
        lines = stripped_lines
    if ignore_blank_lines:
        lines = [line for line in lines if line.strip()]
    return ''.join(lines)
def read_text_file(file_path: Path, max_size: int = None) -> str:
    """Read a file as UTF-8 text, enforcing a size limit.

    Line endings are returned exactly as stored in the file (no newline
    translation), so callers can tell CRLF, CR and LF apart. Bytes that are
    not valid UTF-8 are replaced with U+FFFD instead of raising.

    Args:
        file_path: File to read.
        max_size: Maximum allowed size in bytes. ``None`` selects the
            module-level ``MAX_TEXT_SIZE``.

    Returns:
        The decoded file content.

    Raises:
        ValueError: If the file is larger than ``max_size`` or cannot be
            accessed or read (the underlying ``OSError`` is chained as the
            cause).
    """
    if max_size is None:
        max_size = MAX_TEXT_SIZE

    try:
        size = file_path.stat().st_size
    except OSError as e:
        raise ValueError(f"Cannot access file '{file_path}': {e}") from e
    if size > max_size:
        raise ValueError(f"File too large for text comparison: {size} bytes (max: {max_size})")

    try:
        # newline='' turns off universal-newline translation. With the
        # default, '\r\n' and '\r' are rewritten to '\n' while reading, which
        # hides line-ending differences before any comparison can see them.
        with open(file_path, 'r', encoding='utf-8', errors='replace', newline='') as f:
            return f.read()
    except OSError as e:
        raise ValueError(f"Cannot read text file '{file_path}': {e}") from e