Skip to main content
Glama
server.py60.9 kB
import hashlib import logging import os import re import subprocess import tempfile import ast import tokenize import fnmatch import io import datetime import json import inspect import functools from typing import Optional, Dict, Any, Union, Literal import argparse import black from black.report import NothingChanged from fastmcp import FastMCP import duckdb logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger("text_editor") def calculate_id(text: str, start: int = None, end: int = None) -> str: """ Calculate a unique ID for content verification based on the text content. The ID is formed by combining a line prefix (if line numbers are provided) with a truncated SHA-256 id of the content. This allows quick verification that content hasn't changed between operations. Args: text (str): Content to generate ID for start (Optional[int]): Starting line number for the content end (Optional[int]): Ending line number for the content Returns: str: ID string in format: [LinePrefix]-[Truncatedid] Example: "L10-15-a7" for content spanning lines 10-15 Example: "L5-b3" for content on line 5 only """ prefix = "" if start and end: prefix = f"L{start}-{end}-" if start == end: prefix = f"L{start}-" return f"{prefix}{hashlib.sha256(text.encode()).hexdigest()[:2]}" def generate_diff_preview( original_lines: list, modified_lines: list, start: int, end: int ) -> dict: """ Generate a diff preview comparing original and modified content. 
Args: original_lines (list): List of original file lines modified_lines (list): List of modified file lines start (int): Start line number of the edit (1-based) end (int): End line number of the edit (1-based) Returns: dict: A dictionary with keys prefixed with + or - to indicate additions/deletions Format: [("-1", "removed line"), ("+1", "added line")] """ diffs = [] # Add some context lines before the change context_start = max(0, start - 1 - 3) # 3 lines of context before for i in range(context_start, start - 1): diffs.append((i + 1, original_lines[i].rstrip())) # Show removed lines for i in range(start - 1, end): diffs.append((f"-{i+1}", original_lines[i].rstrip())) # Show added lines new_content = "".join( modified_lines[ start - 1 : start - 1 + len(modified_lines) - len(original_lines) + (end - (start - 1)) ] ) new_lines = new_content.splitlines() for i, line in enumerate(new_lines): diffs.append((f"+{start+i}", line)) context_end = min(len(original_lines), end + 3) # 3 lines of context after for i in range(end, context_end): diffs.append((i + 1, original_lines[i].rstrip())) return { "diff_lines": diffs, } def create_logging_tool_decorator(original_decorator, log_callback): """ Create a wrapper around the FastMCP tool decorator that logs tool usage. 
Args: original_decorator: The original FastMCP tool decorator log_callback: A callback function that will be called with (tool_name, args_dict, response) Returns: A wrapped decorator function that logs tool usage """ def tool_decorator_with_logging(*args, **kwargs): # Get the original decorator with args wrapped_decorator = original_decorator(*args, **kwargs) def wrapper(func): # Create our logging wrapper @functools.wraps(func) # Preserve func's metadata async def logged_func(*func_args, **func_kwargs): tool_name = func.__name__ # Convert args to dict for logging args_dict = {} if func_args and len(func_args) > 0: # Get parameter names from function signature sig = inspect.signature(func) param_names = list(sig.parameters.keys()) # Skip self parameter if it exists if param_names and param_names[0] == "self": param_names = param_names[1:] for i, arg in enumerate(func_args): if i < len(param_names): args_dict[param_names[i]] = ( str(arg) if isinstance(arg, (bytes, bytearray)) else arg ) args_dict.update(func_kwargs) # Call the original function response = await func(*func_args, **func_kwargs) # Log the tool usage with the response log_callback(tool_name, args_dict, response) # Return the response return response # Apply the original decorator to our logged function wrapped_func = wrapped_decorator(logged_func) return wrapped_func return wrapper return tool_decorator_with_logging class TextEditorServer: """ A server implementation for a text editor application using FastMCP. This class provides a comprehensive set of tools for manipulating text files in a controlled and safe manner. It implements a structured editing workflow with content verification to prevent conflicts during editing operations. 
Primary features: - File management: Setting active file, creating new files, and deleting files - Content access: Reading full file content or specific line ranges - Text search: Finding lines matching specific text patterns - Safe editing: Two-step edit process with diff preview and confirmation - Syntax validation: Automatic syntax checking for Python and JavaScript files - Protected files: Prevent access to sensitive files via pattern matching The server uses content hashing to generate unique IDs that ensure file content integrity during editing operations. All tools are registered with FastMCP for remote procedure calling. Edit workflow: 1. Select content range with the select() tool to identify lines for editing 2. Propose changes with overwrite() to generate a diff preview 3. Use confirm() to apply or cancel() to discard the pending modifications Attributes: mcp (FastMCP): The MCP server instance for handling tool registrations max_select_lines (int): Maximum number of lines that can be edited with ID verification enable_js_syntax_check (bool): Whether JavaScript syntax checking is enabled protected_paths (list): List of file patterns and paths that are restricted from access current_file_path (str, optional): Path to the currently active file selected_start (int, optional): Start line of the current selection selected_end (int, optional): End line of the current selection selected_id (str, optional): ID of the current selection for verification pending_modified_lines (list, optional): Pending modified lines for preview before committing pending_diff (dict, optional): Diff preview of pending changes """ def __init__(self): # Initialize MCP server self.mcp = FastMCP("text-editor") # Initialize DuckDB for usage statistics if enabled self.usage_stats_enabled = os.getenv("DUCKDB_USAGE_STATS", "0").lower() in [ "1", "true", "yes", ] if self.usage_stats_enabled: self.stats_db_path = os.getenv("STATS_DB_PATH", "text_editor_stats.duckdb") self._init_stats_db() # 
Replace the original tool decorator with our wrapper # Making sure we preserve all the original functionality original_tool = self.mcp.tool logger.debug( { "message": f"Setting up logging tool decorator using {self.stats_db_path}" } ) self.mcp.tool = create_logging_tool_decorator( original_tool, self._log_tool_usage ) logger.debug({"msg": "Logging tool decorator set up complete"}) self.max_select_lines = int(os.getenv("MAX_SELECT_LINES", "50")) self.enable_js_syntax_check = os.getenv( "ENABLE_JS_SYNTAX_CHECK", "1" ).lower() in ["1", "true", "yes"] self.fail_on_python_syntax_error = os.getenv( "FAIL_ON_PYTHON_SYNTAX_ERROR", "1" ).lower() in ["1", "true", "yes"] self.fail_on_js_syntax_error = os.getenv( "FAIL_ON_JS_SYNTAX_ERROR", "0" ).lower() in ["1", "true", "yes"] self.protected_paths = ( os.getenv("PROTECTED_PATHS", "").split(",") if os.getenv("PROTECTED_PATHS") else [] ) self.current_file_path = None self.selected_start = None self.selected_end = None self.selected_id = None self.pending_modified_lines = None self.pending_diff = None self.python_venv = os.getenv("PYTHON_VENV") self.register_tools() def _init_stats_db(self): """Initialize the DuckDB database for storing tool usage statistics.""" logger.debug({"msg": f"Initializing stats database at {self.stats_db_path}"}) try: # Connect to DuckDB and create the table if it doesn't exist with duckdb.connect(self.stats_db_path) as conn: logger.debug({"msg": "Connected to DuckDB for initialization"}) conn.execute(""" CREATE TABLE IF NOT EXISTS tool_usage ( tool_name VARCHAR, args JSON, response JSON, timestamp TIMESTAMP, current_file VARCHAR, request_id VARCHAR, client_id VARCHAR ) """) conn.commit() logger.debug({"msg": "Database tables initialized successfully"}) except Exception as e: logger.debug({"msg": f"Error initializing stats database: {str(e)}"}) def _log_tool_usage(self, tool_name: str, args: dict, response=None): """ Log tool usage to the DuckDB database if stats are enabled. 
This function is called by the decorator wrapper for each tool invocation. Args: tool_name (str): Name of the tool being used args (dict): Arguments passed to the tool response (dict, optional): Response returned by the tool """ # Skip if stats are disabled if not hasattr(self, "usage_stats_enabled") or not self.usage_stats_enabled: return logger.debug({"message": f"Logging tool usage: {tool_name}"}) client_id = None try: # Get request ID if available request_id = None if hasattr(self.mcp, "_mcp_server") and hasattr( self.mcp._mcp_server, "request_context" ): request_id = getattr( self.mcp._mcp_server.request_context, "request_id", None ) if hasattr(self.mcp, "_mcp_server") and hasattr( self.mcp._mcp_server, "request_context" ): # Access client_id from meta if available if ( hasattr(self.mcp._mcp_server.request_context, "meta") and self.mcp._mcp_server.request_context.meta ): client_id = getattr( self.mcp._mcp_server.request_context.meta, "client_id", None ) # Safely convert args to serializable format safe_args = {} for key, value in args.items(): # Skip large objects and convert non-serializable objects to strings if ( hasattr(value, "__len__") and not isinstance(value, (str, dict, list, tuple)) and len(value) > 1000 ): safe_args[key] = f"<{type(value).__name__} of length {len(value)}>" else: try: # Test if value is JSON serializable json.dumps({key: value}) safe_args[key] = value except (TypeError, OverflowError): # If not serializable, convert to string representation safe_args[key] = f"<{type(value).__name__}>" # Format arguments as JSON args_json = json.dumps(safe_args) # Process response - since we're using JSON RPC, responses should already be serializable response_json = None if response is not None: try: response_json = json.dumps(response) except (TypeError, OverflowError): # Handle edge cases where something non-serializable might have been returned logger.debug( { "message": f"Non-serializable response received from {tool_name}, converting to string 
representation" } ) if isinstance(response, dict): # For dictionaries, process each value separately safe_response = {} for key, value in response.items(): try: json.dumps({key: value}) safe_response[key] = value except (TypeError, OverflowError): safe_response[key] = f"<{type(value).__name__}>" response_json = json.dumps(safe_response) else: # For non-dict types, store a basic representation response_json = json.dumps({"result": str(response)}) logger.debug( {"msg": f"Attempting to connect to DuckDB at {self.stats_db_path}"} ) try: with duckdb.connect(self.stats_db_path) as conn: logger.debug({"msg": f"Connected to DuckDB successfully"}) conn.execute( """ INSERT INTO tool_usage (tool_name, args, response, timestamp, current_file, request_id, client_id) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( tool_name, args_json, response_json, datetime.datetime.now(), self.current_file_path, request_id, client_id, ), ) conn.commit() logger.debug({"msg": f"Insert completed successfully"}) except Exception as e: logger.debug({"msg": f"DuckDB error: {str(e)}"}) except Exception as e: logger.debug({"msg": f"Error logging tool usage: {str(e)}"}) def register_tools(self): @self.mcp.tool() async def set_file(filepath: str) -> str: """ Set the current file to work with. This is always the first step in the workflow. You must set a file before you can use other tools like read, select etc. """ if not os.path.isfile(filepath): return f"Error: File not found at '{filepath}'" # Check if the file path matches any of the protected paths for pattern in self.protected_paths: pattern = pattern.strip() if not pattern: continue # Check for absolute path match if filepath == pattern: return f"Error: Access to '{filepath}' is denied due to PROTECTED_PATHS configuration" # Check for glob pattern match (e.g., *.env, .env*, etc.) 
if "*" in pattern: # First try matching the full path if fnmatch.fnmatch(filepath, pattern): return f"Error: Access to '{filepath}' is denied due to PROTECTED_PATHS configuration (matches pattern '{pattern}')" # Then try matching just the basename basename = os.path.basename(filepath) if fnmatch.fnmatch(basename, pattern): return f"Error: Access to '{filepath}' is denied due to PROTECTED_PATHS configuration (matches pattern '{pattern}')" self.current_file_path = filepath return f"File set to: '{filepath}'" @self.mcp.tool() async def skim() -> Dict[str, Any]: """ Read text from the current file, truncated to the first `SKIM_MAX_LINES` lines. Returns: dict: lines, total_lines, max_select_lines """ if self.current_file_path is None: return {"error": "No file path is set. Use set_file first."} with open(self.current_file_path, "r", encoding="utf-8") as file: lines = file.readlines() formatted_lines = [] max_lines_to_show = int(os.getenv("SKIM_MAX_LINES", "500")) lines_to_process = lines[:max_lines_to_show] for i, line in enumerate(lines_to_process, 1): formatted_lines.append((i, line.rstrip())) result = { "lines": formatted_lines, "total_lines": len(lines), "max_select_lines": self.max_select_lines, } # Add hint if file was truncated if len(lines) > max_lines_to_show: result["truncated"] = True result["hint"] = ( f"File has {len(lines)} total lines. Only showing first {max_lines_to_show} lines. Use `read` to view specific line ranges or `find_line` to search for content in the remaining lines." ) return result @self.mcp.tool() async def read(start: int, end: int) -> Dict[str, Any]: """ Read lines from the current file from start line to end line, returning them in a dictionary like {"lines":[[1,"text on first line"],[2,"text on second line"]]}. This makes it easier to find the precise lines to select for editing. 
Args: start (int, optional): Start line number end (int, optional): End line number Returns: dict: lines, start_line, end_line """ result = {} if self.current_file_path is None: return {"error": "No file path is set. Use set_file first."} try: with open(self.current_file_path, "r", encoding="utf-8") as file: lines = file.readlines() if start < 1: return {"error": "start must be at least 1"} if end > len(lines): end = len(lines) if start > end: return { "error": f"{start=} cannot be greater than {end=}. {len(lines)=}" } selected_lines = lines[start - 1 : end] formatted_lines = [] for i, line in enumerate(selected_lines, start): formatted_lines.append((i, line.rstrip())) result["lines"] = formatted_lines result["start_line"] = start result["end_line"] = end return result except Exception as e: return {"error": f"Error reading file: {str(e)}"} @self.mcp.tool() async def select( start: int, end: int, ) -> Dict[str, Any]: """ Select lines from for subsequent overwrite operation. Args: start (int): Start line number (1-based) end (int): End line number (1-based) Returns: dict: status, lines, start, end, id, line_count, message """ if self.current_file_path is None: return {"error": "No file path is set. Use set_file first."} try: with open(self.current_file_path, "r", encoding="utf-8") as file: lines = file.readlines() if start < 1: return {"error": "start must be at least 1."} if end > len(lines): end = len(lines) if start > end: return {"error": "start cannot be greater than end."} if end - start + 1 > self.max_select_lines: return { "error": f"Cannot select more than {self.max_select_lines} lines at once (attempted {end - start + 1} lines)." 
} selected_lines = lines[start - 1 : end] text = "".join(selected_lines) current_id = calculate_id(text, start, end) self.selected_start = start self.selected_end = end self.selected_id = current_id # Convert selected lines to a list without line numbers lines_content = [line.rstrip() for line in selected_lines] result = { "status": "success", "lines": lines_content, "start": start, "end": end, "id": current_id, "line_count": len(selected_lines), "message": f"Selected lines {start} to {end} for editing.", } return result except Exception as e: return {"error": f"Error selecting lines: {str(e)}"} @self.mcp.tool() async def overwrite( new_lines: dict, ) -> Dict[str, Any]: """ Overwrite the selected lines with new text. Amount of new lines can differ from the original selection Args: new_lines (dict): Example: {"lines":["line one", "second line"]} Returns: dict: Diff preview showing the proposed changes, and any syntax errors for JS or Python """ new_lines = new_lines.get("lines") if self.current_file_path is None: return {"error": "No file path is set. Use set_file first."} if ( self.selected_start is None or self.selected_end is None or self.selected_id is None ): return {"error": "No selection has been made. Use select tool first."} try: with open(self.current_file_path, "r", encoding="utf-8") as file: lines = file.readlines() except Exception as e: return {"error": f"Error reading file: {str(e)}"} start = self.selected_start end = self.selected_end id = self.selected_id current_content = "".join(lines[start - 1 : end]) computed_id = calculate_id(current_content, start, end) if computed_id != id: return { "error": "id verification failed. The content may have been modified since you last read it." 
} processed_new_lines = [] for line in new_lines: if not line.endswith("\n"): processed_new_lines.append(line + "\n") else: processed_new_lines.append(line) if ( processed_new_lines and end < len(lines) and not processed_new_lines[-1].endswith("\n") ): processed_new_lines[-1] += "\n" before = lines[: start - 1] after = lines[end:] modified_lines = before + processed_new_lines + after diff_result = generate_diff_preview(lines, modified_lines, start, end) error = None if self.current_file_path.endswith(".py"): full_content = "".join(modified_lines) try: black.format_file_contents( full_content, fast=True, mode=black.Mode(), ) except black.InvalidInput as e: error = { "error": f"Python syntax error: {str(e)}", "diff_lines": diff_result, "auto_cancel": self.fail_on_python_syntax_error, } except Exception as e: if not isinstance(e, NothingChanged): error = { "error": f"Black check raised {type(e)}: {str(e)}", "diff_lines": diff_result, } elif self.enable_js_syntax_check and self.current_file_path.endswith( (".jsx", ".js") ): with tempfile.NamedTemporaryFile( mode="w", suffix=".jsx", delete=False ) as temp: temp_path = temp.name temp.writelines(modified_lines) try: presets = ( ["@babel/preset-react"] if self.current_file_path.endswith(".jsx") else ["@babel/preset-env"] ) cmd = [ "npx", "babel", "--presets", ",".join(presets), "--no-babelrc", temp_path, "--out-file", "/dev/null", # Output to nowhere, we just want to check syntax ] # Execute Babel to transform (which validates syntax) process = subprocess.run(cmd, capture_output=True, text=True) if process.returncode != 0: error_output = process.stderr filtered_lines = [] for line in error_output.split("\n"): if "node_modules/@babel" not in line: filtered_lines.append(line) filtered_error = "\n".join(filtered_lines).strip() if not filtered_error: filtered_error = "JavaScript syntax error detected" error = { "error": f"JavaScript syntax error: {filtered_error}", "diff_lines": diff_result, "auto_cancel": 
self.fail_on_js_syntax_error, } except Exception as e: os.unlink(temp_path) error = { "error": f"Error checking JavaScript syntax: {str(e)}", "diff_lines": diff_result, } finally: if os.path.exists(temp_path): os.unlink(temp_path) self.pending_modified_lines = modified_lines self.pending_diff = diff_result result = { "status": "preview", "message": "Changes ready to apply. Use confirm() to apply or cancel() to discard.", "diff_lines": diff_result["diff_lines"], "start": start, "end": end, } if error: result.update(error) if error.get("auto_cancel", False): self.pending_modified_lines = None self.pending_diff = None result["status"] = "auto_cancelled" result["message"] = ( "Changes automatically cancelled due to syntax error. The lines are still selected." ) else: result["message"] += ( " It looks like there is a syntax error, but you can choose to fix it in the subsequent edits." ) return result # Later on this tool should be shown conditionally, however, most clients don't support this functionality yet. @self.mcp.tool() async def confirm() -> Dict[str, Any]: """Confirm action""" if self.pending_modified_lines is None or self.pending_diff is None: return {"error": "No pending changes to apply. Use overwrite first."} try: with open(self.current_file_path, "w", encoding="utf-8") as file: file.writelines(self.pending_modified_lines) result = { "status": "success", "message": f"Changes applied successfully.", } self.selected_start = None self.selected_end = None self.selected_id = None self.pending_modified_lines = None self.pending_diff = None return result except Exception as e: return {"error": f"Error writing to file: {str(e)}"} @self.mcp.tool() async def cancel() -> Dict[str, Any]: """ Cancel action """ if self.pending_modified_lines is None or self.pending_diff is None: return {"error": "No pending changes to discard. 
Use overwrite first."} self.pending_modified_lines = None self.pending_diff = None return { "status": "success", "message": "Action cancelled.", } @self.mcp.tool() async def delete_file() -> Dict[str, Any]: """ Delete current file """ if self.current_file_path is None: return {"error": "No file path is set. Use set_file first."} try: if not os.path.exists(self.current_file_path): return {"error": f"File '{self.current_file_path}' does not exist."} os.remove(self.current_file_path) deleted_path = self.current_file_path self.current_file_path = None return { "status": "success", "message": f"File '{deleted_path}' was successfully deleted.", } except Exception as e: return {"error": f"Error deleting file: {str(e)}"} @self.mcp.tool() async def new_file(filepath: str) -> Dict[str, Any]: """ Creates a new file. After creating new file, the first line is automatically selected for editing. Automatically creates parent directories if they don't exist. Args: filepath (str): Path of the new file Returns: dict: Status message with selection info """ self.current_file_path = filepath if ( os.path.exists(self.current_file_path) and os.path.getsize(self.current_file_path) > 0 ): return { "error": "Cannot create new file. Current file exists and is not empty." } try: # Create parent directories if they don't exist directory = os.path.dirname(self.current_file_path) if directory: os.makedirs(directory, exist_ok=True) text = "# NEW_FILE - REMOVE THIS HEADER" with open(self.current_file_path, "w", encoding="utf-8") as file: file.write(text) # Automatically select the first line for editing self.selected_start = 1 self.selected_end = 1 self.selected_id = calculate_id(text, 1, 1) result = { "status": "success", "text": text, "current_file_path": self.current_file_path, "id": self.selected_id, "selected_start": self.selected_start, "selected_end": self.selected_end, "message": "File created successfully. 
First line is now selected for editing.", } return result except Exception as e: return {"error": f"Error creating file: {str(e)}"} @self.mcp.tool() async def find_line( search_text: str, ) -> Dict[str, Any]: """ Find lines that match provided text in the current file. Args: search_text (str): Text to search for in the file Returns: dict: Matching lines with their line numbers, and full text """ if self.current_file_path is None: return {"error": "No file path is set. Use set_file first."} try: with open(self.current_file_path, "r", encoding="utf-8") as file: lines = file.readlines() matches = [] for i, line in enumerate(lines, start=1): if search_text in line: matches.append([i, line]) result = { "status": "success", "matches": matches, "total_matches": len(matches), } return result except Exception as e: return {"error": f"Error searching file: {str(e)}"} @self.mcp.tool() async def listdir(dirpath: str) -> Dict[str, Any]: try: return { "filenames": os.listdir(dirpath), "path": dirpath, } except NotADirectoryError as e: return { "error": "Specified path is not a directory.", "path": dirpath, } except Exception as e: return { "error": f"Unexpected error when listing the directory: {str(e)}" } @self.mcp.tool() async def find_function( function_name: str, ) -> Dict[str, Any]: """ Find a function or method definition in a Python or JS/JSX file. Uses AST parsers. Args: function_name (str): Name of the function or method to find Returns: dict: function lines with their line numbers, start_line, and end_line """ if self.current_file_path is None: return {"error": "No file path is set. Use set_file first."} # Check if the file is a supported type (Python or JavaScript/JSX) is_python = self.current_file_path.endswith(".py") is_javascript = self.current_file_path.endswith((".js", ".jsx")) if not (is_python or is_javascript): return { "error": "This tool only works with Python (.py) or JavaScript/JSX (.js, .jsx) files." 
} try: with open(self.current_file_path, "r", encoding="utf-8") as file: source_code = file.read() lines = source_code.splitlines(True) # Keep line endings # Process JavaScript/JSX files if is_javascript: return self._find_js_function(function_name, source_code, lines) # For Python files, parse the source code to AST tree = ast.parse(source_code) # Find the function in the AST function_node = None class_node = None parent_function = None # Helper function to find a function or method node def find_node(node): nonlocal function_node, class_node, parent_function if isinstance(node, ast.FunctionDef) and node.name == function_name: function_node = node return True # Check for methods in classes elif isinstance(node, ast.ClassDef): for item in node.body: if ( isinstance(item, ast.FunctionDef) and item.name == function_name ): function_node = item class_node = node return True # Check for nested functions elif isinstance(node, ast.FunctionDef): for item in node.body: # Find directly nested function definitions if ( isinstance(item, ast.FunctionDef) and item.name == function_name ): function_node = item # Store parent function information parent_function = node return True # Recursively search for nested functions/methods for child in ast.iter_child_nodes(node): if find_node(child): return True return False # Search for the function in the AST find_node(tree) if not function_node: return { "error": f"Function or method '{function_name}' not found in the file." 
} # Get the line range for the function start_line = function_node.lineno end_line = 0 # Find the end line by looking at tokens with open(self.current_file_path, "rb") as file: tokens = list(tokenize.tokenize(file.readline)) # Find the function definition token function_def_index = -1 for i, token in enumerate(tokens): if token.type == tokenize.NAME and token.string == function_name: if ( i > 0 and tokens[i - 1].type == tokenize.NAME and tokens[i - 1].string == "def" ): function_def_index = i break if function_def_index == -1: # Fallback - use AST to determine the end # First, get the end_lineno from the function node itself end_line = function_node.end_lineno or start_line # Then walk through all nodes inside the function to find the deepest end_lineno # This handles nested functions and statements properly # Walk through all nodes inside the function to find the deepest end_lineno # This handles nested functions and statements properly for node in ast.walk(function_node): if hasattr(node, "end_lineno") and node.end_lineno: end_line = max(end_line, node.end_lineno) # Specifically look for nested function definitions # by checking for FunctionDef nodes within the function body for node in ast.walk(function_node): if ( isinstance(node, ast.FunctionDef) and node is not function_node ): if hasattr(node, "end_lineno") and node.end_lineno: end_line = max(end_line, node.end_lineno) else: # Find the closing token of the function (either the next function/class at the same level or the end of file) indent_level = tokens[function_def_index].start[ 1 ] # Get the indentation of the function in_function = False nested_level = 0 for token in tokens[function_def_index + 1 :]: current_line = token.start[0] if current_line > start_line: # Start tracking when we're inside the function body if not in_function and token.string == ":": in_function = True continue # Track nested blocks by indentation if in_function: current_indent = token.start[1] # Find a token at the same indentation 
level as the function definition # but only if we're not in a nested block if ( current_indent <= indent_level and token.type == tokenize.NAME and token.string in ("def", "class") and nested_level == 0 ): end_line = current_line - 1 break # Track nested blocks elif ( current_indent > indent_level and token.type == tokenize.NAME ): if token.string in ("def", "class"): nested_level += 1 # Look for the end of nested blocks elif ( nested_level > 0 and current_indent <= indent_level ): nested_level -= 1 # If we couldn't find the end, use the last line of the file if end_line == 0: end_line = len(lines) # Include decorators if present for decorator in function_node.decorator_list: start_line = min(start_line, decorator.lineno) # Adjust for methods inside classes if class_node: class_body_start = min( item.lineno for item in class_node.body if hasattr(item, "lineno") ) if function_node.lineno == class_body_start: # If this is the first method, include the class definition start_line = class_node.lineno # Normalize line numbers (1-based for API consistency) function_lines = lines[start_line - 1 : end_line] # Format the results similar to the read tool formatted_lines = [] for i, line in enumerate(function_lines, start_line): formatted_lines.append((i, line.rstrip())) result = { "status": "success", "lines": formatted_lines, "start_line": start_line, "end_line": end_line, } # Add parent function information if this is a nested function if parent_function: result["is_nested"] = True result["parent_function"] = parent_function.name return result except Exception as e: return {"error": f"Error finding function: {str(e)}"} @self.mcp.tool() async def set_python_path(path: str): """ Set it before running tests so the project is correctly recognized """ os.environ["PYTHONPATH"] = path @self.mcp.tool() async def run_tests( test_path: Optional[str] = None, test_name: Optional[str] = None, verbose: bool = False, collect_only: bool = False, ) -> Dict[str, Any]: """ Run pytest tests 
@self.mcp.tool()
async def run_tests(
    test_path: Optional[str] = None,
    test_name: Optional[str] = None,
    verbose: bool = False,
    collect_only: bool = False,
) -> Dict[str, Any]:
    """
    Run pytest tests using the specified Python virtual environment.

    Args:
        test_path (str, optional): Directory or file path containing tests to run
        test_name (str, optional): Specific test function/method to run
            (forwarded to pytest's ``-k`` expression)
        verbose (bool, optional): Run tests in verbose mode
        collect_only (bool, optional): Only collect tests without executing them

    Returns:
        dict: Test execution results including returncode, output, and execution time
    """
    if self.python_venv is None:
        return {
            "error": "No Python environment found. It needs to be set in the MCP config as environment variable called PYTHON_VENV."
        }

    # Build pytest arguments
    pytest_args = []

    # Add test path if specified
    if test_path:
        pytest_args.append(test_path)

    # Add specific test name if specified.
    # BUGFIX: "-k" and its value must be separate argv tokens; the list is
    # handed to subprocess.run, so a single "-k <name>" string would reach
    # pytest as one malformed argument instead of an option plus value.
    if test_name:
        pytest_args.extend(["-k", test_name])

    # Add verbosity flag if specified
    if verbose:
        pytest_args.append("-v")

    # Add collect-only flag if specified
    if collect_only:
        pytest_args.append("--collect-only")

    # Run the tests
    return self._run_tests(pytest_args)
def _find_js_function(
    self, function_name: str, source_code: str, lines: list
) -> Dict[str, Any]:
    """
    Helper method to find JavaScript/JSX function definitions using Babel AST parsing.

    Falls back to regex pattern matching when Babel-based parsing is
    disabled or fails.

    Args:
        function_name (str): Name of the function to find
        source_code (str): Source code content
        lines (list): Source code split by lines with line endings preserved

    Returns:
        dict: Dictionary with function information
    """
    try:
        # First try using Babel for accurate parsing if it's available
        if self.enable_js_syntax_check:
            babel_result = self._find_js_function_babel(
                function_name, source_code, lines
            )
            if babel_result and not babel_result.get("error"):
                return babel_result

        # Fallback to regex approach if Babel parsing fails or is disabled
        # Pattern for named function declaration
        # Matches: function functionName(args) { body }
        # Also matches: async function functionName(args) { body }
        function_pattern = re.compile(
            r"(?:async\s+)?function\s+(?P<functionName>\w+)\s*\((?P<functionArguments>[^()]*)\)\s*{",
            re.MULTILINE,
        )

        # Pattern for arrow functions with explicit name
        # Matches: const functionName = (args) => { body } or const functionName = args => { body }
        # Also matches async variants: const functionName = async (args) => { body }
        # Also matches component inner functions: const innerFunction = async () => { ... }
        arrow_pattern = re.compile(
            r"(?:(?:const|let|var)\s+)?(?P<functionName>\w+)\s*=\s*(?:async\s+)?(?:\((?P<functionArguments>[^()]*)\)|(?P<singleArg>\w+))\s*=>\s*{",
            re.MULTILINE,
        )

        # Pattern for object method definitions
        # Matches: functionName(args) { body } in object literals or classes
        # Also matches: async functionName(args) { body }
        method_pattern = re.compile(
            r"(?:^|,|{)\s*(?:async\s+)?(?P<functionName>\w+)\s*\((?P<functionArguments>[^()]*)\)\s*{",
            re.MULTILINE,
        )

        # Pattern for React hooks like useCallback, useEffect, etc.
        # Matches: const functionName = useCallback(async () => { ... }, [deps])
        hook_pattern = re.compile(
            r"const\s+(?P<functionName>\w+)\s*=\s*use\w+\((?:async\s+)?\(?[^{]*\)?\s*=>[^{]*{",
            re.MULTILINE,
        )

        # Search for the function
        matches = []

        # Check all patterns; only matches whose captured name equals the
        # requested function_name are kept.
        for pattern in [
            function_pattern,
            arrow_pattern,
            method_pattern,
            hook_pattern,
        ]:
            for match in pattern.finditer(source_code):
                if match.groupdict().get("functionName") == function_name:
                    matches.append(match)

        if not matches:
            return {"error": f"Function '{function_name}' not found in the file."}

        # Use the first match
        match = matches[0]
        start_pos = match.start()

        # Find the line number for the start by walking the cumulative
        # character offsets of each line.
        start_line = 1
        pos = 0
        for i, line in enumerate(lines, 1):
            next_pos = pos + len(line)
            if pos <= start_pos < next_pos:
                start_line = i
                break
            pos = next_pos

        # Find the closing brace that matches the opening brace of the function
        # Count the number of opening and closing braces
        brace_count = 0
        end_pos = start_pos
        in_string = False
        string_delimiter = None
        escaped = False

        for i in range(start_pos, len(source_code)):
            char = source_code[i]

            # Handle strings to avoid counting braces inside strings
            if not escaped and char in ['"', "'", "`"]:
                if not in_string:
                    in_string = True
                    string_delimiter = char
                elif char == string_delimiter:
                    in_string = False

            # Check for escape character (a backslash escapes the NEXT
            # character, so we skip the reset below for one iteration)
            if char == "\\" and not escaped:
                escaped = True
                continue

            escaped = False

            # Only count braces outside of strings
            if not in_string:
                if char == "{":
                    brace_count += 1
                elif char == "}":
                    brace_count -= 1
                    if brace_count == 0:
                        end_pos = i + 1  # Include the closing brace
                        break

        # Find the end line number using the same cumulative-offset walk
        end_line = 1
        pos = 0
        for i, line in enumerate(lines, 1):
            next_pos = pos + len(line)
            if pos <= end_pos < next_pos:
                end_line = i
                break
            pos = next_pos

        # Extract the function lines
        function_lines = lines[start_line - 1 : end_line]

        # Format results like the read tool: (line_number, text) pairs
        formatted_lines = []
        for i, line in enumerate(function_lines, start_line):
            formatted_lines.append((i, line.rstrip()))

        result = {
            "status": "success",
            "lines": formatted_lines,
            "start_line": start_line,
            "end_line": end_line,
        }

        return result
    except Exception as e:
        return {"error": f"Error finding JavaScript function: {str(e)}"}
def _find_js_function_babel(
    self, function_name: str, source_code: str, lines: list
) -> Dict[str, Any]:
    """
    Use Babel to parse JavaScript/JSX code and find function definitions.

    This provides more accurate function location by using proper AST parsing
    rather than regex pattern matching.

    Args:
        function_name (str): Name of the function to find
        source_code (str): Source code content
        lines (list): Source code split by lines with line endings preserved

    Returns:
        dict: Dictionary with function information or None if Babel fails
    """
    try:
        # Create a temporary file with the source code
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".jsx", delete=False
        ) as temp:
            temp_path = temp.name
            temp.write(source_code)

        try:
            # Determine the appropriate Babel preset
            is_jsx = self.current_file_path.endswith(".jsx")
            presets = ["@babel/preset-react"] if is_jsx else ["@babel/preset-env"]

            # Use Babel to output the AST as JSON
            cmd = [
                "npx",
                "babel",
                "--presets",
                ",".join(presets),
                "--plugins",
                # Add the AST plugin that outputs function locations
                "babel-plugin-ast-function-metadata",
                "--no-babelrc",
                temp_path,
                "--out-file",
                "/dev/null",  # Output to nowhere, we just want the AST metadata
            ]

            # Execute Babel to get the AST with function locations
            process = subprocess.run(cmd, capture_output=True, text=True)
        finally:
            # Always clean up the temporary file, even if the subprocess
            # call raises; only swallow filesystem errors from unlink.
            try:
                os.unlink(temp_path)
            except OSError:
                pass

        # If Babel execution failed, return None to fall back to regex
        if process.returncode != 0:
            return None

        # Parse the output to find location data
        output = process.stdout

        # Look for the JSON that has our function location data
        location_data = None
        try:
            # Extract the JSON output from Babel plugin
            # Format is typically like: FUNCTION_LOCATIONS: {...json data...}
            match = re.search(r"FUNCTION_LOCATIONS:\s*({.*})", output, re.DOTALL)
            if match:
                json_str = match.group(1)
                locations = json.loads(json_str)
                # Find our specific function
                location_data = locations.get(function_name)
        except (json.JSONDecodeError, AttributeError):
            return None

        if not location_data:
            return None

        # Get the line information from the location data
        start_line = location_data.get("start", {}).get("line", 0)
        end_line = location_data.get("end", {}).get("line", 0)

        if start_line <= 0 or end_line <= 0:
            return None

        # Extract the function lines
        function_lines = lines[start_line - 1 : end_line]

        # Format results like the read tool
        formatted_lines = []
        for i, line in enumerate(function_lines, start_line):
            formatted_lines.append((i, line.rstrip()))

        return {
            "status": "success",
            "lines": formatted_lines,
            "start_line": start_line,
            "end_line": end_line,
            "parser": "babel",  # Flag that this was parsed with Babel
        }
    except Exception:
        # If anything goes wrong, return None to fall back to regex approach
        return None
def _run_tests(self, pytest_args=None, python_venv=None):
    """
    Execute pytest in a subprocess and report the outcome.

    Args:
        pytest_args (list, optional): Extra arguments forwarded to pytest
        python_venv (str, optional): Python executable to run pytest with;
            falls back to the server-level PYTHON_VENV setting, then to
            the plain "python" on PATH

    Returns:
        dict: Test execution results including returncode, output, and execution time
    """
    try:
        # Resolve the interpreter: explicit argument wins, then the
        # instance-level setting, then the system default.
        interpreter = python_venv or self.python_venv or "python"

        # Assemble the full pytest invocation in one expression.
        command = [interpreter, "-m", "pytest", *(pytest_args or [])]

        # Time the run so callers can report test duration.
        started = datetime.datetime.now()
        completed = subprocess.run(command, capture_output=True, text=True)
        elapsed = (datetime.datetime.now() - started).total_seconds()

        return {
            "status": "success" if completed.returncode == 0 else "failure",
            "returncode": completed.returncode,
            "stdout": completed.stdout,
            "stderr": completed.stderr,
            "duration": elapsed,
            "command": " ".join(command),
        }
    except Exception as exc:
        return {
            "status": "error",
            "error": str(exc),
            # The command may not exist yet if resolution itself failed.
            "command": " ".join(command) if "command" in locals() else None,
        }
def run(self, transport="stdio", **transport_kwargs):
    """Run the MCP server.

    Args:
        transport (str): FastMCP transport name (e.g. "stdio" or
            "streamable-http").
        **transport_kwargs: Extra keyword arguments forwarded unchanged to
            FastMCP's run() (e.g. host/port/path for HTTP transports).
    """
    # Thin delegation to the underlying FastMCP instance.
    self.mcp.run(transport=transport, **transport_kwargs)
""" parser = argparse.ArgumentParser(description="Text Editor MCP Server") parser.add_argument( "--transport", default="stdio", choices=["stdio", "http", "streamable-http"], help="Transport type", ) parser.add_argument( "--host", default="127.0.0.1", help="Host to bind to (for HTTP transport)" ) parser.add_argument( "--port", type=int, default=8001, help="Port to bind to (for HTTP transport)" ) parser.add_argument( "--path", default="/mcp", help="Path for HTTP endpoint (for HTTP transport)" ) args = parser.parse_args() host = os.environ.get("FASTMCP_SERVER_HOST", args.host) port = int(os.environ.get("FASTMCP_SERVER_PORT", args.port)) path = os.environ.get("FASTMCP_SERVER_PATH", args.path) transport = os.environ.get("FASTMCP_SERVER_TRANSPORT", args.transport) # Normalize transport name for FastMCP if transport in ["http", "streamable-http"]: transport = "streamable-http" # Run the server with the configured transport if transport == "streamable-http": text_editor_server.run(transport=transport, host=host, port=port, path=path) else: text_editor_server.run(transport=transport) text_editor_server = TextEditorServer() mcp = text_editor_server.mcp if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danielpodrazka/editor-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.