server.py•921 B
from mcp.server.fastmcp import FastMCP
from qiniu import Auth, put_file
import config
import os
import uuid
# Initialize Qiniu Auth
q = Auth(config.QINIU_ACCESS_KEY, config.QINIU_SECRET_KEY)
# Create MCP server
mcp = FastMCP("qiniu-uploader")
@mcp.tool()
def upload_file(file_path: str) -> str:
"""Uploads a file to Qiniu and returns its public URL"""
if not os.path.exists(file_path):
raise ValueError(f"File not found: {file_path}")
# Generate unique key
key = f"mcp-uploads/{uuid.uuid4()}{os.path.splitext(file_path)[1]}"
# Generate upload token
token = q.upload_token(config.QINIU_BUCKET_NAME, key, config.QINIU_UPLOAD_EXPIRES)
# Upload file
ret, info = put_file(token, key, file_path)
if info.status_code == 200:
return f"{config.QINIU_DOMAIN}/{key}"
else:
raise Exception(f"Upload failed: {info}")
if __name__ == "__main__":
mcp.run()