
MCP Filesystem Server

find_duplicate_files

Identify duplicate files by comparing sizes and contents within a directory. Specify a starting path, search recursively, set size filters, and output results in text or JSON format.

Instructions

Find duplicate files by comparing file sizes and contents.

Args:
    path: Starting directory
    recursive: Whether to search subdirectories
    min_size: Minimum file size to consider (bytes)
    exclude_patterns: Optional patterns to exclude
    max_files: Maximum number of files to scan
    format: Output format ('text' or 'json')
    ctx: MCP context

Returns:
    Duplicate file information

Input Schema

| Name             | Required | Description                            | Default |
|------------------|----------|----------------------------------------|---------|
| path             | Yes      | Starting directory                     |         |
| recursive        | No       | Whether to search subdirectories       | true    |
| min_size         | No       | Minimum file size to consider (bytes)  | 1       |
| exclude_patterns | No       | Optional patterns to exclude           |         |
| max_files        | No       | Maximum number of files to scan        | 1000    |
| format           | No       | Output format ('text' or 'json')       | text    |
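
For reference, here is a minimal sketch of invoking this tool from Python with the MCP client SDK. The `mcp-filesystem` command and the `/data` directory are placeholders for your own setup, and the `ctx` argument is injected by the server rather than supplied by the client.

```python
import asyncio
import json

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Launch the filesystem server over stdio; command and args are placeholders.
    server = StdioServerParameters(command="mcp-filesystem", args=["/data"])
    async with stdio_client(server) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Ask for JSON output so the result can be parsed programmatically.
            result = await session.call_tool(
                "find_duplicate_files",
                arguments={
                    "path": "/data",
                    "recursive": True,
                    "min_size": 1024,               # skip files under 1 KiB
                    "exclude_patterns": [r"\.git/"],
                    "max_files": 1000,
                    "format": "json",
                },
            )
            # The tool returns one text block: a JSON object mapping an MD5
            # digest to the list of identical file paths.
            duplicates = json.loads(result.content[0].text)
            for file_hash, paths in duplicates.items():
                print(f"{file_hash}: {len(paths)} identical files")


if __name__ == "__main__":
    asyncio.run(main())
```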

Implementation Reference

  • Core handler implementing the duplicate detection: validates path, scans directory recursively, groups files by size, computes MD5 hashes for potential duplicates, returns dict of hash to file paths.
```python
async def find_duplicate_files(
    self,
    root_path: Union[str, Path],
    recursive: bool = True,
    min_size: int = 1,
    exclude_patterns: Optional[List[str]] = None,
    max_files: int = 1000,
) -> Dict[str, List[str]]:
    """Find duplicate files by comparing file sizes and contents.

    Args:
        root_path: Starting directory
        recursive: Whether to search subdirectories
        min_size: Minimum file size to consider (bytes)
        exclude_patterns: Optional patterns to exclude
        max_files: Maximum number of files to scan

    Returns:
        Dictionary mapping file hash to list of identical files

    Raises:
        ValueError: If root_path is outside allowed directories
    """
    import hashlib

    abs_path, allowed = await self.validator.validate_path(root_path)
    if not allowed:
        raise ValueError(f"Path outside allowed directories: {root_path}")
    if not abs_path.is_dir():
        raise ValueError(f"Not a directory: {root_path}")

    # Compile exclude patterns if provided
    exclude_regexes = []
    if exclude_patterns:
        for exclude in exclude_patterns:
            try:
                exclude_regexes.append(re.compile(exclude))
            except re.error:
                logger.warning(f"Invalid exclude pattern: {exclude}")

    # First, group files by size
    size_groups: Dict[int, List[Path]] = {}
    files_processed = 0

    async def scan_for_sizes(dir_path: Path) -> None:
        nonlocal files_processed
        if files_processed >= max_files:
            return
        try:
            entries = await anyio.to_thread.run_sync(list, dir_path.iterdir())
            for entry in entries:
                if files_processed >= max_files:
                    return
                # Skip if matched by exclude pattern
                path_str = str(entry)
                excluded = False
                for exclude_re in exclude_regexes:
                    if exclude_re.search(path_str):
                        excluded = True
                        break
                if excluded:
                    continue
                try:
                    if entry.is_file():
                        size = entry.stat().st_size
                        if size >= min_size:
                            if size not in size_groups:
                                size_groups[size] = []
                            size_groups[size].append(entry)
                            files_processed += 1
                    elif entry.is_dir() and recursive:
                        # Check if this path is still allowed
                        (
                            entry_abs,
                            entry_allowed,
                        ) = await self.validator.validate_path(entry)
                        if entry_allowed:
                            await scan_for_sizes(entry)
                except (PermissionError, FileNotFoundError):
                    # Skip entries we can't access
                    pass
        except (PermissionError, FileNotFoundError):
            # Skip directories we can't access
            pass

    await scan_for_sizes(abs_path)

    # Now, for each size group with multiple files, compute and compare hashes
    duplicates: Dict[str, List[str]] = {}
    for size, files in size_groups.items():
        if len(files) < 2:
            continue

        # Group files by hash
        hash_groups: Dict[str, List[Path]] = {}
        for file_path in files:
            try:
                # Compute file hash
                file_bytes = await anyio.to_thread.run_sync(file_path.read_bytes)
                file_hash = hashlib.md5(file_bytes).hexdigest()
                if file_hash not in hash_groups:
                    hash_groups[file_hash] = []
                hash_groups[file_hash].append(file_path)
            except (PermissionError, FileNotFoundError):
                # Skip files we can't access
                pass

        # Add duplicate groups to results
        for file_hash, hash_files in hash_groups.items():
            if len(hash_files) >= 2:
                duplicates[file_hash] = [str(f) for f in hash_files]

    return duplicates
```
  • MCP tool registration via @mcp.tool() decorator. Wrapper function that delegates to Advanced component's handler and formats results for MCP response (text or JSON). Defines tool schema via parameters and docstring.
```python
@mcp.tool()
async def find_duplicate_files(
    path: str,
    ctx: Context,
    recursive: bool = True,
    min_size: int = 1,
    exclude_patterns: Optional[List[str]] = None,
    max_files: int = 1000,
    format: str = "text",
) -> str:
    """Find duplicate files by comparing file sizes and contents.

    Args:
        path: Starting directory
        recursive: Whether to search subdirectories
        min_size: Minimum file size to consider (bytes)
        exclude_patterns: Optional patterns to exclude
        max_files: Maximum number of files to scan
        format: Output format ('text' or 'json')
        ctx: MCP context

    Returns:
        Duplicate file information
    """
    try:
        components = get_components()
        duplicates = await components["advanced"].find_duplicate_files(
            path, recursive, min_size, exclude_patterns, max_files
        )

        if format.lower() == "json":
            return json.dumps(duplicates, indent=2)

        # Format as text
        if not duplicates:
            return "No duplicate files found"

        lines = []
        for file_hash, files in duplicates.items():
            lines.append(f"Hash: {file_hash}")
            for file_path in files:
                lines.append(f"  {file_path}")
            lines.append("")

        return f"Found {len(duplicates)} sets of duplicate files:\n\n" + "\n".join(
            lines
        )
    except Exception as e:
        return f"Error finding duplicate files: {str(e)}"
```
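
Because files are grouped by size before any hashing, only files that could possibly be duplicates are read in full. For callers that use the core handler directly rather than the MCP tool wrapper, the returned mapping of MD5 digest to file paths can be summarized with a short sketch like the one below; `advanced` is a placeholder for whatever component instance exposes the method in your deployment.

```python
import os


async def report_duplicates(advanced) -> None:
    # `advanced` is assumed to be the component instance that exposes the
    # handler shown above; adjust to however your server wires it up.
    duplicates = await advanced.find_duplicate_files(
        "/data", recursive=True, min_size=1024
    )
    for file_hash, paths in duplicates.items():
        size = os.path.getsize(paths[0])   # files in a group share one size
        wasted = size * (len(paths) - 1)   # bytes reclaimable by deduplicating
        print(f"{file_hash}: {len(paths)} copies, ~{wasted} bytes reclaimable")
```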
