Skip to main content
Glama
update_endpoints.go10.9 kB
package config import ( "fmt" stream_types "lunar/engine/streams/types" "lunar/engine/utils/environment" shared_config "lunar/shared-model/config" "lunar/toolkit-core/client" context_manager "lunar/toolkit-core/context-manager" "net/http" "regexp" "strings" "github.com/rs/zerolog/log" ) const ( // Regex for replacing path parameters in URL, e.g.: // ^twitter\.com\/user/[^/]+$ // See unit tests for matching/non-matching URL examples: // https://regex101.com/r/DyYNZD/1 RegexToReplacePathParameters string = "/[^/]+" // Regex for replacing wildcard in URL, e.g.: // ^twitter\.com\/user(/.*)?$ // See unit tests for matching/non-matching URL examples: // https://regex101.com/r/XVN7Kw/1 RegexToReplaceWildcard string = "(/.*)?" // Example of regex for a URL with both path parameters and wildcard: // ^twitter\.com\/user/[^/]+/post/[^/]+/by(/.*)?$ // See unit tests for matching/non-matching URL examples: // https://regex101.com/r/3bKePx/1 ) const ( delimiter string = ":::" initialTimeToWaitInSec int = 0 timesToRetry int = 40 timeToWaitBetweenRetriesInMillis int = 250 ) var ( haproxyManagedEndpointURL string haproxyManageAllURL string haproxyUnManageAllURL string haproxyUnmanageGlobalURL string haproxyBodyNeededFrom string haproxyBodyFormAll string haproxyReqCaptureNeededFrom string haproxyReqCaptureFormAll string ) func init() { haproxyManagePort := environment.GetManageEndpointsPort() if haproxyManagePort == "" { log.Warn().Msg("LUNAR_MANAGE_ENDPOINTS_PORT not set, using default port 10252") haproxyManagePort = "10252" } haproxyManagedEndpointURL = "http://localhost:" + haproxyManagePort + "/managed_endpoint" haproxyManageAllURL = "http://localhost:" + haproxyManagePort + "/manage_all" haproxyUnManageAllURL = "http://localhost:" + haproxyManagePort + "/unmanage_all" haproxyUnmanageGlobalURL = "http://localhost:" + haproxyManagePort + "/unmanage_global" haproxyBodyNeededFrom = "http://localhost:" + haproxyManagePort + "/include_body_from" haproxyBodyFormAll = "http://localhost:" + haproxyManagePort + "/include_body_from_all" haproxyReqCaptureNeededFrom = "http://localhost:" + haproxyManagePort + "/capture_req_from" haproxyReqCaptureFormAll = "http://localhost:" + haproxyManagePort + "/capture_req_all" } var regexToFindPathParameters = regexp.MustCompile(`/\{[a-zA-Z0-9-_]+\}`) type HAProxyEndpointData struct { Endpoint string Requirements *stream_types.ProcessorRequirement } type HAProxyEndpointsRequest struct { ManageAll bool BodyNeededForAll bool ReqCaptureForAll bool ManagedEndpoints []*HAProxyEndpointData } func BuildHAProxyEndpointsRequest( policies *shared_config.PoliciesConfig, ) *HAProxyEndpointsRequest { defaultPoliciesRequirements := &stream_types.ProcessorRequirement{ IsBodyRequired: true, } manageAll := false for _, global := range policies.Global.Diagnosis { if global.Enabled { manageAll = true break } } if !manageAll { for _, global := range policies.Global.Remedies { if global.Enabled { manageAll = true break } } } managedEndpoints := []*HAProxyEndpointData{} for _, endpoint := range policies.Endpoints { for _, remedy := range endpoint.Remedies { if remedy.Enabled { managedEndpoints = append(managedEndpoints, HaproxyEndpointFormat(endpoint.Method, endpoint.URL, defaultPoliciesRequirements)) } } for _, diagnosis := range endpoint.Diagnosis { if diagnosis.Enabled { managedEndpoints = append(managedEndpoints, HaproxyEndpointFormat(endpoint.Method, endpoint.URL, defaultPoliciesRequirements)) } } } return &HAProxyEndpointsRequest{ ManageAll: manageAll, BodyNeededForAll: manageAll, ManagedEndpoints: managedEndpoints, } } func WaitForProxyHealthcheck() error { retryConfig := client.RetryConfig{ Attempts: timesToRetry, SleepMillis: timeToWaitBetweenRetriesInMillis, WithInitialSleep: false, InitialSleepMillis: initialTimeToWaitInSec, FailedAttemptLog: "Failed attempt to update HAProxy endpoints", FailureLog: "Failed to update HAProxy endpoints", } healthcheckPort := environment.GetHAProxyHealthcheckPort() healthcheckURL := "http://localhost:" + healthcheckPort + "/healthcheck?proxy_only=true" healthcheckConfig := client.HealthcheckConfig{ URL: healthcheckURL, BodyPredicate: func(_ []byte) bool { return true }, StatusPredicate: func(code int) bool { return code == 200 }, HTTPClient: http.DefaultClient, } clock := context_manager.Get().GetClock() return client.WaitForHealthcheck(clock, &retryConfig, &healthcheckConfig) } func HaproxyEndpointFormat( method, url string, requirements *stream_types.ProcessorRequirement, ) *HAProxyEndpointData { log.Trace().Msgf("Original URL: %v", url) url = strings.ReplaceAll(url, ".", `\.`) formattedURL := url wildcardLiteral := "/*" var hasWildcard bool if strings.HasSuffix(formattedURL, wildcardLiteral) { hasWildcard = true formattedURL = strings.TrimSuffix(formattedURL, wildcardLiteral) formattedURL += RegexToReplaceWildcard } formattedURL = regexToFindPathParameters.ReplaceAllString( formattedURL, RegexToReplacePathParameters, ) log.Trace().Msgf("Formatted URL: %v", formattedURL) result := strings.Join([]string{method, formattedURL}, delimiter) if !hasWildcard { result += "$" } return &HAProxyEndpointData{ Endpoint: result, Requirements: requirements, } } func ManageHAProxyEndpoints(haproxyEndpoints *HAProxyEndpointsRequest) error { err := updateHAProxyEndpoints(haproxyEndpoints) if err != nil { return err } log.Debug().Msg("✍️ Successfully updated endpoints") return nil } func unmanageHAProxyEndpoints(unmanagedEndpoints []string) error { for _, unmanagedEndpoint := range unmanagedEndpoints { err := operateEndpoint(unmanagedEndpoint, http.MethodDelete, haproxyManagedEndpointURL) if err != nil { return fmt.Errorf("failed to unmanage endpoint '%v', error: %v", unmanagedEndpoint, err) } err = operateEndpoint(unmanagedEndpoint, http.MethodDelete, haproxyBodyNeededFrom) if err != nil { return fmt.Errorf("failed to remove body from message for endpoint '%v', error: %v", unmanagedEndpoint, err) } err = operateEndpoint(unmanagedEndpoint, http.MethodDelete, haproxyReqCaptureNeededFrom) if err != nil { return fmt.Errorf("failed to stop capturing request for endpoint '%v', error: %v", unmanagedEndpoint, err) } } log.Debug().Msg("✍️ Successfully unmanaged endpoints") return nil } func updateHAProxyEndpoints(haproxyEndpoints *HAProxyEndpointsRequest) error { if haproxyEndpoints.BodyNeededForAll { if err := bodyFromAll(); err != nil { log.Warn().Err(err).Msg("Failed to include body in message for all endpoints") } } if haproxyEndpoints.ReqCaptureForAll { if err := reqCaptureFromAll(); err != nil { log.Warn().Err(err).Msg("Failed to include body in message for all endpoints") } } if haproxyEndpoints.ManageAll { if err := manageAll(); err != nil { log.Warn().Err(err).Msg("Failed to manage all endpoints") } } for _, managedEndpoint := range haproxyEndpoints.ManagedEndpoints { err := operateEndpoint(managedEndpoint.Endpoint, http.MethodPut, haproxyManagedEndpointURL) if err != nil { return fmt.Errorf("failed to manage endpoint '%v', error: %v", managedEndpoint, err) } if managedEndpoint.Requirements.IsBodyRequired { err = operateEndpoint(managedEndpoint.Endpoint, http.MethodPut, haproxyBodyNeededFrom) if err != nil { return fmt.Errorf("failed to include body in message for endpoint '%v', error: %v", managedEndpoint, err) } } if managedEndpoint.Requirements.IsReqCaptureRequired { err = operateEndpoint(managedEndpoint.Endpoint, http.MethodPut, haproxyReqCaptureNeededFrom) if err != nil { return fmt.Errorf("failed to include body in message for endpoint '%v', error: %v", managedEndpoint, err) } } } return nil } func operateEndpoint(endpoint, method, path string) error { retryConfig := client.RetryConfig{ Attempts: 5, SleepMillis: 250, WithInitialSleep: false, SleepIncreaseFactor: 2, InitialSleepMillis: initialTimeToWaitInSec, FailedAttemptLog: "Failed attempt to operate HAProxy endpoints", FailureLog: "Failed to operate HAProxy endpoints", } _, err := client.WithRetry( context_manager.Get().GetClock(), &retryConfig, func() (interface{}, error) { err := innerOperateEndpoint(endpoint, method, path) if err != nil { log.Trace().Err(err).Msgf("Failed to %s endpoint %v, retrying...", method, endpoint) return nil, err } return struct{}{}, nil }, ) return err } func innerOperateEndpoint(endpoint, method, path string) error { body := strings.NewReader(endpoint) request, err := http.NewRequest(method, path, body) if err != nil { return err // unrecoverable error in request creation } log.Trace().Msgf("Sending request to %s endpoint %s at URL %v", method, endpoint, request.URL.String()) response, err := http.DefaultClient.Do(request) if err != nil { log.Warn().Err(err).Msgf("Failed to %s endpoint %v", method, endpoint) } else { defer func() { if response != nil && response.Body != nil { _ = response.Body.Close() } }() if response.StatusCode == http.StatusOK { return nil // success } log.Warn().Msgf("%s endpoint %v returned status %d", method, endpoint, response.StatusCode) } return fmt.Errorf("failed to %s endpoint %v", method, endpoint) } func bodyFromAll() error { return applyAllRequest(http.MethodPut, haproxyBodyFormAll) } func reqCaptureFromAll() error { return applyAllRequest(http.MethodPut, haproxyReqCaptureFormAll) } func manageAll() error { return applyAllRequest(http.MethodPut, haproxyManageAllURL) } func UnmanageAll() error { return applyAllRequest(http.MethodPut, haproxyUnManageAllURL) } func unmanageGlobal() error { return applyAllRequest(http.MethodDelete, haproxyUnmanageGlobalURL) } func applyAllRequest(method, path string) error { log.Trace().Msgf("Sending request to %s all endpoints at URL %s", method, path) request, err := http.NewRequest(method, path, nil) if err != nil { return err } response, err := http.DefaultClient.Do(request) if err != nil { return err } if response.StatusCode != http.StatusOK { buffer := make([]byte, 1024) _, err := response.Body.Read(buffer) defer func() { if response != nil && response.Body != nil { _ = response.Body.Close() } }() if err != nil { return fmt.Errorf( "failed to manage all, status: %v", response.StatusCode, ) } return fmt.Errorf("failed to manage all, status: %v, body: %v", response.StatusCode, string(buffer)) } return nil }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server