Skip to main content
Glama
liqcui

OpenShift OVN-Kubernetes Benchmark MCP Server

by liqcui
ovnk_benchmark_auth.py29 kB
""" OpenShift Authentication Module Handles authentication and service discovery for OpenShift/Kubernetes clusters """ import os import base64 import json import subprocess from typing import Optional, Dict, Any, List from kubernetes import client, config from kubernetes.client.rest import ApiException import aiohttp import asyncio import logging class OpenShiftAuth: """Authentication and service discovery for OpenShift clusters""" def __init__(self, kubeconfig_path: Optional[str] = None): self.kubeconfig_path = kubeconfig_path or os.getenv('KUBECONFIG') self.kube_client: Optional[client.ApiClient] = None self.prometheus_url: Optional[str] = None self.prometheus_token: Optional[str] = None self.cluster_info: Dict[str, Any] = {} self.prometheus_alt_urls: List[str] = [] # Environment configuration for Prometheus access self.prefer_route = os.getenv('OVNK_PROMETHEUS_USE_ROUTE', 'true').lower() in ['1', 'true', 'yes'] self.prometheus_namespace = os.getenv('OVNK_PROMETHEUS_NAMESPACE', 'openshift-monitoring') self.prometheus_sa = os.getenv('OVNK_PROMETHEUS_SA', 'prometheus-k8s') async def initialize(self) -> None: """Initialize authentication and discover services""" try: # Load kubeconfig await self._load_kube_config() # Create API client self.kube_client = client.ApiClient() # Get cluster information await self._get_cluster_info() # Discover Prometheus await self._discover_prometheus() # Get service account token (multiple strategies) await self._get_prometheus_token() print(f"✅ Successfully authenticated to OpenShift cluster") print(f"✅ Prometheus URL: {self.prometheus_url}") print(f"✅ Token authentication: {'enabled' if self.prometheus_token else 'disabled'}") except Exception as e: print(f"❌ Failed to initialize authentication: {e}") raise async def _load_kube_config(self) -> None: """Load kubeconfig with priority order""" try: if self.kubeconfig_path and os.path.exists(self.kubeconfig_path): print(f"📁 Loading kubeconfig from: {self.kubeconfig_path}") config.load_kube_config(config_file=self.kubeconfig_path) elif os.path.exists('/var/run/secrets/kubernetes.io/serviceaccount/token'): # Running in-cluster print("🏗️ Loading in-cluster configuration") config.load_incluster_config() else: # Try default kubeconfig location default_kubeconfig = os.path.expanduser('~/.kube/config') if os.path.exists(default_kubeconfig): print(f"📁 Loading default kubeconfig from: {default_kubeconfig}") config.load_kube_config() else: raise Exception("No valid kubeconfig found") except Exception as e: print(f"❌ Failed to load kubeconfig: {e}") raise async def _get_cluster_info(self) -> None: """Get basic cluster information""" try: v1 = client.CoreV1Api(self.kube_client) # Get cluster version (OpenShift specific) try: custom_api = client.CustomObjectsApi(self.kube_client) cluster_version = custom_api.get_cluster_custom_object( group="config.openshift.io", version="v1", plural="clusterversions", name="version" ) self.cluster_info['openshift_version'] = cluster_version.get('status', {}).get('desired', {}).get('version', 'unknown') self.cluster_info['is_openshift'] = True print(f"🚀 OpenShift version: {self.cluster_info['openshift_version']}") except ApiException: self.cluster_info['is_openshift'] = False print("🔧 Kubernetes cluster (not OpenShift)") # Get nodes info nodes = v1.list_node() self.cluster_info['node_count'] = len(nodes.items) self.cluster_info['nodes'] = [] for node in nodes.items: labels = node.metadata.labels or {} roles_set = set() for label_key, label_value in labels.items(): if label_key.startswith('node-role.kubernetes.io/'): role_suffix = label_key.split('/', 1)[1] roles_set.add(role_suffix or (label_value or 'unknown')) elif label_key == 'kubernetes.io/role' and label_value: roles_set.add(label_value) self.cluster_info['nodes'].append({ 'name': node.metadata.name, 'roles': sorted(list(roles_set)), 'version': node.status.node_info.kubelet_version }) # Get namespaces namespaces = v1.list_namespace() self.cluster_info['namespace_count'] = len(namespaces.items) print(f"🏷️ Found {self.cluster_info['node_count']} nodes and {self.cluster_info['namespace_count']} namespaces") except Exception as e: print(f"⚠️ Warning: Could not get cluster info: {e}") async def _discover_prometheus(self) -> None: """Discover Prometheus service and construct URLs""" try: v1 = client.CoreV1Api(self.kube_client) # Try to find Prometheus in common namespaces prometheus_namespaces = [ self.prometheus_namespace, 'openshift-monitoring', 'monitoring', 'prometheus', 'kube-system' ] prometheus_service = None prometheus_namespace = None self.prometheus_alt_urls = [] for namespace in prometheus_namespaces: try: services = v1.list_namespaced_service(namespace=namespace) for service in services.items: service_name = service.metadata.name.lower() if ('prometheus' in service_name) or ('thanos-querier' in service_name): prometheus_service = service prometheus_namespace = namespace # Build internal service URLs svc_name = service.metadata.name ports = [p.port for p in (service.spec.ports or [])] # Prefer port 9090, then others preferred_ports = [9090] + [p for p in ports if p != 9090] for port in preferred_ports: url = f"http://{svc_name}.{namespace}.svc.cluster.local:{port}" if url not in self.prometheus_alt_urls: self.prometheus_alt_urls.append(url) break if prometheus_service: break except ApiException: continue if prometheus_service: prometheus_namespace = prometheus_service.metadata.namespace # Determine primary URL based on preference if self.prefer_route and self.cluster_info.get('is_openshift', False): route_url = await self._get_openshift_route_url( prometheus_service.metadata.name, prometheus_namespace ) self.prometheus_url = route_url or (self.prometheus_alt_urls[0] if self.prometheus_alt_urls else None) else: self.prometheus_url = self.prometheus_alt_urls[0] if self.prometheus_alt_urls else None print(f"🎯 Found Prometheus service: {prometheus_service.metadata.name} in {prometheus_namespace}") print(f"📍 Primary URL: {self.prometheus_url}") print(f"🔗 Alternative URLs: {len(self.prometheus_alt_urls)} found") else: print("⚠️ No Prometheus service found, using default URL") self.prometheus_url = "http://prometheus-k8s.openshift-monitoring.svc.cluster.local:9090" except Exception as e: print(f"⚠️ Warning: Could not discover Prometheus: {e}") self.prometheus_url = "http://prometheus-k8s.openshift-monitoring.svc.cluster.local:9090" async def _get_openshift_route_url(self, service_name: str, namespace: str) -> Optional[str]: """Get OpenShift route URL for Prometheus""" try: custom_api = client.CustomObjectsApi(self.kube_client) # Look for existing route routes = custom_api.list_namespaced_custom_object( group="route.openshift.io", version="v1", namespace=namespace, plural="routes" ) for route in routes.get('items', []): route_name = route['metadata']['name'].lower() if service_name.lower() in route_name or 'prometheus' in route_name: host = route['spec'].get('host') tls = route['spec'].get('tls') protocol = 'https' if tls else 'http' url = f"{protocol}://{host}" print(f"🌐 Found OpenShift route: {url}") return url return None except Exception as e: print(f"Could not get OpenShift route: {e}") return None async def _get_prometheus_token_via_oc(self) -> Optional[str]: """Get token using oc command (Strategy 1)""" try: # Set KUBECONFIG if available env = os.environ.copy() if self.kubeconfig_path: env['KUBECONFIG'] = self.kubeconfig_path # Try 'oc create token' first (newer OpenShift) try: result = subprocess.run( ['oc', 'create', 'token', self.prometheus_sa, '-n', self.prometheus_namespace], capture_output=True, text=True, timeout=30, env=env ) if result.returncode == 0: token = result.stdout.strip() if token: print(f"✅ Retrieved token via 'oc create token'") return token except Exception as e: print(f"'oc create token' failed: {e}") # Fallback to 'oc sa new-token' (older OpenShift) try: result = subprocess.run( ['oc', 'sa', 'new-token', self.prometheus_sa, '-n', self.prometheus_namespace], capture_output=True, text=True, timeout=30, env=env ) if result.returncode == 0: token = result.stdout.strip() if token: print(f"✅ Retrieved token via 'oc sa new-token'") return token except Exception as e: print(f"'oc sa new-token' failed: {e}") except Exception as e: print(f"oc command execution failed: {e}") return None async def _get_prometheus_token_via_kube_client(self) -> Optional[str]: """Get token using Kubernetes client (Strategy 2)""" try: # Try TokenRequest API first (K8s 1.20+) token = await self._create_token_request() if token: return token # Fallback to service account secrets token = await self._get_token_from_secrets() if token: return token return None except Exception as e: print(f"Kubernetes client token retrieval failed: {e}") return None async def _create_token_request(self) -> Optional[str]: """Create token using TokenRequest API""" try: auth_v1 = client.AuthenticationV1Api(self.kube_client) # Create TokenRequest object token_request = self._create_token_request_object(3600) if token_request is None: return None if isinstance(token_request, dict): # Handle dynamic object creation response = self.kube_client.call_api( resource_path=f'/api/v1/namespaces/{self.prometheus_namespace}/serviceaccounts/{self.prometheus_sa}/token', method='POST', body=token_request, header_params={'Content-Type': 'application/json'}, response_type='object' ) if response and len(response) > 0: token_data = response[0] if hasattr(token_data, 'status') and hasattr(token_data.status, 'token'): print("✅ Retrieved token via TokenRequest API (dynamic)") return token_data.status.token elif isinstance(token_data, dict) and 'status' in token_data: token = token_data['status'].get('token') if token: print("✅ Retrieved token via TokenRequest API (dict)") return token else: # Use standard API token_response = auth_v1.create_service_account_token( name=self.prometheus_sa, namespace=self.prometheus_namespace, body=token_request ) print("✅ Retrieved token via TokenRequest API (standard)") return token_response.status.token except Exception as e: print(f"TokenRequest API failed: {e}") return None def _create_token_request_object(self, expiration_seconds: int = 3600): """Create a TokenRequest object using dynamic approach""" # Try multiple ways to create the TokenRequest object # Method 1: Direct class access try: return client.V1TokenRequest( spec=client.V1TokenRequestSpec(expiration_seconds=expiration_seconds) ) except (AttributeError, NameError): pass # Method 2: Via client.models try: return client.models.V1TokenRequest( spec=client.models.V1TokenRequestSpec(expiration_seconds=expiration_seconds) ) except (AttributeError, NameError): pass # Method 3: Via authentication_v1 models try: from kubernetes.client.models import V1TokenRequest, V1TokenRequestSpec return V1TokenRequest( spec=V1TokenRequestSpec(expiration_seconds=expiration_seconds) ) except ImportError: pass # Method 4: Dynamic object creation (fallback) try: token_request = { 'apiVersion': 'authentication.k8s.io/v1', 'kind': 'TokenRequest', 'spec': { 'expirationSeconds': expiration_seconds } } return token_request except Exception: pass return None async def _get_token_from_secrets(self) -> Optional[str]: """Get token from service account secrets""" try: v1 = client.CoreV1Api(self.kube_client) # Get service account try: service_account = v1.read_namespaced_service_account( name=self.prometheus_sa, namespace=self.prometheus_namespace ) except ApiException as e: if e.status == 404: print(f"Service account {self.prometheus_sa} not found in {self.prometheus_namespace}") return None raise # Look for token secrets if hasattr(service_account, 'secrets') and service_account.secrets: for secret_ref in service_account.secrets: try: secret = v1.read_namespaced_secret( name=secret_ref.name, namespace=self.prometheus_namespace ) if secret.type == 'kubernetes.io/service-account-token': token_data = secret.data.get('token') if token_data: token = base64.b64decode(token_data).decode('utf-8') print("✅ Retrieved token from service account secret") return token except Exception as e: print(f"Could not read secret {secret_ref.name}: {e}") continue # Try to find token secrets by label secrets = v1.list_namespaced_secret( namespace=self.prometheus_namespace, label_selector=f"kubernetes.io/service-account.name={self.prometheus_sa}" ) for secret in secrets.items: if secret.type == 'kubernetes.io/service-account-token': token_data = secret.data.get('token') if token_data: token = base64.b64decode(token_data).decode('utf-8') print("✅ Retrieved token from labeled secret") return token print(f"No token secrets found for service account {self.prometheus_sa}") return None except Exception as e: print(f"Could not get token from secrets: {e}") return None async def _get_prometheus_token(self) -> None: """Get service account token for Prometheus access using multiple strategies""" try: # Strategy 1: Try oc command line tools first print("🔐 Attempting to get token via oc command...") self.prometheus_token = await self._get_prometheus_token_via_oc() if self.prometheus_token: return # Strategy 2: Use Kubernetes client API print("🔐 Attempting to get token via Kubernetes client...") self.prometheus_token = await self._get_prometheus_token_via_kube_client() if self.prometheus_token: return # Strategy 3: Try to get current context token print("🔐 Attempting to get token from current context...") if hasattr(self.kube_client.configuration, 'api_key') and self.kube_client.configuration.api_key: token = list(self.kube_client.configuration.api_key.values())[0] if token.startswith('Bearer '): token = token[7:] self.prometheus_token = token print("✅ Retrieved token from current context") return # Strategy 4: Try to read token from service account (in-cluster) print("🔐 Attempting to read in-cluster service account token...") token_path = '/var/run/secrets/kubernetes.io/serviceaccount/token' if os.path.exists(token_path): with open(token_path, 'r') as f: self.prometheus_token = f.read().strip() print("✅ Retrieved in-cluster service account token") return print("⚠️ No token retrieval strategy succeeded") except Exception as e: print(f"⚠️ Warning: Could not get Prometheus token: {e}") self.prometheus_token = None async def test_prometheus_connection(self) -> bool: """Test connection to Prometheus with fallback URLs""" urls_to_test = [self.prometheus_url] if self.prometheus_url else [] urls_to_test.extend(self.prometheus_alt_urls) if not urls_to_test: print("❌ No Prometheus URLs to test") return False headers = {} if self.prometheus_token: headers['Authorization'] = f'Bearer {self.prometheus_token}' # Disable SSL verification for self-signed certificates connector = aiohttp.TCPConnector(ssl=False) async with aiohttp.ClientSession(connector=connector) as session: for url in urls_to_test: try: print(f"🔍 Testing Prometheus connection: {url}") async with session.get( f"{url}/api/v1/query", params={'query': 'up'}, headers=headers, timeout=aiohttp.ClientTimeout(total=10) ) as response: print(f"📡 Response status: {response.status}") if response.status == 200: try: data = await response.json() if data.get('status') == 'success': if url != self.prometheus_url: print(f"🔄 Switching to working URL: {url}") self.prometheus_url = url print("✅ Prometheus connection successful") return True except Exception as e: print(f"Failed to parse response: {e}") continue elif response.status == 403: print("🚫 Access forbidden - token may be invalid or insufficient permissions") elif response.status == 401: print("🔐 Unauthorized - authentication required") except Exception as e: print(f"Connection failed for {url}: {e}") continue print("❌ All Prometheus connection attempts failed") return False async def test_kubeapi_connection(self) -> bool: """Test connectivity to the Kubernetes API server""" try: if not self.kube_client: await self._load_kube_config() self.kube_client = client.ApiClient() v1 = client.CoreV1Api(self.kube_client) namespaces = v1.list_namespace(limit=1) print("✅ Kubernetes API connection successful") return True except Exception as e: print(f"❌ Kubernetes API connection test failed: {e}") return False async def validate_prometheus_access(self) -> Dict[str, Any]: """Validate Prometheus access and return detailed status""" validation_result = { 'prometheus_url': self.prometheus_url, 'has_token': bool(self.prometheus_token), 'connection_successful': False, 'alternative_urls': self.prometheus_alt_urls, 'error_details': None } try: validation_result['connection_successful'] = await self.test_prometheus_connection() except Exception as e: validation_result['error_details'] = str(e) return validation_result def get_prometheus_url(self) -> Optional[str]: """Return the Prometheus URL""" return self.prometheus_url def get_prometheus_token(self) -> Optional[str]: """Return the Prometheus authentication token""" return self.prometheus_token def get_prometheus_config(self) -> Dict[str, Any]: """Return Prometheus URL and token configuration""" return { 'url': self.prometheus_url, 'token': self.prometheus_token, 'authenticated': bool(self.prometheus_token), 'alternative_urls': self.prometheus_alt_urls } def get_cluster_summary(self) -> Dict[str, Any]: """Get a summary of cluster information""" return { 'cluster_info': self.cluster_info, 'prometheus_config': { 'url': self.prometheus_url, 'authenticated': bool(self.prometheus_token), 'alternative_urls': self.prometheus_alt_urls, 'prefer_route': self.prefer_route, 'namespace': self.prometheus_namespace, 'service_account': self.prometheus_sa }, 'auth_status': 'authenticated' if self.kube_client else 'not_authenticated' } async def refresh_token(self) -> bool: """Refresh the Prometheus access token""" try: print("🔄 Refreshing Prometheus token...") old_token = self.prometheus_token self.prometheus_token = None await self._get_prometheus_token() if self.prometheus_token and self.prometheus_token != old_token: print("✅ Token refreshed successfully") return True else: print("⚠️ Token refresh did not produce a new token") return False except Exception as e: print(f"❌ Token refresh failed: {e}") return False # Convenience function for easy initialization async def initialize_auth(kubeconfig_path: Optional[str] = None) -> OpenShiftAuth: """Initialize and return configured OpenShift authentication""" auth = OpenShiftAuth(kubeconfig_path) await auth.initialize() return auth # Global authenticator instance auth = OpenShiftAuth() # Example usage and testing async def main(): """Example usage of the OpenShift authentication module""" try: # Initialize authentication await auth.initialize() # Test connections kube_ok = await auth.test_kubeapi_connection() prom_ok = await auth.test_prometheus_connection() print("\n" + "="*50) print("CONNECTION STATUS") print("="*50) print(f"Kubernetes API: {'✅ Connected' if kube_ok else '❌ Failed'}") print(f"Prometheus: {'✅ Connected' if prom_ok else '❌ Failed'}") # Display cluster summary summary = auth.get_cluster_summary() print(f"\nCluster Type: {'OpenShift' if summary['cluster_info'].get('is_openshift') else 'Kubernetes'}") if summary['cluster_info'].get('openshift_version'): print(f"Version: {summary['cluster_info']['openshift_version']}") print(f"Nodes: {summary['cluster_info'].get('node_count', 'unknown')}") print(f"Namespaces: {summary['cluster_info'].get('namespace_count', 'unknown')}") # Display configuration print(f"\nPrometheus Configuration:") print(f" URL: {auth.get_prometheus_url()}") print(f" Token: {'Available' if auth.get_prometheus_token() else 'Not available'}") # Get Prometheus config prom_config = auth.get_prometheus_config() print(f" Authenticated: {prom_config['authenticated']}") if prom_config['alternative_urls']: print(f" Alternatives: {len(prom_config['alternative_urls'])} URLs") except Exception as e: print(f"❌ Initialization failed: {e}") if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/liqcui/ovnk-benchmark-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server