Skip to main content
Glama
transaction_metrics.go3.89 kB
package metrics import ( "context" "fmt" "lunar/engine/utils/environment" "sync" "time" "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" ) type transactionMetricsManager struct { transactionMetricObjects map[string]metric.Float64Histogram mu sync.Mutex metricsTimer *time.Ticker labelManager *LabelManager discoveryParser *discoveryStateParser } func newTransactionMetricsManager( meter metric.Meter, metricConfig *Config, labelManager *LabelManager, ) (*transactionMetricsManager, error) { mng := &transactionMetricsManager{ mu: sync.Mutex{}, labelManager: labelManager, transactionMetricObjects: make(map[string]metric.Float64Histogram), } err := mng.initMetrics(meter, metricConfig) if err != nil { return nil, fmt.Errorf("failed to initialize transaction metrics: %w", err) } if len(mng.transactionMetricObjects) == 0 { log.Info().Msg("No transaction metrics to initialize") return mng, nil } mng.discoveryParser, err = newDiscoveryStateParser() if err != nil { return nil, fmt.Errorf("failed to initialize discovery state parser: %w", err) } // Start the metrics timer - will parse the JSON file and collect metrics mng.metricsTimer = time.NewTicker(environment.GetAccessLogMetricsCollectTimeInterval()) go func() { for range mng.metricsTimer.C { mng.collectMetrics() } }() log.Info().Msg("TransactionMetricsManager initialized") return mng, nil } func (m *transactionMetricsManager) initMetrics(meter metric.Meter, metricConfig *Config) error { for _, metricDef := range metricConfig.GeneralMetrics.MetricValue { var buckets []float64 if metricDef.Name == TransactionDuration || metricDef.Name == ProviderTransactionDuration { buckets = metricDef.Buckets if len(buckets) == 0 { log.Debug().Msgf("No explicit bucket boundaries for %v", metricDef.Name) buckets = defaultBucketBoundaries } log.Debug().Msgf("Creating %v meter with bucket boundaries: %v", metricDef.Name, buckets) transactionBucketMetricObj, err := meter.Float64Histogram( MetricPrefix+string(metricDef.Name), metric.WithDescription(metricDef.Description), metric.WithExplicitBucketBoundaries(buckets...), ) if err != nil { log.Error().Err(err).Msgf("Failed to create %v histogram meter", metricDef.Name) return err } m.transactionMetricObjects[string(metricDef.Name)] = transactionBucketMetricObj } } return nil } func (m *transactionMetricsManager) publishMetricValue( metricName Metric, value float32, attributes []attribute.KeyValue, ) { if metricObj, exists := m.transactionMetricObjects[string(metricName)]; exists { ctx := context.Background() metricObj.Record(ctx, float64(value), metric.WithAttributes(attributes...)) } } func (m *transactionMetricsManager) collectMetrics() { m.mu.Lock() defer m.mu.Unlock() data, err := m.discoveryParser.ReadAndParseDiscovery() if err != nil { log.Error().Err(err).Msg("Failed to read and parse JSON file") return } for consumer, endpointMap := range data.NewConsumerData { for endpoint, endpointAgg := range endpointMap { for statusCode := range endpointAgg.StatusCodes { // Check if the data has not changed if origConsumer, ok := data.OriginalConsumerData[consumer]; ok { if origAgg, ok := origConsumer[endpoint]; ok { if origAgg.AverageDuration == endpointAgg.AverageDuration && origAgg.AverageTotalDuration == endpointAgg.AverageTotalDuration { continue } } } attributes := m.labelManager.GetAttributesFromDiscoveryEndpoint(endpoint, consumer, statusCode) m.publishMetricValue(ProviderTransactionDuration, endpointAgg.AverageDuration, attributes) m.publishMetricValue(TransactionDuration, endpointAgg.AverageTotalDuration, attributes) } } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server