Code MCP
by ezyang
- codemcp
#!/usr/bin/env python3
import logging
import os
from mcp.server.fastmcp import Context, FastMCP
from .tools.edit_file import edit_file_content
from .tools.grep import grep_files
from .tools.init_project import init_project
from .tools.ls import ls_directory
from .tools.read_file import read_file_content
from .tools.run_command import run_command
from .tools.write_file import write_file_content
# Create the FastMCP server instance, named "codemcp". Tools are attached to
# it with the @mcp.tool() decorator, and run() starts it via mcp.run().
mcp = FastMCP("codemcp")
# NB: If you edit this, also edit codemcp/tools/init_project.py
@mcp.tool()
async def codemcp(
    ctx: Context,
    subtool: str,
    *,
    path: str | None = None,
    content: str | None = None,
    old_string: str | None = None,
    new_string: str | None = None,
    offset: int | None = None,
    limit: int | None = None,
    description: str | None = None,
    pattern: str | None = None,
    include: str | None = None,
    command: str | None = None,
    arguments: list[str] | str | None = None,
    old_str: str | None = None,  # Added for backward compatibility
    new_str: str | None = None,  # Added for backward compatibility
) -> str:
    """If and only if the user explicitly asks you to initialize codemcp with
    path, you should invoke this tool with arguments `InitProject
    path`. This will return instructions which you should
    IMMEDIATELY follow before continuing.
    Arguments:
    subtool: The subtool to run (InitProject, ...)
    path: The path to the file or directory to operate on
    ... (there are other arguments which are documented later)
    """
    # NOTE(review): the docstring above is presumably surfaced to MCP clients
    # as the tool description (confirm against FastMCP), so its wording is
    # left untouched; developer-facing notes live in comments instead.
    #
    # Dispatcher overview:
    #   1. Reject unknown subtools.
    #   2. Reject any non-None argument that the chosen subtool does not accept.
    #   3. Check the subtool's required arguments and delegate to its helper.
    # Every failure is reported as an "Error: ..." string rather than raised.

    # Parameters each subtool is allowed to receive.
    expected_params = {
        "ReadFile": {"path", "offset", "limit"},
        "WriteFile": {"path", "content", "description"},
        "EditFile": {
            "path",
            "old_string",
            "new_string",
            "description",
            "old_str",
            "new_str",
        },
        "LS": {"path"},
        "InitProject": {"path"},
        "RunCommand": {"path", "command", "arguments"},
        "Grep": {"pattern", "path", "include"},
    }

    # Check if subtool exists
    if subtool not in expected_params:
        return f"Unknown subtool: {subtool}. Available subtools: {', '.join(expected_params.keys())}"

    # Handle string arguments - convert to a list with one element
    if isinstance(arguments, str):
        arguments = [arguments]

    # Collect every parameter the caller actually supplied (i.e. not None).
    provided_params = {
        param: value
        for param, value in {
            "path": path,
            "content": content,
            "old_string": old_string,
            "new_string": new_string,
            "offset": offset,
            "limit": limit,
            "description": description,
            "pattern": pattern,
            "include": include,
            "command": command,
            "arguments": arguments,
            # Include backward compatibility parameters
            "old_str": old_str,
            "new_str": new_str,
        }.items()
        if value is not None
    }

    # Check for unexpected parameters. Sorted so the message is deterministic
    # (set iteration order is not).
    unexpected_params = set(provided_params) - expected_params[subtool]
    if unexpected_params:
        return f"Error: Unexpected parameters for {subtool} subtool: {', '.join(sorted(unexpected_params))}"

    # Now handle each subtool with its expected parameters
    if subtool == "ReadFile":
        if path is None:
            return "Error: path is required for ReadFile subtool"
        return read_file_content(path, offset, limit)

    if subtool == "WriteFile":
        if path is None:
            return "Error: path is required for WriteFile subtool"
        if description is None:
            return "Error: description is required for WriteFile subtool"
        # A missing content argument is treated as an empty file body.
        content_str = content or ""
        return write_file_content(path, content_str, description)

    if subtool == "EditFile":
        if path is None:
            return "Error: path is required for EditFile subtool"
        if description is None:
            return "Error: description is required for EditFile subtool"
        if old_string is None and old_str is None:
            # TODO: I want telemetry to tell me when this occurs.
            return "Error: Either old_string or old_str is required for EditFile subtool (use empty string for new file creation)"
        # Prefer old_string/new_string whenever they were supplied at all.
        # The test is "is not None" rather than truthiness because an empty
        # string is a meaningful value here (empty old_string = create a new
        # file, empty new_string = delete the matched text) and must not be
        # overridden by the legacy old_str/new_str aliases.
        old_content = old_string if old_string is not None else (old_str or "")
        new_content = new_string if new_string is not None else (new_str or "")
        return edit_file_content(path, old_content, new_content, None, description)

    if subtool == "LS":
        if path is None:
            return "Error: path is required for LS subtool"
        return ls_directory(path)

    if subtool == "InitProject":
        if path is None:
            return "Error: path is required for InitProject subtool"
        return init_project(path)

    if subtool == "RunCommand":
        # When is something a command as opposed to a subtool? They are
        # basically the same thing, but commands are always USER defined.
        # This means we shove them all in RunCommand so they are guaranteed
        # not to conflict with codemcp's subtools.
        if path is None:
            return "Error: path is required for RunCommand subtool"
        if command is None:
            return "Error: command is required for RunCommand subtool"
        return run_command(path, command, arguments)

    if subtool == "Grep":
        if pattern is None:
            return "Error: pattern is required for Grep subtool"
        if path is None:
            return "Error: path is required for Grep subtool"
        try:
            result = grep_files(pattern, path, include)
            # Use the assistant-oriented summary when grep_files provides one,
            # otherwise fall back to a simple file count.
            return result.get(
                "resultForAssistant",
                f"Found {result.get('numFiles', 0)} file(s)",
            )
        except Exception as e:
            # Tool boundary: log the traceback and report the failure to the
            # caller as text instead of letting the exception propagate.
            logging.warning(
                f"Exception suppressed in grep subtool: {e!s}", exc_info=True
            )
            return f"Error executing grep: {e!s}"
def configure_logging(log_file="codemcp.log"):
    """Configure logging to write to both a file and the console.

    The log file is created inside the ``.codemcp`` directory under the
    user's home directory (the directory is created if needed).

    The log level is taken from the DESKAID_DEBUG_LEVEL environment variable
    when it is set, otherwise from the configuration file ~/.codemcprc (via
    get_logger_verbosity()). Unrecognized level names fall back to INFO.
    Setting DESKAID_DEBUG to any non-empty value forces DEBUG level and
    enables debug mode (kept for backward compatibility).
    Example: DESKAID_DEBUG=1 python -m codemcp

    By default, logs from the 'mcp' module are filtered out unless in debug mode.

    Any handlers already attached to the root logger are removed and closed,
    so calling this function more than once does not leak open log files.

    Args:
        log_file: File name of the log file inside the log directory.
    """
    from .config import get_logger_verbosity

    log_dir = os.path.join(os.path.expanduser("~"), ".codemcp")
    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, log_file)

    # Get log level from config, with environment variable override
    log_level_str = os.environ.get("DESKAID_DEBUG_LEVEL") or get_logger_verbosity()

    # Map string log level to logging constants
    log_level_map = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }
    # Convert string to logging level, default to INFO if invalid
    log_level = log_level_map.get(log_level_str.upper(), logging.INFO)

    # Force DEBUG level if DESKAID_DEBUG is set (for backward compatibility)
    debug_mode = bool(os.environ.get("DESKAID_DEBUG"))
    if debug_mode:
        log_level = logging.DEBUG

    root_logger = logging.getLogger()
    root_logger.setLevel(log_level)

    # Detach AND close any existing handlers. Merely clearing the list would
    # leave a previously opened log file handle dangling on reconfiguration.
    for stale_handler in root_logger.handlers[:]:
        root_logger.removeHandler(stale_handler)
        stale_handler.close()

    # One handler for the log file, one for the console, sharing level/format.
    file_handler = logging.FileHandler(log_path)
    file_handler.setLevel(log_level)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(log_level)

    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    # Set up filter to exclude logs from 'mcp' module unless in debug mode
    class ModuleFilter(logging.Filter):
        def filter(self, record):
            # Allow all logs in debug mode, otherwise drop records whose
            # logger name starts with 'mcp'.
            return debug_mode or not record.name.startswith("mcp")

    module_filter = ModuleFilter()
    file_handler.addFilter(module_filter)
    console_handler.addFilter(module_filter)

    # Add the handlers to the root logger
    root_logger.addHandler(file_handler)
    root_logger.addHandler(console_handler)

    logging.info(f"Logging configured. Log file: {log_path}")
    logging.info(f"Log level set to: {logging.getLevelName(log_level)}")
    if not debug_mode:
        logging.info("Logs from 'mcp' module are being filtered")
def run():
    """Entry point: set up logging, then start the codemcp MCP server."""
    # Handlers are installed first so they are in place when the server runs.
    configure_logging()
    # Hand control to the FastMCP server instance defined at module level.
    mcp.run()