Skip to main content
Glama
handlers.py (14.6 kB)
"""Tool handlers for the TOS MCP server.

Every public handler is a coroutine that takes the tool-call arguments as a
dict and returns a one-element list of ``TextContent``: either a success
payload (plain text or pretty-printed JSON) or a human-readable failure
message.  Errors raised while talking to TOS are logged and reported as text
instead of propagating.  A missing *required* argument still raises
``KeyError`` before any request is made.
"""
import asyncio
import base64
import json
import logging
from typing import Any, Dict, List, Tuple

import tos
from mcp.types import TextContent

from .config import tos_config

logger = logging.getLogger(__name__)

# Shared TOS client, built once at import time from the project configuration.
tos_client = tos.TosClientV2(
    ak=tos_config.access_key,
    sk=tos_config.secret_key,
    endpoint=tos_config.endpoint,
    region=tos_config.region,
)

# Lifetime, in seconds, of the download URLs generated after a persisted
# image/video processing request.
_PROCESSED_URL_EXPIRES = 3600

# Pause, in seconds, between reading a processing response and presigning the
# saved object, giving the write-back time to land.
_WRITE_BACK_DELAY = 1.0


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _text(message: str) -> List[TextContent]:
    """Wrap *message* in the single-item ``TextContent`` list handlers return."""
    return [TextContent(type="text", text=message)]


def _json(payload: Any) -> List[TextContent]:
    """Serialize *payload* as indented, non-ASCII-escaped JSON text content."""
    return _text(json.dumps(payload, indent=2, ensure_ascii=False))


def _failure(label: str, exc: Exception) -> List[TextContent]:
    """Log the active exception with its traceback and return ``"<label>: <exc>"``.

    Must be called from inside an ``except`` block so the traceback is
    available to ``logger.exception``.
    """
    logger.exception("%s", label)
    return _text(f"{label}: {str(exc)}")


def _b64(value: str) -> str:
    """Return *value* UTF-8 encoded and then base64 encoded, as ``str``."""
    return base64.b64encode(value.encode("utf-8")).decode("utf-8")


def _encode_body(raw: bytes, force_base64: bool) -> Tuple[str, str]:
    """Return ``(text, encoding)`` for an object body.

    The body is returned as UTF-8 text when possible and not overridden by
    *force_base64*; otherwise it is base64 encoded.
    """
    if not force_base64:
        try:
            return raw.decode("utf-8"), "utf-8"
        except UnicodeDecodeError:
            # Binary data: fall through to base64.
            pass
    return base64.b64encode(raw).decode("utf-8"), "base64"


async def _process_and_persist(
    bucket_name: str,
    object_key: str,
    process: str,
    save_bucket: str,
    save_key: str,
) -> Tuple[int, str]:
    """Run a processing request that saves its output, then presign the result.

    Returns ``(processed_size, signed_url)`` where *processed_size* is the
    byte length of the processing response and *signed_url* is a GET URL for
    ``save_bucket/save_key`` valid for ``_PROCESSED_URL_EXPIRES`` seconds.
    """
    # The save target names are passed base64 encoded, as the original
    # implementation does (it cites the official SDK usage for this).
    resp = tos_client.get_object(
        bucket=bucket_name,
        key=object_key,
        process=process,
        save_bucket=_b64(save_bucket),
        save_object=_b64(save_key),
    )
    # Drain the response so the processing request has fully completed.
    processed = resp.read()
    # Non-blocking pause: a plain time.sleep() here would freeze the event
    # loop (and every other in-flight tool call) for the whole delay.
    await asyncio.sleep(_WRITE_BACK_DELAY)
    download = tos_client.pre_signed_url(
        tos.HttpMethodType.Http_Method_Get,
        save_bucket,
        save_key,
        _PROCESSED_URL_EXPIRES,
    )
    return len(processed), download.signed_url


def _fetch_media_info(
    bucket_name: str, object_key: str, process: str, info_key: str
) -> Dict[str, Any]:
    """Fetch ``image/info`` or ``video/info`` style metadata for an object.

    The response body is parsed as JSON and stored under *info_key*.  If the
    body is not valid JSON it is stored verbatim and a ``note`` entry is added.
    """
    resp = tos_client.get_object(bucket_name, object_key, process=process)
    raw = resp.read().decode("utf-8")
    result: Dict[str, Any] = {"bucket": bucket_name, "key": object_key}
    try:
        result[info_key] = json.loads(raw)
        result["status"] = "success"
    except json.JSONDecodeError:
        # Not JSON: hand the raw text back unchanged.
        result[info_key] = raw
        result["status"] = "success"
        result["note"] = "返回原始格式数据"
    return result


# ---------------------------------------------------------------------------
# Bucket management
# ---------------------------------------------------------------------------

async def create_bucket(args: Dict[str, Any]) -> List[TextContent]:
    """Create a bucket.

    Args:
        args: ``bucket_name`` (required) and optional ``acl``, one of
            ``"private"`` (default), ``"public-read"`` or
            ``"public-read-write"``.

    Returns:
        A success message, or an error message if the ACL is not recognised
        or the request fails.
    """
    bucket_name = args["bucket_name"]
    acl = args.get("acl", "private")
    try:
        acl_types = {
            "private": tos.ACLType.ACL_Private,
            "public-read": tos.ACLType.ACL_Public_Read,
            "public-read-write": tos.ACLType.ACL_Public_Read_Write,
        }
        # Reject unknown values explicitly.  Falling through to a default
        # here would turn a typo into a world-writable bucket.
        if not isinstance(acl, str) or acl not in acl_types:
            return _text(f"不支持的ACL类型: {acl}")
        tos_client.create_bucket(bucket_name, acl_types[acl])
        return _text(f"成功创建存储桶: {bucket_name}")
    except Exception as e:
        return _failure("创建存储桶失败", e)


async def list_buckets(_args: Dict[str, Any]) -> List[TextContent]:
    """List buckets as a JSON array of name / creation_date / location."""
    try:
        resp = tos_client.list_buckets()
        buckets = [
            {
                "name": bucket.name,
                "creation_date": str(bucket.creation_date) if bucket.creation_date else None,
                "location": bucket.location,
            }
            for bucket in resp.buckets
        ]
        return _json(buckets)
    except Exception as e:
        return _failure("列举存储桶失败", e)


async def get_bucket_meta(args: Dict[str, Any]) -> List[TextContent]:
    """Return a bucket's region and storage class (via ``head_bucket``) as JSON.

    Args:
        args: ``bucket_name`` (required).
    """
    bucket_name = args["bucket_name"]
    try:
        resp = tos_client.head_bucket(bucket_name)
        meta = {
            "bucket_name": bucket_name,
            "region": resp.region,
            "storage_class": str(resp.storage_class) if resp.storage_class else None,
        }
        return _json(meta)
    except Exception as e:
        return _failure("获取存储桶元数据失败", e)


async def delete_bucket(args: Dict[str, Any]) -> List[TextContent]:
    """Delete a bucket.

    Args:
        args: ``bucket_name`` (required).
    """
    bucket_name = args["bucket_name"]
    try:
        tos_client.delete_bucket(bucket_name)
        return _text(f"成功删除存储桶: {bucket_name}")
    except Exception as e:
        return _failure("删除存储桶失败", e)


# ---------------------------------------------------------------------------
# Object management
# ---------------------------------------------------------------------------

async def put_object(args: Dict[str, Any]) -> List[TextContent]:
    """Upload an object.

    Args:
        args: ``bucket_name``, ``object_key`` and ``content`` (required);
            optional ``content_type`` (default ``application/octet-stream``)
            and ``is_base64`` (default ``False``).  When ``is_base64`` is
            true, ``content`` is base64-decoded before upload; otherwise it
            is encoded as UTF-8.

    Returns:
        A success message including the ETag, or an error message.
    """
    bucket_name = args["bucket_name"]
    object_key = args["object_key"]
    content = args["content"]
    content_type = args.get("content_type", "application/octet-stream")
    is_base64 = args.get("is_base64", False)
    try:
        if is_base64:
            content_bytes = base64.b64decode(content)
        else:
            content_bytes = content.encode("utf-8")
        resp = tos_client.put_object(
            bucket_name,
            object_key,
            content=content_bytes,
            content_type=content_type,
            content_length=len(content_bytes),
        )
        return _text(f"成功上传对象: {object_key} (ETag: {resp.etag})")
    except Exception as e:
        return _failure("上传对象失败", e)


async def get_object(args: Dict[str, Any]) -> List[TextContent]:
    """Download an object and return its content plus metadata as JSON.

    Args:
        args: ``bucket_name`` and ``object_key`` (required); optional
            ``return_as_base64`` (default ``False``).

    Returns:
        JSON with ``content``, ``content_type``, ``content_length`` and
        ``encoding``.  ``encoding`` is ``"utf-8"`` when the body decodes as
        UTF-8 and base64 was not requested, otherwise ``"base64"``.
    """
    bucket_name = args["bucket_name"]
    object_key = args["object_key"]
    return_as_base64 = args.get("return_as_base64", False)
    try:
        resp = tos_client.get_object(bucket_name, object_key)
        content_str, encoding = _encode_body(resp.read(), bool(return_as_base64))
        result = {
            "content": content_str,
            "content_type": resp.content_type,
            "content_length": resp.content_length,
            "encoding": encoding,
        }
        return _json(result)
    except Exception as e:
        return _failure("下载对象失败", e)


async def list_objects(args: Dict[str, Any]) -> List[TextContent]:
    """List objects in a bucket.

    Args:
        args: ``bucket_name`` (required); optional ``prefix`` (default ``""``),
            ``delimiter`` (default ``""``) and ``max_keys`` (default 1000).

    Returns:
        JSON with ``objects``, ``common_prefixes``, ``is_truncated`` and
        ``next_continuation_token``.
    """
    bucket_name = args["bucket_name"]
    prefix = args.get("prefix", "")
    delimiter = args.get("delimiter", "")
    max_keys = args.get("max_keys", 1000)
    try:
        resp = tos_client.list_objects_type2(
            bucket_name, prefix=prefix, delimiter=delimiter, max_keys=max_keys
        )
        result = {
            "objects": [
                {
                    "key": obj.key,
                    "last_modified": str(obj.last_modified) if obj.last_modified else None,
                    "size": obj.size,
                    "etag": obj.etag,
                    "storage_class": str(obj.storage_class) if obj.storage_class else None,
                }
                for obj in resp.contents
            ],
            "common_prefixes": [common.prefix for common in resp.common_prefixes],
            "is_truncated": resp.is_truncated,
            "next_continuation_token": resp.next_continuation_token,
        }
        return _json(result)
    except Exception as e:
        return _failure("列举对象失败", e)


async def delete_object(args: Dict[str, Any]) -> List[TextContent]:
    """Delete an object.

    Args:
        args: ``bucket_name`` and ``object_key`` (required).
    """
    bucket_name = args["bucket_name"]
    object_key = args["object_key"]
    try:
        tos_client.delete_object(bucket_name, object_key)
        return _text(f"成功删除对象: {object_key}")
    except Exception as e:
        return _failure("删除对象失败", e)


# ---------------------------------------------------------------------------
# Pre-signed URLs
# ---------------------------------------------------------------------------

async def presigned_url(args: Dict[str, Any]) -> List[TextContent]:
    """Generate a pre-signed URL for an object.

    Args:
        args: ``bucket_name`` and ``object_key`` (required); optional
            ``method`` (``GET`` default, ``PUT``, ``POST`` or ``DELETE``)
            and ``expires`` (default 3600).

    Returns:
        JSON with ``url``, ``method``, ``expires_in``, ``bucket`` and ``key``,
        or an error message for an unsupported method or a failed request.
    """
    bucket_name = args["bucket_name"]
    object_key = args["object_key"]
    method = args.get("method", "GET")
    expires = args.get("expires", 3600)
    try:
        # Attribute name on tos.HttpMethodType for each supported verb;
        # resolved lazily so only the requested one is looked up.
        method_attrs = {
            "GET": "Http_Method_Get",
            "PUT": "Http_Method_Put",
            "POST": "Http_Method_Post",
            "DELETE": "Http_Method_Delete",
        }
        attr = method_attrs.get(method) if isinstance(method, str) else None
        if attr is None:
            return _text(f"不支持的HTTP方法: {method}")
        url = tos_client.pre_signed_url(
            getattr(tos.HttpMethodType, attr), bucket_name, object_key, expires
        )
        result = {
            "url": url.signed_url,
            "method": method,
            "expires_in": expires,
            "bucket": bucket_name,
            "key": object_key,
        }
        return _json(result)
    except Exception as e:
        return _failure("生成预签名URL失败", e)


# ---------------------------------------------------------------------------
# Image processing
# ---------------------------------------------------------------------------

async def image_process(args: Dict[str, Any]) -> List[TextContent]:
    """Process an image and persist the result to another object.

    Args:
        args: ``bucket_name``, ``object_key``, ``process`` (the processing
            string passed to the SDK), ``save_bucket`` and ``save_key``
            (all required).

    Returns:
        JSON describing the source, the save target, the processed size and
        a GET URL for the saved object valid for 3600 seconds.
    """
    bucket_name = args["bucket_name"]
    object_key = args["object_key"]
    process = args["process"]
    save_bucket = args["save_bucket"]
    save_key = args["save_key"]
    try:
        processed_size, signed_url = await _process_and_persist(
            bucket_name, object_key, process, save_bucket, save_key
        )
        result = {
            "presigned_url": signed_url,
            "source_bucket": bucket_name,
            "source_key": object_key,
            "save_bucket": save_bucket,
            "save_key": save_key,
            "process": process,
            "processed_size": processed_size,
            "expires_in": _PROCESSED_URL_EXPIRES,
            "status": "processed",
        }
        return _json(result)
    except Exception as e:
        return _failure("图片处理失败", e)


async def image_info(args: Dict[str, Any]) -> List[TextContent]:
    """Return image metadata obtained with the ``image/info`` process.

    Args:
        args: ``bucket_name`` and ``object_key`` (required).
    """
    bucket_name = args["bucket_name"]
    object_key = args["object_key"]
    try:
        result = _fetch_media_info(bucket_name, object_key, "image/info", "image_info")
        return _json(result)
    except Exception as e:
        return _failure("获取图片信息失败", e)


# ---------------------------------------------------------------------------
# Video processing
# ---------------------------------------------------------------------------

async def video_snapshot(args: Dict[str, Any]) -> List[TextContent]:
    """Capture a video frame and persist it to another object.

    Args:
        args: ``bucket_name``, ``object_key``, ``save_bucket`` and
            ``save_key`` (required); optional ``time`` (default 300; the
            original author's comment gives the unit as milliseconds) and
            ``format`` (default ``"jpg"``).

    Returns:
        JSON describing the source, the save target, the frame parameters,
        the processed size and a GET URL for the saved frame valid for 3600
        seconds.
    """
    bucket_name = args["bucket_name"]
    object_key = args["object_key"]
    snapshot_time = args.get("time", 300)
    image_format = args.get("format", "jpg")
    save_bucket = args["save_bucket"]
    save_key = args["save_key"]
    try:
        process = f"video/snapshot,t_{int(snapshot_time)},f_{image_format}"
        processed_size, signed_url = await _process_and_persist(
            bucket_name, object_key, process, save_bucket, save_key
        )
        result = {
            "presigned_url": signed_url,
            "source_bucket": bucket_name,
            "source_key": object_key,
            "save_bucket": save_bucket,
            "save_key": save_key,
            "time": snapshot_time,
            "format": image_format,
            "processed_size": processed_size,
            "expires_in": _PROCESSED_URL_EXPIRES,
            "status": "processed",
        }
        return _json(result)
    except Exception as e:
        return _failure("视频截帧失败", e)


async def video_info(args: Dict[str, Any]) -> List[TextContent]:
    """Return video metadata obtained with the ``video/info`` process.

    Args:
        args: ``bucket_name`` and ``object_key`` (required).
    """
    bucket_name = args["bucket_name"]
    object_key = args["object_key"]
    try:
        result = _fetch_media_info(bucket_name, object_key, "video/info", "video_info")
        return _json(result)
    except Exception as e:
        return _failure("获取视频信息失败", e)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jneless/tos-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.