Skip to main content
Glama
liqcui

OpenShift OVN-Kubernetes Benchmark MCP Server

by liqcui
ovnk_benchmark_prometheus_kubeapi.py (29.1 kB)
"""Kubernetes API server metrics module.

Queries Prometheus for Kubernetes API server performance metrics and condenses
the returned series into top-5 rankings and a coarse health summary.
"""

import asyncio
from datetime import datetime, timezone
import math
from typing import Any, Callable, Dict, List, Optional, Tuple

from .ovnk_benchmark_prometheus_basequery import PrometheusBaseQuery


class kubeAPICollector:
    """Handles Kubernetes API server metrics queries.

    Every public ``get_*`` coroutine accepts the same three arguments:

    * ``duration`` -- look-back window such as ``'5m'`` or ``'1h'``. Only the
      latency collectors embed it in the query (as the subquery range); the
      remaining collectors use a fixed ``[5m]`` rate window and accept the
      argument purely for signature parity.
    * ``start_time`` / ``end_time`` -- when *both* are given a range query is
      issued (step ``15s``); otherwise an instant query is issued.
    """

    # Resolution used for every range query.
    _QUERY_STEP = '15s'

    # Number of series kept in each ranking.
    _TOP_N = 5

    # Label keys (in this order) used to build a series label when the caller
    # does not supply its own formatter.
    _DEFAULT_LABEL_KEYS = (
        'resource', 'verb', 'scope', 'instance', 'group', 'kind',
        'operation', 'type', 'service', 'event', 'name',
    )

    _UNIT_BY_METRIC_TYPE = {
        'readonly_latency': 'seconds',
        'mutating_latency': 'seconds',
        'basic_api': 'mixed',
        'watch_events': 'events/second',
        'cache_list': 'operations/second',
        'watch_cache_received': 'events/second',
        'watch_cache_dispatched': 'events/second',
        'rest_client': 'seconds',
        'ovnkube_controller': 'updates/second',
        'etcd_requests': 'requests/second',
    }

    # (metric key, avg query name, max query name,
    #  max-latency threshold in seconds, health penalty, issue label)
    _LATENCY_RULES = (
        ('readonly_latency', 'avg_ro_apicalls_latency',
         'max_ro_apicalls_latency', 1.0, 20, 'readonly'),
        ('mutating_latency', 'avg_mutating_apicalls_latency',
         'max_mutating_apicalls_latency', 2.0, 25, 'mutating'),
    )

    # (metric key, name of the overview field holding the busiest series label)
    _RATE_RULES = (
        ('watch_events', 'top_consumer'),
        ('etcd_requests', 'top_operation'),
    )

    # (inclusive upper bound in seconds, health score); above the last: 25.
    _LATENCY_SCORE_BANDS = ((0.1, 100), (0.5, 85), (1.0, 70), (2.0, 50))

    # (inclusive upper bound in seconds, status); above the last: 'critical'.
    _LATENCY_STATUS_BANDS = ((0.1, 'excellent'), (0.5, 'good'), (1.0, 'warning'))

    def __init__(self, prometheus_client: PrometheusBaseQuery):
        """Store the Prometheus client used for all queries."""
        self.prometheus_client = prometheus_client

    # ------------------------------------------------------------------
    # Public collectors
    # ------------------------------------------------------------------

    async def get_metrics(self, duration: str = "5m",
                          start_time: Optional[str] = None,
                          end_time: Optional[str] = None) -> Dict[str, Any]:
        """Get Kubernetes API server metrics.

        All collectors run concurrently. A collector that raises is reported
        in ``errors`` and omitted from ``metrics``. A collector that returns a
        payload carrying ``error`` / ``query_errors`` is kept in ``metrics``
        and its messages are also copied into ``errors`` so failures are not
        silently lost.

        Args:
            duration: Query duration (e.g., '5m', '1h')
            start_time: Optional start time in ISO format
            end_time: Optional end time in ISO format

        Returns:
            Dictionary with ``timestamp``, ``duration``, ``metrics``,
            ``summary`` and ``errors`` keys.
        """
        result: Dict[str, Any] = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'duration': duration,
            'metrics': {},
            'summary': {},
            'errors': []
        }

        # Insertion order defines both the execution order handed to gather()
        # and the key order of result['metrics'].
        collectors = {
            'readonly_latency': self.get_readonly_latency_metrics,
            'mutating_latency': self.get_mutating_latency_metrics,
            'basic_api': self.get_basic_api_metrics,
            'watch_events': self.get_watch_events_metrics,
            'cache_list': self.get_cache_list_metrics,
            'watch_cache_received': self.get_watch_cache_received_metrics,
            'watch_cache_dispatched': self.get_watch_cache_dispatched_metrics,
            'rest_client': self.get_rest_client_metrics,
            'ovnkube_controller': self.get_ovnkube_controller_metrics,
            'etcd_requests': self.get_etcd_requests_metrics,
        }

        try:
            outcomes = await asyncio.gather(
                *(collect(duration, start_time, end_time)
                  for collect in collectors.values()),
                return_exceptions=True,
            )

            for metric_name, outcome in zip(collectors, outcomes):
                # gather(return_exceptions=True) may hand back BaseException
                # subclasses (e.g. CancelledError), not only Exception.
                if isinstance(outcome, BaseException):
                    result['errors'].append(f"{metric_name}: {str(outcome)}")
                    continue
                result['metrics'][metric_name] = outcome
                result['errors'].extend(
                    self._embedded_errors(metric_name, outcome)
                )

            # Generate comprehensive summary
            result['summary'] = self._generate_comprehensive_summary(
                result['metrics']
            )
        except Exception as e:
            result['errors'].append(
                f"Failed to get API server metrics: {str(e)}"
            )

        return result

    async def get_readonly_latency_metrics(self, duration: str = "5m",
                                           start_time: Optional[str] = None,
                                           end_time: Optional[str] = None) -> Dict[str, Any]:
        """Get read-only (LIST/GET) API call p99 latency, top 5 by avg and max."""
        queries = self._latency_queries('ro', 'LIST|GET', duration)
        return await self._execute_and_process_queries(
            queries, 'readonly_latency', start_time, end_time
        )

    async def get_mutating_latency_metrics(self, duration: str = "5m",
                                           start_time: Optional[str] = None,
                                           end_time: Optional[str] = None) -> Dict[str, Any]:
        """Get mutating (POST/PUT/DELETE/PATCH) API call p99 latency, top 5 by avg and max."""
        queries = self._latency_queries(
            'mutating', 'POST|PUT|DELETE|PATCH', duration
        )
        return await self._execute_and_process_queries(
            queries, 'mutating_latency', start_time, end_time
        )

    async def get_basic_api_metrics(self, duration: str = "5m",
                                    start_time: Optional[str] = None,
                                    end_time: Optional[str] = None) -> Dict[str, Any]:
        """Get basic API server metrics (request rate, errors, inflight, etcd p99).

        ``duration`` is not used by these queries; the rate window is ``[5m]``.
        """
        queries = {
            'api_request_rate':
                'sum(rate(apiserver_request_total{apiserver="kube-apiserver"}[5m])) by (verb, resource)',
            'api_request_errors':
                'sum(rate(apiserver_request_total{apiserver="kube-apiserver", code!~"2.."}[5m])) by (verb, resource, code)',
            'api_server_current_inflight_requests':
                'sum(apiserver_current_inflight_requests{apiserver="kube-apiserver"}) by (request_kind)',
            'etcd_request_duration':
                'histogram_quantile(0.99, sum(rate(etcd_request_duration_seconds_bucket{job=~".*etcd.*"}[5m])) by (le, operation, type))'
        }
        return await self._execute_and_process_queries(
            queries, 'basic_api', start_time, end_time
        )

    async def get_watch_events_metrics(self, duration: str = "5m",
                                       start_time: Optional[str] = None,
                                       end_time: Optional[str] = None) -> Dict[str, Any]:
        """Get top 5 watch events metrics with instance:group:kind format."""
        return await self._collect_top5(
            'watch_events',
            'rate(apiserver_watch_events_total[5m])',
            'events/second',
            ('instance', 'group', 'kind'),
            start_time, end_time,
        )

    async def get_cache_list_metrics(self, duration: str = "5m",
                                     start_time: Optional[str] = None,
                                     end_time: Optional[str] = None) -> Dict[str, Any]:
        """Get top 5 cache list metrics with instance:resource_prefix format."""
        return await self._collect_top5(
            'cache_list',
            'topk(5, rate(apiserver_cache_list_total{job="apiserver"}[5m]))',
            'operations/second',
            ('instance', 'resource_prefix'),
            start_time, end_time,
        )

    async def get_watch_cache_received_metrics(self, duration: str = "5m",
                                               start_time: Optional[str] = None,
                                               end_time: Optional[str] = None) -> Dict[str, Any]:
        """Get top 5 watch cache received metrics with instance:resource format."""
        return await self._collect_top5(
            'watch_cache_received',
            'topk(5, rate(apiserver_watch_cache_events_received_total[5m]))',
            'events/second',
            ('instance', 'resource'),
            start_time, end_time,
        )

    async def get_watch_cache_dispatched_metrics(self, duration: str = "5m",
                                                 start_time: Optional[str] = None,
                                                 end_time: Optional[str] = None) -> Dict[str, Any]:
        """Get top 5 watch cache dispatched metrics with instance:resource format."""
        return await self._collect_top5(
            'watch_cache_dispatched',
            'rate(apiserver_watch_cache_events_dispatched_total[5m])',
            'events/second',
            ('instance', 'resource'),
            start_time, end_time,
        )

    async def get_rest_client_metrics(self, duration: str = "5m",
                                      start_time: Optional[str] = None,
                                      end_time: Optional[str] = None) -> Dict[str, Any]:
        """Get top 5 REST client request duration metrics with service:verb format."""
        return await self._collect_top5(
            'rest_client_duration',
            'histogram_quantile(0.99, sum by(le, service, verb) (rate(rest_client_request_duration_seconds_bucket{job=~"kube-controller-manager|scheduler|check-endpoints|kubelet"}[5m])))',
            'seconds',
            ('service', 'verb'),
            start_time, end_time,
        )

    async def get_ovnkube_controller_metrics(self, duration: str = "5m",
                                             start_time: Optional[str] = None,
                                             end_time: Optional[str] = None) -> Dict[str, Any]:
        """Get top 5 OVNKube controller resource update metrics with instance:event:name format.

        The query asks Prometheus for ``topk(10, ...)``; the ranking is still
        truncated to five entries locally.
        """
        return await self._collect_top5(
            'ovnkube_controller_updates',
            'topk(10, rate(ovnkube_controller_resource_update_total[5m]))',
            'updates/second',
            ('instance', 'event', 'name'),
            start_time, end_time,
        )

    async def get_etcd_requests_metrics(self, duration: str = "5m",
                                        start_time: Optional[str] = None,
                                        end_time: Optional[str] = None) -> Dict[str, Any]:
        """Get top 5 etcd requests metrics with operation:type format."""
        return await self._collect_top5(
            'etcd_requests',
            'rate(etcd_requests_total[5m])',
            'requests/second',
            ('operation', 'type'),
            start_time, end_time,
        )

    # ------------------------------------------------------------------
    # Query construction and execution helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _latency_queries(kind: str, verbs: str, duration: str) -> Dict[str, str]:
        """Build the avg/max p99 latency subqueries for one verb group.

        Args:
            kind: Infix of the query names (``'ro'`` or ``'mutating'``).
            verbs: ``|``-separated verb regex for the ``verb=~`` matcher.
            duration: Subquery range, e.g. ``'5m'``.

        Returns:
            ``{'avg_<kind>_apicalls_latency': ..., 'max_<kind>_apicalls_latency': ...}``
        """
        selector = (
            'apiserver_request_duration_seconds_bucket{apiserver="kube-apiserver", '
            f'verb=~"{verbs}", '
            'subresource!~"log|exec|portforward|attach|proxy"}'
        )
        p99 = (
            f'histogram_quantile(0.99, sum(irate({selector}[2m])) '
            'by (le, resource, verb, scope))'
        )
        return {
            f'{agg}_{kind}_apicalls_latency':
                f'{agg}_over_time({p99}[{duration}:]) > 0'
            for agg in ('avg', 'max')
        }

    @staticmethod
    def _label_formatter(*keys: str) -> Callable[[Dict[str, Any]], str]:
        """Return a formatter joining the given label values with ``:``.

        Missing labels are rendered as ``'unknown'``.
        """
        def format_labels(labels: Dict[str, Any]) -> str:
            return ':'.join(str(labels.get(key, 'unknown')) for key in keys)
        return format_labels

    @staticmethod
    def _embedded_errors(metric_name: str, payload: Any) -> List[str]:
        """Extract error messages a collector reported inside its payload."""
        if not isinstance(payload, dict):
            return []
        messages = []
        if 'error' in payload:
            messages.append(f"{metric_name}: {payload['error']}")
        for query_name, message in (payload.get('query_errors') or {}).items():
            messages.append(f"{metric_name}.{query_name}: {message}")
        return messages

    async def _collect_top5(self, metric_type: str, query: str, unit: str,
                            label_keys: Tuple[str, ...],
                            start_time: Optional[str] = None,
                            end_time: Optional[str] = None) -> Dict[str, Any]:
        """Run one query and rank its series by average and by maximum.

        Returns:
            Dict with ``metric_type``, ``query``, ``top5_avg``, ``top5_max``
            and ``unit``; an ``error`` key is added when the query failed.
        """
        raw = await self._execute_single_query(query, start_time, end_time)
        formatter = self._label_formatter(*label_keys)
        payload = {
            'metric_type': metric_type,
            'query': query,
            'top5_avg': self._get_top5_values(raw, 'avg', formatter),
            'top5_max': self._get_top5_values(raw, 'max', formatter),
            'unit': unit
        }
        # Surface the failure instead of returning indistinguishable empties.
        if isinstance(raw, dict) and 'error' in raw:
            payload['error'] = str(raw['error'])
        return payload

    async def _execute_single_query(self, query: str,
                                    start_time: Optional[str] = None,
                                    end_time: Optional[str] = None) -> Dict[str, Any]:
        """Execute a single query with proper time handling.

        A range query is used only when both bounds are given. Any exception
        from the client is converted into ``{'error': <message>}``.
        """
        try:
            if start_time and end_time:
                return await self.prometheus_client.query_range(
                    query, start_time, end_time, step=self._QUERY_STEP
                )
            return await self.prometheus_client.query_instant(query)
        except Exception as e:
            return {'error': str(e)}

    async def _execute_and_process_queries(self, queries: Dict[str, str],
                                           metric_type: str,
                                           start_time: Optional[str] = None,
                                           end_time: Optional[str] = None) -> Dict[str, Any]:
        """Execute multiple queries and process results with top 5 values.

        A query whose name contains ``avg`` is ranked by average only, one
        containing ``max`` by maximum only (``avg`` wins if both occur). A name
        containing neither is ranked by both, so its data is not discarded.
        Failed sub-queries are recorded under ``query_errors``.

        Returns:
            Dict with ``metric_type``, ``queries``, ``top5_avg``, ``top5_max``
            and ``unit`` (plus ``query_errors`` when applicable), or
            ``{'error': ..., 'metric_type': ...}`` if execution itself failed.
        """
        try:
            if start_time and end_time:
                metrics_data = await self.prometheus_client.query_multiple_range(
                    queries, start_time, end_time, step=self._QUERY_STEP
                )
            else:
                metrics_data = await self.prometheus_client.query_multiple_instant(
                    queries
                )

            result: Dict[str, Any] = {
                'metric_type': metric_type,
                'queries': queries,
                'top5_avg': {},
                'top5_max': {},
                'unit': self._get_metric_unit_by_type(metric_type)
            }

            for query_name, data in metrics_data.items():
                if isinstance(data, dict) and 'error' in data:
                    result.setdefault('query_errors', {})[query_name] = str(
                        data['error']
                    )
                    continue

                if 'avg' in query_name:
                    stats = ('avg',)
                elif 'max' in query_name:
                    stats = ('max',)
                else:
                    stats = ('avg', 'max')

                for stat in stats:
                    result[f'top5_{stat}'][query_name] = self._get_top5_values(
                        data, stat
                    )

            return result
        except Exception as e:
            return {'error': str(e), 'metric_type': metric_type}

    # ------------------------------------------------------------------
    # Result processing
    # ------------------------------------------------------------------

    def _series_value(self, series: Dict[str, Any],
                      stat_type: str) -> Optional[float]:
        """Reduce one result series to a single finite number, or ``None``.

        An instant sample (``value``) is used as-is; range samples
        (``values``) are averaged when ``stat_type == 'avg'`` and otherwise
        reduced with ``max``. Non-finite samples are ignored.
        """
        if 'value' in series:
            _, raw_value = series['value']
            return self._to_number(raw_value)
        if 'values' in series:
            samples = [
                number
                for number in (self._to_number(raw) for _, raw in series['values'])
                if number is not None
            ]
            if not samples:
                return None
            if stat_type == 'avg':
                return sum(samples) / len(samples)
            return max(samples)
        return None

    def _get_top5_values(self, data: Dict[str, Any], stat_type: str = 'avg',
                         label_formatter: Optional[Callable[[Dict[str, Any]], str]] = None
                         ) -> List[Dict[str, Any]]:
        """Get top 5 values from query result.

        Args:
            data: Query result; series are read from ``data['result']``.
            stat_type: ``'avg'`` to average range samples, anything else to
                take their maximum. Irrelevant for instant results.
            label_formatter: Optional callable mapping a series' label dict to
                a display label. Without it, the values of the well-known
                label keys present on the series are joined with ``:``.

        Returns:
            Up to five ``{'label', 'value', 'raw_labels'}`` dicts sorted by
            value (rounded to 6 decimals) in descending order. Empty when the
            payload has no usable ``result``.
        """
        if not isinstance(data, dict):
            return []

        ranked = []
        for series in data.get('result') or []:
            numeric_value = self._series_value(series, stat_type)
            if numeric_value is None:
                continue

            labels = series.get('metric', {})
            if label_formatter:
                label = label_formatter(labels)
            else:
                parts = [
                    labels[key]
                    for key in self._DEFAULT_LABEL_KEYS
                    if key in labels
                ]
                label = ':'.join(parts) if parts else 'unknown'

            ranked.append({
                'label': label,
                'value': round(numeric_value, 6),
                'raw_labels': labels
            })

        # Sort by value descending and take top 5
        ranked.sort(key=lambda entry: entry['value'], reverse=True)
        return ranked[:self._TOP_N]

    def _get_metric_unit_by_type(self, metric_type: str) -> str:
        """Get unit for metric type; unknown types map to ``'count'``."""
        return self._UNIT_BY_METRIC_TYPE.get(metric_type, 'count')

    def _process_metric_data(self, metric_name: str,
                             data: Dict[str, Any]) -> Dict[str, Any]:
        """Process individual metric data into per-series samples and statistics.

        Returns:
            Dict with ``metric_name``, ``values`` (one entry per series that
            has at least one finite sample), ``statistics`` (count/min/max/
            avg/p50/p90/p99 over all finite samples, empty if none) and
            ``unit``. ``error`` is set when ``data`` has no ``result`` key.
        """
        processed: Dict[str, Any] = {
            'metric_name': metric_name,
            'values': [],
            'statistics': {},
            'unit': self._get_metric_unit(metric_name)
        }

        if 'result' not in data:
            processed['error'] = 'No result data'
            return processed

        all_values = []
        for series in data['result']:
            # Instant results carry one sample, range results a list of them.
            if 'value' in series:
                raw_samples = [series['value']]
            elif 'values' in series:
                raw_samples = series['values']
            else:
                raw_samples = []

            samples = []
            for timestamp, raw_value in raw_samples:
                numeric_value = self._to_number(raw_value)
                if numeric_value is None:
                    continue
                samples.append({
                    'timestamp': float(timestamp),
                    'value': numeric_value
                })
                all_values.append(numeric_value)

            if samples:
                processed['values'].append({
                    'labels': {
                        key: value
                        for key, value in series.get('metric', {}).items()
                        if key != '__name__'
                    },
                    'values': samples
                })

        # Calculate statistics
        finite_values = [v for v in all_values if self._is_finite(v)]
        if finite_values:
            processed['statistics'] = {
                'count': len(finite_values),
                'min': min(finite_values),
                'max': max(finite_values),
                'avg': sum(finite_values) / len(finite_values),
                'p50': self._percentile(finite_values, 50),
                'p90': self._percentile(finite_values, 90),
                'p99': self._percentile(finite_values, 99)
            }

        return processed

    def _get_metric_unit(self, metric_name: str) -> str:
        """Get the unit for a metric, inferred from substrings of its name."""
        if 'latency' in metric_name or 'duration' in metric_name:
            return 'seconds'
        if 'rate' in metric_name or 'request' in metric_name:
            return 'requests/second'
        if 'inflight' in metric_name:
            return 'requests'
        return 'count'

    def _percentile(self, values: List[float], percentile: int) -> float:
        """Calculate percentile of values using linear interpolation.

        Non-finite values are ignored; ``0.0`` is returned when nothing
        remains. Ranks at or beyond the last element return the maximum.
        """
        ordered = sorted(v for v in values if self._is_finite(v))
        if not ordered:
            return 0.0

        rank = (len(ordered) - 1) * percentile / 100
        lower = int(rank)
        if lower >= len(ordered) - 1:
            return ordered[-1]
        fraction = rank - lower
        return ordered[lower] * (1 - fraction) + ordered[lower + 1] * fraction

    def _to_number(self, value: Any) -> Optional[float]:
        """Convert input to a finite float; return None for NaN/Inf or invalid."""
        try:
            number = float(value)
        except (ValueError, TypeError):
            return None
        return number if self._is_finite(number) else None

    def _is_finite(self, num: float) -> bool:
        """Return True when ``num`` is neither NaN nor infinite."""
        return math.isfinite(num)

    # ------------------------------------------------------------------
    # Summary
    # ------------------------------------------------------------------

    def _generate_comprehensive_summary(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Generate comprehensive summary of all API server metrics.

        ``overall_health`` starts at 100 and loses 20 when the worst read-only
        max p99 exceeds 1.0s and 25 when the worst mutating max p99 exceeds
        2.0s. ``overall_status`` is derived from it (>=90 excellent, >=80
        good, >=70 warning, else critical).
        """
        overview: Dict[str, Any] = {}
        summary: Dict[str, Any] = {
            'total_metric_types': len(metrics),
            'performance_overview': overview,
            'top_issues': [],
            'health_scores': {},
            'overall_health': 100
        }

        # Analyze readonly / mutating latency
        for (metric_type, avg_key, max_key,
             threshold, penalty, label) in self._LATENCY_RULES:
            if metric_type not in metrics:
                continue
            latency_data = metrics[metric_type]
            avg_top = latency_data.get('top5_avg', {}).get(avg_key, [])
            max_top = latency_data.get('top5_max', {}).get(max_key, [])
            if not (avg_top or max_top):
                continue

            highest_avg = avg_top[0]['value'] if avg_top else 0
            highest_max = max_top[0]['value'] if max_top else 0
            overview[metric_type] = {
                'highest_avg_p99': round(highest_avg, 4),
                'highest_max_p99': round(highest_max, 4),
                'status': self._evaluate_latency_status(
                    max(highest_avg, highest_max)
                )
            }

            if highest_max > threshold:
                summary['overall_health'] -= penalty
                summary['top_issues'].append(
                    f"High {label} latency: {highest_max:.3f}s"
                )

        # Analyze watch events / etcd requests
        for metric_type, top_label_field in self._RATE_RULES:
            if metric_type not in metrics:
                continue
            rate_data = metrics[metric_type]
            top_avg = rate_data.get('top5_avg', [])
            top_max = rate_data.get('top5_max', [])
            if not (top_avg or top_max):
                continue

            highest_rate = max(
                top_avg[0]['value'] if top_avg else 0,
                top_max[0]['value'] if top_max else 0
            )
            overview[metric_type] = {
                'highest_rate': round(highest_rate, 2),
                top_label_field: top_max[0]['label'] if top_max else 'unknown'
            }

        # Calculate individual health scores
        for metric_type in ['readonly_latency', 'mutating_latency',
                            'watch_events', 'etcd_requests']:
            if metric_type in overview:
                summary['health_scores'][metric_type] = (
                    self._calculate_metric_health_score(
                        metric_type, overview[metric_type]
                    )
                )

        # Overall health assessment
        if summary['overall_health'] >= 90:
            summary['overall_status'] = 'excellent'
        elif summary['overall_health'] >= 80:
            summary['overall_status'] = 'good'
        elif summary['overall_health'] >= 70:
            summary['overall_status'] = 'warning'
        else:
            summary['overall_status'] = 'critical'

        return summary

    def _calculate_metric_health_score(self, metric_type: str,
                                       metric_data: Dict[str, Any]) -> int:
        """Calculate health score for individual metric type.

        Latency metrics are scored from ``highest_max_p99`` in bands; any
        other metric scores 85 when ``highest_rate`` is positive, else 50.
        """
        if metric_type in ('readonly_latency', 'mutating_latency'):
            max_latency = metric_data.get('highest_max_p99', 0)
            for upper_bound, score in self._LATENCY_SCORE_BANDS:
                if max_latency <= upper_bound:
                    return score
            return 25

        # For rate-based metrics, assume healthy if data exists
        return 85 if metric_data.get('highest_rate', 0) > 0 else 50

    def _evaluate_latency_status(self, latency: float) -> str:
        """Evaluate latency status (<=100ms excellent, <=500ms good, <=1s warning)."""
        for upper_bound, status in self._LATENCY_STATUS_BANDS:
            if latency <= upper_bound:
                return status
        return 'critical'

    async def get_comprehensive_metrics_summary(self, duration: str = "5m",
                                                start_time: Optional[str] = None,
                                                end_time: Optional[str] = None) -> Dict[str, Any]:
        """Get comprehensive metrics summary with all metric types.

        Returns:
            Complete JSON-style summary with all metrics organized by type.
            On failure, a dict with ``timestamp``, ``error``, ``duration`` and
            ``query_period`` is returned instead.
        """
        query_period = {
            'start_time': start_time,
            'end_time': end_time
        }
        try:
            # Get all metrics
            all_metrics = await self.get_metrics(duration, start_time, end_time)
            collected = all_metrics['metrics']

            # Assemble comprehensive summary
            return {
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'duration': duration,
                'query_period': query_period,
                'api_latency': {
                    'readonly': collected.get('readonly_latency', {}),
                    'mutating': collected.get('mutating_latency', {})
                },
                'api_activity': {
                    'watch_events': collected.get('watch_events', {}),
                    'cache_operations': collected.get('cache_list', {}),
                    'watch_cache': {
                        'received': collected.get('watch_cache_received', {}),
                        'dispatched': collected.get('watch_cache_dispatched', {})
                    }
                },
                'component_metrics': {
                    'rest_client': collected.get('rest_client', {}),
                    'ovnkube_controller': collected.get('ovnkube_controller', {}),
                    'etcd': collected.get('etcd_requests', {})
                },
                'health_summary': all_metrics.get('summary', {}),
                'errors': all_metrics.get('errors', [])
            }
        except Exception as e:
            return {
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'error': f"Failed to get comprehensive metrics: {str(e)}",
                'duration': duration,
                'query_period': query_period
            }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/liqcui/ovnk-benchmark-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.