
OpenShift OVN-Kubernetes Benchmark MCP Server

by liqcui
ovnk_benchmark_analysis_cluster_stat.py (21.7 kB)
#!/usr/bin/env python3 """ OVNK Benchmark Cluster Status Analysis Module Analyzes cluster information and provides performance insights """ import json import logging from typing import Dict, List, Any, Optional from datetime import datetime, timezone from dataclasses import dataclass, asdict from analysis.ovnk_benchmark_performance_utility import ( BasePerformanceAnalyzer, PerformanceAlert, AnalysisMetadata, ClusterHealth, AlertLevel, ResourceType, PerformanceLevel, RecommendationEngine, HealthScoreCalculator, create_performance_alert ) logger = logging.getLogger(__name__) @dataclass class NodeGroupSummary: """Summary for a group of nodes (master/infra/worker)""" node_type: str total_nodes: int ready_nodes: int not_ready_nodes: int scheduling_disabled: int health_score: float resource_summary: Dict[str, Any] problematic_nodes: List[str] @dataclass class ClusterStatusAnalysis: """Complete cluster status analysis result""" metadata: AnalysisMetadata cluster_health: ClusterHealth node_groups: Dict[str, NodeGroupSummary] resource_utilization: Dict[str, Any] network_policy_analysis: Dict[str, Any] cluster_operators_summary: Dict[str, Any] mcp_summary: Dict[str, Any] alerts: List[PerformanceAlert] recommendations: List[str] class ClusterStatAnalyzer(BasePerformanceAnalyzer): """Analyzer for OpenShift cluster status and health""" def __init__(self): super().__init__("cluster_status") def analyze_metrics_data(self, cluster_data: Dict[str, Any]) -> Dict[str, Any]: """Main analysis method for cluster information""" try: # Start timing start_time = datetime.now(timezone.utc) # Analyze different aspects of the cluster node_analysis = self._analyze_node_groups(cluster_data) resource_analysis = self._analyze_resource_utilization(cluster_data) network_analysis = self._analyze_network_policies(cluster_data) operator_analysis = self._analyze_cluster_operators(cluster_data) mcp_analysis = self._analyze_mcp_status(cluster_data) # Generate alerts alerts = self._generate_alerts(node_analysis, operator_analysis, mcp_analysis) # Calculate cluster health total_components = cluster_data.get('total_nodes', 0) + len(cluster_data.get('unavailable_cluster_operators', [])) cluster_health = self.calculate_cluster_health_score(alerts, total_components) # Generate recommendations recommendations = self._generate_recommendations( node_analysis, operator_analysis, mcp_analysis, alerts ) # Calculate duration duration = str(datetime.now(timezone.utc) - start_time) # Create metadata metadata = self.generate_metadata( collection_type="cluster_status", total_items=cluster_data.get('total_nodes', 0), duration=duration ) # Create complete analysis analysis = ClusterStatusAnalysis( metadata=metadata, cluster_health=cluster_health, node_groups=node_analysis, resource_utilization=resource_analysis, network_policy_analysis=network_analysis, cluster_operators_summary=operator_analysis, mcp_summary=mcp_analysis, alerts=alerts, recommendations=recommendations ) return asdict(analysis) except Exception as e: logger.error(f"Analysis failed: {e}") raise def _analyze_node_groups(self, cluster_data: Dict[str, Any]) -> Dict[str, NodeGroupSummary]: """Analyze node groups (master, infra, worker)""" node_groups = {} for node_type in ['master_nodes', 'infra_nodes', 'worker_nodes']: nodes = cluster_data.get(node_type, []) if not nodes: continue group_type = node_type.replace('_nodes', '') # Count node statuses ready_count = 0 not_ready_count = 0 scheduling_disabled_count = 0 problematic_nodes = [] total_cpu_capacity = 0 total_memory_mb = 0 for 
node in nodes: status = node.get('ready_status', '') if 'Ready' in status and 'SchedulingDisabled' not in status: ready_count += 1 elif 'NotReady' in status: not_ready_count += 1 problematic_nodes.append(node.get('name', 'unknown')) if 'SchedulingDisabled' in status: scheduling_disabled_count += 1 # Parse resource capacity cpu_str = node.get('cpu_capacity', '0') try: cpu_cores = float(cpu_str.replace('m', '')) / 1000 if 'm' in cpu_str else float(cpu_str) total_cpu_capacity += cpu_cores except: pass memory_str = node.get('memory_capacity', '0') try: if 'Ki' in memory_str: memory_kb = float(memory_str.replace('Ki', '')) total_memory_mb += memory_kb / 1024 elif 'Mi' in memory_str: total_memory_mb += float(memory_str.replace('Mi', '')) elif 'Gi' in memory_str: total_memory_mb += float(memory_str.replace('Gi', '')) * 1024 except: pass # Calculate health score for this group health_score = self._calculate_node_group_health( len(nodes), ready_count, not_ready_count, scheduling_disabled_count ) node_groups[group_type] = NodeGroupSummary( node_type=group_type, total_nodes=len(nodes), ready_nodes=ready_count, not_ready_nodes=not_ready_count, scheduling_disabled=scheduling_disabled_count, health_score=health_score, resource_summary={ 'total_cpu_cores': round(total_cpu_capacity, 2), 'total_memory_gb': round(total_memory_mb / 1024, 2), 'avg_cpu_per_node': round(total_cpu_capacity / len(nodes), 2) if nodes else 0, 'avg_memory_gb_per_node': round(total_memory_mb / 1024 / len(nodes), 2) if nodes else 0 }, problematic_nodes=problematic_nodes ) return node_groups def _calculate_node_group_health(self, total: int, ready: int, not_ready: int, disabled: int) -> float: """Calculate health score for a node group""" if total == 0: return 0.0 # Base score from ready nodes base_score = (ready / total) * 100 # Penalties for problematic nodes not_ready_penalty = (not_ready / total) * 50 # Not ready is severe disabled_penalty = (disabled / total) * 20 # Scheduling disabled is moderate score = base_score - not_ready_penalty - disabled_penalty return max(0.0, min(100.0, score)) def _analyze_resource_utilization(self, cluster_data: Dict[str, Any]) -> Dict[str, Any]: """Analyze cluster resource utilization patterns""" return { 'pod_density': round(cluster_data.get('pods_count', 0) / max(cluster_data.get('total_nodes', 1), 1), 2), 'namespace_distribution': cluster_data.get('namespaces_count', 0), 'service_to_pod_ratio': round( cluster_data.get('services_count', 0) / max(cluster_data.get('pods_count', 1), 1), 3 ), 'secret_density': round(cluster_data.get('secrets_count', 0) / max(cluster_data.get('namespaces_count', 1), 1), 2), 'configmap_density': round(cluster_data.get('configmaps_count', 0) / max(cluster_data.get('namespaces_count', 1), 1), 2) } def _analyze_network_policies(self, cluster_data: Dict[str, Any]) -> Dict[str, Any]: """Analyze network policy configuration and impact""" total_pods = cluster_data.get('pods_count', 0) networkpolicies = cluster_data.get('networkpolicies_count', 0) adminnetworkpolicies = cluster_data.get('adminnetworkpolicies_count', 0) egressfirewalls = cluster_data.get('egressfirewalls_count', 0) egressips = cluster_data.get('egressips_count', 0) udn = cluster_data.get('udn_count', 0) total_network_resources = networkpolicies + adminnetworkpolicies + egressfirewalls + egressips + udn return { 'total_network_resources': total_network_resources, 'policy_density': round(networkpolicies / max(total_pods, 1), 4), 'network_complexity_score': self._calculate_network_complexity( networkpolicies, 
adminnetworkpolicies, egressfirewalls, egressips, udn ), 'resource_breakdown': { 'networkpolicies': networkpolicies, 'adminnetworkpolicies': adminnetworkpolicies, 'egressfirewalls': egressfirewalls, 'egressips': egressips, 'user_defined_networks': udn } } def _calculate_network_complexity(self, np: int, anp: int, ef: int, eip: int, udn: int) -> float: """Calculate network complexity score (0-100)""" # Weight different types of network resources by complexity weights = {'np': 1.0, 'anp': 3.0, 'ef': 2.0, 'eip': 1.5, 'udn': 2.5} weighted_score = (np * weights['np'] + anp * weights['anp'] + ef * weights['ef'] + eip * weights['eip'] + udn * weights['udn']) # Normalize to 0-100 scale (arbitrary scaling) normalized_score = min(weighted_score / 10, 100) return round(normalized_score, 2) def _analyze_cluster_operators(self, cluster_data: Dict[str, Any]) -> Dict[str, Any]: """Analyze cluster operators status""" unavailable_operators = cluster_data.get('unavailable_cluster_operators', []) # Estimate total operators (common OpenShift deployment has ~30-35 operators) estimated_total = max(30, len(unavailable_operators) + 25) available_count = estimated_total - len(unavailable_operators) return { 'total_operators_estimated': estimated_total, 'available_operators': available_count, 'unavailable_operators': len(unavailable_operators), 'availability_percentage': round((available_count / estimated_total) * 100, 1), 'unavailable_operator_names': unavailable_operators, 'health_status': 'healthy' if len(unavailable_operators) == 0 else 'degraded' if len(unavailable_operators) <= 2 else 'critical' } def _analyze_mcp_status(self, cluster_data: Dict[str, Any]) -> Dict[str, Any]: """Analyze machine config pool status""" mcp_status = cluster_data.get('mcp_status', {}) status_counts = {} for pool_name, status in mcp_status.items(): status_counts[status] = status_counts.get(status, 0) + 1 total_pools = len(mcp_status) degraded_pools = status_counts.get('Degraded', 0) updating_pools = status_counts.get('Updating', 0) updated_pools = status_counts.get('Updated', 0) health_score = 100.0 if total_pools > 0: health_score = ((updated_pools / total_pools) * 100) - (degraded_pools * 30) - (updating_pools * 10) health_score = max(0.0, min(100.0, health_score)) return { 'total_pools': total_pools, 'status_distribution': status_counts, 'health_score': round(health_score, 2), 'health_status': 'healthy' if degraded_pools == 0 else 'degraded' if degraded_pools <= 1 else 'critical' } def _generate_alerts(self, node_analysis: Dict[str, NodeGroupSummary], operator_analysis: Dict[str, Any], mcp_analysis: Dict[str, Any]) -> List[PerformanceAlert]: """Generate performance alerts based on analysis""" alerts = [] # Node-related alerts for group_type, group_summary in node_analysis.items(): if group_summary.not_ready_nodes > 0: severity = AlertLevel.CRITICAL if group_type == 'master' else AlertLevel.HIGH alerts.append(create_performance_alert( severity=severity.value, resource_type='cpu', # Generic resource type component_name=f"{group_type}_nodes", message=f"{group_summary.not_ready_nodes} {group_type} node(s) not ready", current_value=group_summary.not_ready_nodes, threshold_value=0, unit="nodes" )) if group_summary.health_score < 80: alerts.append(create_performance_alert( severity=AlertLevel.MEDIUM.value, resource_type='cpu', component_name=f"{group_type}_nodes", message=f"{group_type} node group health score below threshold", current_value=group_summary.health_score, threshold_value=80, unit="score" )) # Cluster operator alerts if 
operator_analysis['unavailable_operators'] > 0: severity = AlertLevel.CRITICAL if operator_analysis['unavailable_operators'] > 2 else AlertLevel.HIGH alerts.append(create_performance_alert( severity=severity.value, resource_type='cpu', component_name="cluster_operators", message=f"{operator_analysis['unavailable_operators']} cluster operator(s) unavailable", current_value=operator_analysis['unavailable_operators'], threshold_value=0, unit="operators" )) # MCP alerts if mcp_analysis['health_status'] in ['degraded', 'critical']: alerts.append(create_performance_alert( severity=AlertLevel.HIGH.value if mcp_analysis['health_status'] == 'degraded' else AlertLevel.CRITICAL.value, resource_type='cpu', component_name="machine_config_pools", message=f"Machine config pools in {mcp_analysis['health_status']} state", current_value=mcp_analysis['health_score'], threshold_value=80, unit="score" )) return alerts def _generate_recommendations(self, node_analysis: Dict[str, NodeGroupSummary], operator_analysis: Dict[str, Any], mcp_analysis: Dict[str, Any], alerts: List[PerformanceAlert]) -> List[str]: """Generate actionable recommendations""" recommendations = [] # Critical issues first critical_alerts = [a for a in alerts if a.severity == AlertLevel.CRITICAL] if critical_alerts: recommendations.append("URGENT: Critical cluster issues detected requiring immediate attention") # Node-specific recommendations for group_type, group_summary in node_analysis.items(): if group_summary.not_ready_nodes > 0: recommendations.append(f"Investigate and resolve {group_summary.not_ready_nodes} not ready {group_type} node(s)") if group_summary.health_score < 60: recommendations.append(f"Review {group_type} node group health - score: {group_summary.health_score}%") # Cluster operator recommendations if operator_analysis['unavailable_operators'] > 0: recommendations.append(f"Check and restore {operator_analysis['unavailable_operators']} unavailable cluster operator(s)") if operator_analysis['unavailable_operator_names']: recommendations.append(f"Focus on operators: {', '.join(operator_analysis['unavailable_operator_names'][:3])}") # MCP recommendations if mcp_analysis['health_status'] != 'healthy': recommendations.append("Review machine config pool status and resolve any configuration issues") # Resource utilization recommendations if not recommendations: recommendations.append("Cluster appears healthy - continue monitoring key metrics") return recommendations def generate_report(self, analysis_result: Dict[str, Any]) -> str: """Generate human-readable analysis report""" lines = [] # Header metadata = analysis_result.get('metadata', {}) lines.extend([ "=" * 80, "OPENSHIFT CLUSTER STATUS ANALYSIS REPORT", "=" * 80, f"Analysis Timestamp: {metadata.get('analysis_timestamp', 'unknown')}", f"Total Items Analyzed: {metadata.get('total_items_analyzed', 0)}", f"Analysis Duration: {metadata.get('duration', 'unknown')}", "" ]) # Cluster health summary health = analysis_result.get('cluster_health', {}) lines.extend([ "CLUSTER HEALTH SUMMARY", "-" * 40, f"Overall Score: {health.get('overall_score', 0)}/100", f"Health Level: {health.get('health_level', 'unknown').title()}", f"Critical Issues: {health.get('critical_issues_count', 0)}", f"Warning Issues: {health.get('warning_issues_count', 0)}", f"Healthy Components: {health.get('healthy_items_count', 0)}", "" ]) # Node groups analysis node_groups = analysis_result.get('node_groups', {}) lines.extend(["NODE GROUPS ANALYSIS", "-" * 40]) for group_type, group_data in node_groups.items(): 
lines.extend([ f"{group_type.upper()} NODES:", f" Total: {group_data.get('total_nodes', 0)}", f" Ready: {group_data.get('ready_nodes', 0)}", f" Not Ready: {group_data.get('not_ready_nodes', 0)}", f" Health Score: {group_data.get('health_score', 0):.1f}%", f" CPU Cores: {group_data.get('resource_summary', {}).get('total_cpu_cores', 0)}", f" Memory (GB): {group_data.get('resource_summary', {}).get('total_memory_gb', 0)}", "" ]) # Alerts section alerts = analysis_result.get('alerts', []) if alerts: lines.extend(["PERFORMANCE ALERTS", "-" * 40]) for alert in alerts[:10]: # Show top 10 alerts lines.append(f" [{alert.get('severity', 'unknown').upper()}] {alert.get('message', '')}") lines.append("") # Recommendations recommendations = analysis_result.get('recommendations', []) if recommendations: lines.extend(["RECOMMENDATIONS", "-" * 40]) for i, rec in enumerate(recommendations, 1): lines.append(f"{i}. {rec}") lines.append("") return "\n".join(lines) # Convenience functions async def analyze_cluster_status(cluster_data: Dict[str, Any]) -> Dict[str, Any]: """Analyze cluster status data and return results""" analyzer = ClusterStatAnalyzer() return analyzer.analyze_metrics_data(cluster_data) async def generate_cluster_status_report(cluster_data: Dict[str, Any]) -> str: """Generate and return cluster status report as text""" analyzer = ClusterStatAnalyzer() analysis_result = analyzer.analyze_metrics_data(cluster_data) return analyzer.generate_report(analysis_result) if __name__ == "__main__": # Example usage import asyncio from tools.ovnk_benchmark_openshift_cluster_info import collect_cluster_information async def main(): try: # Collect cluster information print("Collecting cluster information...") cluster_data = await collect_cluster_information() # Analyze the data print("Analyzing cluster status...") analyzer = ClusterStatAnalyzer() analysis_result = analyzer.analyze_metrics_data(cluster_data) # Generate and display report report = analyzer.generate_report(analysis_result) print(report) # Return the analysis result as JSON/dict print("\nAnalysis completed successfully!") return analysis_result except Exception as e: print(f"Error: {e}") return None asyncio.run(main())
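For quick experimentation without a live cluster, the analyzer can also be driven with a hand-built cluster_data dictionary. The sketch below is illustrative only: the key names mirror what the module above reads, but the node and resource values are invented, and the import path assumes the file lives in the analysis package (matching the module's own imports). A real payload would come from tools.ovnk_benchmark_openshift_cluster_info.collect_cluster_information.

import asyncio

# Assumed import path; adjust to wherever the module is installed
from analysis.ovnk_benchmark_analysis_cluster_stat import generate_cluster_status_report

# Illustrative input only -- key names match what the analyzer reads
sample_cluster_data = {
    'total_nodes': 3,
    'master_nodes': [
        {'name': 'master-0', 'ready_status': 'Ready',
         'cpu_capacity': '8', 'memory_capacity': '32768Mi'},
    ],
    'worker_nodes': [
        {'name': 'worker-0', 'ready_status': 'Ready',
         'cpu_capacity': '16000m', 'memory_capacity': '64Gi'},
        {'name': 'worker-1', 'ready_status': 'NotReady',
         'cpu_capacity': '16000m', 'memory_capacity': '64Gi'},
    ],
    'pods_count': 250,
    'namespaces_count': 40,
    'services_count': 80,
    'secrets_count': 120,
    'configmaps_count': 200,
    'networkpolicies_count': 15,
    'adminnetworkpolicies_count': 2,
    'egressfirewalls_count': 1,
    'egressips_count': 3,
    'udn_count': 0,
    'unavailable_cluster_operators': [],
    'mcp_status': {'master': 'Updated', 'worker': 'Updating'},
}

# Prints the plain-text report; the NotReady worker should surface
# as an alert and a recommendation
print(asyncio.run(generate_cluster_status_report(sample_cluster_data)))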

MCP directory API

All information about MCP servers is available via the MCP directory API, for example:

curl -X GET 'https://glama.ai/api/mcp/v1/servers/liqcui/ovnk-benchmark-mcp'
