Skip to main content
Glama
main.go4.54 kB
//go:build cgo package main import ( "C" "lunar/aggregation-plugin/common" "lunar/aggregation-plugin/discovery" "lunar/aggregation-plugin/remedy" "lunar/toolkit-core/logging" "unsafe" "github.com/fluent/fluent-bit-go/output" "github.com/rs/zerolog/log" ) import ( "lunar/toolkit-core/clock" "os" ) const ( PluginName = "aggregation" PluginDesc = "Aggregation" appName = "aggregation-output-plugin" urlTreeMaxSplitThreshold = 50 ) var ( discoveryStateLocation = os.Getenv("DISCOVERY_STATE_LOCATION") remedyStatsStateLocation = os.Getenv("REMEDY_STATE_LOCATION") ) type PluginContext struct { endpointTree *common.SimpleURLTree discoveryState *discovery.State remedyStatsState *remedy.State clock clock.Clock } //export FLBPluginRegister func FLBPluginRegister(def unsafe.Pointer) int { logging.ConfigureLogger(appName, false, clock.NewRealClock()) log.Info().Msg("Registering plugin") return output.FLBPluginRegister(def, PluginName, PluginDesc) } //export FLBPluginInit func FLBPluginInit(plugin unsafe.Pointer) int { log.Info().Msgf("Initializing %s plugin", appName) discoveryState := discovery.State{ DiscoverFilepath: discoveryStateLocation, } err := discoveryState.InitializeState() if err != nil { log.Error().Stack(). Err(err). Msg("🛑 Failed to initialize: could not create " + "discovery aggregation state file") return output.FLB_ERROR } remedyStatsState := remedy.State{Filepath: remedyStatsStateLocation} if !common.IsFlowsEnabled() { err = remedyStatsState.Initialize() if err != nil { log.Error().Stack(). Err(err). Msg("🛑 Failed to initialize: could not initialize " + "remedy stats aggregation state file") return output.FLB_ERROR } } lastModified, err := common.GetPoliciesLastModifiedTime() if err != nil { log.Error().Stack(). Err(err). Msg("🛑 Failed to initialize: could not get last modified of known endpoints") return output.FLB_ERROR } currentKnownEndpoints, err := common.ReadKnownEndpoints() if err != nil { log.Error().Stack(). Err(err). Msg("🛑 Failed to initialize: could not load known endpoints") return output.FLB_ERROR } currentTree, err := common.BuildTree( *currentKnownEndpoints, urlTreeMaxSplitThreshold, ) if err != nil { log.Error().Stack(). Err(err). Msg("🛑 Failed to initialize: could not build tree from known endpoints") return output.FLB_ERROR } log.Debug(). Msgf("🌲 Tree initialized, read %v initial endpoints", len(currentKnownEndpoints.Endpoints)) pluginContext := PluginContext{ endpointTree: currentTree, remedyStatsState: &remedyStatsState, discoveryState: &discoveryState, clock: clock.NewRealClock(), } treeRefreshInterval := getTreeRefreshRate() output.FLBPluginSetContext(plugin, pluginContext) updateTreeF := func(tree *common.SimpleURLTree) { updatedContext := pluginContext updatedContext.endpointTree = tree output.FLBPluginSetContext(plugin, updatedContext) } go periodicallyUpdateTree( updateTreeF, treeRefreshInterval, currentKnownEndpoints, lastModified, urlTreeMaxSplitThreshold, ) return output.FLB_OK } //export FLBPluginFlushCtx func FLBPluginFlushCtx( ctx, data unsafe.Pointer, length C.int, _ *C.char, ) int { context, valid := output.FLBPluginGetContext(ctx).(PluginContext) var tree *common.SimpleURLTree if !valid { log.Error().Stack().Msg("Could not get endpoints tree from context" + "will not apply path normalization to access logs") tree = fallbackTree } else { tree = context.endpointTree log.Trace(). Msgf( "Got tree from context with %d top level constant children (%v)", len(tree.Root.ConstantChildren), tree.Root.ConstantChildren, ) } records := discovery.DecodeRecords(data, int(length)) err := discovery.Run(context.discoveryState, records, tree) if err != nil { log.Error().Stack().Err(err).Msg("Discovery processing failed") return output.FLB_ERROR } if !common.IsFlowsEnabled() { err = remedy.Run(context.remedyStatsState, records, tree, context.clock) if err != nil { log.Error().Stack().Err(err).Msg("Remedy Stats processing failed") return output.FLB_ERROR } } log.Trace().Msg("✍️ successfully updated aggregations") return output.FLB_OK } //export FLBPluginExit func FLBPluginExit() int { log.Info().Msg("Starting shutdown...") return output.FLB_OK } // If we drop this we get build warning. func main() {}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server