# tools.py • 26.2 kB
"""
MCP Tools for IntelliDiff Server
"""
import os
import re
import difflib
from datetime import datetime
from pathlib import Path
from typing import Literal
from fastmcp.tools.tool import ToolResult
from mcp.types import TextContent
from workspace_security import validate_path, WORKSPACE_ROOT
from file_operations import (
text_response, is_text_file, calculate_crc32, normalize_text,
read_text_file
)
# Default `max_depth` handed to folder comparisons; taken from the
# INTELLIDIFF_MAX_DEPTH environment variable, falling back to 10.
MAX_DEPTH = int(os.environ.get("INTELLIDIFF_MAX_DEPTH", 10))
def get_current_timestamp() -> dict:
    """Return a snapshot of the current time in several representations.

    All values come from a single ``datetime.now()`` reading: the date
    (``YYYY-MM-DD``), the time (``HH:MM:SS``), the ISO-8601 datetime, the
    weekday name and the POSIX timestamp.
    """
    moment = datetime.now()
    snapshot = {}
    snapshot["current_date"] = f"{moment:%Y-%m-%d}"
    snapshot["current_time"] = f"{moment:%H:%M:%S}"
    snapshot["current_datetime"] = moment.isoformat()
    snapshot["weekday"] = f"{moment:%A}"
    snapshot["timestamp"] = moment.timestamp()
    return snapshot
def validate_workspace_path(path: str) -> ToolResult:
    """Report whether *path* passes workspace validation.

    The path is handed to ``validate_path``. On success the response shows
    the resolved path, the workspace root, and whether the target is an
    existing file, directory, something else, or missing. A ``ValueError``
    raised along the way is reported as an invalid path instead of being
    propagated.
    """
    try:
        validated_path = validate_path(path)

        # Start from the "missing" description and refine it if the path exists.
        path_type, status = "❓ unknown", "does not exist"
        if validated_path.exists():
            status = "exists"
            if validated_path.is_file():
                path_type = "📄 file"
            elif validated_path.is_dir():
                path_type = "📁 directory"
            else:
                path_type = "❓ other"

        output = f"""✅ **Path is valid**
📂 Resolved: `{validated_path}`
🏠 Workspace: `{WORKSPACE_ROOT}`
📋 Status: {path_type}, {status}"""
        return text_response(output)
    except ValueError as e:
        output = f"""❌ **Path is invalid**
🚫 Error: {e}
🏠 Workspace: `{WORKSPACE_ROOT}`"""
        return text_response(output)
def get_file_hash(file_path: str) -> ToolResult:
    """Get CRC32 hash and basic file info.

    Args:
        file_path: Path of the file to inspect; it is passed through
            ``validate_path`` before any filesystem access.

    Returns:
        A ``ToolResult`` whose text content is a human-readable summary and
        whose structured content carries the same facts (path, CRC32, size,
        text/binary flag, timestamps and the current-time context). A missing
        path, a non-file path, or any raised exception is reported as a plain
        text response rather than raised.
    """
    try:
        path = validate_path(file_path)
        if not path.exists():
            return text_response(f"❌ File does not exist: {file_path}")
        if not path.is_file():
            return text_response(f"❌ Path is not a file: {file_path}")

        # Get file stats
        stat = path.stat()
        is_text = is_text_file(path)
        crc32_hash = calculate_crc32(path)
        file_type = "text" if is_text else "binary"

        # Sizes above 1 MiB are shown in MB with one decimal, otherwise in bytes.
        if stat.st_size > 1024 * 1024:
            size_str = f"{stat.st_size / (1024 * 1024):.1f}MB"
        else:
            size_str = f"{stat.st_size} bytes"

        # st_ctime is the metadata-change time on Unix, not the creation time.
        # Prefer st_birthtime where the platform exposes it and fall back to
        # st_ctime otherwise.
        created_ts = getattr(stat, "st_birthtime", stat.st_ctime)

        # Format timestamps
        modified_dt = datetime.fromtimestamp(stat.st_mtime)
        created_dt = datetime.fromtimestamp(created_ts)
        current_info = get_current_timestamp()

        output = f"""📁 **{path.name}**
🔢 CRC32: `{crc32_hash}`
📏 Size: {size_str}
📄 Type: {file_type}
📅 Modified: {modified_dt.strftime('%Y-%m-%d %H:%M:%S')} ({modified_dt.strftime('%A')})
📅 Created: {created_dt.strftime('%Y-%m-%d %H:%M:%S')} ({created_dt.strftime('%A')})
📂 Path: {path}
🕐 **Current Time**: {current_info['current_date']} {current_info['current_time']} ({current_info['weekday']})"""
        return ToolResult(
            content=[TextContent(type="text", text=output)],
            structured_content={
                "file_path": str(path),
                "crc32": crc32_hash,
                "size_bytes": stat.st_size,
                "file_type": file_type,
                "is_text": is_text,
                "modified_timestamp": stat.st_mtime,
                "created_timestamp": created_ts,
                "modified_datetime": modified_dt.isoformat(),
                "created_datetime": created_dt.isoformat(),
                "current_context": current_info
            }
        )
    except Exception as e:
        # Tool boundary: surface any failure to the caller as text.
        return text_response(f"❌ Error: {e}")
def _format_size(size_bytes: int) -> str:
    """Render a byte count as ``<x.y>MB`` above 1 MiB, else ``<n> bytes``."""
    if size_bytes > 1024 * 1024:
        return f"{size_bytes / (1024 * 1024):.1f}MB"
    return f"{size_bytes} bytes"


def _parse_diff_chunks(diff_lines: list[str]) -> list[dict]:
    """Group unified-diff lines into hunks and number each added/removed line."""
    chunks = []
    current_chunk = None
    # Running 1-based positions inside the current hunk, per side.
    left_no = right_no = 0
    for line in diff_lines:
        if line.startswith('@@'):
            # Parse hunk header: @@ -start,count +start,count @@
            # (an omitted count means 1)
            match = re.match(r'@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@', line)
            if match:
                left_start = int(match.group(1))
                left_count = int(match.group(2)) if match.group(2) else 1
                right_start = int(match.group(3))
                right_count = int(match.group(4)) if match.group(4) else 1
                current_chunk = {
                    "left_start": left_start,
                    "left_end": left_start + left_count - 1,
                    "right_start": right_start,
                    "right_end": right_start + right_count - 1,
                    "changes": []
                }
                chunks.append(current_chunk)
                left_no, right_no = left_start, right_start
        elif current_chunk is None:
            # '---' / '+++' file headers precede the first hunk; skip them.
            continue
        elif line.startswith('-'):
            current_chunk["changes"].append({
                "type": "removed",
                "line": line[1:].rstrip('\n'),
                "line_number": left_no
            })
            left_no += 1
        elif line.startswith('+'):
            current_chunk["changes"].append({
                "type": "added",
                "line": line[1:].rstrip('\n'),
                "line_number": right_no
            })
            right_no += 1
        elif line.startswith(' '):
            # Context line: present on both sides.
            left_no += 1
            right_no += 1
    return chunks


def compare_files(
    left_path: str,
    right_path: str,
    mode: Literal["exact", "smart_text", "binary"] = "exact",
    ignore_blank_lines: bool = False,
    ignore_newline_differences: bool = False,
    ignore_whitespace: bool = False,
    ignore_case: bool = False,
    normalize_tabs: bool = False,
    unicode_normalize: bool = False
) -> ToolResult | dict:
    """Compare two files with specified comparison mode.

    Args:
        left_path: First file; passed through ``validate_path``.
        right_path: Second file; passed through ``validate_path``.
        mode: ``"binary"`` compares CRC32 values; ``"exact"`` compares sizes
            first and then CRC32 values; ``"smart_text"`` compares the
            contents after ``normalize_text`` and produces a unified diff
            when they differ. If either file is not a text file, the CRC32
            ("binary") comparison is used whatever the mode.
        ignore_blank_lines, ignore_newline_differences, ignore_whitespace,
        ignore_case, normalize_tabs, unicode_normalize: Forwarded unchanged
            to ``normalize_text``; only used in ``"smart_text"`` mode.

    Returns:
        A text response summarising the comparison, or, for differing files
        in ``"smart_text"`` mode, a ``ToolResult`` whose text is the unified
        diff and whose structured content describes the hunks. Line numbers
        in that structure refer to the normalized text. Missing paths,
        non-file paths, an unrecognised mode and any raised exception are
        reported as text responses rather than raised.
    """
    try:
        left = validate_path(left_path)
        right = validate_path(right_path)

        # Check if both files exist
        if not left.exists():
            return text_response(f"❌ Left file does not exist: {left_path}")
        if not right.exists():
            return text_response(f"❌ Right file does not exist: {right_path}")
        if not left.is_file():
            return text_response(f"❌ Left path is not a file: {left_path}")
        if not right.is_file():
            return text_response(f"❌ Right path is not a file: {right_path}")

        # Get file info
        left_stat = left.stat()
        right_stat = right.stat()
        left_is_text = is_text_file(left)
        right_is_text = is_text_file(right)

        if mode == "binary" or not (left_is_text and right_is_text):
            # Binary comparison - just use CRC32. Non-text files land here in
            # every mode, so the text-only branches below always see text.
            left_crc32 = calculate_crc32(left)
            right_crc32 = calculate_crc32(right)
            identical = left_crc32 == right_crc32
            result_icon = "✅" if identical else "❌"
            left_size_str = _format_size(left_stat.st_size)
            right_size_str = _format_size(right_stat.st_size)
            output = f"""{result_icon} **Binary Comparison**
📁 Left: `{left.name}` ({left_size_str}, CRC32: `{left_crc32}`)
📁 Right: `{right.name}` ({right_size_str}, CRC32: `{right_crc32}`)
🔍 Result: {'Identical' if identical else 'Different'}"""
            return text_response(output)

        if mode == "exact":
            # Different sizes can never be identical; skip hashing entirely.
            if left_stat.st_size != right_stat.st_size:
                left_size_str = _format_size(left_stat.st_size)
                right_size_str = _format_size(right_stat.st_size)
                output = f"""❌ **Exact Comparison - Different Sizes**
📁 Left: `{left.name}` ({left_size_str})
📁 Right: `{right.name}` ({right_size_str})
🔍 Result: Different (size mismatch)"""
                return text_response(output)

            left_crc32 = calculate_crc32(left)
            right_crc32 = calculate_crc32(right)
            identical = left_crc32 == right_crc32
            result_icon = "✅" if identical else "❌"
            size_str = _format_size(left_stat.st_size)
            output = f"""{result_icon} **Exact Comparison**
📁 Left: `{left.name}` (CRC32: `{left_crc32}`)
📁 Right: `{right.name}` (CRC32: `{right_crc32}`)
📏 Size: {size_str}
🔍 Result: {'Identical' if identical else 'Different'}"""
            return text_response(output)

        if mode != "smart_text":
            # Previously an unrecognised mode fell through and returned None.
            return text_response(f"❌ **Unknown comparison mode**: {mode}")

        # Smart text comparison with normalization
        normalization_flags = {
            "ignore_blank_lines": ignore_blank_lines,
            "ignore_newline_differences": ignore_newline_differences,
            "ignore_whitespace": ignore_whitespace,
            "ignore_case": ignore_case,
            "normalize_tabs": normalize_tabs,
            "unicode_normalize": unicode_normalize
        }
        left_content = read_text_file(left)
        right_content = read_text_file(right)
        left_normalized = normalize_text(left_content, **normalization_flags)
        right_normalized = normalize_text(right_content, **normalization_flags)

        if left_normalized == right_normalized:
            # Build normalization summary
            normalizations = []
            if ignore_case:
                normalizations.append("case")
            if ignore_whitespace:
                normalizations.append("whitespace")
            if ignore_blank_lines:
                normalizations.append("blank lines")
            if ignore_newline_differences:
                normalizations.append("line endings")
            if normalize_tabs:
                normalizations.append("tabs")
            if unicode_normalize:
                normalizations.append("unicode")
            norm_text = f" (normalized: {', '.join(normalizations)})" if normalizations else ""
            left_size_str = _format_size(left_stat.st_size)
            right_size_str = _format_size(right_stat.st_size)
            output = f"""✅ **Smart Text Comparison - Identical**
📁 Left: `{left.name}` ({left_size_str})
📁 Right: `{right.name}` ({right_size_str})
🔍 Result: Identical{norm_text}"""
            return text_response(output)

        # Line endings are kept so that texts differing only in their
        # terminators still yield a non-empty diff.
        left_lines = left_normalized.splitlines(keepends=True)
        right_lines = right_normalized.splitlines(keepends=True)

        # Content lines still carry their own '\n' while the control lines
        # (lineterm='') do not; strip it so joining with '\n' does not put a
        # blank line after every content line.
        diff_lines = [
            line.rstrip('\n')
            for line in difflib.unified_diff(
                left_lines,
                right_lines,
                fromfile=str(left),
                tofile=str(right),
                lineterm=''
            )
        ]
        diff_text = '\n'.join(diff_lines)

        # Parse diff to extract line ranges and changes
        diff_chunks = _parse_diff_chunks(diff_lines)

        change_summary = {
            "total_chunks": len(diff_chunks),
            "chunks": diff_chunks,
            "line_ranges": {
                "left_affected": [],
                "right_affected": []
            }
        }

        # Extract affected line ranges for easy agent access. A side with a
        # zero-length hunk has end < start and is left out.
        for chunk in diff_chunks:
            if chunk["left_start"] <= chunk["left_end"]:
                change_summary["line_ranges"]["left_affected"].append({
                    "start": chunk["left_start"],
                    "end": chunk["left_end"],
                    "count": chunk["left_end"] - chunk["left_start"] + 1
                })
            if chunk["right_start"] <= chunk["right_end"]:
                change_summary["line_ranges"]["right_affected"].append({
                    "start": chunk["right_start"],
                    "end": chunk["right_end"],
                    "count": chunk["right_end"] - chunk["right_start"] + 1
                })

        # Count from the parsed hunks: filtering on '---' / '+++' prefixes
        # would drop real changes whose content starts with '--' or '++'.
        differences_found = sum(len(chunk["changes"]) for chunk in diff_chunks)

        # Return structured result with diff as text content
        return ToolResult(
            content=[TextContent(type="text", text=diff_text)],
            structured_content={
                "identical": False,
                "left_path": str(left),
                "right_path": str(right),
                "left_size": left_stat.st_size,
                "right_size": right_stat.st_size,
                "left_line_count": len(left_lines),
                "right_line_count": len(right_lines),
                "comparison_mode": mode,
                "differences_found": differences_found,
                "change_summary": change_summary,
                "normalization_applied": normalization_flags
            }
        )
    except Exception as e:
        # Tool boundary: surface any failure to the caller as text.
        return text_response(f"❌ **Comparison Error**: {e}")
def read_file_lines(
    file_path: str,
    start_line: int = 1,
    end_line: int | None = None,
    context_lines: int = 0
) -> ToolResult:
    """Read specific line ranges from a file with optional context.

    Args:
        file_path: Path of the text file; passed through ``validate_path``.
        start_line: First requested line (1-based). Values below 1 are
            treated as 1.
        end_line: Last requested line (1-based, inclusive). ``None`` or a
            value past the end of the file means the last line.
        context_lines: Number of extra lines to include before and after the
            requested range. Negative values are treated as 0.

    Returns:
        A ``ToolResult`` whose text is the selected lines, each prefixed with
        its line number; lines inside the requested range are marked with
        ``>>> ``. The structured content reports the requested and actual
        ranges, the total line count and the number of lines returned. On
        any failure (missing path, non-file, non-text file, raised
        exception) the text is empty and the structured content holds a
        single ``"error"`` message.
    """
    def _error(message: str) -> ToolResult:
        # Errors are reported as empty text plus a structured "error" field.
        return ToolResult(
            content=[TextContent(type="text", text="")],
            structured_content={"error": message}
        )

    try:
        path = validate_path(file_path)
        if not path.exists():
            return _error(f"File does not exist: {file_path}")
        if not path.is_file():
            return _error(f"Path is not a file: {file_path}")
        # Check if it's a text file
        if not is_text_file(path):
            return _error(f"File is not a text file: {file_path}")

        # Read file content
        content = read_text_file(path)
        lines = content.splitlines()
        total_lines = len(lines)

        # Clamp the requested range to the file.
        start_line = max(1, start_line)
        if end_line is None or end_line > total_lines:
            end_line = total_lines
        # A negative context would shrink the range instead of widening it.
        context_lines = max(0, context_lines)

        # Add context lines
        actual_start = max(1, start_line - context_lines)
        actual_end = min(total_lines, end_line + context_lines)

        # Extract lines (convert to 0-based indexing)
        selected_lines = lines[actual_start - 1:actual_end]

        # Format output with line numbers; requested lines get the marker.
        formatted_lines = []
        for i, line in enumerate(selected_lines, start=actual_start):
            prefix = ">>> " if start_line <= i <= end_line else " "
            formatted_lines.append(f"{prefix}{i:4d}| {line}")
        output_text = '\n'.join(formatted_lines)

        return ToolResult(
            content=[TextContent(type="text", text=output_text)],
            structured_content={
                "file_path": str(path),
                "requested_range": {"start": start_line, "end": end_line},
                "actual_range": {"start": actual_start, "end": actual_end},
                "total_lines": total_lines,
                "context_lines": context_lines,
                "lines_returned": len(selected_lines)
            }
        )
    except Exception as e:
        # Covers ValueError as well; both were reported identically before.
        return _error(str(e))
def batch_get_file_hashes(file_paths: list[str]) -> dict:
    """Get CRC32 hashes and info for multiple files at once.

    Args:
        file_paths: Paths to inspect; each is passed through
            ``validate_path`` and processed independently.

    Returns:
        A dict with the current-time context, request/success/failure
        counts, a ``"files"`` list with one record per successfully
        processed file, and an ``"errors"`` list with one message per path
        that was missing, not a file, or raised an exception. A failure for
        one path does not stop the others.
    """
    current_info = get_current_timestamp()
    results = []
    errors = []
    for file_path in file_paths:
        try:
            path = validate_path(file_path)
            if not path.exists():
                errors.append(f"❌ File does not exist: {file_path}")
                continue
            if not path.is_file():
                errors.append(f"❌ Path is not a file: {file_path}")
                continue

            # Get file stats
            stat = path.stat()
            is_text = is_text_file(path)
            crc32_hash = calculate_crc32(path)

            # st_ctime is the metadata-change time on Unix, not the creation
            # time. Prefer st_birthtime where the platform exposes it and
            # fall back to st_ctime otherwise.
            created_ts = getattr(stat, "st_birthtime", stat.st_ctime)

            modified_dt = datetime.fromtimestamp(stat.st_mtime)
            created_dt = datetime.fromtimestamp(created_ts)
            results.append({
                "file_path": str(path),
                "filename": path.name,
                "crc32": crc32_hash,
                "size_bytes": stat.st_size,
                "file_type": "text" if is_text else "binary",
                "is_text": is_text,
                "modified_timestamp": stat.st_mtime,
                "created_timestamp": created_ts,
                "modified_datetime": modified_dt.isoformat(),
                "created_datetime": created_dt.isoformat(),
                "relative_age_days": (datetime.now() - modified_dt).days
            })
        except Exception as e:
            # Keep going: one bad path must not abort the whole batch.
            errors.append(f"❌ Error processing {file_path}: {e}")
    return {
        "current_context": current_info,
        "total_files_requested": len(file_paths),
        "successful_files": len(results),
        "failed_files": len(errors),
        "files": results,
        "errors": errors
    }
def batch_compare_files(file_pairs: list[dict]) -> dict:
    """Compare several file pairs by CRC32 in one call.

    Each entry of *file_pairs* must be a dict with ``'left'`` and ``'right'``
    keys. An optional ``'mode'`` key (default ``'exact'``) is only echoed
    back as ``comparison_mode``; the comparison itself is always a CRC32
    equality check.

    Returns a dict holding the current-time context, request/success/failure
    counts, a summary (identical/different pair counts and success rate),
    one record per compared pair under ``"comparisons"``, and one message
    per rejected or failing pair under ``"errors"``.
    """
    current_info = get_current_timestamp()
    comparisons = []
    failures = []

    for position, entry in enumerate(file_pairs, start=1):
        try:
            well_formed = isinstance(entry, dict) and 'left' in entry and 'right' in entry
            if not well_formed:
                failures.append(f"❌ Pair {position}: Invalid format, need dict with 'left' and 'right' keys")
                continue

            left_path, right_path = entry['left'], entry['right']
            mode = entry.get('mode', 'exact')

            # Both paths must validate and point at existing regular files.
            left = validate_path(left_path)
            right = validate_path(right_path)
            if not left.exists():
                failures.append(f"❌ Pair {position}: Left file does not exist: {left_path}")
                continue
            if not right.exists():
                failures.append(f"❌ Pair {position}: Right file does not exist: {right_path}")
                continue
            if not (left.is_file() and right.is_file()):
                failures.append(f"❌ Pair {position}: Both paths must be files")
                continue

            left_stat = left.stat()
            right_stat = right.stat()
            left_is_text = is_text_file(left)
            right_is_text = is_text_file(right)

            # Equality is decided purely by the two checksums.
            left_crc32 = calculate_crc32(left)
            right_crc32 = calculate_crc32(right)

            left_modified = datetime.fromtimestamp(left_stat.st_mtime)
            right_modified = datetime.fromtimestamp(right_stat.st_mtime)
            if left_modified > right_modified:
                newer = "left"
            elif right_modified > left_modified:
                newer = "right"
            else:
                newer = "same"

            comparisons.append({
                "pair_index": position,
                "left_path": str(left),
                "right_path": str(right),
                "left_filename": left.name,
                "right_filename": right.name,
                "identical": left_crc32 == right_crc32,
                "left_crc32": left_crc32,
                "right_crc32": right_crc32,
                "left_size": left_stat.st_size,
                "right_size": right_stat.st_size,
                "left_is_text": left_is_text,
                "right_is_text": right_is_text,
                "left_modified": left_modified.isoformat(),
                "right_modified": right_modified.isoformat(),
                "left_age_days": (datetime.now() - left_modified).days,
                "right_age_days": (datetime.now() - right_modified).days,
                "newer_file": newer,
                "comparison_mode": mode
            })
        except Exception as e:
            # A failing pair is recorded and the batch carries on.
            failures.append(f"❌ Error processing pair {position}: {e}")

    # Summary stats
    identical_count = len([record for record in comparisons if record["identical"]])
    different_count = len(comparisons) - identical_count
    success_rate = len(comparisons) / len(file_pairs) if file_pairs else 0

    return {
        "current_context": current_info,
        "total_pairs_requested": len(file_pairs),
        "successful_comparisons": len(comparisons),
        "failed_comparisons": len(failures),
        "summary": {
            "identical_pairs": identical_count,
            "different_pairs": different_count,
            "success_rate": success_rate
        },
        "comparisons": comparisons,
        "errors": failures
    }
def batch_compare_folders(folder_pairs: list[dict]) -> dict:
    """Run ``compare_folders`` over several folder pairs in one call.

    Each entry of *folder_pairs* must be a dict with ``'left'`` and
    ``'right'`` keys; ``'max_depth'`` (default ``MAX_DEPTH``) and
    ``'include_binary'`` (default ``True``) are optional and forwarded.

    Returns a dict holding the current-time context, request/success/failure
    counts, totals summed over the ``"summary"`` of every successful result,
    the per-pair results under ``"folder_comparisons"``, and one message per
    rejected or failing pair under ``"errors"``.
    """
    current_info = get_current_timestamp()
    outcomes = []
    failures = []

    for position, entry in enumerate(folder_pairs, start=1):
        try:
            if not (isinstance(entry, dict) and 'left' in entry and 'right' in entry):
                failures.append(f"❌ Pair {position}: Invalid format, need dict with 'left' and 'right' keys")
                continue

            left_path = entry['left']
            right_path = entry['right']
            max_depth = entry.get('max_depth', MAX_DEPTH)
            include_binary = entry.get('include_binary', True)

            # Import here to avoid circular import
            from folder_operations import compare_folders

            result = compare_folders(
                left_path=left_path,
                right_path=right_path,
                max_depth=max_depth,
                include_binary=include_binary
            )
            if "error" in result:
                failures.append(f"❌ Pair {position}: {result['error']}")
                continue

            # Tag the result with its position and the batch timestamp.
            result["pair_index"] = position
            result["comparison_timestamp"] = current_info["current_datetime"]
            outcomes.append(result)
        except Exception as e:
            # A failing pair is recorded and the batch carries on.
            failures.append(f"❌ Error processing folder pair {position}: {e}")

    def _summed(field: str):
        # Results lacking a summary, or the field, contribute 0.
        return sum(outcome.get("summary", {}).get(field, 0) for outcome in outcomes)

    total_files = _summed("total_files")
    total_identical = _summed("identical_files")
    total_different = _summed("different_files")
    success_rate = len(outcomes) / len(folder_pairs) if folder_pairs else 0

    return {
        "current_context": current_info,
        "total_pairs_requested": len(folder_pairs),
        "successful_comparisons": len(outcomes),
        "failed_comparisons": len(failures),
        "aggregate_summary": {
            "total_files_across_all": total_files,
            "total_identical_across_all": total_identical,
            "total_different_across_all": total_different,
            "success_rate": success_rate
        },
        "folder_comparisons": outcomes,
        "errors": failures
    }