"""OpenTelemetry observability setup for Standards MCP Server."""
import logging
import os
import structlog
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from .config import ServerConfig
def configure_opentelemetry(config: ServerConfig) -> None:
"""Configure OpenTelemetry SDK with exporters.
Sets up tracing and metrics export to OTLP endpoint (e.g., OpenTelemetry Collector).
The collector then routes to Langfuse, Datadog, etc.
"""
if not config.otel_enabled:
return
# Create resource identifying this service
resource = Resource.create({
SERVICE_NAME: config.otel_service_name,
SERVICE_VERSION: config.server_version,
"deployment.environment": os.environ.get("ENVIRONMENT", "development"),
})
# Configure tracing
tracer_provider = TracerProvider(resource=resource)
if config.otel_endpoint:
span_exporter = OTLPSpanExporter(endpoint=config.otel_endpoint)
tracer_provider.add_span_processor(BatchSpanProcessor(span_exporter))
trace.set_tracer_provider(tracer_provider)
# Configure metrics
if config.otel_endpoint:
metric_reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint=config.otel_endpoint),
export_interval_millis=60000
)
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
def configure_logging(config: ServerConfig) -> None:
"""Configure structured logging with trace correlation."""
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
# Add trace context to logs
add_trace_context,
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, config.log_level.upper(), logging.INFO)
),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True
)
def add_trace_context(
_logger: structlog.types.WrappedLogger,
_method_name: str,
event_dict: structlog.types.EventDict
) -> structlog.types.EventDict:
"""Add OpenTelemetry trace context to log entries."""
span = trace.get_current_span()
if span.is_recording():
ctx = span.get_span_context()
event_dict["trace_id"] = format(ctx.trace_id, "032x")
event_dict["span_id"] = format(ctx.span_id, "016x")
return event_dict
def get_tracer(name: str) -> trace.Tracer:
"""Get a tracer for the given module."""
return trace.get_tracer(name)
def get_meter(name: str) -> metrics.Meter:
"""Get a meter for the given module."""
return metrics.get_meter(name)