# mcp_server.py (9.05 kB)
# Contents copied verbatim from mcp_filesystem_tools.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, Any
import httpx
import asyncio
import urllib.parse
# Backend API configuration: base URL that the tool wrappers in this file
# prefix to every "/files/..." request they send.
BACKEND_API_URL = "http://localhost:8001"

# FastAPI application object; the endpoint decorators below register on it.
app = FastAPI(
    title="File System MCP Server",
    version="1.0.0",
)
# ---- MCP tool registry ----------------------------------------------------
def _param(kind: str, description: str) -> Dict[str, str]:
    """Build the schema entry for a single tool argument."""
    return {"type": kind, "description": description}


def _tool(description: str, properties: Dict[str, Any], required: list) -> Dict[str, Any]:
    """Build one registry entry: a description plus an object-typed args schema."""
    return {
        "description": description,
        "args_schema": {
            "type": "object",
            "properties": properties,
            "required": required,
        },
    }


# Registry of every tool this server exposes, keyed by tool name.  Insertion
# order is the order in which `list_tools` reports the tools.
TOOLS: Dict[str, Dict[str, Any]] = {
    "list_directory": _tool(
        "List files and directories in a specified path.",
        {
            "path": _param(
                "string", "Directory path to list (default: current directory)"
            ),
        },
        [],
    ),
    "read_file": _tool(
        "Read the contents of a file.",
        {
            "file_path": _param("string", "Path to the file to read"),
            "encoding": _param("string", "File encoding (default: utf-8)"),
        },
        ["file_path"],
    ),
    "write_file": _tool(
        "Write content to a file.",
        {
            "file_path": _param("string", "Path to the file to write"),
            "content": _param("string", "Content to write to the file"),
            "encoding": _param("string", "File encoding (default: utf-8)"),
        },
        ["file_path", "content"],
    ),
    "get_file_info": _tool(
        "Get detailed information about a file or directory.",
        {
            "path": _param("string", "Path to the file or directory"),
        },
        ["path"],
    ),
    "create_directory": _tool(
        "Create a new directory.",
        {
            "dir_path": _param("string", "Path of the directory to create"),
        },
        ["dir_path"],
    ),
    "delete_file": _tool(
        "Delete a file or directory.",
        {
            "path": _param("string", "Path to the file or directory to delete"),
        },
        ["path"],
    ),
    "copy_file": _tool(
        "Copy a file from source to destination.",
        {
            "source_path": _param("string", "Path to the source file to copy"),
            "destination_path": _param(
                "string", "Path where the file should be copied to"
            ),
            "overwrite": _param(
                "boolean", "Whether to overwrite existing file (default: false)"
            ),
        },
        ["source_path", "destination_path"],
    ),
}
class InvokeRequest(BaseModel):
    # Request body accepted by POST /invoke_tool.  Documented with comments
    # rather than a docstring so the generated model schema is unchanged.

    # Name of the tool to run; looked up in the TOOLS registry by the handler.
    tool: str
    # Arguments for the tool, keyed by argument name.
    inputs: Dict[str, Any]
# ---- MCP mandatory endpoints ---------------------------------------------
@app.get("/health")
async def health():
    """Minimal liveness probe (non‑spec but common)"""
    # Fixed payload: no backend call is made, so this reports only that this
    # server process is answering requests.
    status_report = {"status": "ok", "service": "File System MCP Server"}
    return status_report
@app.get("/list_tools")
async def list_tools():
    """Return tool metadata according to MCP (`list_tools`)."""
    # One entry per registered tool: the registry key goes first under "name",
    # followed by the tool's own metadata fields in registry order.
    catalog = []
    for tool_name, tool_meta in TOOLS.items():
        entry = {"name": tool_name}
        entry.update(tool_meta)
        catalog.append(entry)
    return catalog
@app.post("/invoke_tool")
async def invoke_tool(req: InvokeRequest):
    """Execute the chosen tool with given inputs (MCP `invoke_tool`).

    Args:
        req: Request body naming the tool and carrying its input arguments.

    Returns:
        Whatever the selected tool wrapper returns.

    Raises:
        HTTPException: 404 if the tool is not registered; 422 if a required
            input is missing; 501 if the tool is registered but has no
            dispatch branch; 500 for any unexpected failure.  An
            HTTPException raised by a tool wrapper is passed through with
            its original status code and detail.
    """
    name = req.tool
    if name not in TOOLS:
        raise HTTPException(status_code=404, detail="unknown tool")

    inputs = req.inputs

    # Reject incomplete requests up front.  Otherwise the indexing below raises
    # KeyError, which used to reach the client as a 500 whose detail was only
    # the quoted key name.
    required = TOOLS[name]["args_schema"].get("required", [])
    missing = [key for key in required if key not in inputs]
    if missing:
        raise HTTPException(
            status_code=422,
            detail=f"missing required input(s): {', '.join(missing)}",
        )

    try:
        if name == "list_directory":
            return await list_directory_tool(inputs.get("path", ""))
        if name == "read_file":
            return await read_file_tool(
                inputs["file_path"], inputs.get("encoding", "utf-8")
            )
        if name == "write_file":
            return await write_file_tool(
                inputs["file_path"],
                inputs["content"],
                inputs.get("encoding", "utf-8"),
            )
        if name == "get_file_info":
            return await get_file_info_tool(inputs["path"])
        if name == "create_directory":
            return await create_directory_tool(inputs["dir_path"])
        if name == "delete_file":
            return await delete_file_tool(inputs["path"])
        if name == "copy_file":
            return await copy_file_tool(
                inputs["source_path"],
                inputs["destination_path"],
                inputs.get("overwrite", False),
            )
        # Registered in TOOLS but no branch above handles it.
        raise HTTPException(status_code=501, detail="tool not implemented")
    except HTTPException:
        # HTTPException is a subclass of Exception, so without this clause the
        # handler below would rewrite every deliberate status (backend 4xx from
        # the tool wrappers, the 501 above) into a generic 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# ---- Tool implementations (Backend API wrappers) -------------------------
async def list_directory_tool(path: str) -> Dict[str, Any]:
    """Fetch a directory listing from the backend API.

    Args:
        path: Directory to list; a falsy value requests the bare
            ``/files/list/`` endpoint.

    Returns:
        ``{"output": <decoded JSON body of the backend response>}``.

    Raises:
        HTTPException: With the backend's status code and response text when
            the backend answers with an error status; with status 500 for any
            other failure.
    """
    async with httpx.AsyncClient(timeout=30.0) as http:
        try:
            # Percent-encode every reserved character ("/" included) so the
            # whole path is sent as one URL segment; an empty path adds nothing.
            suffix = urllib.parse.quote(path, safe="") if path else ""
            reply = await http.get(f"{BACKEND_API_URL}/files/list/{suffix}")
            reply.raise_for_status()
            listing = reply.json()
        except httpx.HTTPStatusError as backend_err:
            failed = backend_err.response
            raise HTTPException(status_code=failed.status_code, detail=failed.text)
        except Exception as err:
            raise HTTPException(status_code=500, detail=str(err))
        return {"output": listing}
async def read_file_tool(file_path: str, encoding: str = "utf-8") -> Dict[str, Any]:
    """Fetch a file's contents from the backend API.

    Args:
        file_path: Path of the file to read.
        encoding: Forwarded to the backend as the ``encoding`` query parameter.

    Returns:
        ``{"output": <decoded JSON body of the backend response>}``.

    Raises:
        HTTPException: With the backend's status code and response text when
            the backend answers with an error status; with status 500 for any
            other failure.
    """
    async with httpx.AsyncClient(timeout=30.0) as http:
        try:
            # safe="" also encodes "/", so the path is sent as one URL segment.
            quoted = urllib.parse.quote(file_path, safe="")
            endpoint = f"{BACKEND_API_URL}/files/read/{quoted}"
            query = {"encoding": encoding}
            reply = await http.get(endpoint, params=query)
            reply.raise_for_status()
            body = reply.json()
        except httpx.HTTPStatusError as backend_err:
            failed = backend_err.response
            raise HTTPException(status_code=failed.status_code, detail=failed.text)
        except Exception as err:
            raise HTTPException(status_code=500, detail=str(err))
        return {"output": body}
async def write_file_tool(
    file_path: str, content: str, encoding: str = "utf-8"
) -> Dict[str, Any]:
    """Send file contents to the backend API for writing.

    Args:
        file_path: Path of the file to write.
        content: Text sent under the ``content`` key of the JSON body.
        encoding: Sent under the ``encoding`` key of the JSON body.

    Returns:
        ``{"output": <decoded JSON body of the backend response>}``.

    Raises:
        HTTPException: With the backend's status code and response text when
            the backend answers with an error status; with status 500 for any
            other failure.
    """
    async with httpx.AsyncClient(timeout=30.0) as http:
        try:
            # safe="" also encodes "/", so the path is sent as one URL segment.
            quoted = urllib.parse.quote(file_path, safe="")
            endpoint = f"{BACKEND_API_URL}/files/write/{quoted}"
            payload = {"content": content, "encoding": encoding}
            reply = await http.post(endpoint, json=payload)
            reply.raise_for_status()
            body = reply.json()
        except httpx.HTTPStatusError as backend_err:
            failed = backend_err.response
            raise HTTPException(status_code=failed.status_code, detail=failed.text)
        except Exception as err:
            raise HTTPException(status_code=500, detail=str(err))
        return {"output": body}
# Remainder omitted (get_file_info_tool, create_directory_tool, delete_file_tool, copy_file_tool, etc. are copied over unchanged)