Qiniu MCP Server

Official
by qiniu

live_streaming_query_live_traffic_stats

Retrieve live streaming traffic statistics including total traffic, average bandwidth, and peak bandwidth for specified time periods to monitor and analyze streaming performance.

Instructions

Query live streaming traffic statistics for a time range. Returns total traffic (bytes), average bandwidth (bps), peak bandwidth (bps), and optionally raw data for download.

Input Schema

Name | Required | Description | Default
begin | Yes | Start time in format YYYYMMDDHHMMSS (e.g., 20240101000000) |
end | Yes | End time in format YYYYMMDDHHMMSS (e.g., 20240129105148) |
include_raw_data | No | If true, includes raw JSON data and detailed data points for download. | false
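
The begin and end values are plain 14-digit strings, so they can be produced with the standard library. The following is a minimal sketch, not part of the server: build_time_range and TIME_FORMAT are hypothetical names, and the listing does not state which timezone the API assumes, so the datetime is treated here as naive local time.

```python
from datetime import datetime, timedelta

# YYYYMMDDHHMMSS, as described in the input schema.
TIME_FORMAT = "%Y%m%d%H%M%S"


def build_time_range(end: datetime, hours: int) -> dict:
    """Return tool arguments covering the `hours` leading up to `end`.

    Hypothetical helper for illustration; timezone handling is an
    assumption because the schema does not specify one.
    """
    begin = end - timedelta(hours=hours)
    return {
        "begin": begin.strftime(TIME_FORMAT),
        "end": end.strftime(TIME_FORMAT),
        "include_raw_data": False,  # schema default
    }


args = build_time_range(datetime(2024, 1, 29, 10, 51, 48), hours=24)
print(args["begin"])  # 20240128105148
print(args["end"])    # 20240129105148
```

The resulting dict matches the three properties in the schema and could be passed as the tool's arguments.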

Implementation Reference

  • Core handler implementation in LiveStreamingService that queries the API for traffic stats, processes the data to compute total traffic, average/peak bandwidth, formats outputs, and handles errors.
    async def query_live_traffic_stats(self, begin: str, end: str, include_raw_data: bool = False) -> Dict[str, Any]:
        """
        Query live streaming traffic statistics
    
        Args:
            begin: Start time in format YYYYMMDDHHMMSS (e.g., 20240101000000)
            end: End time in format YYYYMMDDHHMMSS (e.g., 20240129105148)
            include_raw_data: If True, includes raw JSON data for download (default: False)
    
        Returns:
            Dict containing traffic statistics with total traffic (bytes), average bandwidth (bps),
            peak bandwidth (bps), and optionally raw data
        """
        if not self.live_endpoint:
            self.live_endpoint = "mls.cn-east-1.qiniumiku.com"
    
        # Remove the protocol prefix to get the base endpoint (bucket-prefix removal below is disabled)
        endpoint = self.live_endpoint
        if endpoint.startswith("http://"):
            endpoint = endpoint[7:]
        elif endpoint.startswith("https://"):
            endpoint = endpoint[8:]
    
        # Bucket-prefix removal (format: bucket.endpoint) is currently disabled:
        # if '.' in endpoint:
        #     parts = endpoint.split('.', 1)
        #     if len(parts) > 1:
        #         endpoint = parts[1]
    
        url = f"http://{endpoint}/?trafficStats&begin={begin}&end={end}&g=5min&select=flow&flow=downflow"
        headers = self._get_auth_header(method="GET", url=url)
    
        logger.info(f"Querying live traffic stats from {begin} to {end}")
    
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as response:
                status = response.status
                text = await response.text()
    
                if status == 200:
                    logger.info("Successfully queried live traffic stats")
    
                    try:
                        # Parse JSON response
                        data = json.loads(text)
    
                        # Calculate total traffic and bandwidth metrics
                        total_traffic_bytes = 0
                        bandwidth_values = []
                        data_points = []
    
                        # Data format: [{"time":"2025-11-26T00:00:00+08:00","values":{"flow":0}}, ...]
                        for item in data:
                            if isinstance(item, dict) and "values" in item and "flow" in item["values"]:
                                flow_bytes = item["values"]["flow"]
                                total_traffic_bytes += flow_bytes
    
                                # Convert to bandwidth: flow is accumulated over 5 minutes (300 seconds)
                                # Bandwidth (bps) = bytes / 300 seconds * 8 bits/byte
                                bandwidth_bps = (flow_bytes / 300) * 8
                                bandwidth_values.append(bandwidth_bps)
    
                                # Store data point with timestamp
                                data_points.append({
                                    "time": item.get("time", ""),
                                    "traffic_bytes": flow_bytes,
                                    "bandwidth_bps": bandwidth_bps
                                })
    
                        # Calculate average and peak bandwidth
                        avg_bandwidth_bps = sum(bandwidth_values) / len(bandwidth_values) if bandwidth_values else 0
                        peak_bandwidth_bps = max(bandwidth_values) if bandwidth_values else 0
    
                        # Convert to human-readable units
                        def format_bytes(bytes_val):
                            """Convert bytes to human-readable format"""
                            for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
                                if bytes_val < 1024.0:
                                    return f"{bytes_val:.2f} {unit}"
                                bytes_val /= 1024.0
                            return f"{bytes_val:.2f} PB"
    
                        def format_bandwidth(bps):
                            """Convert bits per second to human-readable format"""
                            for unit in ['bps', 'Kbps', 'Mbps', 'Gbps', 'Tbps']:
                                if bps < 1000.0:
                                    return f"{bps:.2f} {unit}"
                                bps /= 1000.0
                            return f"{bps:.2f} Pbps"
    
                        result = {
                            "status": "success",
                            "begin": begin,
                            "end": end,
                            "summary": {
                                "total_traffic_bytes": total_traffic_bytes,
                                "total_traffic_formatted": format_bytes(total_traffic_bytes),
                                "data_points_count": len(data_points),
                                "average_bandwidth_bps": avg_bandwidth_bps,
                                "average_bandwidth_formatted": format_bandwidth(avg_bandwidth_bps),
                                "peak_bandwidth_bps": peak_bandwidth_bps,
                                "peak_bandwidth_formatted": format_bandwidth(peak_bandwidth_bps),
                                "granularity": "5 minutes"
                            },
                            "message": "Traffic statistics calculated successfully",
                            "status_code": status
                        }
    
                        # Include raw data only if requested
                        if include_raw_data:
                            result["raw_data"] = data
                            result["data_points"] = data_points
    
                        return result
    
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse JSON response: {e}")
                        return {
                            "status": "error",
                            "begin": begin,
                            "end": end,
                            "message": f"Failed to parse traffic stats response: {str(e)}",
                            "raw_response": text,
                            "status_code": status
                        }
                    except Exception as e:
                        logger.error(f"Error processing traffic stats: {e}")
                        return {
                            "status": "error",
                            "begin": begin,
                            "end": end,
                            "message": f"Error processing traffic stats: {str(e)}",
                            "status_code": status
                        }
                else:
                    logger.error(f"Failed to query traffic stats, status: {status}, response: {text}")
                    return {
                        "status": "error",
                        "begin": begin,
                        "end": end,
                        "message": f"Failed to query traffic stats: {text}",
                        "status_code": status
                    }
  • MCP tool handler wrapper that defines the tool metadata (including schema) and delegates to LiveStreamingService.query_live_traffic_stats.
    @tools.tool_meta(
        types.Tool(
            name="live_streaming_query_live_traffic_stats",
            description="Query live streaming traffic statistics for a time range. Returns total traffic (bytes), average bandwidth (bps), peak bandwidth (bps), and optionally raw data for download.",
            inputSchema={
                "type": "object",
                "properties": {
                    "begin": {
                        "type": "string",
                        "description": "Start time in format YYYYMMDDHHMMSS (e.g., 20240101000000)",
                    },
                    "end": {
                        "type": "string",
                        "description": "End time in format YYYYMMDDHHMMSS (e.g., 20240129105148)",
                    },
                    "include_raw_data": {
                        "type": "boolean",
                        "description": "If true, includes raw JSON data and detailed data points for download. Default is false.",
                        "default": False,
                    },
                },
                "required": ["begin", "end"],
            },
        )
    )
    async def query_live_traffic_stats(self, **kwargs) -> list[types.TextContent]:
        result = await self.live_streaming.query_live_traffic_stats(**kwargs)
        return [types.TextContent(type="text", text=str(result))]
  • Input schema definition for the live_streaming_query_live_traffic_stats tool.
        inputSchema={
            "type": "object",
            "properties": {
                "begin": {
                    "type": "string",
                    "description": "Start time in format YYYYMMDDHHMMSS (e.g., 20240101000000)",
                },
                "end": {
                    "type": "string",
                    "description": "End time in format YYYYMMDDHHMMSS (e.g., 20240129105148)",
                },
                "include_raw_data": {
                    "type": "boolean",
                    "description": "If true, includes raw JSON data and detailed data points for download. Default is false.",
                    "default": False,
                },
            },
            "required": ["begin", "end"],
        },
    )
  • Registration function that creates _ToolImpl instance and registers all live streaming tools, including query_live_traffic_stats, using tools.auto_register_tools.
    def register_tools(live_streaming: LiveStreamingService):
        tool_impl = _ToolImpl(live_streaming)
        tools.auto_register_tools(
            [
                tool_impl.create_bucket,
                tool_impl.create_stream,
                tool_impl.bind_push_domain,
                tool_impl.bind_play_domain,
                tool_impl.get_push_urls,
                tool_impl.get_play_urls,
                tool_impl.query_live_traffic_stats,
                tool_impl.list_buckets,
                tool_impl.list_streams,
            ]
        )
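
The bandwidth arithmetic in the core handler above can be checked in isolation. The sketch below restates the same formula (bytes per 5-minute bucket, times 8 bits, divided by 300 seconds) as a standalone function. The name summarize and the sample flow values are hypothetical; only the record shape and the formula come from the handler.

```python
from typing import Any

# The handler requests g=5min, so each data point covers 300 seconds.
BUCKET_SECONDS = 300


def summarize(data: list[dict[str, Any]]) -> dict[str, float]:
    """Compute total traffic plus average and peak bandwidth.

    Mirrors the handler's logic: skip malformed items, convert each
    bucket's byte count to bits per second, then aggregate.
    """
    flows = [
        item["values"]["flow"]
        for item in data
        if isinstance(item, dict) and "flow" in item.get("values", {})
    ]
    bandwidths = [flow * 8 / BUCKET_SECONDS for flow in flows]
    return {
        "total_traffic_bytes": sum(flows),
        "average_bandwidth_bps": sum(bandwidths) / len(bandwidths) if bandwidths else 0,
        "peak_bandwidth_bps": max(bandwidths, default=0),
    }


# Hypothetical sample in the documented response format.
sample = [
    {"time": "2025-11-26T00:00:00+08:00", "values": {"flow": 0}},
    {"time": "2025-11-26T00:05:00+08:00", "values": {"flow": 3_750_000}},  # 100,000 bps
    {"time": "2025-11-26T00:10:00+08:00", "values": {"flow": 7_500_000}},  # 200,000 bps
]

print(summarize(sample))
```

Note that the average is taken over every returned bucket, including buckets with zero flow, so idle periods pull the average down while leaving the peak unchanged.
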
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden of behavioral disclosure. It mentions the return values (total traffic, average bandwidth, peak bandwidth, optional raw data) but lacks critical details: it doesn't specify if this is a read-only operation, what permissions are required, whether there are rate limits, or how the data is formatted (e.g., units, timezone). For a query tool with no annotation coverage, this leaves significant gaps in understanding its behavior.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.
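
One way to close this gap is to attach MCP tool annotations and state the behavior in the description itself. The sketch below is a plain dict, not the server's actual definition. The description wording is a suggestion, and marking the tool read-only is an assumption based on the handler issuing a single GET request. readOnlyHint and openWorldHint are annotation fields defined by the MCP specification.

```python
import json

# Hypothetical revised tool metadata; not taken from the qiniu repository.
tool_definition = {
    "name": "live_streaming_query_live_traffic_stats",
    "description": (
        "Read-only query of live streaming traffic statistics for a time range. "
        "Returns total downstream traffic (bytes) plus average and peak "
        "bandwidth (bps), computed from 5-minute buckets."
    ),
    "annotations": {
        "readOnlyHint": True,   # assumption: the handler only performs a GET
        "openWorldHint": True,  # the tool calls an external API
    },
}

print(json.dumps(tool_definition, indent=2))
```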

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is appropriately sized and front-loaded, with the core purpose stated first in a single sentence. The second sentence efficiently lists return values and the optional raw data feature. There's no wasted text, making it easy to scan, though it could be slightly more structured (e.g., bullet points for returns).

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 3/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's moderate complexity (querying statistics with 3 parameters) and no annotations or output schema, the description is partially complete. It covers the purpose and return values but lacks behavioral details (e.g., read-only status, error handling) and doesn't fully compensate for the missing output schema. It's adequate as a minimum viable description but has clear gaps in context.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.
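
The missing output schema can be approximated from the handler's success branch. The following sketch uses TypedDict to describe that shape. The class names are hypothetical, the field names are copied from the handler's result dict, and the example values are illustrative only.

```python
from typing import TypedDict


class TrafficSummary(TypedDict):
    total_traffic_bytes: int
    total_traffic_formatted: str
    data_points_count: int
    average_bandwidth_bps: float
    average_bandwidth_formatted: str
    peak_bandwidth_bps: float
    peak_bandwidth_formatted: str
    granularity: str


class TrafficStatsResult(TypedDict):
    # "raw_data" and "data_points" are added only when include_raw_data is true;
    # error responses omit "summary" and carry only a message and status code.
    status: str
    begin: str
    end: str
    summary: TrafficSummary
    message: str
    status_code: int


# Illustrative values, not real measurements.
example: TrafficStatsResult = {
    "status": "success",
    "begin": "20240101000000",
    "end": "20240101001500",
    "summary": {
        "total_traffic_bytes": 11250000,
        "total_traffic_formatted": "10.73 MB",
        "data_points_count": 3,
        "average_bandwidth_bps": 100000.0,
        "average_bandwidth_formatted": "100.00 Kbps",
        "peak_bandwidth_bps": 200000.0,
        "peak_bandwidth_formatted": "200.00 Kbps",
        "granularity": "5 minutes",
    },
    "message": "Traffic statistics calculated successfully",
    "status_code": 200,
}

print(sorted(example["summary"]))
```

The formatted fields use different bases in the handler: byte counts are divided by 1024 per unit step, while bandwidth is divided by 1000.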

Parameters: 3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 100%, so the input schema fully documents the parameters (begin, end, include_raw_data). The description adds minimal value beyond the schema: it implies time-range filtering and mentions the optional raw data parameter, but doesn't provide additional context like valid time ranges, timezone handling, or what 'raw data' entails. This meets the baseline of 3 when the schema does the heavy lifting.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.
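
Because the schema types begin and end as free-form strings, a caller may want to validate them before invoking the tool. The sketch below shows one possible check. parse_range is a hypothetical helper, and the rule that begin must precede end is an assumption, since the listing does not document valid ranges.

```python
from datetime import datetime

TIME_FORMAT = "%Y%m%d%H%M%S"  # YYYYMMDDHHMMSS


def parse_range(begin: str, end: str) -> tuple[datetime, datetime]:
    """Validate both timestamps and return them as datetimes.

    Raises ValueError on a malformed value or a reversed range.
    """
    for name, value in (("begin", begin), ("end", end)):
        # strptime tolerates some short inputs, so check the width explicitly.
        if len(value) != 14 or not value.isdigit():
            raise ValueError(f"{name} must be 14 digits (YYYYMMDDHHMMSS): {value!r}")
    start = datetime.strptime(begin, TIME_FORMAT)
    stop = datetime.strptime(end, TIME_FORMAT)
    if start >= stop:
        raise ValueError("begin must be earlier than end")
    return start, stop


start, stop = parse_range("20240101000000", "20240129105148")
print(stop - start)
```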

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose: 'Query live streaming traffic statistics for a time range.' It specifies the verb ('query'), resource ('live streaming traffic statistics'), and scope ('for a time range'). However, it doesn't explicitly differentiate from sibling tools like 'live_streaming_list_streams' or 'live_streaming_get_play_urls', which might also involve live streaming data but serve different purposes.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives. It mentions an optional parameter for raw data but doesn't explain when to include it or how it differs from the default output. With many sibling tools (e.g., 'live_streaming_list_streams', 'live_streaming_get_play_urls'), there's no indication of when this query tool is preferred over others for accessing live streaming data.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/qiniu/qiniu-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.