AiDD MCP Server
by skydeckai
import difflib
import json
import os
import re
import stat
import subprocess
from datetime import datetime
from typing import List
import mcp.types as types
from .state import state
def read_file_tool():
return {
"name": "read_file",
"description": "Read the complete contents of a file from the file system. "
"Handles various text encodings and provides detailed error messages "
"if the file cannot be read. Use this tool when you need to examine "
"the contents of a single file. Only works within the allowed directory."
"Example: Enter 'src/main.py' to read a Python file.",
"inputSchema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the file to read",
}
},
"required": ["path"]
},
}
def write_file_tool():
return {
"name": "write_file",
"description": "Create a new file or overwrite an existing file with new content. "
"Use this to save changes, create new files, or update existing ones. "
"Use with caution as it will overwrite existing files without warning. "
"Path must be relative to the allowed directory. Creates parent directories if needed. "
"Example: Path='notes.txt', Content='Meeting notes for project X'",
"inputSchema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path where to write the file"
},
"content": {
"type": "string",
"description": "Content to write to the file"
}
},
"required": ["path", "content"]
},
}
def move_file_tool():
return {
"name": "move_file",
"description": "Move or rename a file or directory to a new location. "
"This tool can be used to reorganize files and directories. "
"Both source and destination must be within the allowed directory. "
"If the destination already exists, the operation will fail. "
"Parent directories of the destination will be created if they don't exist. "
"Example: source='old.txt', destination='new/path/new.txt'",
"inputSchema": {
"type": "object",
"properties": {
"source": {
"type": "string",
"description": "Source path of the file or directory to move"
},
"destination": {
"type": "string",
"description": "Destination path where to move the file or directory"
}
},
"required": ["source", "destination"]
},
}
def search_files_tool():
return {
"name": "search_files",
"description": "Search for files and directories matching a pattern. "
"The search is recursive and case-insensitive. "
"Only searches within the allowed directory. "
"Returns paths relative to the allowed directory. "
"Searches in file and directory names, not content. "
"For searching within file contents, use the tree_sitter_map tool which can locate specific code elements like functions and classes. "
"Example: pattern='.py' finds all Python files, "
"pattern='test' finds all items with 'test' in the name.",
"inputSchema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Starting directory for the search (defaults to allowed directory)"
},
"pattern": {
"type": "string",
"description": "Pattern to search for in file and directory names"
},
"include_hidden": {
"type": "boolean",
"description": "Whether to include hidden files and directories (defaults to false)"
}
},
"required": ["pattern"]
},
}
def get_file_info_tool():
return {
"name": "get_file_info",
"description": "Get detailed information about a file or directory. "
"Returns size, creation time, modification time, access time, "
"type (file/directory), and permissions. "
"All times are in ISO 8601 format. "
"This tool is perfect for understanding file characteristics without reading the actual content. "
"Only works within the allowed directory. "
"Example: path='src/main.py' returns details about main.py",
"inputSchema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the file or directory"
}
},
"required": ["path"]
},
}
def delete_file_tool():
return {
"name": "delete_file",
"description": "Delete a file or empty directory from the file system. "
"Use with caution as this operation cannot be undone. "
"For safety, this tool will not delete non-empty directories. "
"Only works within the allowed directory. "
"Example: path='old_file.txt' removes the specified file.",
"inputSchema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the file or empty directory to delete"
}
},
"required": ["path"]
},
}
def read_multiple_files_tool():
return {
"name": "read_multiple_files",
"description": "Read the contents of multiple files simultaneously. "
"This is more efficient than reading files one by one when you need to analyze "
"or compare multiple files. Each file's content is returned with its "
"path as a reference. Failed reads for individual files won't stop "
"the entire operation. Only works within the allowed directory."
"Example: Enter ['src/main.py', 'README.md'] to read both files.",
"inputSchema": {
"type": "object",
"properties": {
"paths": {
"type": "array",
"items": {"type": "string"},
"description": "List of file paths to read",
}
},
"required": ["paths"]
},
}
def edit_file_tool():
return {
"name": "edit_file",
"description": "Make line-based edits to a text file. Each edit replaces exact line sequences "
"with new content. Returns a git-style diff showing the changes made. "
"Only works within the allowed directory. "
"Always use dryRun first to preview changes before applying them.",
"inputSchema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "File to edit"
},
"edits": {
"type": "array",
"items": {
"type": "object",
"properties": {
"oldText": {
"type": "string",
"description": "Text to search for (can be substring)"
},
"newText": {
"type": "string",
"description": "Text to replace with"
}
},
"required": ["oldText", "newText"]
},
"description": "List of edit operations"
},
"dryRun": {
"type": "boolean",
"description": "Preview changes without applying",
"default": False
},
"options": {
"type": "object",
"properties": {
"preserveIndentation": {
"type": "boolean",
"description": "Keep existing indentation",
"default": True
},
"normalizeWhitespace": {
"type": "boolean",
"description": "Normalize spaces while preserving structure",
"default": True
},
"partialMatch": {
"type": "boolean",
"description": "Enable fuzzy matching",
"default": True
}
}
}
},
"required": ["path", "edits"]
}
}
async def _read_single_file(path: str) -> List[types.TextContent]:
"""Helper function to read a single file with proper validation."""
from mcp.types import TextContent
# Determine full path based on whether input is absolute or relative
if os.path.isabs(path):
full_path = os.path.abspath(path) # Just normalize the absolute path
else:
# For relative paths, join with allowed_directory
full_path = os.path.abspath(os.path.join(state.allowed_directory, path))
if not full_path.startswith(state.allowed_directory):
raise ValueError(f"Access denied: Path ({full_path}) must be within allowed directory ({state.allowed_directory})")
if not os.path.exists(full_path):
raise ValueError(f"File does not exist: {full_path}")
if not os.path.isfile(full_path):
raise ValueError(f"Path is not a file: {full_path}")
try:
with open(full_path, 'r', encoding='utf-8') as f:
content = f.read()
return [TextContent(
type="text",
text=content
)]
except UnicodeDecodeError:
raise ValueError(f"File is not a text file or has unknown encoding: {full_path}")
except PermissionError:
raise ValueError(f"Permission denied reading file: {full_path}")
except Exception as e:
raise ValueError(f"Error reading file: {str(e)}")
async def handle_write_file(arguments: dict):
"""Handle writing content to a file."""
from mcp.types import TextContent
path = arguments.get("path")
content = arguments.get("content")
if not path:
raise ValueError("path must be provided")
if content is None:
raise ValueError("content must be provided")
# Determine full path based on whether input is absolute or relative
if os.path.isabs(path):
full_path = os.path.abspath(path) # Just normalize the absolute path
else:
# For relative paths, join with allowed_directory
full_path = os.path.abspath(os.path.join(state.allowed_directory, path))
if not full_path.startswith(state.allowed_directory):
raise ValueError(f"Access denied: Path ({full_path}) must be within allowed directory ({state.allowed_directory})")
try:
# Create parent directories if they don't exist
os.makedirs(os.path.dirname(full_path), exist_ok=True)
# Write the file
with open(full_path, 'w', encoding='utf-8') as f:
f.write(content)
return [TextContent(
type="text",
text=f"Successfully wrote to {path}"
)]
except Exception as e:
raise ValueError(f"Error writing file: {str(e)}")
async def handle_read_file(arguments: dict):
path = arguments.get("path")
if not path:
raise ValueError("path must be provided")
return await _read_single_file(path)
async def handle_read_multiple_files(arguments: dict):
paths = arguments.get("paths", [])
if not isinstance(paths, list):
raise ValueError("paths must be a list of strings")
if not all(isinstance(p, str) for p in paths):
raise ValueError("all paths must be strings")
if not paths:
raise ValueError("paths list cannot be empty")
from mcp.types import TextContent
results = []
for path in paths:
try:
# Add file path header first
results.append(TextContent(
type="text",
text=f"\n==> {path} <==\n"
))
# Then add file contents
file_contents = await _read_single_file(path)
results.extend(file_contents)
except Exception as e:
results.append(TextContent(
type="text",
text=f"Error: {str(e)}\n"
))
return results
async def handle_move_file(arguments: dict):
"""Handle moving a file or directory to a new location."""
from mcp.types import TextContent
source = arguments.get("source")
destination = arguments.get("destination")
if not source:
raise ValueError("source must be provided")
if not destination:
raise ValueError("destination must be provided")
# Determine full paths based on whether inputs are absolute or relative
if os.path.isabs(source):
full_source = os.path.abspath(source)
else:
full_source = os.path.abspath(os.path.join(state.allowed_directory, source))
if os.path.isabs(destination):
full_destination = os.path.abspath(destination)
else:
full_destination = os.path.abspath(os.path.join(state.allowed_directory, destination))
# Security checks
if not full_source.startswith(state.allowed_directory):
raise ValueError(f"Access denied: Source path ({full_source}) must be within allowed directory")
if not full_destination.startswith(state.allowed_directory):
raise ValueError(f"Access denied: Destination path ({full_destination}) must be within allowed directory")
# Validate source exists
if not os.path.exists(full_source):
raise ValueError(f"Source path does not exist: {source}")
# Create parent directories of destination if they don't exist
os.makedirs(os.path.dirname(full_destination), exist_ok=True)
try:
# Perform the move operation
os.rename(full_source, full_destination)
return [TextContent(
type="text",
text=f"Successfully moved {source} to {destination}"
)]
except OSError as e:
raise ValueError(f"Error moving file: {str(e)}")
except Exception as e:
raise ValueError(f"Unexpected error: {str(e)}")
async def handle_search_files(arguments: dict):
"""Handle searching for files matching a pattern."""
from mcp.types import TextContent
pattern = arguments.get("pattern")
start_path = arguments.get("path", ".")
include_hidden = arguments.get("include_hidden", False)
if not pattern:
raise ValueError("pattern must be provided")
# Determine full path for search start
if os.path.isabs(start_path):
full_start_path = os.path.abspath(start_path)
else:
full_start_path = os.path.abspath(os.path.join(state.allowed_directory, start_path))
# Security check
if not full_start_path.startswith(state.allowed_directory):
raise ValueError(f"Access denied: Path ({full_start_path}) must be within allowed directory")
if not os.path.exists(full_start_path):
raise ValueError(f"Start path does not exist: {start_path}")
if not os.path.isdir(full_start_path):
raise ValueError(f"Start path is not a directory: {start_path}")
matches = []
pattern = pattern.lower() # Case-insensitive search
# Try git ls-files first
try:
# First, check if this is a git repository and get tracked files
result = subprocess.run(
['git', 'ls-files'],
cwd=full_start_path,
capture_output=True,
text=True,
check=True
)
# Also get git directories (excluding .git itself)
dirs_result = subprocess.run(
['git', 'ls-tree', '-d', '-r', '--name-only', 'HEAD'],
cwd=full_start_path,
capture_output=True,
text=True,
check=True
)
# Process git-tracked files
files = result.stdout.splitlines()
dirs = dirs_result.stdout.splitlines()
# Process directories first
for dir_path in dirs:
if pattern in dir_path.lower():
if include_hidden or not any(part.startswith('.') for part in dir_path.split(os.sep)):
matches.append(f"[DIR] {dir_path}")
# Then process files
for file_path in files:
if pattern in file_path.lower():
if include_hidden or not any(part.startswith('.') for part in file_path.split(os.sep)):
matches.append(f"[FILE] {file_path}")
except (subprocess.CalledProcessError, FileNotFoundError):
# Fallback to regular directory walk if git is not available or not a git repository
try:
for root, dirs, files in os.walk(full_start_path):
# Get paths relative to allowed directory
rel_root = os.path.relpath(root, state.allowed_directory)
# Skip hidden directories if not included
if not include_hidden:
dirs[:] = [d for d in dirs if not d.startswith('.')]
# Process directories
for dir_name in dirs:
if pattern in dir_name.lower():
rel_path = os.path.join(rel_root, dir_name)
if include_hidden or not any(part.startswith('.') for part in rel_path.split(os.sep)):
matches.append(f"[DIR] {rel_path}")
# Process files
for file_name in files:
if pattern in file_name.lower():
rel_path = os.path.join(rel_root, file_name)
if include_hidden or not any(part.startswith('.') for part in rel_path.split(os.sep)):
matches.append(f"[FILE] {rel_path}")
except Exception as e:
raise ValueError(f"Error searching files: {str(e)}")
# Sort matches for consistent output
matches.sort()
if not matches:
return [TextContent(
type="text",
text="No matches found"
)]
return [TextContent(
type="text",
text="\n".join(matches)
)]
async def handle_get_file_info(arguments: dict):
"""Handle getting detailed information about a file or directory."""
from mcp.types import TextContent
path = arguments.get("path")
if not path:
raise ValueError("path must be provided")
# Determine full path
if os.path.isabs(path):
full_path = os.path.abspath(path)
else:
full_path = os.path.abspath(os.path.join(state.allowed_directory, path))
# Security check
if not full_path.startswith(state.allowed_directory):
raise ValueError(f"Access denied: Path ({full_path}) must be within allowed directory")
if not os.path.exists(full_path):
raise ValueError(f"Path does not exist: {path}")
try:
stat_info = os.stat(full_path)
# Format file type
file_type = "directory" if os.path.isdir(full_path) else "file"
# Format permissions in octal
perms = stat.filemode(stat_info.st_mode)
info = f"""Type: {file_type}
Size: {stat_info.st_size:,} bytes
Created: {datetime.fromtimestamp(stat_info.st_ctime).isoformat()}
Modified: {datetime.fromtimestamp(stat_info.st_mtime).isoformat()}
Accessed: {datetime.fromtimestamp(stat_info.st_atime).isoformat()}
Permissions: {perms}"""
return [TextContent(type="text", text=info)]
except Exception as e:
raise ValueError(f"Error getting file info: {str(e)}")
async def handle_delete_file(arguments: dict):
"""Handle deleting a file or empty directory."""
from mcp.types import TextContent
path = arguments.get("path")
if not path:
raise ValueError("path must be provided")
# Determine full path
if os.path.isabs(path):
full_path = os.path.abspath(path)
else:
full_path = os.path.abspath(os.path.join(state.allowed_directory, path))
# Security check
if not full_path.startswith(state.allowed_directory):
raise ValueError(f"Access denied: Path ({full_path}) must be within allowed directory")
if not os.path.exists(full_path):
raise ValueError(f"Path does not exist: {path}")
try:
if os.path.isdir(full_path):
# Check if directory is empty
if os.listdir(full_path):
raise ValueError(f"Cannot delete non-empty directory: {path}")
os.rmdir(full_path)
return [TextContent(
type="text",
text=f"Successfully deleted empty directory: {path}"
)]
else:
os.remove(full_path)
return [TextContent(
type="text",
text=f"Successfully deleted file: {path}"
)]
except Exception as e:
raise ValueError(f"Error deleting {path}: {str(e)}")
def normalize_whitespace(text: str, preserve_indentation: bool = True) -> str:
"""Normalize whitespace while optionally preserving indentation."""
lines = text.splitlines()
normalized_lines = []
for line in lines:
if preserve_indentation:
# Preserve leading whitespace
indent = re.match(r'^\s*', line).group(0)
# Normalize other whitespace
content = re.sub(r'\s+', ' ', line.lstrip())
normalized_lines.append(f"{indent}{content}")
else:
# Normalize all whitespace
normalized_lines.append(re.sub(r'\s+', ' ', line.strip()))
return '\n'.join(normalized_lines)
def create_unified_diff(original: str, modified: str, filepath: str) -> str:
"""Create a unified diff between two texts."""
original_lines = original.splitlines()
modified_lines = modified.splitlines()
diff = difflib.unified_diff(
original_lines,
modified_lines,
fromfile=f'a/{filepath}',
tofile=f'b/{filepath}',
n=0, # No context lines needed for single line changes
lineterm='' # Don't add newlines here
)
# Join lines with single newlines and strip any extra whitespace
return '\n'.join(line.rstrip() for line in diff)
def find_substring_position(content: str, pattern: str) -> tuple[int, int]:
"""Find the position of a substring in content."""
pos = content.find(pattern)
if pos >= 0:
return pos, pos + len(pattern)
return -1, -1
def find_best_match(content: str, pattern: str, partial_match: bool = True) -> tuple[int, int, float]:
"""Find the best matching position for a pattern in content."""
if not partial_match:
# Exact matching only
pos = content.find(pattern)
if pos >= 0:
return pos, pos + len(pattern), 1.0
return -1, -1, 0.0
# Try exact substring match first
start, end = find_substring_position(content, pattern)
if start >= 0:
return start, end, 1.0
# If no exact substring match, try line-based fuzzy matching
# Split into lines for line-based matching
content_lines = content.splitlines()
pattern_lines = pattern.splitlines()
best_score = 0.0
best_start = -1
best_end = -1
for i in range(len(content_lines) - len(pattern_lines) + 1):
# Compare each potential match position
window = content_lines[i:i + len(pattern_lines)]
score = sum(difflib.SequenceMatcher(None, a, b).ratio()
for a, b in zip(window, pattern_lines)) / len(pattern_lines)
if score > best_score:
best_score = score
best_start = sum(len(line) + 1 for line in content_lines[:i])
best_end = best_start + sum(len(line) + 1 for line in window)
return best_start, best_end, best_score
async def apply_file_edits(file_path: str, edits: List[dict], dry_run: bool = False, options: dict = None) -> str:
"""Apply edits to a file with optional formatting and return diff."""
# Set default options
options = options or {}
preserve_indentation = options.get('preserveIndentation', True)
normalize_ws = options.get('normalizeWhitespace', True)
partial_match = options.get('partialMatch', True)
# Read file content
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Track modifications
modified_content = content
failed_matches = []
# Apply each edit
for edit in edits:
old_text = edit['oldText']
new_text = edit['newText']
# Normalize texts if requested
if normalize_ws:
search_text = normalize_whitespace(old_text, preserve_indentation)
working_content = normalize_whitespace(modified_content, preserve_indentation)
else:
search_text = old_text
working_content = modified_content
# Find best match
start, end, confidence = find_best_match(working_content, search_text, partial_match)
if confidence >= 0.8:
# Preserve indentation of first line if requested
if preserve_indentation and start >= 0:
indent = re.match(r'^\s*', modified_content[start:].splitlines()[0]).group(0)
replacement = '\n'.join(indent + line.lstrip()
for line in new_text.splitlines())
else:
replacement = new_text
# Apply the edit
modified_content = modified_content[:start] + replacement + modified_content[end:]
else:
failed_matches.append({
'oldText': old_text,
'confidence': confidence,
'bestMatch': working_content[start:end] if start >= 0 and end > start else None
})
# Create diff
diff = create_unified_diff(content, modified_content, os.path.basename(file_path))
# Write changes if not dry run
if not dry_run and not failed_matches:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(modified_content)
# Return results
failed_matches_text = '=== Failed Matches ===\n' + json.dumps(failed_matches, indent=2) + '\n\n' if failed_matches else ''
diff_text = f'=== Diff ===\n{diff}'
return failed_matches_text + diff_text
async def handle_edit_file(arguments: dict):
"""Handle editing a file with pattern matching and formatting."""
from mcp.types import TextContent
path = arguments.get("path")
edits = arguments.get("edits")
dry_run = arguments.get("dryRun", False)
options = arguments.get("options", {})
if not path:
raise ValueError("path must be provided")
if not edits or not isinstance(edits, list):
raise ValueError("edits must be a non-empty list")
# Validate edits structure
for edit in edits:
if not isinstance(edit, dict):
raise ValueError("each edit must be an object")
if 'oldText' not in edit or 'newText' not in edit:
raise ValueError("each edit must have oldText and newText properties")
# Determine full path and validate
if os.path.isabs(path):
full_path = os.path.abspath(path)
else:
full_path = os.path.abspath(os.path.join(state.allowed_directory, path))
if not full_path.startswith(state.allowed_directory):
raise ValueError(f"Access denied: Path ({full_path}) must be within allowed directory ({state.allowed_directory})")
try:
result = await apply_file_edits(full_path, edits, dry_run, options)
return [TextContent(type="text", text=result)]
except Exception as e:
raise ValueError(f"Error editing file: {str(e)}")