We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arize-ai/phoenix'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
from typing import Any, Generator, Optional
from unittest.mock import MagicMock, Mock, patch
from urllib.parse import urlparse
import pytest
from opentelemetry import trace as trace_api
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import BatchSpanProcessor as _BatchSpanProcessor
from opentelemetry.sdk.trace.export import SimpleSpanProcessor as _SimpleSpanProcessor
from opentelemetry.sdk.trace.export import SpanExporter
from phoenix.otel.otel import (
PROJECT_NAME,
BatchSpanProcessor,
GRPCSpanExporter,
HTTPSpanExporter,
OTLPTransportProtocol,
SimpleSpanProcessor,
TracerProvider,
_construct_phoenix_cloud_endpoint,
register,
)
def _get_exporter_from_processor(span_processor: Any) -> Optional[SpanExporter]:
"""
Helper function to get the exporter from a span processor.
Handles both old and new OpenTelemetry versions.
OpenTelemetry v1.34.0+ moved exporter from span_exporter to _batch_processor._exporter
"""
return getattr(getattr(span_processor, "_batch_processor", None), "_exporter", None) or getattr(
span_processor, "span_exporter", None
)
@pytest.fixture(autouse=True)
def reset_tracer_provider() -> Generator[None, None, None]:
"""Reset OpenTelemetry tracer provider for test isolation."""
with patch.dict("os.environ", {}, clear=True):
with patch("phoenix.otel.otel.get_env_grpc_port", return_value=4317):
yield
class TestRegister:
def test_register_basic(self) -> None:
tracer_provider = register(verbose=False, set_global_tracer_provider=False)
assert isinstance(tracer_provider, TracerProvider)
assert tracer_provider._default_processor
processors = tracer_provider._active_span_processor._span_processors
assert len(processors) == 1
assert isinstance(processors[0], _SimpleSpanProcessor)
exporter = _get_exporter_from_processor(processors[0])
assert isinstance(exporter, GRPCSpanExporter)
def test_register_with_project_name(self) -> None:
project_name = "test-project"
tracer_provider = register(
project_name=project_name, verbose=False, set_global_tracer_provider=False
)
assert tracer_provider.resource.attributes.get(PROJECT_NAME) == project_name
def test_register_with_batch_processor(self) -> None:
tracer_provider = register(batch=True, verbose=False, set_global_tracer_provider=False)
processors = tracer_provider._active_span_processor._span_processors
assert len(processors) == 1
assert isinstance(processors[0], _BatchSpanProcessor)
exporter = _get_exporter_from_processor(processors[0])
assert isinstance(exporter, GRPCSpanExporter)
def test_register_with_simple_processor(self) -> None:
tracer_provider = register(batch=False, verbose=False, set_global_tracer_provider=False)
processors = tracer_provider._active_span_processor._span_processors
assert len(processors) == 1
assert isinstance(processors[0], _SimpleSpanProcessor)
def test_register_without_global_tracer(self) -> None:
tracer_provider = register(set_global_tracer_provider=False, verbose=False)
assert trace_api.get_tracer_provider() != tracer_provider
assert isinstance(tracer_provider, TracerProvider)
def test_register_with_http_endpoint(self) -> None:
endpoint = "http://custom-endpoint:4318/v1/traces"
tracer_provider = register(
endpoint=endpoint, verbose=False, set_global_tracer_provider=False
)
processors = tracer_provider._active_span_processor._span_processors
assert len(processors) == 1
exporter = _get_exporter_from_processor(processors[0])
assert isinstance(exporter, HTTPSpanExporter)
assert exporter._endpoint == endpoint
def test_register_with_grpc_endpoint(self) -> None:
endpoint = "grpc://custom-endpoint:4317"
tracer_provider = register(
endpoint=endpoint, verbose=False, set_global_tracer_provider=False
)
processors = tracer_provider._active_span_processor._span_processors
assert len(processors) == 1
exporter = _get_exporter_from_processor(processors[0])
assert isinstance(exporter, GRPCSpanExporter)
@patch("phoenix.otel.otel.get_env_client_headers")
def test_register_with_headers(self, mock_env_headers: Any) -> None:
mock_env_headers.return_value = None
headers = {"Authorization": "Bearer token123"}
tracer_provider = register(headers=headers, verbose=False, set_global_tracer_provider=False)
processors = tracer_provider._active_span_processor._span_processors
exporter = _get_exporter_from_processor(processors[0])
assert "authorization" in [h[0].lower() for h in exporter._headers]
def test_register_with_http_protocol(self) -> None:
tracer_provider = register(
protocol="http/protobuf", verbose=False, set_global_tracer_provider=False
)
processors = tracer_provider._active_span_processor._span_processors
exporter = _get_exporter_from_processor(processors[0])
assert isinstance(exporter, HTTPSpanExporter)
def test_register_with_grpc_protocol(self) -> None:
tracer_provider = register(protocol="grpc", verbose=False, set_global_tracer_provider=False)
processors = tracer_provider._active_span_processor._span_processors
exporter = _get_exporter_from_processor(processors[0])
assert isinstance(exporter, GRPCSpanExporter)
@patch("phoenix.otel.otel._auto_instrument_installed_openinference_libraries")
def test_register_with_auto_instrument(self, mock_auto_instrument: Any) -> None:
tracer_provider = register(
auto_instrument=True, verbose=False, set_global_tracer_provider=False
)
mock_auto_instrument.assert_called_once_with(tracer_provider)
@patch("builtins.print")
def test_register_verbose_output(self, mock_print: Any) -> None:
register(verbose=True, set_global_tracer_provider=False)
mock_print.assert_called()
output = str(mock_print.call_args)
assert "OpenTelemetry Tracing Details" in output
def test_register_with_custom_resource_no_project_name(self) -> None:
custom_resource = Resource.create(
{"service.name": "my-service", "service.version": "1.0.0"}
)
with patch("phoenix.otel.otel.get_env_project_name", return_value="env-project"):
tracer_provider = register(
resource=custom_resource, verbose=False, set_global_tracer_provider=False
)
assert tracer_provider.resource.attributes.get("service.name") == "my-service"
assert tracer_provider.resource.attributes.get("service.version") == "1.0.0"
assert tracer_provider.resource.attributes.get(PROJECT_NAME) == "env-project"
def test_register_with_custom_resource_and_project_name(self) -> None:
custom_resource = Resource.create(
{"service.name": "my-service", "service.version": "1.0.0"}
)
tracer_provider = register(
project_name="explicit-project",
resource=custom_resource,
verbose=False,
set_global_tracer_provider=False,
)
assert tracer_provider.resource.attributes.get("service.name") == "my-service"
assert tracer_provider.resource.attributes.get("service.version") == "1.0.0"
assert tracer_provider.resource.attributes.get(PROJECT_NAME) == "explicit-project"
def test_register_with_custom_resource_overrides_project_name(self) -> None:
custom_resource = Resource.create(
{"service.name": "my-service", PROJECT_NAME: "resource-project"}
)
tracer_provider = register(
project_name="explicit-project",
resource=custom_resource,
verbose=False,
set_global_tracer_provider=False,
)
assert tracer_provider.resource.attributes.get("service.name") == "my-service"
assert tracer_provider.resource.attributes.get(PROJECT_NAME) == "explicit-project"
def test_register_passes_through_kwargs_to_tracer_provider(self) -> None:
from opentelemetry.sdk.trace.id_generator import IdGenerator
from opentelemetry.sdk.trace.sampling import ALWAYS_OFF
mock_id_generator = Mock(spec=IdGenerator)
tracer_provider = register(
project_name="test-project",
sampler=ALWAYS_OFF,
id_generator=mock_id_generator,
verbose=False,
set_global_tracer_provider=False,
)
assert tracer_provider.sampler == ALWAYS_OFF
assert tracer_provider.id_generator == mock_id_generator
def test_register_with_custom_id_generator(self) -> None:
from opentelemetry.sdk.trace.id_generator import IdGenerator
class CustomIdGenerator(IdGenerator):
def generate_span_id(self) -> int:
return 0x1234567890ABCDEF
def generate_trace_id(self) -> int:
return 0x12345678901234567890123456789012
custom_id_gen = CustomIdGenerator()
tracer_provider = register(
project_name="test-project",
id_generator=custom_id_gen,
verbose=False,
set_global_tracer_provider=False,
)
assert tracer_provider.id_generator is custom_id_gen
def test_register_with_span_limits(self) -> None:
from opentelemetry.sdk.trace import SpanLimits
custom_limits = SpanLimits(max_attributes=50, max_events=20, max_links=10)
tracer_provider = register(
project_name="test-project",
span_limits=custom_limits,
verbose=False,
set_global_tracer_provider=False,
)
assert tracer_provider._span_limits == custom_limits
assert tracer_provider._span_limits.max_attributes == 50
assert tracer_provider._span_limits.max_events == 20
assert tracer_provider._span_limits.max_links == 10
def test_register_with_multiple_kwargs(self) -> None:
from opentelemetry.sdk.trace import SpanLimits
from opentelemetry.sdk.trace.id_generator import IdGenerator
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased
custom_resource = Resource.create(
{"service.name": "test-service", "deployment.environment": "testing"}
)
custom_sampler = TraceIdRatioBased(0.5)
custom_limits = SpanLimits(max_attributes=100)
mock_id_generator = Mock(spec=IdGenerator)
tracer_provider = register(
project_name="multi-test-project",
resource=custom_resource,
sampler=custom_sampler,
span_limits=custom_limits,
id_generator=mock_id_generator,
verbose=False,
set_global_tracer_provider=False,
)
assert tracer_provider.sampler == custom_sampler
assert tracer_provider._span_limits == custom_limits
assert tracer_provider.id_generator == mock_id_generator
assert tracer_provider.resource.attributes.get("service.name") == "test-service"
assert tracer_provider.resource.attributes.get("deployment.environment") == "testing"
assert tracer_provider.resource.attributes.get(PROJECT_NAME) == "multi-test-project"
def test_register_tracer_provider_verbose_is_always_false(self) -> None:
with patch("phoenix.otel.otel.TracerProvider") as mock_tracer_provider:
mock_instance = Mock()
mock_tracer_provider.return_value = mock_instance
mock_instance._default_processor = True
mock_instance._tracing_details.return_value = "test details"
register(verbose=True, set_global_tracer_provider=False)
call_args = mock_tracer_provider.call_args
assert not call_args.kwargs["verbose"]
def test_register_with_global_tracer_provider_enabled(self) -> None:
"""Test that register can still set global tracer provider when requested."""
trace_api.get_tracer_provider()
try:
tracer_provider = register(verbose=False, set_global_tracer_provider=True)
assert isinstance(tracer_provider, TracerProvider)
try:
assert trace_api.get_tracer_provider() == tracer_provider
except Exception:
pytest.skip("OpenTelemetry prevented global tracer provider override")
finally:
try:
if hasattr(tracer_provider, "shutdown"):
tracer_provider.shutdown()
except Exception:
pass
class TestTracerProvider:
def test_tracer_provider_creation(self) -> None:
tracer_provider = TracerProvider(verbose=False)
assert isinstance(tracer_provider, TracerProvider)
assert tracer_provider._default_processor
processors = tracer_provider._active_span_processor._span_processors
assert len(processors) == 1
def test_tracer_provider_with_resource(self) -> None:
resource = Resource.create({"custom.attribute": "value"})
tracer_provider = TracerProvider(resource=resource, verbose=False)
assert tracer_provider.resource == resource
def test_tracer_provider_with_http_endpoint(self) -> None:
endpoint = "http://localhost:4318/v1/traces"
tracer_provider = TracerProvider(endpoint=endpoint, verbose=False)
processors = tracer_provider._active_span_processor._span_processors
assert len(processors) == 1
exporter = _get_exporter_from_processor(processors[0])
assert isinstance(exporter, HTTPSpanExporter)
assert exporter._endpoint == endpoint
def test_tracer_provider_with_grpc_endpoint(self) -> None:
endpoint = "localhost:4317"
tracer_provider = TracerProvider(endpoint=endpoint, verbose=False)
processors = tracer_provider._active_span_processor._span_processors
assert len(processors) == 1
exporter = _get_exporter_from_processor(processors[0])
assert isinstance(exporter, GRPCSpanExporter)
def test_add_span_processor_replaces_default(self) -> None:
tracer_provider = TracerProvider(verbose=False)
assert tracer_provider._default_processor
custom_processor = Mock(spec=_SimpleSpanProcessor)
tracer_provider.add_span_processor(custom_processor)
assert not tracer_provider._default_processor
# The default processor should have been removed
assert custom_processor in tracer_provider._active_span_processor._span_processors
def test_add_span_processor_without_replace(self) -> None:
tracer_provider = TracerProvider(verbose=False)
assert tracer_provider._default_processor
initial_count = len(tracer_provider._active_span_processor._span_processors)
custom_processor = Mock(spec=_SimpleSpanProcessor)
tracer_provider.add_span_processor(custom_processor, replace_default_processor=False)
assert tracer_provider._default_processor
# Both processors should be present
assert len(tracer_provider._active_span_processor._span_processors) == initial_count + 1
@patch("builtins.print")
def test_tracer_provider_verbose(self, mock_print: Any) -> None:
TracerProvider(verbose=True)
mock_print.assert_called()
output = str(mock_print.call_args)
assert "OpenTelemetry Tracing Details" in output
class TestSpanProcessors:
def test_simple_span_processor_http(self) -> None:
endpoint = "http://localhost:4318/v1/traces"
processor = SimpleSpanProcessor(endpoint=endpoint)
assert isinstance(processor, _SimpleSpanProcessor)
assert isinstance(processor.span_exporter, HTTPSpanExporter)
assert processor.span_exporter._endpoint == endpoint
def test_simple_span_processor_grpc(self) -> None:
endpoint = "localhost:4317"
processor = SimpleSpanProcessor(endpoint=endpoint, protocol="grpc")
assert isinstance(processor, _SimpleSpanProcessor)
assert isinstance(processor.span_exporter, GRPCSpanExporter)
assert processor.span_exporter._endpoint == endpoint
def test_simple_span_processor_with_exporter(self) -> None:
mock_exporter = MagicMock()
processor = SimpleSpanProcessor(span_exporter=mock_exporter)
assert processor.span_exporter == mock_exporter
def test_batch_span_processor_http(self) -> None:
endpoint = "http://localhost:4318/v1/traces"
processor = BatchSpanProcessor(endpoint=endpoint)
assert isinstance(processor, _BatchSpanProcessor)
exporter = _get_exporter_from_processor(processor)
assert isinstance(exporter, HTTPSpanExporter)
assert exporter._endpoint == endpoint
def test_batch_span_processor_grpc(self) -> None:
endpoint = "localhost:4317"
processor = BatchSpanProcessor(endpoint=endpoint, protocol="grpc")
assert isinstance(processor, _BatchSpanProcessor)
exporter = _get_exporter_from_processor(processor)
assert isinstance(exporter, GRPCSpanExporter)
assert exporter._endpoint == endpoint
class TestSpanExporters:
@patch("phoenix.otel.otel.get_env_client_headers")
@patch("phoenix.otel.otel.get_env_phoenix_auth_header")
def test_http_span_exporter_env_headers(
self, mock_auth_header: Any, mock_client_headers: Any
) -> None:
mock_client_headers.return_value = {"X-Custom": "value"}
mock_auth_header.return_value = {"Authorization": "Bearer token"}
exporter = HTTPSpanExporter()
mock_client_headers.assert_called_once()
mock_auth_header.assert_called_once()
# Check headers were set
headers_dict = {h.lower(): v for h, v in exporter._headers.items()}
assert headers_dict.get("x-custom") == "value"
assert headers_dict.get("authorization") == "Bearer token"
@patch("phoenix.otel.otel.get_env_client_headers")
@patch("phoenix.otel.otel.get_env_phoenix_auth_header")
def test_grpc_span_exporter_env_headers(
self, mock_auth_header: Any, mock_client_headers: Any
) -> None:
mock_client_headers.return_value = {"X-Custom": "value"}
mock_auth_header.return_value = {"Authorization": "Bearer token"}
exporter = GRPCSpanExporter()
mock_client_headers.assert_called_once()
mock_auth_header.assert_called_once()
# Check headers were set (gRPC uses list of tuples)
if exporter._headers:
headers_dict = {h[0].lower(): h[1] for h in exporter._headers}
assert headers_dict.get("x-custom") == "value"
assert headers_dict.get("authorization") == "Bearer token"
def test_http_span_exporter_with_explicit_headers(self) -> None:
headers = {"Custom-Header": "custom-value"}
exporter = HTTPSpanExporter(headers=headers)
headers_dict = {h.lower(): v for h, v in exporter._headers.items()}
assert headers_dict.get("custom-header") == "custom-value"
def test_grpc_span_exporter_with_explicit_headers(self) -> None:
headers = {"Custom-Header": "custom-value"}
exporter = GRPCSpanExporter(headers=headers)
if exporter._headers:
headers_dict = {h[0].lower(): h[1] for h in exporter._headers}
assert headers_dict.get("custom-header") == "custom-value"
class TestEndpointNormalization:
def test_normalized_endpoint_http_explicit(self) -> None:
from phoenix.otel.otel import _normalized_endpoint
parsed, endpoint = _normalized_endpoint("http://localhost:6006/v1/traces", use_http=True)
assert parsed.scheme == "http"
assert parsed.netloc == "localhost:6006"
assert parsed.path == "/v1/traces"
assert endpoint == "http://localhost:6006/v1/traces"
def test_normalized_endpoint_grpc_explicit(self) -> None:
from phoenix.otel.otel import _normalized_endpoint
parsed, endpoint = _normalized_endpoint("localhost:4317", use_http=False)
assert endpoint == "localhost:4317"
def test_normalized_endpoint_known_provider(self) -> None:
from phoenix.otel.otel import _normalized_endpoint
with patch(
"phoenix.otel.otel.get_env_collector_endpoint",
return_value="https://app.phoenix.arize.com",
):
parsed, endpoint = _normalized_endpoint(None)
assert parsed.scheme == "https"
assert parsed.netloc == "app.phoenix.arize.com"
assert parsed.path == "/v1/traces"
def test_normalized_endpoint_none_defaults(self) -> None:
from phoenix.otel.otel import _normalized_endpoint
with patch("phoenix.otel.otel.get_env_collector_endpoint", return_value=None):
with patch("phoenix.otel.otel.get_env_grpc_port", return_value=4317):
parsed, endpoint = _normalized_endpoint(None, use_http=True)
assert parsed.scheme == "http"
assert parsed.netloc == "localhost:6006"
assert parsed.path == "/v1/traces"
parsed, endpoint = _normalized_endpoint(None, use_http=False)
assert parsed.scheme == "http"
assert parsed.netloc == "localhost:4317"
class TestPhoenixCloudEndpoint:
def test_space_path_basic(self) -> None:
parsed = urlparse("https://app.phoenix.arize.com/s/testspace")
result = _construct_phoenix_cloud_endpoint(parsed)
assert result.path == "/s/testspace/v1/traces"
def test_space_path_with_trailing_slash(self) -> None:
parsed = urlparse("https://app.phoenix.arize.com/s/my-space_01/")
result = _construct_phoenix_cloud_endpoint(parsed)
assert result.path == "/s/my-space_01/v1/traces"
def test_space_path_with_additional_components(self) -> None:
parsed = urlparse("https://app.phoenix.arize.com/s/space123/extra/path")
result = _construct_phoenix_cloud_endpoint(parsed)
assert result.path == "/s/space123/v1/traces"
def test_non_space_path_defaults(self) -> None:
parsed = urlparse("https://app.phoenix.arize.com/some/other/path")
result = _construct_phoenix_cloud_endpoint(parsed)
assert result.path == "/v1/traces"
def test_empty_space_id_defaults(self) -> None:
parsed = urlparse("https://app.phoenix.arize.com/s/")
result = _construct_phoenix_cloud_endpoint(parsed)
assert result.path == "/v1/traces"
class TestOTLPTransportProtocol:
def test_valid_protocols(self) -> None:
assert OTLPTransportProtocol("http/protobuf") == OTLPTransportProtocol.HTTP_PROTOBUF
assert OTLPTransportProtocol("grpc") == OTLPTransportProtocol.GRPC
assert OTLPTransportProtocol("infer") == OTLPTransportProtocol.INFER
assert OTLPTransportProtocol(None) == OTLPTransportProtocol.INFER
def test_invalid_protocols(self) -> None:
with pytest.raises(ValueError) as exc_info:
OTLPTransportProtocol("http")
assert "Did you mean 'http/protobuf'?" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
OTLPTransportProtocol("invalid")
assert "Must one of" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
OTLPTransportProtocol(123)
assert "Must be a string" in str(exc_info.value)