"""MCP Server implementation for file system operations."""
import asyncio
import logging
from typing import Any, Dict, List, Optional
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import (
Tool,
TextContent,
CallToolRequest,
CallToolResult,
ListToolsRequest,
ListToolsResult,
)
from .config import Config, ConfigManager
from .safety import SafetyManager
from .file_operations import FileOperationsManager
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class FileSystemMCPServer:
    """MCP server that exposes file system operations as tools.

    The constructor builds a ``SafetyManager`` and a ``FileOperationsManager``
    from the supplied configuration, creates the underlying MCP ``Server`` and
    registers the ``list_tools`` / ``call_tool`` handlers on it.
    """

    # Tool name -> (required argument names, (optional argument, default) pairs).
    # Every tool name is also the name of the ``file_ops`` coroutine that
    # implements it; arguments are passed positionally in the order listed.
    _TOOL_ARGUMENTS = {
        "read_file": (("path",), ()),
        "write_file": (("path", "content"), (("overwrite", False),)),
        "update_file": (("path", "content"), ()),
        "list_directory": (("path",), (("pattern", None), ("recursive", False))),
        "delete_file": (("path",), ()),
        "get_file_info": (("path",), ()),
    }

    def __init__(self, config: "Config"):
        """Initialize the MCP server.

        Args:
            config: Configuration object shared with the safety manager and
                the file operations manager.
        """
        self.config = config
        self.safety_manager = SafetyManager(config)
        self.file_ops = FileOperationsManager(config, self.safety_manager)
        self.server = Server("file-system-mcp-server")
        # Handlers are closures over ``self``, so register them last.
        self._register_handlers()

    @classmethod
    def _positional_arguments(cls, name: str, arguments: Dict[str, Any]) -> List[Any]:
        """Turn a tool's argument mapping into positional call arguments.

        Args:
            name: A tool name present in ``_TOOL_ARGUMENTS``.
            arguments: Argument mapping received with the tool call.

        Returns:
            Required argument values followed by optional ones (or their
            defaults), in the order declared in ``_TOOL_ARGUMENTS``.

        Raises:
            KeyError: If ``name`` is not a known tool.
            ValueError: If one or more required arguments are missing.
        """
        required, optional = cls._TOOL_ARGUMENTS[name]
        missing = [key for key in required if key not in arguments]
        if missing:
            # A bare KeyError would surface to the client as "Error: 'path'";
            # name the tool and every missing argument instead.
            raise ValueError(
                f"Missing required argument(s) for {name}: {', '.join(missing)}"
            )
        values = [arguments[key] for key in required]
        values.extend(arguments.get(key, default) for key, default in optional)
        return values

    def _register_handlers(self) -> None:
        """Register the ``list_tools`` and ``call_tool`` handlers on the server."""

        def as_text(text: str) -> List[TextContent]:
            """Wrap ``text`` in the single-item content list the handlers return."""
            return [TextContent(type="text", text=text)]

        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            """List available tools."""
            return [
                Tool(
                    name="read_file",
                    description="Read file contents with metadata. Supports text files and provides binary file detection.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "Path to the file to read"
                            }
                        },
                        "required": ["path"]
                    }
                ),
                Tool(
                    name="write_file",
                    description="Create a new file with content. Includes overwrite protection and automatic directory creation.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "Path where the file should be created"
                            },
                            "content": {
                                "type": "string",
                                "description": "Content to write to the file"
                            },
                            "overwrite": {
                                "type": "boolean",
                                "description": "Whether to overwrite existing file",
                                "default": False
                            }
                        },
                        "required": ["path", "content"]
                    }
                ),
                Tool(
                    name="update_file",
                    description="Update an existing file with automatic backup creation and rollback on failure.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "Path to the file to update"
                            },
                            "content": {
                                "type": "string",
                                "description": "New content for the file"
                            }
                        },
                        "required": ["path", "content"]
                    }
                ),
                Tool(
                    name="list_directory",
                    description="List directory contents with optional filtering and recursive traversal.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "Path to the directory to list"
                            },
                            "pattern": {
                                "type": "string",
                                "description": "Optional glob pattern to filter files (e.g., '*.py', 'test_*')"
                            },
                            "recursive": {
                                "type": "boolean",
                                "description": "Whether to list recursively",
                                "default": False
                            }
                        },
                        "required": ["path"]
                    }
                ),
                Tool(
                    name="delete_file",
                    description="Safely delete a file or directory with backup creation (moves to backup instead of permanent deletion).",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "Path to the file or directory to delete"
                            }
                        },
                        "required": ["path"]
                    }
                ),
                Tool(
                    name="get_file_info",
                    description="Get detailed metadata information about a file or directory.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "Path to the file or directory"
                            }
                        },
                        "required": ["path"]
                    }
                ),
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
            """Run the named tool and return its result as JSON text.

            Unknown tools and failures are reported as text content rather
            than raised.
            """
            if name not in self._TOOL_ARGUMENTS:
                return as_text(f"Unknown tool: {name}")
            try:
                call_args = self._positional_arguments(name, arguments or {})
                # Safe to resolve by name: only whitelisted tool names get here.
                operation = getattr(self.file_ops, name)
                result = await operation(*call_args)
                return as_text(self._format_result(result.to_dict()))
            except Exception as e:
                # logger.exception keeps the traceback for diagnosis.
                logger.exception("Error calling tool %s: %s", name, e)
                return as_text(f"Error: {str(e)}")

    def _format_result(self, result: Dict[str, Any]) -> str:
        """Serialize a result dictionary as indented JSON.

        Values that are not JSON-serializable are converted with ``str``.
        """
        import json
        return json.dumps(result, indent=2, default=str)

    async def run(self) -> None:
        """Validate the configured paths, then serve over stdio.

        Configuration issues reported by ``ConfigManager.validate_paths`` are
        logged as warnings; they do not prevent the server from starting.
        """
        logger.info("Starting File System MCP Server")
        issues = ConfigManager.validate_paths(self.config)
        if issues:
            logger.warning("Configuration issues found:")
            for issue in issues:
                logger.warning(" - %s", issue)
        async with stdio_server() as (read_stream, write_stream):
            # NOTE(review): recent MCP SDK releases appear to declare
            # get_capabilities(notification_options, experimental_capabilities);
            # confirm this no-argument call matches the pinned SDK version.
            init_options = InitializationOptions(
                server_name="file-system-mcp-server",
                server_version="0.1.0",
                capabilities=self.server.get_capabilities()
            )
            await self.server.run(read_stream, write_stream, init_options)
async def main(config_path: Optional[str] = None) -> None:
    """Load the configuration and run the file system MCP server.

    Args:
        config_path: Optional configuration path, passed through unchanged to
            ``ConfigManager.load_config``.

    Raises:
        Exception: Any error raised while loading the configuration or
            running the server is logged with its traceback and re-raised.
    """
    # Start at INFO; the configured level is applied once the config is loaded.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    try:
        config = ConfigManager.load_config(config_path)

        # Hand the level to setLevel() instead of getattr(logging, ...):
        # getattr accepts any module attribute (e.g. "info" is a function)
        # and rejects lower-case level names.
        configured = config.log_level
        level = configured if isinstance(configured, int) else str(configured).upper()
        try:
            logging.getLogger().setLevel(level)
        except (TypeError, ValueError):
            logger.warning("Unknown log level %r; keeping INFO", configured)

        server = FileSystemMCPServer(config)
        await server.run()
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        # logger.exception records the traceback before the error propagates.
        logger.exception("Server error: %s", e)
        raise
if __name__ == "__main__":
    # asyncio.run() cancels the main task on Ctrl-C and then re-raises
    # KeyboardInterrupt itself, so the handler inside main() may never see
    # it; catch it here to exit without a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Server stopped by user")