Skip to main content
Glama

MCP Tools

Apache 2.0
24
  • Apple
  • Linux
server.py (43.3 kB)
import contextlib
import io
import re
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Optional, List, Dict, Any

# --- Marker library imports with robust mocks ---
try:
    from marker.converters.pdf import PdfConverter
    from marker.models import create_model_dict
    from marker.output import text_from_rendered
except ImportError:
    # The warning goes to stderr on purpose: when this server is run over the
    # MCP stdio transport, stdout carries the protocol stream and must not
    # receive free-form text.
    print(
        "Warning: marker library not found. Using mock functions for fetch_page that will return clear errors.",
        file=sys.stderr,
    )

    def create_model_dict():
        """Mock replacement used when marker is not installed; returns an empty dict."""
        return {}

    def text_from_rendered(rendered):
        """Mock replacement returning a (text, None, []) tuple describing the mock state."""
        if rendered == "mock rendered object":
            return "[MOCK] This is simulated text content from a PDF. The marker library is not installed.", None, []
        return f"Error: Mock text extraction failed. The marker library is not installed and the mock received: {rendered}", None, []

    class PdfConverter:
        """Mock converter used when marker is not installed."""

        def __init__(self, *args, **kwargs):
            pass

        def __call__(self, path):
            if Path(path).exists():
                return "mock rendered object"
            return f"mock render error: File not found {path}"

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("zbigniew-mcp")


# --- Utility Functions ---

def _find_browser_command():
    """Find an installed browser command for headless operations.

    Each candidate is probed with ``--version``; the first one that exits
    with code 0 is returned.

    Returns:
        The browser command name, or None if no candidate could be run.
    """
    browsers = ["chromium", "chrome", "google-chrome", "chromium-browser"]
    for browser in browsers:
        try:
            result = subprocess.run(
                [browser, "--version"],
                capture_output=True,
                check=False,
                timeout=2  # Short timeout for version check
            )
            if result.returncode == 0:
                return browser
        except (OSError, subprocess.TimeoutExpired):
            # OSError covers a missing binary (FileNotFoundError) as well as
            # a PATH entry that exists but cannot be executed (PermissionError).
            continue
    return None


@mcp.tool(
    description="""
    Execute a shell command safely and return the complete results including stdout, stderr, and exit code.

    Two execution modes are supported:
    1. Secure mode (default): Provide the command as a list of strings (e.g., ["ls", "-la"])
    2. Shell mode: Provide a shell_command string (e.g., "ls -la && grep foo bar.txt") when shell operators are needed

    Examples:
    - List files (secure): execute_shell_command(command=["ls", "-la"])
    - List files and count them (shell): execute_shell_command(shell_command="ls -la | wc -l")
    - Chain commands: execute_shell_command(shell_command="cd /tmp && ls -la")
    - Conditional execution: execute_shell_command(shell_command="grep 'error' log.txt || echo 'No errors found'")

    Security note: shell_command mode is more powerful but vulnerable to injection if used with untrusted input.
    Always use command list mode when processing any external/user input.

    Returns a dictionary containing stdout, stderr, exit_code, the executed command string, and a boolean success flag.
    """,
)
def execute_shell_command(
        command: Optional[list[str]] = None,
        shell_command: Optional[str] = None,
        timeout: int = 60,
        working_dir: Optional[str] = None
) -> dict:
    """Executes a command with support for both secure list mode and shell operator mode.

    Args:
        command: Argument list executed with ``shell=False``.
        shell_command: Command string executed with ``shell=True``.
        timeout: Seconds passed to ``subprocess.run`` as its timeout.
        working_dir: Optional directory used as the process ``cwd``; it must
            exist and be a directory.

    Returns:
        A dict with keys ``stdout``, ``stderr``, ``exit_code``, ``command``
        and ``success``. Validation and execution failures are reported in
        this dict rather than raised.
    """
    # Validate that exactly one of command or shell_command is provided
    if (command is None and shell_command is None) or (command is not None and shell_command is not None):
        return {
            "stdout": "",
            "stderr": "Error: Exactly one of 'command' or 'shell_command' must be provided.",
            "exit_code": -1,
            "command": str(command) if command is not None else str(shell_command),
            "success": False
        }

    # Secure mode with command list
    if command is not None:
        # Validate command is a list of strings
        if not isinstance(command, list) or not all(isinstance(arg, str) for arg in command):
            return {
                "stdout": "",
                "stderr": "Error: 'command' must be provided as a list of strings.",
                "exit_code": -1,
                "command": str(command),
                "success": False
            }

        # Validate command is not empty
        if not command:
            return {
                "stdout": "",
                "stderr": "Error: Command list cannot be empty.",
                "exit_code": -1,
                "command": "[]",
                "success": False
            }

        # Execute in secure mode with shell=False
        use_shell = False
        command_to_execute = command
        command_str = " ".join(command)

    # Shell mode with string command
    else:
        # Validate shell_command is a non-empty string
        if not isinstance(shell_command, str) or not shell_command.strip():
            return {
                "stdout": "",
                "stderr": "Error: 'shell_command' must be a non-empty string.",
                "exit_code": -1,
                "command": str(shell_command),
                "success": False
            }

        # Execute in shell mode with shell=True
        use_shell = True
        command_to_execute = shell_command
        command_str = shell_command

    # Ensure working_dir exists if provided
    if working_dir:
        work_dir_path = Path(working_dir)
        if not work_dir_path.exists():
            return {
                "stdout": "",
                "stderr": f"Error: Working directory '{working_dir}' does not exist.",
                "exit_code": -1,
                "command": command_str,
                "success": False
            }
        if not work_dir_path.is_dir():
            return {
                "stdout": "",
                "stderr": f"Error: Path '{working_dir}' is not a directory.",
                "exit_code": -1,
                "command": command_str,
                "success": False
            }

    try:
        result = subprocess.run(
            command_to_execute,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
            cwd=working_dir,
            text=True,
            shell=use_shell,
            check=False  # Don't raise exception on non-zero exit code
        )

        return {
            "stdout": result.stdout,
            "stderr": result.stderr,
            "exit_code": result.returncode,
            "command": command_str,
            "success": result.returncode == 0
        }
    except subprocess.TimeoutExpired:
        return {
            "stdout": "",
            "stderr": f"Command timed out after {timeout} seconds.",
            "exit_code": 124,  # Standard timeout exit code
            "command": command_str,
            "success": False
        }
    except FileNotFoundError:
        command_name = command[0] if command is not None else "the specified command"
        return {
            "stdout": "",
            "stderr": f"Error: Command not found: '{command_name}'. Check if it's installed and in PATH.",
            "exit_code": 127,  # Standard command not found exit code
            "command": command_str,
            "success": False
        }
    except PermissionError as e:
        command_name = command[0] if command is not None else "the specified command"
        return {
            "stdout": "",
            "stderr": f"Error: Permission denied executing '{command_name}'. Details: {str(e)}",
            "exit_code": 126,  # Standard permission denied exit code
            "command": command_str,
            "success": False
        }
    except Exception as e:
        error_type = type(e).__name__
        return {
            "stdout": "",
            "stderr": f"An unexpected error occurred executing command: {error_type}: {str(e)}",
            "exit_code": -1,
            "command": command_str,
            "success": False
        }


@mcp.tool(
    description="""
    Show contents of a file with options to display specific line ranges (1-based indexing).

    Examples:
    - Show entire file: show_file(file_path="/path/to/file.txt")
    - Show first 10 lines: show_file(file_path="/path/to/file.txt", num_lines=10)
    - Show lines 5-15: show_file(file_path="/path/to/file.txt", start_line=5, end_line=15)
    - Show last 10 lines: show_file(file_path="/path/to/file.txt", start_line=-10)

    Note: `end_line` takes precedence over `num_lines` if both are provided.
    Negative `start_line` counts from the end of the file.

    Returns the content as a string and detailed information about the lines shown and total lines.
    """,
)
def show_file(
        file_path: str,
        start_line: int = 1,
        num_lines: Optional[int] = None,
        end_line: Optional[int] = None
) -> dict:
    """Displays content of a file with flexible line range specification.

    Args:
        file_path: Path of the file to read (UTF-8, falling back to Latin-1).
        start_line: 1-based first line; negative values count from the end.
        num_lines: Number of lines to show; ignored when ``end_line`` is given
            or when negative.
        end_line: 1-based inclusive last line; negative values count from the end.

    Returns:
        A dict with ``success`` plus either the selected ``content`` and line
        bookkeeping, or an ``error`` message.
    """
    path = Path(file_path)

    # Validate file exists and is a file
    if not path.exists():
        return {
            "success": False,
            "error": f"File not found: {file_path}",
            "content": "",
            "lines_shown": 0,
            "total_lines": 0,
            "requested_range": f"start={start_line}, num={num_lines}, end={end_line}"
        }
    if not path.is_file():
        return {
            "success": False,
            "error": f"Path exists but is not a regular file: {file_path}",
            "content": "",
            "lines_shown": 0,
            "total_lines": 0,
            "requested_range": f"start={start_line}, num={num_lines}, end={end_line}"
        }

    try:
        # Try different encodings for better file compatibility
        try:
            all_lines = path.read_text(encoding='utf-8').splitlines()
        except UnicodeDecodeError:
            try:
                all_lines = path.read_text(encoding='latin-1').splitlines()
            except Exception as enc_e:
                return {
                    "success": False,
                    "error": f"Error reading file with UTF-8 or Latin-1 encoding: {type(enc_e).__name__}: {str(enc_e)}",
                    "content": "",
                    "lines_shown": 0,
                    "total_lines": -1,
                    "requested_range": f"start={start_line}, num={num_lines}, end={end_line}"
                }

        total_lines = len(all_lines)

        # Handle case where file is empty
        if total_lines == 0:
            return {
                "success": True,
                "content": "",
                "lines_shown": 0,
                "total_lines": 0,
                "start_line_actual": None,
                "end_line_actual": None,
                "info": "File is empty"
            }

        # Handle negative start_line (counts from end)
        if start_line < 0:
            start_line = total_lines + start_line + 1

        # Ensure start_line is at least 1 (1-based indexing)
        start_line = max(1, start_line)

        # Determine the effective end line
        effective_end_line = total_lines
        if end_line is not None:
            if end_line < 0:
                # Negative end_line counts from the end
                effective_end_line = total_lines + end_line + 1
            else:
                effective_end_line = min(end_line, total_lines)
            # End cannot be before start
            effective_end_line = max(start_line - 1, effective_end_line)
        elif num_lines is not None and num_lines >= 0:
            effective_end_line = min(start_line + num_lines - 1, total_lines)
            # Clamp to start_line - 1 (an empty range) rather than 1, so that
            # num_lines=0 yields no lines even when start_line is 1.
            effective_end_line = max(start_line - 1, effective_end_line)

        # Adjust start_line if it's beyond file length after potential negative indexing calc
        if start_line > total_lines:
            # Special case: asking for lines beyond the end
            return {
                "success": True,  # Technically successful read, just no content in range
                "content": "",
                "lines_shown": 0,
                "total_lines": total_lines,
                "start_line_actual": start_line,
                "end_line_actual": effective_end_line,
                "info": f"Requested start line {start_line} is beyond the file length ({total_lines} lines)."
            }

        # Convert to 0-based indexing for slicing
        start_idx = start_line - 1
        # End index for slicing is exclusive in Python
        end_idx = effective_end_line

        # Ensure start index is not greater than end index
        if start_idx >= end_idx:
            selected_lines = []
        else:
            selected_lines = all_lines[start_idx:end_idx]

        content = "\n".join(selected_lines)

        return {
            "success": True,
            "content": content,
            "lines_shown": len(selected_lines),
            "total_lines": total_lines,
            "start_line_actual": start_line,  # The calculated start line (1-based)
            "end_line_actual": start_idx + len(selected_lines)  # The actual last line number shown (1-based)
        }
    except IOError as e:
        return {
            "success": False,
            "error": f"IO error reading file: {type(e).__name__}: {str(e)}",
            "content": "",
            "lines_shown": 0,
            "total_lines": -1
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Error reading or processing file: {type(e).__name__}: {str(e)}",
            "content": "",
            "lines_shown": 0,
            "total_lines": -1
        }


@mcp.tool(
    description="""
    Search for a Python regular expression pattern within a file.
    Returns lines matching the pattern, along with their line numbers (1-based).

    Parameters:
    - file_path: Path to the file to search.
    - pattern: Python regular expression string.
    - case_sensitive: Perform case-sensitive matching (default: True).
    - max_matches: Limit the number of returned matches (default: 100, use -1 for unlimited).
    - context_lines: Include N lines of context before and after each match (default: 0).

    Examples:
    - Find 'TODO' comments (case-insensitive): search_in_file("code.py", r"#\\s*TODO", case_sensitive=False)
    - Find function definitions with 1 line context: search_in_file("script.py", r"^def\\s+\\w+", context_lines=1)

    Returns a list of matches, each containing line number, content, and optional context.
    """,
)
def search_in_file(
        file_path: str,
        pattern: str,
        case_sensitive: bool = True,
        max_matches: int = 100,
        context_lines: int = 0
) -> dict:
    """Searches for regex patterns in a file, returning matches with optional context.

    Args:
        file_path: Path of the file to search (UTF-8, falling back to Latin-1).
        pattern: Regular expression compiled with ``re.compile``.
        case_sensitive: When False, ``re.IGNORECASE`` is applied.
        max_matches: Maximum number of matches stored; values <= 0 disable the limit.
        context_lines: Lines of context captured before/after each match when > 0.

    Returns:
        A dict with ``success``, the stored ``matches``, ``total_matches_found``
        (including matches beyond the limit), ``matches_returned`` and
        ``truncated``; or ``success: False`` with an ``error`` message.
    """
    path = Path(file_path)

    # Validate file exists and is a file
    if not path.exists():
        return {
            "success": False,
            "error": f"File not found: {file_path}",
            "matches": [],
            "total_matches_found": 0,
            "truncated": False
        }
    if not path.is_file():
        return {
            "success": False,
            "error": f"Path exists but is not a regular file: {file_path}",
            "matches": [],
            "total_matches_found": 0,
            "truncated": False
        }

    try:
        # Validate and compile the regex pattern
        try:
            flags = 0 if case_sensitive else re.IGNORECASE
            regex = re.compile(pattern, flags)
        except re.error as re_err:
            return {
                "success": False,
                "error": f"Invalid regular expression '{pattern}': {str(re_err)}",
                "matches": [],
                "total_matches_found": 0,
                "truncated": False
            }

        # Read the file lines with encoding handling
        try:
            all_lines = path.read_text(encoding='utf-8').splitlines()
        except UnicodeDecodeError:
            try:
                all_lines = path.read_text(encoding='latin-1').splitlines()
            except Exception as enc_e:
                return {
                    "success": False,
                    "error": f"Error reading file with UTF-8 or Latin-1 encoding: {type(enc_e).__name__}: {str(enc_e)}",
                    "matches": [],
                    "total_matches_found": 0,
                    "truncated": False
                }

        matches_found = []
        total_lines = len(all_lines)
        match_limit = max_matches if max_matches > 0 else float('inf')

        # Index of the first line that has NOT been tested against the regex.
        # When the loop breaks on the limit, the line at the break index was
        # never examined, so the counting pass below must start there.
        next_unchecked = 0
        for i, line in enumerate(all_lines):
            if len(matches_found) >= match_limit:
                break
            next_unchecked = i + 1

            if regex.search(line):
                match_info = {
                    "line_number": i + 1,  # 1-based line numbering
                    "content": line
                }

                # Add context if requested
                if context_lines > 0:
                    start_context = max(0, i - context_lines)
                    end_context = min(total_lines, i + context_lines + 1)

                    # Only include context if available
                    if start_context < i:
                        match_info["context_before"] = all_lines[start_context:i]
                    if i + 1 < end_context:
                        match_info["context_after"] = all_lines[i + 1:end_context]

                matches_found.append(match_info)

        # Count (without storing) any matches in lines left unexamined because
        # the limit was reached; the slice is empty when the loop ran to the end.
        actual_total_matches_found = len(matches_found)
        for remaining_line in all_lines[next_unchecked:]:
            if regex.search(remaining_line):
                actual_total_matches_found += 1
        truncated = actual_total_matches_found > len(matches_found)

        return {
            "success": True,
            "matches": matches_found,
            "total_matches_found": actual_total_matches_found,
            "matches_returned": len(matches_found),
            "truncated": truncated
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Error searching file: {type(e).__name__}: {str(e)}",
            "matches": [],
            "total_matches_found": 0,
            "truncated": False
        }


@mcp.tool(
    description="""
    Atomically edits a file using string replacements and/or line-based operations.
    Provides a summary of changes and a simplified log of operations attempted and their success status.

    Operations are applied sequentially: first all string replacements, then all line operations.
    Line numbers for line operations refer to the state *after* string replacements but *before*
    subsequent line operations within the same call.

    Parameters:
    - file_path: Path to the file to edit.
    - replacements: Optional dictionary { "find_string": "replace_string", ... }. Replaces all occurrences.
    - line_operations: Optional list of dictionaries for line-specific changes. Operations:
        - {"operation": "insert", "line": N, "content": "text" or ["line1", "line2"]} (Insert before line N, 1-based)
        - {"operation": "replace", "line": N, "content": "new text"} (Replace line N, 1-based)
        - {"operation": "delete", "start_line": N, "end_line": M} (Delete lines N to M inclusive, 1-based)
    - create_if_missing: If True, create the file (and parent directories) if it doesn't exist (default: False).

    Examples:
    - Replace 'foo' with 'bar': edit_file("file.txt", replacements={"foo": "bar"})
    - Insert at line 1: edit_file("script.py", line_operations=[{"operation": "insert", "line": 1, "content": "#!/usr/bin/env python3"}])
    - Delete line 5: edit_file("config.yaml", line_operations=[{"operation": "delete", "start_line": 5, "end_line": 5}])
    - Combined: edit_file("file.txt", replacements={"old": "new"}, line_operations=[{"operation":"delete", "start_line":3, "end_line":3}])

    Returns a dictionary containing success status, file info, a summary of changes (counts of operations),
    a simplified log detailing each operation's outcome, and a preview of the final content.
    """,
)
def edit_file(
        file_path: str,
        replacements: Optional[Dict[str, str]] = None,
        line_operations: Optional[List[Dict[str, Any]]] = None,
        create_if_missing: bool = False
) -> dict:
    """Atomically edits a file using string replacements and/or line operations with simplified feedback.

    Args:
        file_path: Path of the file to edit.
        replacements: Mapping of find-string to replacement; every occurrence
            is replaced. An empty find-string is rejected and logged.
        line_operations: List of insert/replace/delete operation dicts using
            1-based line numbers. They are sorted by line number and applied
            with a running offset so numbers refer to the pre-operation state.
        create_if_missing: Create the file (and parent directories) if absent.

    Returns:
        A dict with ``success``, ``change_occurred``, a ``summary`` of counters
        and an ``operations_log``; on success also sizes and a content preview,
        on failure an ``error`` message.
    """
    path = Path(file_path)
    operation_log = []
    # Initialize summary statistics
    summary_stats = {
        "string_replacements_attempted": 0,
        "string_replacements_made": 0,  # Number of replacements that actually changed the content
        "line_inserts_attempted": 0,
        "line_inserts_made": 0,  # Number of lines actually inserted
        "line_replaces_attempted": 0,
        "line_replaces_made": 0,  # Number of replaces that actually changed content
        "line_deletes_attempted": 0,
        "line_deletes_made": 0,  # Number of lines actually deleted
        "operations_failed": 0,
        "file_created": False,
    }
    change_occurred = False
    original_content = ""
    original_size = -1
    encoding_used = 'utf-8'  # Default

    # --- File Existence Check ---
    if not path.exists():
        if create_if_missing:
            try:
                path.parent.mkdir(parents=True, exist_ok=True)
                path.write_text("", encoding=encoding_used)  # Create empty file
                original_content = ""
                original_size = 0
                summary_stats["file_created"] = True
                operation_log.append({"operation": "create_file", "path": str(path), "success": True})
                # No need to mark change_occurred=True for creation, only for content modification
            except Exception as e:
                summary_stats["operations_failed"] += 1
                return {
                    "success": False,
                    "error": f"Failed to create file {path}: {type(e).__name__}: {str(e)}",
                    "change_occurred": False,
                    "summary": summary_stats,
                    "operations_log": operation_log
                }
        else:
            summary_stats["operations_failed"] += 1
            return {
                "success": False,
                "error": f"File not found: {path}",
                "change_occurred": False,
                "summary": summary_stats,
                "operations_log": []  # No ops attempted yet
            }
    elif not path.is_file():
        summary_stats["operations_failed"] += 1
        return {
            "success": False,
            "error": f"Path exists but is not a regular file: {path}",
            "change_occurred": False,
            "summary": summary_stats,
            "operations_log": []
        }

    # --- Read Original Content ---
    read_success = False
    try:
        try:
            original_content = path.read_text(encoding='utf-8')
            encoding_used = 'utf-8'
            read_success = True
        except UnicodeDecodeError:
            try:
                original_content = path.read_text(encoding='latin-1')
                encoding_used = 'latin-1'
                read_success = True
                operation_log.append({
                    "operation": "read_file",
                    "success": True,
                    "warning": "Read using latin-1 encoding"
                })
            except Exception as enc_e:
                raise IOError(f"Error reading file with UTF-8 or Latin-1: {str(enc_e)}") from enc_e

        if read_success and encoding_used == 'utf-8':  # Avoid duplicate log if warning already added
            operation_log.append({
                "operation": "read_file",
                "success": True,
                "encoding": encoding_used
            })

        original_size = len(original_content)
        modified_content = original_content
        modified_lines = original_content.splitlines()

    except Exception as e:
        summary_stats["operations_failed"] += 1
        operation_log.append({"operation": "read_file", "success": False, "error": str(e)})
        return {
            "success": False,
            "error": f"Error reading file {path}: {type(e).__name__}: {str(e)}",
            "change_occurred": False,
            "summary": summary_stats,
            "operations_log": operation_log
        }

    # --- Apply Operations ---
    try:
        # 1. String Replacements
        if replacements:
            content_before_replacements = modified_content
            for old, new in replacements.items():
                summary_stats["string_replacements_attempted"] += 1

                if not old:
                    # str.replace with an empty search string inserts the
                    # replacement between every character; reject it instead.
                    operation_log.append({
                        "operation": "replace_string",
                        "old_string_preview": "",
                        "new_string_preview": new[:50] + ('...' if len(new) > 50 else ''),
                        "success": False,
                        "count": 0,
                        "error": "Empty find string is not allowed"
                    })
                    summary_stats["operations_failed"] += 1
                    continue

                count = modified_content.count(old)
                op_log_entry = {
                    "operation": "replace_string",
                    "old_string_preview": old[:50] + ('...' if len(old) > 50 else ''),  # Keep preview short
                    "new_string_preview": new[:50] + ('...' if len(new) > 50 else ''),
                    "success": count > 0,
                    "count": count
                }
                if count > 0:
                    modified_content = modified_content.replace(old, new)
                    if old != new:
                        # Count only replacements that matched and altered text
                        summary_stats["string_replacements_made"] += 1
                operation_log.append(op_log_entry)

            if modified_content != content_before_replacements:
                change_occurred = True
                modified_lines = modified_content.splitlines()  # Update lines based on global replacements

        # 2. Line Operations
        if line_operations:
            # --- Pre-validation of line operations (simplified) ---
            valid_operations = []
            for i, op in enumerate(line_operations):
                if not isinstance(op, dict) or "operation" not in op:
                    operation_log.append({
                        "operation": "validate_operation",
                        "index": i,
                        "success": False,
                        "error": "Invalid format or missing 'operation' key"
                    })
                    summary_stats["operations_failed"] += 1
                    continue  # Skip invalid operation structure

                op_type = op.get("operation", "").lower()
                if op_type not in ["insert", "replace", "delete"]:
                    operation_log.append({
                        "operation": "validate_operation",
                        "index": i,
                        "type": op_type,
                        "success": False,
                        "error": "Invalid operation type"
                    })
                    summary_stats["operations_failed"] += 1
                    continue

                # Basic check for required line keys (further checks during execution)
                if op_type in ["insert", "replace"] and "line" not in op:
                    operation_log.append(
                        {"operation": "validate_operation", "index": i, "type": op_type, "success": False,
                         "error": "Missing 'line'"})
                    summary_stats["operations_failed"] += 1
                    continue
                if op_type == "delete" and "start_line" not in op:
                    operation_log.append(
                        {"operation": "validate_operation", "index": i, "type": op_type, "success": False,
                         "error": "Missing 'start_line'"})
                    summary_stats["operations_failed"] += 1
                    continue

                valid_operations.append(op)  # Assume valid for now if basic structure is okay

            # --- Sort valid operations by line number ascending ---
            try:
                sorted_ops_asc = sorted(
                    valid_operations,
                    key=lambda op: int(op.get("line", op.get("start_line", 0)))
                    # Use 0 as fallback shouldn't happen if validation passed
                )
            except (ValueError, TypeError) as sort_e:
                # Log sorting error but proceed with unsorted valid ops
                operation_log.append(
                    {"operation": "sort_operations", "success": False, "error": f"Failed to sort: {str(sort_e)}"})
                summary_stats["operations_failed"] += 1  # Count as a failure
                sorted_ops_asc = valid_operations  # Use the unsorted list

            line_offset = 0  # Tracks cumulative line changes

            # --- Execute Line Operations ---
            for op in sorted_ops_asc:
                operation_type = op.get("operation", "").lower()
                op_result = {"operation": operation_type}
                op_success = False  # Assume failure until proven otherwise
                content_actually_changed = False
                current_len = len(modified_lines)  # Length *before* this specific operation

                try:
                    if operation_type == "insert":
                        summary_stats["line_inserts_attempted"] += 1
                        line_num = int(op.get("line", 1))
                        new_content_raw = op.get("content", "")
                        new_lines = [str(item) for item in new_content_raw] if isinstance(new_content_raw, list) else [
                            str(new_content_raw)]

                        target_line_num = line_num + line_offset
                        idx = min(max(0, target_line_num - 1), current_len)  # Clamp index

                        num_inserted = len(new_lines)
                        modified_lines[idx:idx] = new_lines  # Insert

                        op_result.update({
                            "requested_line": line_num,
                            "adjusted_line": idx + 1,  # 1-based where insertion happened
                            "lines_inserted": num_inserted
                        })
                        line_offset += num_inserted
                        summary_stats["line_inserts_made"] += num_inserted
                        change_occurred = True  # Insertion always counts as a change
                        op_success = True

                    elif operation_type == "replace":
                        summary_stats["line_replaces_attempted"] += 1
                        line_num = int(op.get("line", 0))
                        new_content = str(op.get("content", ""))

                        target_line_num = line_num + line_offset
                        idx = target_line_num - 1

                        if 0 <= idx < current_len:
                            old_line = modified_lines[idx]
                            op_result.update({
                                "requested_line": line_num,
                                "adjusted_line": target_line_num
                            })
                            if old_line != new_content:
                                modified_lines[idx] = new_content
                                content_actually_changed = True
                                change_occurred = True  # Mark overall change
                                summary_stats["line_replaces_made"] += 1

                            op_result["changed"] = content_actually_changed  # Add flag indicating if content differed
                            op_success = True
                        else:
                            op_result.update({
                                "requested_line": line_num,
                                "adjusted_line": target_line_num,
                                "error": f"Adjusted line {target_line_num} out of range (1-{current_len})"
                            })
                            op_success = False

                    elif operation_type == "delete":
                        summary_stats["line_deletes_attempted"] += 1
                        start_line = int(op.get("start_line", 0))
                        end_line = int(op.get("end_line", start_line))

                        target_start_line = start_line + line_offset
                        target_end_line = end_line + line_offset

                        start_idx = max(0, target_start_line - 1)
                        end_idx = min(current_len, target_end_line)  # Slice end is exclusive, target is inclusive

                        op_result.update({
                            "requested_range": f"{start_line}-{end_line}",
                            "adjusted_range": f"{target_start_line}-{target_end_line}"
                        })

                        if start_idx < end_idx and start_idx < current_len:
                            num_deleted = end_idx - start_idx
                            del modified_lines[start_idx:end_idx]

                            op_result["lines_deleted"] = num_deleted
                            line_offset -= num_deleted
                            summary_stats["line_deletes_made"] += num_deleted
                            change_occurred = True  # Deletion always counts as a change
                            op_success = True
                        else:
                            op_result["error"] = f"Adjusted range invalid or out of bounds (1-{current_len})"
                            op_success = False

                except (ValueError, TypeError, KeyError) as e:
                    op_result["error"] = f"Parameter error: {type(e).__name__}: {str(e)}"
                    op_success = False
                except Exception as e_inner:  # Catch unexpected errors during operation
                    op_result["error"] = f"Execution error: {type(e_inner).__name__}: {str(e_inner)}"
                    op_success = False

                op_result["success"] = op_success
                if not op_success:
                    summary_stats["operations_failed"] += 1
                operation_log.append(op_result)

        # --- Write Changes Atomically (if any occurred) ---
        final_content = "\n".join(modified_lines)
        # Preserve trailing newline if original had one and content is not empty
        if final_content and not final_content.endswith("\n") and original_content and original_content.endswith("\n"):
            final_content += "\n"

        if change_occurred:
            temp_file = None
            try:
                with tempfile.NamedTemporaryFile(mode='w', encoding=encoding_used, dir=path.parent, delete=False,
                                                 prefix=f".{path.name}~") as temp_f:
                    temp_file = Path(temp_f.name)
                    temp_f.write(final_content)
                # The temp file is created with its own default permissions;
                # copy the target's mode bits so the replace does not silently
                # change them. Best effort: a failure here must not block the edit.
                with contextlib.suppress(OSError):
                    shutil.copymode(path, temp_file)
                temp_file.replace(path)  # Atomic rename/replace
                operation_log.append({"operation": "write_file", "success": True})
            except Exception as write_e:
                if temp_file and temp_file.exists():
                    temp_file.unlink(missing_ok=True)
                error_msg = f"Failed write: {type(write_e).__name__}: {str(write_e)}"
                operation_log.append({"operation": "write_file", "success": False, "error": error_msg})
                summary_stats["operations_failed"] += 1
                # Re-raise to be caught by the outer try/except
                raise IOError(f"Failed to write changes back to file {path}: {error_msg}") from write_e
            finally:
                # Ensure temp file is gone even if replace worked but something else failed later
                if temp_file and temp_file.exists():
                    temp_file.unlink(missing_ok=True)
        else:
            operation_log.append({"operation": "write_file", "skipped": True, "reason": "No changes detected"})

        # --- Final Return ---
        final_lines = final_content.splitlines()
        return {
            "success": True,  # Overall tool execution was successful (even if some ops failed but didn't halt)
            "file_path": str(path),
            "original_size": original_size,
            "new_size": len(final_content),
            "change_occurred": change_occurred,
            "summary": summary_stats,
            "operations_log": operation_log,
            # Keep a short preview, it can still be useful context
            "final_content_preview": "\n".join(final_lines[:10]) + (
                '\n...' if len(final_lines) > 10 else '')
        }

    except Exception as e:
        # Catch errors during the operation or writing phase
        error_msg = f"Editing failed: {type(e).__name__}: {str(e)}"
        operation_log.append({"operation": "error", "message": error_msg})
        summary_stats["operations_failed"] += 1
        return {
            "success": False,
            "error": error_msg,
            "file_path": str(path),
            "change_occurred": False,  # Assume no change if error occurred mid-way
            "summary": summary_stats,
            "operations_log": operation_log
        }


@mcp.tool(
    description="""
    Write content to a new file or overwrite/append to an existing file.

    Parameters:
    - file_path: Path to the file to write.
    - content: Text content to write.
    - mode: 'w' to overwrite (default), 'a' to append.

    Parent directories will be created if needed.

    Examples:
    - Create/overwrite file: write_file("/path/to/new_file.log", "Log start")
    - Append to file: write_file("/path/to/existing.log", "New entry\\n", mode="a")

    Returns success status and error message if applicable.
    """,
)
def write_file(file_path: str, content: str, mode: str = "w") -> dict:
    """Writes content to a file, creating directories if necessary.

    Args:
        file_path: Destination path; missing parent directories are created.
        content: Text to write, encoded as UTF-8.
        mode: ``'w'`` to overwrite or ``'a'`` to append.

    Returns:
        On success a dict with ``success``, ``bytes_written`` (size of
        ``content`` encoded as UTF-8), ``file_path`` and ``mode``; otherwise
        ``success: False`` with an ``error`` message.
    """
    path = Path(file_path)

    # Validate parameters
    if not isinstance(content, str):
        return {
            "success": False,
            "error": f"Content must be a string, got {type(content).__name__}",
            "file_path": str(path)
        }

    if mode not in ['w', 'a']:
        return {
            "success": False,
            "error": "Invalid mode. Use 'w' (overwrite) or 'a' (append).",
            "file_path": str(path)
        }

    try:
        # Create parent directories if they don't exist
        path.parent.mkdir(parents=True, exist_ok=True)

        # Write using specified mode and UTF-8 encoding
        with open(path, mode, encoding='utf-8') as f:
            f.write(content)

        # TextIO.write returns a character count, not bytes; report the
        # encoded size so the value matches the "bytes_written" key.
        bytes_written = len(content.encode('utf-8'))

        return {
            "success": True,
            "bytes_written": bytes_written,
            "file_path": str(path),
            "mode": mode
        }
    except PermissionError as e:
        return {
            "success": False,
            "error": f"Permission denied: {str(e)}",
            "file_path": str(path)
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Error writing to file {path}: {type(e).__name__}: {str(e)}",
            "file_path": str(path)
        }


@mcp.tool(
    description="""
    Fetches the text content of a web page URL.
    Uses Chromium (headless) to render the page to PDF, then extracts text using 'marker'.
    Handles temporary file creation and cleanup. Best effort for complex pages.

    Parameters:
    - url: The URL to fetch (e.g., "https://www.google.com").

    Returns the extracted text content as a string, or an error message.
    """,
)
def fetch_page(url: str) -> str:
    """Fetches web page text via Chromium PDF conversion and marker text extraction.

    Args:
        url: Page URL; must start with ``http://`` or ``https://``.

    Returns:
        The extracted text, or a string starting with ``Error`` describing
        what went wrong. Failures are reported in the return value rather
        than raised.
    """
    # Validate URL basic structure
    if not url.startswith(('http://', 'https://')):
        return "Error: Invalid URL provided. Must start with http:// or https://."

    # Find an available browser command
    browser_cmd = _find_browser_command()
    if not browser_cmd:
        return "Error: No compatible browser found. Please install Chromium or Chrome."

    # Use a secure temporary directory
    try:
        tmp_dir = tempfile.TemporaryDirectory()
    except Exception as e:
        return f"Error: Could not create temporary directory for PDF conversion: {type(e).__name__}: {str(e)}"

    # The context manager removes the directory on every exit path,
    # including the early returns below.
    with tmp_dir:
        temp_pdf_path = Path(tmp_dir.name) / "page.pdf"
        try:
            # Build the command for the browser
            command = [
                browser_cmd,
                "--headless",
                "--disable-gpu",
                "--no-sandbox",  # Often needed in containerized environments
                "--disable-dev-shm-usage",  # Mitigates issues with limited /dev/shm size
                "--disable-software-rasterizer",
                "--disable-features=VizDisplayCompositor",
                "--timeout=30000",  # 30 second timeout for rendering
                f"--print-to-pdf={str(temp_pdf_path)}",
                url
            ]

            # Execute browser command
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=45  # Overall timeout slightly longer than internal browser timeout
            )

            if result.returncode != 0:
                # Provide more informative error if available
                error_detail = result.stderr.strip()
                if not error_detail:
                    error_detail = result.stdout.strip()  # Sometimes errors go to stdout
                return f"Error fetching page with {browser_cmd} (code {result.returncode}): {error_detail or 'No output'}"

            # Check if PDF was actually created and has size
            if not temp_pdf_path.exists() or temp_pdf_path.stat().st_size == 0:
                error_detail = result.stderr.strip()
                if not error_detail:
                    error_detail = result.stdout.strip()
                return f"Error: Browser ran but did not create a valid PDF file. Output: {error_detail or 'No output'}"

            # Extract text using marker (suppress stdout/stderr)
            with io.StringIO() as buf, contextlib.redirect_stdout(buf), contextlib.redirect_stderr(buf):
                try:
                    converter = PdfConverter(artifact_dict=create_model_dict())
                    rendered = converter(str(temp_pdf_path))
                    text_content, _, _ = text_from_rendered(rendered)
                    return text_content if text_content else "Error: No text content extracted."
                except Exception as marker_e:
                    return f"Error during text extraction: {type(marker_e).__name__}: {str(marker_e)}"

        except subprocess.TimeoutExpired:
            return f"Error: Browser process timed out fetching URL: {url}"
        except FileNotFoundError:
            return f"Error: '{browser_cmd}' command not found despite earlier check. This is unexpected."
        except Exception as e:
            return f"Error during page fetch process: {type(e).__name__}: {str(e)}"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ZbigniewTomanek/my-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server