Skip to main content
Glama

MCP GPT Image 1

storage_server.py (4.26 kB)
import os
from typing import List

import aiofiles
import filetype
import uvicorn
from fastapi import FastAPI, HTTPException, UploadFile
from fastapi.responses import StreamingResponse

# Mini object-storage server: buckets are directories under STORAGE_ROOT and
# objects are plain files inside them, addressed by a "/"-separated key.

app = FastAPI(title="Mini Object Storage Server")

# Root storage directory (in production, point this at a disk path or a mounted volume).
STORAGE_ROOT = "storage_data"
os.makedirs(STORAGE_ROOT, exist_ok=True)

# Number of bytes moved per read/write when streaming uploads and downloads.
_CHUNK_SIZE = 1024 * 1024


# Helper: map a bucket name to its directory.
def bucket_path(bucket: str) -> str:
    """Return the directory path of ``bucket`` under ``STORAGE_ROOT``.

    Args:
        bucket: Bucket name taken from the request URL.

    Returns:
        ``STORAGE_ROOT`` joined with ``bucket``.

    Raises:
        HTTPException: 400 if ``bucket`` is not a single plain directory name
            (empty, ``.``/``..``, contains a path separator or a NUL byte).
    """
    # A bucket must be exactly one path component; anything else (e.g. "..")
    # would resolve to a directory outside STORAGE_ROOT.
    if (
        bucket in ("", ".", "..")
        or "\x00" in bucket
        or os.path.basename(bucket) != bucket
    ):
        raise HTTPException(400, "Invalid bucket name.")
    return os.path.join(STORAGE_ROOT, bucket)


# Helper: map (bucket, object key) to the real file path on disk.
def object_path(bucket: str, object_key: str) -> str:
    """Return the file path for ``object_key`` inside ``bucket``.

    The key is split on ``/``; empty and ``.`` segments are ignored. Unsafe
    keys are rejected instead of being rewritten, so two distinct keys can no
    longer map to the same file (the previous ``replace("..", "")`` turned
    ``a..b`` into ``ab``).

    Args:
        bucket: Bucket name taken from the request URL.
        object_key: Object key taken from the request URL.

    Returns:
        The bucket directory joined with the key's segments.

    Raises:
        HTTPException: 400 if the bucket name is invalid, the key is empty,
            contains a ``..`` segment or NUL byte, or resolves outside the
            bucket directory.
    """
    segments = [seg for seg in object_key.split("/") if seg not in ("", ".")]
    if not segments or any(seg == ".." or "\x00" in seg for seg in segments):
        raise HTTPException(400, "Invalid object key.")

    root = bucket_path(bucket)
    path = os.path.join(root, *segments)

    # Defence in depth: os.path.join silently discards earlier components when
    # a later one is absolute (previously "..//etc/passwd" became
    # "//etc/passwd" and escaped the bucket), and platform-specific separators
    # may hide inside a segment. Verify the normalized result is strictly
    # inside the bucket directory.
    root_abs = os.path.abspath(root)
    path_abs = os.path.abspath(path)
    try:
        inside = os.path.commonpath([root_abs, path_abs]) == root_abs
    except ValueError:
        # Raised e.g. when the two paths are on different drives.
        inside = False
    if not inside or path_abs == root_abs:
        raise HTTPException(400, "Invalid object key.")
    return path


# 1) List all buckets
@app.get("/buckets", response_model=List[str])
async def list_buckets():
    """Return the names of all directories directly under ``STORAGE_ROOT``."""
    return [
        d
        for d in os.listdir(STORAGE_ROOT)
        if os.path.isdir(os.path.join(STORAGE_ROOT, d))
    ]


# 2) Create a bucket
@app.put("/buckets/{bucket}")
async def create_bucket(bucket: str):
    """Create the directory for ``bucket``.

    Raises:
        HTTPException: 400 if the name is invalid or the bucket already exists.
    """
    bp = bucket_path(bucket)
    try:
        # Create-and-catch instead of exists()-then-create, so two concurrent
        # requests cannot both pass the existence check.
        os.makedirs(bp)
    except FileExistsError:
        raise HTTPException(400, f"Bucket '{bucket}' already exists.")
    return {"bucket": bucket, "created": True}


# # 4) List all objects in a bucket (recursive, flattened)
# @app.get("/buckets/{bucket}/objects", response_model=List[str])
# async def list_objects(bucket: str):
#     bp = bucket_path(bucket)
#     if not os.path.isdir(bp):
#         raise HTTPException(404, "Bucket not found.")
#     objects = []
#     for root, _, files in os.walk(bp):
#         for f in files:
#             full = os.path.join(root, f)
#             # Key relative to the bucket root
#             key = os.path.relpath(full, bp).replace("\\", "/")
#             objects.append(key)
#     return objects


# 5) Upload / overwrite an object
@app.put("/buckets/{bucket}/objects/{object_key:path}")
async def upload_object(bucket: str, object_key: str, file: UploadFile):
    """Store the uploaded file as ``object_key`` in ``bucket``, overwriting any existing object.

    Raises:
        HTTPException: 400 for an invalid bucket name or object key,
            404 if the bucket does not exist.
    """
    bp = bucket_path(bucket)
    if not os.path.isdir(bp):
        raise HTTPException(404, "Bucket not found.")

    dest_path = object_path(bucket, object_key)
    os.makedirs(os.path.dirname(dest_path), exist_ok=True)

    # Write the file asynchronously, one chunk at a time, so the whole upload
    # is never held in memory.
    async with aiofiles.open(dest_path, "wb") as out_f:
        while True:
            chunk = await file.read(_CHUNK_SIZE)
            if not chunk:
                break
            await out_f.write(chunk)

    url_path = f'buckets/{bucket}/objects/{object_key}'
    return {"url_path": url_path, "bucket": bucket, "key": object_key}


# 6) Download an object
@app.get("/buckets/{bucket}/objects/{object_key:path}")
async def download_object(bucket: str, object_key: str):
    """Stream the object back with a MIME type guessed from its leading bytes.

    Raises:
        HTTPException: 400 for an invalid bucket name or object key,
            404 if the object does not exist.
    """
    path = object_path(bucket, object_key)
    print(os.path.abspath(path))
    if not os.path.isfile(path):
        raise HTTPException(404, "Object not found.")

    # Read the first bytes of the file for type detection.
    async with aiofiles.open(path, "rb") as f:
        head = await f.read(261)  # filetype recommends reading 261 bytes

    kind = filetype.guess(head)
    mime_type = kind.mime if kind else "application/octet-stream"

    async def streamer():
        # Yield the file chunk by chunk so large objects are not loaded whole.
        async with aiofiles.open(path, "rb") as f:
            while True:
                chunk = await f.read(_CHUNK_SIZE)
                if not chunk:
                    break
                yield chunk

    return StreamingResponse(streamer(), media_type=mime_type)


# # 7) Delete an object
# @app.delete("/buckets/{bucket}/objects/{object_key:path}")
# async def delete_object(bucket: str, object_key: str):
#     path = object_path(bucket, object_key)
#     if not os.path.isfile(path):
#         raise HTTPException(404, "Object not found.")
#     os.remove(path)
#     # Optional: clean up empty directories
#     parent = os.path.dirname(path)
#     while parent != bucket_path(bucket) and not os.listdir(parent):
#         os.rmdir(parent)
#         parent = os.path.dirname(parent)
#     return {"bucket": bucket, "key": object_key, "deleted": True}


if __name__ == "__main__":
    uvicorn.run(
        "storage_server:app",
        host="",
        port=9000,
        reload=True,  # hot reload during development; can be removed
        workers=1,  # can be tuned for production
    )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/GaussianGuaicai/MCP-OpenAI-ImageAPI'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.