# MCP Unified Server
# by getfounded
# - tools
#!/usr/bin/env python3
import os
import sys
from pathlib import Path
import json
import shutil
import difflib
from typing import List, Dict, Any, Optional, Union, Tuple
from dataclasses import dataclass, field
import fnmatch
import logging
from datetime import datetime
# Ensure compatibility with mcp server
from mcp.server.fastmcp import FastMCP, Context
from mcp.types import Tool, TextContent
# Root logger setup: INFO and above, timestamped, written to stderr.
# NOTE(review): presumably stderr is used so stdout stays free for protocol
# traffic - confirm against the server entry point.
logging.basicConfig(
    stream=sys.stderr,
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# Module-level handle to the MCP object supplied by the host application;
# stays None until set_external_mcp() is called.
external_mcp = None


def set_external_mcp(mcp):
    """Store *mcp* as the module-wide external MCP reference.

    The object is kept in the ``external_mcp`` global and an INFO record is
    logged once the reference has been stored.
    """
    global external_mcp
    external_mcp = mcp
    logging.info("Filesystem tools MCP reference set")
# Security utilities
class FilesystemSecurity:
    """Validates that requested paths stay inside a set of allowed directories.

    Attributes:
        allowed_directories: Absolute, normalized allowed roots with a leading
            ``~`` already expanded.
    """

    def __init__(self, allowed_directories: List[str]):
        """Normalize every allowed directory to an absolute path.

        Args:
            allowed_directories: Directory paths; a leading ``~`` is expanded
                and relative paths are resolved against the current directory.
        """
        self.allowed_directories = [
            os.path.normpath(os.path.abspath(self._expand_home(d)))
            for d in allowed_directories
        ]
        logging.info(f"Filesystem security initialized with allowed directories: {self.allowed_directories}")

    def _expand_home(self, path: str) -> str:
        """Expand a leading ``~`` to the user's home directory."""
        if path.startswith('~'):
            return os.path.expanduser(path)
        return path

    def _is_within_allowed(self, candidate: str) -> bool:
        """Return True if *candidate* is an allowed root or lies beneath one.

        The comparison is made on whole path components: a plain string-prefix
        test would wrongly accept ``/srv/data_secret`` for the allowed root
        ``/srv/data``.
        """
        candidate = os.path.normcase(candidate)
        for allowed_dir in self.allowed_directories:
            # Try the root both as configured and with symlinks resolved, so a
            # root that is itself reached through a symlink still matches the
            # resolved form of paths beneath it.
            for root in (allowed_dir, os.path.realpath(allowed_dir)):
                root = os.path.normcase(root)
                # A filesystem root such as "/" already ends with a separator.
                prefix = root if root.endswith(os.sep) else root + os.sep
                if candidate == root or candidate.startswith(prefix):
                    return True
        return False

    async def validate_path(self, requested_path: str) -> str:
        """Check that a path is inside the allowed directories.

        Args:
            requested_path: Path to check; may be relative, start with ``~``,
                or name a file that does not exist yet.

        Returns:
            The absolute path with symlinks resolved.

        Raises:
            ValueError: If the path, or the location it resolves to through
                symlinks, is outside every allowed directory.
        """
        expanded_path = self._expand_home(requested_path)
        normalized_path = os.path.normpath(os.path.abspath(expanded_path))
        if not self._is_within_allowed(normalized_path):
            raise ValueError(f"Access denied - path outside allowed directories: {normalized_path} not in {self.allowed_directories}")

        # realpath() is non-strict: it resolves symlinks in the part of the
        # path that exists and leaves the missing tail untouched, so a
        # not-yet-created file is checked through its real parent directory.
        real_path = os.path.realpath(normalized_path)
        if not self._is_within_allowed(os.path.normpath(real_path)):
            raise ValueError("Access denied - symlink target outside allowed directories")
        return real_path
# File utility functions
class FilesystemTools:
    """Implements file operation tools that can be exposed via MCP.

    Every public coroutine passes its path argument(s) through
    ``security.validate_path`` first and reports operational failures as
    ``ValueError`` with the original exception chained as the cause.
    """

    def __init__(self, security: "FilesystemSecurity"):
        """Store the path validator used by every operation."""
        self.security = security

    async def read_file(self, path: str) -> str:
        """Read a text file after validating its path.

        The file is decoded as UTF-8; if that fails, Latin-1 is tried.

        Raises:
            ValueError: If validation fails or the file cannot be read.
        """
        valid_path = await self.security.validate_path(path)
        try:
            with open(valid_path, 'r', encoding='utf-8') as f:
                return f.read()
        except UnicodeDecodeError:
            # Latin-1 maps every byte to a character, so this decode succeeds
            # for any content the file can be opened with.
            try:
                with open(valid_path, 'r', encoding='latin-1') as f:
                    return f.read()
            except Exception as e:
                raise ValueError(f"Failed to read file with alternative encoding: {str(e)}") from e
        except Exception as e:
            raise ValueError(f"Failed to read file: {str(e)}") from e

    async def read_multiple_files(self, paths: List[str]) -> str:
        """Read several files and return their contents in one string.

        A file that cannot be read contributes an ``Error - ...`` entry
        instead of aborting the whole call. Entries are separated by a
        ``---`` line.
        """
        results = []
        for file_path in paths:
            try:
                content = await self.read_file(file_path)
                results.append(f"{file_path}:\n{content}\n")
            except Exception as e:
                results.append(f"{file_path}: Error - {str(e)}")
        return "\n---\n".join(results)

    async def write_file(self, path: str, content: str) -> str:
        """Write *content* to a file as UTF-8, creating parent directories.

        Raises:
            ValueError: If validation fails or the file cannot be written.
        """
        valid_path = await self.security.validate_path(path)
        try:
            parent = os.path.dirname(valid_path)
            if parent:
                os.makedirs(parent, exist_ok=True)
            with open(valid_path, 'w', encoding='utf-8') as f:
                f.write(content)
            return f"Successfully wrote to {path}"
        except Exception as e:
            raise ValueError(f"Failed to write file: {str(e)}") from e

    def _normalize_line_endings(self, text: str) -> str:
        """Convert CRLF line endings to LF."""
        return text.replace('\r\n', '\n')

    def _create_unified_diff(self, original: str, modified: str, filepath: str = 'file') -> str:
        """Return a unified diff between two texts.

        Every emitted line, including the ``---``/``+++``/``@@`` headers, is
        newline-terminated. Returns an empty string when the texts are equal.
        """
        original_lines = self._normalize_line_endings(original).splitlines(keepends=True)
        modified_lines = self._normalize_line_endings(modified).splitlines(keepends=True)
        diff = difflib.unified_diff(
            original_lines,
            modified_lines,
            fromfile=f"a/{filepath}",
            tofile=f"b/{filepath}",
        )
        # A final input line without a trailing newline would otherwise be
        # glued to the next diff line when the pieces are concatenated.
        return ''.join(line if line.endswith('\n') else line + '\n' for line in diff)

    def _apply_whitespace_tolerant_edit(self, content: str, old_text: str, new_text: str) -> str:
        """Replace the first run of lines matching *old_text* ignoring edge whitespace.

        Lines are compared after stripping leading and trailing whitespace.
        The indentation of the first matched line is carried over to the
        first replacement line; the remaining replacement lines are inserted
        as given.

        Raises:
            ValueError: If no run of lines matches.
        """
        old_lines = old_text.split('\n')
        content_lines = content.split('\n')
        span = len(old_lines)
        for i in range(len(content_lines) - span + 1):
            window = content_lines[i:i + span]
            if not all(ol.strip() == wl.strip() for ol, wl in zip(old_lines, window)):
                continue
            first_line = content_lines[i]
            stripped = first_line.lstrip()
            # A whitespace-only first line contributes no indentation.
            indent = first_line[:len(first_line) - len(stripped)] if stripped else ''
            new_lines = [
                indent + line.lstrip() if j == 0 else line
                for j, line in enumerate(new_text.split('\n'))
            ]
            content_lines[i:i + span] = new_lines
            return '\n'.join(content_lines)
        raise ValueError(f"Could not find exact match for edit: {old_text}")

    async def edit_file(self, path: str, edits: List[Dict[str, str]], dry_run: bool = False) -> str:
        """Apply text edits to a file and return a fenced unified diff.

        Args:
            path: File to edit.
            edits: Edit operations applied in order; each has ``oldText`` and
                ``newText``. An exact match of ``oldText`` is replaced
                wherever it occurs; otherwise a whitespace-tolerant line
                match is attempted.
            dry_run: If True, the diff is returned without writing the file.

        Raises:
            ValueError: If validation fails, an edit has an empty ``oldText``,
                an edit cannot be matched, or reading/writing fails.
        """
        valid_path = await self.security.validate_path(path)
        try:
            content = await self.read_file(valid_path)
            modified_content = self._normalize_line_endings(content)
            for edit in edits:
                old_text = self._normalize_line_endings(edit.get('oldText', ''))
                new_text = self._normalize_line_endings(edit.get('newText', ''))
                if not old_text:
                    # An empty string is "found" everywhere and would insert
                    # new_text between every character of the file.
                    raise ValueError("Edit has an empty 'oldText'; nothing to match")
                if old_text in modified_content:
                    modified_content = modified_content.replace(old_text, new_text)
                    continue
                modified_content = self._apply_whitespace_tolerant_edit(
                    modified_content, old_text, new_text
                )

            diff = self._create_unified_diff(content, modified_content, path)
            # Use a fence longer than any backtick run inside the diff.
            num_backticks = 3
            while '`' * num_backticks in diff:
                num_backticks += 1
            fence = '`' * num_backticks
            formatted_diff = f"{fence}diff\n{diff}{fence}\n\n"
            if not dry_run:
                await self.write_file(path, modified_content)
            return formatted_diff
        except Exception as e:
            raise ValueError(f"Failed to edit file: {str(e)}") from e

    async def create_directory(self, path: str) -> str:
        """Create a directory (and missing parents); succeed if it exists.

        Raises:
            ValueError: If validation fails or the directory cannot be created.
        """
        valid_path = await self.security.validate_path(path)
        try:
            os.makedirs(valid_path, exist_ok=True)
            return f"Successfully created directory {path}"
        except Exception as e:
            raise ValueError(f"Failed to create directory: {str(e)}") from e

    async def list_directory(self, path: str) -> str:
        """List a directory, one entry per line prefixed ``[DIR]`` or ``[FILE]``.

        Raises:
            ValueError: If validation fails or the directory cannot be listed.
        """
        valid_path = await self.security.validate_path(path)
        try:
            formatted = []
            for entry in os.listdir(valid_path):
                entry_path = os.path.join(valid_path, entry)
                entry_type = "[DIR]" if os.path.isdir(entry_path) else "[FILE]"
                formatted.append(f"{entry_type} {entry}")
            return "\n".join(formatted)
        except Exception as e:
            raise ValueError(f"Failed to list directory: {str(e)}") from e

    async def directory_tree(self, path: str) -> str:
        """Return a recursive directory listing as a JSON string.

        Each entry has ``name`` and ``type`` (``"file"`` or ``"directory"``);
        directories also have ``children``. Symlinked directories are listed
        with empty ``children`` and are not descended into, because only the
        starting path was validated and a link may point outside the allowed
        directories or back into its own ancestors.

        Raises:
            ValueError: If validation fails or the tree cannot be built.
        """
        valid_path = await self.security.validate_path(path)
        try:
            def build_tree(current_path):
                result = []
                for entry in os.listdir(current_path):
                    entry_path = os.path.join(current_path, entry)
                    is_dir = os.path.isdir(entry_path)
                    entry_data = {
                        "name": entry,
                        "type": "directory" if is_dir else "file",
                    }
                    if is_dir:
                        entry_data["children"] = (
                            [] if os.path.islink(entry_path) else build_tree(entry_path)
                        )
                    result.append(entry_data)
                return result

            return json.dumps(build_tree(valid_path), indent=2)
        except Exception as e:
            raise ValueError(f"Failed to create directory tree: {str(e)}") from e

    async def move_file(self, source: str, destination: str) -> str:
        """Move or rename a file or directory.

        Missing parent directories of the destination are created. The move
        is refused when the destination already exists, so nothing is
        overwritten and the source is never moved *into* an existing
        directory.

        Raises:
            ValueError: If validation fails, the destination exists, or the
                move itself fails.
        """
        valid_source = await self.security.validate_path(source)
        valid_dest = await self.security.validate_path(destination)
        try:
            # lexists() also catches a dangling symlink at the destination.
            if os.path.lexists(valid_dest):
                raise FileExistsError(f"Destination already exists: {destination}")
            parent = os.path.dirname(valid_dest)
            if parent:
                os.makedirs(parent, exist_ok=True)
            shutil.move(valid_source, valid_dest)
            return f"Successfully moved {source} to {destination}"
        except Exception as e:
            raise ValueError(f"Failed to move file: {str(e)}") from e

    async def search_files(self, path: str, pattern: str, exclude_patterns: Optional[List[str]] = None) -> str:
        """Recursively find files and directories whose name contains *pattern*.

        Args:
            path: Directory to start from.
            pattern: Substring looked for in each name, case-insensitively.
            exclude_patterns: ``fnmatch`` globs. A file or directory whose
                name matches is skipped, as is a directory whose path
                relative to *path* matches; excluded directories are not
                descended into.

        Returns:
            Matching paths, one per line, or ``"No matches found"``.

        Raises:
            ValueError: If validation fails or the walk fails.
        """
        if exclude_patterns is None:
            exclude_patterns = []
        valid_root = await self.security.validate_path(path)
        needle = pattern.lower()

        def is_excluded(name: str) -> bool:
            return any(fnmatch.fnmatch(name, pat) for pat in exclude_patterns)

        results = []
        try:
            for root, dirs, files in os.walk(valid_root):
                rel_dir = os.path.relpath(root, valid_root)
                if rel_dir == '.':
                    rel_dir = ''
                if is_excluded(rel_dir):
                    dirs[:] = []  # prune: os.walk only honours in-place edits
                    continue

                # Build the surviving list separately; removing items from
                # `dirs` while iterating over it would skip entries.
                kept_dirs = []
                for dir_name in dirs:
                    if is_excluded(dir_name):
                        continue
                    kept_dirs.append(dir_name)
                    if needle in dir_name.lower():
                        results.append(os.path.join(root, dir_name))
                dirs[:] = kept_dirs

                for file_name in files:
                    if is_excluded(file_name):
                        continue
                    if needle in file_name.lower():
                        results.append(os.path.join(root, file_name))
            return "\n".join(results) if results else "No matches found"
        except Exception as e:
            raise ValueError(f"Failed to search files: {str(e)}") from e

    async def get_file_info(self, path: str) -> str:
        """Return ``key: value`` lines describing a file or directory.

        Reported keys: size, created, modified, accessed, isDirectory,
        isFile, permissions, absolutePath, filename.

        Raises:
            ValueError: If validation fails or the path cannot be stat'ed.
        """
        valid_path = await self.security.validate_path(path)
        try:
            stats = os.stat(valid_path)
            info = {
                "size": stats.st_size,
                # st_ctime is the creation time on Windows but the time of the
                # last metadata change on Unix.
                "created": datetime.fromtimestamp(stats.st_ctime).isoformat(),
                "modified": datetime.fromtimestamp(stats.st_mtime).isoformat(),
                "accessed": datetime.fromtimestamp(stats.st_atime).isoformat(),
                "isDirectory": os.path.isdir(valid_path),
                "isFile": os.path.isfile(valid_path),
                "permissions": oct(stats.st_mode)[-3:],  # Last 3 digits of octal representation
                "absolutePath": os.path.abspath(valid_path),
                "filename": os.path.basename(valid_path),
            }
            return "\n".join(f"{key}: {value}" for key, value in info.items())
        except Exception as e:
            raise ValueError(f"Failed to get file info: {str(e)}") from e

    async def list_allowed_directories(self) -> str:
        """Return the allowed directories, one per line under a heading."""
        return "Allowed directories:\n" + "\n".join(self.security.allowed_directories)
# Tool registration functions with proper MCP integration
class FileSystemTools:
    """Bundle a path validator with the file operations that rely on it.

    Attributes:
        security: ``FilesystemSecurity`` built from the allowed directories.
        tools: ``FilesystemTools`` bound to that validator.
    """

    def __init__(self, allowed_dirs=None):
        """Build the validator and tool set.

        Args:
            allowed_dirs: Directories the tools may touch. When None, the
                current user's home directory is the only allowed root.
        """
        directories = [os.path.expanduser("~")] if allowed_dirs is None else allowed_dirs
        guard = FilesystemSecurity(directories)
        self.security = guard
        self.tools = FilesystemTools(guard)
        logging.info(f"Filesystem tools initialized with allowed directories: {directories}")
# Tool function definitions that will be registered with MCP
async def read_file(path: str, ctx: Context = None) -> str:
    """Return the complete text of a single file.

    The file is decoded as UTF-8 with a Latin-1 fallback. Use this tool to
    examine the contents of one file. Only paths inside the allowed
    directories can be read. Failures are reported in the returned string
    instead of being raised.
    """
    try:
        fs = _get_fs_tools()
        return await fs.tools.read_file(path)
    except Exception as exc:
        return f"Error reading file: {exc}"
async def read_multiple_files(paths: List[str], ctx: Context = None) -> str:
    """Read the contents of several files in a single call.

    Handy when multiple files need to be analysed or compared. Each file's
    content is returned together with its path as a reference, and a file
    that cannot be read is reported in place without stopping the rest.
    Only works within allowed directories.
    """
    try:
        fs = _get_fs_tools()
        return await fs.tools.read_multiple_files(paths)
    except Exception as exc:
        return f"Error reading multiple files: {exc}"
async def write_file(path: str, content: str, ctx: Context = None) -> str:
    """Create a file, or replace the entire content of an existing one.

    Use with caution: an existing file is overwritten without warning.
    The text is written as UTF-8. Only works within allowed directories.
    Failures are reported in the returned string instead of being raised.
    """
    try:
        fs = _get_fs_tools()
        return await fs.tools.write_file(path, content)
    except Exception as exc:
        return f"Error writing file: {exc}"
async def edit_file(path: str, edits: List[Dict[str, str]], dry_run: bool = False, ctx: Context = None) -> str:
    """Apply text edits to a file and report them as a diff.

    Each edit swaps a piece of existing text for new content. The result is
    a git-style unified diff of the changes. Only works within allowed
    directories. Failures are reported in the returned string instead of
    being raised.

    Parameters:
    - path: Path to the file to edit
    - edits: List of edit operations, each with 'oldText' and 'newText' properties
    - dry_run: If True, returns the diff without actually modifying the file
    """
    try:
        fs = _get_fs_tools()
        return await fs.tools.edit_file(path, edits, dry_run)
    except Exception as exc:
        return f"Error editing file: {exc}"
async def create_directory(path: str, ctx: Context = None) -> str:
    """Create a directory, or confirm that it already exists.

    Nested directories are created in one operation, and an existing
    directory is not an error. Useful for preparing project layouts or
    making sure a required path is present. Only works within allowed
    directories. Failures are reported in the returned string instead of
    being raised.
    """
    try:
        fs = _get_fs_tools()
        return await fs.tools.create_directory(path)
    except Exception as exc:
        return f"Error creating directory: {exc}"
async def list_directory(path: str, ctx: Context = None) -> str:
    """List the files and directories directly inside a path.

    Every entry is prefixed with [FILE] or [DIR] so the two are easy to
    tell apart. Use this to understand a directory's layout or to locate a
    specific file in it. Only works within allowed directories. Failures
    are reported in the returned string instead of being raised.
    """
    try:
        fs = _get_fs_tools()
        return await fs.tools.list_directory(path)
    except Exception as exc:
        return f"Error listing directory: {exc}"
async def directory_tree(path: str, ctx: Context = None) -> str:
    """Return a recursive view of a directory as a JSON structure.

    Each entry carries 'name' and 'type' (file/directory). Directories also
    carry a 'children' array, which may be empty; files have no 'children'.
    The JSON is indented by 2 spaces for readability. Only works within
    allowed directories. Failures are reported in the returned string
    instead of being raised.
    """
    try:
        fs = _get_fs_tools()
        return await fs.tools.directory_tree(path)
    except Exception as exc:
        return f"Error creating directory tree: {exc}"
async def move_file(source: str, destination: str, ctx: Context = None) -> str:
    """Move or rename a file or directory.

    A single call can relocate an item to another directory, rename it, or
    both; missing parent directories of the destination are created.
    Both source and destination must be within allowed directories.
    Failures are reported in the returned string instead of being raised.
    """
    try:
        fs = _get_fs_tools()
        return await fs.tools.move_file(source, destination)
    except Exception as exc:
        return f"Error moving file: {exc}"
async def search_files(path: str, pattern: str, exclude_patterns: List[str] = None, ctx: Context = None) -> str:
    """Recursively find files and directories whose name contains a pattern.

    All subdirectories below the starting path are searched. Matching is
    case-insensitive and on partial names; names matching any of the
    exclude patterns are left out. Full paths of every match are returned.
    Useful when the exact location of a file is unknown. Only searches
    within allowed directories. Failures are reported in the returned
    string instead of being raised.
    """
    excluded = exclude_patterns if exclude_patterns is not None else []
    try:
        fs = _get_fs_tools()
        return await fs.tools.search_files(path, pattern, excluded)
    except Exception as exc:
        return f"Error searching files: {exc}"
async def get_file_info(path: str, ctx: Context = None) -> str:
    """Return metadata about a file or directory.

    The report covers size, timestamps (created, modified, accessed),
    whether the path is a file or a directory, permissions, absolute path
    and file name. Use it to learn about a path without reading its
    content. Only works within allowed directories. Failures are reported
    in the returned string instead of being raised.
    """
    try:
        fs = _get_fs_tools()
        return await fs.tools.get_file_info(path)
    except Exception as exc:
        return f"Error getting file info: {exc}"
async def list_allowed_directories(ctx: Context = None) -> str:
    """Report which directories this server is allowed to access.

    Call this first to learn where files may be read or written.
    Failures are reported in the returned string instead of being raised.
    """
    try:
        fs = _get_fs_tools()
        return await fs.tools.list_allowed_directories()
    except Exception as exc:
        return f"Error listing allowed directories: {exc}"
# Tool registration and initialization
# Shared FileSystemTools instance used by the tool functions in this module;
# None until initialize_fs_tools() or _get_fs_tools() creates it.
_fs_tools_instance = None


def initialize_fs_tools(allowed_dirs=None):
    """Create the shared filesystem tools instance, replacing any existing one.

    Args:
        allowed_dirs: Passed straight to ``FileSystemTools``.

    Returns:
        The newly created instance.
    """
    global _fs_tools_instance
    created = FileSystemTools(allowed_dirs)
    _fs_tools_instance = created
    return created
def _get_fs_tools():
    """Return the shared filesystem tools instance, creating it on first use.

    When nothing has been initialized yet, ``initialize_fs_tools()`` is
    called with no arguments.
    """
    global _fs_tools_instance
    if _fs_tools_instance is not None:
        return _fs_tools_instance
    _fs_tools_instance = initialize_fs_tools()
    return _fs_tools_instance
def get_filesystem_tools(allowed_dirs=None):
    """Return a name-to-coroutine mapping of every filesystem tool.

    Args:
        allowed_dirs: When non-empty, the shared tools instance is
            (re)initialized with these directories before the mapping is
            built; otherwise the current state is left untouched.

    Returns:
        A dict keyed by tool name, suitable for registration with MCP.
    """
    if allowed_dirs:
        initialize_fs_tools(allowed_dirs)

    return dict(
        read_file=read_file,
        read_multiple_files=read_multiple_files,
        write_file=write_file,
        edit_file=edit_file,
        create_directory=create_directory,
        list_directory=list_directory,
        directory_tree=directory_tree,
        move_file=move_file,
        search_files=search_files,
        get_file_info=get_file_info,
        list_allowed_directories=list_allowed_directories,
    )
# If this file is run directly, print the list of tools
if __name__ == "__main__":
    # Script mode: initialize the tools for the directories given on the
    # command line and print the names of the available tools.
    cli_dirs = sys.argv[1:]
    if not cli_dirs:
        print("Usage: python filesystem.py <allowed-directory> [additional-directories...]")
        sys.exit(1)

    allowed_dirs = list(map(os.path.abspath, cli_dirs))
    initialize_fs_tools(allowed_dirs)
    print(f"Filesystem tools initialized with allowed directories: {allowed_dirs}")

    print("Available tools:")
    for tool_name in get_filesystem_tools():
        print(f"- {tool_name}")