Skip to main content
Glama
liqcui

OpenShift OVN-Kubernetes Benchmark MCP Server

by liqcui
ovnk_benchmark_elt_pods_usage.py11.3 kB
""" Extract, Load, Transform module for OpenShift Pods Usage Handles pod usage data from ovnk_benchmark_prometheus_pods_usage.py """ import logging from typing import Dict, Any, List import pandas as pd from .ovnk_benchmark_elt_utility import EltUtility logger = logging.getLogger(__name__) class PodsUsageELT(EltUtility): """Extract, Load, Transform class for pod usage data""" def __init__(self): super().__init__() def extract_pod_usage(self, data: Dict[str, Any]) -> Dict[str, Any]: """Extract pod usage metrics from ovnk_benchmark_prometheus_pods_usage.py output""" structured = { 'usage_summary': [], 'top_cpu_pods': [], 'top_memory_pods': [], 'cpu_detailed': [], 'memory_detailed': [] } # Usage collection summary query_info = data.get('query_info', {}) structured['usage_summary'] = [ {'Property': 'Collection Type', 'Value': data.get('collection_type', 'instant')}, {'Property': 'Collection Time', 'Value': self.format_timestamp(data.get('collection_timestamp', 'Unknown'))}, {'Property': 'Total Analyzed', 'Value': data.get('total_analyzed', 0)}, {'Property': 'Include Containers', 'Value': 'Yes' if data.get('include_containers', False) else 'No'}, {'Property': 'Duration', 'Value': query_info.get('duration', 'N/A')}, {'Property': 'Query Step', 'Value': query_info.get('step', 'N/A')} ] # Top 5 CPU usage - simplified view cpu_usage = data.get('top_5_cpu_usage', []) for item in cpu_usage: rank = item.get('rank', 0) pod_name = item.get('pod_name', 'unknown') node_name = self.truncate_node_name(item.get('node_name', 'unknown')) container_name = item.get('container_name', '') namespace = item.get('namespace', 'unknown') # Format pod display name if container_name and container_name != 'unknown': pod_display = f"{pod_name}:{container_name}" else: pod_display = pod_name # Truncate long names if len(pod_display) > 35: pod_display = pod_display[:32] + '...' # Get CPU metric value - prioritize the main cpu_usage metric cpu_value = 'N/A' metrics = item.get('metrics', {}) # Look for cpu_usage metric first cpu_metric = metrics.get('cpu_usage', {}) if cpu_metric: if data.get('collection_type') == 'instant': cpu_value = f"{cpu_metric.get('value', 0):.2f}%" else: # For duration queries, show avg/max avg_val = cpu_metric.get('avg', 0) max_val = cpu_metric.get('max', 0) cpu_value = f"{avg_val:.1f}% (max: {max_val:.1f}%)" structured['top_cpu_pods'].append({ 'Rank': rank, 'Pod': self.truncate_text(pod_display, 30), 'Node': node_name, 'CPU Usage': cpu_value }) # Detailed CPU metrics for separate table (omit Min % for readability) if cpu_metric and data.get('collection_type') == 'duration': structured['cpu_detailed'].append({ 'Pod': self.truncate_text(pod_display, 25), 'Avg %': f"{cpu_metric.get('avg', 0):.2f}", 'Max %': f"{cpu_metric.get('max', 0):.2f}", 'Node': self.truncate_node_name(node_name, 20), 'Namespace': namespace }) # Top 5 Memory usage - include both working set (memory_usage) and RSS memory_usage = data.get('top_5_memory_usage', []) for item in memory_usage: rank = item.get('rank', 0) pod_name = item.get('pod_name', 'unknown') node_name = self.truncate_node_name(item.get('node_name', 'unknown')) container_name = item.get('container_name', '') namespace = item.get('namespace', 'unknown') # Format pod display name if container_name and container_name != 'unknown': pod_display = f"{pod_name}:{container_name}" else: pod_display = pod_name # Truncate long names if len(pod_display) > 35: pod_display = pod_display[:32] + '...' # Get memory metrics: working set (memory_usage or memory_working_set) and RSS metrics = item.get('metrics', {}) ws_metric = metrics.get('memory_usage') or metrics.get('memory_working_set', {}) rss_metric = metrics.get('memory_rss', {}) # Prepare display strings ws_value_str = 'N/A' rss_value_str = 'N/A' if ws_metric: ws_unit = ws_metric.get('unit', 'B') if data.get('collection_type') == 'instant': ws_value_str = f"{ws_metric.get('value', 0)} {ws_unit}" else: ws_avg = ws_metric.get('avg', 0) ws_max = ws_metric.get('max', 0) ws_value_str = f"{ws_avg:.0f} {ws_unit} (max: {ws_max:.0f})" if rss_metric: rss_unit = rss_metric.get('unit', 'B') if data.get('collection_type') == 'instant': rss_value_str = f"{rss_metric.get('value', 0)} {rss_unit}" else: rss_avg = rss_metric.get('avg', 0) rss_max = rss_metric.get('max', 0) rss_value_str = f"{rss_avg:.0f} {rss_unit} (max: {rss_max:.0f})" structured['top_memory_pods'].append({ 'Rank': rank, 'Pod': self.truncate_text(pod_display, 30), 'Node': node_name, 'Memory Usage': ws_value_str, 'Memory RSS': rss_value_str }) # Detailed Memory metrics for separate table (duration queries) if data.get('collection_type') == 'duration' and (ws_metric or rss_metric): ws_unit = (ws_metric or {}).get('unit', 'MB') rss_unit = (rss_metric or {}).get('unit', 'MB') row = { 'Pod': self.truncate_text(pod_display, 25), 'Node': self.truncate_node_name(node_name, 20), 'Namespace': namespace } if ws_metric: row[f'Avg WS ({ws_unit})'] = f"{ws_metric.get('avg', 0):.0f}" row[f'Max WS ({ws_unit})'] = f"{ws_metric.get('max', 0):.0f}" if rss_metric: row[f'Avg RSS ({rss_unit})'] = f"{rss_metric.get('avg', 0):.0f}" row[f'Max RSS ({rss_unit})'] = f"{rss_metric.get('max', 0):.0f}" structured['memory_detailed'].append(row) return structured def summarize_pod_usage(self, data: Dict[str, Any]) -> str: """Generate pod usage summary""" summary = ["Pod Usage Analysis:"] if 'usage_summary' in data: collection_type = next((item['Value'] for item in data['usage_summary'] if item['Property'] == 'Collection Type'), 'unknown') total_analyzed = next((item['Value'] for item in data['usage_summary'] if item['Property'] == 'Total Analyzed'), 0) duration = next((item['Value'] for item in data['usage_summary'] if item['Property'] == 'Duration'), 'N/A') if duration != 'N/A': summary.append(f"• {collection_type} collection over {duration} ({total_analyzed} pods analyzed)") else: summary.append(f"• {collection_type} collection ({total_analyzed} pods analyzed)") # Top CPU consumer if 'top_cpu_pods' in data and data['top_cpu_pods']: top_cpu = data['top_cpu_pods'][0] pod_name = top_cpu.get('Pod', 'unknown') cpu_usage = top_cpu.get('CPU Usage', 'N/A') summary.append(f"• Top CPU: {pod_name} ({cpu_usage})") # Top Memory consumer if 'top_memory_pods' in data and data['top_memory_pods']: top_memory = data['top_memory_pods'][0] pod_name = top_memory.get('Pod', 'unknown') memory_usage = top_memory.get('Memory Usage', 'N/A') summary.append(f"• Top Memory: {pod_name} ({memory_usage})") # Add collection info if 'cpu_detailed' in data and data['cpu_detailed']: summary.append(f"• Detailed metrics available for {len(data['cpu_detailed'])} pods") return " ".join(summary) def transform_to_dataframes(self, structured_data: Dict[str, Any]) -> Dict[str, pd.DataFrame]: """Transform structured data into pandas DataFrames with proper column limiting""" dataframes = {} try: for key, value in structured_data.items(): if isinstance(value, list) and value: df = pd.DataFrame(value) if not df.empty: # Apply specific column limiting based on table type if key in ['usage_summary']: # Summary tables - limit to 2 columns for better readability df = self.limit_dataframe_columns(df, max_cols=2, table_name=key) elif key in ['top_cpu_pods', 'top_memory_pods']: # Top usage tables - limit to 4 columns for readability df = self.limit_dataframe_columns(df, max_cols=4, table_name=key) elif key == 'cpu_detailed': # CPU detailed - keep to a reasonable number of columns df = self.limit_dataframe_columns(df, max_cols=6, table_name=key) elif key == 'memory_detailed': # Memory detailed - show all available columns for readability; no limiting df = df else: # Default limiting df = self.limit_dataframe_columns(df, table_name=key) dataframes[key] = df except Exception as e: logger.error(f"Failed to transform pod usage to DataFrames: {e}") return dataframes def generate_html_tables(self, dataframes: Dict[str, pd.DataFrame]) -> Dict[str, str]: """Generate HTML tables from DataFrames""" html_tables = {} try: for name, df in dataframes.items(): if not df.empty: html_tables[name] = self.create_html_table(df, name) except Exception as e: logger.error(f"Failed to generate pod usage HTML tables: {e}") return html_tables

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/liqcui/ovnk-benchmark-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server