Skip to main content
Glama

@arizeai/phoenix-mcp

Official
by Arize-ai
test_otel.py23.7 kB
from typing import Any, Generator, Optional from unittest.mock import MagicMock, Mock, patch from urllib.parse import urlparse import pytest from opentelemetry import trace as trace_api from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace.export import BatchSpanProcessor as _BatchSpanProcessor from opentelemetry.sdk.trace.export import SimpleSpanProcessor as _SimpleSpanProcessor from opentelemetry.sdk.trace.export import SpanExporter from phoenix.otel.otel import ( PROJECT_NAME, BatchSpanProcessor, GRPCSpanExporter, HTTPSpanExporter, OTLPTransportProtocol, SimpleSpanProcessor, TracerProvider, _construct_phoenix_cloud_endpoint, register, ) def _get_exporter_from_processor(span_processor: Any) -> Optional[SpanExporter]: """ Helper function to get the exporter from a span processor. Handles both old and new OpenTelemetry versions. OpenTelemetry v1.34.0+ moved exporter from span_exporter to _batch_processor._exporter """ return getattr(getattr(span_processor, "_batch_processor", None), "_exporter", None) or getattr( span_processor, "span_exporter", None ) @pytest.fixture(autouse=True) def reset_tracer_provider() -> Generator[None, None, None]: """Reset OpenTelemetry tracer provider for test isolation.""" with patch.dict("os.environ", {}, clear=True): with patch("phoenix.otel.otel.get_env_grpc_port", return_value=4317): yield class TestRegister: def test_register_basic(self) -> None: tracer_provider = register(verbose=False, set_global_tracer_provider=False) assert isinstance(tracer_provider, TracerProvider) assert tracer_provider._default_processor processors = tracer_provider._active_span_processor._span_processors assert len(processors) == 1 assert isinstance(processors[0], _SimpleSpanProcessor) exporter = _get_exporter_from_processor(processors[0]) assert isinstance(exporter, GRPCSpanExporter) def test_register_with_project_name(self) -> None: project_name = "test-project" tracer_provider = register( project_name=project_name, verbose=False, set_global_tracer_provider=False ) assert tracer_provider.resource.attributes.get(PROJECT_NAME) == project_name def test_register_with_batch_processor(self) -> None: tracer_provider = register(batch=True, verbose=False, set_global_tracer_provider=False) processors = tracer_provider._active_span_processor._span_processors assert len(processors) == 1 assert isinstance(processors[0], _BatchSpanProcessor) exporter = _get_exporter_from_processor(processors[0]) assert isinstance(exporter, GRPCSpanExporter) def test_register_with_simple_processor(self) -> None: tracer_provider = register(batch=False, verbose=False, set_global_tracer_provider=False) processors = tracer_provider._active_span_processor._span_processors assert len(processors) == 1 assert isinstance(processors[0], _SimpleSpanProcessor) def test_register_without_global_tracer(self) -> None: tracer_provider = register(set_global_tracer_provider=False, verbose=False) assert trace_api.get_tracer_provider() != tracer_provider assert isinstance(tracer_provider, TracerProvider) def test_register_with_http_endpoint(self) -> None: endpoint = "http://custom-endpoint:4318/v1/traces" tracer_provider = register( endpoint=endpoint, verbose=False, set_global_tracer_provider=False ) processors = tracer_provider._active_span_processor._span_processors assert len(processors) == 1 exporter = _get_exporter_from_processor(processors[0]) assert isinstance(exporter, HTTPSpanExporter) assert exporter._endpoint == endpoint def test_register_with_grpc_endpoint(self) -> None: endpoint = "grpc://custom-endpoint:4317" tracer_provider = register( endpoint=endpoint, verbose=False, set_global_tracer_provider=False ) processors = tracer_provider._active_span_processor._span_processors assert len(processors) == 1 exporter = _get_exporter_from_processor(processors[0]) assert isinstance(exporter, GRPCSpanExporter) @patch("phoenix.otel.otel.get_env_client_headers") def test_register_with_headers(self, mock_env_headers: Any) -> None: mock_env_headers.return_value = None headers = {"Authorization": "Bearer token123"} tracer_provider = register(headers=headers, verbose=False, set_global_tracer_provider=False) processors = tracer_provider._active_span_processor._span_processors exporter = _get_exporter_from_processor(processors[0]) assert "authorization" in [h[0].lower() for h in exporter._headers] def test_register_with_http_protocol(self) -> None: tracer_provider = register( protocol="http/protobuf", verbose=False, set_global_tracer_provider=False ) processors = tracer_provider._active_span_processor._span_processors exporter = _get_exporter_from_processor(processors[0]) assert isinstance(exporter, HTTPSpanExporter) def test_register_with_grpc_protocol(self) -> None: tracer_provider = register(protocol="grpc", verbose=False, set_global_tracer_provider=False) processors = tracer_provider._active_span_processor._span_processors exporter = _get_exporter_from_processor(processors[0]) assert isinstance(exporter, GRPCSpanExporter) @patch("phoenix.otel.otel._auto_instrument_installed_openinference_libraries") def test_register_with_auto_instrument(self, mock_auto_instrument: Any) -> None: tracer_provider = register( auto_instrument=True, verbose=False, set_global_tracer_provider=False ) mock_auto_instrument.assert_called_once_with(tracer_provider) @patch("builtins.print") def test_register_verbose_output(self, mock_print: Any) -> None: register(verbose=True, set_global_tracer_provider=False) mock_print.assert_called() output = str(mock_print.call_args) assert "OpenTelemetry Tracing Details" in output def test_register_with_custom_resource_no_project_name(self) -> None: custom_resource = Resource.create( {"service.name": "my-service", "service.version": "1.0.0"} ) with patch("phoenix.otel.otel.get_env_project_name", return_value="env-project"): tracer_provider = register( resource=custom_resource, verbose=False, set_global_tracer_provider=False ) assert tracer_provider.resource.attributes.get("service.name") == "my-service" assert tracer_provider.resource.attributes.get("service.version") == "1.0.0" assert tracer_provider.resource.attributes.get(PROJECT_NAME) == "env-project" def test_register_with_custom_resource_and_project_name(self) -> None: custom_resource = Resource.create( {"service.name": "my-service", "service.version": "1.0.0"} ) tracer_provider = register( project_name="explicit-project", resource=custom_resource, verbose=False, set_global_tracer_provider=False, ) assert tracer_provider.resource.attributes.get("service.name") == "my-service" assert tracer_provider.resource.attributes.get("service.version") == "1.0.0" assert tracer_provider.resource.attributes.get(PROJECT_NAME) == "explicit-project" def test_register_with_custom_resource_overrides_project_name(self) -> None: custom_resource = Resource.create( {"service.name": "my-service", PROJECT_NAME: "resource-project"} ) tracer_provider = register( project_name="explicit-project", resource=custom_resource, verbose=False, set_global_tracer_provider=False, ) assert tracer_provider.resource.attributes.get("service.name") == "my-service" assert tracer_provider.resource.attributes.get(PROJECT_NAME) == "explicit-project" def test_register_passes_through_kwargs_to_tracer_provider(self) -> None: from opentelemetry.sdk.trace.id_generator import IdGenerator from opentelemetry.sdk.trace.sampling import ALWAYS_OFF mock_id_generator = Mock(spec=IdGenerator) tracer_provider = register( project_name="test-project", sampler=ALWAYS_OFF, id_generator=mock_id_generator, verbose=False, set_global_tracer_provider=False, ) assert tracer_provider.sampler == ALWAYS_OFF assert tracer_provider.id_generator == mock_id_generator def test_register_with_custom_id_generator(self) -> None: from opentelemetry.sdk.trace.id_generator import IdGenerator class CustomIdGenerator(IdGenerator): def generate_span_id(self) -> int: return 0x1234567890ABCDEF def generate_trace_id(self) -> int: return 0x12345678901234567890123456789012 custom_id_gen = CustomIdGenerator() tracer_provider = register( project_name="test-project", id_generator=custom_id_gen, verbose=False, set_global_tracer_provider=False, ) assert tracer_provider.id_generator is custom_id_gen def test_register_with_span_limits(self) -> None: from opentelemetry.sdk.trace import SpanLimits custom_limits = SpanLimits(max_attributes=50, max_events=20, max_links=10) tracer_provider = register( project_name="test-project", span_limits=custom_limits, verbose=False, set_global_tracer_provider=False, ) assert tracer_provider._span_limits == custom_limits assert tracer_provider._span_limits.max_attributes == 50 assert tracer_provider._span_limits.max_events == 20 assert tracer_provider._span_limits.max_links == 10 def test_register_with_multiple_kwargs(self) -> None: from opentelemetry.sdk.trace import SpanLimits from opentelemetry.sdk.trace.id_generator import IdGenerator from opentelemetry.sdk.trace.sampling import TraceIdRatioBased custom_resource = Resource.create( {"service.name": "test-service", "deployment.environment": "testing"} ) custom_sampler = TraceIdRatioBased(0.5) custom_limits = SpanLimits(max_attributes=100) mock_id_generator = Mock(spec=IdGenerator) tracer_provider = register( project_name="multi-test-project", resource=custom_resource, sampler=custom_sampler, span_limits=custom_limits, id_generator=mock_id_generator, verbose=False, set_global_tracer_provider=False, ) assert tracer_provider.sampler == custom_sampler assert tracer_provider._span_limits == custom_limits assert tracer_provider.id_generator == mock_id_generator assert tracer_provider.resource.attributes.get("service.name") == "test-service" assert tracer_provider.resource.attributes.get("deployment.environment") == "testing" assert tracer_provider.resource.attributes.get(PROJECT_NAME) == "multi-test-project" def test_register_tracer_provider_verbose_is_always_false(self) -> None: with patch("phoenix.otel.otel.TracerProvider") as mock_tracer_provider: mock_instance = Mock() mock_tracer_provider.return_value = mock_instance mock_instance._default_processor = True mock_instance._tracing_details.return_value = "test details" register(verbose=True, set_global_tracer_provider=False) call_args = mock_tracer_provider.call_args assert not call_args.kwargs["verbose"] def test_register_with_global_tracer_provider_enabled(self) -> None: """Test that register can still set global tracer provider when requested.""" trace_api.get_tracer_provider() try: tracer_provider = register(verbose=False, set_global_tracer_provider=True) assert isinstance(tracer_provider, TracerProvider) try: assert trace_api.get_tracer_provider() == tracer_provider except Exception: pytest.skip("OpenTelemetry prevented global tracer provider override") finally: try: if hasattr(tracer_provider, "shutdown"): tracer_provider.shutdown() except Exception: pass class TestTracerProvider: def test_tracer_provider_creation(self) -> None: tracer_provider = TracerProvider(verbose=False) assert isinstance(tracer_provider, TracerProvider) assert tracer_provider._default_processor processors = tracer_provider._active_span_processor._span_processors assert len(processors) == 1 def test_tracer_provider_with_resource(self) -> None: resource = Resource.create({"custom.attribute": "value"}) tracer_provider = TracerProvider(resource=resource, verbose=False) assert tracer_provider.resource == resource def test_tracer_provider_with_http_endpoint(self) -> None: endpoint = "http://localhost:4318/v1/traces" tracer_provider = TracerProvider(endpoint=endpoint, verbose=False) processors = tracer_provider._active_span_processor._span_processors assert len(processors) == 1 exporter = _get_exporter_from_processor(processors[0]) assert isinstance(exporter, HTTPSpanExporter) assert exporter._endpoint == endpoint def test_tracer_provider_with_grpc_endpoint(self) -> None: endpoint = "localhost:4317" tracer_provider = TracerProvider(endpoint=endpoint, verbose=False) processors = tracer_provider._active_span_processor._span_processors assert len(processors) == 1 exporter = _get_exporter_from_processor(processors[0]) assert isinstance(exporter, GRPCSpanExporter) def test_add_span_processor_replaces_default(self) -> None: tracer_provider = TracerProvider(verbose=False) assert tracer_provider._default_processor custom_processor = Mock(spec=_SimpleSpanProcessor) tracer_provider.add_span_processor(custom_processor) assert not tracer_provider._default_processor # The default processor should have been removed assert custom_processor in tracer_provider._active_span_processor._span_processors def test_add_span_processor_without_replace(self) -> None: tracer_provider = TracerProvider(verbose=False) assert tracer_provider._default_processor initial_count = len(tracer_provider._active_span_processor._span_processors) custom_processor = Mock(spec=_SimpleSpanProcessor) tracer_provider.add_span_processor(custom_processor, replace_default_processor=False) assert tracer_provider._default_processor # Both processors should be present assert len(tracer_provider._active_span_processor._span_processors) == initial_count + 1 @patch("builtins.print") def test_tracer_provider_verbose(self, mock_print: Any) -> None: TracerProvider(verbose=True) mock_print.assert_called() output = str(mock_print.call_args) assert "OpenTelemetry Tracing Details" in output class TestSpanProcessors: def test_simple_span_processor_http(self) -> None: endpoint = "http://localhost:4318/v1/traces" processor = SimpleSpanProcessor(endpoint=endpoint) assert isinstance(processor, _SimpleSpanProcessor) assert isinstance(processor.span_exporter, HTTPSpanExporter) assert processor.span_exporter._endpoint == endpoint def test_simple_span_processor_grpc(self) -> None: endpoint = "localhost:4317" processor = SimpleSpanProcessor(endpoint=endpoint, protocol="grpc") assert isinstance(processor, _SimpleSpanProcessor) assert isinstance(processor.span_exporter, GRPCSpanExporter) assert processor.span_exporter._endpoint == endpoint def test_simple_span_processor_with_exporter(self) -> None: mock_exporter = MagicMock() processor = SimpleSpanProcessor(span_exporter=mock_exporter) assert processor.span_exporter == mock_exporter def test_batch_span_processor_http(self) -> None: endpoint = "http://localhost:4318/v1/traces" processor = BatchSpanProcessor(endpoint=endpoint) assert isinstance(processor, _BatchSpanProcessor) exporter = _get_exporter_from_processor(processor) assert isinstance(exporter, HTTPSpanExporter) assert exporter._endpoint == endpoint def test_batch_span_processor_grpc(self) -> None: endpoint = "localhost:4317" processor = BatchSpanProcessor(endpoint=endpoint, protocol="grpc") assert isinstance(processor, _BatchSpanProcessor) exporter = _get_exporter_from_processor(processor) assert isinstance(exporter, GRPCSpanExporter) assert exporter._endpoint == endpoint class TestSpanExporters: @patch("phoenix.otel.otel.get_env_client_headers") @patch("phoenix.otel.otel.get_env_phoenix_auth_header") def test_http_span_exporter_env_headers( self, mock_auth_header: Any, mock_client_headers: Any ) -> None: mock_client_headers.return_value = {"X-Custom": "value"} mock_auth_header.return_value = {"Authorization": "Bearer token"} exporter = HTTPSpanExporter() mock_client_headers.assert_called_once() mock_auth_header.assert_called_once() # Check headers were set headers_dict = {h.lower(): v for h, v in exporter._headers.items()} assert headers_dict.get("x-custom") == "value" assert headers_dict.get("authorization") == "Bearer token" @patch("phoenix.otel.otel.get_env_client_headers") @patch("phoenix.otel.otel.get_env_phoenix_auth_header") def test_grpc_span_exporter_env_headers( self, mock_auth_header: Any, mock_client_headers: Any ) -> None: mock_client_headers.return_value = {"X-Custom": "value"} mock_auth_header.return_value = {"Authorization": "Bearer token"} exporter = GRPCSpanExporter() mock_client_headers.assert_called_once() mock_auth_header.assert_called_once() # Check headers were set (gRPC uses list of tuples) if exporter._headers: headers_dict = {h[0].lower(): h[1] for h in exporter._headers} assert headers_dict.get("x-custom") == "value" assert headers_dict.get("authorization") == "Bearer token" def test_http_span_exporter_with_explicit_headers(self) -> None: headers = {"Custom-Header": "custom-value"} exporter = HTTPSpanExporter(headers=headers) headers_dict = {h.lower(): v for h, v in exporter._headers.items()} assert headers_dict.get("custom-header") == "custom-value" def test_grpc_span_exporter_with_explicit_headers(self) -> None: headers = {"Custom-Header": "custom-value"} exporter = GRPCSpanExporter(headers=headers) if exporter._headers: headers_dict = {h[0].lower(): h[1] for h in exporter._headers} assert headers_dict.get("custom-header") == "custom-value" class TestEndpointNormalization: def test_normalized_endpoint_http_explicit(self) -> None: from phoenix.otel.otel import _normalized_endpoint parsed, endpoint = _normalized_endpoint("http://localhost:6006/v1/traces", use_http=True) assert parsed.scheme == "http" assert parsed.netloc == "localhost:6006" assert parsed.path == "/v1/traces" assert endpoint == "http://localhost:6006/v1/traces" def test_normalized_endpoint_grpc_explicit(self) -> None: from phoenix.otel.otel import _normalized_endpoint parsed, endpoint = _normalized_endpoint("localhost:4317", use_http=False) assert endpoint == "localhost:4317" def test_normalized_endpoint_known_provider(self) -> None: from phoenix.otel.otel import _normalized_endpoint with patch( "phoenix.otel.otel.get_env_collector_endpoint", return_value="https://app.phoenix.arize.com", ): parsed, endpoint = _normalized_endpoint(None) assert parsed.scheme == "https" assert parsed.netloc == "app.phoenix.arize.com" assert parsed.path == "/v1/traces" def test_normalized_endpoint_none_defaults(self) -> None: from phoenix.otel.otel import _normalized_endpoint with patch("phoenix.otel.otel.get_env_collector_endpoint", return_value=None): with patch("phoenix.otel.otel.get_env_grpc_port", return_value=4317): parsed, endpoint = _normalized_endpoint(None, use_http=True) assert parsed.scheme == "http" assert parsed.netloc == "localhost:6006" assert parsed.path == "/v1/traces" parsed, endpoint = _normalized_endpoint(None, use_http=False) assert parsed.scheme == "http" assert parsed.netloc == "localhost:4317" class TestPhoenixCloudEndpoint: def test_space_path_basic(self) -> None: parsed = urlparse("https://app.phoenix.arize.com/s/testspace") result = _construct_phoenix_cloud_endpoint(parsed) assert result.path == "/s/testspace/v1/traces" def test_space_path_with_trailing_slash(self) -> None: parsed = urlparse("https://app.phoenix.arize.com/s/my-space_01/") result = _construct_phoenix_cloud_endpoint(parsed) assert result.path == "/s/my-space_01/v1/traces" def test_space_path_with_additional_components(self) -> None: parsed = urlparse("https://app.phoenix.arize.com/s/space123/extra/path") result = _construct_phoenix_cloud_endpoint(parsed) assert result.path == "/s/space123/v1/traces" def test_non_space_path_defaults(self) -> None: parsed = urlparse("https://app.phoenix.arize.com/some/other/path") result = _construct_phoenix_cloud_endpoint(parsed) assert result.path == "/v1/traces" def test_empty_space_id_defaults(self) -> None: parsed = urlparse("https://app.phoenix.arize.com/s/") result = _construct_phoenix_cloud_endpoint(parsed) assert result.path == "/v1/traces" class TestOTLPTransportProtocol: def test_valid_protocols(self) -> None: assert OTLPTransportProtocol("http/protobuf") == OTLPTransportProtocol.HTTP_PROTOBUF assert OTLPTransportProtocol("grpc") == OTLPTransportProtocol.GRPC assert OTLPTransportProtocol("infer") == OTLPTransportProtocol.INFER assert OTLPTransportProtocol(None) == OTLPTransportProtocol.INFER def test_invalid_protocols(self) -> None: with pytest.raises(ValueError) as exc_info: OTLPTransportProtocol("http") assert "Did you mean 'http/protobuf'?" in str(exc_info.value) with pytest.raises(ValueError) as exc_info: OTLPTransportProtocol("invalid") assert "Must one of" in str(exc_info.value) with pytest.raises(ValueError) as exc_info: OTLPTransportProtocol(123) assert "Must be a string" in str(exc_info.value)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arize-ai/phoenix'

If you have feedback or need assistance with the MCP directory API, please join our Discord server