Skip to main content
Glama

Enhanced File Operations MCP Server

by SkippyzNJiff
main.py (65.1 kB)
import csv
import difflib
import errno
import fnmatch
import glob
import hashlib
import json
import logging
import os
import platform
import re
import shlex
import shutil
import socket
import stat
import subprocess
import sys
import tarfile
import threading
import time
import zipfile
from datetime import datetime, timedelta
from io import StringIO
from pathlib import Path

import paramiko
import psutil
from mcp.server.fastmcp import FastMCP

# SSH Connection management
ssh_connections = {}  # Dictionary to store active SSH connections
sftp_connections = {}  # Dictionary to store active SFTP connections

# Configure root logger to DEBUG, log to both console and server.log
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    handlers=[
        logging.FileHandler("server.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)

mcp = FastMCP("ultimate-file-ops")
BASE_DIR = os.getcwd()
CURRENT_DIR = BASE_DIR  # Track current working directory

# Read size used whenever a file is hashed or compared incrementally.
_CHUNK_SIZE = 65536


def resolve_path(path: str, allow_absolute: bool = False) -> str:
    """Resolve ``path`` to an absolute path relative to ``CURRENT_DIR``.

    Returns the absolute, normalised path. If resolution itself raises, the
    input is returned unchanged.

    NOTE: ``os.path.join`` discards ``CURRENT_DIR`` when ``path`` is already
    absolute, so absolute inputs resolve to themselves whatever
    ``allow_absolute`` says. The flag is kept for interface compatibility.
    """
    try:
        if allow_absolute and os.path.isabs(path):
            return os.path.abspath(path)
        # Use current directory instead of base directory
        return os.path.abspath(os.path.join(CURRENT_DIR, path))
    except Exception:
        return path


def safe_resolve_path(path: str) -> str:
    """Resolve ``path`` under ``BASE_DIR`` (for sensitive operations).

    Returns the absolute path when it is ``BASE_DIR`` or lies below it.

    Raises:
        ValueError: if the path cannot be resolved, or resolves to a location
            outside ``BASE_DIR``.
    """
    try:
        base = os.path.abspath(BASE_DIR)
        full = os.path.abspath(os.path.join(base, path))
        # Compare whole path components. A plain string-prefix test would
        # accept a sibling such as "<base>2/secret".
        common = os.path.commonpath([base, full])
        inside = os.path.normcase(common) == os.path.normcase(base)
    except Exception:
        raise ValueError(f"Invalid path: {path}") from None
    if not inside:
        raise ValueError("Access outside base directory is prohibited")
    return full


def format_bytes(bytes_value):
    """Format a byte count as a human readable string (B up to PB).

    ``None`` and negative values are rendered as ``"0 B"``.
    """
    if bytes_value is None or bytes_value < 0:
        return "0 B"
    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        if bytes_value < 1024.0:
            return f"{bytes_value:.2f} {unit}"
        bytes_value /= 1024.0
    return f"{bytes_value:.2f} PB"


def get_ssh_connection(connection_id: str):
    """Return the SSH connection registered under ``connection_id``.

    Raises:
        ValueError: if no such connection is registered.
    """
    if connection_id not in ssh_connections:
        raise ValueError(f"SSH connection '{connection_id}' not found. Use ssh_connect first.")
    return ssh_connections[connection_id]


def get_sftp_connection(connection_id: str):
    """Return the SFTP connection for ``connection_id``, opening it on demand.

    When no SFTP session is cached but an SSH connection with the same id
    exists, an SFTP session is opened from it and cached.

    Raises:
        ValueError: if neither an SFTP nor an SSH connection is registered.
    """
    if connection_id not in sftp_connections:
        # Try to create SFTP from existing SSH connection
        if connection_id not in ssh_connections:
            raise ValueError(f"SSH connection '{connection_id}' not found. Use ssh_connect first.")
        sftp_connections[connection_id] = ssh_connections[connection_id].open_sftp()
    return sftp_connections[connection_id]


def _iter_files(search_dir: str, recursive: bool):
    """Yield ``(name, full_path)`` for the files in ``search_dir``.

    Recursive mode walks the whole tree with ``os.walk``. Otherwise only the
    regular files directly inside ``search_dir`` are yielded.
    """
    if recursive:
        for root, _dirs, files in os.walk(search_dir):
            for name in files:
                yield name, os.path.join(root, name)
    else:
        for name in os.listdir(search_dir):
            file_path = os.path.join(search_dir, name)
            if os.path.isfile(file_path):
                yield name, file_path


def _new_hasher(algorithm: str):
    """Return a fresh hashlib object for ``algorithm``, or ``None`` if unknown."""
    name = algorithm.lower()
    if name not in hashlib.algorithms_available:
        return None
    return hashlib.new(name)


def _update_from_file(path: str, *hashers) -> None:
    """Feed the contents of ``path`` into every hasher, one chunk at a time."""
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(_CHUNK_SIZE), b""):
            for hasher in hashers:
                hasher.update(chunk)


def _format_command_result(cmd: str, proc) -> str:
    """Render a finished ``subprocess`` run as the text report the tools return."""
    output = f"Executed in: {CURRENT_DIR}\nCommand: {cmd}\n\n"
    if proc.stdout:
        output += f"STDOUT:\n{proc.stdout.strip()}\n"
    if proc.stderr:
        output += f"STDERR:\n{proc.stderr.strip()}\n"
    output += f"\nExit code: {proc.returncode}"
    return output


# ================================
# NAVIGATION & DIRECTORY TOOLS
# ================================

@mcp.tool()
async def get_current_directory() -> str:
    """Get the current working directory"""
    return f"Current directory: {CURRENT_DIR}"


@mcp.tool()
async def change_directory(path: str) -> str:
    """Change the current working directory to any location

    ``path`` may be absolute or relative to the current directory ("." and
    ".." included). Only the server's ``CURRENT_DIR`` is updated; the process
    working directory is left alone.
    """
    global CURRENT_DIR
    try:
        # join + abspath covers ".", ".." and absolute paths in one step:
        # join drops CURRENT_DIR for absolute input and abspath normalises.
        new_dir = os.path.abspath(os.path.join(CURRENT_DIR, path))
        if not os.path.exists(new_dir):
            return f"Error: Directory '{path}' does not exist"
        if not os.path.isdir(new_dir):
            return f"Error: Path '{path}' is not a directory"
        CURRENT_DIR = new_dir
        return f"Changed directory to: {CURRENT_DIR}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def list_dir(path: str = ".", detailed: bool = False, sort_by: str = "name") -> str:
    """List contents of a directory with optional detailed info and sorting

    ``sort_by`` is "size" or "modified" (both descending); any other value
    sorts by case-insensitive name. Entries that cannot be stat'ed are skipped.
    """
    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: Path '{path}' does not exist"
        if not os.path.isdir(target):
            return f"Error: Path '{path}' is not a directory"

        items = []
        for item in os.listdir(target):
            item_path = os.path.join(target, item)
            try:
                stat_info = os.stat(item_path)
            except OSError:
                # Skip files we can't access
                continue
            items.append({
                'name': item,
                'is_dir': os.path.isdir(item_path),
                'size': stat_info.st_size,
                'modified': stat_info.st_mtime,
            })

        # Sort items
        if sort_by == "size":
            items.sort(key=lambda x: x['size'], reverse=True)
        elif sort_by == "modified":
            items.sort(key=lambda x: x['modified'], reverse=True)
        else:  # name
            items.sort(key=lambda x: x['name'].lower())

        if not items:
            return "Empty directory"

        result = [f"Contents of {target}:"]
        for item in items:
            if item['is_dir']:
                if detailed:
                    mod_time = time.ctime(item['modified'])
                    result.append(f"[DIR] {item['name']}/ (modified: {mod_time})")
                else:
                    result.append(f"[DIR] {item['name']}/")
            elif detailed:
                size_str = format_bytes(item['size'])
                mod_time = time.ctime(item['modified'])
                result.append(f"[FILE] {item['name']} ({size_str}, modified: {mod_time})")
            else:
                result.append(f"[FILE] {item['name']} ({format_bytes(item['size'])})")
        return "\n".join(result)
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def create_directory(path: str) -> str:
    """Create a directory (including parent directories if needed)"""
    target = resolve_path(path, allow_absolute=True)
    try:
        os.makedirs(target, exist_ok=True)
        return f"Successfully created directory {target}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def delete_directory(path: str, recursive: bool = False) -> str:
    """Delete a directory (use recursive=True to delete non-empty directories)"""
    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: Directory '{path}' does not exist"
        if not os.path.isdir(target):
            return f"Error: Path '{path}' is not a directory"
        if recursive:
            shutil.rmtree(target)
            return f"Successfully deleted directory {target} and all contents"
        os.rmdir(target)
        return f"Successfully deleted empty directory {target}"
    except OSError as e:
        # ENOTEMPTY has a different numeric value on each platform, so use the
        # symbolic constant instead of a literal.
        if e.errno == errno.ENOTEMPTY:
            return (f"Error: Directory '{path}' is not empty. "
                    "Use recursive=True to delete non-empty directories")
        return f"Error: {e}"
    except Exception as e:
        return f"Error: {e}"


# ================================
# FILE OPERATIONS
# ================================

@mcp.tool()
async def read_file(path: str, encoding: str = "utf-8", lines_limit: int = None) -> str:
    """Read the contents of a file with optional encoding and line limit

    Undecodable bytes are replaced rather than raising. With ``lines_limit``
    set, at most that many lines are returned (trailing whitespace stripped),
    followed by a truncation marker if the file is longer.
    """
    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: File '{path}' does not exist"
        if not os.path.isfile(target):
            return f"Error: Path '{path}' is not a file"

        with open(target, "r", encoding=encoding, errors="replace") as f:
            if lines_limit:
                lines = []
                for i, line in enumerate(f):
                    if i >= lines_limit:
                        lines.append(f"... (truncated after {lines_limit} lines)")
                        break
                    lines.append(line.rstrip())
                content = "\n".join(lines)
            else:
                content = f.read()
        return f"Contents of {target}:\n{content}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def read_file_bytes(path: str, start: int = 0, length: int = None) -> str:
    """Read raw bytes from a file (useful for binary files)

    Returns the bytes from offset ``start`` as a hex string. ``length=None``
    reads to the end of the file; ``length=0`` reads nothing.
    """
    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: File '{path}' does not exist"
        if not os.path.isfile(target):
            return f"Error: Path '{path}' is not a file"

        with open(target, "rb") as f:
            f.seek(start)
            # Test against None: a truthiness test would turn length=0 into
            # "read the whole file".
            data = f.read() if length is None else f.read(length)

        # Convert to hex representation
        hex_data = data.hex()
        return f"Binary data from {target} (start: {start}, length: {len(data)} bytes):\n{hex_data}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def write_file(path: str, content: str, encoding: str = "utf-8") -> str:
    """Write content to a file (overwrites existing content)"""
    target = resolve_path(path, allow_absolute=True)
    try:
        # Create directory if it doesn't exist
        directory = os.path.dirname(target)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(target, "w", encoding=encoding) as f:
            f.write(content)
        return f"Successfully wrote to {target}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def append_file(path: str, content: str, encoding: str = "utf-8") -> str:
    """Append content to a file"""
    target = resolve_path(path, allow_absolute=True)
    try:
        # Create directory if it doesn't exist
        directory = os.path.dirname(target)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(target, "a", encoding=encoding) as f:
            f.write(content)
        return f"Successfully appended to {target}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def delete_file(path: str) -> str:
    """Delete a file"""
    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: File '{path}' does not exist"
        if not os.path.isfile(target):
            return f"Error: Path '{path}' is not a file"
        os.remove(target)
        return f"Successfully deleted {target}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def edit_file_lines(path: str, start: int, end: int, new_text: str) -> str:
    """Edit specific lines in a file (0-indexed, end is exclusive)

    Lines ``start`` up to but not including ``end`` are replaced by
    ``new_text``. The file is read and written as UTF-8.
    """
    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: File '{path}' does not exist"
        if not os.path.isfile(target):
            return f"Error: Path '{path}' is not a file"

        with open(target, "r", encoding="utf-8") as f:
            lines = f.readlines()

        if start < 0 or start >= len(lines):
            return f"Error: Start line {start} is out of range (file has {len(lines)} lines)"
        if end < start or end > len(lines):
            return f"Error: End line {end} is invalid (start: {start}, file has {len(lines)} lines)"

        # Replace the specified lines
        new_lines = new_text.splitlines(keepends=True)
        # Without a terminator the last new line would be glued onto the
        # original line that follows the edited range.
        if new_lines and end < len(lines) and not new_lines[-1].endswith(("\n", "\r")):
            new_lines[-1] += "\n"
        lines[start:end] = new_lines

        with open(target, "w", encoding="utf-8") as f:
            f.writelines(lines)
        return f"Successfully edited lines {start}-{end-1} in {target}"
    except Exception as e:
        return f"Error: {e}"


# ================================
# FILE MANAGEMENT
# ================================

@mcp.tool()
async def copy_file(source: str, destination: str, preserve_metadata: bool = True) -> str:
    """Copy a file from source to destination

    With ``preserve_metadata`` the copy is made with ``shutil.copy2``,
    otherwise with ``shutil.copy``.
    """
    src = resolve_path(source, allow_absolute=True)
    dst = resolve_path(destination, allow_absolute=True)
    try:
        if not os.path.exists(src):
            return f"Error: Source file '{source}' does not exist"
        if not os.path.isfile(src):
            return f"Error: Source '{source}' is not a file"

        # Create destination directory if it doesn't exist
        directory = os.path.dirname(dst)
        if directory:
            os.makedirs(directory, exist_ok=True)

        copier = shutil.copy2 if preserve_metadata else shutil.copy
        copier(src, dst)
        return f"Successfully copied {src} to {dst}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def copy_directory(source: str, destination: str) -> str:
    """Copy a directory from source to destination"""
    src = resolve_path(source, allow_absolute=True)
    dst = resolve_path(destination, allow_absolute=True)
    try:
        if not os.path.exists(src):
            return f"Error: Source directory '{source}' does not exist"
        if not os.path.isdir(src):
            return f"Error: Source '{source}' is not a directory"
        shutil.copytree(src, dst, dirs_exist_ok=True)
        return f"Successfully copied directory {src} to {dst}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def move_file(source: str, destination: str) -> str:
    """Move/rename a file or directory from source to destination"""
    src = resolve_path(source, allow_absolute=True)
    dst = resolve_path(destination, allow_absolute=True)
    try:
        if not os.path.exists(src):
            return f"Error: Source '{source}' does not exist"
        # Create destination directory if it doesn't exist
        if os.path.isfile(src):
            directory = os.path.dirname(dst)
            if directory:
                os.makedirs(directory, exist_ok=True)
        shutil.move(src, dst)
        return f"Successfully moved {src} to {dst}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def get_file_info(path: str) -> str:
    """Get detailed information about a file or directory

    Reports type, size, permission bits and timestamps. For regular files the
    extension and the MD5 / SHA256 digests are added.
    """
    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: Path '{path}' does not exist"

        # Named stat_info so the imported ``stat`` module is not shadowed.
        stat_info = os.stat(target)
        file_type = "Directory" if os.path.isdir(target) else "File"
        size = stat_info.st_size
        # Last three octal digits of the mode, i.e. the rwx permission bits.
        permissions = oct(stat_info.st_mode)[-3:]

        lines = [
            f"Path: {target}",
            f"Type: {file_type}",
            f"Size: {format_bytes(size)} ({size} bytes)",
            f"Permissions: {permissions}",
            f"Created: {time.ctime(stat_info.st_ctime)}",
            f"Modified: {time.ctime(stat_info.st_mtime)}",
            f"Accessed: {time.ctime(stat_info.st_atime)}",
        ]

        if os.path.isfile(target):
            # Add file extension info
            _, ext = os.path.splitext(target)
            lines.append(f"Extension: {ext if ext else 'None'}")

            # Add file hashes, streamed so large files are never fully loaded.
            md5 = hashlib.md5()
            sha256 = hashlib.sha256()
            _update_from_file(target, md5, sha256)
            lines.append(f"MD5: {md5.hexdigest()}")
            lines.append(f"SHA256: {sha256.hexdigest()}")

        return "\n".join(lines)
    except Exception as e:
        return f"Error: {e}"


# ================================
# SEARCH & DISCOVERY
# ================================

@mcp.tool()
async def search_files(pattern: str, search_path: str = ".", recursive: bool = True,
                       case_sensitive: bool = False) -> str:
    """Search for files matching a pattern (supports wildcards like *.txt, *.py)"""
    search_dir = resolve_path(search_path, allow_absolute=True)
    try:
        if not os.path.exists(search_dir):
            return f"Error: Search path '{search_path}' does not exist"
        if not os.path.isdir(search_dir):
            return f"Error: Search path '{search_path}' is not a directory"

        wanted = pattern if case_sensitive else pattern.lower()
        # fnmatchcase never normalises case, so case_sensitive behaves the
        # same on every platform. fnmatch.fnmatch folds case on Windows.
        matches = [
            file_path
            for name, file_path in _iter_files(search_dir, recursive)
            if fnmatch.fnmatchcase(name if case_sensitive else name.lower(), wanted)
        ]

        if not matches:
            return f"No files found matching pattern '{pattern}' in {search_dir}"
        return f"Found {len(matches)} files matching '{pattern}':\n" + "\n".join(matches)
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def find_in_files(search_text: str, file_pattern: str = "*", search_path: str = ".",
                        recursive: bool = True, case_sensitive: bool = False,
                        line_numbers: bool = True) -> str:
    """Search for text content within files

    Files whose name matches ``file_pattern`` are read as UTF-8 with
    undecodable bytes ignored. Files that cannot be opened are skipped.
    """
    search_dir = resolve_path(search_path, allow_absolute=True)
    try:
        if not os.path.exists(search_dir):
            return f"Error: Search path '{search_path}' does not exist"
        if not search_text:
            # An empty needle is contained in every line and would dump every
            # matched file.
            return "Error: search_text must not be empty"

        matches = []
        file_count = 0
        needle = search_text if case_sensitive else search_text.lower()

        for name, file_path in _iter_files(search_dir, recursive):
            if not fnmatch.fnmatch(name, file_pattern):
                continue
            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    file_count += 1
                    for line_num, line in enumerate(f, 1):
                        haystack = line if case_sensitive else line.lower()
                        if needle not in haystack:
                            continue
                        if line_numbers:
                            matches.append(f"{file_path}:{line_num}: {line.strip()}")
                        else:
                            matches.append(f"{file_path}: {line.strip()}")
            except OSError:
                # Unreadable file (permissions, vanished, ...): best effort.
                continue

        if not matches:
            return f"Text '{search_text}' not found in {file_count} files searched"
        return f"Found '{search_text}' in {len(matches)} locations:\n" + "\n".join(matches)
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def find_files_by_size(min_size: int = 0, max_size: int = None, search_path: str = ".",
                             recursive: bool = True) -> str:
    """Find files within a specific size range (in bytes)

    Both bounds are inclusive; ``max_size=None`` means no upper bound. Results
    are listed largest first.
    """
    search_dir = resolve_path(search_path, allow_absolute=True)
    try:
        if not os.path.exists(search_dir):
            return f"Error: Search path '{search_path}' does not exist"

        matches = []
        for _name, file_path in _iter_files(search_dir, recursive):
            try:
                size = os.path.getsize(file_path)
            except OSError:
                continue
            if size >= min_size and (max_size is None or size <= max_size):
                matches.append((file_path, size))

        if not matches:
            # "is not None" so that an explicit max_size=0 is not shown as
            # "unlimited".
            upper = format_bytes(max_size) if max_size is not None else 'unlimited'
            return f"No files found in size range {format_bytes(min_size)} - {upper}"

        # Sort by size (largest first)
        matches.sort(key=lambda x: x[1], reverse=True)
        result = [f"Found {len(matches)} files in size range:"]
        result.extend(f"{file_path} ({format_bytes(size)})" for file_path, size in matches)
        return "\n".join(result)
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def find_duplicate_files(search_path: str = ".", recursive: bool = True) -> str:
    """Find duplicate files based on content hash

    Files are grouped by the MD5 digest of their content. Unreadable files are
    skipped. With ``recursive=False`` only the files directly inside
    ``search_path`` are compared.
    """
    search_dir = resolve_path(search_path, allow_absolute=True)
    try:
        if not os.path.exists(search_dir):
            return f"Error: Search path '{search_path}' does not exist"

        file_hashes = {}
        for _name, file_path in _iter_files(search_dir, recursive):
            try:
                hasher = hashlib.md5()
                _update_from_file(file_path, hasher)
            except OSError:
                continue
            file_hashes.setdefault(hasher.hexdigest(), []).append(file_path)

        duplicates = {h: paths for h, paths in file_hashes.items() if len(paths) > 1}
        if not duplicates:
            return "No duplicate files found"

        result = [f"Found {len(duplicates)} sets of duplicate files:"]
        for file_hash, paths in duplicates.items():
            result.append(f"\nHash {file_hash}:")
            for dup_path in paths:
                size = os.path.getsize(dup_path)
                result.append(f" - {dup_path} ({format_bytes(size)})")
        return "\n".join(result)
    except Exception as e:
        return f"Error: {e}"


# ================================
# COMPRESSION & ARCHIVING
# ================================

@mcp.tool()
async def create_zip(archive_path: str, files_and_dirs: list, compression_level: int = 6) -> str:
    """Create a ZIP archive from files and directories

    Files are stored under their base name. Directories are stored with paths
    relative to the directory's parent, so the directory name is kept.
    """
    archive_target = resolve_path(archive_path, allow_absolute=True)
    try:
        # Validate every input before touching the archive so a missing path
        # does not leave a half-written ZIP behind.
        sources = []
        for item in files_and_dirs:
            item_path = resolve_path(item, allow_absolute=True)
            if not os.path.exists(item_path):
                return f"Error: Path '{item}' does not exist"
            sources.append(item_path)

        directory = os.path.dirname(archive_target)
        if directory:
            os.makedirs(directory, exist_ok=True)

        archive_key = os.path.normcase(archive_target)
        with zipfile.ZipFile(archive_target, 'w', zipfile.ZIP_DEFLATED,
                             compresslevel=compression_level) as zipf:
            for item_path in sources:
                if os.path.isfile(item_path):
                    zipf.write(item_path, os.path.basename(item_path))
                elif os.path.isdir(item_path):
                    parent = os.path.dirname(item_path)
                    for root, _dirs, files in os.walk(item_path):
                        for file in files:
                            file_path = os.path.join(root, file)
                            # Never add the archive being written to itself.
                            if os.path.normcase(file_path) == archive_key:
                                continue
                            zipf.write(file_path, os.path.relpath(file_path, parent))
        return f"Successfully created ZIP archive: {archive_target}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def extract_zip(archive_path: str, extract_to: str = ".") -> str:
    """Extract a ZIP archive"""
    archive_target = resolve_path(archive_path, allow_absolute=True)
    extract_target = resolve_path(extract_to, allow_absolute=True)
    try:
        if not os.path.exists(archive_target):
            return f"Error: Archive '{archive_path}' does not exist"
        os.makedirs(extract_target, exist_ok=True)
        with zipfile.ZipFile(archive_target, 'r') as zipf:
            zipf.extractall(extract_target)
        return f"Successfully extracted {archive_target} to {extract_target}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def list_zip_contents(archive_path: str) -> str:
    """List contents of a ZIP archive"""
    archive_target = resolve_path(archive_path, allow_absolute=True)
    try:
        if not os.path.exists(archive_target):
            return f"Error: Archive '{archive_path}' does not exist"

        result = [f"Contents of {archive_target}:"]
        with zipfile.ZipFile(archive_target, 'r') as zipf:
            for info in zipf.infolist():
                size = info.file_size
                compressed_size = info.compress_size
                # Guard the division: empty members have size 0.
                compression_ratio = (1 - compressed_size / size) * 100 if size > 0 else 0
                result.append(
                    f" {info.filename} ({format_bytes(size)}, "
                    f"compressed: {format_bytes(compressed_size)}, "
                    f"{compression_ratio:.1f}% compression)"
                )
        return "\n".join(result)
    except Exception as e:
        return f"Error: {e}"


# ================================
# FILE HASHING & CHECKSUMS
# ================================

@mcp.tool()
async def calculate_file_hash(path: str, algorithm: str = "sha256") -> str:
    """Calculate hash of a file (md5, sha1, sha256, sha512)"""
    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: File '{path}' does not exist"
        if not os.path.isfile(target):
            return f"Error: Path '{path}' is not a file"

        hasher = _new_hasher(algorithm)
        if hasher is None:
            return f"Error: Unsupported hash algorithm '{algorithm}'"
        _update_from_file(target, hasher)
        return f"{algorithm.upper()} hash of {target}: {hasher.hexdigest()}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def verify_file_hash(path: str, expected_hash: str, algorithm: str = "sha256") -> str:
    """Verify a file's hash against an expected value

    The comparison ignores the letter case of the hex digests.
    """
    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: File '{path}' does not exist"
        if not os.path.isfile(target):
            return f"Error: Path '{path}' is not a file"

        hasher = _new_hasher(algorithm)
        if hasher is None:
            return f"Error: Unsupported hash algorithm '{algorithm}'"
        _update_from_file(target, hasher)
        actual_hash = hasher.hexdigest()

        if actual_hash.lower() == expected_hash.lower():
            return f"✓ Hash verification PASSED for {target}"
        return (f"✗ Hash verification FAILED for {target}\n"
                f"Expected: {expected_hash}\nActual: {actual_hash}")
    except Exception as e:
        return f"Error: {e}"


# ================================
# FILE COMPARISON
# ================================

@mcp.tool()
async def compare_files(file1: str, file2: str, mode: str = "binary") -> str:
    """Compare two files (binary or text mode)

    Binary mode reports whether the bytes are identical. Text mode returns a
    unified diff, limited to the first 100 diff lines.
    """
    file1_target = resolve_path(file1, allow_absolute=True)
    file2_target = resolve_path(file2, allow_absolute=True)
    try:
        if not os.path.exists(file1_target):
            return f"Error: File '{file1}' does not exist"
        if not os.path.exists(file2_target):
            return f"Error: File '{file2}' does not exist"

        if mode == "binary":
            size1 = os.path.getsize(file1_target)
            size2 = os.path.getsize(file2_target)
            # Different sizes already prove a difference. Equal sizes are
            # compared chunk by chunk instead of loading both files.
            identical = size1 == size2
            if identical:
                with open(file1_target, 'rb') as f1, open(file2_target, 'rb') as f2:
                    while True:
                        chunk1 = f1.read(_CHUNK_SIZE)
                        chunk2 = f2.read(_CHUNK_SIZE)
                        if chunk1 != chunk2:
                            identical = False
                            break
                        if not chunk1:
                            break
            if identical:
                return "Files are identical (binary comparison)"
            return (f"Files are different (binary comparison)\n"
                    f"File1 size: {size1} bytes\nFile2 size: {size2} bytes")

        if mode == "text":
            with open(file1_target, 'r', encoding='utf-8', errors='ignore') as f1:
                lines1 = f1.readlines()
            with open(file2_target, 'r', encoding='utf-8', errors='ignore') as f2:
                lines2 = f2.readlines()

            if lines1 == lines2:
                return "Files are identical (text comparison)"

            # Find differences. Line endings are stripped first because the
            # diff lines are joined with "\n" below; keeping them would put a
            # blank line after every content line.
            diff = list(difflib.unified_diff(
                [line.rstrip('\r\n') for line in lines1],
                [line.rstrip('\r\n') for line in lines2],
                fromfile=file1, tofile=file2, lineterm='',
            ))
            if len(diff) > 100:  # Limit output for large diffs
                diff = diff[:100] + ["... (truncated, too many differences)"]
            return "Files are different (text comparison):\n" + "\n".join(diff)

        # Any other mode used to fall off the end and return None.
        return f"Error: Unsupported comparison mode '{mode}' (use 'binary' or 'text')"
    except Exception as e:
        return f"Error: {e}"


# ================================
# SYSTEM INFORMATION
# ================================

@mcp.tool()
async def get_system_info() -> str:
    """Get system information

    Covers platform details plus memory, disk ('/') and CPU figures from
    psutil. CPU usage is sampled over one second, so the call blocks for that
    long.
    """
    try:
        info = {
            'Platform': platform.platform(),
            'System': platform.system(),
            'Release': platform.release(),
            'Version': platform.version(),
            'Machine': platform.machine(),
            'Processor': platform.processor(),
            'Python Version': platform.python_version(),
        }

        # Memory info
        memory = psutil.virtual_memory()
        info['Total Memory'] = format_bytes(memory.total)
        info['Available Memory'] = format_bytes(memory.available)
        info['Memory Usage'] = f"{memory.percent}%"

        # Disk info
        disk = psutil.disk_usage('/')
        info['Total Disk Space'] = format_bytes(disk.total)
        info['Free Disk Space'] = format_bytes(disk.free)
        info['Disk Usage'] = f"{(disk.used / disk.total * 100):.1f}%"

        # CPU info
        info['CPU Count'] = psutil.cpu_count()
        info['CPU Usage'] = f"{psutil.cpu_percent(interval=1)}%"

        result = ["System Information:"]
        result.extend(f"{key}: {value}" for key, value in info.items())
        return "\n".join(result)
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def get_disk_usage(path: str = ".") -> str:
    """Get disk usage information for a path

    For a file the size is returned. For a directory the tree is walked and
    the total size, file count and subdirectory count are reported. Files
    whose size cannot be read are counted but add nothing to the total.
    """
    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: Path '{path}' does not exist"

        if os.path.isfile(target):
            size = os.path.getsize(target)
            return f"File size: {format_bytes(size)} ({size} bytes)"

        total_size = 0
        file_count = 0
        dir_count = 0
        for root, dirs, files in os.walk(target):
            dir_count += len(dirs)
            for file in files:
                file_count += 1
                try:
                    total_size += os.path.getsize(os.path.join(root, file))
                except OSError:
                    continue

        return "\n".join([
            f"Directory: {target}",
            f"Total size: {format_bytes(total_size)} ({total_size} bytes)",
            f"Files: {file_count}",
            f"Subdirectories: {dir_count}",
        ])
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def get_environment_variables(filter_pattern: str = None) -> str:
    """Get environment variables, optionally filtered by pattern

    ``filter_pattern`` is a wildcard pattern matched case-insensitively
    against the variable names.
    """
    try:
        env_vars = dict(os.environ)
        if filter_pattern:
            wanted = filter_pattern.lower()
            env_vars = {k: v for k, v in env_vars.items()
                        if fnmatch.fnmatch(k.lower(), wanted)}

        if not env_vars:
            return "No environment variables found" + (
                f" matching '{filter_pattern}'" if filter_pattern else "")

        result = ["Environment Variables" + (
            f" (filtered by '{filter_pattern}')" if filter_pattern else "") + ":"]
        result.extend(f"{key}={value}" for key, value in sorted(env_vars.items()))
        return "\n".join(result)
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def set_environment_variable(name: str, value: str) -> str:
    """Set an environment variable for the current session"""
    try:
        os.environ[name] = value
        return f"Successfully set environment variable {name}={value}"
    except Exception as e:
        return f"Error: {e}"


# ================================
# PROCESS MANAGEMENT
# ================================

@mcp.tool()
async def list_processes(filter_name: str = None) -> str:
    """List running processes, optionally filtered by name

    Shows the 20 processes with the highest CPU usage. ``filter_name`` is a
    case-insensitive substring of the process name.
    """
    try:
        processes = []
        wanted = filter_name.lower() if filter_name else None
        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent', 'status']):
            try:
                info = proc.info
                # psutil fills in None for fields it could not read. Normalise
                # them here so one restricted process cannot abort the listing.
                info['name'] = info.get('name') or ""
                if wanted and wanted not in info['name'].lower():
                    continue
                processes.append(info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

        if not processes:
            return "No processes found" + (f" matching '{filter_name}'" if filter_name else "")

        # Sort by CPU usage
        processes.sort(key=lambda x: x['cpu_percent'] or 0, reverse=True)

        result = ["Running Processes" + (
            f" (filtered by '{filter_name}')" if filter_name else "") + ":"]
        result.append("PID\tNAME\t\t\tCPU%\tMEM%\tSTATUS")
        result.append("-" * 60)

        for info in processes[:20]:  # Limit to top 20
            name = info['name']
            if len(name) > 18:
                name = name[:15] + '...'
            cpu = info['cpu_percent'] or 0.0
            mem = info['memory_percent'] or 0.0
            result.append(f"{info['pid']}\t{name:<18}\t{cpu:.1f}\t{mem:.1f}\t{info['status']}")

        if len(processes) > 20:
            result.append(f"... and {len(processes) - 20} more processes")
        return "\n".join(result)
    except Exception as e:
        return f"Error: {e}"


# ================================
# SYSTEM COMMANDS
# ================================

@mcp.tool()
async def run_powershell(cmd: str, timeout: int = 30) -> str:
    """Execute a PowerShell command (use with caution)

    Runs ``powershell -Command <cmd>`` in the current directory and returns
    stdout, stderr and the exit code.
    """
    try:
        proc = subprocess.run(
            ["powershell", "-Command", cmd],
            capture_output=True,
            text=True,
            errors="replace",  # undecodable output must not abort the call
            shell=False,
            timeout=timeout,
            cwd=CURRENT_DIR,  # Run in current directory
        )
        return _format_command_result(cmd, proc)
    except subprocess.TimeoutExpired:
        return f"Error: Command timed out after {timeout} seconds"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def run_command(cmd: str, shell: bool = False, timeout: int = 30) -> str:
    """Execute a system command (cmd, bash, etc.)

    With ``shell=True`` the string is handed to the system shell. Otherwise it
    is run directly, in the current directory, and quoted arguments are kept
    together.
    """
    try:
        if shell or os.name == "nt":
            # With a shell the string is passed on as-is. On Windows the
            # command line is parsed by the launched process, so splitting and
            # re-joining it here would only mangle the caller's quoting.
            args = cmd
        else:
            # A plain str.split() would break arguments such as "my file.txt".
            args = shlex.split(cmd)

        proc = subprocess.run(
            args,
            capture_output=True,
            text=True,
            errors="replace",  # undecodable output must not abort the call
            shell=shell,
            timeout=timeout,
            cwd=CURRENT_DIR,
        )
        return _format_command_result(cmd, proc)
    except subprocess.TimeoutExpired:
        return f"Error: Command timed out after {timeout} seconds"
    except Exception as e:
        return f"Error: {e}"


# ================================
# TEXT & DATA PROCESSING
# ================================

@mcp.tool()
async def count_lines_words_chars(path: str) -> str:
    """Count lines, words, and characters in a text file

    The file is decoded as UTF-8 with undecodable bytes ignored. A final line
    without a trailing newline still counts as a line. "File size" is the size
    of the file on disk.
    """
    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: File '{path}' does not exist"
        if not os.path.isfile(target):
            return f"Error: Path '{path}' is not a file"

        with open(target, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()

        lines = content.count('\n') + (1 if content and not content.endswith('\n') else 0)
        words = len(content.split())
        chars = len(content)
        # Count every non-whitespace character. Stripping only ' ', '\n' and
        # '\t' would still count '\r' and other whitespace.
        chars_no_spaces = sum(1 for ch in content if not ch.isspace())
        # Take the size from disk. Re-encoding the decoded text misses ignored
        # bytes and translated line endings.
        size = os.path.getsize(target)

        return "\n".join([
            f"Text analysis for {target}:",
            f"Lines: {lines}",
            f"Words: {words}",
            f"Characters (total): {chars}",
            f"Characters (no whitespace): {chars_no_spaces}",
            f"File size: {format_bytes(size)}",
        ])
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def replace_in_file(path: str, search_pattern: str, replacement: str,
                          use_regex: bool = False, backup: bool = True) -> str:
    """Replace text in a file with optional regex support and backup

    With ``use_regex`` the pattern is a regular expression and ``replacement``
    follows ``re.sub`` rules; otherwise both are literal text. With ``backup``
    the original is first copied to ``<file>.bak``.
    """
    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: File '{path}' does not exist"
        if not os.path.isfile(target):
            return f"Error: Path '{path}' is not a file"
        if not search_pattern:
            # An empty pattern matches between every pair of characters and
            # would insert the replacement throughout the file.
            return "Error: search_pattern must not be empty"

        # surrogateescape keeps undecodable bytes through the read/write
        # round trip. Reading with errors='ignore' would delete them on save.
        with open(target, 'r', encoding='utf-8', errors='surrogateescape') as f:
            content = f.read()

        if use_regex:
            try:
                new_content, count = re.subn(search_pattern, replacement, content)
            except re.error as e:
                return f"Error: Invalid regular expression: {e}"
        else:
            count = content.count(search_pattern)
            new_content = content.replace(search_pattern, replacement)

        # Create backup if requested. This happens after the pattern has been
        # validated but before the file is modified.
        backup_path = target + '.bak'
        if backup:
            shutil.copy2(target, backup_path)

        with open(target, 'w', encoding='utf-8', errors='surrogateescape') as f:
            f.write(new_content)

        result = f"Successfully replaced {count} occurrences in {target}"
        if backup:
            result += f"\nBackup created: {backup_path}"
        return result
    except Exception as e:
        return f"Error: {e}"
def _leading_number(line: str) -> float:
    """Return the first whitespace-separated field of *line* as a float (0 for blank lines)."""
    fields = line.split()
    return float(fields[0]) if fields else 0


@mcp.tool()
async def sort_file_lines(path: str, reverse: bool = False, numeric: bool = False, remove_duplicates: bool = False) -> str:
    """Sort lines in a text file

    The file is rewritten in place and every line ends with a newline afterwards.

    Args:
        path: File to sort, resolved against the current directory.
        reverse: Sort in descending order.
        numeric: Order by the leading number of each line; if any non-blank
            line does not start with a number, plain string order is used.
        remove_duplicates: Drop repeated lines (first occurrence wins) before sorting.

    Returns:
        A success message with the number of lines written, or an "Error: ..." string.
    """
    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: File '{path}' does not exist"
        if not os.path.isfile(target):
            return f"Error: Path '{path}' is not a file"

        # surrogateescape keeps undecodable bytes intact across the rewrite.
        with open(target, 'r', encoding='utf-8', errors='surrogateescape') as f:
            clean_lines = [line.rstrip('\n\r') for line in f]

        if remove_duplicates:
            clean_lines = list(dict.fromkeys(clean_lines))  # Preserve order while removing duplicates

        if numeric:
            try:
                clean_lines.sort(key=_leading_number, reverse=reverse)
            except ValueError:
                # At least one line has no leading number: fall back to string sort
                clean_lines.sort(reverse=reverse)
        else:
            clean_lines.sort(reverse=reverse)

        sorted_lines = [line + '\n' for line in clean_lines]
        with open(target, 'w', encoding='utf-8', errors='surrogateescape') as f:
            f.writelines(sorted_lines)

        return f"Successfully sorted {len(sorted_lines)} lines in {target}"
    except Exception as e:
        return f"Error: {e}"


# ================================
# ADVANCED TOOLS
# ================================

@mcp.tool()
async def create_file_from_template(template_path: str, output_path: str, replacements: dict) -> str:
    """Create a file from a template with variable replacements

    Args:
        template_path: Template file containing ``{{name}}`` placeholders.
        output_path: File to create; missing parent directories are created.
        replacements: Mapping of placeholder name to value (values go through ``str()``).

    Returns:
        A success message, or an "Error: ..." string.
    """
    template_target = resolve_path(template_path, allow_absolute=True)
    output_target = resolve_path(output_path, allow_absolute=True)
    try:
        if not os.path.exists(template_target):
            return f"Error: Template file '{template_path}' does not exist"

        with open(template_target, 'r', encoding='utf-8') as f:
            content = f.read()

        # Replace variables in format {{variable_name}}
        for key, value in (replacements or {}).items():
            content = content.replace(f"{{{{{key}}}}}", str(value))

        # Create output directory if needed; makedirs('') would raise.
        out_dir = os.path.dirname(output_target)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        with open(output_target, 'w', encoding='utf-8') as f:
            f.write(content)

        return f"Successfully created {output_target} from template {template_target}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def split_file(path: str, chunk_size: int, unit: str = "lines") -> str:
    """Split a file into smaller chunks (by lines or bytes)

    Chunks are written next to the source as ``<name>_partNNN<ext>``.

    Args:
        path: File to split, resolved against the current directory.
        chunk_size: Lines or bytes per chunk; must be positive.
        unit: ``"lines"`` or ``"bytes"``.

    Returns:
        A success message listing the created files, or an "Error: ..." string.
    """
    import itertools  # local import: only this tool needs it

    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: File '{path}' does not exist"
        if not os.path.isfile(target):
            return f"Error: Path '{path}' is not a file"
        if chunk_size <= 0:
            return "Error: chunk_size must be a positive integer"
        if unit not in ("lines", "bytes"):
            return f"Error: Unsupported unit '{unit}' (use 'lines' or 'bytes')"

        base_name, ext = os.path.splitext(target)
        files_created = []

        if unit == "lines":
            with open(target, 'r', encoding='utf-8', errors='ignore') as f:
                # islice pulls chunk_size lines at a time, so the whole file
                # never has to sit in memory.
                for part_num in itertools.count(1):
                    chunk_lines = list(itertools.islice(f, chunk_size))
                    if not chunk_lines:
                        break
                    chunk_file = f"{base_name}_part{part_num:03d}{ext}"
                    with open(chunk_file, 'w', encoding='utf-8') as cf:
                        cf.writelines(chunk_lines)
                    files_created.append(chunk_file)
        else:
            with open(target, 'rb') as f:
                for part_num in itertools.count(1):
                    chunk = f.read(chunk_size)
                    if not chunk:
                        break
                    chunk_file = f"{base_name}_part{part_num:03d}{ext}"
                    with open(chunk_file, 'wb') as cf:
                        cf.write(chunk)
                    files_created.append(chunk_file)

        return f"Successfully split {target} into {len(files_created)} files:\n" + "\n".join(files_created)
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def merge_files(file_list: list, output_path: str, separator: str = "") -> str:
    """Merge multiple files into one

    All inputs are validated before the output is opened, so a missing input
    no longer leaves a half-written output behind.

    Args:
        file_list: Paths to concatenate, in order.
        output_path: File to write; missing parent directories are created.
        separator: Text inserted between consecutive files (not after the last).

    Returns:
        A success message, or an "Error: ..." string.
    """
    output_target = resolve_path(output_path, allow_absolute=True)
    try:
        sources = [resolve_path(file_path, allow_absolute=True) for file_path in file_list]
        output_exists = os.path.exists(output_target)
        for file_path, file_target in zip(file_list, sources):
            if not os.path.exists(file_target):
                return f"Error: File '{file_path}' does not exist"
            # Opening the output for writing truncates it; refuse when that
            # would wipe one of the inputs before it is read.
            if output_exists and os.path.samefile(file_target, output_target):
                return f"Error: Output file '{output_path}' is also listed as an input"

        out_dir = os.path.dirname(output_target)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        last_index = len(sources) - 1
        with open(output_target, 'w', encoding='utf-8') as outf:
            for i, file_target in enumerate(sources):
                with open(file_target, 'r', encoding='utf-8', errors='ignore') as inf:
                    shutil.copyfileobj(inf, outf)
                if separator and i < last_index:
                    outf.write(separator)

        return f"Successfully merged {len(file_list)} files into {output_target}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def watch_file_changes(path: str, duration: int = 10) -> str:
    """Watch a file for changes for a specified duration (seconds)

    The path is polled every 500 ms; a change is a different mtime or size.

    Args:
        path: Path to watch, resolved against the current directory.
        duration: How long to watch, in seconds.

    Returns:
        A timestamped list of detected changes, a "No changes" message, or an
        "Error: ..." string.
    """
    import asyncio  # local import: only needed for the non-blocking sleep

    target = resolve_path(path, allow_absolute=True)
    try:
        if not os.path.exists(target):
            return f"Error: Path '{path}' does not exist"

        baseline = os.stat(target)
        last_mtime, last_size = baseline.st_mtime, baseline.st_size

        changes = []
        # monotonic() is immune to wall-clock adjustments during the watch.
        deadline = time.monotonic() + duration

        while time.monotonic() < deadline:
            try:
                current = os.stat(target)
            except FileNotFoundError:
                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                changes.append(f"[{timestamp}] File deleted")
                break

            if current.st_mtime != last_mtime or current.st_size != last_size:
                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                changes.append(f"[{timestamp}] Modified - Size: {format_bytes(current.st_size)}")
                last_mtime, last_size = current.st_mtime, current.st_size

            # Yield to the event loop between polls instead of blocking it.
            await asyncio.sleep(0.5)  # Check every 500ms

        if not changes:
            return f"No changes detected in {target} during {duration} seconds"
        return f"Changes detected in {target}:\n" + "\n".join(changes)
    except Exception as e:
        return f"Error: {e}"


# ================================
# SSH CONNECTION MANAGEMENT
# ================================

@mcp.tool()
async def ssh_connect(connection_id: str, hostname: str, username: str, password: str = None, private_key_path: str = None, port: int = 22, timeout: int = 10) -> str:
    """Connect to an SSH server. Use either password or private key authentication.

    Any connection already registered under ``connection_id`` is closed and
    unregistered first. ``private_key_path`` takes precedence over ``password``.

    Args:
        connection_id: Name under which the connection is stored.
        hostname: Server to connect to.
        username: Login name.
        password: Password for password authentication.
        private_key_path: Path to a private key file.
        port: TCP port of the SSH server.
        timeout: Connection timeout in seconds.

    Returns:
        A success message including the output of a test command, or an
        "Error: ..." string.
    """
    try:
        # Close and forget any existing connection so a failed reconnect does
        # not leave a dead client registered under this ID.
        stale_ssh = ssh_connections.pop(connection_id, None)
        if stale_ssh is not None:
            stale_ssh.close()
        stale_sftp = sftp_connections.pop(connection_id, None)
        if stale_sftp is not None:
            stale_sftp.close()

        # Prepare authentication
        connect_kwargs = {
            'hostname': hostname,
            'port': port,
            'username': username,
            'timeout': timeout,
        }

        if private_key_path:
            key_path = resolve_path(private_key_path, allow_absolute=True)
            if not os.path.exists(key_path):
                return f"Error: Private key file '{private_key_path}' does not exist"

            private_key = None
            # Looked up by name so that a key class missing from the installed
            # paramiko build is skipped rather than raising AttributeError.
            for class_name in ("RSAKey", "DSAKey", "ECDSAKey", "Ed25519Key"):
                key_class = getattr(paramiko, class_name, None)
                if key_class is None:
                    continue
                try:
                    private_key = key_class.from_private_key_file(key_path)
                    break
                except Exception:
                    # Wrong key type (or unreadable with this class): try the next one.
                    continue
            if private_key is None:
                return f"Error: Could not load private key from '{private_key_path}'"
            connect_kwargs['pkey'] = private_key
        elif password:
            connect_kwargs['password'] = password
        else:
            return "Error: Either password or private_key_path must be provided"

        ssh_client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts unknown host keys without
        # verification; kept because callers rely on first-connect working.
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        try:
            ssh_client.connect(**connect_kwargs)
        except Exception:
            ssh_client.close()  # do not leak the half-open client
            raise
        ssh_connections[connection_id] = ssh_client

        # Test connection
        stdin, stdout, stderr = ssh_client.exec_command('echo "SSH connection successful"')
        result = stdout.read().decode().strip()

        return f"Successfully connected to {username}@{hostname}:{port} as '{connection_id}'\nTest result: {result}"

    except paramiko.AuthenticationException:
        return f"Error: Authentication failed for {username}@{hostname}"
    except paramiko.SSHException as e:
        return f"Error: SSH connection failed - {e}"
    except socket.timeout:
        return f"Error: Connection timeout to {hostname}:{port}"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def ssh_disconnect(connection_id: str) -> str:
    """Disconnect from an SSH server

    Closes the SFTP session (if any) and the SSH client registered under
    ``connection_id`` and removes both from the registries.

    Returns:
        A success message, a "No active connection" message, or an "Error: ..." string.
    """
    try:
        sftp = sftp_connections.pop(connection_id, None)
        ssh_client = ssh_connections.pop(connection_id, None)
        try:
            if sftp is not None:
                sftp.close()
        finally:
            # Close the transport even if closing the SFTP session raised.
            if ssh_client is not None:
                ssh_client.close()

        if ssh_client is None:
            return f"No active connection found for '{connection_id}'"
        return f"Successfully disconnected from '{connection_id}'"
    except Exception as e:
        return f"Error: {e}"


@mcp.tool()
async def ssh_list_connections() -> str:
    """List all active SSH connections

    Returns:
        One line per registered connection with its peer address and whether
        its transport is active, or a message that there are none.
    """
    if not ssh_connections:
        return "No active SSH connections"

    result = ["Active SSH connections:"]
    for conn_id, ssh_client in ssh_connections.items():
        try:
            transport = ssh_client.get_transport()
            if transport and transport.is_active():
                remote_addr = transport.getpeername()
                result.append(f" {conn_id}: {remote_addr[0]}:{remote_addr[1]} (active)")
            else:
                result.append(f" {conn_id}: (inactive)")
        except Exception:
            result.append(f" {conn_id}: (status unknown)")

    return "\n".join(result)


# ================================
# SSH COMMAND EXECUTION
# ================================

@mcp.tool()
async def ssh_exec(connection_id: str, command: str, timeout: int = 30, get_pty: bool = False) -> str:
    """Execute a command on the remote SSH server

    Args:
        connection_id: Name of a connection opened with ``ssh_connect``.
        command: Command line to run remotely.
        timeout: Channel timeout in seconds, passed to ``exec_command``.
        get_pty: Request a pseudo-terminal for the command.

    Returns:
        A report with STDOUT, STDERR and the exit code, or an error string.
    """
    try:
        ssh_client = get_ssh_connection(connection_id)

        stdin, stdout, stderr = ssh_client.exec_command(command, timeout=timeout, get_pty=get_pty)

        # NOTE(review): stdout is drained completely before stderr is read; a
        # command that writes a very large amount to stderr could presumably
        # stall here - confirm against paramiko's channel buffering.
        stdout_data = stdout.read().decode('utf-8', errors='replace')
        stderr_data = stderr.read().decode('utf-8', errors='replace')
        exit_code = stdout.channel.recv_exit_status()

        sections = [f"Command: {command}\nConnection: {connection_id}\n\n"]
        if stdout_data:
            sections.append(f"STDOUT:\n{stdout_data}\n")
        if stderr_data:
            sections.append(f"STDERR:\n{stderr_data}\n")
        sections.append(f"Exit code: {exit_code}")
        return "".join(sections)

    except Exception as e:
        return f"Error executing command on '{connection_id}': {e}"


def _drain_channel(channel, buffer: bytearray) -> None:
    """Append every byte currently readable on *channel* to *buffer* without waiting."""
    while channel.recv_ready():
        buffer.extend(channel.recv(1024))


@mcp.tool()
async def ssh_exec_interactive(connection_id: str, commands: list, timeout: int = 30) -> str:
    """Execute multiple commands in an interactive session

    Each command is sent to a freshly invoked shell, followed by a 0.5 s pause
    and a read of whatever output is available; ``exit`` is sent at the end.

    Args:
        connection_id: Name of a connection opened with ``ssh_connect``.
        commands: Command strings to send, in order.
        timeout: Channel timeout in seconds.

    Returns:
        The commands and the collected session output, or an error string.
    """
    import asyncio  # local import: only needed for the non-blocking sleep

    shell = None
    try:
        ssh_client = get_ssh_connection(connection_id)

        shell = ssh_client.invoke_shell()
        shell.settimeout(timeout)

        # Raw bytes are collected and decoded once at the end so a multi-byte
        # character split across two 1024-byte reads is not corrupted.
        raw_output = bytearray()

        for command in commands:
            shell.send(command + '\n')
            await asyncio.sleep(0.5)  # Wait for command to process without blocking the event loop
            _drain_channel(shell, raw_output)

        shell.send('exit\n')
        await asyncio.sleep(0.5)
        _drain_channel(shell, raw_output)

        result = f"Interactive session on '{connection_id}':\nCommands: {', '.join(commands)}\n\n"
        result += f"Output:\n{raw_output.decode('utf-8', errors='replace')}"
        return result

    except Exception as e:
        return f"Error in interactive session on '{connection_id}': {e}"
    finally:
        # Release the channel on every path, including errors.
        if shell is not None:
            shell.close()


@mcp.tool()
async def ssh_get_system_info(connection_id: str) -> str:
    """Get system information from the remote server

    Runs a fixed set of shell commands (hostname, uptime, uname, df, free,
    /proc/cpuinfo, /proc/loadavg, ps) and reports each result under its name.
    A command that fails is reported as such without aborting the others.

    Returns:
        The combined report, or an error string when the connection is unavailable.
    """
    try:
        ssh_client = get_ssh_connection(connection_id)

        commands = {
            'hostname': 'hostname',
            'uptime': 'uptime',
            'uname': 'uname -a',
            'disk_usage': 'df -h',
            'memory': 'free -h',
            'cpu_info': 'cat /proc/cpuinfo | grep "model name" | head -1',
            'load_average': 'cat /proc/loadavg',
            'processes': 'ps aux --sort=-%cpu | head -10',
        }

        report = [f"System Information for '{connection_id}':"]
        for name, cmd in commands.items():
            try:
                stdin, stdout, stderr = ssh_client.exec_command(cmd, timeout=10)
                output = stdout.read().decode('utf-8', errors='replace').strip()
                if not output:
                    error = stderr.read().decode('utf-8', errors='replace').strip()
                    output = f"Error: {error}" if error else "No output"
            except Exception:
                output = "Command failed"
            report.append(f"\n{name.upper()}:")
            report.append(output)

        return "\n".join(report)

    except Exception as e:
        return f"Error getting system info from '{connection_id}': {e}"


#
# ================================
# SSH FILE OPERATIONS (SFTP)
# ================================

@mcp.tool()
async def ssh_list_remote_dir(connection_id: str, remote_path: str = ".", detailed: bool = False) -> str:
    """List contents of a remote directory via SFTP

    Args:
        connection_id: Name of a connection opened with ``ssh_connect``.
        remote_path: Directory to list on the server.
        detailed: Include size and modification time for every entry.

    Returns:
        One line per entry, sorted by name, or an error string.
    """
    try:
        sftp = get_sftp_connection(connection_id)

        try:
            entries = sftp.listdir_attr(remote_path)
        except FileNotFoundError:
            return f"Error: Remote directory '{remote_path}' does not exist"

        if not entries:
            return f"Remote directory '{remote_path}' is empty"

        result = [f"Contents of remote directory '{remote_path}' on '{connection_id}':"]

        for entry in sorted(entries, key=lambda attr: attr.filename):
            mode = entry.st_mode
            # Attributes the server did not send are None; treat a missing
            # mode as "not a directory" instead of crashing in S_ISDIR.
            is_dir = mode is not None and stat.S_ISDIR(mode)
            size = entry.st_size or 0

            if detailed:
                mtime = time.ctime(entry.st_mtime) if entry.st_mtime else "Unknown"
                file_type = "DIR " if is_dir else "FILE"
                result.append(f"[{file_type}] {entry.filename:<30} {format_bytes(size):<10} {mtime}")
            elif is_dir:
                result.append(f"[DIR] {entry.filename}/")
            else:
                result.append(f"[FILE] {entry.filename} ({format_bytes(size)})")

        return "\n".join(result)

    except Exception as e:
        return f"Error listing remote directory '{remote_path}' on '{connection_id}': {e}"


@mcp.tool()
async def ssh_read_remote_file(connection_id: str, remote_path: str, encoding: str = "utf-8", max_size: int = 1024*1024) -> str:
    """Read a remote file via SFTP

    Args:
        connection_id: Name of a connection opened with ``ssh_connect``.
        remote_path: File to read on the server.
        encoding: Text encoding used to decode the content (errors are replaced).
        max_size: Files reported as larger than this many bytes are refused.

    Returns:
        The decoded file content with a header line, or an error string.
    """
    try:
        sftp = get_sftp_connection(connection_id)

        # Check file size first
        try:
            remote_size = sftp.stat(remote_path).st_size
        except FileNotFoundError:
            return f"Error: Remote file '{remote_path}' does not exist"
        # st_size is None when the server omits it; only enforce a known size.
        if remote_size is not None and remote_size > max_size:
            return f"Error: File size ({format_bytes(remote_size)}) exceeds maximum allowed size ({format_bytes(max_size)})"

        with sftp.open(remote_path, 'r') as remote_file:
            content = remote_file.read().decode(encoding, errors='replace')

        return f"Contents of remote file '{remote_path}' on '{connection_id}':\n{content}"

    except Exception as e:
        return f"Error reading remote file '{remote_path}' on '{connection_id}': {e}"


@mcp.tool()
async def ssh_write_remote_file(connection_id: str, remote_path: str, content: str, encoding: str = "utf-8", backup: bool = False) -> str:
    """Write content to a remote file via SFTP

    Args:
        connection_id: Name of a connection opened with ``ssh_connect``.
        remote_path: File to create or overwrite on the server.
        content: Text to write.
        encoding: Encoding applied to ``content`` before upload.
        backup: When the file already exists, move it to ``<remote_path>.bak``
            first (replacing an earlier backup of the same name).

    Returns:
        A success message (naming the backup when one was made), or an error string.
    """
    try:
        sftp = get_sftp_connection(connection_id)

        # Create backup if requested and file exists
        backup_path = None
        if backup:
            try:
                sftp.stat(remote_path)  # Check if file exists
            except FileNotFoundError:
                pass  # File doesn't exist, no backup needed
            else:
                backup_path = f"{remote_path}.bak"
                # Clear a stale backup first: a rename onto an existing name is
                # refused by some servers. A failure here is not fatal; the
                # rename below reports any real problem.
                try:
                    sftp.remove(backup_path)
                except IOError:
                    pass
                sftp.rename(remote_path, backup_path)

        with sftp.open(remote_path, 'w') as remote_file:
            remote_file.write(content.encode(encoding))

        result = f"Successfully wrote to remote file '{remote_path}' on '{connection_id}'"
        if backup_path:
            result += f"\nBackup created: {backup_path}"
        return result

    except Exception as e:
        return f"Error writing remote file '{remote_path}' on '{connection_id}': {e}"


@mcp.tool()
async def ssh_download_file(connection_id: str, remote_path: str, local_path: str = None) -> str:
    """Download a file from remote server to local machine

    Args:
        connection_id: Name of a connection opened with ``ssh_connect``.
        remote_path: File to fetch from the server.
        local_path: Destination; defaults to the remote file's name in the
            current directory. Missing parent directories are created.

    Returns:
        A success message with the downloaded size, or an error string.
    """
    import posixpath  # SFTP paths are '/'-separated regardless of the local OS

    try:
        sftp = get_sftp_connection(connection_id)

        if local_path is None:
            local_path = posixpath.basename(remote_path)
            if not local_path:
                return f"Error downloading file '{remote_path}' from '{connection_id}': cannot derive a local file name"

        local_target = resolve_path(local_path, allow_absolute=True)

        # Create local directory if needed; makedirs('') would raise.
        local_dir = os.path.dirname(local_target)
        if local_dir:
            os.makedirs(local_dir, exist_ok=True)

        sftp.get(remote_path, local_target)

        local_size = os.path.getsize(local_target)
        return f"Successfully downloaded '{remote_path}' from '{connection_id}' to '{local_target}' ({format_bytes(local_size)})"

    except FileNotFoundError:
        return f"Error: Remote file '{remote_path}' does not exist on '{connection_id}'"
    except Exception as e:
        return f"Error downloading file '{remote_path}' from '{connection_id}': {e}"


@mcp.tool()
async def ssh_upload_file(connection_id: str, local_path: str, remote_path: str = None) -> str:
    """Upload a file from local machine to remote server

    Args:
        connection_id: Name of a connection opened with ``ssh_connect``.
        local_path: File to send, resolved against the current directory.
        remote_path: Destination on the server; defaults to the local file's name.

    Returns:
        A success message with the uploaded size, or an error string.
    """
    try:
        sftp = get_sftp_connection(connection_id)

        local_target = resolve_path(local_path, allow_absolute=True)
        if not os.path.exists(local_target):
            return f"Error: Local file '{local_path}' does not exist"

        if remote_path is None:
            remote_path = os.path.basename(local_target)

        sftp.put(local_target, remote_path)

        local_size = os.path.getsize(local_target)
        return f"Successfully uploaded '{local_target}' to '{remote_path}' on '{connection_id}' ({format_bytes(local_size)})"

    except Exception as e:
        return f"Error uploading file '{local_path}' to '{connection_id}': {e}"


@mcp.tool()
async def ssh_create_remote_dir(connection_id: str, remote_path: str) -> str:
    """Create a directory on the remote server

    Returns:
        A success message, or an error string when the SFTP ``mkdir`` fails.
    """
    try:
        get_sftp_connection(connection_id).mkdir(remote_path)
    except Exception as e:
        return f"Error creating remote directory '{remote_path}' on '{connection_id}': {e}"
    return f"Successfully created remote directory '{remote_path}' on '{connection_id}'"


@mcp.tool()
async def ssh_delete_remote_file(connection_id: str, remote_path: str) -> str:
    """Delete a file on the remote server

    Returns:
        A success message, a "does not exist" error, or a generic error string.
    """
    try:
        get_sftp_connection(connection_id).remove(remote_path)
    except FileNotFoundError:
        return f"Error: Remote file '{remote_path}' does not exist on '{connection_id}'"
    except Exception as e:
        return f"Error deleting remote file '{remote_path}' on '{connection_id}': {e}"
    return f"Successfully deleted remote file '{remote_path}' on '{connection_id}'"


@mcp.tool()
async def ssh_delete_remote_dir(connection_id: str, remote_path: str) -> str:
    """Delete a directory on the remote server

    Returns:
        A success message, or an error string when the SFTP ``rmdir`` fails.
    """
    try:
        get_sftp_connection(connection_id).rmdir(remote_path)
    except Exception as e:
        return f"Error deleting remote directory '{remote_path}' on '{connection_id}': {e}"
    return f"Successfully deleted remote directory '{remote_path}' on '{connection_id}'"


@mcp.tool()
async def health_check() -> str:
    """Check if the MCP server is running properly"""
    return f"MCP server is running. Current directory: {CURRENT_DIR}"


if __name__ == "__main__":
    logging.debug("Starting enhanced-file-ops MCP server")

    # Set by the worker when mcp.run() raises. sys.exit() inside a thread only
    # ends that thread, so the failure is reported from the main thread instead.
    server_failed = threading.Event()

    def run_mcp():
        """Run the MCP server"""
        logging.debug("MCP thread starting (stdio transport)...")
        try:
            mcp.run(transport="stdio")
        except Exception:
            logging.exception("Unhandled exception inside MCP server")
            server_failed.set()

    # Start the MCP server in the background
    server_thread = threading.Thread(target=run_mcp, daemon=True)
    server_thread.start()
    logging.info("MCP server thread launched successfully")

    # Keep the main thread alive until interrupted
    try:
        while server_thread.is_alive():
            server_thread.join(timeout=1)
    except KeyboardInterrupt:
        logging.info("KeyboardInterrupt received; shutting down enhanced-file-ops server...")
        sys.exit(0)

    sys.exit(1 if server_failed.is_set() else 0)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SkippyzNJiff/McpFileOps'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.