Skip to main content
Glama

File System MCP Server

by terzeron
backend_api.py8.52 kB
# Contents copied verbatim from filesystem_backend_api.py.
#
# NOTE(review): every endpoint resolves client-supplied paths directly against
# the process working directory / filesystem root with no sandboxing, so any
# caller can read, write, copy or delete anything the process can reach.
# Confirm this service is only exposed to trusted callers, or add a
# root-directory restriction.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, Any, List, Optional
import os
import json
import shutil
from datetime import datetime
from pathlib import Path

app = FastAPI(title="File System Backend API", version="1.0.0")


# ---- Data Models ---------------------------------------------------------


# One entry of a directory listing.
class FileInfo(BaseModel):
    name: str
    type: str  # "file" or "directory"
    size: Optional[int] = None  # st_size for regular files, None otherwise
    modified: str  # ISO-8601 string built from st_mtime
    created: str  # ISO-8601 string built from st_ctime
    permissions: str  # last three octal digits of st_mode


# Response of the directory listing endpoints.
class DirectoryListing(BaseModel):
    path: str
    items: List[FileInfo]
    total_count: int


# Response of the file read endpoint.
class FileContent(BaseModel):
    file_path: str
    content: str
    size: int  # length of the decoded text (characters, not bytes)
    encoding: str


# Request body of the file write endpoint.
class FileWriteRequest(BaseModel):
    content: str
    encoding: str = "utf-8"


# Request body of the directory creation endpoint.
class DirectoryCreateRequest(BaseModel):
    dir_path: str


# Request body of the file copy endpoint.
class FileCopyRequest(BaseModel):
    source_path: str
    destination_path: str
    overwrite: bool = False


# ---- API Endpoints -------------------------------------------------------


@app.get("/health")
async def health():
    """Health check endpoint"""
    return {"status": "ok", "service": "File System Backend API"}


@app.get("/files/list")
async def list_directory_default() -> DirectoryListing:
    """List files and directories in the current directory (default path)"""
    return await list_directory_with_path(".")


@app.get("/files/list/{path:path}")
async def list_directory(path: str) -> DirectoryListing:
    """List files and directories in the specified path"""
    return await list_directory_with_path(path)


async def list_directory_with_path(path: str) -> DirectoryListing:
    """Internal function to list directory contents.

    Raises HTTPException 404 if the path does not exist, 400 if it is not a
    directory, and 500 for any other failure.
    """
    try:
        # An empty path means "the current directory".
        if not path:
            path = "."

        path_obj = Path(path)
        if not path_obj.exists():
            raise HTTPException(status_code=404, detail=f"Directory not found: {path}")
        if not path_obj.is_dir():
            raise HTTPException(
                status_code=400, detail=f"Path is not a directory: {path}"
            )

        items = []
        for item in path_obj.iterdir():
            try:
                stat_info = item.stat()
            except OSError:
                # stat() follows symlinks and fails on a dangling one; describe
                # the link itself rather than failing the whole listing.
                stat_info = item.lstat()
            items.append(
                FileInfo(
                    name=item.name,
                    type="directory" if item.is_dir() else "file",
                    size=stat_info.st_size if item.is_file() else None,
                    modified=datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
                    # NOTE(review): st_ctime is the metadata-change time on
                    # most Unix systems, not the creation time.
                    created=datetime.fromtimestamp(stat_info.st_ctime).isoformat(),
                    permissions=oct(stat_info.st_mode)[-3:],
                )
            )

        return DirectoryListing(
            path=str(path_obj.absolute()), items=items, total_count=len(items)
        )
    except HTTPException:
        # Keep the deliberate 4xx responses instead of masking them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/files/read/{file_path:path}")
async def read_file(file_path: str, encoding: str = "utf-8") -> FileContent:
    """Read the contents of a file.

    Raises HTTPException 404 if the file does not exist, 400 if the path is
    a directory, and 500 for any other failure (including decode errors).
    """
    try:
        path_obj = Path(file_path)
        if not path_obj.exists():
            raise HTTPException(status_code=404, detail=f"File not found: {file_path}")
        if not path_obj.is_file():
            raise HTTPException(
                status_code=400, detail=f"Path is a directory, not a file: {file_path}"
            )

        with open(path_obj, "r", encoding=encoding) as f:
            content = f.read()

        return FileContent(
            file_path=str(path_obj.absolute()),
            content=content,
            size=len(content),
            encoding=encoding,
        )
    except HTTPException:
        # Keep the deliberate 4xx responses instead of masking them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/files/write/{file_path:path}")
async def write_file(file_path: str, request: FileWriteRequest) -> Dict[str, Any]:
    """Write content to a file, creating parent directories as needed.

    Raises HTTPException 500 if the content cannot be encoded or written.
    """
    try:
        path_obj = Path(file_path)

        # Encode up front so an unknown encoding or unencodable character is
        # rejected before the existing file is truncated.
        encoded = request.content.encode(request.encoding)

        # Create parent directories if they don't exist
        path_obj.parent.mkdir(parents=True, exist_ok=True)

        with open(path_obj, "w", encoding=request.encoding) as f:
            f.write(request.content)

        return {
            "file_path": str(path_obj.absolute()),
            "bytes_written": len(encoded),
            "encoding": request.encoding,
            "message": "File written successfully",
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/files/info/{path:path}")
async def get_file_info(path: str) -> Dict[str, Any]:
    """Get detailed information about a file or directory.

    Raises HTTPException 404 if the path does not exist and 500 for any
    other failure.
    """
    try:
        path_obj = Path(path)
        if not path_obj.exists():
            raise HTTPException(status_code=404, detail=f"Path not found: {path}")

        stat_info = path_obj.stat()
        info = {
            "path": str(path_obj.absolute()),
            "name": path_obj.name,
            "type": "directory" if path_obj.is_dir() else "file",
            "size": stat_info.st_size,
            "created": datetime.fromtimestamp(stat_info.st_ctime).isoformat(),
            "modified": datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
            "accessed": datetime.fromtimestamp(stat_info.st_atime).isoformat(),
            "permissions": oct(stat_info.st_mode)[-3:],
            "owner": stat_info.st_uid,
            "group": stat_info.st_gid,
        }

        # Extension and parent are only reported for regular files.
        if path_obj.is_file():
            info["extension"] = path_obj.suffix
            info["parent_directory"] = str(path_obj.parent)

        return info
    except HTTPException:
        # Keep the deliberate 404 instead of masking it as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/directories/create")
async def create_directory(request: DirectoryCreateRequest) -> Dict[str, Any]:
    """Create a new directory, including missing parents.

    Raises HTTPException 409 if the path already exists and 500 for any
    other failure.
    """
    try:
        path_obj = Path(request.dir_path)
        if path_obj.exists():
            raise HTTPException(
                status_code=409, detail=f"Directory already exists: {request.dir_path}"
            )

        path_obj.mkdir(parents=True, exist_ok=False)

        return {
            "dir_path": str(path_obj.absolute()),
            "message": "Directory created successfully",
        }
    except HTTPException:
        # Keep the deliberate 409 instead of masking it as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.delete("/files/{path:path}")
async def delete_file(path: str) -> Dict[str, Any]:
    """Delete a file or an empty directory.

    Raises HTTPException 404 if the path does not exist and 500 for any
    other failure (for example a non-empty directory).
    """
    try:
        path_obj = Path(path)
        if not path_obj.exists():
            raise HTTPException(status_code=404, detail=f"Path not found: {path}")

        if path_obj.is_dir():
            path_obj.rmdir()  # Only removes empty directories
            message = "Directory deleted successfully"
        else:
            path_obj.unlink()
            message = "File deleted successfully"

        return {"path": str(path_obj.absolute()), "message": message}
    except HTTPException:
        # Keep the deliberate 404 instead of masking it as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/files/copy")
async def copy_file(request: FileCopyRequest) -> Dict[str, Any]:
    """Copy a file from source to destination.

    Raises HTTPException 404 if the source does not exist, 400 if the source
    is not a regular file or is the same file as the destination, 409 if the
    destination exists and ``overwrite`` is false, and 500 for any other
    failure.
    """
    try:
        source_path = Path(request.source_path)
        destination_path = Path(request.destination_path)

        # Check if source file exists
        if not source_path.exists():
            raise HTTPException(
                status_code=404, detail=f"Source file not found: {request.source_path}"
            )
        if not source_path.is_file():
            raise HTTPException(
                status_code=400,
                detail=f"Source path is not a file: {request.source_path}",
            )

        if destination_path.exists():
            # Check if destination already exists
            if not request.overwrite:
                raise HTTPException(
                    status_code=409,
                    detail=f"Destination file already exists: {request.destination_path}",
                )
            # Opening the destination for writing would truncate the source
            # when both names refer to the same file, destroying its contents.
            if source_path.samefile(destination_path):
                raise HTTPException(
                    status_code=400,
                    detail=(
                        "Source and destination are the same file: "
                        f"{request.source_path}"
                    ),
                )

        # Stream the copy instead of loading the whole file into memory.
        shutil.copyfile(source_path, destination_path)

        return {
            "source_path": str(source_path.absolute()),
            "destination_path": str(destination_path.absolute()),
            "bytes_copied": destination_path.stat().st_size,
            "message": "File copied successfully",
        }
    except HTTPException:
        # Keep the deliberate 4xx responses instead of masking them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/terzeron/mcp_test'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.