Skip to main content
Glama

File System MCP Server

by terzeron
mcp_server.py (9.05 kB)
# Contents copied verbatim from mcp_filesystem_tools.py
import asyncio
import urllib.parse
from typing import Dict, Any

import httpx
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="File System MCP Server", version="1.0.0")

# Backend API configuration
BACKEND_API_URL = "http://localhost:8001"

# ---- MCP tool registry ----------------------------------------------------
# Maps each tool name to its human-readable description and a JSON-Schema-style
# description of its arguments. `invoke_tool` uses the "required" lists below
# to reject incomplete requests before dispatching.
TOOLS: Dict[str, Dict[str, Any]] = {
    "list_directory": {
        "description": "List files and directories in a specified path.",
        "args_schema": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Directory path to list (default: current directory)",
                }
            },
            "required": [],
        },
    },
    "read_file": {
        "description": "Read the contents of a file.",
        "args_schema": {
            "type": "object",
            "properties": {
                "file_path": {
                    "type": "string",
                    "description": "Path to the file to read",
                },
                "encoding": {
                    "type": "string",
                    "description": "File encoding (default: utf-8)",
                },
            },
            "required": ["file_path"],
        },
    },
    "write_file": {
        "description": "Write content to a file.",
        "args_schema": {
            "type": "object",
            "properties": {
                "file_path": {
                    "type": "string",
                    "description": "Path to the file to write",
                },
                "content": {
                    "type": "string",
                    "description": "Content to write to the file",
                },
                "encoding": {
                    "type": "string",
                    "description": "File encoding (default: utf-8)",
                },
            },
            "required": ["file_path", "content"],
        },
    },
    "get_file_info": {
        "description": "Get detailed information about a file or directory.",
        "args_schema": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file or directory",
                }
            },
            "required": ["path"],
        },
    },
    "create_directory": {
        "description": "Create a new directory.",
        "args_schema": {
            "type": "object",
            "properties": {
                "dir_path": {
                    "type": "string",
                    "description": "Path of the directory to create",
                }
            },
            "required": ["dir_path"],
        },
    },
    "delete_file": {
        "description": "Delete a file or directory.",
        "args_schema": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file or directory to delete",
                }
            },
            "required": ["path"],
        },
    },
    "copy_file": {
        "description": "Copy a file from source to destination.",
        "args_schema": {
            "type": "object",
            "properties": {
                "source_path": {
                    "type": "string",
                    "description": "Path to the source file to copy",
                },
                "destination_path": {
                    "type": "string",
                    "description": "Path where the file should be copied to",
                },
                "overwrite": {
                    "type": "boolean",
                    "description": "Whether to overwrite existing file (default: false)",
                },
            },
            "required": ["source_path", "destination_path"],
        },
    },
}


class InvokeRequest(BaseModel):
    """Request body for `/invoke_tool`: the tool name plus its keyword inputs."""

    tool: str
    inputs: Dict[str, Any]


# ---- MCP mandatory endpoints ---------------------------------------------
@app.get("/health")
async def health():
    """Minimal liveness probe (non-spec but common)."""
    return {"status": "ok", "service": "File System MCP Server"}


@app.get("/list_tools")
async def list_tools():
    """Return tool metadata according to MCP (`list_tools`).

    Each entry is the registry value from `TOOLS` with its key added as "name".
    """
    return [{"name": name, **meta} for name, meta in TOOLS.items()]


@app.post("/invoke_tool")
async def invoke_tool(req: InvokeRequest):
    """Execute the chosen tool with given inputs (MCP `invoke_tool`).

    Raises:
        HTTPException 404: `req.tool` is not a key of `TOOLS`.
        HTTPException 422: an input listed as required in the tool's
            `args_schema` is absent from `req.inputs`.
        HTTPException 501: the tool is registered but has no dispatch branch.
        HTTPException (any status): propagated unchanged from the tool
            wrapper, e.g. the backend's own 4xx/5xx status.
        HTTPException 500: any other unexpected error.
    """
    name = req.tool
    if name not in TOOLS:
        raise HTTPException(status_code=404, detail="unknown tool")

    # Reject incomplete requests up front; otherwise the indexing below would
    # raise KeyError and be reported as an opaque 500.
    required = TOOLS[name]["args_schema"].get("required", [])
    missing = [key for key in required if key not in req.inputs]
    if missing:
        raise HTTPException(
            status_code=422,
            detail=f"missing required inputs: {', '.join(missing)}",
        )

    try:
        if name == "list_directory":
            path = req.inputs.get("path", "")
            return await list_directory_tool(path)
        elif name == "read_file":
            file_path = req.inputs["file_path"]
            encoding = req.inputs.get("encoding", "utf-8")
            return await read_file_tool(file_path, encoding)
        elif name == "write_file":
            file_path = req.inputs["file_path"]
            content = req.inputs["content"]
            encoding = req.inputs.get("encoding", "utf-8")
            return await write_file_tool(file_path, content, encoding)
        elif name == "get_file_info":
            path = req.inputs["path"]
            return await get_file_info_tool(path)
        elif name == "create_directory":
            dir_path = req.inputs["dir_path"]
            return await create_directory_tool(dir_path)
        elif name == "delete_file":
            path = req.inputs["path"]
            return await delete_file_tool(path)
        elif name == "copy_file":
            source_path = req.inputs["source_path"]
            destination_path = req.inputs["destination_path"]
            overwrite = req.inputs.get("overwrite", False)
            return await copy_file_tool(source_path, destination_path, overwrite)
        else:
            raise HTTPException(status_code=501, detail="tool not implemented")
    except HTTPException:
        # HTTPException is an Exception subclass: without this clause the
        # handler below would rewrite every deliberate status (backend 404,
        # the 501 above, ...) into a generic 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


# ---- Tool implementations (Backend API wrappers) -------------------------
async def list_directory_tool(path: str) -> Dict[str, Any]:
    """List files and directories by calling backend API.

    Sends GET `{BACKEND_API_URL}/files/list/` for an empty `path`, otherwise
    GET `{BACKEND_API_URL}/files/list/<percent-encoded path>`.

    Returns:
        `{"output": <decoded JSON body of the backend response>}`.

    Raises:
        HTTPException: with the backend's status code and body text when the
            backend answers with an error status; with status 500 for any
            other failure.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            # Handle empty path as /files/list/ (current directory)
            if not path:
                url = f"{BACKEND_API_URL}/files/list/"
            else:
                # URL encode the path to handle special characters; safe=""
                # also encodes "/" so the path stays a single URL segment.
                encoded_path = urllib.parse.quote(path, safe="")
                url = f"{BACKEND_API_URL}/files/list/{encoded_path}"
            response = await client.get(url)
            response.raise_for_status()
            return {"output": response.json()}
        except httpx.HTTPStatusError as e:
            raise HTTPException(
                status_code=e.response.status_code, detail=e.response.text
            ) from e
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e


async def read_file_tool(file_path: str, encoding: str = "utf-8") -> Dict[str, Any]:
    """Read file contents by calling backend API.

    Sends GET `{BACKEND_API_URL}/files/read/<percent-encoded file_path>` with
    `encoding` as a query parameter.

    Returns:
        `{"output": <decoded JSON body of the backend response>}`.

    Raises:
        HTTPException: with the backend's status code and body text when the
            backend answers with an error status; with status 500 for any
            other failure.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            # safe="" also encodes "/" so the path stays a single URL segment.
            encoded_path = urllib.parse.quote(file_path, safe="")
            response = await client.get(
                f"{BACKEND_API_URL}/files/read/{encoded_path}",
                params={"encoding": encoding},
            )
            response.raise_for_status()
            return {"output": response.json()}
        except httpx.HTTPStatusError as e:
            raise HTTPException(
                status_code=e.response.status_code, detail=e.response.text
            ) from e
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e


async def write_file_tool(
    file_path: str, content: str, encoding: str = "utf-8"
) -> Dict[str, Any]:
    """Write file contents by calling backend API.

    Sends POST `{BACKEND_API_URL}/files/write/<percent-encoded file_path>`
    with a JSON body of `{"content": content, "encoding": encoding}`.

    Returns:
        `{"output": <decoded JSON body of the backend response>}`.

    Raises:
        HTTPException: with the backend's status code and body text when the
            backend answers with an error status; with status 500 for any
            other failure.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            # safe="" also encodes "/" so the path stays a single URL segment.
            encoded_path = urllib.parse.quote(file_path, safe="")
            response = await client.post(
                f"{BACKEND_API_URL}/files/write/{encoded_path}",
                json={"content": content, "encoding": encoding},
            )
            response.raise_for_status()
            return {"output": response.json()}
        except httpx.HTTPStatusError as e:
            raise HTTPException(
                status_code=e.response.status_code, detail=e.response.text
            ) from e
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e


# Remainder omitted (get_file_info_tool, create_directory_tool, delete_file_tool, copy_file_tool, etc. are copied over in the same way)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/terzeron/mcp_test'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.