live_streaming_create_bucket
Create a new bucket for live streaming storage using an S3-style API, to organize and manage streaming content on Qiniu Cloud.
Instructions
Create a new bucket in LiveStreaming using S3-style API. The bucket will be created at https://<bucket>.<endpoint_url>
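To make the URL pattern concrete, here is a minimal standalone sketch that mirrors the `_build_bucket_url` helper listed under Implementation Reference. The function name `build_bucket_url` and the `endpoint` parameter are hypothetical, introduced only for illustration; the default endpoint is the fallback value that appears in the source.

```python
# Fallback endpoint taken from the _build_bucket_url helper in the source.
DEFAULT_ENDPOINT = "mls.cn-east-1.qiniumiku.com"


def build_bucket_url(bucket: str, endpoint: str = "") -> str:
    """Return https://<bucket>.<endpoint>, dropping any scheme on the endpoint.

    Hypothetical standalone version of the helper; not the project's own API.
    """
    host = endpoint or DEFAULT_ENDPOINT
    # Strip a leading http:// or https:// so the scheme is not duplicated.
    for scheme in ("http://", "https://"):
        if host.startswith(scheme):
            host = host[len(scheme):]
            break
    return f"https://{bucket}.{host}"


print(build_bucket_url("demo"))
print(build_bucket_url("demo", "http://example.com"))
```

Note that the result always uses `https`, even when the configured endpoint starts with `http://`.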
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| bucket | Yes | LiveStreaming bucket name | |
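As an illustration of the schema above, the following sketch builds the JSON-RPC `tools/call` request that an MCP client would send for this tool. The helper name `build_tool_call` and the bucket name `my-live-bucket` are hypothetical, and the non-empty check is an assumption: the schema itself only requires `bucket` to be a string.

```python
import json


def build_tool_call(bucket: str, request_id: int = 1) -> dict:
    """Build an MCP tools/call request for live_streaming_create_bucket.

    Hypothetical helper for illustration only.
    """
    # The schema requires a string; rejecting empty names is an extra assumption.
    if not isinstance(bucket, str) or not bucket:
        raise ValueError("bucket is required and must be a non-empty string")
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "live_streaming_create_bucket",
            "arguments": {"bucket": bucket},
        },
    }


print(json.dumps(build_tool_call("my-live-bucket"), indent=2))
```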
Implementation Reference
- The primary handler function for the 'live_streaming_create_bucket' MCP tool. It defines the tool metadata (including input schema) and executes the logic by delegating to LiveStreamingService.create_bucket.

  ```python
  @tools.tool_meta(
      types.Tool(
          name="live_streaming_create_bucket",
          description="Create a new bucket in LiveStreaming using S3-style API. The bucket will be created at https://<bucket>.<endpoint_url>",
          inputSchema={
              "type": "object",
              "properties": {
                  "bucket": {
                      "type": "string",
                      "description": _BUCKET_DESC,
                  },
              },
              "required": ["bucket"],
          },
      )
  )
  async def create_bucket(self, **kwargs) -> list[types.TextContent]:
      result = await self.live_streaming.create_bucket(**kwargs)
      return [types.TextContent(type="text", text=str(result))]
  ```
- src/mcp_server/core/live_streaming/tools.py:242-256 (registration) The register_tools function where the live_streaming_create_bucket handler is registered into the MCP tools registry via auto_register_tools.

  ```python
  def register_tools(live_streaming: LiveStreamingService):
      tool_impl = _ToolImpl(live_streaming)
      tools.auto_register_tools(
          [
              tool_impl.create_bucket,
              tool_impl.create_stream,
              tool_impl.bind_push_domain,
              tool_impl.bind_play_domain,
              tool_impl.get_push_urls,
              tool_impl.get_play_urls,
              tool_impl.query_live_traffic_stats,
              tool_impl.list_buckets,
              tool_impl.list_streams,
          ]
      )
  ```
- Core helper implementation in LiveStreamingService that performs the HTTP PUT request to create the bucket, handles authentication, and processes the response.

  ```python
  async def create_bucket(self, bucket: str) -> Dict[str, Any]:
      """
      Create a bucket using S3-style API

      Args:
          bucket: The bucket name to create

      Returns:
          Dict containing the response status and message
      """
      url = self._build_bucket_url(bucket)
      data = {}
      bodyJson = json.dumps(data)
      auth_headers = self._get_auth_header(method="PUT", url=url, content_type="application/json", body=bodyJson)
      headers = {"Content-Type": "application/json"}

      # If there are auth headers, add them to headers
      if auth_headers:
          headers.update(auth_headers)

      # Print the HTTP request info
      print("=== HTTP request info ===")
      print("Method: PUT")
      print(f"URL: {url}")
      print("Request headers:")
      for key, value in headers.items():
          print(f"  {key}: {value}")
      print("Request body: {}")
      print("===================")

      print(f"Creating bucket: {bucket} at {url}")

      async with aiohttp.ClientSession() as session:
          async with session.put(url, headers=headers, data=bodyJson) as response:
              status = response.status
              text = await response.text()

              print(f"Status code: {status}")
              print(f"Response body: {text}")
              print("==================")

              if status == 200 or status == 201:
                  logger.info(f"Successfully created bucket: {bucket}")
                  return {
                      "status": "success",
                      "bucket": bucket,
                      "url": url,
                      "message": f"Bucket '{bucket}' created successfully",
                      "status_code": status
                  }
              else:
                  logger.error(f"Failed to create bucket: {bucket}, status: {status}, response: {text}")
                  return {
                      "status": "error",
                      "bucket": bucket,
                      "url": url,
                      "message": f"Failed to create bucket: {text}",
                      "status_code": status
                  }
  ```
- Helper method to construct the S3-style bucket URL used in the create_bucket implementation.

  ```python
  def _build_bucket_url(self, bucket: str) -> str:
      """Build S3-style bucket URL"""
      if not self.live_endpoint:
          self.live_endpoint = "mls.cn-east-1.qiniumiku.com"

      # Remove protocol if present in live_endpoint
      endpoint = self.live_endpoint
      if endpoint.startswith("http://"):
          endpoint = endpoint[7:]
      elif endpoint.startswith("https://"):
          endpoint = endpoint[8:]

      # Build URL in format: https://<bucket>.<endpoint>
      return f"https://{bucket}.{endpoint}"
  ```
- src/mcp_server/core/live_streaming/__init__.py:6-8 (registration) The load function that instantiates LiveStreamingService and calls register_tools to register all live streaming tools including live_streaming_create_bucket.

  ```python
  def load(cfg: config.Config):
      live = LiveStreamingService(cfg)
      register_tools(live)
  ```
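Because the handler returns `str(result)`, the text a client receives is the Python repr of the result dict rather than JSON. The sketch below shows one way a Python caller might recover the dict, assuming the text is passed through unchanged. The helper name `parse_tool_text` and the sample values are hypothetical.

```python
import ast


def parse_tool_text(text: str) -> dict:
    """Parse the repr-style text returned by the tool back into a dict.

    Hypothetical client-side helper; ast.literal_eval only evaluates
    Python literals, so it does not execute arbitrary code.
    """
    result = ast.literal_eval(text)
    if not isinstance(result, dict):
        raise ValueError("expected a dict literal in the tool response")
    return result


# Sample text shaped like the success branch of create_bucket (values invented).
sample = str({
    "status": "success",
    "bucket": "demo",
    "url": "https://demo.mls.cn-east-1.qiniumiku.com",
    "message": "Bucket 'demo' created successfully",
    "status_code": 200,
})

parsed = parse_tool_text(sample)
if parsed["status"] == "success":
    print(f"created {parsed['bucket']} at {parsed['url']}")
else:
    print(f"failed with HTTP {parsed['status_code']}: {parsed['message']}")
```

`json.loads` would reject this text because the repr uses single-quoted strings, which is why the sketch reaches for `ast.literal_eval` instead.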