live_streaming_query_live_traffic_stats
Retrieve live streaming traffic statistics including total traffic, average bandwidth, and peak bandwidth for specified time periods to monitor and analyze streaming performance.
Instructions
Query live streaming traffic statistics for a time range. Returns total traffic (bytes), average bandwidth (bps), peak bandwidth (bps), and optionally raw data for download.
Input Schema
JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| begin | Yes | Start time in format YYYYMMDDHHMMSS (e.g., 20240101000000) | |
| end | Yes | End time in format YYYYMMDDHHMMSS (e.g., 20240129105148) | |
| include_raw_data | No | If true, includes raw JSON data and detailed data points for download. | false |
Implementation Reference
- Core handler implementation in LiveStreamingService that queries the API for traffic stats, processes the data to compute total traffic, average/peak bandwidth, formats outputs, and handles errors.async def query_live_traffic_stats(self, begin: str, end: str, include_raw_data: bool = False) -> Dict[str, Any]: """ Query live streaming traffic statistics Args: begin: Start time in format YYYYMMDDHHMMSS (e.g., 20240101000000) end: End time in format YYYYMMDDHHMMSS (e.g., 20240129105148) include_raw_data: If True, includes raw JSON data for download (default: False) Returns: Dict containing traffic statistics with total traffic (bytes), average bandwidth (bps), peak bandwidth (bps), and optionally raw data """ if not self.live_endpoint: self.live_endpoint = "mls.cn-east-1.qiniumiku.com" # Remove protocol and bucket prefix to get base endpoint endpoint = self.live_endpoint if endpoint.startswith("http://"): endpoint = endpoint[7:] elif endpoint.startswith("https://"): endpoint = endpoint[8:] # Remove bucket prefix if present (format: bucket.endpoint) # if '.' in endpoint: # parts = endpoint.split('.', 1) # if len(parts) > 1: # endpoint = parts[1] url = f"http://{endpoint}/?trafficStats&begin={begin}&end={end}&g=5min&select=flow&flow=downflow" headers = self._get_auth_header(method="GET", url=url) logger.info(f"Querying live traffic stats from {begin} to {end}") async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as response: status = response.status text = await response.text() if status == 200: logger.info("Successfully queried live traffic stats") try: # Parse JSON response data = json.loads(text) # Calculate total traffic and bandwidth metrics total_traffic_bytes = 0 bandwidth_values = [] data_points = [] # Data format: [{"time":"2025-11-26T00:00:00+08:00","values":{"flow":0}}, ...] 
for item in data: if isinstance(item, dict) and "values" in item and "flow" in item["values"]: flow_bytes = item["values"]["flow"] total_traffic_bytes += flow_bytes # Convert to bandwidth: flow is accumulated over 5 minutes (300 seconds) # Bandwidth (bps) = bytes / 300 seconds * 8 bits/byte bandwidth_bps = (flow_bytes / 300) * 8 bandwidth_values.append(bandwidth_bps) # Store data point with timestamp data_points.append({ "time": item.get("time", ""), "traffic_bytes": flow_bytes, "bandwidth_bps": bandwidth_bps }) # Calculate average and peak bandwidth avg_bandwidth_bps = sum(bandwidth_values) / len(bandwidth_values) if bandwidth_values else 0 peak_bandwidth_bps = max(bandwidth_values) if bandwidth_values else 0 # Convert to human-readable units def format_bytes(bytes_val): """Convert bytes to human-readable format""" for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if bytes_val < 1024.0: return f"{bytes_val:.2f} {unit}" bytes_val /= 1024.0 return f"{bytes_val:.2f} PB" def format_bandwidth(bps): """Convert bits per second to human-readable format""" for unit in ['bps', 'Kbps', 'Mbps', 'Gbps', 'Tbps']: if bps < 1000.0: return f"{bps:.2f} {unit}" bps /= 1000.0 return f"{bps:.2f} Pbps" result = { "status": "success", "begin": begin, "end": end, "summary": { "total_traffic_bytes": total_traffic_bytes, "total_traffic_formatted": format_bytes(total_traffic_bytes), "data_points_count": len(data_points), "average_bandwidth_bps": avg_bandwidth_bps, "average_bandwidth_formatted": format_bandwidth(avg_bandwidth_bps), "peak_bandwidth_bps": peak_bandwidth_bps, "peak_bandwidth_formatted": format_bandwidth(peak_bandwidth_bps), "granularity": "5 minutes" }, "message": "Traffic statistics calculated successfully", "status_code": status } # Include raw data only if requested if include_raw_data: result["raw_data"] = data result["data_points"] = data_points return result except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON response: {e}") return { "status": "error", "begin": 
begin, "end": end, "message": f"Failed to parse traffic stats response: {str(e)}", "raw_response": text, "status_code": status } except Exception as e: logger.error(f"Error processing traffic stats: {e}") return { "status": "error", "begin": begin, "end": end, "message": f"Error processing traffic stats: {str(e)}", "status_code": status } else: logger.error(f"Failed to query traffic stats, status: {status}, response: {text}") return { "status": "error", "begin": begin, "end": end, "message": f"Failed to query traffic stats: {text}", "status_code": status }
- MCP tool handler wrapper that defines the tool metadata (including schema) and delegates to LiveStreamingService.query_live_traffic_stats.@tools.tool_meta( types.Tool( name="live_streaming_query_live_traffic_stats", description="Query live streaming traffic statistics for a time range. Returns total traffic (bytes), average bandwidth (bps), peak bandwidth (bps), and optionally raw data for download.", inputSchema={ "type": "object", "properties": { "begin": { "type": "string", "description": "Start time in format YYYYMMDDHHMMSS (e.g., 20240101000000)", }, "end": { "type": "string", "description": "End time in format YYYYMMDDHHMMSS (e.g., 20240129105148)", }, "include_raw_data": { "type": "boolean", "description": "If true, includes raw JSON data and detailed data points for download. Default is false.", "default": False, }, }, "required": ["begin", "end"], }, ) ) async def query_live_traffic_stats(self, **kwargs) -> list[types.TextContent]: result = await self.live_streaming.query_live_traffic_stats(**kwargs) return [types.TextContent(type="text", text=str(result))]
- Input schema definition for the live_streaming_query_live_traffic_stats tool.inputSchema={ "type": "object", "properties": { "begin": { "type": "string", "description": "Start time in format YYYYMMDDHHMMSS (e.g., 20240101000000)", }, "end": { "type": "string", "description": "End time in format YYYYMMDDHHMMSS (e.g., 20240129105148)", }, "include_raw_data": { "type": "boolean", "description": "If true, includes raw JSON data and detailed data points for download. Default is false.", "default": False, }, }, "required": ["begin", "end"], }, )
- src/mcp_server/core/live_streaming/tools.py:242-257 (registration)Registration function that creates _ToolImpl instance and registers all live streaming tools, including query_live_traffic_stats, using tools.auto_register_tools.def register_tools(live_streaming: LiveStreamingService): tool_impl = _ToolImpl(live_streaming) tools.auto_register_tools( [ tool_impl.create_bucket, tool_impl.create_stream, tool_impl.bind_push_domain, tool_impl.bind_play_domain, tool_impl.get_push_urls, tool_impl.get_play_urls, tool_impl.query_live_traffic_stats, tool_impl.list_buckets, tool_impl.list_streams, ] )