#!/usr/bin/env python3
"""
Tool Implementations for MCP Server
This module provides all tool implementations for the MCP server,
including file operations, code analysis, and VSCode integration.
Author: Kommandant
Version: 1.0.0
License: MIT
"""
import json
import re
import subprocess
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
class LanguageConfig:
    """
    Registry of per-language settings keyed by file extension.

    Wraps a parsed configuration dictionary and exposes lookups for a
    language's display name and regex patterns, plus the set of file
    extensions that are treated as code files.
    """

    def __init__(self, config_data: Dict[str, Any]):
        """
        Build the registry from a parsed configuration dictionary.

        Args:
            config_data: Mapping that may contain "languages", "safe_tools"
                and "dangerous_tools"; missing keys fall back to empty values.
        """
        self.languages = config_data.get("languages", {})
        self.safe_tools = set(config_data.get("safe_tools", []))
        self.dangerous_tools = set(config_data.get("dangerous_tools", []))
        # These data/markup extensions always count as code files, in
        # addition to every extension configured under "languages".
        always_included = {".json", ".yaml", ".yml", ".xml", ".md"}
        self.code_extensions = always_included.union(self.languages.keys())

    def get_language(self, ext: str) -> str:
        """
        Return the display name configured for an extension.

        Args:
            ext: File extension including the dot (e.g. '.py'); matched
                case-insensitively.

        Returns:
            The configured "name", or 'Unknown' when the extension or its
            name is not configured.
        """
        entry = self.languages.get(ext.lower(), {})
        return entry.get("name", "Unknown")

    def get_patterns(self, ext: str) -> Dict[str, str]:
        """
        Return the regex patterns configured for an extension.

        Args:
            ext: File extension including the dot; matched case-insensitively.

        Returns:
            Mapping of pattern name to regex source, empty when nothing is
            configured for the extension.
        """
        entry = self.languages.get(ext.lower(), {})
        return entry.get("patterns", {})

    def is_code_file(self, filepath: Path) -> bool:
        """
        Tell whether a path has one of the known code extensions.

        Args:
            filepath: Path whose suffix is inspected (case-insensitively).

        Returns:
            True if the suffix is in ``code_extensions``, False otherwise.
        """
        suffix = filepath.suffix.lower()
        return suffix in self.code_extensions
# File operations
def read_file(path: str) -> str:
    """
    Return the UTF-8 text content of a file.

    Failures are reported as an "Error: ..." string instead of being raised.

    Args:
        path: Location of the file to read.

    Returns:
        The file's text, or an error message describing why it could not
        be read (missing file, no permission, not UTF-8, or other failure).
    """
    try:
        resolved = Path(path).resolve()
        text = resolved.read_text(encoding='utf-8')
    except FileNotFoundError:
        return f"Error: File not found: {path}"
    except PermissionError:
        return f"Error: Permission denied: {path}"
    except UnicodeDecodeError:
        return f"Error: File is not valid UTF-8: {path}"
    except Exception as exc:
        return f"Error reading file: {exc}"
    else:
        return text
def write_file(path: str, content: str) -> str:
    """
    Write text to a file inside the current working directory.

    Parent directories are created as needed. The target is resolved
    (symlinks followed) and must lie inside the project directory
    (the current working directory); anything else is rejected.

    Args:
        path: Target file path, relative to the project directory.
        content: Text to write (encoded as UTF-8).

    Returns:
        Success message with file details, or an "Error: ..." message.
    """
    try:
        if not path or not path.strip():
            return "Error: Filename cannot be empty"
        path = path.strip()
        if ".." in path:
            return "Error: Invalid path - directory traversal not allowed"

        project_dir = Path.cwd().resolve()
        # Resolve before checking containment so symlinks and absolute
        # paths cannot slip past; compare path components, not string
        # prefixes ("/x/proj-evil" starts with "/x/proj" but is outside it).
        target_file = (project_dir / path).resolve()
        try:
            target_file.relative_to(project_dir)
        except ValueError:
            return "Error: Cannot write outside project directory"

        target_file.parent.mkdir(parents=True, exist_ok=True)
        target_file.write_text(content, encoding='utf-8')

        lang_map = {
            '.py': 'Python', '.js': 'JavaScript', '.ts': 'TypeScript',
            '.java': 'Java', '.cpp': 'C++', '.c': 'C', '.go': 'Go',
            '.rs': 'Rust', '.rb': 'Ruby', '.php': 'PHP', '.cs': 'C#',
            '.kt': 'Kotlin', '.swift': 'Swift', '.html': 'HTML',
            '.css': 'CSS', '.sql': 'SQL', '.sh': 'Shell', '.json': 'JSON',
            '.md': 'Markdown', '.txt': 'Text', '.yaml': 'YAML', '.xml': 'XML'
        }
        parts = [
            "File written successfully!\n\n",
            f"File: {path}\n",
            f"Location: {target_file}\n",
            f"Size: {len(content)} chars, {len(content.splitlines())} lines\n",
        ]
        ext = target_file.suffix.lower()
        if ext in lang_map:
            parts.append(f"Language: {lang_map[ext]}\n")
        parts.append("\nThe file is now available in your project!")
        return "".join(parts)
    except PermissionError:
        return f"Error: Permission denied - cannot write to {path}"
    except Exception as e:
        return f"Error writing file: {str(e)}"
def execute_command(command: str) -> str:
    """
    Run a shell command and return what it wrote to standard output.

    Standard error is discarded. The command is limited to 10 seconds.

    Args:
        command: Command line to run through the system shell.

    Returns:
        The command's standard output, or an "Error: ..." message when the
        command times out or cannot be started.
    """
    try:
        # NOTE(security): shell=True hands the string to the system shell,
        # so callers must only pass commands they trust.
        # stdout/stderr are given explicitly; combining them with
        # capture_output=True makes subprocess.run raise ValueError.
        completed = subprocess.run(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            timeout=10,
        )
        return completed.stdout
    except subprocess.TimeoutExpired:
        return "Error: Command timed out after 10 seconds"
    except Exception as e:
        return f"Error executing command: {e}"
def list_files(directory: str = ".") -> str:
    """
    Produce a sorted, newline-separated listing of a directory.

    Sub-directories are marked with a trailing '/'.

    Args:
        directory: Directory to list; defaults to the current directory.

    Returns:
        One entry name per line, or a message explaining why the directory
        could not be listed.
    """
    try:
        target = Path(directory).resolve()
        if not target.exists():
            return f"Directory not found: {directory}"
        if not target.is_dir():
            return f"Not a directory: {directory}"

        rendered = []
        for entry in sorted(target.iterdir()):
            marker = '/' if entry.is_dir() else ''
            rendered.append(f"{entry.name}{marker}")
        return "\n".join(rendered)
    except PermissionError:
        return f"Error: Permission denied: {directory}"
    except Exception as exc:
        return f"Error listing files: {exc}"
def get_project_structure(directory: str, lang_config: LanguageConfig) -> str:
    """
    List a project's code files grouped by programming language.

    Files are found recursively. Anything under a ``.git``,
    ``node_modules``, ``__pycache__``, ``.venv`` or ``venv`` directory
    inside the project is skipped. At most 30 files are listed per
    language, while the header shows the full count.

    Args:
        directory: Project root directory.
        lang_config: Language configuration instance (provides
            ``code_extensions`` and ``get_language``).

    Returns:
        Formatted project structure, or an error message.
    """
    try:
        root = Path(directory).resolve()
        excluded_dirs = {".git", "node_modules", "__pycache__", ".venv", "venv"}
        files_by_lang: Dict[str, List[str]] = {}

        for file in root.rglob("*"):
            if not file.is_file():
                continue
            # Test only the components below the root: a project that
            # itself lives under e.g. ".../venv/" must still be scanned.
            relative = file.relative_to(root)
            if excluded_dirs.intersection(relative.parts):
                continue
            suffix = file.suffix.lower()
            if suffix in lang_config.code_extensions:
                lang = lang_config.get_language(suffix)
                files_by_lang.setdefault(lang, []).append(str(relative))

        result = "Project Structure by Language:\n" + "=" * 40 + "\n"
        for lang in sorted(files_by_lang):
            files = sorted(files_by_lang[lang])[:30]
            result += f"\n{lang} ({len(files_by_lang[lang])} files):\n"
            result += "\n".join(files)
        return result
    except Exception as e:
        return f"Error getting structure: {e}"
def analyze_code(file_path: str, lang_config: LanguageConfig) -> str:
    """
    Scan a source file with the language's configured regex patterns.

    When ``file_path`` is empty, the most recently modified code file
    directly inside the current working directory is analyzed instead.
    Each pattern is applied line by line; for patterns with several
    groups only the first group of a match is reported.

    Args:
        file_path: Path to file to analyze (may be empty).
        lang_config: Language configuration instance.

    Returns:
        Formatted code analysis (at most 30 hits listed per pattern), or
        a message explaining why nothing was analyzed.
    """
    try:
        if not file_path:
            # Fall back to the newest code file in the working directory.
            cwd = Path.cwd()
            candidates = []
            for extension in lang_config.code_extensions:
                candidates.extend(cwd.glob(f"*{extension}"))
            if candidates:
                newest = max(candidates, key=lambda p: p.stat().st_mtime)
                file_path = str(newest)
        if not file_path:
            return "No code file specified"

        source = Path(file_path)
        if not source.exists():
            return f"File not found: {file_path}"

        lines = source.read_text(encoding='utf-8').split('\n')
        ext = source.suffix.lower()
        lang_name = lang_config.get_language(ext)
        patterns = lang_config.get_patterns(ext)

        findings: Dict[str, List[str]] = {}
        for kind, pattern in patterns.items():
            hits: List[str] = []
            findings[kind] = hits
            regex = re.compile(pattern, re.MULTILINE)
            for lineno, text in enumerate(lines, 1):
                for match in regex.findall(text):
                    # findall yields tuples for multi-group patterns.
                    if isinstance(match, tuple):
                        match = match[0]
                    hits.append(f"Line {lineno}: {match}")

        report = [
            f"Code Analysis: {source.name}\n",
            f"Language: {lang_name}\n",
            f"Total Lines: {len(lines)}\n",
            "=" * 50 + "\n\n",
        ]
        for kind in sorted(findings):
            hits = findings[kind]
            if not hits:
                continue
            report.append(f"{kind.title()}s ({len(hits)}):\n")
            report.append("\n".join(hits[:30]) + "\n\n")
        return "".join(report)
    except Exception as exc:
        return f"Error analyzing code: {exc}"
def get_language_stats(directory: str, lang_config: LanguageConfig) -> str:
    """
    Count code files and lines per language across a project.

    Files are found recursively. Anything under a ``.git``,
    ``node_modules`` or ``__pycache__`` directory inside the project is
    skipped, as are files that cannot be read as UTF-8. A file's line
    count is the number of '\\n'-separated segments of its content.

    Args:
        directory: Project directory.
        lang_config: Language configuration instance (provides
            ``code_extensions`` and ``get_language``).

    Returns:
        Formatted language statistics, or an error message.
    """
    try:
        root = Path(directory).resolve()
        excluded_dirs = {".git", "node_modules", "__pycache__"}
        stats: Dict[str, Dict[str, int]] = {}
        total_lines = 0

        for file in root.rglob("*"):
            if not file.is_file():
                continue
            # Compare whole path components below the root; a substring
            # test on the full path would also drop ".github", ".gitlab"
            # and any project stored under a matching ancestor directory.
            if excluded_dirs.intersection(file.relative_to(root).parts):
                continue
            suffix = file.suffix.lower()
            if suffix not in lang_config.code_extensions:
                continue
            lang = lang_config.get_language(suffix)
            try:
                content = file.read_text(encoding='utf-8')
            except (OSError, UnicodeDecodeError):
                # Best effort: unreadable or non-UTF-8 files are not counted.
                continue
            lines = len(content.split('\n'))
            entry = stats.setdefault(lang, {"files": 0, "lines": 0})
            entry["files"] += 1
            entry["lines"] += lines
            total_lines += lines

        result = "Language Statistics:\n" + "=" * 40 + "\n\n"
        for lang in sorted(stats):
            data = stats[lang]
            result += f"{lang}: {data['files']} files, {data['lines']} lines\n"
        total_files = sum(s['files'] for s in stats.values())
        result += f"\nTotal: {total_files} files, {total_lines} lines"
        return result
    except Exception as e:
        return f"Error getting statistics: {e}"
def get_vscode_active_file(lang_config: LanguageConfig) -> str:
    """
    Report the most recently modified code file in the working directory.

    This is a heuristic stand-in for "the file active in VSCode": the
    editor itself is not queried. The current working directory is
    searched recursively, skipping ``.git``, ``node_modules`` and
    ``__pycache__`` directories inside it.

    Args:
        lang_config: Language configuration instance (provides
            ``is_code_file`` and ``get_language``).

    Returns:
        A header with language, name, relative path and line count,
        followed by the file's content; or a message when no code file
        exists or an error occurs.
    """
    try:
        workspace_dir = Path.cwd().resolve()
        excluded_dirs = {".git", "node_modules", "__pycache__"}
        candidates = []

        for file in workspace_dir.rglob("*"):
            if not file.is_file():
                continue
            if not lang_config.is_code_file(file):
                continue
            # Only components below the workspace count; a workspace that
            # lives under e.g. ".../node_modules/" must still be scanned.
            if excluded_dirs.intersection(file.relative_to(workspace_dir).parts):
                continue
            try:
                candidates.append((file.stat().st_mtime, file))
            except OSError:
                # File vanished or is unreadable: ignore it.
                continue

        if not candidates:
            return "No code files found in workspace"

        _, active_file = max(candidates, key=lambda item: item[0])
        try:
            content = active_file.read_text(encoding='utf-8')
        except UnicodeDecodeError:
            # Still show something for files that are not clean UTF-8.
            content = active_file.read_text(encoding='utf-8', errors='replace')

        lang_name = lang_config.get_language(active_file.suffix.lower())
        lines = content.split('\n')
        header = [
            f"Active VSCode File\n{'=' * 60}\n",
            f"Language: {lang_name}\n",
            f"File: {active_file.name}\n",
            f"Path: {active_file.relative_to(workspace_dir)}\n",
            f"Lines: {len(lines)}\n",
            f"{'=' * 60}\n\n",
        ]
        return "".join(header) + content
    except Exception as e:
        return f"Error getting active file: {e}"
# VSCode detection functions
def get_vscode_window_title() -> Optional[str]:
    """
    Get VSCode window title by querying PowerShell for ``code`` processes.

    First asks for the main window title of the first ``code`` process;
    if that is empty, falls back to a "VSCode Running - N process(es)"
    summary built from the process count.

    Returns:
        The window title or the fallback summary, or None when neither
        query prints anything or PowerShell cannot be run (missing
        executable, timeout, undecodable output).
    """
    title_script = (
        "Get-Process code -ErrorAction SilentlyContinue | "
        "Select-Object -First 1 | "
        "ForEach-Object { $_.MainWindowTitle }"
    )
    count_script = (
        "$count = (Get-Process code -ErrorAction SilentlyContinue | Measure-Object).Count; "
        "if ($count -gt 0) { 'VSCode Running - ' + $count + ' process(es)' }"
    )
    try:
        # Queries run in order; the first one that prints anything wins.
        for script in (title_script, count_script):
            completed = subprocess.run(
                ['powershell', '-Command', script],
                capture_output=True,
                text=True,
                timeout=3
            )
            output = completed.stdout.strip()
            if output:
                return output
        return None
    except (OSError, subprocess.SubprocessError, UnicodeError):
        # Best effort only, but do not swallow KeyboardInterrupt/SystemExit
        # the way a bare "except:" would.
        return None
def get_vscode_recent_files() -> List[Dict[str, Any]]:
    """
    Find code files touched within the last hour in the working directory.

    A file's "last activity" is the later of its modification and access
    times. The current working directory is searched recursively,
    skipping ``.git``, ``node_modules`` and ``__pycache__`` directories
    inside it. This is a heuristic for files likely open in VSCode; the
    editor itself is not queried.

    Returns:
        List of dictionaries with keys 'path', 'name', 'mtime', 'atime'
        and 'accessed_seconds_ago', ordered from most to least recent
        activity. Empty when the directory cannot be scanned.
    """
    code_extensions = {
        ".py", ".js", ".ts", ".tsx", ".jsx", ".java", ".cpp", ".c",
        ".go", ".rs", ".rb", ".php", ".cs", ".kt", ".swift",
        ".json", ".yaml", ".yml", ".xml", ".md"
    }
    excluded_dirs = {".git", "node_modules", "__pycache__"}
    time_window = 3600  # 1 hour

    try:
        workspace_dir = Path.cwd()
        current_time = time.time()
        recent_files = []

        for file in workspace_dir.rglob("*"):
            if not file.is_file():
                continue
            if file.suffix.lower() not in code_extensions:
                continue
            # Only components below the workspace count as exclusions.
            if excluded_dirs.intersection(file.relative_to(workspace_dir).parts):
                continue
            try:
                info = file.stat()
            except OSError:
                # File vanished or is unreadable: ignore it.
                continue
            last_activity = max(info.st_mtime, info.st_atime)
            time_diff = current_time - last_activity
            if time_diff < time_window:
                recent_files.append({
                    'path': str(file),
                    'name': file.name,
                    'mtime': info.st_mtime,
                    'atime': info.st_atime,
                    'accessed_seconds_ago': int(time_diff)
                })

        # Order by the same timestamp that 'accessed_seconds_ago' reports;
        # atime alone can be stale, which would contradict the shown ages.
        recent_files.sort(
            key=lambda entry: max(entry['mtime'], entry['atime']),
            reverse=True,
        )
        return recent_files
    except OSError:
        return []
def get_vscode_open_projects() -> Dict[str, Dict[str, str]]:
    """
    Get VSCode projects/workspaces recorded in its workspace storage.

    Reads every ``workspace.json`` under
    ``~/AppData/Roaming/Code/User/workspaceStorage`` (the Windows
    location). Both a single ``"folder"`` entry and a ``"folders"`` list
    of ``{"path": ...}`` objects are understood. ``file:///`` URIs are
    percent-decoded and converted to backslash-separated paths.
    Unreadable or malformed entries are skipped.

    Returns:
        Dictionary of workspace paths to project info
        (``{'name': <last path component>, 'type': 'workspace'}``);
        empty when the storage directory is missing or unreadable.
    """
    # Local imports keep this block self-contained.
    from pathlib import PureWindowsPath
    from urllib.parse import unquote

    uri_prefix = 'file:///'
    workspaces: Dict[str, Dict[str, str]] = {}

    try:
        vscode_storage = Path.home() / "AppData" / "Roaming" / "Code" / "User" / "workspaceStorage"
        if not vscode_storage.exists():
            return workspaces
        storage_dirs = [entry for entry in vscode_storage.iterdir() if entry.is_dir()]
    except (OSError, RuntimeError):
        # RuntimeError: the home directory could not be determined.
        return {}

    for workspace_dir in storage_dirs:
        workspace_file = workspace_dir / "workspace.json"
        try:
            with open(workspace_file, 'r', encoding='utf-8', errors='ignore') as f:
                data = json.load(f)
        except (OSError, ValueError):
            # Missing, unreadable or invalid JSON (JSONDecodeError is a
            # ValueError): skip this entry and keep going.
            continue
        if not isinstance(data, dict):
            continue

        raw_paths = []
        # Single-folder workspaces are stored as a top-level "folder" URI.
        single_folder = data.get('folder')
        if isinstance(single_folder, str):
            raw_paths.append(single_folder)
        folders = data.get('folders')
        if isinstance(folders, list):
            raw_paths.extend(
                folder.get('path', '') for folder in folders if isinstance(folder, dict)
            )

        for path in raw_paths:
            if not isinstance(path, str) or not path:
                continue
            if path.startswith(uri_prefix):
                # URIs escape characters such as ':' (%3A) and ' ' (%20).
                path = unquote(path[len(uri_prefix):]).replace('/', '\\')
            workspaces[path] = {
                # PureWindowsPath splits on both '/' and '\\', so the name
                # is right even when this runs on a non-Windows host.
                'name': PureWindowsPath(path).name,
                'type': 'workspace'
            }
    return workspaces
def detect_vscode_files() -> str:
    """
    Assemble a text report about the local VSCode session.

    The report shows the VSCode status line, the projects found in
    workspace storage, and up to 20 recently accessed files. When no
    window title can be obtained the report ends right after stating
    that VSCode is not running.

    Returns:
        Formatted VSCode detection report, or an error message.
    """
    try:
        heavy_rule = "=" * 70
        light_rule = "-" * 70
        parts = [
            "\n" + heavy_rule + "\n",
            " " * 15 + "VSCODE FILE DETECTION\n",
            heavy_rule + "\n\n",
        ]

        window_title = get_vscode_window_title()
        if not window_title:
            # Nothing else is probed when VSCode does not appear to run.
            parts.append("VSCode is not currently running\n\n")
            parts.append(heavy_rule + "\n")
            return "".join(parts)
        parts.append("VSCode Status:\n")
        parts.append(f" {window_title}\n\n")

        projects = get_vscode_open_projects()
        if not projects:
            parts.append("No projects found in workspace storage\n\n")
        else:
            parts.append(f"Open Projects ({len(projects)}):\n")
            parts.append(light_rule + "\n")
            for path, info in projects.items():
                if not path:
                    continue
                parts.append(f" • {info['name']}\n")
                parts.append(f" Path: {path}\n\n")

        recent = get_vscode_recent_files()
        if not recent:
            parts.append("No recent files found\n\n")
        else:
            parts.append(f"Recently Accessed Files ({len(recent)} files):\n")
            parts.append(light_rule + "\n")
            for position, entry in enumerate(recent[:20], 1):
                parts.append(f" {position}. {entry['name']}\n")
                parts.append(f" Path: {entry['path']}\n")
                parts.append(f" Last accessed: {entry['accessed_seconds_ago']}s ago\n\n")

        parts.append(heavy_rule + "\n")
        return "".join(parts)
    except Exception as e:
        return f"Error in VSCode detection: {str(e)}"
def write_code_to_project(filename: str, code: str, description: str = "") -> str:
    """
    Write code to a file inside the project (current working) directory.

    Parent directories are created as needed. The target is resolved
    (symlinks followed) and must lie inside the project directory, so a
    symlink within the project cannot be used to write elsewhere.

    Args:
        filename: Name of file to create (e.g., 'calculator.py'),
            relative to the project directory.
        code: Code content to write (encoded as UTF-8).
        description: Optional description of what the code does.

    Returns:
        Success message with file details, or an "Error: ..." message.
    """
    try:
        if not filename or not filename.strip():
            return "Error: Filename cannot be empty"
        filename = filename.strip()
        if ".." in filename or filename.startswith(("/", "\\")):
            return "Error: Invalid filename - directory traversal not allowed"

        project_dir = Path.cwd().resolve()
        # Resolve first, then compare path components: a string-prefix test
        # on the unresolved path misses symlinks and sibling directories
        # whose names merely start with the project directory's name.
        target_file = (project_dir / filename).resolve()
        try:
            target_file.relative_to(project_dir)
        except ValueError:
            return "Error: Cannot write outside project directory"

        target_file.parent.mkdir(parents=True, exist_ok=True)
        target_file.write_text(code, encoding='utf-8')

        lang_map = {
            '.py': 'Python', '.js': 'JavaScript', '.ts': 'TypeScript',
            '.java': 'Java', '.cpp': 'C++', '.c': 'C', '.go': 'Go',
            '.rs': 'Rust', '.rb': 'Ruby', '.php': 'PHP', '.cs': 'C#',
            '.kt': 'Kotlin', '.swift': 'Swift', '.html': 'HTML',
            '.css': 'CSS', '.sql': 'SQL', '.sh': 'Shell', '.json': 'JSON'
        }
        parts = [
            "Code written successfully!\n\n",
            f"File: {filename}\n",
            f"Location: {target_file}\n",
            f"Size: {len(code)} characters, {len(code.splitlines())} lines\n",
        ]
        if description:
            parts.append(f"Description: {description}\n")
        ext = target_file.suffix.lower()
        if ext in lang_map:
            parts.append(f"Language: {lang_map[ext]}\n")
        parts.append("\nThe file is now available in your project!")
        return "".join(parts)
    except PermissionError:
        return f"Error: Permission denied - cannot write to {filename}"
    except Exception as e:
        return f"Error writing code to project: {str(e)}"
def change_code(filename: str, old_code: str, new_code: str, description: str = "") -> str:
    """
    Replace one exact code snippet in an existing project file.

    The snippet must occur exactly once in the file; otherwise nothing is
    written. The target is resolved (symlinks followed) and must lie
    inside the project (current working) directory.

    Args:
        filename: Name of file to modify (e.g., 'main.py'), relative to
            the project directory.
        old_code: Exact code snippet to find and replace.
        new_code: New code to replace it with (may be empty, not None).
        description: Optional description of the change.

    Returns:
        Success message with change details, or an "Error: ..." message.
    """
    try:
        if not filename or not filename.strip():
            return "Error: Filename cannot be empty"
        if not old_code or not old_code.strip():
            return "Error: old_code cannot be empty"
        if new_code is None:
            return "Error: new_code cannot be None"
        filename = filename.strip()
        if ".." in filename or filename.startswith(("/", "\\")):
            return "Error: Invalid filename - directory traversal not allowed"

        project_dir = Path.cwd().resolve()
        # Check containment on the resolved path and before probing the
        # file, so a symlink inside the project cannot be used to modify
        # (or test for the existence of) files outside it.
        target_file = (project_dir / filename).resolve()
        try:
            target_file.relative_to(project_dir)
        except ValueError:
            return "Error: Cannot modify files outside project directory"

        if not target_file.exists():
            return f"Error: File not found: {filename}"
        if not target_file.is_file():
            return f"Error: {filename} is not a file"

        try:
            current_content = target_file.read_text(encoding='utf-8')
        except UnicodeDecodeError:
            return f"Error: File {filename} is not a valid text file (encoding issue)"

        occurrences = current_content.count(old_code)
        if occurrences == 0:
            # Show the start of the file to help the caller fix the snippet.
            lines = current_content.splitlines()
            parts = [
                f"Error: Could not find the specified code in {filename}\n\n",
                "The old_code snippet was not found in the file.\n",
                "Make sure:\n",
                " 1. The code matches EXACTLY (including whitespace)\n",
                " 2. Use \\n for newlines\n",
                " 3. Indentation matches the file\n\n",
                f"File has {len(lines)} lines.\n",
                "\nFirst 10 lines of file:\n",
                "-" * 50 + "\n",
            ]
            parts.extend(f"{i:3}: {line}\n" for i, line in enumerate(lines[:10], 1))
            return "".join(parts)
        if occurrences > 1:
            return f"Error: The old_code appears {occurrences} times in the file. It must appear exactly once for safe replacement."

        new_content = current_content.replace(old_code, new_code, 1)
        target_file.write_text(new_content, encoding='utf-8')

        old_lines = old_code.count('\n') + 1
        new_lines = new_code.count('\n') + 1
        line_diff = new_lines - old_lines
        if line_diff > 0:
            change_line = f" Change: +{line_diff} lines added\n"
        elif line_diff < 0:
            change_line = f" Change: {line_diff} lines removed\n"
        else:
            change_line = " Change: Same number of lines (modified content)\n"

        parts = [
            "Code changed successfully!\n\n",
            f"File: {filename}\n",
            f"Location: {target_file}\n",
        ]
        if description:
            parts.append(f"Change: {description}\n")
        parts.extend([
            "\nStatistics:\n",
            f" Old code: {old_lines} lines, {len(old_code)} characters\n",
            f" New code: {new_lines} lines, {len(new_code)} characters\n",
            change_line,
            "\nThe file has been updated!",
            "\n\nChanged code:\n",
            "-" * 50 + "\n",
            # Long snippets are previewed by their first 200 characters.
            f"Old:\n{old_code[:200]}{'...' if len(old_code) > 200 else ''}\n\n",
            f"New:\n{new_code[:200]}{'...' if len(new_code) > 200 else ''}\n",
            "-" * 50,
        ])
        return "".join(parts)
    except PermissionError:
        return f"Error: Permission denied - cannot modify {filename}"
    except Exception as e:
        return f"Error changing code: {str(e)}"