import os
import subprocess
import shutil
import glob
import fnmatch
import hashlib
import zipfile
import tarfile
import json
import csv
import re
import errno
import asyncio
import time
import threading
import psutil
import platform
from pathlib import Path
from datetime import datetime, timedelta
from mcp.server.fastmcp import FastMCP
import paramiko
import socket
import stat
from io import StringIO
import logging
import sys
# SSH Connection management
ssh_connections = {} # Dictionary to store active SSH connections
sftp_connections = {} # Dictionary to store active SFTP connections
# — configure root logger to DEBUG, log to both console and server.log
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
handlers=[
logging.FileHandler("server.log", encoding="utf-8"),
logging.StreamHandler()
]
)
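# logging.StreamHandler defaults to sys.stderr, so console logging does not
# collide with the MCP stdio transport on stdout.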
mcp = FastMCP("ultimate-file-ops")
BASE_DIR = os.getcwd()
CURRENT_DIR = BASE_DIR # Track current working directory
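# CURRENT_DIR is updated by change_directory(); the process-wide working
# directory (os.getcwd()) is never changed, and subprocess-based tools pass
# cwd=CURRENT_DIR explicitly instead.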
def resolve_path(path: str, allow_absolute: bool = False) -> str:
    """Resolve a path relative to the tracked current working directory"""
    try:
        if allow_absolute and os.path.isabs(path):
            return os.path.abspath(path)
        # Note: os.path.join discards CURRENT_DIR when `path` is already absolute,
        # so absolute paths effectively pass through even without allow_absolute.
        full = os.path.abspath(os.path.join(CURRENT_DIR, path))
        return full
    except Exception:
        # Fall back to the raw input if normalization fails
        return path
def safe_resolve_path(path: str) -> str:
    """Resolve path with base directory restriction (for sensitive operations)"""
    try:
        full = os.path.abspath(os.path.join(BASE_DIR, path))
    except Exception:
        raise ValueError(f"Invalid path: {path}")
    # A bare startswith() check would also accept sibling paths such as
    # BASE_DIR + "_evil"; require BASE_DIR itself or a path below it.
    if full != BASE_DIR and not full.startswith(BASE_DIR + os.sep):
        raise ValueError("Access outside base directory is prohibited")
    return full
def format_bytes(bytes_value):
"""Format bytes to human readable format"""
if bytes_value is None or bytes_value < 0:
return "0 B"
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_value < 1024.0:
return f"{bytes_value:.2f} {unit}"
bytes_value /= 1024.0
return f"{bytes_value:.2f} PB"
def get_ssh_connection(connection_id: str):
"""Get SSH connection by ID"""
if connection_id not in ssh_connections:
raise ValueError(f"SSH connection '{connection_id}' not found. Use ssh_connect first.")
return ssh_connections[connection_id]
def get_sftp_connection(connection_id: str):
"""Get SFTP connection by ID"""
if connection_id not in sftp_connections:
# Try to create SFTP from existing SSH connection
if connection_id in ssh_connections:
ssh_client = ssh_connections[connection_id]
sftp_connections[connection_id] = ssh_client.open_sftp()
else:
raise ValueError(f"SSH connection '{connection_id}' not found. Use ssh_connect first.")
return sftp_connections[connection_id]
# ================================
# NAVIGATION & DIRECTORY TOOLS
# ================================
@mcp.tool()
async def get_current_directory() -> str:
"""Get the current working directory"""
return f"Current directory: {CURRENT_DIR}"
@mcp.tool()
async def change_directory(path: str) -> str:
"""Change the current working directory to any location"""
global CURRENT_DIR
try:
if path == "..":
new_dir = os.path.dirname(CURRENT_DIR)
elif path == ".":
new_dir = CURRENT_DIR
elif os.path.isabs(path):
new_dir = os.path.abspath(path)
else:
new_dir = os.path.abspath(os.path.join(CURRENT_DIR, path))
if not os.path.exists(new_dir):
return f"Error: Directory '{path}' does not exist"
if not os.path.isdir(new_dir):
return f"Error: Path '{path}' is not a directory"
CURRENT_DIR = new_dir
return f"Changed directory to: {CURRENT_DIR}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def list_dir(path: str = ".", detailed: bool = False, sort_by: str = "name") -> str:
"""List contents of a directory with optional detailed info and sorting"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: Path '{path}' does not exist"
if not os.path.isdir(target):
return f"Error: Path '{path}' is not a directory"
items = []
for item in os.listdir(target):
item_path = os.path.join(target, item)
try:
stat_info = os.stat(item_path)
item_info = {
'name': item,
'path': item_path,
'is_dir': os.path.isdir(item_path),
'size': stat_info.st_size,
'modified': stat_info.st_mtime
}
items.append(item_info)
except (OSError, IOError):
# Skip files we can't access
continue
# Sort items
if sort_by == "size":
items.sort(key=lambda x: x['size'], reverse=True)
elif sort_by == "modified":
items.sort(key=lambda x: x['modified'], reverse=True)
else: # name
items.sort(key=lambda x: x['name'].lower())
if not items:
return "Empty directory"
result = [f"Contents of {target}:"]
for item in items:
if item['is_dir']:
if detailed:
mod_time = time.ctime(item['modified'])
result.append(f"[DIR] {item['name']}/ (modified: {mod_time})")
else:
result.append(f"[DIR] {item['name']}/")
else:
if detailed:
size_str = format_bytes(item['size'])
mod_time = time.ctime(item['modified'])
result.append(f"[FILE] {item['name']} ({size_str}, modified: {mod_time})")
else:
result.append(f"[FILE] {item['name']} ({format_bytes(item['size'])})")
return "\n".join(result)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def create_directory(path: str) -> str:
"""Create a directory (including parent directories if needed)"""
target = resolve_path(path, allow_absolute=True)
try:
os.makedirs(target, exist_ok=True)
return f"Successfully created directory {target}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def delete_directory(path: str, recursive: bool = False) -> str:
"""Delete a directory (use recursive=True to delete non-empty directories)"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: Directory '{path}' does not exist"
if not os.path.isdir(target):
return f"Error: Path '{path}' is not a directory"
if recursive:
shutil.rmtree(target)
return f"Successfully deleted directory {target} and all contents"
else:
os.rmdir(target)
return f"Successfully deleted empty directory {target}"
except OSError as e:
        if e.errno == errno.ENOTEMPTY:  # Directory not empty (portable across platforms)
return f"Error: Directory '{path}' is not empty. Use recursive=True to delete non-empty directories"
return f"Error: {e}"
except Exception as e:
return f"Error: {e}"
# ================================
# FILE OPERATIONS
# ================================
@mcp.tool()
async def read_file(path: str, encoding: str = "utf-8", lines_limit: int = None) -> str:
"""Read the contents of a file with optional encoding and line limit"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: File '{path}' does not exist"
if not os.path.isfile(target):
return f"Error: Path '{path}' is not a file"
with open(target, "r", encoding=encoding, errors="replace") as f:
if lines_limit:
lines = []
for i, line in enumerate(f):
if i >= lines_limit:
lines.append(f"... (truncated after {lines_limit} lines)")
break
lines.append(line.rstrip())
content = "\n".join(lines)
else:
content = f.read()
return f"Contents of {target}:\n{content}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def read_file_bytes(path: str, start: int = 0, length: int = None) -> str:
"""Read raw bytes from a file (useful for binary files)"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: File '{path}' does not exist"
if not os.path.isfile(target):
return f"Error: Path '{path}' is not a file"
with open(target, "rb") as f:
f.seek(start)
if length:
data = f.read(length)
else:
data = f.read()
# Convert to hex representation
hex_data = data.hex()
return f"Binary data from {target} (start: {start}, length: {len(data)} bytes):\n{hex_data}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def write_file(path: str, content: str, encoding: str = "utf-8") -> str:
"""Write content to a file (overwrites existing content)"""
target = resolve_path(path, allow_absolute=True)
try:
# Create directory if it doesn't exist
directory = os.path.dirname(target)
if directory:
os.makedirs(directory, exist_ok=True)
with open(target, "w", encoding=encoding) as f:
f.write(content)
return f"Successfully wrote to {target}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def append_file(path: str, content: str, encoding: str = "utf-8") -> str:
"""Append content to a file"""
target = resolve_path(path, allow_absolute=True)
try:
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(target), exist_ok=True)
with open(target, "a", encoding=encoding) as f:
f.write(content)
return f"Successfully appended to {target}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def delete_file(path: str) -> str:
"""Delete a file"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: File '{path}' does not exist"
if not os.path.isfile(target):
return f"Error: Path '{path}' is not a file"
os.remove(target)
return f"Successfully deleted {target}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def edit_file_lines(path: str, start: int, end: int, new_text: str) -> str:
"""Edit specific lines in a file (0-indexed, end is exclusive)"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: File '{path}' does not exist"
if not os.path.isfile(target):
return f"Error: Path '{path}' is not a file"
with open(target, "r", encoding="utf-8") as f:
lines = f.readlines()
if start < 0 or start >= len(lines):
return f"Error: Start line {start} is out of range (file has {len(lines)} lines)"
if end < start or end > len(lines):
return f"Error: End line {end} is invalid (start: {start}, file has {len(lines)} lines)"
# Replace the specified lines
new_lines = new_text.splitlines(keepends=True)
lines[start:end] = new_lines
with open(target, "w", encoding="utf-8") as f:
f.writelines(lines)
return f"Successfully edited lines {start}-{end-1} in {target}"
except Exception as e:
return f"Error: {e}"
# ================================
# FILE MANAGEMENT
# ================================
@mcp.tool()
async def copy_file(source: str, destination: str, preserve_metadata: bool = True) -> str:
"""Copy a file from source to destination"""
src = resolve_path(source, allow_absolute=True)
dst = resolve_path(destination, allow_absolute=True)
try:
if not os.path.exists(src):
return f"Error: Source file '{source}' does not exist"
if not os.path.isfile(src):
return f"Error: Source '{source}' is not a file"
# Create destination directory if it doesn't exist
os.makedirs(os.path.dirname(dst), exist_ok=True)
if preserve_metadata:
shutil.copy2(src, dst)
else:
shutil.copy(src, dst)
return f"Successfully copied {src} to {dst}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def copy_directory(source: str, destination: str) -> str:
"""Copy a directory from source to destination"""
src = resolve_path(source, allow_absolute=True)
dst = resolve_path(destination, allow_absolute=True)
try:
if not os.path.exists(src):
return f"Error: Source directory '{source}' does not exist"
if not os.path.isdir(src):
return f"Error: Source '{source}' is not a directory"
shutil.copytree(src, dst, dirs_exist_ok=True)
return f"Successfully copied directory {src} to {dst}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def move_file(source: str, destination: str) -> str:
"""Move/rename a file or directory from source to destination"""
src = resolve_path(source, allow_absolute=True)
dst = resolve_path(destination, allow_absolute=True)
try:
if not os.path.exists(src):
return f"Error: Source '{source}' does not exist"
# Create destination directory if it doesn't exist
if os.path.isfile(src):
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.move(src, dst)
return f"Successfully moved {src} to {dst}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def get_file_info(path: str) -> str:
"""Get detailed information about a file or directory"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: Path '{path}' does not exist"
        stat_info = os.stat(target)  # avoid shadowing the imported `stat` module
        file_type = "Directory" if os.path.isdir(target) else "File"
        size = stat_info.st_size
        created_time = time.ctime(stat_info.st_ctime)
        modified_time = time.ctime(stat_info.st_mtime)
        accessed_time = time.ctime(stat_info.st_atime)
        # Permission bits in octal (Unix-style mode; of limited meaning on Windows)
        permissions = oct(stat_info.st_mode)[-3:]
        result = f"""Path: {target}
Type: {file_type}
Size: {format_bytes(size)} ({size} bytes)
Permissions: {permissions}
Created: {created_time}
Modified: {modified_time}
Accessed: {accessed_time}"""
        if os.path.isfile(target):
            # Add file extension info
            _, ext = os.path.splitext(target)
            result += f"\nExtension: {ext if ext else 'None'}"
            # Hash in chunks so large files are not read into memory at once
            md5_hash = hashlib.md5()
            sha256_hash = hashlib.sha256()
            with open(target, 'rb') as f:
                for chunk in iter(lambda: f.read(65536), b""):
                    md5_hash.update(chunk)
                    sha256_hash.update(chunk)
            result += f"\nMD5: {md5_hash.hexdigest()}"
            result += f"\nSHA256: {sha256_hash.hexdigest()}"
return result
except Exception as e:
return f"Error: {e}"
# ================================
# SEARCH & DISCOVERY
# ================================
@mcp.tool()
async def search_files(pattern: str, search_path: str = ".", recursive: bool = True, case_sensitive: bool = False) -> str:
"""Search for files matching a pattern (supports wildcards like *.txt, *.py)"""
search_dir = resolve_path(search_path, allow_absolute=True)
try:
if not os.path.exists(search_dir):
return f"Error: Search path '{search_path}' does not exist"
if not os.path.isdir(search_dir):
return f"Error: Search path '{search_path}' is not a directory"
matches = []
if recursive:
for root, dirs, files in os.walk(search_dir):
for file in files:
if case_sensitive:
match = fnmatch.fnmatch(file, pattern)
else:
match = fnmatch.fnmatch(file.lower(), pattern.lower())
if match:
matches.append(os.path.join(root, file))
else:
for file in os.listdir(search_dir):
file_path = os.path.join(search_dir, file)
if os.path.isfile(file_path):
if case_sensitive:
match = fnmatch.fnmatch(file, pattern)
else:
match = fnmatch.fnmatch(file.lower(), pattern.lower())
if match:
matches.append(file_path)
if not matches:
return f"No files found matching pattern '{pattern}' in {search_dir}"
return f"Found {len(matches)} files matching '{pattern}':\n" + "\n".join(matches)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def find_in_files(search_text: str, file_pattern: str = "*", search_path: str = ".", recursive: bool = True, case_sensitive: bool = False, line_numbers: bool = True) -> str:
"""Search for text content within files"""
search_dir = resolve_path(search_path, allow_absolute=True)
try:
if not os.path.exists(search_dir):
return f"Error: Search path '{search_path}' does not exist"
matches = []
file_count = 0
search_func = search_text if case_sensitive else search_text.lower()
if recursive:
for root, dirs, files in os.walk(search_dir):
for file in files:
if fnmatch.fnmatch(file, file_pattern):
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
file_count += 1
for line_num, line in enumerate(f, 1):
line_to_search = line if case_sensitive else line.lower()
if search_func in line_to_search:
if line_numbers:
matches.append(f"{file_path}:{line_num}: {line.strip()}")
else:
matches.append(f"{file_path}: {line.strip()}")
                        except OSError:
continue
else:
for file in os.listdir(search_dir):
file_path = os.path.join(search_dir, file)
if os.path.isfile(file_path) and fnmatch.fnmatch(file, file_pattern):
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
file_count += 1
for line_num, line in enumerate(f, 1):
line_to_search = line if case_sensitive else line.lower()
if search_func in line_to_search:
if line_numbers:
matches.append(f"{file_path}:{line_num}: {line.strip()}")
else:
matches.append(f"{file_path}: {line.strip()}")
                    except OSError:
continue
if not matches:
return f"Text '{search_text}' not found in {file_count} files searched"
return f"Found '{search_text}' in {len(matches)} locations:\n" + "\n".join(matches)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def find_files_by_size(min_size: int = 0, max_size: int = None, search_path: str = ".", recursive: bool = True) -> str:
"""Find files within a specific size range (in bytes)"""
search_dir = resolve_path(search_path, allow_absolute=True)
try:
if not os.path.exists(search_dir):
return f"Error: Search path '{search_path}' does not exist"
matches = []
if recursive:
for root, dirs, files in os.walk(search_dir):
for file in files:
file_path = os.path.join(root, file)
try:
size = os.path.getsize(file_path)
if size >= min_size and (max_size is None or size <= max_size):
matches.append((file_path, size))
                    except OSError:
continue
else:
for file in os.listdir(search_dir):
file_path = os.path.join(search_dir, file)
if os.path.isfile(file_path):
try:
size = os.path.getsize(file_path)
if size >= min_size and (max_size is None or size <= max_size):
matches.append((file_path, size))
                    except OSError:
continue
if not matches:
return f"No files found in size range {format_bytes(min_size)} - {format_bytes(max_size) if max_size else 'unlimited'}"
# Sort by size (largest first)
matches.sort(key=lambda x: x[1], reverse=True)
result = [f"Found {len(matches)} files in size range:"]
for file_path, size in matches:
result.append(f"{file_path} ({format_bytes(size)})")
return "\n".join(result)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def find_duplicate_files(search_path: str = ".", recursive: bool = True) -> str:
"""Find duplicate files based on content hash"""
search_dir = resolve_path(search_path, allow_absolute=True)
try:
if not os.path.exists(search_dir):
return f"Error: Search path '{search_path}' does not exist"
        file_hashes = {}
        # Gather candidate files, honoring the recursive flag
        if recursive:
            candidates = []
            for root, dirs, files in os.walk(search_dir):
                candidates.extend(os.path.join(root, f) for f in files)
        else:
            candidates = [os.path.join(search_dir, f) for f in os.listdir(search_dir)
                          if os.path.isfile(os.path.join(search_dir, f))]
        for file_path in candidates:
            try:
                # Hash in chunks so large files are not read into memory at once
                hasher = hashlib.md5()
                with open(file_path, 'rb') as f:
                    for chunk in iter(lambda: f.read(65536), b""):
                        hasher.update(chunk)
                file_hashes.setdefault(hasher.hexdigest(), []).append(file_path)
            except OSError:
                continue
duplicates = {h: paths for h, paths in file_hashes.items() if len(paths) > 1}
if not duplicates:
return "No duplicate files found"
result = [f"Found {len(duplicates)} sets of duplicate files:"]
for file_hash, paths in duplicates.items():
result.append(f"\nHash {file_hash}:")
for path in paths:
size = os.path.getsize(path)
result.append(f" - {path} ({format_bytes(size)})")
return "\n".join(result)
except Exception as e:
return f"Error: {e}"
# ================================
# COMPRESSION & ARCHIVING
# ================================
@mcp.tool()
async def create_zip(archive_path: str, files_and_dirs: list, compression_level: int = 6) -> str:
"""Create a ZIP archive from files and directories"""
archive_target = resolve_path(archive_path, allow_absolute=True)
try:
os.makedirs(os.path.dirname(archive_target), exist_ok=True)
with zipfile.ZipFile(archive_target, 'w', zipfile.ZIP_DEFLATED, compresslevel=compression_level) as zipf:
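            # compresslevel 0-9 applies to ZIP_DEFLATED (supported since Python 3.7)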
for item in files_and_dirs:
item_path = resolve_path(item, allow_absolute=True)
if not os.path.exists(item_path):
return f"Error: Path '{item}' does not exist"
if os.path.isfile(item_path):
zipf.write(item_path, os.path.basename(item_path))
elif os.path.isdir(item_path):
for root, dirs, files in os.walk(item_path):
for file in files:
file_path = os.path.join(root, file)
arc_name = os.path.relpath(file_path, os.path.dirname(item_path))
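                            # relpath against the parent keeps the source directory's
                            # own name as the top-level folder inside the archive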
zipf.write(file_path, arc_name)
return f"Successfully created ZIP archive: {archive_target}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def extract_zip(archive_path: str, extract_to: str = ".") -> str:
"""Extract a ZIP archive"""
archive_target = resolve_path(archive_path, allow_absolute=True)
extract_target = resolve_path(extract_to, allow_absolute=True)
try:
if not os.path.exists(archive_target):
return f"Error: Archive '{archive_path}' does not exist"
os.makedirs(extract_target, exist_ok=True)
with zipfile.ZipFile(archive_target, 'r') as zipf:
zipf.extractall(extract_target)
return f"Successfully extracted {archive_target} to {extract_target}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def list_zip_contents(archive_path: str) -> str:
"""List contents of a ZIP archive"""
archive_target = resolve_path(archive_path, allow_absolute=True)
try:
if not os.path.exists(archive_target):
return f"Error: Archive '{archive_path}' does not exist"
with zipfile.ZipFile(archive_target, 'r') as zipf:
result = [f"Contents of {archive_target}:"]
for info in zipf.filelist:
size = info.file_size
compressed_size = info.compress_size
compression_ratio = (1 - compressed_size / size) * 100 if size > 0 else 0
result.append(f" {info.filename} ({format_bytes(size)}, compressed: {format_bytes(compressed_size)}, {compression_ratio:.1f}% compression)")
return "\n".join(result)
except Exception as e:
return f"Error: {e}"
# ================================
# FILE HASHING & CHECKSUMS
# ================================
@mcp.tool()
async def calculate_file_hash(path: str, algorithm: str = "sha256") -> str:
"""Calculate hash of a file (md5, sha1, sha256, sha512)"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: File '{path}' does not exist"
if not os.path.isfile(target):
return f"Error: Path '{path}' is not a file"
hash_algo = getattr(hashlib, algorithm.lower(), None)
if not hash_algo:
return f"Error: Unsupported hash algorithm '{algorithm}'"
hasher = hash_algo()
with open(target, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hasher.update(chunk)
return f"{algorithm.upper()} hash of {target}: {hasher.hexdigest()}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def verify_file_hash(path: str, expected_hash: str, algorithm: str = "sha256") -> str:
"""Verify a file's hash against an expected value"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: File '{path}' does not exist"
if not os.path.isfile(target):
return f"Error: Path '{path}' is not a file"
hash_algo = getattr(hashlib, algorithm.lower(), None)
if not hash_algo:
return f"Error: Unsupported hash algorithm '{algorithm}'"
hasher = hash_algo()
with open(target, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hasher.update(chunk)
actual_hash = hasher.hexdigest()
if actual_hash.lower() == expected_hash.lower():
return f"✓ Hash verification PASSED for {target}"
else:
return f"✗ Hash verification FAILED for {target}\nExpected: {expected_hash}\nActual: {actual_hash}"
except Exception as e:
return f"Error: {e}"
# ================================
# FILE COMPARISON
# ================================
@mcp.tool()
async def compare_files(file1: str, file2: str, mode: str = "binary") -> str:
"""Compare two files (binary or text mode)"""
file1_target = resolve_path(file1, allow_absolute=True)
file2_target = resolve_path(file2, allow_absolute=True)
try:
if not os.path.exists(file1_target):
return f"Error: File '{file1}' does not exist"
if not os.path.exists(file2_target):
return f"Error: File '{file2}' does not exist"
if mode == "binary":
with open(file1_target, 'rb') as f1, open(file2_target, 'rb') as f2:
content1 = f1.read()
content2 = f2.read()
if content1 == content2:
return f"Files are identical (binary comparison)"
else:
return f"Files are different (binary comparison)\nFile1 size: {len(content1)} bytes\nFile2 size: {len(content2)} bytes"
elif mode == "text":
with open(file1_target, 'r', encoding='utf-8', errors='ignore') as f1:
lines1 = f1.readlines()
with open(file2_target, 'r', encoding='utf-8', errors='ignore') as f2:
lines2 = f2.readlines()
if lines1 == lines2:
return f"Files are identical (text comparison)"
# Find differences
import difflib
diff = list(difflib.unified_diff(lines1, lines2, fromfile=file1, tofile=file2, lineterm=''))
if len(diff) > 100: # Limit output for large diffs
diff = diff[:100] + ["... (truncated, too many differences)"]
return f"Files are different (text comparison):\n" + "\n".join(diff)
except Exception as e:
return f"Error: {e}"
# ================================
# SYSTEM INFORMATION
# ================================
@mcp.tool()
async def get_system_info() -> str:
"""Get system information"""
try:
info = {
'Platform': platform.platform(),
'System': platform.system(),
'Release': platform.release(),
'Version': platform.version(),
'Machine': platform.machine(),
'Processor': platform.processor(),
'Python Version': platform.python_version(),
}
# Memory info
memory = psutil.virtual_memory()
info['Total Memory'] = format_bytes(memory.total)
info['Available Memory'] = format_bytes(memory.available)
info['Memory Usage'] = f"{memory.percent}%"
# Disk info
disk = psutil.disk_usage('/')
info['Total Disk Space'] = format_bytes(disk.total)
info['Free Disk Space'] = format_bytes(disk.free)
info['Disk Usage'] = f"{(disk.used / disk.total * 100):.1f}%"
# CPU info
info['CPU Count'] = psutil.cpu_count()
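        # cpu_percent(interval=1) blocks for roughly one second while it samples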
info['CPU Usage'] = f"{psutil.cpu_percent(interval=1)}%"
result = ["System Information:"]
for key, value in info.items():
result.append(f"{key}: {value}")
return "\n".join(result)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def get_disk_usage(path: str = ".") -> str:
"""Get disk usage information for a path"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: Path '{path}' does not exist"
if os.path.isfile(target):
size = os.path.getsize(target)
return f"File size: {format_bytes(size)} ({size} bytes)"
total_size = 0
file_count = 0
dir_count = 0
for root, dirs, files in os.walk(target):
dir_count += len(dirs)
for file in files:
file_count += 1
file_path = os.path.join(root, file)
try:
total_size += os.path.getsize(file_path)
                except OSError:
continue
return f"""Directory: {target}
Total size: {format_bytes(total_size)} ({total_size} bytes)
Files: {file_count}
Subdirectories: {dir_count}"""
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def get_environment_variables(filter_pattern: str = None) -> str:
"""Get environment variables, optionally filtered by pattern"""
try:
env_vars = dict(os.environ)
if filter_pattern:
filtered_vars = {k: v for k, v in env_vars.items()
if fnmatch.fnmatch(k.lower(), filter_pattern.lower())}
env_vars = filtered_vars
if not env_vars:
return f"No environment variables found" + (f" matching '{filter_pattern}'" if filter_pattern else "")
result = [f"Environment Variables" + (f" (filtered by '{filter_pattern}')" if filter_pattern else "") + ":"]
for key, value in sorted(env_vars.items()):
result.append(f"{key}={value}")
return "\n".join(result)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def set_environment_variable(name: str, value: str) -> str:
"""Set an environment variable for the current session"""
try:
os.environ[name] = value
return f"Successfully set environment variable {name}={value}"
except Exception as e:
return f"Error: {e}"
# ================================
# PROCESS MANAGEMENT
# ================================
@mcp.tool()
async def list_processes(filter_name: str = None) -> str:
"""List running processes, optionally filtered by name"""
try:
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent', 'status']):
try:
if filter_name and filter_name.lower() not in proc.info['name'].lower():
continue
processes.append(proc.info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
if not processes:
return f"No processes found" + (f" matching '{filter_name}'" if filter_name else "")
# Sort by CPU usage
processes.sort(key=lambda x: x['cpu_percent'] or 0, reverse=True)
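        # Note: cpu_percent values from process_iter() are relative to the previous
        # sample, so the first call typically reports 0.0 for every process.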
result = [f"Running Processes" + (f" (filtered by '{filter_name}')" if filter_name else "") + ":"]
result.append("PID\tNAME\t\t\tCPU%\tMEM%\tSTATUS")
result.append("-" * 60)
for proc in processes[:20]: # Limit to top 20
name = (proc['name'][:15] + '...') if len(proc['name']) > 18 else proc['name']
result.append(f"{proc['pid']}\t{name:<18}\t{proc['cpu_percent']:.1f}\t{proc['memory_percent']:.1f}\t{proc['status']}")
if len(processes) > 20:
result.append(f"... and {len(processes) - 20} more processes")
return "\n".join(result)
except Exception as e:
return f"Error: {e}"
# ================================
# SYSTEM COMMANDS
# ================================
@mcp.tool()
async def run_powershell(cmd: str, timeout: int = 30) -> str:
"""Execute a PowerShell command (use with caution)"""
try:
proc = subprocess.run(
["powershell", "-Command", cmd],
capture_output=True,
text=True,
shell=False,
timeout=timeout,
cwd=CURRENT_DIR # Run in current directory
)
output = f"Executed in: {CURRENT_DIR}\nCommand: {cmd}\n\n"
if proc.stdout:
output += f"STDOUT:\n{proc.stdout.strip()}\n"
if proc.stderr:
output += f"STDERR:\n{proc.stderr.strip()}\n"
output += f"\nExit code: {proc.returncode}"
return output or f"Command completed with exit code {proc.returncode}"
except subprocess.TimeoutExpired:
return f"Error: Command timed out after {timeout} seconds"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def run_command(cmd: str, shell: bool = False, timeout: int = 30) -> str:
"""Execute a system command (cmd, bash, etc.)"""
try:
if shell:
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
shell=True,
timeout=timeout,
cwd=CURRENT_DIR
)
else:
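            # Note: str.split() does not honor quoting, so arguments that contain
            # spaces require shell=True (or a pre-tokenized command).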
proc = subprocess.run(
cmd.split(),
capture_output=True,
text=True,
shell=False,
timeout=timeout,
cwd=CURRENT_DIR
)
output = f"Executed in: {CURRENT_DIR}\nCommand: {cmd}\n\n"
if proc.stdout:
output += f"STDOUT:\n{proc.stdout.strip()}\n"
if proc.stderr:
output += f"STDERR:\n{proc.stderr.strip()}\n"
output += f"\nExit code: {proc.returncode}"
return output or f"Command completed with exit code {proc.returncode}"
except subprocess.TimeoutExpired:
return f"Error: Command timed out after {timeout} seconds"
except Exception as e:
return f"Error: {e}"
# ================================
# TEXT & DATA PROCESSING
# ================================
@mcp.tool()
async def count_lines_words_chars(path: str) -> str:
"""Count lines, words, and characters in a text file"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: File '{path}' does not exist"
if not os.path.isfile(target):
return f"Error: Path '{path}' is not a file"
with open(target, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
lines = content.count('\n') + (1 if content and not content.endswith('\n') else 0)
words = len(content.split())
chars = len(content)
chars_no_spaces = len(content.replace(' ', '').replace('\n', '').replace('\t', ''))
return f"""Text analysis for {target}:
Lines: {lines}
Words: {words}
Characters (total): {chars}
Characters (no whitespace): {chars_no_spaces}
File size: {format_bytes(len(content.encode('utf-8')))}"""
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def replace_in_file(path: str, search_pattern: str, replacement: str, use_regex: bool = False, backup: bool = True) -> str:
"""Replace text in a file with optional regex support and backup"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: File '{path}' does not exist"
if not os.path.isfile(target):
return f"Error: Path '{path}' is not a file"
with open(target, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# Create backup if requested
if backup:
backup_path = target + '.bak'
shutil.copy2(target, backup_path)
if use_regex:
new_content = re.sub(search_pattern, replacement, content)
count = len(re.findall(search_pattern, content))
else:
new_content = content.replace(search_pattern, replacement)
count = content.count(search_pattern)
with open(target, 'w', encoding='utf-8') as f:
f.write(new_content)
result = f"Successfully replaced {count} occurrences in {target}"
if backup:
result += f"\nBackup created: {backup_path}"
return result
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def sort_file_lines(path: str, reverse: bool = False, numeric: bool = False, remove_duplicates: bool = False) -> str:
"""Sort lines in a text file"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: File '{path}' does not exist"
if not os.path.isfile(target):
return f"Error: Path '{path}' is not a file"
with open(target, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
# Remove newlines for sorting, but remember to add them back
clean_lines = [line.rstrip('\n\r') for line in lines]
if remove_duplicates:
clean_lines = list(dict.fromkeys(clean_lines)) # Preserve order while removing duplicates
if numeric:
# Try to sort numerically
try:
clean_lines.sort(key=lambda x: float(x.split()[0]) if x.split() else 0, reverse=reverse)
            except ValueError:
clean_lines.sort(reverse=reverse) # Fall back to string sort
else:
clean_lines.sort(reverse=reverse)
# Add newlines back
sorted_lines = [line + '\n' for line in clean_lines]
with open(target, 'w', encoding='utf-8') as f:
f.writelines(sorted_lines)
return f"Successfully sorted {len(sorted_lines)} lines in {target}"
except Exception as e:
return f"Error: {e}"
# ================================
# ADVANCED TOOLS
# ================================
@mcp.tool()
async def create_file_from_template(template_path: str, output_path: str, replacements: dict) -> str:
"""Create a file from a template with variable replacements"""
template_target = resolve_path(template_path, allow_absolute=True)
output_target = resolve_path(output_path, allow_absolute=True)
try:
if not os.path.exists(template_target):
return f"Error: Template file '{template_path}' does not exist"
with open(template_target, 'r', encoding='utf-8') as f:
content = f.read()
# Replace variables in format {{variable_name}}
for key, value in replacements.items():
content = content.replace(f"{{{{{key}}}}}", str(value))
# Create output directory if needed
os.makedirs(os.path.dirname(output_target), exist_ok=True)
with open(output_target, 'w', encoding='utf-8') as f:
f.write(content)
return f"Successfully created {output_target} from template {template_target}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def split_file(path: str, chunk_size: int, unit: str = "lines") -> str:
"""Split a file into smaller chunks (by lines or bytes)"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: File '{path}' does not exist"
if not os.path.isfile(target):
return f"Error: Path '{path}' is not a file"
base_name = os.path.splitext(target)[0]
ext = os.path.splitext(target)[1]
files_created = []
if unit == "lines":
with open(target, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
for i in range(0, len(lines), chunk_size):
chunk_lines = lines[i:i + chunk_size]
chunk_file = f"{base_name}_part{i//chunk_size + 1:03d}{ext}"
with open(chunk_file, 'w', encoding='utf-8') as cf:
cf.writelines(chunk_lines)
files_created.append(chunk_file)
elif unit == "bytes":
with open(target, 'rb') as f:
part_num = 1
while True:
chunk = f.read(chunk_size)
if not chunk:
break
chunk_file = f"{base_name}_part{part_num:03d}{ext}"
with open(chunk_file, 'wb') as cf:
cf.write(chunk)
files_created.append(chunk_file)
part_num += 1
return f"Successfully split {target} into {len(files_created)} files:\n" + "\n".join(files_created)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def merge_files(file_list: list, output_path: str, separator: str = "") -> str:
"""Merge multiple files into one"""
output_target = resolve_path(output_path, allow_absolute=True)
try:
os.makedirs(os.path.dirname(output_target), exist_ok=True)
with open(output_target, 'w', encoding='utf-8') as outf:
for i, file_path in enumerate(file_list):
file_target = resolve_path(file_path, allow_absolute=True)
if not os.path.exists(file_target):
return f"Error: File '{file_path}' does not exist"
with open(file_target, 'r', encoding='utf-8', errors='ignore') as inf:
content = inf.read()
outf.write(content)
if separator and i < len(file_list) - 1:
outf.write(separator)
return f"Successfully merged {len(file_list)} files into {output_target}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def watch_file_changes(path: str, duration: int = 10) -> str:
"""Watch a file for changes for a specified duration (seconds)"""
target = resolve_path(path, allow_absolute=True)
try:
if not os.path.exists(target):
return f"Error: Path '{path}' does not exist"
initial_stat = os.stat(target)
initial_mtime = initial_stat.st_mtime
initial_size = initial_stat.st_size
changes = []
start_time = time.time()
while time.time() - start_time < duration:
try:
current_stat = os.stat(target)
current_mtime = current_stat.st_mtime
current_size = current_stat.st_size
if current_mtime != initial_mtime or current_size != initial_size:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
change_info = f"[{timestamp}] Modified - Size: {format_bytes(current_size)}"
changes.append(change_info)
initial_mtime = current_mtime
initial_size = current_size
                await asyncio.sleep(0.5)  # Check every 500ms without blocking the event loop
except FileNotFoundError:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
changes.append(f"[{timestamp}] File deleted")
break
if not changes:
return f"No changes detected in {target} during {duration} seconds"
return f"Changes detected in {target}:\n" + "\n".join(changes)
except Exception as e:
return f"Error: {e}"
# ================================
# SSH CONNECTION MANAGEMENT
# ================================
@mcp.tool()
async def ssh_connect(connection_id: str, hostname: str, username: str, password: str = None,
private_key_path: str = None, port: int = 22, timeout: int = 10) -> str:
"""Connect to an SSH server. Use either password or private key authentication."""
try:
# Close existing connection if it exists
if connection_id in ssh_connections:
ssh_connections[connection_id].close()
if connection_id in sftp_connections:
sftp_connections[connection_id].close()
del sftp_connections[connection_id]
# Create new SSH client
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
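        # AutoAddPolicy trusts unknown host keys automatically; convenient, but it
        # provides no protection against man-in-the-middle attacks.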
# Prepare authentication
connect_kwargs = {
'hostname': hostname,
'port': port,
'username': username,
'timeout': timeout
}
if private_key_path:
# Use private key authentication
key_path = resolve_path(private_key_path, allow_absolute=True)
if not os.path.exists(key_path):
return f"Error: Private key file '{private_key_path}' does not exist"
try:
                # Try the common key types until one loads
                for key_class in [paramiko.RSAKey, paramiko.Ed25519Key, paramiko.ECDSAKey, paramiko.DSSKey]:
                    try:
                        private_key = key_class.from_private_key_file(key_path)
                        connect_kwargs['pkey'] = private_key
                        break
                    except Exception:
                        continue
else:
return f"Error: Could not load private key from '{private_key_path}'"
except Exception as e:
return f"Error loading private key: {e}"
elif password:
connect_kwargs['password'] = password
else:
return "Error: Either password or private_key_path must be provided"
# Connect
ssh_client.connect(**connect_kwargs)
ssh_connections[connection_id] = ssh_client
# Test connection
stdin, stdout, stderr = ssh_client.exec_command('echo "SSH connection successful"')
result = stdout.read().decode().strip()
return f"Successfully connected to {username}@{hostname}:{port} as '{connection_id}'\nTest result: {result}"
except paramiko.AuthenticationException:
return f"Error: Authentication failed for {username}@{hostname}"
except paramiko.SSHException as e:
return f"Error: SSH connection failed - {e}"
except socket.timeout:
return f"Error: Connection timeout to {hostname}:{port}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def ssh_disconnect(connection_id: str) -> str:
"""Disconnect from an SSH server"""
try:
if connection_id in sftp_connections:
sftp_connections[connection_id].close()
del sftp_connections[connection_id]
if connection_id in ssh_connections:
ssh_connections[connection_id].close()
del ssh_connections[connection_id]
return f"Successfully disconnected from '{connection_id}'"
else:
return f"No active connection found for '{connection_id}'"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def ssh_list_connections() -> str:
"""List all active SSH connections"""
if not ssh_connections:
return "No active SSH connections"
result = ["Active SSH connections:"]
for conn_id, ssh_client in ssh_connections.items():
try:
transport = ssh_client.get_transport()
if transport and transport.is_active():
remote_addr = transport.getpeername()
result.append(f" {conn_id}: {remote_addr[0]}:{remote_addr[1]} (active)")
else:
result.append(f" {conn_id}: (inactive)")
        except Exception:
result.append(f" {conn_id}: (status unknown)")
return "\n".join(result)
# ================================
# SSH COMMAND EXECUTION
# ================================
@mcp.tool()
async def ssh_exec(connection_id: str, command: str, timeout: int = 30, get_pty: bool = False) -> str:
"""Execute a command on the remote SSH server"""
try:
ssh_client = get_ssh_connection(connection_id)
# Execute command
stdin, stdout, stderr = ssh_client.exec_command(command, timeout=timeout, get_pty=get_pty)
# Read output
stdout_data = stdout.read().decode('utf-8', errors='replace')
stderr_data = stderr.read().decode('utf-8', errors='replace')
exit_code = stdout.channel.recv_exit_status()
# Format output
result = f"Command: {command}\nConnection: {connection_id}\n\n"
if stdout_data:
result += f"STDOUT:\n{stdout_data}\n"
if stderr_data:
result += f"STDERR:\n{stderr_data}\n"
result += f"Exit code: {exit_code}"
return result
except Exception as e:
return f"Error executing command on '{connection_id}': {e}"
@mcp.tool()
async def ssh_exec_interactive(connection_id: str, commands: list, timeout: int = 30) -> str:
"""Execute multiple commands in an interactive session"""
try:
ssh_client = get_ssh_connection(connection_id)
# Create interactive shell
shell = ssh_client.invoke_shell()
shell.settimeout(timeout)
output = StringIO()
# Send commands
for command in commands:
shell.send(command + '\n')
time.sleep(0.5) # Wait for command to process
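            # The fixed 0.5 s delay is a heuristic; slow commands may still be
            # producing output when the channel is drained below.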
# Read available output
while shell.recv_ready():
data = shell.recv(1024).decode('utf-8', errors='replace')
output.write(data)
# Send exit command
shell.send('exit\n')
time.sleep(0.5)
# Read remaining output
while shell.recv_ready():
data = shell.recv(1024).decode('utf-8', errors='replace')
output.write(data)
shell.close()
result = f"Interactive session on '{connection_id}':\nCommands: {', '.join(commands)}\n\n"
result += f"Output:\n{output.getvalue()}"
return result
except Exception as e:
return f"Error in interactive session on '{connection_id}': {e}"
@mcp.tool()
async def ssh_get_system_info(connection_id: str) -> str:
"""Get system information from the remote server"""
try:
ssh_client = get_ssh_connection(connection_id)
commands = {
'hostname': 'hostname',
'uptime': 'uptime',
'uname': 'uname -a',
'disk_usage': 'df -h',
'memory': 'free -h',
'cpu_info': 'cat /proc/cpuinfo | grep "model name" | head -1',
'load_average': 'cat /proc/loadavg',
'processes': 'ps aux --sort=-%cpu | head -10'
}
results = {}
for name, cmd in commands.items():
try:
stdin, stdout, stderr = ssh_client.exec_command(cmd, timeout=10)
output = stdout.read().decode('utf-8', errors='replace').strip()
if output:
results[name] = output
else:
error = stderr.read().decode('utf-8', errors='replace').strip()
results[name] = f"Error: {error}" if error else "No output"
            except Exception:
results[name] = "Command failed"
result = [f"System Information for '{connection_id}':"]
for name, output in results.items():
result.append(f"\n{name.upper()}:")
result.append(output)
return "\n".join(result)
except Exception as e:
return f"Error getting system info from '{connection_id}': {e}"
# ================================
# SSH FILE OPERATIONS (SFTP)
# ================================
@mcp.tool()
async def ssh_list_remote_dir(connection_id: str, remote_path: str = ".", detailed: bool = False) -> str:
"""List contents of a remote directory via SFTP"""
try:
sftp = get_sftp_connection(connection_id)
# List directory contents
try:
files = sftp.listdir_attr(remote_path)
except FileNotFoundError:
return f"Error: Remote directory '{remote_path}' does not exist"
if not files:
return f"Remote directory '{remote_path}' is empty"
result = [f"Contents of remote directory '{remote_path}' on '{connection_id}':"]
# Sort files by name
files.sort(key=lambda x: x.filename)
for file_attr in files:
if detailed:
# Format detailed info
mode = file_attr.st_mode
size = file_attr.st_size or 0
mtime = time.ctime(file_attr.st_mtime) if file_attr.st_mtime else "Unknown"
file_type = "DIR " if stat.S_ISDIR(mode) else "FILE"
result.append(f"[{file_type}] {file_attr.filename:<30} {format_bytes(size):<10} {mtime}")
else:
# Simple format
if stat.S_ISDIR(file_attr.st_mode):
result.append(f"[DIR] {file_attr.filename}/")
else:
size = file_attr.st_size or 0
result.append(f"[FILE] {file_attr.filename} ({format_bytes(size)})")
return "\n".join(result)
except Exception as e:
return f"Error listing remote directory '{remote_path}' on '{connection_id}': {e}"
@mcp.tool()
async def ssh_read_remote_file(connection_id: str, remote_path: str, encoding: str = "utf-8",
max_size: int = 1024*1024) -> str:
"""Read a remote file via SFTP"""
try:
sftp = get_sftp_connection(connection_id)
# Check file size first
try:
file_attr = sftp.stat(remote_path)
if file_attr.st_size > max_size:
return f"Error: File size ({format_bytes(file_attr.st_size)}) exceeds maximum allowed size ({format_bytes(max_size)})"
except FileNotFoundError:
return f"Error: Remote file '{remote_path}' does not exist"
# Read file
with sftp.open(remote_path, 'r') as remote_file:
content = remote_file.read().decode(encoding, errors='replace')
return f"Contents of remote file '{remote_path}' on '{connection_id}':\n{content}"
except Exception as e:
return f"Error reading remote file '{remote_path}' on '{connection_id}': {e}"
@mcp.tool()
async def ssh_write_remote_file(connection_id: str, remote_path: str, content: str,
encoding: str = "utf-8", backup: bool = False) -> str:
"""Write content to a remote file via SFTP"""
try:
sftp = get_sftp_connection(connection_id)
# Create backup if requested and file exists
backup_path = None
if backup:
try:
sftp.stat(remote_path) # Check if file exists
backup_path = f"{remote_path}.bak"
sftp.rename(remote_path, backup_path)
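                # Note: SFTP rename typically fails if a file with the backup
                # name already exists on the server.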
except FileNotFoundError:
pass # File doesn't exist, no backup needed
# Write file
with sftp.open(remote_path, 'w') as remote_file:
remote_file.write(content.encode(encoding))
result = f"Successfully wrote to remote file '{remote_path}' on '{connection_id}'"
if backup_path:
result += f"\nBackup created: {backup_path}"
return result
except Exception as e:
return f"Error writing remote file '{remote_path}' on '{connection_id}': {e}"
@mcp.tool()
async def ssh_download_file(connection_id: str, remote_path: str, local_path: str = None) -> str:
"""Download a file from remote server to local machine"""
try:
sftp = get_sftp_connection(connection_id)
# Determine local path
if local_path is None:
local_path = os.path.basename(remote_path)
local_target = resolve_path(local_path, allow_absolute=True)
# Create local directory if needed
os.makedirs(os.path.dirname(local_target), exist_ok=True)
# Download file
sftp.get(remote_path, local_target)
# Get file info
local_size = os.path.getsize(local_target)
return f"Successfully downloaded '{remote_path}' from '{connection_id}' to '{local_target}' ({format_bytes(local_size)})"
except FileNotFoundError:
return f"Error: Remote file '{remote_path}' does not exist on '{connection_id}'"
except Exception as e:
return f"Error downloading file '{remote_path}' from '{connection_id}': {e}"
@mcp.tool()
async def ssh_upload_file(connection_id: str, local_path: str, remote_path: str = None) -> str:
"""Upload a file from local machine to remote server"""
try:
sftp = get_sftp_connection(connection_id)
# Check local file exists
local_target = resolve_path(local_path, allow_absolute=True)
if not os.path.exists(local_target):
return f"Error: Local file '{local_path}' does not exist"
# Determine remote path
if remote_path is None:
remote_path = os.path.basename(local_target)
# Upload file
sftp.put(local_target, remote_path)
# Get file info
local_size = os.path.getsize(local_target)
return f"Successfully uploaded '{local_target}' to '{remote_path}' on '{connection_id}' ({format_bytes(local_size)})"
except Exception as e:
return f"Error uploading file '{local_path}' to '{connection_id}': {e}"
@mcp.tool()
async def ssh_create_remote_dir(connection_id: str, remote_path: str) -> str:
"""Create a directory on the remote server"""
try:
sftp = get_sftp_connection(connection_id)
sftp.mkdir(remote_path)
return f"Successfully created remote directory '{remote_path}' on '{connection_id}'"
except Exception as e:
return f"Error creating remote directory '{remote_path}' on '{connection_id}': {e}"
@mcp.tool()
async def ssh_delete_remote_file(connection_id: str, remote_path: str) -> str:
"""Delete a file on the remote server"""
try:
sftp = get_sftp_connection(connection_id)
sftp.remove(remote_path)
return f"Successfully deleted remote file '{remote_path}' on '{connection_id}'"
except FileNotFoundError:
return f"Error: Remote file '{remote_path}' does not exist on '{connection_id}'"
except Exception as e:
return f"Error deleting remote file '{remote_path}' on '{connection_id}': {e}"
@mcp.tool()
async def ssh_delete_remote_dir(connection_id: str, remote_path: str) -> str:
"""Delete a directory on the remote server"""
try:
sftp = get_sftp_connection(connection_id)
sftp.rmdir(remote_path)
return f"Successfully deleted remote directory '{remote_path}' on '{connection_id}'"
except Exception as e:
return f"Error deleting remote directory '{remote_path}' on '{connection_id}': {e}"
@mcp.tool()
async def health_check() -> str:
"""Check if the MCP server is running properly"""
return f"MCP server is running. Current directory: {CURRENT_DIR}"
if __name__ == "__main__":
logging.debug("Starting enhanced-file-ops MCP server")
def run_mcp():
"""Run the MCP server"""
logging.debug("MCP thread starting (stdio transport)...")
try:
mcp.run(transport="stdio")
except Exception as e:
logging.exception("Unhandled exception inside MCP server")
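            # sys.exit() in a worker thread only ends this thread; the main
            # loop below then exits once is_alive() turns False.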
sys.exit(1)
# Start the MCP server in the background
server_thread = threading.Thread(target=run_mcp, daemon=True)
server_thread.start()
logging.info("MCP server thread launched successfully")
# Keep the main thread alive until interrupted
try:
while server_thread.is_alive():
time.sleep(1)
except KeyboardInterrupt:
logging.info("KeyboardInterrupt received; shutting down enhanced-file-ops server...")
sys.exit(0)