Skip to main content
Glama
rustomax

Observe Community MCP Server

by rustomax
config.py7.29 kB
""" OpenTelemetry configuration and initialization for Observe MCP Server Handles the setup of tracing, metrics, and instrumentation with automatic detection of the OpenTelemetry collector endpoint and service metadata. """ import os import sys from typing import Optional from src.logging import get_logger logger = get_logger('TELEMETRY') # Global telemetry state _telemetry_initialized = False _tracer = None _meter = None def is_telemetry_enabled() -> bool: """Check if telemetry is enabled via environment variables.""" return os.getenv('OTEL_TELEMETRY_ENABLED', 'true').lower() in ('true', '1', 'yes', 'on') def get_service_name() -> str: """Get the service name for telemetry.""" return os.getenv('OTEL_SERVICE_NAME', 'observe-community-mcp') def get_otel_endpoint() -> str: """Get the OTLP endpoint for telemetry export.""" # Default to the collector we set up in docker-compose return os.getenv('OTEL_EXPORTER_OTLP_ENDPOINT', 'http://localhost:4317') def get_deployment_environment() -> str: """Get the deployment environment.""" return os.getenv('DEPLOYMENT_ENVIRONMENT', 'development') def initialize_telemetry() -> bool: """ Initialize OpenTelemetry tracing and metrics. Returns: True if initialization was successful, False otherwise """ global _telemetry_initialized, _tracer, _meter if _telemetry_initialized: logger.debug("telemetry already initialized") return True if not is_telemetry_enabled(): logger.info("telemetry disabled via configuration") return False try: from opentelemetry import trace, metrics from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.resources import Resource from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader # Auto-instrumentation imports from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor # Create resource with service information resource = Resource.create({ "service.name": get_service_name(), "service.version": "1.0.0", "deployment.environment": get_deployment_environment(), "service.namespace": "observe-mcp", "telemetry.sdk.name": "opentelemetry", "telemetry.sdk.language": "python" }) # Configure tracing otlp_endpoint = get_otel_endpoint() logger.info(f"initializing telemetry | endpoint:{otlp_endpoint} | service:{get_service_name()}") # Set up trace provider trace_provider = TracerProvider(resource=resource) trace.set_tracer_provider(trace_provider) # Configure OTLP span exporter span_exporter = OTLPSpanExporter( endpoint=otlp_endpoint, insecure=True # For local development ) span_processor = BatchSpanProcessor(span_exporter) trace_provider.add_span_processor(span_processor) # Set up metrics provider metric_reader = PeriodicExportingMetricReader( OTLPMetricExporter( endpoint=otlp_endpoint, insecure=True ), export_interval_millis=10000 # Export every 10 seconds ) metrics_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) metrics.set_meter_provider(metrics_provider) # Initialize auto-instrumentation logger.debug("enabling automatic instrumentation") # Instrument HTTP client (httpx) HTTPXClientInstrumentor().instrument() # Instrument PostgreSQL (asyncpg) AsyncPGInstrumentor().instrument() # Note: FastAPI instrumentation will be done in observe_server.py # after the FastMCP app is created # Create tracer and meter instances _tracer = trace.get_tracer(__name__) _meter = metrics.get_meter(__name__) _telemetry_initialized = True logger.info("telemetry initialization complete") return True except ImportError as e: logger.warning(f"telemetry disabled | missing dependencies: {e}") return False except Exception as e: logger.error(f"telemetry initialization failed | error: {e}") import traceback traceback.print_exc(file=sys.stderr) return False def get_tracer(): """Get the OpenTelemetry tracer instance.""" global _tracer if not _telemetry_initialized: logger.warning("telemetry not initialized | tracer unavailable") return None return _tracer def get_meter(): """Get the OpenTelemetry meter instance.""" global _meter if not _telemetry_initialized: logger.warning("telemetry not initialized | meter unavailable") return None return _meter def shutdown_telemetry(): """Shutdown telemetry providers and flush any pending data.""" global _telemetry_initialized if not _telemetry_initialized: return try: from opentelemetry import trace, metrics # Get providers and shut them down trace_provider = trace.get_tracer_provider() if hasattr(trace_provider, 'shutdown'): trace_provider.shutdown() meter_provider = metrics.get_meter_provider() if hasattr(meter_provider, 'shutdown'): meter_provider.shutdown() logger.info("telemetry shutdown complete") except Exception as e: logger.error(f"telemetry shutdown error | error: {e}") finally: _telemetry_initialized = False def instrument_fastapi_app(app): """ Instrument a FastAPI application with OpenTelemetry. Args: app: FastAPI application instance """ if not _telemetry_initialized: logger.debug("telemetry not initialized | skipping FastAPI instrumentation") return try: from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor # Instrument the FastAPI app FastAPIInstrumentor.instrument_app(app) logger.debug("FastAPI instrumentation enabled") except ImportError: logger.warning("FastAPI instrumentation not available") except Exception as e: logger.error(f"FastAPI instrumentation failed | error: {e}") def get_telemetry_status() -> dict: """ Get the current telemetry configuration status. Returns: Dictionary with telemetry status information """ return { "enabled": is_telemetry_enabled(), "initialized": _telemetry_initialized, "service_name": get_service_name(), "endpoint": get_otel_endpoint(), "environment": get_deployment_environment(), "tracer_available": _tracer is not None, "meter_available": _meter is not None }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rustomax/observe-community-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server