Skip to main content
Glama
filesystem_mcp_server.py10.5 kB
""" Filesystem MCP Server using FastMCP Provides 8 tools for file operations and 1 PDF resource """ from fastmcp import FastMCP from pathlib import Path import shutil from datetime import datetime from typing import Optional from config import WORKSPACE_DIR, MCP_SERVER_HOST, MCP_SERVER_PORT # Initialize FastMCP server mcp = FastMCP("Filesystem MCP Server") # ==================== UTILITY FUNCTIONS ==================== def validate_path(path: str) -> Path: """ Validate and resolve path within workspace. Prevents path traversal attacks. """ # Convert to Path object requested_path = Path(path) # Resolve to absolute path within workspace if requested_path.is_absolute(): raise ValueError("Absolute paths are not allowed") # Resolve relative to workspace full_path = (WORKSPACE_DIR / requested_path).resolve() # Ensure path is within workspace if not str(full_path).startswith(str(WORKSPACE_DIR)): raise ValueError("Path traversal detected - access denied") return full_path # ==================== TOOL 1: READ FILE ==================== @mcp.tool() def read_file(path: str, encoding: str = "utf-8") -> str: """ Read contents of a file. Args: path: Relative path to file within workspace encoding: File encoding (default: utf-8) Returns: File contents as string """ try: file_path = validate_path(path) if not file_path.exists(): return f"❌ Error: File '{path}' does not exist" if not file_path.is_file(): return f"❌ Error: '{path}' is not a file" # Try reading as text try: with open(file_path, 'r', encoding=encoding) as f: content = f.read() return f"✅ File: {path}\n\n{content}" except UnicodeDecodeError: # If text fails, read as binary with open(file_path, 'rb') as f: content = f.read() return f"✅ Binary file: {path}\nSize: {len(content)} bytes" except Exception as e: return f"❌ Error reading file: {str(e)}" # ==================== TOOL 2: WRITE FILE ==================== @mcp.tool() def write_file(path: str, content: str, encoding: str = "utf-8") -> str: """ Create or overwrite a file with content. Args: path: Relative path to file within workspace content: Content to write to file encoding: File encoding (default: utf-8) Returns: Success or error message """ try: file_path = validate_path(path) # Create parent directories if needed file_path.parent.mkdir(parents=True, exist_ok=True) # Write content with open(file_path, 'w', encoding=encoding) as f: f.write(content) return f"✅ File written successfully: {path} ({len(content)} characters)" except Exception as e: return f"❌ Error writing file: {str(e)}" # ==================== TOOL 3: APPEND FILE ==================== @mcp.tool() def append_file(path: str, content: str, encoding: str = "utf-8") -> str: """ Append content to an existing file. Args: path: Relative path to file within workspace content: Content to append encoding: File encoding (default: utf-8) Returns: Success or error message """ try: file_path = validate_path(path) if not file_path.exists(): return f"❌ Error: File '{path}' does not exist. Use write_file to create it." # Append content with open(file_path, 'a', encoding=encoding) as f: f.write(content) return f"✅ Content appended to: {path} ({len(content)} characters added)" except Exception as e: return f"❌ Error appending to file: {str(e)}" # ==================== TOOL 4: DELETE FILE ==================== @mcp.tool() def delete_file(path: str) -> str: """ Delete a file. Args: path: Relative path to file within workspace Returns: Success or error message """ try: file_path = validate_path(path) if not file_path.exists(): return f"❌ Error: File '{path}' does not exist" if not file_path.is_file(): return f"❌ Error: '{path}' is not a file. Use a different tool for directories." # Delete file file_path.unlink() return f"✅ File deleted successfully: {path}" except Exception as e: return f"❌ Error deleting file: {str(e)}" # ==================== TOOL 5: LIST DIRECTORY ==================== @mcp.tool() def list_directory(path: str = ".", pattern: Optional[str] = None) -> str: """ List files and directories. Args: path: Relative path to directory (default: current directory) pattern: Optional glob pattern (e.g., "*.txt") Returns: Formatted list of files and directories """ try: dir_path = validate_path(path) if not dir_path.exists(): return f"❌ Error: Directory '{path}' does not exist" if not dir_path.is_dir(): return f"❌ Error: '{path}' is not a directory" # Get items if pattern: items = list(dir_path.glob(pattern)) else: items = list(dir_path.iterdir()) if not items: return f"📁 Directory '{path}' is empty" # Sort: directories first, then files items.sort(key=lambda x: (not x.is_dir(), x.name)) # Format output output = [f"📁 Directory: {path}\n"] for item in items: if item.is_dir(): output.append(f" 📂 {item.name}/") else: size = item.stat().st_size size_str = f"{size:,} bytes" if size < 1024 else f"{size/1024:.1f} KB" output.append(f" 📄 {item.name} ({size_str})") return "\n".join(output) except Exception as e: return f"❌ Error listing directory: {str(e)}" # ==================== TOOL 6: CREATE DIRECTORY ==================== @mcp.tool() def create_directory(path: str, parents: bool = True) -> str: """ Create a new directory. Args: path: Relative path to new directory parents: Create parent directories if needed (default: True) Returns: Success or error message """ try: dir_path = validate_path(path) if dir_path.exists(): return f"❌ Error: '{path}' already exists" # Create directory dir_path.mkdir(parents=parents, exist_ok=False) return f"✅ Directory created successfully: {path}" except Exception as e: return f"❌ Error creating directory: {str(e)}" # ==================== TOOL 7: MOVE FILE ==================== @mcp.tool() def move_file(source: str, destination: str) -> str: """ Move or rename a file. Args: source: Current file path destination: New file path Returns: Success or error message """ try: source_path = validate_path(source) dest_path = validate_path(destination) if not source_path.exists(): return f"❌ Error: Source '{source}' does not exist" if not source_path.is_file(): return f"❌ Error: '{source}' is not a file" if dest_path.exists(): return f"❌ Error: Destination '{destination}' already exists" # Create destination parent directories if needed dest_path.parent.mkdir(parents=True, exist_ok=True) # Move file shutil.move(str(source_path), str(dest_path)) return f"✅ File moved: {source} → {destination}" except Exception as e: return f"❌ Error moving file: {str(e)}" # ==================== TOOL 8: GET FILE INFO ==================== @mcp.tool() def get_file_info(path: str) -> str: """ Get detailed information about a file. Args: path: Relative path to file Returns: File metadata (size, modified time, etc.) """ try: file_path = validate_path(path) if not file_path.exists(): return f"❌ Error: '{path}' does not exist" stat = file_path.stat() # Format information info = { "name": file_path.name, "path": str(path), "type": "directory" if file_path.is_dir() else "file", "size_bytes": stat.st_size, "size_human": f"{stat.st_size:,} bytes" if stat.st_size < 1024 else f"{stat.st_size/1024:.2f} KB", "created": datetime.fromtimestamp(stat.st_ctime).strftime("%Y-%m-%d %H:%M:%S"), "modified": datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S"), "permissions": oct(stat.st_mode)[-3:] } output = [f"📋 File Information: {path}\n"] for key, value in info.items(): output.append(f" {key}: {value}") return "\n".join(output) except Exception as e: return f"❌ Error getting file info: {str(e)}" # ==================== SERVER STARTUP ==================== if __name__ == "__main__": print("🚀 MCP Server starting...") print(f"📁 Workspace directory: {WORKSPACE_DIR}") print(f"🌐 Server running on http://{MCP_SERVER_HOST}:{MCP_SERVER_PORT}") print(f"🔗 SSE endpoint: http://{MCP_SERVER_HOST}:{MCP_SERVER_PORT}/sse") print(f"✅ Available tools: 9") print(f"✅ Available resources: 1 (PDF)") print("\n🔧 Tools registered:") print(" 1. read_file") print(" 2. write_file") print(" 3. append_file") print(" 4. delete_file") print(" 5. list_directory") print(" 6. create_directory") print(" 7. move_file") print(" 8. get_file_info") print(" 9. health_check") print("\n🎨 Streamlit UI: streamlit run app.py") print("⌨️ Press Ctrl+C to stop\n") # Run server with SSE transport mcp.run(transport="sse", host=MCP_SERVER_HOST, port=MCP_SERVER_PORT)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/chakorabdellatif/filesystem-mcp-with-FastMCP-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server