find_duplicate_files
Identify duplicate files by comparing file sizes and contents across directories. Supports recursive search, a minimum-size filter, exclusion patterns, a scan limit, and text or JSON output.
Instructions
Find duplicate files by comparing file sizes and contents.
Args:
- path: Starting directory
- recursive: Whether to search subdirectories
- min_size: Minimum file size to consider (bytes)
- exclude_patterns: Optional patterns to exclude
- max_files: Maximum number of files to scan
- format: Output format ('text' or 'json')
- ctx: MCP context

Returns: Duplicate file information
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| exclude_patterns | No | Regular expression patterns to exclude from the scan | |
| format | No | Output format ('text' or 'json') | text |
| max_files | No | Maximum number of files to scan | 1000 |
| min_size | No | Minimum file size to consider (bytes) | 1 |
| path | Yes | Starting directory | |
| recursive | No | Whether to search subdirectories | true |
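
A minimal sketch of invoking the tool from a Python MCP client. The launch command (python -m mcp_filesystem), the scanned path, and the argument values are placeholders, not part of the server's documented interface; the client API calls are from the official MCP Python SDK.

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Hypothetical launch command; adjust to however the server is started locally.
    params = StdioServerParameters(command="python", args=["-m", "mcp_filesystem"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "find_duplicate_files",
                {
                    "path": "/data/photos",
                    "recursive": True,
                    "min_size": 1024,
                    "exclude_patterns": [r"\.cache/"],
                    "max_files": 5000,
                    "format": "json",
                },
            )
            # The tool returns its report as text content.
            print(result.content[0].text)


asyncio.run(main())
```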
Implementation Reference
- mcp_filesystem/advanced.py:381-507 (handler): Core handler implementation in the Advanced class. It scans directories recursively (respecting max_files), groups files by size so that only same-size candidates are hashed, computes MD5 digests for those candidates, and returns duplicates grouped by hash.

```python
async def find_duplicate_files(
    self,
    root_path: Union[str, Path],
    recursive: bool = True,
    min_size: int = 1,
    exclude_patterns: Optional[List[str]] = None,
    max_files: int = 1000,
) -> Dict[str, List[str]]:
    """Find duplicate files by comparing file sizes and contents.

    Args:
        root_path: Starting directory
        recursive: Whether to search subdirectories
        min_size: Minimum file size to consider (bytes)
        exclude_patterns: Optional patterns to exclude
        max_files: Maximum number of files to scan

    Returns:
        Dictionary mapping file hash to list of identical files

    Raises:
        ValueError: If root_path is outside allowed directories
    """
    import hashlib

    abs_path, allowed = await self.validator.validate_path(root_path)
    if not allowed:
        raise ValueError(f"Path outside allowed directories: {root_path}")
    if not abs_path.is_dir():
        raise ValueError(f"Not a directory: {root_path}")

    # Compile exclude patterns if provided
    exclude_regexes = []
    if exclude_patterns:
        for exclude in exclude_patterns:
            try:
                exclude_regexes.append(re.compile(exclude))
            except re.error:
                logger.warning(f"Invalid exclude pattern: {exclude}")

    # First, group files by size
    size_groups: Dict[int, List[Path]] = {}
    files_processed = 0

    async def scan_for_sizes(dir_path: Path) -> None:
        nonlocal files_processed
        if files_processed >= max_files:
            return

        try:
            entries = await anyio.to_thread.run_sync(list, dir_path.iterdir())
            for entry in entries:
                if files_processed >= max_files:
                    return

                # Skip if matched by exclude pattern
                path_str = str(entry)
                excluded = False
                for exclude_re in exclude_regexes:
                    if exclude_re.search(path_str):
                        excluded = True
                        break
                if excluded:
                    continue

                try:
                    if entry.is_file():
                        size = entry.stat().st_size
                        if size >= min_size:
                            if size not in size_groups:
                                size_groups[size] = []
                            size_groups[size].append(entry)
                            files_processed += 1
                    elif entry.is_dir() and recursive:
                        # Check if this path is still allowed
                        entry_abs, entry_allowed = await self.validator.validate_path(entry)
                        if entry_allowed:
                            await scan_for_sizes(entry)
                except (PermissionError, FileNotFoundError):
                    # Skip entries we can't access
                    pass
        except (PermissionError, FileNotFoundError):
            # Skip directories we can't access
            pass

    await scan_for_sizes(abs_path)

    # Now, for each size group with multiple files, compute and compare hashes
    duplicates: Dict[str, List[str]] = {}

    for size, files in size_groups.items():
        if len(files) < 2:
            continue

        # Group files by hash
        hash_groups: Dict[str, List[Path]] = {}

        for file_path in files:
            try:
                # Compute file hash
                file_bytes = await anyio.to_thread.run_sync(file_path.read_bytes)
                file_hash = hashlib.md5(file_bytes).hexdigest()

                if file_hash not in hash_groups:
                    hash_groups[file_hash] = []
                hash_groups[file_hash].append(file_path)
            except (PermissionError, FileNotFoundError):
                # Skip files we can't access
                pass

        # Add duplicate groups to results
        for file_hash, hash_files in hash_groups.items():
            if len(hash_files) >= 2:
                duplicates[file_hash] = [str(f) for f in hash_files]

    return duplicates
```
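For illustration only, a sketch of calling the handler directly through the component registry; it assumes get_components() (shown in the registration snippet below) is importable from mcp_filesystem.server and that the server's components are already initialized. The directory, pattern, and thresholds are made up.

```python
# Illustrative sketch: import path and initialized components are assumptions.
import asyncio

from mcp_filesystem.server import get_components


async def report_duplicates() -> None:
    advanced = get_components()["advanced"]
    duplicates = await advanced.find_duplicate_files(
        "/home/user/Downloads",        # must lie inside an allowed directory
        recursive=True,
        min_size=4096,                 # skip files under 4 KiB
        exclude_patterns=[r"\.git/"],  # regex matched against the full path
        max_files=2000,
    )
    for file_hash, paths in duplicates.items():
        print(f"{file_hash}: {len(paths)} identical files")
        for p in paths:
            print(f"  {p}")


asyncio.run(report_duplicates())
```

Because files are first bucketed by size, MD5 hashing only runs on files that share a size with at least one other candidate, so hashing cost scales with the number of potential duplicates rather than the whole tree.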
- mcp_filesystem/server.py:515-564 (registration): MCP tool registration via the @mcp.tool() decorator. The wrapper delegates to the advanced component's find_duplicate_files, renders the result as text or JSON, and converts any exception into an error message string.

```python
@mcp.tool()
async def find_duplicate_files(
    path: str,
    ctx: Context,
    recursive: bool = True,
    min_size: int = 1,
    exclude_patterns: Optional[List[str]] = None,
    max_files: int = 1000,
    format: str = "text",
) -> str:
    """Find duplicate files by comparing file sizes and contents.

    Args:
        path: Starting directory
        recursive: Whether to search subdirectories
        min_size: Minimum file size to consider (bytes)
        exclude_patterns: Optional patterns to exclude
        max_files: Maximum number of files to scan
        format: Output format ('text' or 'json')
        ctx: MCP context

    Returns:
        Duplicate file information
    """
    try:
        components = get_components()
        duplicates = await components["advanced"].find_duplicate_files(
            path, recursive, min_size, exclude_patterns, max_files
        )

        if format.lower() == "json":
            return json.dumps(duplicates, indent=2)

        # Format as text
        if not duplicates:
            return "No duplicate files found"

        lines = []
        for file_hash, files in duplicates.items():
            lines.append(f"Hash: {file_hash}")
            for file_path in files:
                lines.append(f"  {file_path}")
            lines.append("")

        return f"Found {len(duplicates)} sets of duplicate files:\n\n" + "\n".join(lines)
    except Exception as e:
        return f"Error finding duplicate files: {str(e)}"
```
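As a hypothetical illustration of the two output shapes (the hash and paths below are invented), format='json' returns the raw hash-to-paths mapping, while the default text format groups the member paths under each hash:

```python
# Invented sample data: real results depend entirely on the scanned directory.
import json

duplicates = {
    "3f2a6c9e0d4b18c7a5e1f60b9d2c4e8a": [  # placeholder MD5 of the shared content
        "/data/photos/IMG_0001.jpg",
        "/data/photos/backup/IMG_0001.jpg",
    ],
}

# format="json" returns json.dumps(duplicates, indent=2) verbatim.
print(json.dumps(duplicates, indent=2))

# The default text format renders the same data as:
#
# Found 1 sets of duplicate files:
#
# Hash: 3f2a6c9e0d4b18c7a5e1f60b9d2c4e8a
#   /data/photos/IMG_0001.jpg
#   /data/photos/backup/IMG_0001.jpg
```

Any exception raised during the scan is caught by the wrapper and returned as an "Error finding duplicate files: ..." string, so the client always receives text content rather than a protocol error.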