"""
Utility functions for OpenShift Benchmark ELT modules
Common functions used across multiple ELT modules
Updated to support optimized latency ELT
"""
import logging
import re
from typing import Dict, Any, List, Union, Tuple
import pandas as pd
from datetime import datetime
logger = logging.getLogger(__name__)
class EltUtility:
"""Common utility functions for ELT modules"""
def __init__(self):
self.max_columns = 6
def truncate_text(self, text: str, max_length: int = 30, suffix: str = '...') -> str:
"""Truncate text for display"""
if len(text) <= max_length:
return text
return text[:max_length-len(suffix)] + suffix
def truncate_url(self, url: str, max_length: int = 50) -> str:
"""Truncate URL for display"""
return self.truncate_text(url, max_length)
def truncate_node_name(self, name: str, max_length: int = 25) -> str:
"""Truncate node name for display"""
return self.truncate_text(name, max_length)
def truncate_kernel_version(self, kernel_ver: str, max_length: int = 30) -> str:
"""Truncate kernel version for display"""
return self.truncate_text(kernel_ver, max_length)
def truncate_runtime(self, runtime: str, max_length: int = 25) -> str:
"""Truncate container runtime for display"""
if len(runtime) <= max_length:
return runtime
# Try to keep the version part
if '://' in runtime:
protocol, version = runtime.split('://', 1)
return f"{protocol}://{version[:max_length-len(protocol)-6]}..."
return runtime[:max_length-3] + '...'
def truncate_os_image(self, os_image: str, max_length: int = 35) -> str:
"""Truncate OS image for display"""
return self.truncate_text(os_image, max_length)
def parse_cpu_capacity(self, cpu_str: str) -> int:
"""Parse CPU capacity string to integer"""
try:
# Handle formats like "32", "32000m"
if cpu_str.endswith('m'):
return int(cpu_str[:-1]) // 1000
return int(cpu_str)
except (ValueError, TypeError):
return 0
def parse_memory_capacity(self, memory_str: str) -> float:
"""Parse memory capacity string to GB"""
try:
if memory_str.endswith('Ki'):
# Convert KiB to GB
kib = int(memory_str[:-2])
return kib / (1024 * 1024) # KiB to GB
elif memory_str.endswith('Mi'):
# Convert MiB to GB
mib = int(memory_str[:-2])
return mib / 1024 # MiB to GB
elif memory_str.endswith('Gi'):
# Already in GiB, close enough to GB
return float(memory_str[:-2])
else:
# Assume it's already in bytes, convert to GB
return int(memory_str) / (1024**3)
except (ValueError, TypeError):
return 0.0
def format_memory_display(self, memory_str: str) -> str:
"""Format memory for display"""
try:
gb_value = self.parse_memory_capacity(memory_str)
if gb_value >= 1:
return f"{gb_value:.0f} GB"
else:
# Show in MB for small values
return f"{gb_value * 1024:.0f} MB"
except:
return memory_str
def categorize_resource_type(self, resource_name: str) -> str:
"""Categorize resource type for better organization"""
resource_lower = resource_name.lower()
if any(keyword in resource_lower for keyword in ['network', 'policy', 'egress', 'udn']):
return 'Network & Security'
elif any(keyword in resource_lower for keyword in ['config', 'secret']):
return 'Configuration'
elif any(keyword in resource_lower for keyword in ['pod', 'service']):
return 'Workloads'
elif any(keyword in resource_lower for keyword in ['namespace']):
return 'Organization'
else:
return 'Other'
def format_timestamp(self, timestamp_str: str, length: int = 19) -> str:
"""Format timestamp for display"""
if not timestamp_str:
return 'Unknown'
return timestamp_str[:length]
def clean_html(self, html: str) -> str:
"""Clean up HTML (remove newlines and extra whitespace)"""
html = re.sub(r'\s+', ' ', html.replace('\n', ' ').replace('\r', ''))
return html.strip()
def create_html_table(self, df: pd.DataFrame, table_name: str) -> str:
"""Generate HTML table from DataFrame with improved styling"""
try:
if df.empty:
return ""
# Create styled HTML table
html = df.to_html(
index=False,
classes='table table-striped table-bordered table-sm',
escape=False,
table_id=f"table-{table_name.replace('_', '-')}",
border=1
)
# Clean up HTML
html = self.clean_html(html)
# Add responsive wrapper
html = f'<div class="table-responsive">{html}</div>'
return html
except Exception as e:
logger.error(f"Failed to generate HTML table for {table_name}: {e}")
return f'<div class="alert alert-danger">Error generating table: {str(e)}</div>'
def limit_dataframe_columns(self, df: pd.DataFrame, max_cols: int = None, table_name: str = None) -> pd.DataFrame:
"""Limit DataFrame columns to maximum number - Updated with enhanced latency support"""
if max_cols is None:
max_cols = self.max_columns
# Enhanced handling for latencyELT tables
if table_name and table_name.startswith('latencyelt_'):
if table_name in ['latencyelt_collection_info', 'latencyelt_essential_summary']:
max_cols = 2 # Property-value format
elif table_name == 'latencyelt_top_latencies_ranking':
max_cols = 6 # Show rank, metric, component, latency, severity, data points
elif table_name == 'latencyelt_controller_sync_top20':
max_cols = 5 # Show rank, pod, node, resource, latency
elif table_name == 'latencyelt_all_metrics_top5':
max_cols = 7 # Show metric, component, category, rank, pod, node, latency
elif table_name == 'latencyelt_critical_findings':
max_cols = 3 # Show type, description, details
elif table_name in ['latencyelt_ready_duration', 'latencyelt_sync_duration',
'latencyelt_cni_latency', 'latencyelt_pod_annotation',
'latencyelt_pod_creation', 'latencyelt_service_latency',
'latencyelt_network_config']:
max_cols = 6 # Show more columns for detailed latency metrics
if len(df.columns) <= max_cols:
return df
# Enhanced priority columns for latencyELT tables
if table_name and table_name.startswith('latencyelt_'):
if 'ranking' in table_name or 'top20' in table_name:
priority_cols = ['rank', 'metric name', 'metric_name', 'pod name', 'pod_name', 'node name', 'node_name', 'latency', 'max latency', 'component', 'severity']
elif 'top5' in table_name or 'all_metrics' in table_name:
priority_cols = ['metric name', 'metric_name', 'component', 'category', 'rank', 'pod name', 'pod_name', 'node name', 'node_name', 'latency', 'value']
else:
priority_cols = ['property', 'value', 'metric', 'component', 'max_value', 'avg_value', 'severity', 'rank']
else:
# Default priority columns for other tables
priority_cols = ['name', 'status', 'value', 'count', 'property', 'rank', 'node', 'type', 'ready', 'cpu', 'memory', 'metric']
# Find priority columns that exist
keep_cols = []
for col in df.columns:
col_lower = col.lower()
if any(priority in col_lower for priority in priority_cols):
keep_cols.append(col)
# Add remaining columns up to limit
remaining_cols = [col for col in df.columns if col not in keep_cols]
while len(keep_cols) < max_cols and remaining_cols:
keep_cols.append(remaining_cols.pop(0))
return df[keep_cols[:max_cols]]
def create_property_value_table(self, data: List[Dict[str, Any]]) -> List[Dict[str, str]]:
"""Create a property-value table format"""
return [{'Property': item.get('Property', ''), 'Value': str(item.get('Value', ''))} for item in data]
def calculate_totals_from_nodes(self, nodes: List[Dict[str, Any]]) -> Dict[str, Union[int, float]]:
"""Calculate total CPU and memory from node list"""
total_cpu = sum(self.parse_cpu_capacity(node.get('cpu_capacity', '0')) for node in nodes)
total_memory_gb = sum(self.parse_memory_capacity(node.get('memory_capacity', '0Ki')) for node in nodes)
ready_count = sum(1 for node in nodes if 'Ready' in node.get('ready_status', ''))
schedulable_count = sum(1 for node in nodes if node.get('schedulable', False))
return {
'total_cpu': total_cpu,
'total_memory_gb': total_memory_gb,
'ready_count': ready_count,
'schedulable_count': schedulable_count,
'count': len(nodes)
}
def format_latency_value(self, value: Union[float, int], unit: str = 'seconds') -> str:
"""Format latency value with appropriate unit for display"""
try:
if unit == 'seconds' and isinstance(value, (int, float)):
if value == 0:
return '0 ms'
elif value < 1:
return f"{round(value * 1000, 2)} ms"
elif value < 60:
return f"{round(value, 3)} s"
elif value < 3600:
return f"{round(value / 60, 2)} min"
else:
return f"{round(value / 3600, 2)} h"
else:
return f"{round(float(value), 4)} {unit}"
except (ValueError, TypeError):
return str(value)
def format_latency_for_summary(self, value_seconds: float) -> Dict[str, Union[str, float]]:
"""Format latency for summary blocks as a dict with value and unit."""
try:
if value_seconds < 1:
return {'value': round(value_seconds * 1000, 2), 'unit': 'ms'}
elif value_seconds < 60:
return {'value': round(value_seconds, 3), 'unit': 's'}
elif value_seconds < 3600:
return {'value': round(value_seconds / 60, 2), 'unit': 'min'}
else:
return {'value': round(value_seconds / 3600, 2), 'unit': 'h'}
except Exception:
return {'value': value_seconds, 'unit': 'seconds'}
def categorize_latency_severity(self, value: float, unit: str = 'seconds') -> str:
"""Categorize latency severity for color coding"""
try:
if unit == 'seconds':
if value < 0.1: # Less than 100ms
return 'low'
elif value < 1.0: # Less than 1 second
return 'medium'
elif value < 5.0: # Less than 5 seconds
return 'high'
else: # 5+ seconds
return 'critical'
else:
# For other units, assume already in appropriate scale
return 'medium'
except (ValueError, TypeError):
return 'unknown'
def truncate_metric_name(self, metric_name: str, max_length: int = 30) -> str:
"""Truncate metric name for display while preserving key information"""
if len(metric_name) <= max_length:
return metric_name
# Try to preserve important parts like percentile info
if 'p99' in metric_name:
base_name = metric_name.replace('_p99', '').replace('p99', '')
truncated = self.truncate_text(base_name, max_length - 4)
return f"{truncated} p99"
elif 'p95' in metric_name:
base_name = metric_name.replace('_p95', '').replace('p95', '')
truncated = self.truncate_text(base_name, max_length - 4)
return f"{truncated} p95"
else:
return self.truncate_text(metric_name, max_length)
def extract_component_from_metric(self, metric_name: str) -> str:
"""Extract component type from metric name"""
metric_lower = metric_name.lower()
if any(keyword in metric_lower for keyword in ['controller', 'ovnkube_controller']):
return 'Controller'
elif any(keyword in metric_lower for keyword in ['node', 'ovnkube_node']):
return 'Node'
elif any(keyword in metric_lower for keyword in ['cni']):
return 'CNI'
elif any(keyword in metric_lower for keyword in ['pod']):
return 'Pod'
elif any(keyword in metric_lower for keyword in ['service']):
return 'Service'
elif any(keyword in metric_lower for keyword in ['network']):
return 'Network'
else:
return 'Unknown'
def create_latency_severity_badge(self, value: float, unit: str = 'seconds') -> str:
"""Create HTML badge for latency severity"""
severity = self.categorize_latency_severity(value, unit)
formatted_value = self.format_latency_value(value, unit)
badge_colors = {
'low': 'success',
'medium': 'warning',
'high': 'danger',
'critical': 'dark',
'unknown': 'secondary'
}
color = badge_colors.get(severity, 'secondary')
return f'<span class="badge badge-{color}">{formatted_value}</span>'
def resolve_pod_and_node_names(self, entry: Dict[str, Any]) -> Tuple[str, str]:
"""Resolve pod_name and node_name from multiple possible keys (including nested dicts)."""
def first_non_empty(d: Dict[str, Any], keys: List[str], default: str) -> str:
for k in keys:
v = d.get(k)
if isinstance(v, str) and v.strip():
return v
# Also check nested 'labels' and 'metadata' if present
labels = d.get('labels') if isinstance(d.get('labels'), dict) else {}
for k in keys:
v = labels.get(k)
if isinstance(v, str) and v.strip():
return v
meta = d.get('metadata') if isinstance(d.get('metadata'), dict) else {}
for k in keys:
v = meta.get(k)
if isinstance(v, str) and v.strip():
return v
return default
pod_name = first_non_empty(entry, ['pod_name', 'pod', 'podname', 'podName'], 'N/A')
node_name = first_non_empty(entry, ['node_name', 'node', 'instance', 'nodename', 'nodeName', 'host'], 'unknown')
return pod_name, node_name
def create_status_badge(self, status: str, value: str = None) -> str:
"""Create HTML badge for status with optional value"""
badge_colors = {
'success': 'success',
'warning': 'warning',
'danger': 'danger',
'info': 'info',
'critical': 'danger',
'high': 'warning',
'medium': 'info',
'low': 'success',
'normal': 'success'
}
color = badge_colors.get(status.lower(), 'secondary')
display_text = value if value else status.title()
return f'<span class="badge badge-{color}">{display_text}</span>'
def highlight_critical_values(self, value: float, thresholds: Dict[str, float], unit: str = "") -> str:
"""Highlight critical values with color coding"""
critical = thresholds.get('critical', 90)
warning = thresholds.get('warning', 70)
if value >= critical:
return f'<span class="text-danger font-weight-bold">{value}{unit}</span>'
elif value >= warning:
return f'<span class="text-warning font-weight-bold">{value}{unit}</span>'
else:
return f'{value}{unit}'
# Enhanced latencyELT utility functions
def truncate_latencyelt_pod_name(self, pod_name: str, max_length: int = 25) -> str:
"""Truncate pod name for latencyELT tables with intelligent shortening"""
if len(pod_name) <= max_length:
return pod_name
# For OVN pods, keep the important parts
if 'ovnkube' in pod_name:
if 'controller' in pod_name:
return pod_name.replace('ovnkube-controller-', 'ovn-ctrl-')[:max_length]
elif 'node' in pod_name:
parts = pod_name.split('-')
if len(parts) >= 3:
return f"ovn-node-{parts[-1]}"
elif 'master' in pod_name:
return pod_name.replace('ovnkube-master-', 'ovn-mstr-')[:max_length]
return self.truncate_text(pod_name, max_length)
def create_latencyelt_component_badge(self, component: str) -> str:
"""Create component badge for latencyELT with appropriate styling"""
component_colors = {
'controller': 'primary',
'node': 'info',
'cni': 'success',
'unknown': 'secondary'
}
color = component_colors.get(component.lower(), 'secondary')
return f'<span class="badge badge-{color}">{component.title()}</span>'
def categorize_latencyelt_severity_enhanced(self, max_value: float, avg_value: float, data_points: int, unit: str = 'seconds') -> Dict[str, str]:
"""Enhanced severity categorization for latencyELT metrics"""
try:
if unit == 'seconds':
# Enhanced thresholds based on metric impact
if max_value > 300: # > 5 minutes - system critical
max_severity = 'critical'
impact = 'system_critical'
elif max_value > 60: # > 1 minute - service critical
max_severity = 'critical'
impact = 'service_critical'
elif max_value > 10: # > 10 seconds - performance critical
max_severity = 'high'
impact = 'performance_critical'
elif max_value > 2: # > 2 seconds - noticeable delay
max_severity = 'medium'
impact = 'noticeable'
elif max_value > 0.5: # > 500ms - minor impact
max_severity = 'low'
impact = 'minor'
else:
max_severity = 'excellent'
impact = 'minimal'
# Average value assessment
if avg_value > 30:
avg_severity = 'critical'
elif avg_value > 5:
avg_severity = 'high'
elif avg_value > 1:
avg_severity = 'medium'
elif avg_value > 0.2:
avg_severity = 'low'
else:
avg_severity = 'excellent'
# Data reliability factor
reliability = 'high' if data_points > 10 else 'medium' if data_points > 3 else 'low'
return {
'max_severity': max_severity,
'avg_severity': avg_severity,
'overall': max_severity if max_severity in ['critical'] else avg_severity,
'impact_level': impact,
'data_reliability': reliability,
'is_critical': max_severity == 'critical' or avg_severity == 'critical'
}
else:
return {
'max_severity': 'unknown',
'avg_severity': 'unknown',
'overall': 'unknown',
'impact_level': 'unknown',
'data_reliability': 'unknown',
'is_critical': False
}
except (ValueError, TypeError):
return {
'max_severity': 'unknown',
'avg_severity': 'unknown',
'overall': 'unknown',
'impact_level': 'unknown',
'data_reliability': 'unknown',
'is_critical': False
}