Skip to main content
Glama
liqcui

OpenShift OVN-Kubernetes Benchmark MCP Server

by liqcui
ovnk_benchmark_prometheus_nodes_usage.py (17 kB)
#!/usr/bin/env python3
"""
OVNK Benchmark Prometheus Node Usage Collector
Collects CPU, RAM, and network usage metrics for nodes grouped by role
"""

import asyncio
import json
import logging
import math
import os
import sys
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import yaml

# Import dependencies
from tools.ovnk_benchmark_prometheus_basequery import PrometheusBaseQuery, PrometheusQueryError
from tools.ovnk_benchmark_prometheus_utility import mcpToolsUtility
from ocauth.ovnk_benchmark_auth import OpenShiftAuth

logger = logging.getLogger(__name__)


class nodeUsageCollector:
    """Collect node usage metrics from Prometheus with role-based grouping."""

    # Metrics reported by this collector; every one has a default query and a unit.
    METRIC_NAMES = ('cpu_usage', 'memory_usage', 'network_rx', 'network_tx')

    def __init__(self, prometheus_client: PrometheusBaseQuery,
                 auth_client: Optional[OpenShiftAuth] = None):
        """Store the clients and resolve the PromQL queries to use.

        Args:
            prometheus_client: Client used for instant and range queries.
            auth_client: Optional auth client, forwarded to ``mcpToolsUtility``.
        """
        self.prometheus_client = prometheus_client
        self.auth_client = auth_client
        self.utility = mcpToolsUtility(auth_client)

        # Default PromQL queries (hardcoded fallback)
        self.default_queries = {
            'cpu_usage': '100 - (avg by (instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)',
            'memory_usage': '(node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / 1024 / 1024',
            'network_rx': 'sum by (instance) (rate(node_network_receive_bytes_total{device!~"lo|veth.*|docker.*|virbr.*|cni.*|flannel.*|br.*|vxlan.*|genev.*|tun.*|tap.*|ovs.*"}[5m]))',
            'network_tx': 'sum by (instance) (rate(node_network_transmit_bytes_total{device!~"lo|veth.*|docker.*|virbr.*|cni.*|flannel.*|br.*|vxlan.*|genev.*|tun.*|tap.*|ovs.*"}[5m]))'
        }

        # Load queries from metrics.yaml or use defaults
        self.queries = self.load_metrics_config()

        # Metric units
        self.units = {
            'cpu_usage': '%',
            'memory_usage': 'MB',
            'network_rx': 'bytes/s',
            'network_tx': 'bytes/s'
        }

    def load_metrics_config(self) -> Dict[str, str]:
        """Load queries from ``metrics.yaml`` or fall back to the defaults.

        The file is looked up relative to the current working directory. Only
        the ``metrics.<name>.query`` entries for the known metric names are
        read; anything missing or malformed keeps its hardcoded default.

        Returns:
            A new dict mapping each metric name to its PromQL query.
        """
        metrics_file = 'metrics.yaml'
        # Copy so later edits to self.queries never alter the defaults.
        queries = dict(self.default_queries)

        if not os.path.exists(metrics_file):
            return queries

        try:
            with open(metrics_file, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f)
        except (OSError, UnicodeDecodeError, yaml.YAMLError) as exc:
            # Best effort: a broken config file must not stop collection.
            logger.warning("Could not load %s, using default queries: %s", metrics_file, exc)
            return queries

        metrics = config.get('metrics') if isinstance(config, dict) else None
        if not isinstance(metrics, dict):
            return queries

        for metric_name in self.METRIC_NAMES:
            entry = metrics.get(metric_name)
            if isinstance(entry, dict):
                queries[metric_name] = entry.get('query', queries[metric_name])
        return queries

    async def get_instance_node_mapping(self) -> Dict[str, str]:
        """Get mapping from Prometheus instance to node name.

        The node name is taken from the first non-empty label among ``node``,
        ``nodename`` and ``kubernetes_node``; failing that, the host part of
        the ``instance`` label is used.

        Returns:
            Dict of instance -> node name; empty if the query or parsing fails.
        """
        try:
            result = await self.prometheus_client.query_instant('up{job="node-exporter"}')

            instance_to_node = {}
            if 'result' in result:
                for item in result['result']:
                    if 'metric' not in item:
                        continue
                    labels = item['metric']
                    instance = labels.get('instance', '')
                    node_name = (labels.get('node')
                                 or labels.get('nodename')
                                 or labels.get('kubernetes_node')
                                 or instance.split(':')[0])
                    if instance and node_name:
                        instance_to_node[instance] = node_name
            return instance_to_node
        except Exception as exc:
            # Best effort: callers fall back to deriving the name from the instance.
            logger.warning("Failed to build instance-to-node mapping: %s", exc)
            return {}

    async def query_metric_stats(self, metric_query: str, start_time: str,
                                 end_time: str) -> Dict[str, Dict[str, float]]:
        """Query metric and calculate min/avg/max for each instance.

        Args:
            metric_query: PromQL expression whose series carry an ``instance`` label.
            start_time: Range start, passed through to ``query_range``.
            end_time: Range end, passed through to ``query_range``.

        Returns:
            Dict of instance -> ``{'min', 'avg', 'max'}`` rounded to two
            decimals. Series without an instance label or without any usable
            sample are omitted. Empty if the query or parsing fails.
        """
        try:
            result = await self.prometheus_client.query_range(metric_query, start_time, end_time, '15s')

            stats = {}
            if 'result' in result:
                for item in result['result']:
                    instance = item.get('metric', {}).get('instance', '')
                    if not instance:
                        continue

                    values = []
                    for sample in item.get('values') or []:
                        # Each sample is a (timestamp, value) pair; skip malformed ones.
                        try:
                            val = float(sample[1])
                        except (ValueError, TypeError, IndexError):
                            continue
                        if not math.isnan(val):
                            values.append(val)

                    if values:
                        stats[instance] = {
                            'min': round(min(values), 2),
                            'avg': round(sum(values) / len(values), 2),
                            'max': round(max(values), 2)
                        }
            return stats
        except Exception as exc:
            # Best effort: a failing metric is reported as missing, not fatal.
            logger.warning("Failed to query metric stats for %r: %s", metric_query, exc)
            return {}

    @staticmethod
    def _sample_labels(labels_by_node: Dict[str, Any], limit: int = 2) -> Dict[str, Any]:
        """Return the first ``limit`` entries of a node -> labels mapping."""
        return dict(list(labels_by_node.items())[:limit])

    async def debug_role_detection(self) -> Dict[str, Any]:
        """Debug method to check role detection.

        Returns:
            Dict with the number of nodes each label source returned, a small
            sample of labels per source, and the role assigned to every node
            from the merged labels.
        """
        debug_info = {
            'node_labels_sources': {},
            'role_assignments': {},
            'label_samples': {}
        }

        # Try each source
        if self.auth_client:
            k8s_labels = await self.utility.get_node_labels_from_kubernetes()
            debug_info['node_labels_sources']['kubernetes_api'] = len(k8s_labels)
            debug_info['label_samples']['kubernetes_api'] = self._sample_labels(k8s_labels)

        oc_labels = await self.utility.get_node_labels_via_oc()
        debug_info['node_labels_sources']['oc_cli'] = len(oc_labels)
        debug_info['label_samples']['oc_cli'] = self._sample_labels(oc_labels)

        prom_labels = await self.utility.get_node_labels_from_prometheus(self.prometheus_client)
        debug_info['node_labels_sources']['prometheus'] = len(prom_labels)
        debug_info['label_samples']['prometheus'] = self._sample_labels(prom_labels)

        # Get final labels and role assignments
        final_labels = await self.utility.get_all_node_labels(self.prometheus_client)
        debug_info['final_node_count'] = len(final_labels)

        for node_name, labels in final_labels.items():
            role = self.utility.determine_node_role(node_name, labels)
            debug_info['role_assignments'][node_name] = {
                'role': role,
                'role_labels': {k: v for k, v in labels.items() if 'role' in k.lower()},
                'has_controlplane_label': any('control-plane' in k for k in labels),
                'has_master_label': any('master' in k for k in labels),
                'has_worker_label': any('worker' in k for k in labels)
            }

        return debug_info

    @staticmethod
    def _names_match(name_a: str, name_b: str) -> bool:
        """Return True if the names are equal or one is the other's short host name.

        The comparison is anchored on a ``.`` so that ``worker-1`` does not
        match ``worker-10``.
        """
        return (name_a == name_b
                or name_a.startswith(name_b + '.')
                or name_b.startswith(name_a + '.'))

    def _match_role_from_groups(self, node_name: str,
                                node_groups: Dict[str, List[Dict[str, Any]]]) -> Optional[str]:
        """Return the first role whose group contains ``node_name``, else None."""
        for role, nodes in node_groups.items():
            for node in nodes:
                if self._names_match(node['name'], node_name):
                    return role
        return None

    def _find_node_labels(self, node_name: str,
                          all_node_labels: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        """Return the labels for ``node_name``, trying exact then short-name match."""
        node_labels = all_node_labels.get(node_name, {})
        if node_labels:
            return node_labels
        for full_name, labels in all_node_labels.items():
            if self._names_match(full_name, node_name):
                return labels
        return {}

    @staticmethod
    def _top_nodes(nodes: List[Dict[str, Any]], metric_name: str, prefix: str,
                   limit: int = 5) -> List[Dict[str, Any]]:
        """Return the ``limit`` nodes with the highest max value for a metric.

        Nodes without a max value for the metric are ignored. Output keys are
        ``name``, ``instance``, ``<prefix>_max`` and ``<prefix>_avg``.
        """
        ranked = sorted(
            (node for node in nodes
             if node['metrics'].get(metric_name, {}).get('max') is not None),
            key=lambda node: node['metrics'][metric_name]['max'],
            reverse=True
        )[:limit]
        return [
            {
                'name': node['name'],
                'instance': node['instance'],
                f'{prefix}_max': node['metrics'][metric_name]['max'],
                f'{prefix}_avg': node['metrics'][metric_name]['avg']
            }
            for node in ranked
        ]

    async def collect_usage_data(self, duration: str = '1h', end_time: Optional[str] = None,
                                 debug: bool = False) -> Dict[str, Any]:
        """Main method to collect node usage data.

        Args:
            duration: Look-back window, passed to ``get_time_range_from_duration``.
            end_time: Optional end of the window, passed to the same helper.
            debug: When True, attach the ``debug_role_detection`` output under ``debug``.

        Returns:
            Dict with ``metadata``, per-role ``groups`` (nodes, summary, count)
            and ``top_usage`` (top 5 worker nodes by max CPU and max memory).
        """
        # Get time range
        start_time, end_time = self.prometheus_client.get_time_range_from_duration(duration, end_time)

        # Get node information and mappings - pass prometheus_client for fallback
        node_groups = await self.utility.get_node_groups(self.prometheus_client)
        instance_mapping = await self.get_instance_node_mapping()

        # Query all metrics
        all_metrics = {}
        for metric_name, query in self.queries.items():
            all_metrics[metric_name] = await self.query_metric_stats(query, start_time, end_time)

        # Initialize result structure
        result = {
            'metadata': {
                'query_time': datetime.now(timezone.utc).isoformat(),
                'duration': duration,
                'start_time': start_time,
                'end_time': end_time,
                'timezone': 'UTC'
            },
            'groups': {
                'controlplane': {'nodes': [], 'summary': {}},
                'worker': {'nodes': [], 'summary': {}},
                'infra': {'nodes': [], 'summary': {}},
                'workload': {'nodes': [], 'summary': {}}
            },
            'top_usage': {
                'cpu': [],
                'memory': []
            }
        }

        # Collect all instances with metrics
        all_instances = set()
        for metric_data in all_metrics.values():
            all_instances.update(metric_data)

        # Labels are only needed for nodes missing from node_groups; fetch them
        # lazily and at most once instead of once per instance.
        all_node_labels: Optional[Dict[str, Dict[str, Any]]] = None

        # Process each instance (sorted so the output order is deterministic)
        node_usage_data = []
        for instance in sorted(all_instances):
            node_name = instance_mapping.get(instance, instance.split(':')[0])

            # None means "not found in any group", which is distinct from
            # "found in the worker group".
            node_role = self._match_role_from_groups(node_name, node_groups)

            # If still not found, use utility's role detection directly
            if node_role is None:
                if all_node_labels is None:
                    all_node_labels = await self.utility.get_all_node_labels(self.prometheus_client)
                node_labels = self._find_node_labels(node_name, all_node_labels)
                node_role = self.utility.determine_node_role(node_name, node_labels, instance) or 'worker'

            # Build node data
            node_data = {
                'instance': instance,
                'name': node_name,
                'role': node_role,
                'metrics': {}
            }

            # Add metrics
            for metric_name in self.queries:
                if instance in all_metrics[metric_name]:
                    stats = all_metrics[metric_name][instance].copy()
                    stats['unit'] = self.units[metric_name]
                    node_data['metrics'][metric_name] = stats
                else:
                    node_data['metrics'][metric_name] = {
                        'min': None, 'avg': None, 'max': None,
                        'unit': self.units[metric_name]
                    }

            node_usage_data.append(node_data)
            # Roles outside the four pre-seeded groups get their own group
            # instead of raising KeyError.
            group = result['groups'].setdefault(node_role, {'nodes': [], 'summary': {}})
            group['nodes'].append(node_data)

        # Calculate group summaries
        for group in result['groups'].values():
            if group['nodes']:
                group['summary'] = self.calculate_group_summary(group['nodes'])
            group['count'] = len(group['nodes'])

        # Get top 5 worker nodes for CPU and memory (by max value)
        worker_nodes = [node for node in node_usage_data if node['role'] == 'worker']
        result['top_usage']['cpu'] = self._top_nodes(worker_nodes, 'cpu_usage', 'cpu')
        result['top_usage']['memory'] = self._top_nodes(worker_nodes, 'memory_usage', 'memory')

        # Add debug information if requested
        if debug:
            result['debug'] = await self.debug_role_detection()

        return result

    def calculate_group_summary(self, nodes: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Calculate summary statistics for a group of nodes.

        For each metric the group ``min`` is the smallest node min, ``max`` the
        largest node max, and ``avg`` the mean of the node averages, each
        rounded to two decimals. A statistic is None when no node has a value.

        Args:
            nodes: Node dicts as built by ``collect_usage_data``.

        Returns:
            Dict of metric name -> ``{'min', 'avg', 'max', 'unit'}``.
        """
        def mean(values: List[float]) -> float:
            return sum(values) / len(values)

        aggregators = {'min': min, 'avg': mean, 'max': max}

        summary = {}
        for metric_name in self.queries:
            metric_summary = {}
            for stat, aggregate in aggregators.items():
                values = [
                    node['metrics'][metric_name].get(stat)
                    for node in nodes
                    if metric_name in node['metrics']
                ]
                values = [value for value in values if value is not None]
                metric_summary[stat] = round(aggregate(values), 2) if values else None
            metric_summary['unit'] = self.units[metric_name]
            summary[metric_name] = metric_summary

        return summary


async def main():
    """Main function for testing.

    Returns:
        Process exit code: 0 on success, 1 on any failure.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Collect node usage metrics')
    parser.add_argument('--prometheus-url', help='Prometheus server URL')
    parser.add_argument('--token', help='Bearer token for authentication')
    parser.add_argument('--kubeconfig', help='Path to kubeconfig file')
    parser.add_argument('--duration', default='1h', help='Duration to query')
    parser.add_argument('--end-time', help='End time (ISO format)')
    parser.add_argument('--debug', action='store_true', help='Enable debug output for role detection')
    parser.add_argument('--pretty', action='store_true', help='Pretty print JSON')

    args = parser.parse_args()

    # Initialize authentication (best effort: explicit CLI options may suffice)
    auth_client = None
    try:
        auth_client = OpenShiftAuth(kubeconfig_path=args.kubeconfig)
        await auth_client.initialize()
    except Exception as e:
        print(f"Warning: OpenShift authentication unavailable: {e}", file=sys.stderr)

    # Determine Prometheus configuration. getattr guards against a client whose
    # initialization failed before these attributes were set.
    prometheus_url = args.prometheus_url or getattr(auth_client, 'prometheus_url', None)
    token = args.token or getattr(auth_client, 'prometheus_token', None)

    if not prometheus_url:
        print("Error: Prometheus URL required", file=sys.stderr)
        return 1

    # Create collector and run
    prometheus_client = PrometheusBaseQuery(prometheus_url, token)

    try:
        async with prometheus_client:
            if not await prometheus_client.test_connection():
                print("Error: Cannot connect to Prometheus", file=sys.stderr)
                return 1

            collector = nodeUsageCollector(prometheus_client, auth_client)
            result = await collector.collect_usage_data(args.duration, args.end_time, args.debug)

            print(json.dumps(result, indent=2 if args.pretty else None))
            return 0

    except Exception as e:
        print(f"Error: {str(e)}", file=sys.stderr)
        return 1


if __name__ == '__main__':
    exit_code = asyncio.run(main())
    sys.exit(exit_code)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/liqcui/ovnk-benchmark-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.