Code MCP
by ezyang
- codemcp
- tools
#!/usr/bin/env python3
import logging
import os
import subprocess
import time
from typing import Any
from ..common import normalize_file_path
from ..git import is_git_repository
from ..shell import run_command
__all__ = [
"grep_files",
"git_grep",
"render_result_for_assistant",
"TOOL_NAME_FOR_PROMPT",
"DESCRIPTION",
]
# Define constants
MAX_RESULTS = 100
TOOL_NAME_FOR_PROMPT = "Grep"
DESCRIPTION = f"""
Searches for files containing a specified pattern (regular expression) using git grep.
Files with a match are returned, up to a maximum of {MAX_RESULTS} files.
Note that this tool only works inside git repositories.
Example:
grep "function.*hello" /path/to/repo # Find files containing functions with "hello" in their name
grep "console\\.log" /path/to/repo --include="*.js" # Find JS files with console.log statements
"""
async def git_grep(
pattern: str,
path: str | None = None,
include: str | None = None,
signal=None,
) -> list[str]:
"""Execute git grep to search for pattern in files.
Args:
pattern: The regular expression pattern to search for
path: The directory to search in (must be in a git repository)
include: Optional file pattern to filter the search
signal: Optional abort signal to terminate the subprocess
Returns:
A list of file paths with matches
"""
if path is None:
raise ValueError("Path must be provided for git grep")
# Normalize the directory path
absolute_path = normalize_file_path(path)
# Verify this is a git repository - this check uses the mocked version in tests
if not await is_git_repository(absolute_path):
raise ValueError(f"The provided path is not in a git repository: {path}")
# In non-test environment, verify the path exists and is a directory
if not os.environ.get("DESKAID_TESTING"):
# Check if path exists and is a directory
if not os.path.exists(absolute_path):
raise FileNotFoundError(f"Directory does not exist: {path}")
if not os.path.isdir(absolute_path):
raise NotADirectoryError(f"Path is not a directory: {path}")
# Build git grep command
# -l: list file names only
# -i: case insensitive matching
args = ["git", "grep", "-li", pattern]
# Add file pattern if specified
if include:
args.extend(["--", include])
logging.debug(f"Executing git grep command: {' '.join(args)}")
try:
# Execute git grep command asynchronously
# Use explicit parameters to avoid confusion in mocking
result = await run_command(
cmd=args,
cwd=absolute_path,
capture_output=True,
text=True,
check=False, # Don't raise exception if git grep doesn't find matches
)
# git grep returns exit code 1 when no matches are found, which is normal
if result.returncode not in [0, 1]:
logging.error(
f"git grep failed with exit code {result.returncode}: {result.stderr}",
)
raise subprocess.SubprocessError(f"git grep failed: {result.stderr}")
# Process results - split by newline and filter empty lines
matches = [line.strip() for line in result.stdout.split("\n") if line.strip()]
# Convert to absolute paths
matches = [os.path.join(absolute_path, match) for match in matches]
return matches
except subprocess.SubprocessError as e:
logging.exception(f"Error executing git grep: {e!s}")
raise
def render_result_for_assistant(output: dict[str, Any]) -> str:
"""Render the results in a format suitable for the assistant.
Args:
output: The grep results dictionary
Returns:
A formatted string representation of the results
"""
num_files = output.get("numFiles", 0)
filenames = output.get("filenames", [])
if num_files == 0:
return "No files found"
result = f"Found {num_files} file{'' if num_files == 1 else 's'}\n{os.linesep.join(filenames[:MAX_RESULTS])}"
if num_files > MAX_RESULTS:
result += (
"\n(Results are truncated. Consider using a more specific path or pattern.)"
)
return result
async def grep_files(
pattern: str,
path: str | None = None,
include: str | None = None,
signal=None,
) -> dict[str, Any]:
"""Search for a pattern in files within a directory.
Args:
pattern: The regular expression pattern to search for
path: The directory to search in (must be in a git repository)
include: Optional file pattern to filter the search
signal: Optional abort signal to terminate the subprocess
Returns:
A dictionary with execution stats and matched files
"""
start_time = time.time()
try:
# Execute git grep asynchronously
matches = await git_grep(pattern, path, include, signal)
# Sort matches
try:
# First try sorting by modification time
import asyncio
loop = asyncio.get_event_loop()
# Get file stats asynchronously
stats = []
for match in matches:
stat = await loop.run_in_executor(
None, lambda m=match: os.stat(m) if os.path.exists(m) else None
)
stats.append(stat)
matches_with_stats = list(zip(matches, stats, strict=False))
# In tests, sort by filename for deterministic results
if os.environ.get("NODE_ENV") == "test":
matches_with_stats.sort(key=lambda x: x[0])
else:
# Sort by modification time (newest first), with filename as tiebreaker
matches_with_stats.sort(
key=lambda x: (-(x[1].st_mtime if x[1] else 0), x[0])
)
matches = [match for match, _ in matches_with_stats]
except Exception as e:
# Fall back to sorting by name if there's an error
logging.debug(
f"Error sorting by modification time, falling back to name sort: {e!s}",
)
matches.sort()
# Calculate execution time
execution_time = int(
(time.time() - start_time) * 1000,
) # Convert to milliseconds
# Prepare output
output = {
"filenames": matches[:MAX_RESULTS],
"durationMs": execution_time,
"numFiles": len(matches),
}
# Add formatted result for assistant
output["resultForAssistant"] = render_result_for_assistant(output)
return output
except Exception as e:
# Calculate execution time even on error
execution_time = int((time.time() - start_time) * 1000)
# Return empty results with error info
error_output = {
"filenames": [],
"durationMs": execution_time,
"numFiles": 0,
"error": str(e),
"resultForAssistant": f"Error: {e!s}",
}
return error_output