Skip to main content
Glama

@arizeai/phoenix-mcp

Official
by Arize-ai
test_helm.py106 kB
# ruff: noqa: E501 """Comprehensive Helm chart testing with async concurrency and composable validators""" import asyncio import asyncio.subprocess import atexit import shlex import shutil import sys import tempfile from dataclasses import dataclass from pathlib import Path from typing import Any, Iterator, Optional, Protocol import yaml # ANSI colors RED = "\033[31m" GREEN = "\033[32m" BLUE = "\033[34m" YELLOW = "\033[33m" NC = "\033[0m" # Concurrency limit for parallel test execution MAX_CONCURRENT_TESTS = 8 # ============================================================================ # Type Definitions & Protocols # ============================================================================ @dataclass class TestResult: """Result of a single Helm test execution.""" name: str passed: bool error: Optional[str] = None @dataclass class TestCase: """Definition of a Helm test case.""" name: str flags: str validator: Optional["Validator"] class Validator(Protocol): """Protocol for validator functions.""" def __call__(self, resources: list[dict[str, Any]]) -> bool: """Validate rendered Helm output. Args: resources: Pre-parsed Kubernetes resources Returns: True if validation passes, False otherwise """ ... # ============================================================================ # Validator Combinators # ============================================================================ def all_of(*validators: Validator) -> Validator: """Combine validators with AND logic. Example: all_of(has_deployment, has_service, has_ingress) """ def combined(resources: list[dict[str, Any]]) -> bool: return all(v(resources) for v in validators) return combined def any_of(*validators: Validator) -> Validator: """Combine validators with OR logic.""" def combined(resources: list[dict[str, Any]]) -> bool: return any(v(resources) for v in validators) return combined def not_(validator: Validator) -> Validator: """Negate a validator.""" def negated(resources: list[dict[str, Any]]) -> bool: return not validator(resources) return negated # ============================================================================ # YAML Parsing Utilities # ============================================================================ def load_k8s_resources(content: str) -> list[dict[str, Any]]: """Load all Kubernetes resources from YAML content.""" return [doc for doc in yaml.safe_load_all(content) if doc] def iter_resources_by_kind(resources: list[dict[str, Any]], kind: str) -> Iterator[dict[str, Any]]: """Iterate over resources of a specific kind.""" return (r for r in resources if r.get("kind") == kind) def find_resource( resources: list[dict[str, Any]], kind: str, name_contains: str = "" ) -> Optional[dict[str, Any]]: """Find a single resource by kind and optional name filter.""" for resource in iter_resources_by_kind(resources, kind): if not name_contains or name_contains in resource.get("metadata", {}).get("name", ""): return resource return None # ============================================================================ # Base Validators # ============================================================================ class ResourceValidators: """Factory for resource existence validators.""" @staticmethod def has_kind(kind: str, name_contains: str = "") -> Validator: """Create a validator that checks if a resource kind exists.""" def validator(resources: list[dict[str, Any]]) -> bool: return find_resource(resources, kind, name_contains) is not None validator.__name__ = f"has_{kind.lower()}_resource" return validator @staticmethod def count_kind(kind: str, expected_count: int) -> Validator: """Create a validator that checks resource count.""" def validator(resources: list[dict[str, Any]]) -> bool: actual = len(list(iter_resources_by_kind(resources, kind))) return actual == expected_count validator.__name__ = f"has_{expected_count}_{kind.lower()}_resources" return validator # Convenience validators has_deployment = ResourceValidators.has_kind("Deployment") has_service = ResourceValidators.has_kind("Service") has_ingress = ResourceValidators.has_kind("Ingress") has_pvc = ResourceValidators.has_kind("PersistentVolumeClaim") has_hpa = ResourceValidators.has_kind("HorizontalPodAutoscaler") has_service_account = ResourceValidators.has_kind("ServiceAccount") has_statefulset = ResourceValidators.has_kind("StatefulSet") # Negative validators no_ingress = not_(has_ingress) no_postgresql = not_(ResourceValidators.has_kind("StatefulSet", "postgresql")) # Composite validators minimum_resources = all_of(has_deployment, has_service) # ============================================================================ # Helper Functions for Validators # ============================================================================ def get_deployment_containers(resources: list[dict[str, Any]]) -> list[dict[str, Any]]: """Extract containers from Deployment spec, or empty list if not found.""" deployment = find_resource(resources, "Deployment") if not deployment: return [] return deployment.get("spec", {}).get("template", {}).get("spec", {}).get("containers", []) def find_env_in_containers( containers: list[dict[str, Any]], env_name: str ) -> Optional[dict[str, Any]]: """Find an environment variable by name across all containers.""" for container in containers: for env in container.get("env", []): if env.get("name") == env_name: return env return None def check_resource_spec( containers: list[dict[str, Any]], spec_type: str, cpu: str, memory: str ) -> bool: """Check if any container has the specified resource limits or requests.""" for container in containers: spec = container.get("resources", {}).get(spec_type, {}) cpu_match = str(spec.get("cpu", "")) == cpu memory_match = spec.get("memory") == memory if cpu_match and memory_match: return True return False # ============================================================================ # Specific Field Validators # ============================================================================ class DeploymentValidators: """Validators specific to Deployment resources.""" @staticmethod def replicas(count: int) -> Validator: """Validate replica count in Deployment.""" def validator(resources: list[dict[str, Any]]) -> bool: deployment = find_resource(resources, "Deployment") if not deployment: return False return deployment.get("spec", {}).get("replicas") == count return validator @staticmethod def resource_limits(cpu: str, memory: str) -> Validator: """Validate container resource limits.""" def validator(resources: list[dict[str, Any]]) -> bool: containers = get_deployment_containers(resources) return check_resource_spec(containers, "limits", cpu, memory) return validator @staticmethod def resource_requests(cpu: str, memory: str) -> Validator: """Validate container resource requests.""" def validator(resources: list[dict[str, Any]]) -> bool: containers = get_deployment_containers(resources) return check_resource_spec(containers, "requests", cpu, memory) return validator @staticmethod def env_var(name: str, value: str) -> Validator: """Validate environment variable value.""" def validator(resources: list[dict[str, Any]]) -> bool: containers = get_deployment_containers(resources) env = find_env_in_containers(containers, name) return env is not None and env.get("value") == value return validator @staticmethod def env_from_secret(env_name: str, secret_name: str) -> Validator: """Validate environment variable from secret.""" def validator(resources: list[dict[str, Any]]) -> bool: containers = get_deployment_containers(resources) env = find_env_in_containers(containers, env_name) if env is None or "valueFrom" not in env: return False return env["valueFrom"].get("secretKeyRef", {}).get("name") == secret_name return validator @staticmethod def env_from_configmap(env_name: str, configmap_name: str) -> Validator: """Validate environment variable from configmap.""" def validator(resources: list[dict[str, Any]]) -> bool: containers = get_deployment_containers(resources) env = find_env_in_containers(containers, env_name) if env is None or "valueFrom" not in env: return False return env["valueFrom"].get("configMapKeyRef", {}).get("name") == configmap_name return validator @staticmethod def security_context_user(user_id: int) -> Validator: """Validate security context runAsUser.""" def validator(resources: list[dict[str, Any]]) -> bool: deployment = find_resource(resources, "Deployment") if not deployment: return False security_context = ( deployment.get("spec", {}) .get("template", {}) .get("spec", {}) .get("securityContext", {}) ) return security_context.get("runAsUser") == user_id return validator @staticmethod def prometheus_annotations() -> Validator: """Validate Prometheus scraping annotations.""" def validator(resources: list[dict[str, Any]]) -> bool: deployment = find_resource(resources, "Deployment") if not deployment: return False annotations = ( deployment.get("spec", {}) .get("template", {}) .get("metadata", {}) .get("annotations", {}) ) return ( annotations.get("prometheus.io/scrape") == "true" and "prometheus.io/port" in annotations ) return validator @staticmethod def working_dir(expected_dir: str) -> Validator: """Validate PHOENIX_WORKING_DIR environment variable.""" def validator(resources: list[dict[str, Any]]) -> bool: # First check direct env vars in containers containers = get_deployment_containers(resources) env = find_env_in_containers(containers, "PHOENIX_WORKING_DIR") if env is not None and env.get("value") == expected_dir: return True # Check ConfigMaps referenced in envFrom for container in containers: for env_from in container.get("envFrom", []): config_map_ref = env_from.get("configMapRef", {}) config_map_name = config_map_ref.get("name") if config_map_name: # Find the ConfigMap resource config_map = find_resource(resources, "ConfigMap", config_map_name) if config_map: data = config_map.get("data", {}) if data.get("PHOENIX_WORKING_DIR") == expected_dir: return True return False return validator @staticmethod def has_additional_env(env_name: str, expected_value: Optional[str] = None) -> Validator: """Validate that an additional environment variable is present in deployment.""" def validator(resources: list[dict[str, Any]]) -> bool: containers = get_deployment_containers(resources) env = find_env_in_containers(containers, env_name) if env is None: return False if expected_value is not None: return env.get("value") == expected_value return True return validator @staticmethod def has_additional_env_from_secret( env_name: str, secret_name: Optional[str] = None, secret_key: Optional[str] = None ) -> Validator: """Validate that an additional environment variable from secret is present.""" def validator(resources: list[dict[str, Any]]) -> bool: containers = get_deployment_containers(resources) env = find_env_in_containers(containers, env_name) if env is None or "valueFrom" not in env: return False value_from = env.get("valueFrom", {}) secret_ref = value_from.get("secretKeyRef", {}) if secret_name and secret_ref.get("name") != secret_name: return False if secret_key and secret_ref.get("key") != secret_key: return False return True return validator class IngressValidators: """Validators specific to Ingress resources.""" @staticmethod def host(expected_host: str) -> Validator: """Validate Ingress host.""" def validator(resources: list[dict[str, Any]]) -> bool: ingress = find_resource(resources, "Ingress") if not ingress: return False rules = ingress.get("spec", {}).get("rules", []) return any(rule.get("host") == expected_host for rule in rules) return validator @staticmethod def has_tls() -> Validator: """Validate Ingress has TLS configuration.""" def validator(resources: list[dict[str, Any]]) -> bool: ingress = find_resource(resources, "Ingress") if not ingress: return False tls = ingress.get("spec", {}).get("tls") return tls is not None and len(tls) > 0 return validator @staticmethod def ingress_class_name(expected_class: str) -> Validator: """Validate Ingress className.""" def validator(resources: list[dict[str, Any]]) -> bool: ingress = find_resource(resources, "Ingress") if not ingress: return False return ingress.get("spec", {}).get("ingressClassName") == expected_class return validator class ServiceValidators: """Validators specific to Service resources.""" @staticmethod def service_type(expected_type: str) -> Validator: """Validate Service type.""" def validator(resources: list[dict[str, Any]]) -> bool: # Find main service (not postgresql) for service in iter_resources_by_kind(resources, "Service"): if "postgresql" not in service.get("metadata", {}).get("name", ""): return service.get("spec", {}).get("type") == expected_type return False return validator class StorageValidators: """Validators specific to storage resources.""" @staticmethod def pvc_size(expected_size: str) -> Validator: """Validate PVC storage size.""" def validator(resources: list[dict[str, Any]]) -> bool: pvc = find_resource(resources, "PersistentVolumeClaim") if not pvc: return False storage = pvc.get("spec", {}).get("resources", {}).get("requests", {}).get("storage") return storage == expected_size return validator @staticmethod def storage_class_name(expected_class: str) -> Validator: """Validate PVC storageClassName (use empty string for dynamic provisioning).""" def validator(resources: list[dict[str, Any]]) -> bool: pvc = find_resource(resources, "PersistentVolumeClaim") if not pvc: return False storage_class = pvc.get("spec", {}).get("storageClassName") return storage_class == expected_class return validator class HPAValidators: """Validators specific to HorizontalPodAutoscaler resources.""" @staticmethod def replica_range(min_replicas: int, max_replicas: int) -> Validator: """Validate HPA min/max replicas.""" def validator(resources: list[dict[str, Any]]) -> bool: hpa = find_resource(resources, "HorizontalPodAutoscaler") if not hpa: return False spec = hpa.get("spec", {}) return ( spec.get("minReplicas") == min_replicas and spec.get("maxReplicas") == max_replicas ) return validator class PostgreSQLValidators: """Validators specific to PostgreSQL StatefulSet.""" @staticmethod def is_enabled() -> Validator: """Validate PostgreSQL StatefulSet is present.""" def validator(resources: list[dict[str, Any]]) -> bool: statefulset = find_resource(resources, "StatefulSet", "postgresql") return statefulset is not None return validator @staticmethod def is_disabled() -> Validator: """Validate SQLite is configured (Deployment without PostgreSQL).""" def validator(resources: list[dict[str, Any]]) -> bool: has_deployment = find_resource(resources, "Deployment") is not None has_postgresql = find_resource(resources, "StatefulSet", "postgresql") is not None return has_deployment and not has_postgresql return validator @staticmethod def external_config(host: str, port: str, user: str, db: str) -> Validator: """Validate external PostgreSQL configuration via explicit settings.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) return ( data.get("PHOENIX_POSTGRES_HOST") == host and data.get("PHOENIX_POSTGRES_PORT") == port and data.get("PHOENIX_POSTGRES_USER") == user and data.get("PHOENIX_POSTGRES_DB") == db ) return validator class ServiceAccountValidators: """Validators specific to ServiceAccount resources.""" @staticmethod def name(expected_name: str) -> Validator: """Validate ServiceAccount name.""" def validator(resources: list[dict[str, Any]]) -> bool: sa = find_resource(resources, "ServiceAccount") if not sa: return False return sa.get("metadata", {}).get("name") == expected_name return validator @staticmethod def has_image_pull_secret(secret_name: str) -> Validator: """Validate ServiceAccount has imagePullSecret.""" def validator(resources: list[dict[str, Any]]) -> bool: sa = find_resource(resources, "ServiceAccount") if not sa: return False pull_secrets = sa.get("imagePullSecrets", []) return any(s.get("name") == secret_name for s in pull_secrets) return validator class OAuth2Validators: """Validators specific to OAuth2/OIDC configuration.""" @staticmethod def provider_optional_fields( provider: str, display_name: Optional[str] = None, allow_sign_up: Optional[bool] = None, auto_login: Optional[bool] = None, ) -> Validator: """Validate OAuth2 provider optional configuration fields.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) provider_upper = provider.upper() if display_name is not None: if data.get(f"PHOENIX_OAUTH2_{provider_upper}_DISPLAY_NAME") != display_name: return False if allow_sign_up is not None: if ( data.get(f"PHOENIX_OAUTH2_{provider_upper}_ALLOW_SIGN_UP") != str(allow_sign_up).lower() ): return False if auto_login is not None: if ( data.get(f"PHOENIX_OAUTH2_{provider_upper}_AUTO_LOGIN") != str(auto_login).lower() ): return False return True return validator @staticmethod def provider_role_mapping( provider: str, role_attribute_path: str, role_mapping: str, role_attribute_strict: Optional[bool] = None, ) -> Validator: """Validate OAuth2 role mapping configuration.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) provider_upper = provider.upper() path_key = f"PHOENIX_OAUTH2_{provider_upper}_ROLE_ATTRIBUTE_PATH" mapping_key = f"PHOENIX_OAUTH2_{provider_upper}_ROLE_MAPPING" if data.get(path_key) != role_attribute_path: return False if data.get(mapping_key) != role_mapping: return False if role_attribute_strict is not None: strict_key = f"PHOENIX_OAUTH2_{provider_upper}_ROLE_ATTRIBUTE_STRICT" if data.get(strict_key) != str(role_attribute_strict).lower(): return False return True return validator @staticmethod def provider_pkce(provider: str, use_pkce: bool) -> Validator: """Validate OAuth2 PKCE configuration.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) provider_upper = provider.upper() env_key = f"PHOENIX_OAUTH2_{provider_upper}_USE_PKCE" return data.get(env_key) == str(use_pkce).lower() return validator @staticmethod def provider_token_endpoint_auth_method(provider: str, method: str) -> Validator: """Validate OAuth2 token endpoint authentication method.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) provider_upper = provider.upper() env_key = f"PHOENIX_OAUTH2_{provider_upper}_TOKEN_ENDPOINT_AUTH_METHOD" return data.get(env_key) == method return validator @staticmethod def provider_scopes(provider: str, scopes: str) -> Validator: """Validate OAuth2 additional scopes configuration.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) provider_upper = provider.upper() env_key = f"PHOENIX_OAUTH2_{provider_upper}_SCOPES" return data.get(env_key) == scopes return validator @staticmethod def provider_groups_config( provider: str, groups_attribute_path: str, allowed_groups: list[str] ) -> Validator: """Validate OAuth2 group-based access control configuration.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) provider_upper = provider.upper() path_key = f"PHOENIX_OAUTH2_{provider_upper}_GROUPS_ATTRIBUTE_PATH" groups_key = f"PHOENIX_OAUTH2_{provider_upper}_ALLOWED_GROUPS" path_match = data.get(path_key) == groups_attribute_path groups_match = data.get(groups_key) == ",".join(allowed_groups) return path_match and groups_match return validator @staticmethod def provider_comprehensive( provider: str, client_id: str, oidc_config_url: str, has_client_secret: bool = True, display_name: Optional[str] = None, allow_sign_up: Optional[bool] = None, auto_login: Optional[bool] = None, use_pkce: Optional[bool] = None, token_endpoint_auth_method: Optional[str] = None, scopes: Optional[str] = None, groups_attribute_path: Optional[str] = None, allowed_groups: Optional[list[str]] = None, role_attribute_path: Optional[str] = None, role_mapping: Optional[str] = None, role_attribute_strict: Optional[bool] = None, ) -> Validator: """Comprehensive validator for all OAuth2 provider configuration fields.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) provider_upper = provider.upper() # Required fields in ConfigMap if data.get(f"PHOENIX_OAUTH2_{provider_upper}_CLIENT_ID") != client_id: return False if data.get(f"PHOENIX_OAUTH2_{provider_upper}_OIDC_CONFIG_URL") != oidc_config_url: return False # Check client secret in Secret (must be phoenix-secret, not postgresql secret) if has_client_secret: secret = find_resource(resources, "Secret", "phoenix-secret") if not secret: return False secret_key = f"PHOENIX_OAUTH2_{provider_upper}_CLIENT_SECRET" if secret_key not in secret.get("data", {}): return False # Optional fields - only check if explicitly set if display_name is not None: actual_display_name = data.get(f"PHOENIX_OAUTH2_{provider_upper}_DISPLAY_NAME") if actual_display_name != display_name: return False if allow_sign_up is not None: actual_allow = data.get(f"PHOENIX_OAUTH2_{provider_upper}_ALLOW_SIGN_UP") if actual_allow != str(allow_sign_up).lower(): return False if auto_login is not None: actual_auto = data.get(f"PHOENIX_OAUTH2_{provider_upper}_AUTO_LOGIN") if actual_auto != str(auto_login).lower(): return False if use_pkce is not None: actual_pkce = data.get(f"PHOENIX_OAUTH2_{provider_upper}_USE_PKCE") if actual_pkce != str(use_pkce).lower(): return False if token_endpoint_auth_method is not None: actual_method = data.get( f"PHOENIX_OAUTH2_{provider_upper}_TOKEN_ENDPOINT_AUTH_METHOD" ) if actual_method != token_endpoint_auth_method: return False if scopes is not None: actual_scopes = data.get(f"PHOENIX_OAUTH2_{provider_upper}_SCOPES") if actual_scopes != scopes: return False if groups_attribute_path is not None: actual_path = data.get(f"PHOENIX_OAUTH2_{provider_upper}_GROUPS_ATTRIBUTE_PATH") if actual_path != groups_attribute_path: return False if allowed_groups is not None: actual_groups = data.get(f"PHOENIX_OAUTH2_{provider_upper}_ALLOWED_GROUPS") if actual_groups != ",".join(allowed_groups): return False if role_attribute_path is not None: actual_role_path = data.get(f"PHOENIX_OAUTH2_{provider_upper}_ROLE_ATTRIBUTE_PATH") if actual_role_path != role_attribute_path: return False if role_mapping is not None: actual_role_mapping = data.get(f"PHOENIX_OAUTH2_{provider_upper}_ROLE_MAPPING") if actual_role_mapping != role_mapping: return False if role_attribute_strict is not None: actual_strict = data.get(f"PHOENIX_OAUTH2_{provider_upper}_ROLE_ATTRIBUTE_STRICT") if actual_strict != str(role_attribute_strict).lower(): return False return True return validator class SMTPValidators: """Validators specific to SMTP configuration.""" @staticmethod def smtp_config(hostname: str, port: str, username: str) -> Validator: """Validate SMTP configuration in ConfigMap.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) return ( data.get("PHOENIX_SMTP_HOSTNAME") == hostname and data.get("PHOENIX_SMTP_PORT") == port and data.get("PHOENIX_SMTP_USERNAME") == username ) return validator @staticmethod def smtp_mail_from(mail_from: str) -> Validator: """Validate SMTP mail from address.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) return data.get("PHOENIX_SMTP_MAIL_FROM") == mail_from return validator class TLSValidators: """Validators specific to TLS/SSL configuration.""" @staticmethod def tls_enabled(http: bool = True, grpc: bool = False) -> Validator: """Validate TLS enabled settings.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) http_match = data.get("PHOENIX_TLS_ENABLED_FOR_HTTP") == str(http).lower() grpc_match = data.get("PHOENIX_TLS_ENABLED_FOR_GRPC") == str(grpc).lower() return http_match and grpc_match return validator @staticmethod def tls_files_configured() -> Validator: """Validate TLS certificate and key files are set.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) return ( "PHOENIX_TLS_CERT_FILE" in data and "PHOENIX_TLS_KEY_FILE" in data and data["PHOENIX_TLS_CERT_FILE"] != "" and data["PHOENIX_TLS_KEY_FILE"] != "" ) return validator class LoggingValidators: """Validators specific to logging configuration.""" @staticmethod def logging_config(mode: str, level: str, db_level: str) -> Validator: """Validate logging configuration.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) return ( data.get("PHOENIX_LOGGING_MODE") == mode and data.get("PHOENIX_LOGGING_LEVEL") == level and data.get("PHOENIX_DB_LOGGING_LEVEL") == db_level ) return validator @staticmethod def log_migrations(enabled: bool) -> Validator: """Validate migration logging setting.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) return data.get("PHOENIX_LOG_MIGRATIONS") == str(enabled).lower() return validator class InstrumentationValidators: """Validators specific to instrumentation/OTLP configuration.""" @staticmethod def otlp_endpoint(grpc_endpoint: str = "", http_endpoint: str = "") -> Validator: """Validate OTLP collector endpoints.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) grpc_match = True http_match = True if grpc_endpoint: grpc_match = ( data.get("PHOENIX_SERVER_INSTRUMENTATION_OTLP_TRACE_COLLECTOR_GRPC_ENDPOINT") == grpc_endpoint ) if http_endpoint: http_match = ( data.get("PHOENIX_SERVER_INSTRUMENTATION_OTLP_TRACE_COLLECTOR_HTTP_ENDPOINT") == http_endpoint ) return grpc_match and http_match return validator class ImageValidators: """Validators specific to container image configuration.""" @staticmethod def image_config(registry: str, repository: str, tag: str) -> Validator: """Validate container image configuration.""" def validator(resources: list[dict[str, Any]]) -> bool: deployment = find_resource(resources, "Deployment") if not deployment: return False containers = ( deployment.get("spec", {}).get("template", {}).get("spec", {}).get("containers", []) ) if not containers: return False image = containers[0].get("image", "") expected = f"{registry}/{repository}:{tag}" return image == expected return validator @staticmethod def pull_policy(policy: str) -> Validator: """Validate image pull policy.""" def validator(resources: list[dict[str, Any]]) -> bool: deployment = find_resource(resources, "Deployment") if not deployment: return False containers = ( deployment.get("spec", {}).get("template", {}).get("spec", {}).get("containers", []) ) if not containers: return False return containers[0].get("imagePullPolicy") == policy return validator class HealthCheckValidators: """Validators specific to health check probes.""" @staticmethod def startup_probe_enabled(enabled: bool) -> Validator: """Validate startup probe is enabled/disabled.""" def validator(resources: list[dict[str, Any]]) -> bool: deployment = find_resource(resources, "Deployment") if not deployment: return False containers = ( deployment.get("spec", {}).get("template", {}).get("spec", {}).get("containers", []) ) if not containers: return False has_probe = "startupProbe" in containers[0] return has_probe == enabled return validator @staticmethod def probe_timing(probe_type: str, initial_delay: int, period: int) -> Validator: """Validate probe timing configuration.""" def validator(resources: list[dict[str, Any]]) -> bool: deployment = find_resource(resources, "Deployment") if not deployment: return False containers = ( deployment.get("spec", {}).get("template", {}).get("spec", {}).get("containers", []) ) if not containers: return False probe = containers[0].get(f"{probe_type}Probe") if not probe: return False return ( probe.get("initialDelaySeconds") == initial_delay and probe.get("periodSeconds") == period ) return validator class DeploymentStrategyValidators: """Validators specific to deployment strategy.""" @staticmethod def rolling_update(max_unavailable: str, max_surge: str) -> Validator: """Validate RollingUpdate strategy parameters.""" def validator(resources: list[dict[str, Any]]) -> bool: deployment = find_resource(resources, "Deployment") if not deployment: return False strategy = deployment.get("spec", {}).get("strategy", {}) if strategy.get("type") != "RollingUpdate": return False rolling_update = strategy.get("rollingUpdate", {}) actual_unavailable = str(rolling_update.get("maxUnavailable", "")) actual_surge = str(rolling_update.get("maxSurge", "")) return actual_unavailable == max_unavailable and actual_surge == max_surge return validator @staticmethod def strategy_type(strategy_type: str) -> Validator: """Validate deployment strategy type.""" def validator(resources: list[dict[str, Any]]) -> bool: deployment = find_resource(resources, "Deployment") if not deployment: return False return deployment.get("spec", {}).get("strategy", {}).get("type") == strategy_type return validator class ServerConfigValidators: """Validators specific to server configuration.""" @staticmethod def server_paths(host_root_path: str = "", root_url: str = "") -> Validator: """Validate server path configuration.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) return ( data.get("PHOENIX_HOST_ROOT_PATH") == host_root_path and data.get("PHOENIX_ROOT_URL") == root_url ) return validator @staticmethod def allow_external_resources(allowed: bool) -> Validator: """Validate allowExternalResources setting.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) return data.get("PHOENIX_ALLOW_EXTERNAL_RESOURCES") == str(allowed).lower() return validator @staticmethod def max_spans_queue_size(size: int) -> Validator: """Validate max spans queue size configuration.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) return data.get("PHOENIX_MAX_SPANS_QUEUE_SIZE") == str(size) return validator class VolumeValidators: """Validators for extra volumes and volume mounts.""" @staticmethod def has_extra_volume(volume_name: str) -> Validator: """Validate extra volume exists.""" def validator(resources: list[dict[str, Any]]) -> bool: deployment = find_resource(resources, "Deployment") if not deployment: return False volumes = ( deployment.get("spec", {}).get("template", {}).get("spec", {}).get("volumes", []) ) return any(v.get("name") == volume_name for v in volumes) return validator @staticmethod def has_volume_mount(mount_name: str, mount_path: str) -> Validator: """Validate volume mount exists.""" def validator(resources: list[dict[str, Any]]) -> bool: deployment = find_resource(resources, "Deployment") if not deployment: return False containers = ( deployment.get("spec", {}).get("template", {}).get("spec", {}).get("containers", []) ) if not containers: return False mounts = containers[0].get("volumeMounts", []) for mount in mounts: if mount.get("name") == mount_name and mount.get("mountPath") == mount_path: return True return False return validator @staticmethod def readonly_filesystem_volumes() -> Validator: """Validate readOnlyRootFilesystem has required emptyDir volumes.""" def validator(resources: list[dict[str, Any]]) -> bool: deployment = find_resource(resources, "Deployment") if not deployment: return False volumes = ( deployment.get("spec", {}).get("template", {}).get("spec", {}).get("volumes", []) ) volume_names = {v.get("name") for v in volumes} required = {"tmp-volume", "var-tmp-volume", "var-log-volume", "home-volume"} return required.issubset(volume_names) return validator class SecretValidators: """Validators specific to Secret resources.""" @staticmethod def has_key(key: str) -> Validator: """Validate that Secret contains a specific key in data.""" def validator(resources: list[dict[str, Any]]) -> bool: secret = find_resource(resources, "Secret", "phoenix-secret") if not secret: return False data = secret.get("data") or {} return key in data return validator @staticmethod def not_has_key(key: str) -> Validator: """Validate that Secret does NOT contain a specific key in data.""" def validator(resources: list[dict[str, Any]]) -> bool: secret = find_resource(resources, "Secret", "phoenix-secret") if not secret: return False data = secret.get("data") or {} return key not in data return validator class ConfigMapValidators: """Validators specific to ConfigMap and envFrom behavior.""" @staticmethod def has_env_from_configmap() -> Validator: """Validate that deployment uses envFrom to reference a ConfigMap.""" def validator(resources: list[dict[str, Any]]) -> bool: containers = get_deployment_containers(resources) if not containers: return False for container in containers: env_from = container.get("envFrom", []) for ref in env_from: if "configMapRef" in ref: return True return False return validator @staticmethod def configmap_exists(name_contains: str = "") -> Validator: """Validate that a ConfigMap resource exists.""" def validator(resources: list[dict[str, Any]]) -> bool: return find_resource(resources, "ConfigMap", name_contains) is not None return validator @staticmethod def configmap_has_key(key: str, expected_value: Optional[str] = None) -> Validator: """Validate that ConfigMap contains a specific key with optional value check.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) if key not in data: return False if expected_value is not None: return data[key] == expected_value return True return validator @staticmethod def env_from_references_configmap(configmap_name_contains: str) -> Validator: """Validate that envFrom references a ConfigMap with a specific name pattern.""" def validator(resources: list[dict[str, Any]]) -> bool: containers = get_deployment_containers(resources) for container in containers: for env_from in container.get("envFrom", []): config_map_ref = env_from.get("configMapRef", {}) ref_name = config_map_ref.get("name", "") if configmap_name_contains in ref_name: # Verify the ConfigMap actually exists if find_resource(resources, "ConfigMap", configmap_name_contains): return True return False return validator class AuthValidators: """Validators specific to authentication configuration.""" @staticmethod def disable_basic_auth(disabled: bool) -> Validator: """Validate PHOENIX_DISABLE_BASIC_AUTH setting.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) return data.get("PHOENIX_DISABLE_BASIC_AUTH") == str(disabled).lower() return validator @staticmethod def cookies_path(path: str) -> Validator: """Validate PHOENIX_COOKIES_PATH setting.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) return data.get("PHOENIX_COOKIES_PATH") == path return validator @staticmethod def admins_configured(admins: str) -> Validator: """Validate PHOENIX_ADMINS configuration.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) return data.get("PHOENIX_ADMINS") == admins return validator class DatabaseValidators: """Validators specific to database configuration (beyond PostgreSQL).""" @staticmethod def aws_iam_auth(enabled: bool, token_lifetime: Optional[int] = None) -> Validator: """Validate AWS RDS IAM authentication configuration.""" def validator(resources: list[dict[str, Any]]) -> bool: config_map = find_resource(resources, "ConfigMap", "configmap") if not config_map: return False data = config_map.get("data", {}) if data.get("PHOENIX_POSTGRES_USE_AWS_IAM_AUTH") != str(enabled).lower(): return False if token_lifetime is not None and enabled: if data.get("PHOENIX_POSTGRES_AWS_IAM_TOKEN_LIFETIME_SECONDS") != str( token_lifetime ): return False return True return validator class NamingValidators: """Validators for resource naming with nameOverride and fullnameOverride.""" @staticmethod def _find_phoenix_resource( resources: list[dict[str, Any]], kind: str ) -> Optional[dict[str, Any]]: """Find Phoenix resource (excluding PostgreSQL subchart resources).""" for resource in iter_resources_by_kind(resources, kind): name = resource.get("metadata", {}).get("name", "") # Exclude PostgreSQL subchart resources if "postgresql" not in name: return resource return None @staticmethod def deployment_name(expected_name: str) -> Validator: """Validate Deployment resource name (Phoenix deployment, not PostgreSQL).""" def validator(resources: list[dict[str, Any]]) -> bool: deployment = NamingValidators._find_phoenix_resource(resources, "Deployment") if not deployment: return False actual_name = deployment.get("metadata", {}).get("name", "") return actual_name == expected_name return validator @staticmethod def service_name(expected_name: str) -> Validator: """Validate Service resource name (Phoenix service, not PostgreSQL).""" def validator(resources: list[dict[str, Any]]) -> bool: service = NamingValidators._find_phoenix_resource(resources, "Service") if not service: return False actual_name = service.get("metadata", {}).get("name", "") return actual_name == expected_name return validator @staticmethod def configmap_name(expected_name: str) -> Validator: """Validate ConfigMap resource name (Phoenix configmap, not PostgreSQL).""" def validator(resources: list[dict[str, Any]]) -> bool: # Use the existing find_resource with "configmap" filter which already excludes PostgreSQL configmap = find_resource(resources, "ConfigMap", "configmap") if not configmap: return False actual_name = configmap.get("metadata", {}).get("name", "") return actual_name == expected_name return validator @staticmethod def pvc_name(expected_name: str) -> Validator: """Validate PersistentVolumeClaim resource name.""" def validator(resources: list[dict[str, Any]]) -> bool: pvc = find_resource(resources, "PersistentVolumeClaim") if not pvc: return False actual_name = pvc.get("metadata", {}).get("name", "") return actual_name == expected_name return validator @staticmethod def all_resources_with_fullname(expected_fullname: str) -> Validator: """Validate that all main Phoenix resources use the expected fullname.""" def validator(resources: list[dict[str, Any]]) -> bool: # Check Deployment name (Phoenix deployment only, not PostgreSQL) deployment = NamingValidators._find_phoenix_resource(resources, "Deployment") if deployment and deployment.get("metadata", {}).get("name", "") != expected_fullname: return False # Check Service name (Phoenix service only, not PostgreSQL) service = NamingValidators._find_phoenix_resource(resources, "Service") if ( service and service.get("metadata", {}).get("name", "") != f"{expected_fullname}-svc" ): return False # Check ConfigMap name (should be {fullname}-configmap) configmap = find_resource(resources, "ConfigMap", "configmap") if ( configmap and configmap.get("metadata", {}).get("name", "") != f"{expected_fullname}-configmap" ): return False # Check PVC name if it exists (should be {fullname}-data-pvc) pvc = find_resource(resources, "PersistentVolumeClaim") if pvc and pvc.get("metadata", {}).get("name", "") != f"{expected_fullname}-data-pvc": return False return True return validator # ============================================================================ # Test Runner # ============================================================================ class HelmTester: """Manages Helm chart testing with parallel setup and validation.""" def __init__(self, chart_dir: Path): self.chart_dir = chart_dir self.temp_dir: Optional[Path] = None self.results: list[TestResult] = [] self.semaphore = asyncio.Semaphore(MAX_CONCURRENT_TESTS) def cleanup(self): """Clean up temporary directory""" if self.temp_dir and self.temp_dir.exists(): shutil.rmtree(self.temp_dir) async def check_prereqs(self) -> bool: """Check if required tools (helm) are installed.""" if not shutil.which("helm"): print(f"{RED}✗{NC} helm not installed") return False return True async def _setup_repo(self) -> bool: """Add and update helm repositories (parallel operation).""" # Add helm repository (ignore errors if already exists) add_result = await asyncio.create_subprocess_exec( "helm", "repo", "add", "groundhog2k", "https://groundhog2k.github.io/helm-charts/", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) await add_result.wait() # Update helm repositories update_result = await asyncio.create_subprocess_exec( "helm", "repo", "update", stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, ) await update_result.wait() return update_result.returncode == 0 async def _build_dependencies(self) -> bool: """Build chart dependencies (parallel operation).""" result = await asyncio.create_subprocess_exec( "helm", "dependency", "build", str(self.chart_dir), stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, ) await result.wait() return result.returncode == 0 async def setup(self) -> bool: """Set up testing environment (repo setup, then dependency build, then lint).""" self.temp_dir = Path(tempfile.mkdtemp()) atexit.register(self.cleanup) # Run repo setup first, then dependency build (dependency build requires repo to be set up) repo_ok = await self._setup_repo() if not repo_ok: print(f"{RED}✗{NC} Helm repo update failed") return False dep_ok = await self._build_dependencies() if not dep_ok: print(f"{RED}✗{NC} Helm dependency build failed") return False # Lint the chart (must be after dependencies) result = await asyncio.create_subprocess_exec( "helm", "lint", str(self.chart_dir), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) stdout, _ = await result.communicate() if result.returncode != 0: print(f"{RED}✗{NC} Helm lint failed") print(stdout.decode()) return False return True async def _render_helm_template(self, flags: str) -> tuple[list[dict[str, Any]], Optional[str]]: """Render helm template. Returns: Tuple of (resources, error_message) """ async with self.semaphore: cmd = ["helm", "template", "test-release", str(self.chart_dir)] if flags: cmd.extend(shlex.split(flags)) result = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await result.communicate() if result.returncode != 0: return ([], stderr.decode()) # Parse the resources resources = load_k8s_resources(stdout.decode()) return (resources, None) async def run_test(self, test_case: TestCase) -> TestResult: """Run a single Helm template test.""" if self.temp_dir is None: raise RuntimeError("setup() must be called before run_test()") # Render template resources, error = await self._render_helm_template(test_case.flags) if error: print(f" {test_case.name}... {RED}✗{NC}") first_line = error.split("\n")[0][:100] print(f" {first_line}") return TestResult(test_case.name, False, error) # Run validator if provided if test_case.validator: try: if not test_case.validator(resources): print(f" {test_case.name}... {RED}✗{NC}") print(" Validation failed") return TestResult(test_case.name, False, "Validation failed") except Exception as e: print(f" {test_case.name}... {RED}✗{NC}") print(f" {e}") return TestResult(test_case.name, False, str(e)) return TestResult(test_case.name, True) async def run_all_tests(self, test_cases: list[TestCase]) -> list[TestResult]: """Run all Helm chart test scenarios in parallel.""" tasks = [self.run_test(tc) for tc in test_cases] results = await asyncio.gather(*tasks) for result in results: if result.passed: print(f" {result.name}... {GREEN}✓{NC}") self.results = results return results async def run(self, test_cases: list[TestCase]) -> int: """Main entry point for running all tests.""" exit_code = 0 try: if not await self.check_prereqs(): return 1 if not await self.setup(): return 1 await self.run_all_tests(test_cases) passed = [r for r in self.results if r.passed] failed = [r for r in self.results if not r.passed] # Results summary print(f"\n{GREEN}✓ {len(passed)}{NC} {RED}✗ {len(failed)}{NC}") if failed: for r in failed: print(f" {RED}✗{NC} {r.name}") exit_code = 1 self.cleanup() return exit_code except Exception as e: print(f"{RED}✗{NC} Unexpected error: {e}") self.cleanup() raise # ============================================================================ # Test Suite Definition # ============================================================================ def get_test_suite() -> list[TestCase]: """Define all test cases for the Helm chart.""" return [ # Database TestCase( "PostgreSQL enabled (default)", "--set postgresql.enabled=true", PostgreSQLValidators.is_enabled(), ), TestCase( "SQLite with persistence", "--set postgresql.enabled=false --set persistence.enabled=true", PostgreSQLValidators.is_disabled(), ), TestCase( "SQLite in-memory", "--set postgresql.enabled=false --set persistence.inMemory=true", all_of(no_postgresql, minimum_resources), ), TestCase( "External database (connection URL)", "--set postgresql.enabled=false --set database.url=postgresql://user:pass@external:5432/phoenix --set database.postgres.host=phoenix-postgresql", all_of(no_postgresql, minimum_resources), ), TestCase( "PostgreSQL with custom resources", "--set postgresql.enabled=true --set postgresql.primary.resources.limits.cpu=500m --set postgresql.primary.resources.limits.memory=1Gi", PostgreSQLValidators.is_enabled(), ), TestCase( "PostgreSQL with custom storage size", "--set postgresql.enabled=true --set postgresql.primary.persistence.size=20Gi", PostgreSQLValidators.is_enabled(), ), TestCase( "Database with retention policy", "--set postgresql.enabled=true --set database.dataRetentionDays=90", PostgreSQLValidators.is_enabled(), ), TestCase( "PostgreSQL with AWS RDS IAM authentication", "--set postgresql.enabled=false --set database.postgres.host=mydb.us-east-1.rds.amazonaws.com --set database.postgres.port=5432 --set database.postgres.user=phoenix --set database.postgres.db=phoenix --set database.postgres.useAwsIamAuth=true --set database.postgres.awsIamTokenLifetimeSeconds=840", all_of( no_postgresql, ConfigMapValidators.configmap_has_key("PHOENIX_POSTGRES_USE_AWS_IAM_AUTH", "true"), ConfigMapValidators.configmap_has_key( "PHOENIX_POSTGRES_AWS_IAM_TOKEN_LIFETIME_SECONDS", "840" ), DatabaseValidators.aws_iam_auth(True, 840), ), ), TestCase( "PostgreSQL with AWS RDS IAM authentication (custom token lifetime)", "--set postgresql.enabled=false --set database.postgres.host=mydb.region.rds.amazonaws.com --set database.postgres.port=5432 --set database.postgres.user=phoenix_user --set database.postgres.db=phoenix_db --set database.postgres.useAwsIamAuth=true --set database.postgres.awsIamTokenLifetimeSeconds=600", all_of( no_postgresql, DatabaseValidators.aws_iam_auth(True, 600), ), ), TestCase( "PostgreSQL with custom schema", "--set postgresql.enabled=true --set database.postgres.dbName=custom_phoenix --set database.postgres.user=custom_user", PostgreSQLValidators.is_enabled(), ), TestCase( "External PostgreSQL with explicit settings", "--set postgresql.enabled=false --set persistence.enabled=false --set database.postgres.host=external-pg.example.com --set database.postgres.port=5432 --set database.postgres.user=phoenix_user --set database.postgres.db=phoenix_db", all_of( no_postgresql, PostgreSQLValidators.external_config( "external-pg.example.com", "5432", "phoenix_user", "phoenix_db" ), ), ), # Auth TestCase("Auth enabled (default)", "--set auth.enabled=true", minimum_resources), TestCase("Auth disabled", "--set auth.enabled=false", minimum_resources), TestCase( "OAuth2 with Google provider", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.google.client_id=test-id --set auth.oauth2.providers.google.client_secret=test-secret --set auth.oauth2.providers.google.oidc_config_url=https://accounts.google.com/.well-known/openid-configuration", minimum_resources, ), TestCase( "Multiple OAuth2 providers", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.google.client_id=google-id --set auth.oauth2.providers.google.client_secret=google-secret --set auth.oauth2.providers.google.oidc_config_url=https://accounts.google.com/.well-known/openid-configuration --set auth.oauth2.providers.github.client_id=github-id --set auth.oauth2.providers.github.client_secret=github-secret --set auth.oauth2.providers.github.oidc_config_url=https://token.actions.githubusercontent.com/.well-known/openid-configuration", minimum_resources, ), TestCase( "Custom token expiry settings", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.google.client_id=test-id --set auth.oauth2.providers.google.client_secret=test-secret --set auth.oauth2.providers.google.oidc_config_url=https://accounts.google.com/.well-known/openid-configuration --set auth.accessTokenExpireSeconds=7200 --set auth.refreshTokenExpireSeconds=604800", minimum_resources, ), TestCase( "OAuth2 with optional provider fields", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.google.client_id=test-id --set auth.oauth2.providers.google.client_secret=test-secret --set auth.oauth2.providers.google.oidc_config_url=https://accounts.google.com/.well-known/openid-configuration --set auth.oauth2.providers.google.display_name='Sign in with Google' --set auth.oauth2.providers.google.allow_sign_up=false --set auth.oauth2.providers.google.auto_login=true", all_of( ConfigMapValidators.configmap_has_key( "PHOENIX_OAUTH2_GOOGLE_DISPLAY_NAME", "Sign in with Google" ), OAuth2Validators.provider_optional_fields( "google", display_name="Sign in with Google", allow_sign_up=False, auto_login=True, ), ), ), TestCase( "OAuth2 with PKCE enabled", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.google.client_id=test-id --set auth.oauth2.providers.google.client_secret=test-secret --set auth.oauth2.providers.google.oidc_config_url=https://accounts.google.com/.well-known/openid-configuration --set auth.oauth2.providers.google.use_pkce=true", all_of( ConfigMapValidators.configmap_has_key("PHOENIX_OAUTH2_GOOGLE_USE_PKCE", "true"), OAuth2Validators.provider_pkce("google", use_pkce=True), ), ), TestCase( "OAuth2 public client with PKCE (no client secret)", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.mobile.client_id=mobile-client-id --set auth.oauth2.providers.mobile.oidc_config_url=https://auth.example.com/.well-known/openid-configuration --set auth.oauth2.providers.mobile.token_endpoint_auth_method=none --set auth.oauth2.providers.mobile.use_pkce=true", all_of( ConfigMapValidators.configmap_has_key( "PHOENIX_OAUTH2_MOBILE_CLIENT_ID", "mobile-client-id" ), OAuth2Validators.provider_token_endpoint_auth_method("mobile", "none"), OAuth2Validators.provider_pkce("mobile", use_pkce=True), ), ), TestCase( "OAuth2 with client_secret_post authentication", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.okta.client_id=okta-id --set auth.oauth2.providers.okta.client_secret=okta-secret --set auth.oauth2.providers.okta.oidc_config_url=https://okta.example.com/.well-known/openid-configuration --set auth.oauth2.providers.okta.token_endpoint_auth_method=client_secret_post", all_of( ConfigMapValidators.configmap_has_key( "PHOENIX_OAUTH2_OKTA_TOKEN_ENDPOINT_AUTH_METHOD", "client_secret_post" ), OAuth2Validators.provider_token_endpoint_auth_method("okta", "client_secret_post"), ), ), TestCase( "OAuth2 with additional scopes", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.microsoft.client_id=ms-id --set auth.oauth2.providers.microsoft.client_secret=ms-secret --set auth.oauth2.providers.microsoft.oidc_config_url=https://login.microsoftonline.com/tenant/.well-known/openid-configuration --set auth.oauth2.providers.microsoft.scopes='offline_access User.Read'", all_of( ConfigMapValidators.configmap_has_key( "PHOENIX_OAUTH2_MICROSOFT_SCOPES", "offline_access User.Read" ), OAuth2Validators.provider_scopes("microsoft", "offline_access User.Read"), ), ), TestCase( "OAuth2 with group-based access control", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.cognito.client_id=cognito-id --set auth.oauth2.providers.cognito.client_secret=cognito-secret --set auth.oauth2.providers.cognito.oidc_config_url=https://cognito-idp.us-east-1.amazonaws.com/.well-known/openid-configuration --set auth.oauth2.providers.cognito.groups_attribute_path='cognito:groups' --set auth.oauth2.providers.cognito.allowed_groups={Admins,PowerUsers}", all_of( ConfigMapValidators.configmap_has_key( "PHOENIX_OAUTH2_COGNITO_GROUPS_ATTRIBUTE_PATH", "cognito:groups" ), ConfigMapValidators.configmap_has_key( "PHOENIX_OAUTH2_COGNITO_ALLOWED_GROUPS", "Admins,PowerUsers" ), OAuth2Validators.provider_groups_config( "cognito", "cognito:groups", ["Admins", "PowerUsers"] ), ), ), TestCase( "OAuth2 with nested group path (Keycloak)", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.keycloak.client_id=keycloak-id --set auth.oauth2.providers.keycloak.client_secret=keycloak-secret --set auth.oauth2.providers.keycloak.oidc_config_url=https://keycloak.example.com/realms/phoenix/.well-known/openid-configuration --set auth.oauth2.providers.keycloak.groups_attribute_path='resource_access.phoenix.roles' --set auth.oauth2.providers.keycloak.allowed_groups={admin,developer,viewer}", all_of( ConfigMapValidators.configmap_has_key( "PHOENIX_OAUTH2_KEYCLOAK_GROUPS_ATTRIBUTE_PATH", "resource_access.phoenix.roles" ), ConfigMapValidators.configmap_has_key( "PHOENIX_OAUTH2_KEYCLOAK_ALLOWED_GROUPS", "admin,developer,viewer" ), OAuth2Validators.provider_groups_config( "keycloak", "resource_access.phoenix.roles", ["admin", "developer", "viewer"] ), ), ), TestCase( "OAuth2 comprehensive configuration", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.enterprise.client_id=enterprise-id --set auth.oauth2.providers.enterprise.client_secret=enterprise-secret --set auth.oauth2.providers.enterprise.oidc_config_url=https://sso.example.com/.well-known/openid-configuration --set auth.oauth2.providers.enterprise.display_name='Enterprise SSO' --set auth.oauth2.providers.enterprise.allow_sign_up=false --set auth.oauth2.providers.enterprise.auto_login=true --set auth.oauth2.providers.enterprise.use_pkce=true --set auth.oauth2.providers.enterprise.token_endpoint_auth_method=client_secret_post --set auth.oauth2.providers.enterprise.scopes='custom_scope' --set auth.oauth2.providers.enterprise.groups_attribute_path='groups' --set auth.oauth2.providers.enterprise.allowed_groups={engineering,ops}", OAuth2Validators.provider_comprehensive( provider="enterprise", client_id="enterprise-id", oidc_config_url="https://sso.example.com/.well-known/openid-configuration", has_client_secret=True, display_name="Enterprise SSO", allow_sign_up=False, auto_login=True, use_pkce=True, token_endpoint_auth_method="client_secret_post", scopes="custom_scope", groups_attribute_path="groups", allowed_groups=["engineering", "ops"], ), ), TestCase( "Auth secrets with valueFrom (secretKeyRef)", "--set auth.secret[0].key=PHOENIX_SECRET --set auth.secret[0].valueFrom.secretKeyRef.name=ext-secret --set auth.secret[0].valueFrom.secretKeyRef.key=secret-key", DeploymentValidators.env_from_secret("PHOENIX_SECRET", "ext-secret"), ), TestCase( "Auth secrets with valueFrom (configMapKeyRef)", "--set auth.secret[0].key=PHOENIX_CONFIG --set auth.secret[0].valueFrom.configMapKeyRef.name=ext-config --set auth.secret[0].valueFrom.configMapKeyRef.key=config-key", DeploymentValidators.env_from_configmap("PHOENIX_CONFIG", "ext-config"), ), TestCase( "Multiple secrets with valueFrom", "--set auth.secret[0].key=PHOENIX_SECRET --set auth.secret[0].valueFrom.secretKeyRef.name=ext-secret --set auth.secret[0].valueFrom.secretKeyRef.key=secret-key --set auth.secret[1].key=PHOENIX_ADMIN_SECRET --set auth.secret[1].valueFrom.secretKeyRef.name=ext-secret --set auth.secret[1].valueFrom.secretKeyRef.key=admin-key", minimum_resources, ), TestCase( "Mix of direct value and valueFrom secrets", "--set auth.secret[0].key=PHOENIX_SECRET --set auth.secret[0].value=direct-secret-value --set auth.secret[1].key=PHOENIX_ADMIN_SECRET --set auth.secret[1].valueFrom.secretKeyRef.name=ext-secret --set auth.secret[1].valueFrom.secretKeyRef.key=admin-key", minimum_resources, ), TestCase( "Secret excludes valueFrom entries (only direct values in Secret.data)", "--set auth.secret[0].key=DIRECT_SECRET --set auth.secret[0].value=direct123 --set auth.secret[1].key=REF_SECRET --set auth.secret[1].valueFrom.secretKeyRef.name=ext-secret --set auth.secret[1].valueFrom.secretKeyRef.key=ext-key", all_of( # Secret.data should only have DIRECT_SECRET (not REF_SECRET) SecretValidators.has_key("DIRECT_SECRET"), SecretValidators.not_has_key("REF_SECRET"), # Deployment should have REF_SECRET env var with valueFrom DeploymentValidators.env_from_secret("REF_SECRET", "ext-secret"), ), ), TestCase( "Admin password with valueFrom (external reference)", "--set auth.secret[0].key=PHOENIX_DEFAULT_ADMIN_INITIAL_PASSWORD --set auth.secret[0].valueFrom.secretKeyRef.name=admin-secret --set auth.secret[0].valueFrom.secretKeyRef.key=admin-pass", all_of( # Secret.data should NOT have admin password (uses external reference) SecretValidators.not_has_key("PHOENIX_DEFAULT_ADMIN_INITIAL_PASSWORD"), # Deployment should reference external admin-secret DeploymentValidators.env_from_secret( "PHOENIX_DEFAULT_ADMIN_INITIAL_PASSWORD", "admin-secret" ), ), ), TestCase( "CORS and CSRF configuration", "--set 'auth.allowedOrigins={https://example.com,https://app.example.com}' --set 'auth.csrfTrustedOrigins={https://example.com}'", minimum_resources, ), TestCase( "Auth with basic auth disabled (SSO-only)", "--set auth.enableAuth=true --set auth.disableBasicAuth=true --set auth.oauth2.enabled=true --set auth.oauth2.providers.google.client_id=google-id --set auth.oauth2.providers.google.client_secret=google-secret --set auth.oauth2.providers.google.oidc_config_url=https://accounts.google.com/.well-known/openid-configuration", all_of( ConfigMapValidators.configmap_has_key("PHOENIX_DISABLE_BASIC_AUTH", "true"), AuthValidators.disable_basic_auth(True), ), ), TestCase( "Auth with custom cookies path", "--set auth.enableAuth=true --set auth.cookiesPath=/custom-path", all_of( ConfigMapValidators.configmap_has_key("PHOENIX_COOKIES_PATH", "/custom-path"), AuthValidators.cookies_path("/custom-path"), ), ), TestCase( "Auth with admin users configured", "--set auth.enableAuth=true --set auth.admins='John Doe=john@example.com;Jane Smith=jane@example.com'", all_of( ConfigMapValidators.configmap_has_key( "PHOENIX_ADMINS", "John Doe=john@example.com;Jane Smith=jane@example.com" ), AuthValidators.admins_configured( "John Doe=john@example.com;Jane Smith=jane@example.com" ), ), ), TestCase( "OAuth2 with role mapping configuration", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.keycloak.client_id=keycloak-id --set auth.oauth2.providers.keycloak.client_secret=keycloak-secret --set auth.oauth2.providers.keycloak.oidc_config_url=https://keycloak.example.com/.well-known/openid-configuration --set auth.oauth2.providers.keycloak.role_attribute_path='resource_access.phoenix.role' --set-string auth.oauth2.providers.keycloak.role_mapping=admin:ADMIN", all_of( ConfigMapValidators.configmap_has_key( "PHOENIX_OAUTH2_KEYCLOAK_ROLE_ATTRIBUTE_PATH", "resource_access.phoenix.role" ), ConfigMapValidators.configmap_has_key( "PHOENIX_OAUTH2_KEYCLOAK_ROLE_MAPPING", "admin:ADMIN" ), OAuth2Validators.provider_role_mapping( "keycloak", "resource_access.phoenix.role", "admin:ADMIN" ), ), ), TestCase( "OAuth2 with strict role mapping", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.okta.client_id=okta-id --set auth.oauth2.providers.okta.client_secret=okta-secret --set auth.oauth2.providers.okta.oidc_config_url=https://okta.example.com/.well-known/openid-configuration --set auth.oauth2.providers.okta.role_attribute_path='role' --set-string auth.oauth2.providers.okta.role_mapping=Owner:ADMIN --set auth.oauth2.providers.okta.role_attribute_strict=true", all_of( ConfigMapValidators.configmap_has_key( "PHOENIX_OAUTH2_OKTA_ROLE_ATTRIBUTE_STRICT", "true" ), OAuth2Validators.provider_role_mapping( "okta", "role", "Owner:ADMIN", role_attribute_strict=True ), ), ), # Ingress TestCase( "Ingress enabled (default)", "--set ingress.enabled=true --set ingress.host=phoenix.example.com", all_of(has_ingress, IngressValidators.host("phoenix.example.com")), ), TestCase("Ingress disabled", "--set ingress.enabled=false", no_ingress), TestCase( "Ingress with TLS enabled", "--set ingress.enabled=true --set ingress.host=phoenix.example.com --set ingress.tls.enabled=true --set ingress.tls.secretName=phoenix-tls", all_of(has_ingress, IngressValidators.has_tls()), ), TestCase( "Ingress with custom API path", "--set ingress.enabled=true --set ingress.host=phoenix.example.com --set ingress.apiPath=/custom-api", has_ingress, ), TestCase( "Ingress with custom annotations", "--set ingress.enabled=true --set ingress.host=phoenix.example.com --set 'ingress.annotations.nginx\\.ingress\\.kubernetes\\.io/ssl-redirect=\"true\"'", has_ingress, ), TestCase( "Ingress with exact path type", "--set ingress.enabled=true --set ingress.host=phoenix.example.com --set ingress.pathType=Exact", has_ingress, ), TestCase( "Ingress with custom className", "--set ingress.enabled=true --set ingress.host=phoenix.example.com --set ingress.className=nginx", all_of(has_ingress, IngressValidators.ingress_class_name("nginx")), ), # Resources TestCase( "Custom replica count (3 replicas)", "--set replicaCount=3", DeploymentValidators.replicas(3), ), TestCase( "Custom CPU and memory limits", "--set resources.limits.cpu=1 --set resources.limits.memory=2Gi --set resources.requests.cpu=500m --set resources.requests.memory=1Gi", all_of( DeploymentValidators.resource_limits("1", "2Gi"), DeploymentValidators.resource_requests("500m", "1Gi"), ), ), # TODO: HPA is not currently implemented in the helm chart TestCase( "HPA configuration", "--set autoscaling.enabled=true --set autoscaling.minReplicas=2 --set autoscaling.maxReplicas=10 --set autoscaling.targetCPUUtilizationPercentage=80", minimum_resources, # Chart doesn't create HPA yet ), TestCase( "Node selector configuration", "--set nodeSelector.disktype=ssd", minimum_resources, ), TestCase( "Tolerations configuration", "--set tolerations[0].key=dedicated --set tolerations[0].operator=Equal --set tolerations[0].value=phoenix --set tolerations[0].effect=NoSchedule", minimum_resources, ), TestCase( "Affinity rules", "--set affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[0].matchExpressions[0].key=kubernetes.io/hostname --set affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[0].matchExpressions[0].operator=In --set affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[0].matchExpressions[0].values[0]=node-1", minimum_resources, ), # Service TestCase( "Service type NodePort (default)", "", ServiceValidators.service_type("NodePort"), ), TestCase( "Service type ClusterIP", "--set service.type=ClusterIP", ServiceValidators.service_type("ClusterIP"), ), TestCase( "Service type LoadBalancer", "--set service.type=LoadBalancer", ServiceValidators.service_type("LoadBalancer"), ), TestCase( "Custom server ports", "--set server.port=8080 --set service.port=80", has_service ), TestCase( "Service with custom annotations", "--set 'service.annotations.service\\.beta\\.kubernetes\\.io/aws-load-balancer-type=nlb'", has_service, ), # TODO: Prometheus annotations are not currently implemented in the helm chart TestCase( "Prometheus metrics enabled", "--set server.enablePrometheus=true", minimum_resources, # Chart doesn't add prometheus annotations yet ), # Security # TODO: Pod security context settings are not currently respected by the helm chart TestCase( "Pod security context", "--set podSecurityContext.runAsUser=1000 --set podSecurityContext.runAsGroup=1000 --set podSecurityContext.fsGroup=1000", minimum_resources, # Chart doesn't support custom security context yet ), TestCase( "Container security context", "--set securityContext.runAsNonRoot=true --set securityContext.runAsUser=1000", minimum_resources, ), TestCase( "Read-only root filesystem with emptyDir volumes", "--set securityContext.container.enabled=true --set securityContext.container.readOnlyRootFilesystem=true", all_of(minimum_resources, VolumeValidators.readonly_filesystem_volumes()), ), TestCase( "Drop all capabilities", "--set securityContext.capabilities.drop[0]=ALL", minimum_resources, ), TestCase( "Custom service account", "--set serviceAccount.create=true --set serviceAccount.name=phoenix-sa", ServiceAccountValidators.name("phoenix-sa"), ), TestCase( "ServiceAccount with imagePullSecrets", "--set serviceAccount.create=true --set serviceAccount.name=phoenix-sa --set 'serviceAccount.imagePullSecrets[0]=regcred'", all_of( ServiceAccountValidators.name("phoenix-sa"), ServiceAccountValidators.has_image_pull_secret("regcred"), ), ), # ConfigMap and envFrom TestCase( "ConfigMap exists and is referenced via envFrom", "", all_of( ConfigMapValidators.configmap_exists("configmap"), ConfigMapValidators.has_env_from_configmap(), ConfigMapValidators.env_from_references_configmap("configmap"), ), ), TestCase( "ConfigMap contains required auth keys", "--set auth.enabled=true", all_of( ConfigMapValidators.configmap_has_key("PHOENIX_ENABLE_AUTH", "true"), ConfigMapValidators.configmap_has_key("PHOENIX_ALLOWED_ORIGINS"), ), ), TestCase( "ConfigMap contains required server keys", "--set server.port=8080 --set server.grpcPort=4318", all_of( ConfigMapValidators.configmap_has_key("PHOENIX_PORT", "8080"), ConfigMapValidators.configmap_has_key("PHOENIX_GRPC_PORT", "4318"), ConfigMapValidators.configmap_has_key("PHOENIX_WORKING_DIR"), ), ), TestCase( "ConfigMap with OAuth2 provider configuration", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.google.client_id=test-client --set auth.oauth2.providers.google.client_secret=test-secret --set auth.oauth2.providers.google.oidc_config_url=https://accounts.google.com/.well-known/openid-configuration", ConfigMapValidators.configmap_has_key("PHOENIX_OAUTH2_GOOGLE_CLIENT_ID", "test-client"), ), TestCase( "ConfigMap with database retention policy", "--set database.defaultRetentionPolicyDays=90", ConfigMapValidators.configmap_has_key("PHOENIX_DEFAULT_RETENTION_POLICY_DAYS", "90"), ), # SMTP Configuration TestCase( "SMTP with full configuration", "--set smtp.hostname=smtp.example.com --set smtp.port=587 --set smtp.username=phoenix --set smtp.mailFrom=noreply@example.com", all_of( SMTPValidators.smtp_config("smtp.example.com", "587", "phoenix"), SMTPValidators.smtp_mail_from("noreply@example.com"), ), ), TestCase( "SMTP with TLS validation disabled", "--set smtp.hostname=smtp.internal.local --set smtp.validateCerts=false", ConfigMapValidators.configmap_has_key("PHOENIX_SMTP_VALIDATE_CERTS", "false"), ), # TLS Configuration TestCase( "TLS enabled for HTTP only", "--set tls.enabled=true --set tls.enabledForHttp=true --set tls.certFile=/certs/tls.crt --set tls.keyFile=/certs/tls.key", all_of( TLSValidators.tls_enabled(http=True, grpc=False), TLSValidators.tls_files_configured(), ), ), TestCase( "TLS enabled for both HTTP and gRPC", "--set tls.enabled=true --set tls.enabledForHttp=true --set tls.enabledForGrpc=true --set tls.certFile=/certs/tls.crt --set tls.keyFile=/certs/tls.key", TLSValidators.tls_enabled(http=True, grpc=True), ), TestCase( "TLS with mutual TLS (mTLS)", "--set tls.enabled=true --set tls.enabledForHttp=true --set tls.certFile=/certs/tls.crt --set tls.keyFile=/certs/tls.key --set tls.caFile=/certs/ca.crt --set tls.verifyClient=true", all_of( TLSValidators.tls_enabled(http=True, grpc=False), ConfigMapValidators.configmap_has_key("PHOENIX_TLS_VERIFY_CLIENT", "true"), ), ), # Logging Configuration TestCase( "Structured logging with debug level", "--set logging.mode=structured --set logging.level=debug --set logging.dbLevel=info", LoggingValidators.logging_config("structured", "debug", "info"), ), TestCase( "Production logging configuration", "--set logging.mode=default --set logging.level=warning --set logging.dbLevel=error --set logging.logMigrations=false", all_of( LoggingValidators.logging_config("default", "warning", "error"), LoggingValidators.log_migrations(False), ), ), # Instrumentation/OTLP TestCase( "OTLP gRPC collector endpoint", "--set instrumentation.otlpTraceCollectorGrpcEndpoint=http://otel-collector:4317", InstrumentationValidators.otlp_endpoint(grpc_endpoint="http://otel-collector:4317"), ), TestCase( "OTLP HTTP collector endpoint", "--set instrumentation.otlpTraceCollectorHttpEndpoint=http://otel-collector:4318/v1/traces", InstrumentationValidators.otlp_endpoint( http_endpoint="http://otel-collector:4318/v1/traces" ), ), TestCase( "OTLP dual collectors (gRPC + HTTP)", "--set instrumentation.otlpTraceCollectorGrpcEndpoint=http://otel-collector:4317 --set instrumentation.otlpTraceCollectorHttpEndpoint=http://otel-collector:4318/v1/traces", InstrumentationValidators.otlp_endpoint( grpc_endpoint="http://otel-collector:4317", http_endpoint="http://otel-collector:4318/v1/traces", ), ), # Image Configuration TestCase( "Custom image registry and tag", "--set image.registry=gcr.io --set image.repository=myproject/phoenix --set image.tag=v1.2.3", ImageValidators.image_config("gcr.io", "myproject/phoenix", "v1.2.3"), ), TestCase( "Image pull policy Always", "--set image.pullPolicy=Always", ImageValidators.pull_policy("Always"), ), TestCase( "Private registry configuration", "--set image.registry=registry.example.com --set image.repository=phoenix/phoenix --set image.tag=latest --set image.pullPolicy=IfNotPresent", all_of( ImageValidators.image_config("registry.example.com", "phoenix/phoenix", "latest"), ImageValidators.pull_policy("IfNotPresent"), ), ), # Health Checks TestCase( "Startup probe disabled", "--set healthChecks.startupProbe.enabled=false", HealthCheckValidators.startup_probe_enabled(False), ), TestCase( "Custom liveness probe timing", "--set healthChecks.livenessProbe.initialDelaySeconds=30 --set healthChecks.livenessProbe.periodSeconds=20", HealthCheckValidators.probe_timing("liveness", 30, 20), ), TestCase( "Custom readiness probe timing", "--set healthChecks.readinessProbe.initialDelaySeconds=15 --set healthChecks.readinessProbe.periodSeconds=10", HealthCheckValidators.probe_timing("readiness", 15, 10), ), TestCase( "Aggressive startup probe", "--set healthChecks.startupProbe.initialDelaySeconds=1 --set healthChecks.startupProbe.periodSeconds=2 --set healthChecks.startupProbe.failureThreshold=60", HealthCheckValidators.probe_timing("startup", 1, 2), ), # Deployment Strategy TestCase( "Conservative rolling update", "--set deployment.strategy.rollingUpdate.maxUnavailable=1 --set deployment.strategy.rollingUpdate.maxSurge=1", DeploymentStrategyValidators.rolling_update("1", "1"), ), TestCase( "Aggressive rolling update", "--set deployment.strategy.rollingUpdate.maxUnavailable=50% --set deployment.strategy.rollingUpdate.maxSurge=100%", DeploymentStrategyValidators.rolling_update("50%", "100%"), ), TestCase( "Default rolling update strategy", "", DeploymentStrategyValidators.strategy_type("RollingUpdate"), ), # Server Configuration TestCase( "Server with reverse proxy paths", "--set server.hostRootPath=/phoenix --set server.rootUrl=https://example.com/phoenix", ServerConfigValidators.server_paths("/phoenix", "https://example.com/phoenix"), ), TestCase( "Air-gapped deployment (no external resources)", "--set server.allowExternalResources=false", ServerConfigValidators.allow_external_resources(False), ), TestCase( "Server with custom host binding", "--set server.host=0.0.0.0", ConfigMapValidators.configmap_has_key("PHOENIX_HOST", "0.0.0.0"), ), TestCase( "Server with custom max spans queue size", "--set server.maxSpansQueueSize=50000", all_of( ConfigMapValidators.configmap_has_key("PHOENIX_MAX_SPANS_QUEUE_SIZE", "50000"), ServerConfigValidators.max_spans_queue_size(50000), ), ), TestCase( "Server with default max spans queue size", "", all_of( ConfigMapValidators.configmap_has_key("PHOENIX_MAX_SPANS_QUEUE_SIZE", "20000"), ServerConfigValidators.max_spans_queue_size(20000), ), ), # Extra Volumes and Mounts TestCase( "Extra ConfigMap volume", "--set 'extraVolumes[0].name=custom-config' --set 'extraVolumes[0].configMap.name=my-config' --set 'extraVolumeMounts[0].name=custom-config' --set 'extraVolumeMounts[0].mountPath=/etc/custom'", all_of( VolumeValidators.has_extra_volume("custom-config"), VolumeValidators.has_volume_mount("custom-config", "/etc/custom"), ), ), TestCase( "Extra emptyDir volume for cache", "--set 'extraVolumes[0].name=cache-volume' --set 'extraVolumes[0].emptyDir={}' --set 'extraVolumeMounts[0].name=cache-volume' --set 'extraVolumeMounts[0].mountPath=/app/cache'", all_of( VolumeValidators.has_extra_volume("cache-volume"), VolumeValidators.has_volume_mount("cache-volume", "/app/cache"), ), ), # Storage TestCase( "Persistence enabled with PVC", "--set postgresql.enabled=false --set persistence.enabled=true --set persistence.size=10Gi", all_of(has_pvc, StorageValidators.pvc_size("10Gi")), ), TestCase( "Custom storage class", "--set postgresql.enabled=false --set persistence.enabled=true --set persistence.storageClass=fast-ssd", all_of(has_pvc, StorageValidators.storage_class_name("fast-ssd")), ), TestCase( "PVC with dynamic provisioning (storageClass: -)", "--set postgresql.enabled=false --set persistence.enabled=true --set 'persistence.storageClass=-'", all_of(has_pvc, StorageValidators.storage_class_name("")), ), TestCase( "Custom working directory", "--set persistence.mountPath=/data/phoenix --set server.workingDir=/data/phoenix", DeploymentValidators.working_dir("/data/phoenix"), ), TestCase( "ReadWriteMany access mode", "--set postgresql.enabled=false --set persistence.enabled=true --set persistence.accessMode=ReadWriteMany", has_pvc, ), # Scenarios TestCase( "Dev environment: minimal setup", "--set postgresql.enabled=false --set persistence.inMemory=true --set ingress.enabled=false --set replicaCount=1", all_of(minimum_resources, no_ingress, DeploymentValidators.replicas(1)), ), TestCase( "Staging: PostgreSQL + Ingress", "--set postgresql.enabled=true --set replicaCount=1 --set resources.limits.cpu=500m --set resources.limits.memory=1Gi --set ingress.enabled=true --set ingress.host=staging.phoenix.example.com", all_of( PostgreSQLValidators.is_enabled(), has_ingress, DeploymentValidators.resource_limits("500m", "1Gi"), ), ), TestCase( "Production (AWS): External RDS + LoadBalancer", "--set postgresql.enabled=false --set database.url=postgresql://user:pass@rds.aws.com:5432/phoenix --set service.type=LoadBalancer --set replicaCount=3 --set autoscaling.enabled=true --set ingress.enabled=true --set ingress.tls.enabled=true --set server.enablePrometheus=true --set database.postgres.host=phoenix-postgresql", all_of( no_postgresql, ServiceValidators.service_type("LoadBalancer"), DeploymentValidators.replicas(3), has_ingress, IngressValidators.has_tls(), ), ), TestCase( "Production (GCP): Comprehensive secure setup", "--set postgresql.enabled=false --set database.url=postgresql://user:pass@cloudsql:5432/phoenix --set ingress.enabled=true --set ingress.host=phoenix.prod.example.com --set ingress.tls.enabled=true --set tls.enabled=true --set tls.enabledForHttp=true --set tls.enabledForGrpc=true --set tls.certFile=/certs/tls.crt --set tls.keyFile=/certs/tls.key --set logging.mode=structured --set logging.level=info --set instrumentation.otlpTraceCollectorGrpcEndpoint=http://otel:4317 --set smtp.hostname=smtp.sendgrid.net --set smtp.port=587 --set replicaCount=5 --set database.postgres.host=phoenix-postgresql", all_of( no_postgresql, has_ingress, IngressValidators.has_tls(), TLSValidators.tls_enabled(http=True, grpc=True), LoggingValidators.logging_config("structured", "info", "warning"), DeploymentValidators.replicas(5), ), ), TestCase( "Enterprise: OAuth2 + SMTP + OTLP + Custom Resources", "--set auth.oauth2.enabled=true --set auth.oauth2.providers.google.client_id=google-id --set auth.oauth2.providers.google.client_secret=google-secret --set auth.oauth2.providers.google.oidc_config_url=https://accounts.google.com/.well-known/openid-configuration --set smtp.hostname=smtp.office365.com --set smtp.port=587 --set smtp.username=phoenix@company.com --set instrumentation.otlpTraceCollectorGrpcEndpoint=http://jaeger:4317 --set resources.limits.cpu=2000m --set resources.limits.memory=4Gi --set server.hostRootPath=/observability/phoenix --set server.rootUrl=https://tools.company.com/observability/phoenix", all_of( ConfigMapValidators.configmap_has_key( "PHOENIX_OAUTH2_GOOGLE_CLIENT_ID", "google-id" ), SMTPValidators.smtp_config("smtp.office365.com", "587", "phoenix@company.com"), InstrumentationValidators.otlp_endpoint(grpc_endpoint="http://jaeger:4317"), DeploymentValidators.resource_limits("2000m", "4Gi"), ServerConfigValidators.server_paths( "/observability/phoenix", "https://tools.company.com/observability/phoenix" ), ), ), TestCase( "Air-gapped secure deployment", "--set server.allowExternalResources=false --set ingress.enabled=false --set service.type=ClusterIP --set image.registry=internal-registry.company.local --set image.repository=phoenix/phoenix --set image.tag=v12.6.0 --set logging.mode=structured --set logging.level=warning", all_of( ServerConfigValidators.allow_external_resources(False), no_ingress, ServiceValidators.service_type("ClusterIP"), ImageValidators.image_config( "internal-registry.company.local", "phoenix/phoenix", "v12.6.0" ), LoggingValidators.logging_config("structured", "warning", "warning"), ), ), # Naming Overrides (nameOverride and fullnameOverride) TestCase( "nameOverride: custom name for resources", "--set nameOverride=custom-phoenix", all_of( # When nameOverride is set, fullname becomes {release-name}-{nameOverride} # For release name "test-release", fullname should be "test-release-custom-phoenix" NamingValidators.deployment_name("test-release-custom-phoenix"), NamingValidators.service_name("test-release-custom-phoenix-svc"), NamingValidators.configmap_name("test-release-custom-phoenix-configmap"), ), ), TestCase( "fullnameOverride: complete custom name override", "--set fullnameOverride=my-app", all_of( # When fullnameOverride is set, all resources use exactly that name NamingValidators.deployment_name("my-app"), NamingValidators.service_name("my-app-svc"), NamingValidators.configmap_name("my-app-configmap"), ), ), TestCase( "fullnameOverride with persistence", "--set postgresql.enabled=false --set persistence.enabled=true --set fullnameOverride=custom-phoenix-app", all_of( NamingValidators.all_resources_with_fullname("custom-phoenix-app"), NamingValidators.pvc_name("custom-phoenix-app-data-pvc"), has_pvc, ), ), TestCase( "nameOverride as subchart simulation", "--set nameOverride=observability", all_of( # Simulates being used as a subchart - resources should be prefixed properly NamingValidators.deployment_name("test-release-observability"), NamingValidators.service_name("test-release-observability-svc"), ), ), # Additional Environment Variables TestCase( "additionalEnv: single environment variable", "--set 'additionalEnv[0].name=CUSTOM_VAR' --set 'additionalEnv[0].value=custom-value'", DeploymentValidators.has_additional_env("CUSTOM_VAR", "custom-value"), ), TestCase( "additionalEnv: multiple environment variables", "--set 'additionalEnv[0].name=VAR_ONE' --set 'additionalEnv[0].value=value1' --set 'additionalEnv[1].name=VAR_TWO' --set 'additionalEnv[1].value=value2'", all_of( DeploymentValidators.has_additional_env("VAR_ONE", "value1"), DeploymentValidators.has_additional_env("VAR_TWO", "value2"), ), ), TestCase( "additionalEnv: environment variable from secret", "--set 'additionalEnv[0].name=SECRET_KEY' --set 'additionalEnv[0].valueFrom.secretKeyRef.name=my-secret' --set 'additionalEnv[0].valueFrom.secretKeyRef.key=secret-key'", DeploymentValidators.has_additional_env_from_secret( "SECRET_KEY", "my-secret", "secret-key" ), ), TestCase( "additionalEnv: environment variable from configMap", "--set 'additionalEnv[0].name=CONFIG_KEY' --set 'additionalEnv[0].valueFrom.configMapKeyRef.name=my-config' --set 'additionalEnv[0].valueFrom.configMapKeyRef.key=config-key'", DeploymentValidators.has_additional_env("CONFIG_KEY"), ), TestCase( "additionalEnv: mixed direct values and references", "--set 'additionalEnv[0].name=DIRECT_VAR' --set 'additionalEnv[0].value=direct123' --set 'additionalEnv[1].name=SECRET_VAR' --set 'additionalEnv[1].valueFrom.secretKeyRef.name=ext-secret' --set 'additionalEnv[1].valueFrom.secretKeyRef.key=ext-key'", all_of( DeploymentValidators.has_additional_env("DIRECT_VAR", "direct123"), DeploymentValidators.has_additional_env_from_secret( "SECRET_VAR", "ext-secret", "ext-key" ), ), ), ] # ============================================================================ # Main Entry Point # ============================================================================ async def main(): """Create and run the Helm tester.""" script_dir = Path(__file__).parent project_root = script_dir.parent.parent chart_dir = project_root / "helm" test_cases = get_test_suite() tester = HelmTester(chart_dir) return await tester.run(test_cases) if __name__ == "__main__": sys.exit(asyncio.run(main()))

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arize-ai/phoenix'

If you have feedback or need assistance with the MCP directory API, please join our Discord server