# folder_operations.py — 14 kB
"""
Folder comparison and duplicate detection operations
"""
import os
from datetime import datetime
from pathlib import Path
from typing import Literal
from workspace_security import validate_path, WORKSPACE_ROOT
from file_operations import is_text_file, calculate_crc32
# Default recursion limit for folder scans; the INTELLIDIFF_MAX_DEPTH
# environment variable overrides the built-in value of 10.
MAX_DEPTH = int(os.environ.get("INTELLIDIFF_MAX_DEPTH", 10))
def _symlink_is_traversable(link: Path, parent: Path) -> bool:
    """Return True when a symlinked directory may be descended into.

    The link is rejected when its target cannot be resolved, lies outside
    ``WORKSPACE_ROOT``, or is ``parent`` itself / one of its ancestors (which
    would make the scan walk back up the tree).
    """
    try:
        target = link.resolve()
        # relative_to() raises ValueError when target is not below the root.
        target.relative_to(WORKSPACE_ROOT)
        parent_resolved = parent.resolve()
    except (ValueError, OSError, RuntimeError):
        # RuntimeError: older Python versions report symlink loops this way.
        return False
    # Compare real paths component-wise; a substring test on the string form
    # would treat "/ws/a" as an ancestor of "/ws/ab".
    return target != parent_resolved and target not in parent_resolved.parents


def _scan_directory(
    root_path: Path,
    max_depth: int,
    include_binary: bool,
    current_depth: int = 0,
) -> dict:
    """Recursively collect metadata for the non-hidden entries of a directory.

    Args:
        root_path: Directory to scan.
        max_depth: Depth limit; a directory reached at
            ``current_depth >= max_depth`` is reported with no content.
        include_binary: When False, a CRC32 is computed only for files that
            ``is_text_file`` reports as text; other files get ``crc32=None``.
        current_depth: Depth of ``root_path`` relative to the scan root.

    Returns:
        ``{"files": {name: info}, "dirs": {name: nested_scan}}`` where each
        ``info`` holds ``size``, ``is_text``, ``modified_time`` and ``crc32``.
        Entries that cannot be read are skipped silently.
    """
    files = {}
    dirs = {}
    if current_depth >= max_depth:
        return {"files": files, "dirs": dirs}

    try:
        for item in root_path.iterdir():
            # Skip hidden files and directories
            if item.name.startswith('.'):
                continue

            if item.is_file():
                try:
                    stat = item.stat()
                    is_text = is_text_file(item)
                    files[item.name] = {
                        "size": stat.st_size,
                        "is_text": is_text,
                        "modified_time": stat.st_mtime,
                        "crc32": (
                            calculate_crc32(item)
                            if include_binary or is_text
                            else None
                        ),
                    }
                except (OSError, ValueError):
                    # Skip files we can't read
                    continue
            elif item.is_dir():
                if item.is_symlink() and not _symlink_is_traversable(item, root_path):
                    continue
                dirs[item.name] = _scan_directory(
                    item, max_depth, include_binary, current_depth + 1
                )
    except OSError:
        # Best effort: keep whatever was collected before the directory
        # became unreadable.
        pass

    return {"files": files, "dirs": dirs}


def _compare_structures(left_struct: dict, right_struct: dict, path_prefix: str = "") -> dict:
    """Diff two scan results produced by ``_scan_directory``.

    Returns a dict with the lists ``identical_files``, ``different_files``,
    ``left_only`` and ``right_only`` plus the counters ``total_files`` and
    ``total_dirs``. Paths are reported relative to the compared roots using
    ``/`` as separator. Directories present on one side only are listed as a
    single orphan entry and their content is not counted.
    """
    results = {
        "identical_files": [],
        "different_files": [],
        "left_only": [],
        "right_only": [],
        "total_files": 0,
        "total_dirs": 0,
    }

    left_files = left_struct.get("files", {})
    right_files = right_struct.get("files", {})
    # Sorted so the report order does not depend on set iteration order.
    for filename in sorted(left_files.keys() | right_files.keys()):
        full_path = f"{path_prefix}/{filename}" if path_prefix else filename
        results["total_files"] += 1

        left_info = left_files.get(filename)
        right_info = right_files.get(filename)

        if left_info is None:
            results["right_only"].append({
                "path": full_path,
                "size": right_info["size"],
                "is_text": right_info["is_text"],
            })
            continue
        if right_info is None:
            results["left_only"].append({
                "path": full_path,
                "size": left_info["size"],
                "is_text": left_info["is_text"],
            })
            continue

        left_crc = left_info.get("crc32")
        right_crc = right_info.get("crc32")
        # "is None" rather than truthiness: a checksum of 0 is a valid value.
        if left_crc is None or right_crc is None:
            # Can't compare (checksum was not computed for at least one side)
            results["different_files"].append({
                "path": full_path,
                "reason": "comparison_not_available",
                "left_size": left_info["size"],
                "right_size": right_info["size"],
            })
        elif left_crc == right_crc and left_info["size"] == right_info["size"]:
            results["identical_files"].append({
                "path": full_path,
                "size": left_info["size"],
                "crc32": left_crc,
            })
        else:
            # Differing checksum, or equal checksum with differing size
            # (a CRC32 collision): the contents cannot be identical.
            results["different_files"].append({
                "path": full_path,
                "left_size": left_info["size"],
                "right_size": right_info["size"],
                "left_crc32": left_crc,
                "right_crc32": right_crc,
            })

    left_dirs = left_struct.get("dirs", {})
    right_dirs = right_struct.get("dirs", {})
    for dirname in sorted(left_dirs.keys() | right_dirs.keys()):
        full_path = f"{path_prefix}/{dirname}" if path_prefix else dirname
        results["total_dirs"] += 1

        if dirname in left_dirs and dirname in right_dirs:
            nested = _compare_structures(
                left_dirs[dirname], right_dirs[dirname], full_path
            )
            for key in ("identical_files", "different_files", "left_only", "right_only"):
                results[key].extend(nested[key])
            results["total_files"] += nested["total_files"]
            results["total_dirs"] += nested["total_dirs"]
        elif dirname in left_dirs:
            results["left_only"].append({"path": full_path, "type": "directory"})
        else:
            results["right_only"].append({"path": full_path, "type": "directory"})

    return results


def compare_folders(
    left_path: str,
    right_path: str,
    max_depth: int = None,
    include_binary: bool = True,
    comparison_mode: Literal["exact", "smart_text"] = "exact"
) -> dict:
    """Compare two folder structures recursively.

    Files are matched by relative path and compared by CRC32 checksum and
    size. Hidden entries (names starting with ``.``) are ignored.

    Args:
        left_path: Path of the first folder; passed through ``validate_path``.
        right_path: Path of the second folder; passed through ``validate_path``.
        max_depth: Depth limit for the scan. ``None`` or a negative value
            selects ``MAX_DEPTH``.
        include_binary: When False, checksums are computed for text files
            only; other files present on both sides are reported under
            ``different_files`` with ``reason="comparison_not_available"``.
        comparison_mode: Echoed back in the result. The comparison performed
            here is checksum based regardless of this value.

    Returns:
        A report dict with the resolved paths, the effective options, a
        ``current_context`` timestamp block, a ``summary`` of counts, and the
        ``identical_files``, ``different_files`` and ``orphans`` details.
        On any failure the function returns ``{"error": message}`` instead of
        raising.
    """
    try:
        if max_depth is None or max_depth < 0:
            max_depth = MAX_DEPTH

        left_root = validate_path(left_path)
        right_root = validate_path(right_path)

        if not left_root.exists():
            return {"error": f"Left folder does not exist: {left_path}"}
        if not right_root.exists():
            return {"error": f"Right folder does not exist: {right_path}"}
        if not left_root.is_dir():
            return {"error": f"Left path is not a directory: {left_path}"}
        if not right_root.is_dir():
            return {"error": f"Right path is not a directory: {right_path}"}

        # Get current timestamp for context
        current_time = datetime.now()
        current_context = {
            "current_date": current_time.strftime("%Y-%m-%d"),
            "current_time": current_time.strftime("%H:%M:%S"),
            "current_datetime": current_time.isoformat(),
            "weekday": current_time.strftime("%A"),
            "timestamp": current_time.timestamp(),
        }

        # Scan both directories, then diff the two scans
        left_structure = _scan_directory(left_root, max_depth, include_binary)
        right_structure = _scan_directory(right_root, max_depth, include_binary)
        comparison_results = _compare_structures(left_structure, right_structure)

        return {
            "left_path": str(left_root),
            "right_path": str(right_root),
            "comparison_mode": comparison_mode,
            "max_depth": max_depth,
            "include_binary": include_binary,
            "current_context": current_context,
            "summary": {
                "total_files": comparison_results["total_files"],
                "total_dirs": comparison_results["total_dirs"],
                "identical_files": len(comparison_results["identical_files"]),
                "different_files": len(comparison_results["different_files"]),
                "left_only_items": len(comparison_results["left_only"]),
                "right_only_items": len(comparison_results["right_only"]),
            },
            "identical_files": comparison_results["identical_files"],
            "different_files": comparison_results["different_files"],
            "orphans": {
                "left_only": comparison_results["left_only"],
                "right_only": comparison_results["right_only"],
            },
        }
    except Exception as e:
        # Top-level boundary: callers receive an error dict, never an exception.
        return {"error": str(e)}
def find_identical_files(
    folder_path: str,
    max_depth: int = None
) -> dict:
    """Find files with identical content (same CRC32) within a folder.

    Hidden entries (names starting with ``.``) and symlinked directories are
    skipped; files that cannot be read are ignored.

    Args:
        folder_path: Folder to scan; passed through ``validate_path``.
        max_depth: Depth limit for the scan. ``None`` or a negative value
            selects ``MAX_DEPTH``.

    Returns:
        A dict with ``folder_path``, the effective ``max_depth``, a
        ``current_context`` timestamp block, a ``summary`` of counts and
        ``duplicates``, which maps each CRC32 shared by more than one file to
        ``{"files", "count", "size", "total_wasted_bytes"}``. File paths are
        relative to the scanned folder. On any failure the function returns
        ``{"error": message}`` instead of raising.
    """
    try:
        if max_depth is None or max_depth < 0:
            max_depth = MAX_DEPTH

        folder_root = validate_path(folder_path)
        if not folder_root.exists():
            return {"error": f"Folder does not exist: {folder_path}"}
        if not folder_root.is_dir():
            return {"error": f"Path is not a directory: {folder_path}"}

        # Collect all files with their CRC32 hashes
        file_hashes = {}  # crc32 -> list of file entries

        def scan_for_duplicates(root_path: Path, current_depth: int = 0) -> None:
            """Record every readable file below ``root_path`` in ``file_hashes``."""
            if current_depth >= max_depth:
                return
            try:
                for item in root_path.iterdir():
                    if item.name.startswith('.'):
                        continue
                    if item.is_file():
                        try:
                            crc32 = calculate_crc32(item)
                            entry = {
                                "path": str(item.relative_to(folder_root)),
                                "size": item.stat().st_size,
                                "is_text": is_text_file(item),
                            }
                        except (OSError, ValueError):
                            continue
                        # Register the bucket only once the entry is complete,
                        # so an unreadable file never leaves an empty group.
                        file_hashes.setdefault(crc32, []).append(entry)
                    elif item.is_dir() and not item.is_symlink():
                        scan_for_duplicates(item, current_depth + 1)
            except OSError:
                # Best effort: skip directories we can't access
                pass

        scan_for_duplicates(folder_root)

        # Get current timestamp for context
        current_time = datetime.now()
        current_context = {
            "current_date": current_time.strftime("%Y-%m-%d"),
            "current_time": current_time.strftime("%H:%M:%S"),
            "current_datetime": current_time.isoformat(),
            "weekday": current_time.strftime("%A"),
            "timestamp": current_time.timestamp(),
        }

        # Find duplicates (CRC32 with more than one file)
        duplicates = {}
        unique_files = 0
        for crc32, files in file_hashes.items():
            if len(files) > 1:
                duplicates[crc32] = {
                    "files": files,
                    "count": len(files),
                    "size": files[0]["size"],
                    "total_wasted_bytes": files[0]["size"] * (len(files) - 1),
                }
            else:
                unique_files += 1

        total_files = sum(len(files) for files in file_hashes.values())
        # Each value of ``duplicates`` is a group dict, so count its files via
        # "count" (len() of the dict would just be its number of keys).
        total_duplicate_files = sum(group["count"] for group in duplicates.values())
        total_wasted_bytes = sum(group["total_wasted_bytes"] for group in duplicates.values())

        return {
            "folder_path": str(folder_root),
            "max_depth": max_depth,
            "current_context": current_context,
            "summary": {
                "total_files": total_files,
                "unique_files": unique_files,
                "duplicate_files": total_duplicate_files,
                "duplicate_groups": len(duplicates),
                "total_wasted_bytes": total_wasted_bytes,
            },
            "duplicates": duplicates,
        }
    except Exception as e:
        # Top-level boundary: callers receive an error dict, never an exception.
        return {"error": str(e)}