MCP Tools

Apache 2.0
17
  • Apple
  • Linux
import re import subprocess import tempfile from pathlib import Path from typing import Optional, List, Dict, Any from marker.converters.pdf import PdfConverter import io import contextlib from marker.models import create_model_dict from marker.output import text_from_rendered from mcp.server.fastmcp import FastMCP mcp = FastMCP("zbigniew-mcp") @mcp.tool( description=""" Execute a shell command and return the complete results including stdout, stderr, and exit code. Provide the command as a list of strings where the first element is the command name and subsequent elements are arguments. This approach prevents shell injection vulnerabilities. Examples: - List files: execute_shell_command(["ls", "-la"]) - Find text in files: execute_shell_command(["grep", "-r", "TODO", "./src"]) - Run a Python script: execute_shell_command(["python", "analysis.py", "--input", "data.csv"]) - Get system info: execute_shell_command(["uname", "-a"]) The command will timeout after the specified seconds (default: 60) to prevent hanging. """, ) def execute_shell_command( command: list[str], timeout: int = 60, working_dir: str = None ) -> dict: """Execute a shell command and return comprehensive results.""" try: result = subprocess.run( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout, cwd=working_dir, text=True ) return { "stdout": result.stdout, "stderr": result.stderr, "exit_code": result.returncode, "command": " ".join(command), "success": result.returncode == 0 } except subprocess.TimeoutExpired: return { "stdout": "", "stderr": f"Command timed out after {timeout} seconds", "exit_code": -1, "command": " ".join(command), "success": False } except Exception as e: return { "stdout": "", "stderr": f"Error executing command: {str(e)}", "exit_code": -1, "command": " ".join(command), "success": False } @mcp.tool( description=""" Show contents of a file with options to display specific line ranges. This tool allows you to view file content with control over which lines to display. Parameters: - file_path: Path to the file to display - start_line: Line number to start from (1-based indexing, defaults to 1) - num_lines: Number of lines to display (defaults to all lines) Examples: - Show entire file: show_file("/path/to/file.txt") - Show first 10 lines: show_file("/path/to/file.txt", num_lines=10) - Show lines 5-15: show_file("/path/to/file.txt", start_line=5, num_lines=10) Returns the content as a string and information about the lines shown. """, ) def show_file( file_path: Path, start_line: int = 1, num_lines: Optional[int] = None ) -> dict: """Display content of a file with optional line range specification.""" if not file_path.exists(): return { "success": False, "error": f"File {file_path} does not exist", "content": "", "lines_shown": 0, "total_lines": 0 } try: # Ensure start_line is at least 1 (1-based indexing) start_line = max(1, start_line) # Read all lines from the file all_lines = file_path.read_text().splitlines() total_lines = len(all_lines) # Adjust start_line if it's beyond file length if start_line > total_lines: return { "success": False, "error": f"Start line {start_line} is beyond the file length ({total_lines} lines)", "content": "", "lines_shown": 0, "total_lines": total_lines } # Convert to 0-based indexing for slicing start_idx = start_line - 1 # Determine end index based on num_lines if num_lines is None: end_idx = total_lines else: end_idx = min(start_idx + num_lines, total_lines) # Extract the requested lines selected_lines = all_lines[start_idx:end_idx] content = "\n".join(selected_lines) return { "success": True, "content": content, "lines_shown": len(selected_lines), "total_lines": total_lines, "start_line": start_line, "end_line": start_idx + len(selected_lines) } except Exception as e: return { "success": False, "error": f"Error reading file: {str(e)}", "content": "", "lines_shown": 0, "total_lines": 0 } @mcp.tool( description=""" Search for patterns in a file using Python regular expressions. This tool allows you to search through a file for lines matching a regular expression pattern. Parameters: - file_path: Path to the file to search - pattern: Python regular expression pattern to search for - case_sensitive: Whether the search should be case-sensitive (default: True) - max_matches: Maximum number of matches to return (default: 100, use -1 for all matches) Examples: - Find all function definitions: search_in_file("/path/to/script.py", r"def\\s+\\w+\\s*\\(") - Find all TODO comments: search_in_file("/path/to/code.py", r"#\\s*TODO", case_sensitive=False) Returns the matching lines with their line numbers and the total count of matches. """, ) def search_in_file( file_path: Path, pattern: str, case_sensitive: bool = True, max_matches: int = 100 ) -> dict: """Search for regex patterns in a file and return matching lines with line numbers.""" if not file_path.exists(): return { "success": False, "error": f"File {file_path} does not exist", "matches": [], "match_count": 0 } try: # Compile the regex pattern flags = 0 if case_sensitive else re.IGNORECASE regex = re.compile(pattern, flags) # Read the file and search for matches matches = [] with open(file_path, 'r') as f: for i, line in enumerate(f, 1): if regex.search(line): matches.append({ "line_number": i, "content": line.rstrip('\n') }) if max_matches > 0 and len(matches) >= max_matches: break return { "success": True, "matches": matches, "match_count": len(matches), "truncated": max_matches > 0 and len(matches) >= max_matches } except re.error as e: return { "success": False, "error": f"Invalid regular expression: {str(e)}", "matches": [], "match_count": 0 } except Exception as e: return { "success": False, "error": f"Error searching file: {str(e)}", "matches": [], "match_count": 0 } @mcp.tool( description=""" Efficiently edit a file with advanced options for text manipulation. This tool allows making precise changes to file content with multiple modes of operation: 1. String replacements: Replace exact text strings throughout the file 2. Line-based operations: Insert, modify, or delete specific lines by number Examples: - Replace text: edit_file("config.json", replacements={"\"debug\": false": "\"debug\": true"}) - Insert at line 5: edit_file("script.py", line_operations=[{"operation": "insert", "line": 5, "content": "# New comment"}]) - Delete lines 10-15: edit_file("file.txt", line_operations=[{"operation": "delete", "start_line": 10, "end_line": 15}]) - Replace line 20: edit_file("file.txt", line_operations=[{"operation": "replace", "line": 20, "content": "Updated content"}]) - Mixed operations: Combine both replacements and line operations in a single call Returns a summary of all changes made to the file. """, ) def edit_file( file_path: Path, replacements: Optional[Dict[str, str]] = None, line_operations: Optional[List[Dict[str, Any]]] = None, create_if_missing: bool = False ) -> dict: """Edit a file with advanced options for string replacements and line operations.""" if not file_path.exists(): if create_if_missing: file_path.parent.mkdir(parents=True, exist_ok=True) file_path.write_text("") else: return { "success": False, "error": f"File {file_path} does not exist", "replacements_made": {}, "line_operations_performed": [] } try: # Read the original file content original_content = file_path.read_text() content = original_content # Track changes made replacement_counts = {} line_ops_performed = [] # Perform string replacements if specified if replacements: for old, new in replacements.items(): count = content.count(old) if count > 0: content = content.replace(old, new) replacement_counts[old] = count # Perform line operations if specified if line_operations: # Convert content to lines for line-based operations lines = content.splitlines() original_line_count = len(lines) # Sort operations by line number (descending) to avoid index shifting # This ensures operations at higher line numbers are processed first sorted_ops = sorted( line_operations, key=lambda op: -(op.get("line", 0) if "line" in op else op.get("start_line", 0)) ) for op in sorted_ops: operation_type = op.get("operation", "").lower() if operation_type == "insert": line_num = op.get("line", 0) new_content = op.get("content", "") # Convert to 0-based index idx = min(max(0, line_num - 1), len(lines)) # Insert the new content if isinstance(new_content, list): for i, line in enumerate(new_content): lines.insert(idx + i, line) else: lines.insert(idx, new_content) line_ops_performed.append({ "operation": "insert", "line": line_num, "success": True }) elif operation_type == "replace": line_num = op.get("line", 0) new_content = op.get("content", "") # Convert to 0-based index idx = line_num - 1 # Validate line number if 0 <= idx < len(lines): old_line = lines[idx] lines[idx] = new_content line_ops_performed.append({ "operation": "replace", "line": line_num, "success": True, "old_content": old_line }) else: line_ops_performed.append({ "operation": "replace", "line": line_num, "success": False, "error": f"Line {line_num} is out of range (1-{len(lines)})" }) elif operation_type == "delete": start_line = op.get("start_line", 0) end_line = op.get("end_line", start_line) # Convert to 0-based indices start_idx = max(0, start_line - 1) end_idx = min(len(lines), end_line) # Validate line range if start_idx < end_idx: deleted_lines = lines[start_idx:end_idx] lines[start_idx:end_idx] = [] line_ops_performed.append({ "operation": "delete", "start_line": start_line, "end_line": end_line, "lines_deleted": end_idx - start_idx, "success": True, "deleted_content": deleted_lines }) else: line_ops_performed.append({ "operation": "delete", "start_line": start_line, "end_line": end_line, "success": False, "error": f"Invalid line range: {start_line}-{end_line}" }) else: line_ops_performed.append({ "operation": operation_type, "success": False, "error": f"Unknown operation type: {operation_type}" }) # Convert lines back to content content = "\n".join(lines) # Write the modified content back to the file with open(file_path, "w") as f: f.write(content) return { "success": True, "original_size": len(original_content), "new_size": len(content), "changed": original_content != content, "replacements_made": replacement_counts, "line_operations_performed": line_ops_performed } except Exception as e: return { "success": False, "error": f"Error editing file: {str(e)}", "replacements_made": {}, "line_operations_performed": [] } @mcp.tool( description=""" Write content to a file with options to append or overwrite existing content. This tool allows you to write content to a file with options to append or overwrite the existing content. Parameters: - file_path: Path to the file to write - content: Text content to write to the file - mode: Write mode to use: 'w' (overwrite) or 'a' (append, default) Examples: - Overwrite file: write_file("/path/to/file.txt", "New content") - Append to file: write_file("/path/to/log.txt", "Log entry", mode="a") """ ) def write_file(file_path: Path, content: str, mode: str = "w") -> dict: """Write content to a file.""" try: with open(file_path, mode) as f: f.write(content) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} @mcp.tool( description=""" Fetch a web page by converting it to PDF and then extracting text. This tool fetches a web page by converting it to PDF using Chromium and then extracts the text content. Parameters: - url: URL of the web page to fetch Returns the extracted text content of the web page. """ ) def fetch_page(url: str) -> str: """ Fetch a web page by converting it to PDF and then extracting text. Utilizes a temporary file and suppresses any stdout produced by the PDF converter. """ # Create a named temporary file and immediately close it so Chromium can write to it. with tempfile.NamedTemporaryFile(prefix="page", suffix=".pdf", delete=False) as tmp_pdf: temp_pdf_path = tmp_pdf.name try: # Build the command for Chromium to convert the page to PDF. command = [ "chromium", "--headless", "--disable-gpu", f"--print-to-pdf={temp_pdf_path}", url ] result = subprocess.run( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) if result.returncode != 0: return f"Error fetching page: {result.stderr}" with io.StringIO() as buf, contextlib.redirect_stdout(buf), contextlib.redirect_stderr(buf): converter = PdfConverter(artifact_dict=create_model_dict()) rendered = converter(temp_pdf_path) text, _, images = text_from_rendered(rendered) return text finally: try: Path(temp_pdf_path).unlink() except Exception: pass