Skip to main content
Glama
metric_manager.go12.1 kB
package metrics import ( "context" "fmt" "lunar/toolkit-core/otel" "sync" "go.opentelemetry.io/otel/attribute" "golang.org/x/exp/slices" "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/metric" ) type MetricManager struct { config *Config meter metric.Meter // observable metrics metricObjects sync.Map // key - Metric, value - metric.Observable // regular metrics flowsInvocationsCounter metric.Int64Counter requestsThroughFlowsCounter metric.Int64Counter generalMetricReg metric.Registration systemMetricReg metric.Registration labelManager *LabelManager apiCallMetricMng *apiCallCountMetricManager transactionMetricsManager *transactionMetricsManager remainingConnectionsMetricMng *remainingConnectionsMetricManager providerData *metricsProviderData metricManagerActive bool mu sync.Mutex } func NewMetricManager() (*MetricManager, error) { log.Info().Msg("Initializing metrics manager") meter := otel.GetMeter() config, err := loadMetricsConfig() if err != nil { log.Error().Err(err).Msg("failed to get metrics config") return &MetricManager{}, err } labeledEndpointMng := NewLabeledEndpointManager(config.LabeledEndpoints) mng := &MetricManager{ config: config, meter: meter, metricObjects: sync.Map{}, providerData: newMetricsProviderData(), labelManager: NewLabelManager(config.GeneralMetrics.LabelValue). WithLabeledEndpointManager(labeledEndpointMng), } // api call count metric - special treatment mng.apiCallMetricMng, err = newAPICallMetricManager(meter, mng.labelManager) if err != nil { return &MetricManager{}, fmt.Errorf("failed to initialize APICallCountMetric: %w", err) } mng.remainingConnectionsMetricMng, err = newRemainingConnectionsMetricManager(meter) if err != nil { return mng, fmt.Errorf("failed to initialize remaining connections metric: %w", err) } mng.transactionMetricsManager, err = newTransactionMetricsManager(meter, config, mng.labelManager) if err != nil { return mng, fmt.Errorf("failed to initialize transaction metrics: %w", err) } // general metrics if err := mng.initializeGeneralMetrics(); err != nil { return mng, fmt.Errorf("failed to initialize general metrics: %w", err) } // system metrics if err := mng.initializeSystemMetrics(); err != nil { return mng, fmt.Errorf("failed to initialize system metrics: %w", err) } mng.metricManagerActive = true log.Info().Msg("Metrics manager initialized") return mng, nil } func (m *MetricManager) ReloadMetricsConfig() error { log.Info().Msg("Reloading metrics config") m.mu.Lock() defer m.mu.Unlock() config, err := loadMetricsConfig() if err != nil { log.Error().Err(err).Msg("failed to get metrics config") return err } if m.config == nil { m.metricManagerActive = true m.config = config log.Info().Msg("Metrics manager reloaded") return nil } if !slices.Equal(config.GeneralMetrics.LabelValue, m.config.GeneralMetrics.LabelValue) { log.Info().Msg("Reloading labels") m.labelManager.SetLabels(config.GeneralMetrics.LabelValue) } if !config.EqualLabeledEndpoints(m.config) { log.Info().Msg("Reloading labeled endpoints") m.labelManager.labeledEndpointMng.SetLabeledEndpoints(config.LabeledEndpoints) } if !config.GeneralMetrics.Equal(m.config.GeneralMetrics) { log.Info().Msg("Reloading general metrics") if err = m.initializeGeneralMetrics(); err != nil { return fmt.Errorf("failed to initialize general metrics: %w", err) } } if !config.EqualSystemMetrics(*m.config) { log.Info().Msg("Reloading system metrics") if err = m.initializeSystemMetrics(); err != nil { return fmt.Errorf("failed to initialize system metrics: %w", err) } } m.metricManagerActive = true log.Info().Msg("Metrics manager reloaded") return nil } // UpdateMetricsProviderForAPICall updates the metrics provider for API call-related metrics func (m *MetricManager) UpdateMetricsProviderForAPICall(provider APICallMetricsProviderI) { if !m.metricManagerActive { return } if provider.GetType().IsResponseType() { m.providerData.UpdateAPICallData(provider) } } // UpdateMetricsProviderForFlow updates the metrics provider for flow-related metrics func (m *MetricManager) UpdateMetricsProviderForFlow(provider FlowMetricsProviderI) { if !m.metricManagerActive { return } if m.providerData == nil { m.providerData = newMetricsProviderData() } provider.RegisterFlowInvocationsObserver(m.observeFlowInvocations) provider.RegisterRequestsThroughFlowsObserver(m.observeRequestsThroughFlows) m.providerData.UpdateFlowDataProvider(provider) } // initializeObservableMetrics initializes the metrics by parsing func (m *MetricManager) initializeObservableMetrics(metrics []MetricValue) ( []metric.Observable, error, ) { log.Info().Msgf("Initializing metrics: %+v", metrics) var meterObjs []metric.Observable for _, metricValue := range metrics { if _, ok := accessLogBasedMetrics[metricValue.Name]; ok { continue } meterObj, err := registerObservableMetric(m.meter, metricValue) if err != nil { return nil, fmt.Errorf("failed to initialize metric %s: %w", metricValue.Name, err) } meterObjs = append(meterObjs, meterObj) m.metricObjects.Store(metricValue.Name, meterObj) } return meterObjs, nil } // initializeGeneralMetrics initializes the general metrics func (m *MetricManager) initializeGeneralMetrics() error { meterObjs, err := m.initializeObservableMetrics(m.config.GeneralMetrics.MetricValue) if err != nil { return err } if m.generalMetricReg != nil { _ = m.generalMetricReg.Unregister() } m.generalMetricReg, err = m.meter.RegisterCallback(m.observeGeneralMetrics, meterObjs...) if err != nil { return err } return nil } // initializeSystemRegularMetric initializes a non-observable system metric func (m *MetricManager) initializeSystemRegularMetric(metricValue MetricValue) error { var err error switch metricValue.Name { //nolint:exhaustive case FlowsInvocationsMetric: m.flowsInvocationsCounter, err = m.meter.Int64Counter( string(FlowsInvocationsMetric), metric.WithDescription(metricValue.Description), ) case RequestsThroughFlowsMetric: m.requestsThroughFlowsCounter, err = m.meter.Int64Counter( string(RequestsThroughFlowsMetric), metric.WithDescription(metricValue.Description), ) default: log.Warn().Msgf("System metric %s is not supported", metricValue.Name) } return err } // initializeSystemMetrics initializes the system metrics func (m *MetricManager) initializeSystemMetrics() error { var observableMetrics []MetricValue for _, metricValue := range m.config.SystemMetrics { if _, ok := metricsObservableRegistry[metricValue.Name]; ok { observableMetrics = append(observableMetrics, metricValue) } else if err := m.initializeSystemRegularMetric(metricValue); err != nil { return fmt.Errorf("failed to initialize regular system metric %s: %w", metricValue.Name, err) } } meterObjs, err := m.initializeObservableMetrics(observableMetrics) if err != nil { return fmt.Errorf("failed to initialize observable system metrics: %w", err) } if m.systemMetricReg != nil { _ = m.systemMetricReg.Unregister() } m.systemMetricReg, err = m.meter.RegisterCallback(m.observeSystemMetrics, meterObjs...) if err != nil { return err } return nil } // observeFlowInvocations observes the flow invocations metric func (m *MetricManager) observeFlowInvocations(metricData *MetricData) { flowInvData := metricData.FlowInvocations attributes := buildAttributesFromLabelSet(flowInvData.FlowID, flowInvData.Labels) attributes = appendGatewayIDAttribute(attributes...) m.flowsInvocationsCounter.Add(context.Background(), 1, metric.WithAttributes(attributes...)) } func (m *MetricManager) observeRequestsThroughFlows(metricData *MetricData) { attributes := buildAttributesFromLabelSet("", metricData.RequestsThroughFlows.Labels) attributes = appendGatewayIDAttribute(attributes...) m.requestsThroughFlowsCounter.Add(context.Background(), 1, metric.WithAttributes(attributes...)) } func (m *MetricManager) observeGeneralMetrics(_ context.Context, observer metric.Observer) error { err := observeMetric(m, APICallSizeMetric, m.providerData.GetAvgAPICallSize(), observer) if err != nil { return err } return nil } func (m *MetricManager) observeSystemMetrics(_ context.Context, observer metric.Observer) error { data := m.providerData.GetFlowData() activeFlowsData := data.GetActiveFlows() for _, flowName := range activeFlowsData.ActiveFlows { attr := attribute.String(FlowName, flowName) attributes := appendGatewayIDAttribute(attr) if err := observeMetric(m, ActiveFlowsMetric, 1, observer, attributes...); err != nil { log.Trace().Err(err).Msgf("Failed to observe %v, flow %s", ActiveFlowsMetric, flowName) } } avgFlowExecutionData := data.GetAvgFlowExecutionTime() for flowName, execTime := range avgFlowExecutionData.AvgFlowExecutionTime { attr := attribute.String(FlowName, flowName) attributes := appendGatewayIDAttribute(attr) if err := observeMetric(m, AvgFlowExecutionTimeMetric, execTime, observer, attributes...); err != nil { log.Trace().Err(err).Msgf("Failed to observe %v, flow %s", AvgFlowExecutionTimeMetric, flowName) } } procExecutionData := data.GetProcessorExecutionData() for processorKey, procData := range procExecutionData.ProcExecutionData { attr := attribute.String(ProcessorKey, processorKey) attributes := appendGatewayIDAttribute(attr) if err := observeMetric(m, AvgProcessorExecutionTimeMetric, procData.AvgExecutionTime, observer, attributes...); err != nil { log.Trace(). Err(err). Msgf("Failed to observe %v, processor %s", AvgProcessorExecutionTimeMetric, processorKey) } if err := observeMetric(m, ProcessorInvocation, 1, observer, attributes...); err != nil { log.Trace(). Err(err). Msgf("Failed to observe %v, processor %s", ProcessorInvocation, processorKey) } } return nil } func observeMetric[T int | int64 | float64]( mng *MetricManager, metricName Metric, value T, observer metric.Observer, attributes ...attribute.KeyValue, ) error { meterObjRaw, found := mng.metricObjects.Load(metricName) if !found { return fmt.Errorf("metric %s not found", metricName) } attributes = appendGatewayIDAttribute(attributes...) switch meterObj := meterObjRaw.(type) { case metric.Float64ObservableCounter, metric.Float64ObservableUpDownCounter, metric.Float64ObservableGauge: //nolint:lll if observable, ok := meterObj.(metric.Float64Observable); ok { observer.ObserveFloat64(observable, float64(value), metric.WithAttributes(attributes...)) } else { log.Error().Msgf("metric %s is not a Float64Observable", metricName) } case metric.Int64ObservableUpDownCounter, metric.Int64ObservableCounter, metric.Int64ObservableGauge: //nolint:lll if observable, ok := meterObj.(metric.Int64Observable); ok { observer.ObserveInt64(observable, int64(value), metric.WithAttributes(attributes...)) } else { log.Error().Msgf("metric %s is not an Int64Observable", metricName) } default: log.Error().Msgf("unsupported metric type for %s", metricName) } return nil } func buildAttributesFromLabelSet(flowName string, labelSet *LabelSet) []attribute.KeyValue { var attributes []attribute.KeyValue if flowName != "" { attributes = addAttribute(FlowName, flowName, attribute.String, attributes) } if labelSet != nil { attributes = addAttribute(Host, labelSet.Host, attribute.String, attributes) attributes = addAttribute(HTTPMethod, labelSet.Method, attribute.String, attributes) attributes = addAttribute(ConsumerTag, labelSet.Consumer, attribute.String, attributes) } return attributes } func addAttribute[T int | string]( key string, value T, addFunc func(key string, value T) attribute.KeyValue, attributes []attribute.KeyValue, ) []attribute.KeyValue { var isZero T if value != isZero { attributes = append(attributes, addFunc(key, value)) } return attributes }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server