Filesystem MCP Server
by mark3labs
package main
import (
"context"
"encoding/base64"
"fmt"
"log"
"mime"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
const (
// Maximum size for inline content (5MB)
MAX_INLINE_SIZE = 5 * 1024 * 1024
// Maximum size for base64 encoding (1MB)
MAX_BASE64_SIZE = 1 * 1024 * 1024
)
type FileInfo struct {
Size int64 `json:"size"`
Created time.Time `json:"created"`
Modified time.Time `json:"modified"`
Accessed time.Time `json:"accessed"`
IsDirectory bool `json:"isDirectory"`
IsFile bool `json:"isFile"`
Permissions string `json:"permissions"`
}
type FilesystemServer struct {
allowedDirs []string
server *server.MCPServer
}
func NewFilesystemServer(allowedDirs []string) (*FilesystemServer, error) {
// Normalize and validate directories
normalized := make([]string, 0, len(allowedDirs))
for _, dir := range allowedDirs {
abs, err := filepath.Abs(dir)
if err != nil {
return nil, fmt.Errorf("failed to resolve path %s: %w", dir, err)
}
info, err := os.Stat(abs)
if err != nil {
return nil, fmt.Errorf(
"failed to access directory %s: %w",
abs,
err,
)
}
if !info.IsDir() {
return nil, fmt.Errorf("path is not a directory: %s", abs)
}
// Ensure the path ends with a separator to prevent prefix matching issues
// For example, /tmp/foo should not match /tmp/foobar
normalized = append(normalized, filepath.Clean(abs)+string(filepath.Separator))
}
s := &FilesystemServer{
allowedDirs: normalized,
server: server.NewMCPServer(
"secure-filesystem-server",
"0.4.1",
server.WithResourceCapabilities(true, true),
),
}
// Register resource handlers
s.server.AddResource(mcp.NewResource(
"file://",
"File System",
mcp.WithResourceDescription("Access to files and directories on the local file system"),
), s.handleReadResource)
// Register tool handlers
s.server.AddTool(mcp.NewTool(
"read_file",
mcp.WithDescription("Read the complete contents of a file from the file system."),
mcp.WithString("path",
mcp.Description("Path to the file to read"),
mcp.Required(),
),
), s.handleReadFile)
s.server.AddTool(mcp.NewTool(
"write_file",
mcp.WithDescription("Create a new file or overwrite an existing file with new content."),
mcp.WithString("path",
mcp.Description("Path where to write the file"),
mcp.Required(),
),
mcp.WithString("content",
mcp.Description("Content to write to the file"),
mcp.Required(),
),
), s.handleWriteFile)
s.server.AddTool(mcp.NewTool(
"list_directory",
mcp.WithDescription("Get a detailed listing of all files and directories in a specified path."),
mcp.WithString("path",
mcp.Description("Path of the directory to list"),
mcp.Required(),
),
), s.handleListDirectory)
s.server.AddTool(mcp.NewTool(
"create_directory",
mcp.WithDescription("Create a new directory or ensure a directory exists."),
mcp.WithString("path",
mcp.Description("Path of the directory to create"),
mcp.Required(),
),
), s.handleCreateDirectory)
s.server.AddTool(mcp.NewTool(
"move_file",
mcp.WithDescription("Move or rename files and directories."),
mcp.WithString("source",
mcp.Description("Source path of the file or directory"),
mcp.Required(),
),
mcp.WithString("destination",
mcp.Description("Destination path"),
mcp.Required(),
),
), s.handleMoveFile)
s.server.AddTool(mcp.NewTool(
"search_files",
mcp.WithDescription("Recursively search for files and directories matching a pattern."),
mcp.WithString("path",
mcp.Description("Starting path for the search"),
mcp.Required(),
),
mcp.WithString("pattern",
mcp.Description("Search pattern to match against file names"),
mcp.Required(),
),
), s.handleSearchFiles)
s.server.AddTool(mcp.NewTool(
"get_file_info",
mcp.WithDescription("Retrieve detailed metadata about a file or directory."),
mcp.WithString("path",
mcp.Description("Path to the file or directory"),
mcp.Required(),
),
), s.handleGetFileInfo)
s.server.AddTool(mcp.NewTool(
"list_allowed_directories",
mcp.WithDescription("Returns the list of directories that this server is allowed to access."),
), s.handleListAllowedDirectories)
return s, nil
}
// isPathInAllowedDirs checks if a path is within any of the allowed directories
func (s *FilesystemServer) isPathInAllowedDirs(path string) bool {
// Ensure path is absolute and clean
absPath, err := filepath.Abs(path)
if err != nil {
return false
}
// Add trailing separator to ensure we're checking a directory or a file within a directory
// and not a prefix match (e.g., /tmp/foo should not match /tmp/foobar)
if !strings.HasSuffix(absPath, string(filepath.Separator)) {
// If it's a file, we need to check its directory
if info, err := os.Stat(absPath); err == nil && !info.IsDir() {
absPath = filepath.Dir(absPath) + string(filepath.Separator)
} else {
absPath = absPath + string(filepath.Separator)
}
}
// Check if the path is within any of the allowed directories
for _, dir := range s.allowedDirs {
if strings.HasPrefix(absPath, dir) {
return true
}
}
return false
}
func (s *FilesystemServer) validatePath(requestedPath string) (string, error) {
// Always convert to absolute path first
abs, err := filepath.Abs(requestedPath)
if err != nil {
return "", fmt.Errorf("invalid path: %w", err)
}
// Check if path is within allowed directories
if !s.isPathInAllowedDirs(abs) {
return "", fmt.Errorf(
"access denied - path outside allowed directories: %s",
abs,
)
}
// Handle symlinks
realPath, err := filepath.EvalSymlinks(abs)
if err != nil {
if !os.IsNotExist(err) {
return "", err
}
// For new files, check parent directory
parent := filepath.Dir(abs)
realParent, err := filepath.EvalSymlinks(parent)
if err != nil {
return "", fmt.Errorf("parent directory does not exist: %s", parent)
}
if !s.isPathInAllowedDirs(realParent) {
return "", fmt.Errorf(
"access denied - parent directory outside allowed directories",
)
}
return abs, nil
}
// Check if the real path (after resolving symlinks) is still within allowed directories
if !s.isPathInAllowedDirs(realPath) {
return "", fmt.Errorf(
"access denied - symlink target outside allowed directories",
)
}
return realPath, nil
}
func (s *FilesystemServer) getFileStats(path string) (FileInfo, error) {
info, err := os.Stat(path)
if err != nil {
return FileInfo{}, err
}
return FileInfo{
Size: info.Size(),
Created: info.ModTime(), // Note: ModTime used as birth time isn't always available
Modified: info.ModTime(),
Accessed: info.ModTime(), // Note: Access time isn't always available
IsDirectory: info.IsDir(),
IsFile: !info.IsDir(),
Permissions: fmt.Sprintf("%o", info.Mode().Perm()),
}, nil
}
func (s *FilesystemServer) searchFiles(
rootPath, pattern string,
) ([]string, error) {
var results []string
pattern = strings.ToLower(pattern)
err := filepath.Walk(
rootPath,
func(path string, info os.FileInfo, err error) error {
if err != nil {
return nil // Skip errors and continue
}
// Try to validate path
if _, err := s.validatePath(path); err != nil {
return nil // Skip invalid paths
}
if strings.Contains(strings.ToLower(info.Name()), pattern) {
results = append(results, path)
}
return nil
},
)
if err != nil {
return nil, err
}
return results, nil
}
// detectMimeType tries to determine the MIME type of a file
func detectMimeType(path string) string {
// First try by extension
ext := filepath.Ext(path)
if ext != "" {
mimeType := mime.TypeByExtension(ext)
if mimeType != "" {
return mimeType
}
}
// If that fails, try to read a bit of the file
file, err := os.Open(path)
if err != nil {
return "application/octet-stream" // Default
}
defer file.Close()
// Read first 512 bytes to detect content type
buffer := make([]byte, 512)
n, err := file.Read(buffer)
if err != nil {
return "application/octet-stream" // Default
}
// Use http.DetectContentType
return http.DetectContentType(buffer[:n])
}
// isTextFile determines if a file is likely a text file based on MIME type
func isTextFile(mimeType string) bool {
return strings.HasPrefix(mimeType, "text/") ||
mimeType == "application/json" ||
mimeType == "application/xml" ||
mimeType == "application/javascript" ||
mimeType == "application/x-javascript" ||
strings.Contains(mimeType, "+xml") ||
strings.Contains(mimeType, "+json")
}
// isImageFile determines if a file is an image based on MIME type
func isImageFile(mimeType string) bool {
return strings.HasPrefix(mimeType, "image/")
}
// pathToResourceURI converts a file path to a resource URI
func pathToResourceURI(path string) string {
return "file://" + path
}
// Resource handler
func (s *FilesystemServer) handleReadResource(
ctx context.Context,
request mcp.ReadResourceRequest,
) ([]mcp.ResourceContents, error) {
uri := request.Params.URI
// Check if it's a file:// URI
if !strings.HasPrefix(uri, "file://") {
return nil, fmt.Errorf("unsupported URI scheme: %s", uri)
}
// Extract the path from the URI
path := strings.TrimPrefix(uri, "file://")
// Validate the path
validPath, err := s.validatePath(path)
if err != nil {
return nil, err
}
// Get file info
fileInfo, err := os.Stat(validPath)
if err != nil {
return nil, err
}
// If it's a directory, return a listing
if fileInfo.IsDir() {
entries, err := os.ReadDir(validPath)
if err != nil {
return nil, err
}
var result strings.Builder
result.WriteString(fmt.Sprintf("Directory listing for: %s\n\n", validPath))
for _, entry := range entries {
entryPath := filepath.Join(validPath, entry.Name())
entryURI := pathToResourceURI(entryPath)
if entry.IsDir() {
result.WriteString(fmt.Sprintf("[DIR] %s (%s)\n", entry.Name(), entryURI))
} else {
info, err := entry.Info()
if err == nil {
result.WriteString(fmt.Sprintf("[FILE] %s (%s) - %d bytes\n",
entry.Name(), entryURI, info.Size()))
} else {
result.WriteString(fmt.Sprintf("[FILE] %s (%s)\n", entry.Name(), entryURI))
}
}
}
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: uri,
MIMEType: "text/plain",
Text: result.String(),
},
}, nil
}
// It's a file, determine how to handle it
mimeType := detectMimeType(validPath)
// Check file size
if fileInfo.Size() > MAX_INLINE_SIZE {
// File is too large to inline, return a reference instead
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: uri,
MIMEType: "text/plain",
Text: fmt.Sprintf("File is too large to display inline (%d bytes). Use the read_file tool to access specific portions.", fileInfo.Size()),
},
}, nil
}
// Read the file content
content, err := os.ReadFile(validPath)
if err != nil {
return nil, err
}
// Handle based on content type
if isTextFile(mimeType) {
// It's a text file, return as text
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: uri,
MIMEType: mimeType,
Text: string(content),
},
}, nil
} else {
// It's a binary file
if fileInfo.Size() <= MAX_BASE64_SIZE {
// Small enough for base64 encoding
return []mcp.ResourceContents{
mcp.BlobResourceContents{
URI: uri,
MIMEType: mimeType,
Blob: base64.StdEncoding.EncodeToString(content),
},
}, nil
} else {
// Too large for base64, return a reference
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: uri,
MIMEType: "text/plain",
Text: fmt.Sprintf("Binary file (%s, %d bytes). Use the read_file tool to access specific portions.", mimeType, fileInfo.Size()),
},
}, nil
}
}
}
// Tool handlers
func (s *FilesystemServer) handleReadFile(
ctx context.Context,
request mcp.CallToolRequest,
) (*mcp.CallToolResult, error) {
path, ok := request.Params.Arguments["path"].(string)
if !ok {
return nil, fmt.Errorf("path must be a string")
}
// Handle empty or relative paths like "." or "./" by converting to absolute path
if path == "." || path == "./" {
// Get current working directory
cwd, err := os.Getwd()
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error resolving current directory: %v", err),
},
},
IsError: true,
}, nil
}
path = cwd
}
validPath, err := s.validatePath(path)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error: %v", err),
},
},
IsError: true,
}, nil
}
// Check if it's a directory
info, err := os.Stat(validPath)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error: %v", err),
},
},
IsError: true,
}, nil
}
if info.IsDir() {
// For directories, return a resource reference instead
resourceURI := pathToResourceURI(validPath)
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("This is a directory. Use the resource URI to browse its contents: %s", resourceURI),
},
mcp.EmbeddedResource{
Type: "resource",
Resource: mcp.TextResourceContents{
URI: resourceURI,
MIMEType: "text/plain",
Text: fmt.Sprintf("Directory: %s", validPath),
},
},
},
}, nil
}
// Determine MIME type
mimeType := detectMimeType(validPath)
// Check file size
if info.Size() > MAX_INLINE_SIZE {
// File is too large to inline, return a resource reference
resourceURI := pathToResourceURI(validPath)
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("File is too large to display inline (%d bytes). Access it via resource URI: %s", info.Size(), resourceURI),
},
mcp.EmbeddedResource{
Type: "resource",
Resource: mcp.TextResourceContents{
URI: resourceURI,
MIMEType: "text/plain",
Text: fmt.Sprintf("Large file: %s (%s, %d bytes)", validPath, mimeType, info.Size()),
},
},
},
}, nil
}
// Read file content
content, err := os.ReadFile(validPath)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error reading file: %v", err),
},
},
IsError: true,
}, nil
}
// Handle based on content type
if isTextFile(mimeType) {
// It's a text file, return as text
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: string(content),
},
},
}, nil
} else if isImageFile(mimeType) {
// It's an image file, return as image content
if info.Size() <= MAX_BASE64_SIZE {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Image file: %s (%s, %d bytes)", validPath, mimeType, info.Size()),
},
mcp.ImageContent{
Type: "image",
Data: base64.StdEncoding.EncodeToString(content),
MIMEType: mimeType,
},
},
}, nil
} else {
// Too large for base64, return a reference
resourceURI := pathToResourceURI(validPath)
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Image file is too large to display inline (%d bytes). Access it via resource URI: %s", info.Size(), resourceURI),
},
mcp.EmbeddedResource{
Type: "resource",
Resource: mcp.TextResourceContents{
URI: resourceURI,
MIMEType: "text/plain",
Text: fmt.Sprintf("Large image: %s (%s, %d bytes)", validPath, mimeType, info.Size()),
},
},
},
}, nil
}
} else {
// It's another type of binary file
resourceURI := pathToResourceURI(validPath)
if info.Size() <= MAX_BASE64_SIZE {
// Small enough for base64 encoding
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Binary file: %s (%s, %d bytes)", validPath, mimeType, info.Size()),
},
mcp.EmbeddedResource{
Type: "resource",
Resource: mcp.BlobResourceContents{
URI: resourceURI,
MIMEType: mimeType,
Blob: base64.StdEncoding.EncodeToString(content),
},
},
},
}, nil
} else {
// Too large for base64, return a reference
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Binary file: %s (%s, %d bytes). Access it via resource URI: %s", validPath, mimeType, info.Size(), resourceURI),
},
mcp.EmbeddedResource{
Type: "resource",
Resource: mcp.TextResourceContents{
URI: resourceURI,
MIMEType: "text/plain",
Text: fmt.Sprintf("Binary file: %s (%s, %d bytes)", validPath, mimeType, info.Size()),
},
},
},
}, nil
}
}
}
func (s *FilesystemServer) handleWriteFile(
ctx context.Context,
request mcp.CallToolRequest,
) (*mcp.CallToolResult, error) {
path, ok := request.Params.Arguments["path"].(string)
if !ok {
return nil, fmt.Errorf("path must be a string")
}
content, ok := request.Params.Arguments["content"].(string)
if !ok {
return nil, fmt.Errorf("content must be a string")
}
// Handle empty or relative paths like "." or "./" by converting to absolute path
if path == "." || path == "./" {
// Get current working directory
cwd, err := os.Getwd()
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error resolving current directory: %v", err),
},
},
IsError: true,
}, nil
}
path = cwd
}
validPath, err := s.validatePath(path)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error: %v", err),
},
},
IsError: true,
}, nil
}
// Check if it's a directory
if info, err := os.Stat(validPath); err == nil && info.IsDir() {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: "Error: Cannot write to a directory",
},
},
IsError: true,
}, nil
}
// Create parent directories if they don't exist
parentDir := filepath.Dir(validPath)
if err := os.MkdirAll(parentDir, 0755); err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error creating parent directories: %v", err),
},
},
IsError: true,
}, nil
}
if err := os.WriteFile(validPath, []byte(content), 0644); err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error writing file: %v", err),
},
},
IsError: true,
}, nil
}
// Get file info for the response
info, err := os.Stat(validPath)
if err != nil {
// File was written but we couldn't get info
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Successfully wrote to %s", path),
},
},
}, nil
}
resourceURI := pathToResourceURI(validPath)
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Successfully wrote %d bytes to %s", info.Size(), path),
},
mcp.EmbeddedResource{
Type: "resource",
Resource: mcp.TextResourceContents{
URI: resourceURI,
MIMEType: "text/plain",
Text: fmt.Sprintf("File: %s (%d bytes)", validPath, info.Size()),
},
},
},
}, nil
}
func (s *FilesystemServer) handleListDirectory(
ctx context.Context,
request mcp.CallToolRequest,
) (*mcp.CallToolResult, error) {
path, ok := request.Params.Arguments["path"].(string)
if !ok {
return nil, fmt.Errorf("path must be a string")
}
// Handle empty or relative paths like "." or "./" by converting to absolute path
if path == "." || path == "./" {
// Get current working directory
cwd, err := os.Getwd()
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error resolving current directory: %v", err),
},
},
IsError: true,
}, nil
}
path = cwd
}
validPath, err := s.validatePath(path)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error: %v", err),
},
},
IsError: true,
}, nil
}
// Check if it's a directory
info, err := os.Stat(validPath)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error: %v", err),
},
},
IsError: true,
}, nil
}
if !info.IsDir() {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: "Error: Path is not a directory",
},
},
IsError: true,
}, nil
}
entries, err := os.ReadDir(validPath)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error reading directory: %v", err),
},
},
IsError: true,
}, nil
}
var result strings.Builder
result.WriteString(fmt.Sprintf("Directory listing for: %s\n\n", validPath))
for _, entry := range entries {
entryPath := filepath.Join(validPath, entry.Name())
resourceURI := pathToResourceURI(entryPath)
if entry.IsDir() {
result.WriteString(fmt.Sprintf("[DIR] %s (%s)\n", entry.Name(), resourceURI))
} else {
info, err := entry.Info()
if err == nil {
result.WriteString(fmt.Sprintf("[FILE] %s (%s) - %d bytes\n",
entry.Name(), resourceURI, info.Size()))
} else {
result.WriteString(fmt.Sprintf("[FILE] %s (%s)\n", entry.Name(), resourceURI))
}
}
}
// Return both text content and embedded resource
resourceURI := pathToResourceURI(validPath)
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: result.String(),
},
mcp.EmbeddedResource{
Type: "resource",
Resource: mcp.TextResourceContents{
URI: resourceURI,
MIMEType: "text/plain",
Text: fmt.Sprintf("Directory: %s", validPath),
},
},
},
}, nil
}
func (s *FilesystemServer) handleCreateDirectory(
ctx context.Context,
request mcp.CallToolRequest,
) (*mcp.CallToolResult, error) {
path, ok := request.Params.Arguments["path"].(string)
if !ok {
return nil, fmt.Errorf("path must be a string")
}
// Handle empty or relative paths like "." or "./" by converting to absolute path
if path == "." || path == "./" {
// Get current working directory
cwd, err := os.Getwd()
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error resolving current directory: %v", err),
},
},
IsError: true,
}, nil
}
path = cwd
}
validPath, err := s.validatePath(path)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error: %v", err),
},
},
IsError: true,
}, nil
}
// Check if path already exists
if info, err := os.Stat(validPath); err == nil {
if info.IsDir() {
resourceURI := pathToResourceURI(validPath)
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Directory already exists: %s", path),
},
mcp.EmbeddedResource{
Type: "resource",
Resource: mcp.TextResourceContents{
URI: resourceURI,
MIMEType: "text/plain",
Text: fmt.Sprintf("Directory: %s", validPath),
},
},
},
}, nil
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error: Path exists but is not a directory: %s", path),
},
},
IsError: true,
}, nil
}
if err := os.MkdirAll(validPath, 0755); err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error creating directory: %v", err),
},
},
IsError: true,
}, nil
}
resourceURI := pathToResourceURI(validPath)
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Successfully created directory %s", path),
},
mcp.EmbeddedResource{
Type: "resource",
Resource: mcp.TextResourceContents{
URI: resourceURI,
MIMEType: "text/plain",
Text: fmt.Sprintf("Directory: %s", validPath),
},
},
},
}, nil
}
func (s *FilesystemServer) handleMoveFile(
ctx context.Context,
request mcp.CallToolRequest,
) (*mcp.CallToolResult, error) {
source, ok := request.Params.Arguments["source"].(string)
if !ok {
return nil, fmt.Errorf("source must be a string")
}
destination, ok := request.Params.Arguments["destination"].(string)
if !ok {
return nil, fmt.Errorf("destination must be a string")
}
// Handle empty or relative paths for source
if source == "." || source == "./" {
// Get current working directory
cwd, err := os.Getwd()
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error resolving current directory: %v", err),
},
},
IsError: true,
}, nil
}
source = cwd
}
// Handle empty or relative paths for destination
if destination == "." || destination == "./" {
// Get current working directory
cwd, err := os.Getwd()
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error resolving current directory: %v", err),
},
},
IsError: true,
}, nil
}
destination = cwd
}
validSource, err := s.validatePath(source)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error with source path: %v", err),
},
},
IsError: true,
}, nil
}
// Check if source exists
if _, err := os.Stat(validSource); os.IsNotExist(err) {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error: Source does not exist: %s", source),
},
},
IsError: true,
}, nil
}
validDest, err := s.validatePath(destination)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error with destination path: %v", err),
},
},
IsError: true,
}, nil
}
// Create parent directory for destination if it doesn't exist
destDir := filepath.Dir(validDest)
if err := os.MkdirAll(destDir, 0755); err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error creating destination directory: %v", err),
},
},
IsError: true,
}, nil
}
if err := os.Rename(validSource, validDest); err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error moving file: %v", err),
},
},
IsError: true,
}, nil
}
resourceURI := pathToResourceURI(validDest)
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf(
"Successfully moved %s to %s",
source,
destination,
),
},
mcp.EmbeddedResource{
Type: "resource",
Resource: mcp.TextResourceContents{
URI: resourceURI,
MIMEType: "text/plain",
Text: fmt.Sprintf("Moved file: %s", validDest),
},
},
},
}, nil
}
func (s *FilesystemServer) handleSearchFiles(
ctx context.Context,
request mcp.CallToolRequest,
) (*mcp.CallToolResult, error) {
path, ok := request.Params.Arguments["path"].(string)
if !ok {
return nil, fmt.Errorf("path must be a string")
}
pattern, ok := request.Params.Arguments["pattern"].(string)
if !ok {
return nil, fmt.Errorf("pattern must be a string")
}
// Handle empty or relative paths like "." or "./" by converting to absolute path
if path == "." || path == "./" {
// Get current working directory
cwd, err := os.Getwd()
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error resolving current directory: %v", err),
},
},
IsError: true,
}, nil
}
path = cwd
}
validPath, err := s.validatePath(path)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error: %v", err),
},
},
IsError: true,
}, nil
}
// Check if it's a directory
info, err := os.Stat(validPath)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error: %v", err),
},
},
IsError: true,
}, nil
}
if !info.IsDir() {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: "Error: Search path must be a directory",
},
},
IsError: true,
}, nil
}
results, err := s.searchFiles(validPath, pattern)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error searching files: %v",
err),
},
},
IsError: true,
}, nil
}
if len(results) == 0 {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("No files found matching pattern '%s' in %s", pattern, path),
},
},
}, nil
}
// Format results with resource URIs
var formattedResults strings.Builder
formattedResults.WriteString(fmt.Sprintf("Found %d results:\n\n", len(results)))
for _, result := range results {
resourceURI := pathToResourceURI(result)
info, err := os.Stat(result)
if err == nil {
if info.IsDir() {
formattedResults.WriteString(fmt.Sprintf("[DIR] %s (%s)\n", result, resourceURI))
} else {
formattedResults.WriteString(fmt.Sprintf("[FILE] %s (%s) - %d bytes\n",
result, resourceURI, info.Size()))
}
} else {
formattedResults.WriteString(fmt.Sprintf("%s (%s)\n", result, resourceURI))
}
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: formattedResults.String(),
},
},
}, nil
}
func (s *FilesystemServer) handleGetFileInfo(
ctx context.Context,
request mcp.CallToolRequest,
) (*mcp.CallToolResult, error) {
path, ok := request.Params.Arguments["path"].(string)
if !ok {
return nil, fmt.Errorf("path must be a string")
}
// Handle empty or relative paths like "." or "./" by converting to absolute path
if path == "." || path == "./" {
// Get current working directory
cwd, err := os.Getwd()
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error resolving current directory: %v", err),
},
},
IsError: true,
}, nil
}
path = cwd
}
validPath, err := s.validatePath(path)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error: %v", err),
},
},
IsError: true,
}, nil
}
info, err := s.getFileStats(validPath)
if err != nil {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Error getting file info: %v", err),
},
},
IsError: true,
}, nil
}
// Get MIME type for files
mimeType := "directory"
if info.IsFile {
mimeType = detectMimeType(validPath)
}
resourceURI := pathToResourceURI(validPath)
// Determine file type text
var fileTypeText string
if info.IsDirectory {
fileTypeText = "Directory"
} else {
fileTypeText = "File"
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf(
"File information for: %s\n\nSize: %d bytes\nCreated: %s\nModified: %s\nAccessed: %s\nIsDirectory: %v\nIsFile: %v\nPermissions: %s\nMIME Type: %s\nResource URI: %s",
validPath,
info.Size,
info.Created.Format(time.RFC3339),
info.Modified.Format(time.RFC3339),
info.Accessed.Format(time.RFC3339),
info.IsDirectory,
info.IsFile,
info.Permissions,
mimeType,
resourceURI,
),
},
mcp.EmbeddedResource{
Type: "resource",
Resource: mcp.TextResourceContents{
URI: resourceURI,
MIMEType: "text/plain",
Text: fmt.Sprintf("%s: %s (%s, %d bytes)",
fileTypeText,
validPath,
mimeType,
info.Size),
},
},
},
}, nil
}
func (s *FilesystemServer) handleListAllowedDirectories(
ctx context.Context,
request mcp.CallToolRequest,
) (*mcp.CallToolResult, error) {
// Remove the trailing separator for display purposes
displayDirs := make([]string, len(s.allowedDirs))
for i, dir := range s.allowedDirs {
displayDirs[i] = strings.TrimSuffix(dir, string(filepath.Separator))
}
var result strings.Builder
result.WriteString("Allowed directories:\n\n")
for _, dir := range displayDirs {
resourceURI := pathToResourceURI(dir)
result.WriteString(fmt.Sprintf("%s (%s)\n", dir, resourceURI))
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: result.String(),
},
},
}, nil
}
func (s *FilesystemServer) Serve() error {
return server.ServeStdio(s.server)
}
func main() {
// Parse command line arguments
if len(os.Args) < 2 {
fmt.Fprintf(
os.Stderr,
"Usage: %s <allowed-directory> [additional-directories...]\n",
os.Args[0],
)
os.Exit(1)
}
// Create and start the server
fs, err := NewFilesystemServer(os.Args[1:])
if err != nil {
log.Fatalf("Failed to create server: %v", err)
}
// Serve requests
if err := fs.Serve(); err != nil {
log.Fatalf("Server error: %v", err)
}
}