Code MCP

#!/usr/bin/env python3 import asyncio import logging import os from ..access import check_edit_permission from ..common import normalize_file_path from ..git import is_git_repository __all__ = [ "ls_directory", "list_directory", "skip", "TreeNode", "create_file_tree", "print_tree", "MAX_FILES", ] MAX_FILES = 1000 TRUNCATED_MESSAGE = f"There are more than {MAX_FILES} files in the directory. Use more specific paths to explore nested directories. The first {MAX_FILES} files and directories are included below:\n\n" async def ls_directory(directory_path: str) -> str: """List the contents of a directory. Args: directory_path: The absolute path to the directory to list Returns: A formatted string representation of the directory contents, or an error message """ try: # Normalize the directory path full_directory_path = normalize_file_path(directory_path) # Validate the directory path if not os.path.exists(full_directory_path): return f"Error: Directory does not exist: {directory_path}" if not os.path.isdir(full_directory_path): return f"Error: Path is not a directory: {directory_path}" # Safety check: Verify the directory is within a git repository with codemcp.toml if not await is_git_repository(full_directory_path): return f"Error: Directory is not in a Git repository: {directory_path}" # Check edit permission (which verifies codemcp.toml exists) is_permitted, permission_message = await check_edit_permission( full_directory_path ) if not is_permitted: return f"Error: {permission_message}" # Get the directory contents asynchronously results = await list_directory(full_directory_path) # Sort the results results.sort() # Create a file tree and print it tree = create_file_tree(results) tree_output = print_tree(tree, cwd=full_directory_path) # Return the result with truncation message if needed if len(results) < MAX_FILES: return tree_output return f"{TRUNCATED_MESSAGE}{tree_output}" except Exception as e: logging.warning( f"Exception suppressed during directory listing: {e!s}", exc_info=True ) return f"Error listing directory: {e!s}" async def list_directory(initial_path: str) -> list[str]: """List all files and directories recursively. Args: initial_path: The path to start listing from Returns: A list of relative paths to files and directories """ results = [] loop = asyncio.get_event_loop() # Use a function to perform the directory listing asynchronously async def list_dir_async(): queue = [initial_path] while queue and len(results) <= MAX_FILES: path = queue.pop(0) if skip(path) and path != initial_path: continue if path != initial_path: # Add directories with trailing slash rel_path = os.path.relpath(path, initial_path) if os.path.isdir(path): rel_path = f"{rel_path}{os.sep}" results.append(rel_path) if os.path.isdir(path): try: # Get directory listing asynchronously children = await loop.run_in_executor( None, lambda: os.listdir(path) ) for child in children: child_path = os.path.join(path, child) if os.path.isdir(child_path): queue.append(child_path) elif not skip(child_path): rel_path = os.path.relpath(child_path, initial_path) results.append(rel_path) if len(results) > MAX_FILES: return results except (PermissionError, OSError): # Skip directories we can't access continue return results return await list_dir_async() def skip(path: str) -> bool: """Determine if a path should be skipped. Args: path: The path to check Returns: True if the path should be skipped, False otherwise """ basename = os.path.basename(path) if path != "." and basename.startswith("."): return True if "__pycache__" in path: return True return False class TreeNode: """A node in a file tree.""" def __init__(self, name: str, path: str, node_type: str): self.name = name self.path = path self.type = node_type self.children = [] def create_file_tree(sorted_paths: list[str]) -> list[TreeNode]: """Create a file tree from a list of paths. Args: sorted_paths: A list of sorted relative paths Returns: A list of TreeNode objects representing the root of the tree """ root = [] for path in sorted_paths: parts = path.split(os.sep) current_level = root current_path = "" for i, part in enumerate(parts): if not part: # Skip empty parts (trailing slashes) continue current_path = os.path.join(current_path, part) if current_path else part is_last_part = i == len(parts) - 1 # Check if this node already exists at this level existing_node = None for node in current_level: if node.name == part: existing_node = node break if existing_node: current_level = existing_node.children else: # Create a new node node_type = ( "file" if is_last_part and not path.endswith(os.sep) else "directory" ) new_node = TreeNode(part, current_path, node_type) current_level.append(new_node) current_level = new_node.children return root def print_tree( tree: list[TreeNode], level: int = 0, prefix: str = "", cwd: str = "", ) -> str: """Print a file tree. Args: tree: A list of TreeNode objects level: The current level in the tree prefix: The prefix to use for indentation cwd: The current working directory Returns: A formatted string representation of the tree """ result = "" # Add absolute path at root level if level == 0: result += f"- {cwd}{os.sep}\n" prefix = " " for node in tree: # Add the current node to the result node_suffix = f"{os.sep}" if node.type == "directory" else "" result += f"{prefix}{'-'} {node.name}{node_suffix}\n" # Recursively print children if node.children: result += print_tree(node.children, level + 1, f"{prefix} ", cwd) return result