# -*- coding: utf-8 -*-
"""Location: ./tests/unit/mcpgateway/test_observability.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti
Tests for observability module.
"""
# Standard
import os
from unittest.mock import MagicMock, patch
# Third-Party
import pytest
# First-Party
from mcpgateway.observability import create_span, init_telemetry, trace_operation
class TestObservability:
"""Test cases for observability module."""
def setup_method(self):
"""Reset environment before each test."""
# Clear relevant environment variables
env_vars = [
"OTEL_ENABLE_OBSERVABILITY",
"OTEL_TRACES_EXPORTER",
"OTEL_EXPORTER_OTLP_ENDPOINT",
"OTEL_SERVICE_NAME",
"OTEL_RESOURCE_ATTRIBUTES",
"OTEL_COPY_RESOURCE_ATTRS_TO_SPANS",
]
for var in env_vars:
os.environ.pop(var, None)
@staticmethod
def _enable_observability() -> None:
"""Enable OpenTelemetry for tests that exercise initialization paths."""
os.environ["OTEL_ENABLE_OBSERVABILITY"] = "true"
def teardown_method(self):
"""Clean up after each test."""
# Reset global tracer
# First-Party
import mcpgateway.observability
# pylint: disable=protected-access
mcpgateway.observability._TRACER = None
def test_init_telemetry_disabled_via_env(self):
"""Test that telemetry can be disabled via environment variable."""
os.environ["OTEL_ENABLE_OBSERVABILITY"] = "false"
result = init_telemetry()
assert result is None
@patch("mcpgateway.observability.OTEL_AVAILABLE", False)
def test_init_telemetry_otel_not_installed(self):
"""Test graceful degradation when OpenTelemetry is not installed."""
self._enable_observability()
os.environ["OTEL_TRACES_EXPORTER"] = "otlp"
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4317"
with patch("mcpgateway.observability.logger") as mock_logger:
result = init_telemetry()
# Should log warning and return None
mock_logger.warning.assert_called()
mock_logger.info.assert_called()
assert result is None
def test_init_telemetry_none_exporter(self):
"""Test that 'none' exporter disables telemetry."""
self._enable_observability()
os.environ["OTEL_TRACES_EXPORTER"] = "none"
result = init_telemetry()
assert result is None
def test_init_telemetry_no_endpoint(self):
"""Test that missing OTLP endpoint skips initialization."""
self._enable_observability()
os.environ["OTEL_TRACES_EXPORTER"] = "otlp"
# Don't set OTEL_EXPORTER_OTLP_ENDPOINT
result = init_telemetry()
assert result is None
@patch("mcpgateway.observability.OTEL_AVAILABLE", True)
@patch("mcpgateway.observability.OTLP_SPAN_EXPORTER")
@patch("mcpgateway.observability.TracerProvider")
@patch("mcpgateway.observability.BatchSpanProcessor")
def test_init_telemetry_otlp_success(self, mock_processor, mock_provider, mock_exporter):
"""Test successful OTLP initialization."""
self._enable_observability()
os.environ["OTEL_TRACES_EXPORTER"] = "otlp"
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4317"
os.environ["OTEL_SERVICE_NAME"] = "test-service"
# Mock the provider instance
provider_instance = MagicMock()
mock_provider.return_value = provider_instance
result = init_telemetry()
# Verify provider was created and configured
mock_provider.assert_called_once()
# Only 1 span processor (BatchSpanProcessor) since OTEL_COPY_RESOURCE_ATTRS_TO_SPANS is not set
provider_instance.add_span_processor.assert_called_once()
assert result is not None
@patch("mcpgateway.observability.OTEL_AVAILABLE", True)
@patch("mcpgateway.observability.ConsoleSpanExporter")
@patch("mcpgateway.observability.TracerProvider")
@patch("mcpgateway.observability.SimpleSpanProcessor")
def test_init_telemetry_console_exporter(self, mock_processor, mock_provider, mock_exporter):
"""Test console exporter initialization."""
self._enable_observability()
os.environ["OTEL_TRACES_EXPORTER"] = "console"
# Mock the provider instance
provider_instance = MagicMock()
mock_provider.return_value = provider_instance
result = init_telemetry()
# Verify console exporter was created
mock_exporter.assert_called_once()
# Only 1 span processor (SimpleSpanProcessor) since OTEL_COPY_RESOURCE_ATTRS_TO_SPANS is not set
provider_instance.add_span_processor.assert_called_once()
assert result is not None
@patch("mcpgateway.observability.OTEL_AVAILABLE", True)
@patch("mcpgateway.observability.ConsoleSpanExporter")
@patch("mcpgateway.observability.TracerProvider")
@patch("mcpgateway.observability.SimpleSpanProcessor")
def test_init_telemetry_with_resource_attr_copy_enabled(self, mock_processor, mock_provider, mock_exporter):
"""Test that ResourceAttributeSpanProcessor is added when OTEL_COPY_RESOURCE_ATTRS_TO_SPANS=true."""
self._enable_observability()
os.environ["OTEL_TRACES_EXPORTER"] = "console"
os.environ["OTEL_COPY_RESOURCE_ATTRS_TO_SPANS"] = "true"
# Mock the provider instance
provider_instance = MagicMock()
mock_provider.return_value = provider_instance
result = init_telemetry()
# Verify 2 span processors: ResourceAttributeSpanProcessor + SimpleSpanProcessor
assert provider_instance.add_span_processor.call_count == 2
assert result is not None
@patch("mcpgateway.observability.OTEL_AVAILABLE", True)
def test_init_telemetry_custom_resource_attributes(self):
"""Test parsing of custom resource attributes."""
self._enable_observability()
os.environ["OTEL_TRACES_EXPORTER"] = "console"
os.environ["OTEL_RESOURCE_ATTRIBUTES"] = "env=prod,team=platform,version=1.0"
with patch("mcpgateway.observability.Resource.create") as mock_resource:
with patch("mcpgateway.observability.TracerProvider"):
with patch("mcpgateway.observability.ConsoleSpanExporter"):
init_telemetry()
# Verify resource attributes were parsed correctly
call_args = mock_resource.call_args[0][0]
assert call_args["env"] == "prod"
assert call_args["team"] == "platform"
assert call_args["version"] == "1.0"
@patch("mcpgateway.observability.OTEL_AVAILABLE", True)
def test_init_telemetry_otlp_headers_parsing(self):
"""Test parsing of OTLP headers."""
self._enable_observability()
os.environ["OTEL_TRACES_EXPORTER"] = "otlp"
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4317"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = "api-key=secret,x-auth=token123"
with patch("mcpgateway.observability.OTLP_SPAN_EXPORTER") as mock_exporter:
with patch("mcpgateway.observability.TracerProvider"):
with patch("mcpgateway.observability.BatchSpanProcessor"):
init_telemetry()
# Verify headers were parsed correctly
call_kwargs = mock_exporter.call_args[1]
assert call_kwargs["headers"]["api-key"] == "secret"
assert call_kwargs["headers"]["x-auth"] == "token123"
def test_create_span_no_tracer(self):
"""Test create_span when tracer is not initialized."""
# First-Party
import mcpgateway.observability
# pylint: disable=protected-access
mcpgateway.observability._TRACER = None
# Should return a no-op context manager
with create_span("test.operation") as span:
assert span is None
@patch("mcpgateway.observability._TRACER")
def test_create_span_with_attributes(self, mock_tracer):
"""Test create_span with attributes."""
# Setup mock
mock_span = MagicMock()
mock_context = MagicMock()
mock_context.__enter__ = MagicMock(return_value=mock_span)
mock_context.__exit__ = MagicMock(return_value=None)
mock_tracer.start_as_current_span.return_value = mock_context
# Test with attributes
attrs = {"key1": "value1", "key2": 42}
with create_span("test.operation", attrs) as span:
assert span is not None
# Verify attributes were set
span.set_attribute.assert_any_call("key1", "value1")
span.set_attribute.assert_any_call("key2", 42)
@pytest.mark.skip(reason="Mock doesn't properly simulate SpanWithAttributes wrapper behavior")
def test_create_span_with_exception(self):
"""Test create_span exception handling."""
# Note: This test is skipped because mocking the complex interaction
# between the SpanWithAttributes wrapper and the underlying span
# doesn't accurately represent the real behavior.
# Manual testing confirms the exception handling works correctly.
pass
@pytest.mark.asyncio
async def test_trace_operation_decorator_no_tracer(self):
"""Test trace_operation decorator when tracer is not initialized."""
# First-Party
import mcpgateway.observability
# pylint: disable=protected-access
mcpgateway.observability._TRACER = None
@trace_operation("test.operation")
async def test_func():
return "result"
result = await test_func()
assert result == "result"
@pytest.mark.asyncio
@patch("mcpgateway.observability._TRACER")
async def test_trace_operation_decorator_with_tracer(self, mock_tracer):
"""Test trace_operation decorator with tracer."""
# Setup mock
mock_span = MagicMock()
mock_context = MagicMock()
mock_context.__enter__ = MagicMock(return_value=mock_span)
mock_context.__exit__ = MagicMock(return_value=None)
mock_tracer.start_as_current_span.return_value = mock_context
@trace_operation("test.operation", {"attr1": "value1"})
async def test_func():
return "result"
result = await test_func()
assert result == "result"
mock_tracer.start_as_current_span.assert_called_once_with("test.operation")
mock_span.set_attribute.assert_any_call("attr1", "value1")
mock_span.set_attribute.assert_any_call("status", "success")
@pytest.mark.asyncio
@patch("mcpgateway.observability._TRACER")
async def test_trace_operation_decorator_with_exception(self, mock_tracer):
"""Test trace_operation decorator exception handling."""
# Setup mock
mock_span = MagicMock()
mock_context = MagicMock()
mock_context.__enter__ = MagicMock(return_value=mock_span)
mock_context.__exit__ = MagicMock(return_value=None)
mock_tracer.start_as_current_span.return_value = mock_context
@trace_operation("test.operation")
async def test_func():
raise ValueError("Test error")
with pytest.raises(ValueError):
await test_func()
mock_span.set_attribute.assert_any_call("status", "error")
mock_span.set_attribute.assert_any_call("error.message", "Test error")
mock_span.record_exception.assert_called_once()
@patch("mcpgateway.observability.OTEL_AVAILABLE", True)
@patch("mcpgateway.observability.JAEGER_EXPORTER", None)
def test_init_telemetry_jaeger_import_error(self):
"""Test Jaeger exporter when not installed."""
self._enable_observability()
os.environ["OTEL_TRACES_EXPORTER"] = "jaeger"
# Mock ImportError for Jaeger
with patch("mcpgateway.observability.logger") as mock_logger:
result = init_telemetry()
# Should log error and return None
mock_logger.error.assert_called()
assert result is None
@patch("mcpgateway.observability.OTEL_AVAILABLE", True)
@patch("mcpgateway.observability.ZIPKIN_EXPORTER", None)
def test_init_telemetry_zipkin_import_error(self):
"""Test Zipkin exporter when not installed."""
self._enable_observability()
os.environ["OTEL_TRACES_EXPORTER"] = "zipkin"
# Mock ImportError for Zipkin
with patch("mcpgateway.observability.logger") as mock_logger:
result = init_telemetry()
# Should log error and return None
mock_logger.error.assert_called()
assert result is None
@patch("mcpgateway.observability.OTEL_AVAILABLE", True)
def test_init_telemetry_unknown_exporter(self):
"""Test unknown exporter type falls back to console."""
self._enable_observability()
os.environ["OTEL_TRACES_EXPORTER"] = "unknown_exporter"
with patch("mcpgateway.observability.ConsoleSpanExporter") as mock_console:
with patch("mcpgateway.observability.TracerProvider"):
with patch("mcpgateway.observability.logger") as mock_logger:
init_telemetry()
# Should warn and use console exporter
mock_logger.warning.assert_called()
mock_console.assert_called()
@patch("mcpgateway.observability.OTEL_AVAILABLE", True)
def test_init_telemetry_exception_handling(self):
"""Test exception handling during initialization."""
self._enable_observability()
os.environ["OTEL_TRACES_EXPORTER"] = "otlp"
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4317"
with patch("mcpgateway.observability.TracerProvider", side_effect=Exception("Test error")):
with patch("mcpgateway.observability.logger") as mock_logger:
result = init_telemetry()
# Should log error and return None
mock_logger.error.assert_called()
assert result is None
def test_create_span_none_attributes_filtered(self):
"""Test that None values in attributes are filtered out."""
# First-Party
import mcpgateway.observability
# Setup mock tracer
mock_span = MagicMock()
mock_context = MagicMock()
mock_context.__enter__ = MagicMock(return_value=mock_span)
mock_context.__exit__ = MagicMock(return_value=None)
mock_tracer = MagicMock()
mock_tracer.start_as_current_span.return_value = mock_context
# pylint: disable=protected-access
mcpgateway.observability._TRACER = mock_tracer
# Test with None values
attrs = {"key1": "value1", "key2": None, "key3": 42}
with create_span("test.operation", attrs) as span:
# Verify only non-None attributes were set
span.set_attribute.assert_any_call("key1", "value1")
span.set_attribute.assert_any_call("key3", 42)
# key2 should not be set
for call in span.set_attribute.call_args_list:
assert call[0][0] != "key2" or call[0][0] == "error"