
Panther MCP Server (Official)

get_bytes_processed_metrics

Retrieve data ingestion metrics to analyze volume patterns by log type and source, showing total bytes processed within specified time periods.

Instructions

Retrieves data ingestion metrics showing total bytes processed per log type and source, helping analyze data volume patterns.

Returns a dict with:

- success: Boolean indicating whether the query succeeded
- bytes_processed: List of series with a breakdown by log type and source
- total_bytes: Total bytes processed in the period
- start_date: Start date of the period
- end_date: End date of the period
- interval_in_minutes: Grouping interval for the metrics
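For illustration, a successful return value might look like the sketch below. The log types, byte counts, and breakdown keys are invented, and the exact shape of `breakdown` is an assumption:

```python
# Illustrative return value only -- labels, byte counts, and breakdown
# keys are invented, not real Panther data.
example_response = {
    "success": True,
    "bytes_processed": [
        {
            "label": "AWS.CloudTrail",  # log type
            "value": 1073741824,        # bytes for this series
            "breakdown": {"my-cloudtrail-source": 1073741824},  # assumed shape
        }
    ],
    "total_bytes": 1073741824,
    "start_date": "2024-03-20T00:00:00Z",
    "end_date": "2024-03-20T23:59:59Z",
    "interval_in_minutes": 1440,
}
```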

Permissions: {'all_of': ['Read Panther Metrics']}
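This annotation is presumably built by the all_perms helper seen in the handler below. A minimal sketch of how such a helper could produce it, with the enum member-to-string mapping inferred from this page (the real enum in mcp-panther will define more members):

```python
from enum import Enum

class Permission(str, Enum):
    # Hypothetical subset; inferred from all_perms(Permission.SUMMARY_READ)
    # producing {'all_of': ['Read Panther Metrics']}.
    SUMMARY_READ = "Read Panther Metrics"

def all_perms(*perms: Permission) -> dict[str, list[str]]:
    """Build an 'all_of' clause, e.g. {'all_of': ['Read Panther Metrics']}."""
    return {"all_of": [p.value for p in perms]}
```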

Input Schema

| Name | Required | Description | Default |
|------|----------|-------------|---------|
| start_date | No | Optional start date in ISO-8601 format. If not provided, defaults to the start of the current day UTC. | None |
| end_date | No | Optional end date in ISO-8601 format. If not provided, defaults to the end of the current day UTC. | None |
| interval_in_minutes | No | How data points are aggregated over time: smaller intervals provide more granular detail of when events occurred, while larger intervals show broader trends but obscure the precise timing of incidents. | 1440 |
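Since the server runs on FastMCP (see the registration snippet under Implementation Reference), a client call might look like this sketch; the server URL and argument values are assumptions:

```python
import asyncio
from fastmcp import Client

async def main():
    # Hypothetical invocation; the URL/transport depends on how
    # mcp-panther is deployed, and the argument values are samples.
    async with Client("http://localhost:8000/mcp") as client:
        result = await client.call_tool(
            "get_bytes_processed_metrics",
            {
                "start_date": "2024-03-20T00:00:00Z",
                "end_date": "2024-03-20T23:59:59Z",
                "interval_in_minutes": 1440,
            },
        )
        print(result)

asyncio.run(main())
```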

Implementation Reference

  • Primary handler function for the 'get_bytes_processed_metrics' tool. Includes the @mcp_tool decorator for automatic registration, an input schema via Annotated Fields, and the logic to fetch and process bytes-processed metrics from the Panther GraphQL API.
```python
@mcp_tool(
    annotations={
        "permissions": all_perms(Permission.SUMMARY_READ),
        "readOnlyHint": True,
    }
)
async def get_bytes_processed_metrics(
    start_date: Annotated[
        str | None,
        Field(
            description="Optional start date in ISO-8601 format. If provided, defaults to the start of the current day UTC.",
            examples=["2024-03-20T00:00:00Z"],
        ),
    ] = None,
    end_date: Annotated[
        str | None,
        Field(
            description="Optional end date in ISO-8601 format. If provided, defaults to the end of the current day UTC.",
            examples=["2024-03-20T00:00:00Z"],
        ),
    ] = None,
    interval_in_minutes: Annotated[
        int,
        BeforeValidator(_validate_interval),
        Field(
            description="How data points are aggregated over time, with smaller intervals providing more granular detail of when events occurred, while larger intervals show broader trends but obscure the precise timing of incidents.",
            examples=[60, 720, 1440],
        ),
    ] = 1440,
) -> dict[str, Any]:
    """Retrieves data ingestion metrics showing total bytes processed per log type and source, helping analyze data volume patterns.

    Returns:
        Dict:
        - success: Boolean indicating if the query was successful
        - bytes_processed: List of series with breakdown by log type and source
        - total_bytes: Total bytes processed in the period
        - start_date: Start date of the period
        - end_date: End date of the period
        - interval_in_minutes: Grouping interval for the metrics
    """
    try:
        # If start or end date is missing, use week's date range
        if not start_date or not end_date:
            default_start_date, default_end_date = _get_week_date_range()
            if not start_date:
                start_date = default_start_date
            if not end_date:
                end_date = default_end_date

        logger.info(
            f"Fetching bytes processed metrics from {start_date} to {end_date} with {interval_in_minutes} minute interval"
        )

        # Prepare variables
        variables = {
            "input": {
                "fromDate": start_date,
                "toDate": end_date,
                "intervalInMinutes": interval_in_minutes,
            }
        }

        # Execute query
        result = await _execute_query(METRICS_BYTES_PROCESSED_QUERY, variables)

        if not result or "metrics" not in result:
            logger.error(f"Could not find key 'metrics' in result: {result}")
            raise Exception("Failed to fetch metrics data")

        metrics_data = result["metrics"]
        bytes_processed = metrics_data["bytesProcessedPerSource"]

        # Calculate total bytes across all series
        total_bytes = sum(series["value"] for series in bytes_processed)

        return {
            "success": True,
            "bytes_processed": bytes_processed,
            "total_bytes": total_bytes,
            "start_date": start_date,
            "end_date": end_date,
            "interval_in_minutes": interval_in_minutes,
        }
    except Exception as e:
        logger.error(f"Failed to fetch bytes processed metrics: {str(e)}")
        return {
            "success": False,
            "message": f"Failed to fetch bytes processed metrics: {str(e)}",
        }
```
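The handler depends on two helpers not shown here, _validate_interval and _get_week_date_range. A minimal sketch of plausible implementations: the accepted interval set below is an assumption drawn from the field examples, and only the week-range fallback is implied by the handler's own comment.

```python
from datetime import datetime, timedelta, timezone

def _validate_interval(value: int) -> int:
    # Assumed rule: restrict to the intervals shown in the field examples
    # plus a few common ones; the real validator may accept other values.
    allowed = {15, 30, 60, 180, 360, 720, 1440}
    if value not in allowed:
        raise ValueError(f"interval_in_minutes must be one of {sorted(allowed)}")
    return value

def _get_week_date_range() -> tuple[str, str]:
    # Matches the handler comment "use week's date range": the last
    # seven days ending now, formatted as ISO-8601 UTC strings.
    end = datetime.now(timezone.utc)
    start = end - timedelta(days=7)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return start.strftime(fmt), end.strftime(fmt)
```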
  • GraphQL query definition used by the tool handler. Defines the input type (MetricsInput!) and the selection set returned for bytes-processed metrics, serving as the contract with the backend schema.
```python
METRICS_BYTES_PROCESSED_QUERY = gql("""
query GetBytesProcessedMetrics($input: MetricsInput!) {
    metrics(input: $input) {
        bytesProcessedPerSource {
            label
            value
            breakdown
        }
    }
}
""")
```
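Over the wire, executing this query amounts to POSTing a standard GraphQL JSON body to Panther's API. A sketch of that payload, with sample dates and the query text elided:

```python
# Sketch of the request body the gql client sends; the dates are sample
# values and the query text is elided for brevity.
request_body = {
    "query": "query GetBytesProcessedMetrics($input: MetricsInput!) { ... }",
    "variables": {
        "input": {
            "fromDate": "2024-03-13T00:00:00Z",
            "toDate": "2024-03-20T00:00:00Z",
            "intervalInMinutes": 1440,
        }
    },
}
```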
  • Explicit call to register_all_tools, which collects and registers all @mcp_tool-decorated functions, including get_bytes_processed_metrics, with the MCP server instance.
```python
from .panther_mcp_core.tools.registry import register_all_tools

# Create the MCP server with lifespan context for shared HTTP client management
# Note: Dependencies are declared in fastmcp.json for FastMCP v2.14.0+
mcp = FastMCP(MCP_SERVER_NAME, lifespan=lifespan)

# Register all tools with MCP using the registry
register_all_tools(mcp)
```
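For intuition, a decorator-based registry of this kind can be as small as the hypothetical sketch below; the real implementation in panther_mcp_core/tools/registry.py will differ in detail:

```python
# Hypothetical sketch of the @mcp_tool registry pattern described above;
# not the actual mcp-panther code.
from typing import Any, Callable

_TOOLS: list[tuple[Callable[..., Any], dict[str, Any]]] = []

def mcp_tool(annotations: dict[str, Any] | None = None) -> Callable:
    """Collect a function (plus its annotations) for later registration."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        _TOOLS.append((fn, annotations or {}))
        return fn
    return decorator

def register_all_tools(mcp: Any) -> None:
    """Attach every collected function to the FastMCP server instance."""
    for fn, annotations in _TOOLS:
        # Assumes FastMCP's tool decorator accepts these annotations.
        mcp.tool(annotations=annotations)(fn)
```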


MCP directory API

We provide all the information about MCP servers via our MCP API.

```bash
curl -X GET 'https://glama.ai/api/mcp/v1/servers/panther-labs/mcp-panther'
```

If you have feedback or need assistance with the MCP directory API, please join our Discord server.