We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/detailobsessed/unblu-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""Tests for the Kubernetes connection provider."""
import socket
import subprocess
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from pytest_subprocess import FakeProcess
from unblu_mcp._internal.exceptions import ConfigurationError
from unblu_mcp._internal.providers_k8s import (
K8sConnectionProvider,
K8sEnvironmentConfig,
_get_default_environments,
detect_environment_from_context,
)
# Shared environment fixtures for the tests below. Each entry maps an
# environment name to its local port; the namespace is always "unblu-<name>".
TEST_ENVIRONMENTS = {
    env_name: K8sEnvironmentConfig(
        name=env_name,
        local_port=env_port,
        namespace=f"unblu-{env_name}",
    )
    for env_name, env_port in (
        ("dev", 8084),
        ("test1", 8085),
        ("test2", 8087),
        ("prod", 8086),
    )
}
class TestK8sEnvironmentConfig:
    """Checks the default and explicitly supplied fields of K8sEnvironmentConfig."""

    def test_default_values(self) -> None:
        """Optional fields fall back to their defaults when omitted."""
        cfg = K8sEnvironmentConfig(name="test", local_port=8080, namespace="test-ns")
        expected_defaults = {
            "service": "haproxy",
            "service_port": 8080,
            "api_path": "/app/rest/v4",
        }
        for field_name, expected in expected_defaults.items():
            assert getattr(cfg, field_name) == expected

    def test_custom_values(self) -> None:
        """Every field supplied to the constructor is stored unchanged."""
        supplied = {
            "name": "custom",
            "local_port": 9000,
            "namespace": "custom-ns",
            "service": "nginx",
            "service_port": 443,
            "api_path": "/api/v1",
        }
        cfg = K8sEnvironmentConfig(**supplied)
        for field_name, expected in supplied.items():
            assert getattr(cfg, field_name) == expected
class TestGetDefaultEnvironments:
    """Checks on the environments returned by _get_default_environments()."""

    def test_loads_environments_from_example_config(self) -> None:
        """At least one environment is loaded (expected from k8s_environments.example.yaml)."""
        loaded = _get_default_environments()
        assert loaded

    def test_environments_have_unique_ports(self) -> None:
        """No two loaded environments share a local port."""
        loaded = _get_default_environments()
        # With nothing loaded both sides are zero, so the check passes trivially.
        local_ports = [env_config.local_port for env_config in loaded.values()]
        assert len(set(local_ports)) == len(local_ports)

    def test_environment_configs_are_valid(self) -> None:
        """Each entry is keyed by its own name and has a port and a namespace."""
        for env_name, env_config in _get_default_environments().items():
            assert env_config.name == env_name
            assert env_config.local_port > 0
            assert env_config.namespace
class TestK8sConnectionProvider:
    """Tests for K8sConnectionProvider.

    Most tests build a provider for the ``dev`` entry of ``TEST_ENVIRONMENTS``
    (local port 8084, namespace ``unblu-dev``). ``kubectl`` invocations are
    faked with the ``fp`` fixture from pytest-subprocess, and the port probe
    ``_is_port_in_use`` is patched unless it is the thing under test.
    """

    # ------------------------------------------------------------------
    # Private helpers (not collected by pytest: names do not start with "test")
    # ------------------------------------------------------------------

    @staticmethod
    def _dev_provider() -> K8sConnectionProvider:
        """Return a provider for the ``dev`` test environment."""
        return K8sConnectionProvider(environment="dev", environments=TEST_ENVIRONMENTS)

    @staticmethod
    def _register_auth_check(
        fp: FakeProcess,
        *,
        returncode: int = 0,
        stderr: str | None = None,
    ) -> None:
        """Register the fake ``kubectl auth can-i`` call for the dev namespace."""
        fp.register(
            ["kubectl", "auth", "can-i", "get", "pods", "-n", "unblu-dev"],
            returncode=returncode,
            stderr=stderr,
        )

    @staticmethod
    def _register_port_forward(
        fp: FakeProcess,
        *,
        returncode: int = 0,
        stderr: str | None = None,
        wait: float | None = None,
    ) -> None:
        """Register the fake ``kubectl port-forward`` call for the dev environment."""
        fp.register(
            ["kubectl", "port-forward", "-n", "unblu-dev", "svc/haproxy", "8084:8080"],
            returncode=returncode,
            stderr=stderr,
            wait=wait,
        )

    # ------------------------------------------------------------------
    # Construction
    # ------------------------------------------------------------------

    def test_init_with_string_environment(self) -> None:
        """Provider initializes with string environment name."""
        provider = K8sConnectionProvider(environment="dev", environments=TEST_ENVIRONMENTS)
        assert provider.environment == "dev"
        assert provider.local_port == 8084

    def test_init_with_config_object(self) -> None:
        """Provider initializes with K8sEnvironmentConfig object."""
        config = K8sEnvironmentConfig(name="custom", local_port=9999, namespace="custom-ns")
        provider = K8sConnectionProvider(environment=config)
        assert provider.environment == "custom"
        assert provider.local_port == 9999

    def test_init_unknown_environment_raises(self) -> None:
        """Provider raises ValueError for unknown environment."""
        with pytest.raises(ValueError, match="Unknown environment 'invalid'"):
            K8sConnectionProvider(environment="invalid", environments=TEST_ENVIRONMENTS)

    def test_init_custom_environments(self) -> None:
        """Provider accepts custom environment configurations."""
        custom_envs = {
            "dev": K8sEnvironmentConfig(name="dev", local_port=8000, namespace="dev-ns"),
        }
        provider = K8sConnectionProvider(environment="dev", environments=custom_envs)
        assert provider.environment == "dev"
        assert provider.local_port == 8000

    # ------------------------------------------------------------------
    # get_config()
    # ------------------------------------------------------------------

    def test_get_config_returns_connection_config(self) -> None:
        """get_config returns properly configured ConnectionConfig."""
        provider = K8sConnectionProvider(
            environment="dev",
            trusted_user_id="testuser",
            trusted_user_role="ADMIN",
            environments=TEST_ENVIRONMENTS,
        )
        config = provider.get_config()
        assert config.base_url == "http://localhost:8084/app/rest/v4"
        assert config.headers["x-unblu-trusted-user-id"] == "testuser"
        assert config.headers["x-unblu-trusted-user-role"] == "ADMIN"

    def test_get_config_default_trusted_headers(self) -> None:
        """get_config uses default trusted headers."""
        config = self._dev_provider().get_config()
        assert config.headers["x-unblu-trusted-user-id"] == "superadmin"
        assert config.headers["x-unblu-trusted-user-role"] == "SUPER_ADMIN"

    # ------------------------------------------------------------------
    # setup()
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_setup_port_already_in_use(self) -> None:
        """setup() reuses existing port if port is already in use."""
        provider = self._dev_provider()
        with patch.object(provider, "_is_port_in_use", return_value=True):
            await provider.setup()
        # Should not start a new process and should not own the port-forward
        assert provider._port_forward_process is None
        assert provider._owns_port_forward is False

    @pytest.mark.asyncio
    async def test_setup_kubectl_not_found(self) -> None:
        """setup() raises ConfigurationError if kubectl not found."""
        provider = self._dev_provider()
        with (
            patch.object(provider, "_is_port_in_use", return_value=False),
            patch("shutil.which", return_value=None),
            pytest.raises(ConfigurationError, match="kubectl not found"),
        ):
            await provider.setup()

    @pytest.mark.asyncio
    async def test_setup_kubectl_auth_failure(self, fp: FakeProcess) -> None:
        """setup() raises ConfigurationError if kubectl is not authenticated."""
        provider = self._dev_provider()
        # Auth check fails with a non-zero exit code
        self._register_auth_check(
            fp,
            returncode=1,
            stderr="error: You must be logged in to the server",
        )
        with (
            patch.object(provider, "_is_port_in_use", return_value=False),
            patch("shutil.which", return_value="/usr/bin/kubectl"),
            pytest.raises(ConfigurationError, match="kubectl is not authenticated"),
        ):
            await provider.setup()

    @pytest.mark.asyncio
    async def test_setup_port_forward_fails_early(self, fp: FakeProcess) -> None:
        """setup() raises ConfigurationError with stderr if port-forward process dies early."""
        provider = self._dev_provider()
        # Auth check succeeds, the port-forward itself exits with an error
        self._register_auth_check(fp, returncode=0)
        self._register_port_forward(
            fp,
            returncode=1,
            stderr="error: services 'haproxy' not found",
        )
        with (
            patch.object(provider, "_is_port_in_use", return_value=False),
            patch("shutil.which", return_value="/usr/bin/kubectl"),
            # Patch out sleeping so the test does not wait in real time
            patch("asyncio.sleep", new_callable=AsyncMock),
            pytest.raises(ConfigurationError, match="kubectl port-forward failed"),
        ):
            await provider.setup()

    @pytest.mark.asyncio
    async def test_setup_starts_port_forward(self, fp: FakeProcess) -> None:
        """setup() starts kubectl port-forward process when port is not in use."""
        provider = self._dev_provider()
        # First check returns False (port not in use), second returns True (port ready)
        port_check_results = [False, True]
        self._register_auth_check(fp, returncode=0)
        self._register_port_forward(fp, returncode=0)
        with (
            patch.object(provider, "_is_port_in_use", side_effect=port_check_results),
            patch("shutil.which", return_value="/usr/bin/kubectl"),
        ):
            await provider.setup()
        assert provider._owns_port_forward is True
        # Verify the port-forward command was called
        assert fp.call_count(["kubectl", "port-forward", fp.any()]) == 1

    @pytest.mark.asyncio
    async def test_setup_timeout_kills_process(self, fp: FakeProcess) -> None:
        """setup() kills process and raises if port never becomes available."""
        provider = self._dev_provider()
        self._register_auth_check(fp, returncode=0)
        # Small delay to simulate a port-forward process that keeps running
        self._register_port_forward(fp, returncode=0, wait=0.1)
        with (
            # The port never reports as in use, so setup() must give up
            patch.object(provider, "_is_port_in_use", return_value=False),
            patch("shutil.which", return_value="/usr/bin/kubectl"),
            patch("asyncio.sleep", new_callable=AsyncMock),
            pytest.raises(ConfigurationError, match="Port-forward timed out"),
        ):
            await provider.setup()

    @pytest.mark.asyncio
    async def test_setup_reuses_existing_port(self) -> None:
        """setup() reuses existing port-forward when port is already in use."""
        provider = self._dev_provider()
        with patch.object(provider, "_is_port_in_use", return_value=True):
            await provider.setup()
        assert provider._owns_port_forward is False
        assert provider._port_forward_process is None

    # ------------------------------------------------------------------
    # teardown()
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_teardown_terminates_process_when_owner(self) -> None:
        """teardown() terminates the port-forward process only when we own it."""
        provider = self._dev_provider()
        mock_process = MagicMock()
        provider._port_forward_process = mock_process
        provider._owns_port_forward = True
        await provider.teardown()
        mock_process.terminate.assert_called_once()
        mock_process.wait.assert_called_once_with(timeout=5)
        assert provider._port_forward_process is None

    @pytest.mark.asyncio
    async def test_teardown_kills_on_timeout(self) -> None:
        """teardown() kills process if terminate times out."""
        provider = self._dev_provider()
        mock_process = MagicMock()
        mock_process.wait.side_effect = subprocess.TimeoutExpired(cmd="kubectl", timeout=5)
        provider._port_forward_process = mock_process
        provider._owns_port_forward = True
        await provider.teardown()
        mock_process.terminate.assert_called_once()
        mock_process.kill.assert_called_once()
        assert provider._port_forward_process is None

    @pytest.mark.asyncio
    async def test_teardown_no_process_when_owner(self) -> None:
        """teardown() handles case where no process was started."""
        provider = self._dev_provider()
        provider._port_forward_process = None
        provider._owns_port_forward = True
        # Should not raise
        await provider.teardown()

    @pytest.mark.asyncio
    async def test_teardown_skips_cleanup_when_not_owner(self) -> None:
        """teardown() skips cleanup when we don't own the port-forward."""
        provider = self._dev_provider()
        mock_process = MagicMock()
        provider._port_forward_process = mock_process
        provider._owns_port_forward = False
        await provider.teardown()
        # Should not terminate the process since we don't own it
        mock_process.terminate.assert_not_called()
        # Process reference should remain (it's owned by another instance)
        assert provider._port_forward_process is mock_process

    # ------------------------------------------------------------------
    # health_check() and _is_port_in_use()
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_health_check_delegates_to_port_check(self) -> None:
        """health_check() returns port availability status."""
        provider = self._dev_provider()
        with patch.object(provider, "_is_port_in_use", return_value=True):
            assert await provider.health_check() is True
        with patch.object(provider, "_is_port_in_use", return_value=False):
            assert await provider.health_check() is False

    def test_is_port_in_use_available_port(self) -> None:
        """_is_port_in_use returns False for available port."""
        provider = self._dev_provider()
        # Ask the OS for an ephemeral port and release it straight away. The
        # socket never listens, so nothing is bound to the port once the
        # ``with`` block exits. This avoids relying on a hard-coded port
        # number that might happen to be taken on the test machine.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.bind(("localhost", 0))
            free_port = probe.getsockname()[1]
        provider._env_config = K8sEnvironmentConfig(name="test", local_port=free_port, namespace="test")
        assert provider._is_port_in_use() is False

    def test_is_port_in_use_bound_port(self) -> None:
        """_is_port_in_use returns True for bound port."""
        provider = self._dev_provider()
        # Bind a port temporarily; the check must run while the socket is open
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("localhost", 0))
            port = s.getsockname()[1]
            s.listen(1)
            provider._env_config = K8sEnvironmentConfig(name="test", local_port=port, namespace="test")
            assert provider._is_port_in_use() is True

    # ------------------------------------------------------------------
    # ensure_connection()
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_ensure_connection_does_nothing_when_port_available(self) -> None:
        """ensure_connection() does nothing when port is already available."""
        provider = self._dev_provider()
        with patch.object(provider, "_is_port_in_use", return_value=True):
            await provider.ensure_connection()
        # Should not have started any process
        assert provider._port_forward_process is None

    @pytest.mark.asyncio
    async def test_ensure_connection_restarts_dead_port_forward(self, fp: FakeProcess) -> None:
        """ensure_connection() restarts port-forward when our process died."""
        provider = self._dev_provider()
        provider._owns_port_forward = True
        # Simulate a dead process
        mock_process = MagicMock()
        mock_process.poll.return_value = 1  # Process exited with code 1
        provider._port_forward_process = mock_process
        # Port check returns False (not available), then True (after restart)
        port_check_results = [False, True]
        self._register_auth_check(fp, returncode=0)
        self._register_port_forward(fp, returncode=0)
        with (
            patch.object(provider, "_is_port_in_use", side_effect=port_check_results),
            patch("shutil.which", return_value="/usr/bin/kubectl"),
        ):
            await provider.ensure_connection()
        # Should have started a new port-forward
        assert provider._owns_port_forward is True
        assert fp.call_count(["kubectl", "port-forward", fp.any()]) == 1

    @pytest.mark.asyncio
    async def test_ensure_connection_kills_malfunctioning_process(self, fp: FakeProcess) -> None:
        """ensure_connection() kills alive but malfunctioning port-forward."""
        provider = self._dev_provider()
        provider._owns_port_forward = True
        # Simulate alive but malfunctioning process (port not available)
        mock_process = MagicMock()
        mock_process.poll.return_value = None  # Process is alive
        provider._port_forward_process = mock_process
        # Port check returns False (not available), then True (after restart)
        port_check_results = [False, True]
        self._register_auth_check(fp, returncode=0)
        self._register_port_forward(fp, returncode=0)
        with (
            patch.object(provider, "_is_port_in_use", side_effect=port_check_results),
            patch("shutil.which", return_value="/usr/bin/kubectl"),
        ):
            await provider.ensure_connection()
        # Should have killed the malfunctioning process
        mock_process.kill.assert_called_once()
        mock_process.wait.assert_called_once_with(timeout=5)
        # Should have started a new port-forward
        assert provider._owns_port_forward is True
        assert fp.call_count(["kubectl", "port-forward", fp.any()]) == 1
class TestDetectEnvironmentFromContext:
    """Tests for detect_environment_from_context function.

    Every test runs with ``subprocess.run`` mocked and with the known
    environments pinned to ``TEST_ENVIRONMENTS``, so no test depends on a
    real ``kubectl`` binary or on environment config files on disk.
    """

    @staticmethod
    def _detect(*, stdout: str | None = None, error: Exception | None = None) -> str | None:
        """Call detect_environment_from_context() against a faked kubectl.

        Args:
            stdout: Text the mocked ``subprocess.run`` result exposes as
                ``stdout``. Used when ``error`` is not given.
            error: Exception the mocked ``subprocess.run`` raises instead of
                returning a result.

        Returns:
            Whatever detect_environment_from_context() returns.
        """
        with (
            patch("subprocess.run") as mock_run,
            patch(
                "unblu_mcp._internal.providers_k8s._get_default_environments",
                return_value=TEST_ENVIRONMENTS,
            ),
        ):
            if error is not None:
                mock_run.side_effect = error
            else:
                mock_run.return_value = MagicMock(stdout=stdout)
            return detect_environment_from_context()

    def test_detects_dev_environment(self) -> None:
        """Detects dev from context name."""
        assert self._detect(stdout="cluster-dev-context\n") == "dev"

    def test_detects_environment_with_suffix(self) -> None:
        """Detects environment from context ending with env name."""
        assert self._detect(stdout="my-cluster-prod\n") == "prod"

    def test_detects_environment_with_dash_pattern(self) -> None:
        """Detects environment from -env- pattern in context."""
        assert self._detect(stdout="prefix-test1-suffix\n") == "test1"

    def test_returns_none_for_unknown_context(self) -> None:
        """Returns None if context doesn't match any environment."""
        assert self._detect(stdout="unknown-cluster\n") is None

    def test_returns_none_on_subprocess_error(self) -> None:
        """Returns None if kubectl command fails."""
        assert self._detect(error=subprocess.CalledProcessError(1, "kubectl")) is None

    def test_returns_none_if_kubectl_not_found(self) -> None:
        """Returns None if kubectl is not installed."""
        assert self._detect(error=FileNotFoundError()) is None