Skip to main content
Glama

IntelliDiff MCP Server

by batteryshark
folder_operations.py (14 kB)
"""Folder comparison and duplicate detection operations."""

import os
from datetime import datetime
from pathlib import Path
from typing import Literal, Optional

from workspace_security import validate_path, WORKSPACE_ROOT
from file_operations import is_text_file, calculate_crc32

# Get max depth from environment
MAX_DEPTH = int(os.getenv("INTELLIDIFF_MAX_DEPTH", 10))


def _effective_max_depth(max_depth: Optional[int]) -> int:
    """Return *max_depth*, or MAX_DEPTH when it is None or negative."""
    if max_depth is None or max_depth < 0:
        return MAX_DEPTH
    return max_depth


def _current_context() -> dict:
    """Describe the current local (naive) time in several formats."""
    now = datetime.now()
    return {
        "current_date": now.strftime("%Y-%m-%d"),
        "current_time": now.strftime("%H:%M:%S"),
        "current_datetime": now.isoformat(),
        "weekday": now.strftime("%A"),
        "timestamp": now.timestamp(),
    }


def compare_folders(
    left_path: str,
    right_path: str,
    max_depth: Optional[int] = None,
    include_binary: bool = True,
    comparison_mode: Literal["exact", "smart_text"] = "exact",
) -> dict:
    """Compare two folder structures recursively.

    Both trees are scanned (hidden entries, i.e. names starting with '.',
    are skipped) and files that exist on both sides are compared by CRC32.

    Args:
        left_path: Path of the left folder; passed through ``validate_path``.
        right_path: Path of the right folder; passed through
            ``validate_path``.
        max_depth: Number of directory levels to scan. ``None`` or a negative
            value falls back to ``MAX_DEPTH``; ``0`` scans nothing.
        include_binary: When False, a CRC32 is only computed for files that
            ``is_text_file`` reports as text; other files present on both
            sides are reported with reason ``comparison_not_available``.
        comparison_mode: Echoed back in the result. NOTE(review): this
            function does not otherwise act on it, so ``"smart_text"``
            currently behaves like ``"exact"`` here.

    Returns:
        A dict with the resolved paths, the effective options, a
        ``current_context`` timestamp block, a ``summary`` of counts, and the
        ``identical_files``, ``different_files`` and ``orphans`` lists.
        On any failure a dict of the form ``{"error": "<message>"}`` is
        returned instead; no exception is raised to the caller.
    """
    try:
        max_depth = _effective_max_depth(max_depth)

        # presumably validate_path confines the path to the workspace and
        # returns a pathlib.Path -- confirm against workspace_security.
        left_root = validate_path(left_path)
        right_root = validate_path(right_path)

        if not left_root.exists():
            return {"error": f"Left folder does not exist: {left_path}"}
        if not right_root.exists():
            return {"error": f"Right folder does not exist: {right_path}"}
        if not left_root.is_dir():
            return {"error": f"Left path is not a directory: {left_path}"}
        if not right_root.is_dir():
            return {"error": f"Right path is not a directory: {right_path}"}

        def scan_directory(root_path: Path, current_depth: int = 0) -> dict:
            """Recursively scan a directory and return file/dir info.

            Keys of the returned ``files`` and ``dirs`` mappings are entry
            names relative to *root_path* (one level only); nested content
            lives inside the corresponding ``dirs`` value.
            """
            files = {}
            dirs = {}

            if current_depth >= max_depth:
                return {"files": files, "dirs": dirs}

            try:
                for item in root_path.iterdir():
                    # Skip hidden files and directories
                    if item.name.startswith('.'):
                        continue

                    if item.is_file():
                        try:
                            stat = item.stat()
                            is_text = is_text_file(item)
                            files[item.name] = {
                                "size": stat.st_size,
                                "is_text": is_text,
                                "modified_time": stat.st_mtime,
                                "crc32": (
                                    calculate_crc32(item)
                                    if include_binary or is_text
                                    else None
                                ),
                            }
                        except (OSError, ValueError):
                            # Skip files we can't read
                            continue

                    elif item.is_dir():
                        # Handle symlinks to prevent loops
                        if item.is_symlink():
                            try:
                                resolved = item.resolve()
                                # Raises ValueError when the target lies
                                # outside the workspace.
                                resolved.relative_to(WORKSPACE_ROOT)
                                # A link back to the directory being scanned
                                # or to one of its ancestors is a loop.
                                # Compare real path components rather than
                                # substrings so that e.g. "/ws/a" is not
                                # mistaken for an ancestor of "/ws/ab".
                                current = root_path.resolve()
                                if (resolved == current
                                        or resolved in current.parents):
                                    continue
                            except (ValueError, OSError, RuntimeError):
                                continue

                        dirs[item.name] = scan_directory(
                            item, current_depth + 1
                        )

            except OSError:
                # Skip directories we can't access; whatever was collected
                # before the failure is still returned (best effort).
                pass

            return {"files": files, "dirs": dirs}

        # Get current timestamp for context
        current_context = _current_context()

        # Scan both directories
        left_structure = scan_directory(left_root)
        right_structure = scan_directory(right_root)

        def compare_structures(left_struct, right_struct, path_prefix=""):
            """Compare two scanned structures and collect the differences."""
            results = {
                "identical_files": [],
                "different_files": [],
                "left_only": [],
                "right_only": [],
                "total_files": 0,
                "total_dirs": 0,
            }

            # Compare files
            left_files = left_struct.get("files", {})
            right_files = right_struct.get("files", {})

            # Sorted so the output order is stable from run to run.
            for filename in sorted(set(left_files) | set(right_files)):
                full_path = (
                    f"{path_prefix}/{filename}" if path_prefix else filename
                )
                results["total_files"] += 1

                if filename in left_files and filename in right_files:
                    left_info = left_files[filename]
                    right_info = right_files[filename]
                    left_crc = left_info.get("crc32")
                    right_crc = right_info.get("crc32")

                    # Test against None, not truthiness: a checksum of 0
                    # (the CRC32 of empty content when the helper returns an
                    # int) is a real value and must still be compared.
                    if left_crc is not None and right_crc is not None:
                        if left_crc == right_crc:
                            results["identical_files"].append({
                                "path": full_path,
                                "size": left_info["size"],
                                "crc32": left_crc,
                            })
                        else:
                            results["different_files"].append({
                                "path": full_path,
                                "left_size": left_info["size"],
                                "right_size": right_info["size"],
                                "left_crc32": left_crc,
                                "right_crc32": right_crc,
                            })
                    else:
                        # Can't compare (checksum skipped for this file)
                        results["different_files"].append({
                            "path": full_path,
                            "reason": "comparison_not_available",
                            "left_size": left_info["size"],
                            "right_size": right_info["size"],
                        })

                elif filename in left_files:
                    results["left_only"].append({
                        "path": full_path,
                        "size": left_files[filename]["size"],
                        "is_text": left_files[filename]["is_text"],
                    })
                else:  # filename in right_files
                    results["right_only"].append({
                        "path": full_path,
                        "size": right_files[filename]["size"],
                        "is_text": right_files[filename]["is_text"],
                    })

            # Compare subdirectories
            left_dirs = left_struct.get("dirs", {})
            right_dirs = right_struct.get("dirs", {})

            for dirname in sorted(set(left_dirs) | set(right_dirs)):
                full_path = (
                    f"{path_prefix}/{dirname}" if path_prefix else dirname
                )
                results["total_dirs"] += 1

                if dirname in left_dirs and dirname in right_dirs:
                    # Recursively compare subdirectories
                    subdir_results = compare_structures(
                        left_dirs[dirname],
                        right_dirs[dirname],
                        full_path,
                    )

                    # Merge results
                    for key in ("identical_files", "different_files",
                                "left_only", "right_only"):
                        results[key].extend(subdir_results[key])
                    results["total_files"] += subdir_results["total_files"]
                    results["total_dirs"] += subdir_results["total_dirs"]

                elif dirname in left_dirs:
                    # A one-sided directory is reported as a single orphan;
                    # its contents are not listed or counted.
                    results["left_only"].append({
                        "path": full_path,
                        "type": "directory",
                    })
                else:  # dirname in right_dirs
                    results["right_only"].append({
                        "path": full_path,
                        "type": "directory",
                    })

            return results

        comparison_results = compare_structures(
            left_structure, right_structure
        )

        return {
            "left_path": str(left_root),
            "right_path": str(right_root),
            "comparison_mode": comparison_mode,
            "max_depth": max_depth,
            "include_binary": include_binary,
            "current_context": current_context,
            "summary": {
                "total_files": comparison_results["total_files"],
                "total_dirs": comparison_results["total_dirs"],
                "identical_files": len(comparison_results["identical_files"]),
                "different_files": len(comparison_results["different_files"]),
                "left_only_items": len(comparison_results["left_only"]),
                "right_only_items": len(comparison_results["right_only"]),
            },
            "identical_files": comparison_results["identical_files"],
            "different_files": comparison_results["different_files"],
            "orphans": {
                "left_only": comparison_results["left_only"],
                "right_only": comparison_results["right_only"],
            },
        }

    except Exception as e:
        # Tool boundary: report the failure as data instead of raising.
        return {"error": str(e)}


def find_identical_files(
    folder_path: str,
    max_depth: Optional[int] = None,
) -> dict:
    """Find files with identical content (same CRC32) within a folder.

    Hidden entries (names starting with '.') are skipped and symlinked
    directories are not followed.

    Args:
        folder_path: Folder to scan; passed through ``validate_path``.
        max_depth: Number of directory levels to scan. ``None`` or a negative
            value falls back to ``MAX_DEPTH``.

    Returns:
        A dict with the resolved folder path, the effective ``max_depth``, a
        ``current_context`` timestamp block, a ``summary`` of counts, and
        ``duplicates``: a mapping from CRC32 to the group of files sharing
        it. On any failure ``{"error": "<message>"}`` is returned instead.
    """
    try:
        max_depth = _effective_max_depth(max_depth)

        folder_root = validate_path(folder_path)

        if not folder_root.exists():
            return {"error": f"Folder does not exist: {folder_path}"}
        if not folder_root.is_dir():
            return {"error": f"Path is not a directory: {folder_path}"}

        # Collect all files with their CRC32 hashes
        file_hashes = {}  # crc32 -> list of file info dicts

        def scan_for_duplicates(root_path: Path, current_depth: int = 0):
            """Walk *root_path* and record every readable file by CRC32."""
            if current_depth >= max_depth:
                return

            try:
                for item in root_path.iterdir():
                    if item.name.startswith('.'):
                        continue

                    if item.is_file():
                        try:
                            crc32 = calculate_crc32(item)
                            relative_path = item.relative_to(folder_root)
                            file_hashes.setdefault(crc32, []).append({
                                "path": str(relative_path),
                                "size": item.stat().st_size,
                                "is_text": is_text_file(item),
                            })
                        except (OSError, ValueError):
                            # Skip files we can't read
                            continue

                    elif item.is_dir() and not item.is_symlink():
                        scan_for_duplicates(item, current_depth + 1)

            except OSError:
                # Skip directories we can't access (best effort).
                pass

        scan_for_duplicates(folder_root)

        # Get current timestamp for context
        current_context = _current_context()

        # Find duplicates (CRC32 with more than one file)
        # NOTE(review): groups are keyed by CRC32 alone and the group size
        # is taken from its first file, so a CRC32 collision between files
        # of different sizes would be reported as a duplicate group.
        duplicates = {}
        unique_files = 0

        for crc32, files in file_hashes.items():
            if len(files) > 1:
                duplicates[crc32] = {
                    "files": files,
                    "count": len(files),
                    "size": files[0]["size"],
                    "total_wasted_bytes": files[0]["size"] * (len(files) - 1),
                }
            else:
                unique_files += 1

        total_files = sum(len(files) for files in file_hashes.values())
        # Sum the per-group file counts. Each value in `duplicates` is the
        # group dict itself, so taking len() of it would count its keys
        # rather than its files.
        total_duplicate_files = sum(
            group["count"] for group in duplicates.values()
        )
        total_wasted_bytes = sum(
            group["total_wasted_bytes"] for group in duplicates.values()
        )

        return {
            "folder_path": str(folder_root),
            "max_depth": max_depth,
            "current_context": current_context,
            "summary": {
                "total_files": total_files,
                "unique_files": unique_files,
                "duplicate_files": total_duplicate_files,
                "duplicate_groups": len(duplicates),
                "total_wasted_bytes": total_wasted_bytes,
            },
            "duplicates": duplicates,
        }

    except Exception as e:
        # Tool boundary: report the failure as data instead of raising.
        return {"error": str(e)}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/batteryshark/mcp-intellidiff'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.