Skip to main content
Glama
lunar_hub.go (12.6 kB)
package communication import ( "context" "encoding/json" "lunar/engine/metrics" "lunar/engine/utils/environment" sharedActions "lunar/shared-model/actions" sharedDiscovery "lunar/shared-model/discovery" "lunar/toolkit-core/clock" "lunar/toolkit-core/network" "net/http" "net/url" "os" "sync" "time" "github.com/rs/zerolog/log" ) const ( connectionTimeout = 5 * time.Second defaultReportIntervalSec int = 300 defaultMetricsReportIntervalSec int = 3600 authHeader = "authorization" proxyVersionHeader = "x-lunar-proxy-version" proxyIDHeader = "x-lunar-proxy-id" // Connection attempt defaults defaultInitialWaitTimeBetweenConnectionAttempts = 5 * time.Second defaultMaxWaitTimeBetweenConnectionAttempts = 5 * time.Minute defaultConnectionAttemptsPerWaitTime = 5 defaultConnectionAttemptsWaitTimeExponentialGrowth = 2 ) var epochTime = time.Unix(0, 0) type hubConnConfig struct { initialWaitTimeBetweenConnectionAttempts time.Duration maxWaitTimeBetweenConnectionAttempts time.Duration connectionAttemptsPerWaitTime int connectionAttemptsWaitTimeExponentialGrowth int } type hubConnStatus struct { isConnected bool isConnectedMutex sync.RWMutex lastSuccessfulCommunication *time.Time lastSuccessfulCommunicationMutex sync.RWMutex connectionEstablishedChannels []chan struct{} connectionEstablishedChannelsMutex sync.RWMutex } type HubCommunication struct { client *network.WSClient workersStop []context.CancelFunc periodicInterval time.Duration periodicMetricsInterval time.Duration clock clock.Clock nextReportTime time.Time connConfig hubConnConfig connStatus hubConnStatus } func NewHubCommunication(apiKey string, proxyID string, clock clock.Clock) *HubCommunication { reportInterval, err := environment.GetHubReportInterval() if err != nil { log.Debug().Msgf( "Could not find Report Interval Value from ENV, will use default of: %v", defaultReportIntervalSec) reportInterval = defaultReportIntervalSec } metricsInterval, err := environment.GetHubMetricsReportInterval() if err != nil { 
log.Debug().Msgf( "Could not find Metrics Report Interval Value from ENV, will use default of: %v", defaultReportIntervalSec) metricsInterval = defaultReportIntervalSec } hubURL := url.URL{ //nolint: exhaustruct Scheme: environment.GetHubScheme(), Host: environment.GetHubURL(), Path: "/ui/v1/control", } handshakeHeaders := http.Header{ authHeader: []string{"Bearer " + apiKey}, proxyIDHeader: []string{proxyID}, proxyVersionHeader: []string{environment.GetProxyVersion()}, } hub := HubCommunication{ //nolint: exhaustruct client: network.NewWSClient(hubURL, handshakeHeaders), workersStop: []context.CancelFunc{}, periodicInterval: time.Duration(reportInterval) * time.Second, periodicMetricsInterval: time.Duration(metricsInterval) * time.Second, clock: clock, nextReportTime: time.Time{}, connStatus: newHubConnStatus(), connConfig: newHubConnConfig(), } hub.client.OnMessage(hub.onMessage) if err := hub.client.ConnectAndStart(); err != nil { log.Error(). Err(err). Msg("Failed to make initial connection with Lunar Hub, will retry in the background") go hub.attemptToConnectInLoop() } else { hub.updateCommunicationStatus() hub.setIsConnected(true) log.Debug().Msg("Connected to Lunar Hub") } return &hub } func newHubConnStatus() hubConnStatus { return hubConnStatus{ isConnected: false, connectionEstablishedChannels: []chan struct{}{}, lastSuccessfulCommunication: nil, } } func newHubConnConfig() hubConnConfig { initialWaitTimeBetweenConnectionAttempts := environment.GetHubInitialWaitTimeBetweenConnectionAttempts( //nolint: lll defaultInitialWaitTimeBetweenConnectionAttempts, ) maxWaitTimeBetweenConnectionAttempts := environment.GetHubMaxWaitTimeBetweenConnectionAttempts( defaultMaxWaitTimeBetweenConnectionAttempts, ) connectionAttemptsPerWaitTime := environment.GetHubConnectionAttemptsPerWaitTime( defaultConnectionAttemptsPerWaitTime, ) connectionAttemptsWaitTimeExponentialGrowth := environment.GetHubConnectionAttemptsWaitTimeExponentialGrowth( //nolint: lll 
defaultConnectionAttemptsWaitTimeExponentialGrowth, ) return hubConnConfig{ initialWaitTimeBetweenConnectionAttempts: initialWaitTimeBetweenConnectionAttempts, maxWaitTimeBetweenConnectionAttempts: maxWaitTimeBetweenConnectionAttempts, connectionAttemptsPerWaitTime: connectionAttemptsPerWaitTime, connectionAttemptsWaitTimeExponentialGrowth: connectionAttemptsWaitTimeExponentialGrowth, } } func (hub *HubCommunication) StartWorkers() { hub.startDiscoveryWorker() hub.startMetricsWorker() } func (hub *HubCommunication) SendDataToHub(message network.MessageI) bool { if !hub.IsConnected() { log.Debug().Msg("HubCommunication::SendDataToHub Not connected to Lunar Hub") return false } log.Trace().Msgf( "HubCommunication::SendDataToHub Sending data to Lunar Hub, event: %+v", message.GetEvent()) if err := hub.client.Send(message); err != nil { log.Debug(). Err(err). Bool("wsClientIsConnectionReady", hub.client.IsConnectionReadyAndAuthorized()). Msg("HubCommunication::SendDataToHub Error sending data to Lunar Hub") return false } hub.updateCommunicationStatus() return true } func (hub *HubCommunication) IsConnected() bool { hub.connStatus.isConnectedMutex.RLock() defer hub.connStatus.isConnectedMutex.RUnlock() return hub.connStatus.isConnected } func (hub *HubCommunication) ConnectionEstablishedChannel() <-chan struct{} { hub.connStatus.connectionEstablishedChannelsMutex.Lock() defer hub.connStatus.connectionEstablishedChannelsMutex.Unlock() ch := make(chan struct{}) hub.connStatus.connectionEstablishedChannels = append( hub.connStatus.connectionEstablishedChannels, ch, ) return ch } func (hub *HubCommunication) LastSuccessfulCommunication() *time.Time { hub.connStatus.lastSuccessfulCommunicationMutex.RLock() defer hub.connStatus.lastSuccessfulCommunicationMutex.RUnlock() return hub.connStatus.lastSuccessfulCommunication } func (hub *HubCommunication) Stop() { log.Trace().Msg("Stopping HubCommunication Worker...") hub.setIsConnected(false) 
hub.removeConnectionEstablishedListeners() for _, cancel := range hub.workersStop { cancel() } hub.client.Close() } func (hub *HubCommunication) startMetricsWorker() { ctx, cancel := context.WithCancel(context.Background()) hub.workersStop = append(hub.workersStop, cancel) go func() { for { timeToWaitForNextReport := hub.calculateTimeToWaitForNextReport(hub.periodicMetricsInterval) log.Trace().Msgf("HubCommunication::MetricsWorker Next report in %v", timeToWaitForNextReport) select { case <-ctx.Done(): log.Trace().Msg("HubCommunication::MetricsWorker task canceled") return case <-time.After(timeToWaitForNextReport): metricsReport, err := metrics.GatherLunarMetrics() if err != nil { log.Debug().Err(err).Msg("HubCommunication::MetricsWorker Error getting metrics," + " will be executed again in the next interval") continue } log.Trace().Msg("HubCommunication::MetricsWorker Sending lunar metrics to Lunar Hub") message := network.MetricsMessage{ Event: network.WebSocketEventMetrics, Data: metricsReport, } hub.SendDataToHub(&message) } } }() } func (hub *HubCommunication) startDiscoveryWorker() { ctx, cancel := context.WithCancel(context.Background()) hub.workersStop = append(hub.workersStop, cancel) discoveryFileLocation := environment.GetDiscoveryStateLocation() if discoveryFileLocation == "" { log.Warn().Msg( `Could not get the location of the discovery state file, Please validate that the ENV 'DISCOVERY_STATE_LOCATION' is set.`) return } go func() { for { timeToWaitForNextReport := hub.calculateTimeToWaitForNextReport(hub.periodicInterval) select { case <-ctx.Done(): log.Trace().Msg("HubCommunication::DiscoveryWorker task canceled") return case <-time.After(timeToWaitForNextReport): data, err := os.ReadFile(discoveryFileLocation) if err != nil { log.Error().Err(err).Msg( "HubCommunication::DiscoveryWorker Error reading file") continue } // Unmarshal the object data to Aggregation object and send it to the hub output := sharedDiscovery.Output{} err = 
json.Unmarshal(data, &output) if err != nil { // TODO: Once we understand and fix the error, we can log it as an error log.Debug().Err(err).Msg( "HubCommunication::DiscoveryWorker Error unmarshalling data") continue } output.CreatedAt = sharedActions.TimestampToStringFromTime(hub.nextReportTime) message := network.DiscoveryMessage{ Event: network.WebSocketEventDiscovery, Data: output, } log.Trace(). Msgf("HubCommunication::DiscoveryWorker Sending data to Lunar Hub: %v, %+v", hub.nextReportTime, message) hub.SendDataToHub(&message) } } }() } func (hub *HubCommunication) setIsConnected(value bool) { hub.connStatus.isConnectedMutex.Lock() defer hub.connStatus.isConnectedMutex.Unlock() hub.connStatus.isConnected = value } func (hub *HubCommunication) removeConnectionEstablishedListeners() { hub.connStatus.connectionEstablishedChannelsMutex.Lock() defer hub.connStatus.connectionEstablishedChannelsMutex.Unlock() hub.connStatus.connectionEstablishedChannels = []chan struct{}{} } func (hub *HubCommunication) fanOutConnectionEstablished() { hub.connStatus.connectionEstablishedChannelsMutex.Lock() defer hub.connStatus.connectionEstablishedChannelsMutex.Unlock() log.Debug().Msg("Fanning out connection established signal asynchronously") for _, ch := range hub.connStatus.connectionEstablishedChannels { go func(ch chan struct{}) { ch <- struct{}{} }(ch) } } func (hub *HubCommunication) setLastSuccessfulCommunication(value *time.Time) { hub.connStatus.lastSuccessfulCommunicationMutex.Lock() defer hub.connStatus.lastSuccessfulCommunicationMutex.Unlock() hub.connStatus.lastSuccessfulCommunication = value } // This function will try to connect to the Hub in the background. It will keep trying to connect // until it is successful, with an exponential backoff. // This function is blocking and is meant to be run in a goroutine. 
func (hub *HubCommunication) attemptToConnectInLoop() { retries := 0 waitTime := hub.connConfig.initialWaitTimeBetweenConnectionAttempts for { if err := hub.client.ConnectAndStart(); err != nil { log.Debug().Err(err).Int("retry", retries).Msgf( "Failed to make connection with Lunar Hub, will retry in %v", waitTime) <-hub.clock.After(waitTime) retries++ if retries%hub.connConfig.connectionAttemptsPerWaitTime == 0 { waitTime = waitTime * time.Duration( hub.connConfig.connectionAttemptsWaitTimeExponentialGrowth, ) if waitTime > hub.connConfig.maxWaitTimeBetweenConnectionAttempts { waitTime = hub.connConfig.maxWaitTimeBetweenConnectionAttempts } } } else { hub.updateCommunicationStatus() hub.setIsConnected(true) hub.fanOutConnectionEstablished() log.Debug().Int("retries", retries).Msg("Connected to Lunar Hub") break } } } func (hub *HubCommunication) updateCommunicationStatus() { t := hub.clock.Now() hub.setLastSuccessfulCommunication(&t) } func (hub *HubCommunication) calculateTimeToWaitForNextReport( periodicInterval time.Duration, ) time.Duration { currentTime := hub.clock.Now() elapsedTime := currentTime.Sub(epochTime) previousReportTime := epochTime.Add( (elapsedTime / periodicInterval) * periodicInterval, ) hub.nextReportTime = previousReportTime.Add(periodicInterval) return hub.nextReportTime.Sub(currentTime) } func (hub *HubCommunication) onMessage(message []byte) { log.Trace().Msg("HubCommunication::OnMessage") var wsMessage WebSocketMessage if err := json.Unmarshal(message, &wsMessage); err != nil { log.Error().Err(err).Msg("HubCommunication::OnMessage Error unmarshalling message") return } switch wsMessage.Event { // Here we can add more cases for different events default: log.Debug().Msgf("HubCommunication::OnMessage Unknown event: %v", wsMessage.Event) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.