#!/usr/bin/env python3
"""
MCP Server - Model Context Protocol Server
A production-ready MCP server implementing the Model Context Protocol
for VSCode and Claude integration with comprehensive tool support.
Author: Kommandant
Version: 1.0.0
License: MIT
"""
import asyncio
import hashlib
import inspect
import json
import logging
import os
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

# Import tool implementations
from tools import (
    LanguageConfig,
    analyze_code,
    change_code,
    detect_vscode_files,
    execute_command,
    get_language_stats,
    get_project_structure,
    get_vscode_active_file,
    list_files,
    read_file,
    write_code_to_project,
    write_file,
)
# All log output goes to stderr: stdout is reserved for the JSON-RPC
# responses, so anything else printed there would corrupt the protocol stream.
logging.basicConfig(
    stream=sys.stderr,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Values reported to the client in the `initialize` response.
MCP_PROTOCOL_VERSION = "2024-11-05"
SERVER_NAME = "VSCode-MCP-Server"
SERVER_VERSION = "1.0.0"

# Result caching: on/off switch and the default capacity of the request cache.
CACHE_ENABLED = True
MAX_CACHE_SIZE = 1000
@dataclass
class ToolDefinition:
    """Declarative description of a single tool offered by the server.

    Instances carry only metadata; the callable that implements the tool is
    supplied separately when the tool is registered.
    """

    # Identifier of the tool.
    name: str
    # Human-readable summary of what the tool does.
    description: str
    # Parameter name -> type label. A label containing "(optional)" marks the
    # parameter as not required when the input schema is built.
    params: Dict[str, str]
    # Flag stored with the registered tool; the server also uses it to decide
    # whether a tool's results may be cached.
    auto_approve: bool
# Registry of tool metadata, keyed by tool name. Building the mapping from the
# definitions themselves keeps each key identical to its definition's `name`.
TOOL_DEFINITIONS = {
    definition.name: definition
    for definition in (
        ToolDefinition(
            name="read_file",
            description="Read file content from a specific path. Use absolute paths for reliability.",
            params={"path": "string"},
            auto_approve=True,
        ),
        ToolDefinition(
            name="write_file",
            description="Write or overwrite content to a file. Provides detailed feedback about the operation.",
            params={"path": "string", "content": "string"},
            auto_approve=False,
        ),
        ToolDefinition(
            name="execute_command",
            description="Run a shell command in the project terminal.",
            params={"command": "string"},
            auto_approve=False,
        ),
        ToolDefinition(
            name="list_files",
            description="List files in a directory. If 'directory' is omitted, it defaults to the active VSCode project root.",
            params={"directory": "string (optional)"},
            auto_approve=True,
        ),
        ToolDefinition(
            name="get_project_structure",
            description="Get a high-level tree view of the project, categorized by programming language.",
            params={"directory": "string (optional)"},
            auto_approve=True,
        ),
        ToolDefinition(
            name="analyze_code",
            description="Deeply analyze a specific file to find functions, classes, and imports.",
            params={"file_path": "string (optional)"},
            auto_approve=True,
        ),
        ToolDefinition(
            name="get_language_stats",
            description="Show a summary of file counts and line numbers per language.",
            params={"directory": "string (optional)"},
            auto_approve=True,
        ),
        ToolDefinition(
            name="detect_vscode_files",
            description="Search for currently open files and active projects in VSCode.",
            params={},
            auto_approve=True,
        ),
        ToolDefinition(
            name="write_code_to_project",
            description="Write code directly to a file in the project. Use when user asks you to write code. Provides detailed feedback with file info.",
            params={"filename": "string", "code": "string", "description": "string (optional)"},
            auto_approve=False,
        ),
        ToolDefinition(
            name="change_code",
            description="Change specific parts of code in an existing file. Finds and replaces exact code snippets with before/after preview.",
            params={"filename": "string", "old_code": "string", "new_code": "string", "description": "string (optional)"},
            auto_approve=False,
        ),
    )
}
class RequestCache:
    """Bounded least-recently-used (LRU) cache for tool results.

    Entries live in a single ``dict``. Because dicts preserve insertion
    order, re-inserting a key on every access keeps the least recently used
    entry first, so lookups, refreshes and evictions avoid the linear
    ``list.remove`` / ``list.pop(0)`` scans of a separate ordering list.
    """

    def __init__(self, max_size: int = MAX_CACHE_SIZE):
        """
        Initialize request cache.

        Args:
            max_size: Maximum number of cached items. A value of zero or
                less disables caching (``set`` becomes a no-op).
        """
        self.max_size = max_size
        # Ordered oldest -> most recently used.
        self.cache: Dict[str, str] = {}

    @property
    def access_order(self) -> List[str]:
        """Keys ordered from least to most recently used (read-only snapshot)."""
        return list(self.cache)

    def get(self, key: str) -> Optional[str]:
        """
        Get cached value and mark it as most recently used.

        Args:
            key: Cache key.

        Returns:
            Cached value or None if not found.
        """
        try:
            value = self.cache.pop(key)
        except KeyError:
            return None
        # Re-insert so the key moves to the most-recently-used end.
        self.cache[key] = value
        return value

    def set(self, key: str, value: str) -> None:
        """
        Set cached value, evicting the least recently used entry if full.

        Args:
            key: Cache key.
            value: Value to cache.
        """
        if self.max_size <= 0:
            # Nothing can be stored; previously this path raised IndexError.
            return
        if key in self.cache:
            # Drop the old slot so the refreshed entry lands at the end.
            del self.cache[key]
        else:
            # Loop (rather than a single eviction) so the cache also shrinks
            # correctly if max_size was lowered after construction.
            while len(self.cache) >= self.max_size:
                del self.cache[next(iter(self.cache))]
        self.cache[key] = value

    def clear(self) -> None:
        """Clear all cached items."""
        self.cache.clear()
class ToolRegistry:
    """Name-indexed collection of the tools the server can execute."""

    def __init__(self):
        """Create an empty registry."""
        # tool name -> {"description", "handler", "auto_approve", "inputSchema"}
        self.tools: Dict[str, Dict[str, Any]] = {}

    def register(
        self,
        name: str,
        handler: Callable,
        definition: ToolDefinition
    ) -> None:
        """
        Add a tool to the registry and derive its JSON input schema.

        Every parameter is published as a string property. A parameter is
        listed as required unless its type label contains "(optional)".

        Args:
            name: Key under which the tool is stored.
            handler: Callable invoked when the tool is called.
            definition: Metadata describing the tool and its parameters.
        """
        properties = {}
        required = []
        for param_name, type_label in definition.params.items():
            properties[param_name] = {
                "type": "string",
                "description": f"Target {param_name}",
            }
            if "(optional)" not in type_label.lower():
                required.append(param_name)

        schema = {"type": "object", "properties": properties}
        # The "required" key is only emitted when there is something to require.
        if required:
            schema["required"] = required

        self.tools[name] = {
            "description": definition.description,
            "handler": handler,
            "auto_approve": definition.auto_approve,
            "inputSchema": schema,
        }
        logger.info(f"Registered tool: {name}")

    def get_tool(self, name: str) -> Optional[Dict[str, Any]]:
        """
        Look up a registered tool.

        Args:
            name: Tool name.

        Returns:
            The stored tool entry, or None when the name is unknown.
        """
        return self.tools.get(name)

    def list_tools(self) -> List[Dict[str, Any]]:
        """
        Describe every registered tool for a tools/list response.

        Returns:
            One dictionary per tool with its name, description and input
            schema (the handler and auto_approve flag are not exposed).
        """
        listing = []
        for tool_name, entry in self.tools.items():
            listing.append({
                "name": tool_name,
                "description": entry["description"],
                "inputSchema": entry["inputSchema"],
            })
        return listing
class MCPServer:
    """
    Model Context Protocol Server.

    Speaks newline-delimited JSON-RPC 2.0 over stdio: each request is read as
    one line from stdin and each response is written as one line to stdout.
    Handles the ``initialize``, ``tools/list`` and ``tools/call`` methods and
    dispatches tool calls to the handlers held in a ``ToolRegistry``.
    """

    def __init__(self, lang_config: LanguageConfig):
        """
        Initialize MCP server.

        Args:
            lang_config: Language configuration instance, forwarded to every
                tool handler that declares a ``lang_config`` parameter.
        """
        self.lang_config = lang_config
        self.tools = ToolRegistry()
        self.cache = RequestCache() if CACHE_ENABLED else None
        self.running = False

    def _generate_cache_key(self, name: str, arguments: Dict[str, Any]) -> str:
        """
        Generate cache key for request.

        Args:
            name: Tool name.
            arguments: Tool arguments (must be JSON-serializable).

        Returns:
            Hex digest identifying the (tool, arguments) pair.
        """
        # sort_keys makes the key independent of argument ordering.
        key_str = f"{name}:{json.dumps(arguments, sort_keys=True)}"
        # MD5 is used purely as a compact fingerprint, not for security.
        return hashlib.md5(key_str.encode()).hexdigest()

    @staticmethod
    def _accepts_lang_config(handler: Callable) -> bool:
        """Return True when *handler* declares a ``lang_config`` parameter."""
        # inspect.signature looks at real parameters only (co_varnames also
        # lists local variables) and works for partials, callable objects and
        # functools.wraps-decorated functions.
        try:
            return "lang_config" in inspect.signature(handler).parameters
        except (TypeError, ValueError):
            # No introspectable signature: call without the extra argument.
            return False

    @staticmethod
    def _error_response(request_id: Any, code: int, message: str) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 error response object."""
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message},
        }

    def write_response(self, obj: Dict[str, Any]) -> None:
        """
        Write JSON-RPC response to stdout as a single line.

        Args:
            obj: Response object.
        """
        try:
            sys.stdout.write(json.dumps(obj) + '\n')
            # Flush immediately so the client is not left waiting on a buffer.
            sys.stdout.flush()
        except Exception as e:
            logger.error(f"Error writing response: {e}")

    def read_request(self) -> Optional[Dict[str, Any]]:
        """
        Read the next valid JSON-RPC request object from stdin.

        Blank lines are skipped. A line that is not valid JSON is answered
        with a JSON-RPC "Parse error" (-32700) and a JSON value that is not an
        object with "Invalid Request" (-32600); reading then continues, so a
        single bad line no longer shuts the server down.

        Returns:
            Request dictionary, or None on EOF or when stdin cannot be read.
        """
        while True:
            try:
                line = sys.stdin.readline()
            except Exception as e:
                logger.error(f"Error reading request: {e}")
                return None
            if not line:
                # Empty string (no newline) means the client closed stdin.
                return None
            if not line.strip():
                continue
            try:
                request = json.loads(line)
            except json.JSONDecodeError as e:
                logger.error(f"Invalid JSON request: {e}")
                # The request id is unknown for unparseable input, hence null.
                self.write_response(self._error_response(None, -32700, "Parse error"))
                continue
            if not isinstance(request, dict):
                logger.error("Invalid request: expected a JSON object")
                self.write_response(self._error_response(None, -32600, "Invalid Request"))
                continue
            return request

    async def handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Handle initialize request.

        Args:
            params: Initialize parameters (currently unused).

        Returns:
            Protocol version, capabilities and server identification.
        """
        logger.info("Handling initialize request")
        return {
            "protocolVersion": MCP_PROTOCOL_VERSION,
            "capabilities": {
                "tools": {}
            },
            "serverInfo": {
                "name": SERVER_NAME,
                "version": SERVER_VERSION
            }
        }

    async def handle_list_tools(self) -> Dict[str, Any]:
        """
        Handle tools/list request.

        Returns:
            Dictionary with the list of available tools.
        """
        logger.info("Handling list_tools request")
        return {
            "tools": self.tools.list_tools()
        }

    async def handle_call_tool(
        self,
        name: str,
        arguments: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Handle tools/call request.

        Results of auto-approved tools are cached per (tool, arguments) pair.
        Running any other tool clears the cache, because such tools may
        modify the files that the cached results describe.

        Args:
            name: Tool name.
            arguments: Tool arguments; None is treated as no arguments.

        Returns:
            Tool execution result; failures are reported in-band with
            ``isError`` set rather than raised.
        """
        logger.info(f"Handling call_tool request: {name}")
        tool = self.tools.get_tool(name)
        if not tool:
            error_msg = f"Tool {name} not found"
            logger.error(error_msg)
            return {
                "content": [{"type": "text", "text": error_msg}],
                "isError": True
            }
        if arguments is None:
            arguments = {}
        try:
            is_safe = tool["auto_approve"]
            cacheable = bool(is_safe) and self.cache is not None
            cache_key = None
            if cacheable:
                cache_key = self._generate_cache_key(name, arguments)
                cached_result = self.cache.get(cache_key)
                # Compare with None: an empty string is a valid cached result.
                if cached_result is not None:
                    logger.debug(f"Cache hit for {name}")
                    return {
                        "content": [{"type": "text", "text": cached_result}]
                    }
            handler = tool["handler"]
            try:
                if self._accepts_lang_config(handler):
                    result = handler(**arguments, lang_config=self.lang_config)
                else:
                    result = handler(**arguments)
            finally:
                # Even a failed run may have changed files, so cached reads
                # are dropped regardless of the outcome.
                # NOTE(review): changes made outside this server (e.g. edits
                # in the editor) are still not detected by the cache.
                if not is_safe and self.cache is not None:
                    self.cache.clear()
            result_str = str(result)
            if cacheable:
                self.cache.set(cache_key, result_str)
            logger.info(f"Successfully executed tool: {name}")
            return {
                "content": [{"type": "text", "text": result_str}]
            }
        except TypeError as e:
            error_msg = f"Invalid arguments for tool {name}: {e}"
            logger.error(error_msg)
            return {
                "content": [{"type": "text", "text": error_msg}],
                "isError": True
            }
        except Exception as e:
            error_msg = f"Error executing tool {name}: {e}"
            logger.error(error_msg)
            return {
                "content": [{"type": "text", "text": error_msg}],
                "isError": True
            }

    async def run(self) -> None:
        """Main event loop: read requests, dispatch them, write responses."""
        self.running = True
        # Startup banner; failing to detect the active file is not fatal.
        try:
            logger.info("=" * 60)
            logger.info("MCP Server Started - Monitoring VSCode Activity")
            logger.info("=" * 60)
            active = get_vscode_active_file(self.lang_config)
            logger.info(active)
            logger.info("=" * 60)
        except Exception as e:
            logger.warning(f"Could not get active file on startup: {e}")

        while self.running:
            try:
                request = self.read_request()
                if request is None:
                    # EOF or unreadable stdin: the client is gone.
                    break
                request_id = request.get("id")
                response = {
                    "jsonrpc": "2.0",
                    "id": request_id
                }
                method = request.get("method")
                # "params": null is tolerated and treated as no parameters.
                params = request.get("params") or {}
                try:
                    if method == "initialize":
                        response["result"] = await self.handle_initialize(params)
                    elif method == "tools/list":
                        response["result"] = await self.handle_list_tools()
                    elif method == "tools/call":
                        response["result"] = await self.handle_call_tool(
                            params.get("name"),
                            params.get("arguments") or {}
                        )
                    else:
                        error_msg = f"Method not found: {method}"
                        logger.warning(error_msg)
                        response["error"] = {
                            "code": -32601,
                            "message": error_msg
                        }
                except Exception as e:
                    logger.error(f"Error handling {method}: {e}")
                    response["error"] = {
                        "code": -32603,
                        "message": str(e)
                    }
                # Requests without an id are notifications and get no reply.
                if request_id is not None:
                    self.write_response(response)
            except KeyboardInterrupt:
                logger.info("Received keyboard interrupt, shutting down...")
                self.running = False
                break
            except Exception as e:
                logger.error(f"Unexpected error in main loop: {e}")
                continue

    def stop(self) -> None:
        """Stop the server; the main loop exits before reading the next request."""
        logger.info("Stopping MCP server...")
        self.running = False
def load_language_config() -> Dict[str, Any]:
    """
    Read ``languages.json`` from the directory containing this module.

    Returns:
        The parsed JSON content of the configuration file.

    Raises:
        FileNotFoundError: If the configuration file is missing.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    config_file = Path(__file__).parent.joinpath("languages.json")
    if config_file.exists():
        return json.loads(config_file.read_text(encoding='utf-8'))
    raise FileNotFoundError(f"Language config not found: {config_file}")
def main() -> None:
    """
    Entry point: load configuration, register all tools and run the server.

    Exits with status 1 on configuration or unexpected errors and with
    status 0 when interrupted from the keyboard.
    """
    try:
        logger.info("Loading language configuration...")
        lang_config = LanguageConfig(load_language_config())

        logger.info("Initializing MCP server...")
        server = MCPServer(lang_config)

        # Run from the directory that contains this script.
        project_root = Path(__file__).parent.resolve()
        os.chdir(project_root)
        logger.info(f"Working directory: {project_root}")

        logger.info("Registering tools...")
        # Tool name -> implementation, listed in registration order.
        handlers = {
            "read_file": read_file,
            "write_file": write_file,
            "execute_command": execute_command,
            "list_files": list_files,
            "get_project_structure": get_project_structure,
            "analyze_code": analyze_code,
            "get_language_stats": get_language_stats,
            "detect_vscode_files": detect_vscode_files,
            "write_code_to_project": write_code_to_project,
            "change_code": change_code,
        }
        for tool_name, handler in handlers.items():
            server.tools.register(tool_name, handler, TOOL_DEFINITIONS[tool_name])

        logger.info("Starting MCP server...")
        asyncio.run(server.run())
    except FileNotFoundError as e:
        logger.error(f"Configuration error: {e}")
        sys.exit(1)
    except json.JSONDecodeError as e:
        logger.error(f"Invalid configuration JSON: {e}")
        sys.exit(1)
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()