Skip to main content
Glama

Last9 Observability MCP

Official
Apache 2.0
122
45
  • Apple
utils.go19.8 kB
package utils import ( "bytes" "context" "encoding/base64" "encoding/json" "errors" "flag" "fmt" "net/http" "net/url" "os" "strconv" "strings" "time" "last9-mcp/internal/models" "github.com/peterbourgon/ff/v3" last9mcp "github.com/last9/mcp-go-sdk/mcp" ) // ExtractOrgSlugFromToken extracts organization slug from JWT token func ExtractOrgSlugFromToken(accessToken string) (string, error) { claims, err := ExtractClaimsFromToken(accessToken) if err != nil { return "", fmt.Errorf("failed to extract claims from token: %w", err) } orgSlug, ok := claims["organization_slug"].(string) if !ok { return "", errors.New("organization slug not found in token") } return orgSlug, nil } func ExtractClaimsFromToken(accessToken string) (map[string]interface{}, error) { // Split the token into parts parts := strings.Split(accessToken, ".") if len(parts) != 3 { return nil, errors.New("invalid JWT token format") } // Decode the payload (second part) payload, err := base64.RawURLEncoding.DecodeString(parts[1]) if err != nil { return nil, fmt.Errorf("failed to decode token payload: %w", err) } // Parse the JSON payload var claims map[string]interface{} if err := json.Unmarshal(payload, &claims); err != nil { return nil, fmt.Errorf("failed to parse token claims: %w", err) } return claims, nil } func ExtractActionURLFromToken(accessToken string) (string, error) { // Extract ActionURL from token claims claims, err := ExtractClaimsFromToken(accessToken) if err != nil { return "", fmt.Errorf("failed to extract claims from token: %w", err) } // Get ActionURL from aud field aud, ok := claims["aud"].([]interface{}) if !ok || len(aud) == 0 { return "", errors.New("no audience found in token claims") } // Handle case where audience already includes https:// protocol audStr := aud[0].(string) if strings.HasPrefix(audStr, "https://") || strings.HasPrefix(audStr, "http://") { return audStr, nil } return fmt.Sprintf("https://%s", audStr), nil } // RefreshAccessToken gets a new access token using the refresh token func RefreshAccessToken(ctx context.Context, client *http.Client, cfg models.Config) (string, error) { data := map[string]string{ "refresh_token": cfg.RefreshToken, } jsonData, err := json.Marshal(data) if err != nil { return "", fmt.Errorf("failed to marshal request: %w", err) } // Extract ActionURL from token claims actionURL, err := ExtractActionURLFromToken(cfg.RefreshToken) if err != nil { return "", fmt.Errorf("failed to extract action URL from refresh token: %w", err) } // Handle case where actionURL already includes /api path oauthURL := actionURL if strings.HasSuffix(actionURL, "/api") { oauthURL = strings.TrimSuffix(actionURL, "/api") } u, err := url.Parse(oauthURL + "/api/v4/oauth/access_token") if err != nil { return "", fmt.Errorf("failed to parse URL: %w", err) } req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewReader(jsonData)) if err != nil { return "", fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Content-Type", "application/json") resp, err := client.Do(req) if err != nil { return "", fmt.Errorf("request failed: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode) } var result struct { AccessToken string `json:"access_token"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return "", fmt.Errorf("failed to decode response: %w", err) } return result.AccessToken, nil } // GetDefaultRegion determines the region based on the Last9 BASE URL. // This function extracts the hostname from URLs like "https://otlp-aps1.last9.io:443" // and maps them to the correct AWS regions for API routing. func GetDefaultRegion(baseURL string) string { // Extract hostname from URL (remove protocol and port) // Transform: "https://otlp-aps1.last9.io:443" → "otlp-aps1.last9.io" hostname := baseURL if strings.HasPrefix(hostname, "https://") { hostname = strings.TrimPrefix(hostname, "https://") } if strings.HasPrefix(hostname, "http://") { hostname = strings.TrimPrefix(hostname, "http://") } // Remove port if present (:443, :80, etc.) if colonIndex := strings.Index(hostname, ":"); colonIndex != -1 { hostname = hostname[:colonIndex] } switch hostname { case "otlp.last9.io": return "us-east-1" case "otlp-aps1.last9.io": return "ap-south-1" case "otlp-apse1.last9.io": return "ap-southeast-1" default: return "us-east-1" // default to us-east-1 if URL pattern doesn't match } } // GetTimeRange returns start and end times based on lookback minutes // If start_time_iso and end_time_iso are provided, they take precedence // Otherwise, returns now and now - lookbackMinutes // // IMPORTANT: All ISO timestamps are parsed as UTC to ensure consistent behavior // across different server timezones. This prevents timezone-related bugs where // queries return data from unexpected time periods. func GetTimeRange(params map[string]interface{}, defaultLookbackMinutes int) (startTime, endTime time.Time, err error) { // Always use UTC to ensure consistent behavior across timezones endTime = time.Now().UTC() // First check if lookback_minutes is provided lookbackMinutes := defaultLookbackMinutes if l, ok := params["lookback_minutes"].(float64); ok { lookbackMinutes = int(l) if lookbackMinutes < 1 { return time.Time{}, time.Time{}, fmt.Errorf("lookback_minutes must be at least 1") } if lookbackMinutes > 1440 { // 24 hours return time.Time{}, time.Time{}, fmt.Errorf("lookback_minutes cannot exceed 1440 (24 hours)") } } // Default start time based on lookback startTime = endTime.Add(time.Duration(-lookbackMinutes) * time.Minute) // Override with explicit timestamps if provided if startTimeStr, ok := params["start_time_iso"].(string); ok && startTimeStr != "" { t, err := time.Parse("2006-01-02 15:04:05", startTimeStr) if err != nil { return time.Time{}, time.Time{}, fmt.Errorf("invalid start_time_iso format: %w", err) } // Force UTC timezone to prevent server timezone from affecting timestamp interpretation // This ensures "2025-06-23 16:00:00" is always treated as UTC, not local time startTime = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) // If start_time is provided but no end_time, use start_time + lookback_minutes if endTimeStr, ok := params["end_time_iso"].(string); !ok || endTimeStr == "" { endTime = startTime.Add(time.Duration(lookbackMinutes) * time.Minute) } } if endTimeStr, ok := params["end_time_iso"].(string); ok && endTimeStr != "" { t, err := time.Parse("2006-01-02 15:04:05", endTimeStr) if err != nil { return time.Time{}, time.Time{}, fmt.Errorf("invalid end_time_iso format: %w", err) } // Force UTC timezone to prevent server timezone from affecting timestamp interpretation // This ensures "2025-06-23 16:30:00" is always treated as UTC, not local time endTime = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) } // Validate time range if startTime.After(endTime) { return time.Time{}, time.Time{}, fmt.Errorf("start_time cannot be after end_time") } // Ensure time range doesn't exceed 24 hours if endTime.Sub(startTime) > 24*time.Hour { return time.Time{}, time.Time{}, fmt.Errorf("time range cannot exceed 24 hours") } return startTime, endTime, nil } // Version information var ( Version = "dev" // Set by goreleaser CommitSHA = "unknown" // Set by goreleaser BuildTime = "unknown" // Set by goreleaser ) // setupConfig initializes and parses the configuration func SetupConfig(defaults models.Config) (models.Config, error) { fs := flag.NewFlagSet("last9-mcp", flag.ExitOnError) var cfg models.Config fs.StringVar(&cfg.AuthToken, "auth", os.Getenv("LAST9_AUTH_TOKEN"), "Last9 API auth token") fs.StringVar(&cfg.BaseURL, "url", os.Getenv("LAST9_BASE_URL"), "Last9 API URL") fs.StringVar(&cfg.RefreshToken, "refresh_token", os.Getenv("LAST9_REFRESH_TOKEN"), "Last9 refresh token for authentication") fs.Float64Var(&cfg.RequestRateLimit, "rate", 1, "Requests per second limit") fs.IntVar(&cfg.RequestRateBurst, "burst", 1, "Request burst capacity") fs.BoolVar(&cfg.HTTPMode, "http", false, "Run as HTTP server instead of STDIO") fs.StringVar(&cfg.Port, "port", "8080", "HTTP server port") fs.StringVar(&cfg.Host, "host", "localhost", "HTTP server host") versionFlag := fs.Bool("version", false, "Print version information") var configFile string fs.StringVar(&configFile, "config", "", "config file path") err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarPrefix("LAST9"), ff.WithConfigFileFlag("config"), ff.WithConfigFileParser(ff.JSONParser), ) if err != nil { return cfg, fmt.Errorf("failed to parse configuration: %w", err) } if *versionFlag { fmt.Printf("Version: %s\nCommit: %s\nBuild Time: %s\n", Version, CommitSHA, BuildTime) os.Exit(0) } if cfg.AuthToken == "" { if defaults.AuthToken != "" { cfg.AuthToken = defaults.AuthToken } else { return cfg, errors.New("Last9 auth token must be provided via LAST9_AUTH_TOKEN env var") } } // Set default base URL if not provided if cfg.BaseURL == "" { if defaults.BaseURL != "" { cfg.BaseURL = defaults.BaseURL } else { return cfg, errors.New("Last9 base URL must be provided via LAST9_BASE_URL env var") } } if cfg.RefreshToken == "" { if defaults.RefreshToken != "" { cfg.RefreshToken = defaults.RefreshToken } else { return cfg, errors.New("Last9 refresh token must be provided via LAST9_REFRESH_TOKEN env var") } } return cfg, nil } func PopulateAPICfg(cfg *models.Config) error { client := last9mcp.WithHTTPTracing(&http.Client{Timeout: 30 * time.Second}) accessToken, err := RefreshAccessToken(context.Background(), client, *cfg) if err != nil { return fmt.Errorf("failed to refresh access token: %w", err) } cfg.AccessToken = accessToken orgSlug, err := ExtractOrgSlugFromToken(cfg.AccessToken) if err != nil { return fmt.Errorf("failed to extract organization slug from access token: %w", err) } cfg.OrgSlug = orgSlug cfg.APIBaseURL = fmt.Sprintf("https://%s/api/v4/organizations/%s", "app.last9.io", cfg.OrgSlug) // make a GET call to /datasources and iterate over the response array // find the element with is_default set to true and extract url, properties.username, properties.password // add bearer token auth to the request header req, err := http.NewRequestWithContext(context.Background(), "GET", cfg.APIBaseURL+"/datasources", nil) if err != nil { return fmt.Errorf("failed to create request for datasources: %w", err) } req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken) resp, err := client.Do(req) if err != nil { return fmt.Errorf("failed to get metrics datasource: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("failed to get metrics datasource: %s", resp.Status) } var datasources []struct { IsDefault bool `json:"is_default"` URL string `json:"url"` Properties struct { Username string `json:"username"` Password string `json:"password"` } `json:"properties"` } if err := json.NewDecoder(resp.Body).Decode(&datasources); err != nil { return fmt.Errorf("failed to decode metrics datasources response: %w", err) } for _, ds := range datasources { if ds.IsDefault { cfg.PrometheusReadURL = ds.URL cfg.PrometheusUsername = ds.Properties.Username cfg.PrometheusPassword = ds.Properties.Password break } } if cfg.PrometheusReadURL == "" || cfg.PrometheusUsername == "" || cfg.PrometheusPassword == "" { return errors.New("default datasource not found or missing required properties") } return nil } func MakePromInstantAPIQuery(ctx context.Context, client *http.Client, promql string, endTimeParam int64, cfg models.Config) (*http.Response, error) { promInstantParam := struct { Query string `json:"query"` Timestamp int64 `json:"timestamp"` ReadURL string `json:"read_url"` Username string `json:"username"` Password string `json:"password"` }{promql, endTimeParam, cfg.PrometheusReadURL, cfg.PrometheusUsername, cfg.PrometheusPassword} bodyBytes, err := json.Marshal(promInstantParam) if err != nil { return nil, err } reqUrl := fmt.Sprintf("%s/prom_query_instant", cfg.APIBaseURL) req, err := http.NewRequestWithContext(ctx, "POST", reqUrl, strings.NewReader(string(bodyBytes))) if err != nil { return nil, err } req.Header.Set("Content-Type", "application/json") req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken) return client.Do(req) } func MakePromRangeAPIQuery(ctx context.Context, client *http.Client, promql string, startTimeParam, endTimeParam int64, cfg models.Config) (*http.Response, error) { promRangeParam := struct { Query string `json:"query"` Timestamp int64 `json:"timestamp"` Window int64 `json:"window"` ReadURL string `json:"read_url"` Username string `json:"username"` Password string `json:"password"` }{ Query: promql, Timestamp: startTimeParam, Window: endTimeParam - startTimeParam, ReadURL: cfg.PrometheusReadURL, Username: cfg.PrometheusUsername, Password: cfg.PrometheusPassword, } bodyBytes, err := json.Marshal(promRangeParam) if err != nil { return nil, err } reqUrl := fmt.Sprintf("%s/prom_query", cfg.APIBaseURL) req, err := http.NewRequestWithContext(ctx, "POST", reqUrl, bytes.NewReader(bodyBytes)) if err != nil { return nil, err } req.Header.Set("Content-Type", "application/json") req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken) return client.Do(req) } // function to get the values of a particular label, for a given query filter // path: /prom_label_values func MakePromLabelValuesAPIQuery(ctx context.Context, client *http.Client, label string, matches string, startTimeParam, endTimeParam int64, cfg models.Config) (*http.Response, error) { promLabelValuesParam := struct { Label string `json:"label"` Timestamp int64 `json:"timestamp"` Window int64 `json:"window"` ReadURL string `json:"read_url"` Username string `json:"username"` Password string `json:"password"` Matches []string `json:"matches"` }{ Label: label, Timestamp: startTimeParam, Window: endTimeParam - startTimeParam, ReadURL: cfg.PrometheusReadURL, Username: cfg.PrometheusUsername, Password: cfg.PrometheusPassword, Matches: []string{matches}, } bodyBytes, err := json.Marshal(promLabelValuesParam) if err != nil { return nil, err } reqUrl := fmt.Sprintf("%s/prom_label_values", cfg.APIBaseURL) req, err := http.NewRequestWithContext(ctx, "POST", reqUrl, bytes.NewReader(bodyBytes)) if err != nil { return nil, err } req.Header.Set("Content-Type", "application/json") req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken) return client.Do(req) } func MakePromLabelsAPIQuery(ctx context.Context, client *http.Client, metric string, startTimeParam, endTimeParam int64, cfg models.Config) (*http.Response, error) { promLabelsParam := struct { Timestamp int64 `json:"timestamp"` Window int64 `json:"window"` ReadURL string `json:"read_url"` Username string `json:"username"` Password string `json:"password"` Metric string `json:"metric"` }{ Timestamp: startTimeParam, Window: endTimeParam - startTimeParam, ReadURL: cfg.PrometheusReadURL, Username: cfg.PrometheusUsername, Password: cfg.PrometheusPassword, Metric: metric, } bodyBytes, err := json.Marshal(promLabelsParam) if err != nil { return nil, err } reqUrl := fmt.Sprintf("%s/apm/labels", cfg.APIBaseURL) req, err := http.NewRequestWithContext(ctx, "POST", reqUrl, bytes.NewReader(bodyBytes)) if err != nil { return nil, err } req.Header.Set("Content-Type", "application/json") req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken) return client.Do(req) } // ConvertTimestamp converts a timestamp from the API response to RFC3339 format func ConvertTimestamp(timestamp any) string { switch ts := timestamp.(type) { case string: // Try to parse as Unix nanoseconds timestamp if nsTimestamp, err := strconv.ParseInt(ts, 10, 64); err == nil { // Convert nanoseconds to time.Time and format as RFC3339 return time.Unix(0, nsTimestamp).UTC().Format(time.RFC3339) } // If it's already a formatted timestamp, return as is return ts case float64: // Convert to int64 and treat as nanoseconds return time.Unix(0, int64(ts)).UTC().Format(time.RFC3339) case int64: // Treat as nanoseconds return time.Unix(0, ts).UTC().Format(time.RFC3339) default: // Fallback to current time if we can't parse return time.Now().UTC().Format(time.RFC3339) } } // ParseStringArray safely extracts string array from interface{} func ParseStringArray(value interface{}) []string { var result []string if array, ok := value.([]interface{}); ok { for _, item := range array { if str, ok := item.(string); ok && str != "" { result = append(result, str) } } } return result } // BuildOrFilter creates an optimized filter for single or multiple values of the same field. // For single values, returns a simple $eq filter. For multiple values, returns an $or filter. // This optimization reduces query complexity and improves performance for single-value filters. func BuildOrFilter(fieldName string, values []string) map[string]interface{} { if len(values) == 1 { return map[string]interface{}{ "$eq": []interface{}{fieldName, values[0]}, } } orConditions := make([]map[string]interface{}, 0, len(values)) for _, value := range values { orConditions = append(orConditions, map[string]interface{}{ "$eq": []interface{}{fieldName, value}, }) } return map[string]interface{}{"$or": orConditions} } // FetchPhysicalIndex retrieves the physical index for logs queries using the provided service name and environment // Uses an instant query for data from the last 1 day func FetchPhysicalIndex(ctx context.Context, client *http.Client, cfg models.Config, serviceName, env string) (string, error) { // Build the PromQL query with a 2-hour window query := fmt.Sprintf("sum by (name, destination) (physical_index_service_count{service_name='%s'", serviceName) if env != "" { query += fmt.Sprintf(",env=~'%s'", env) } query += "}[1d])" // Get current time for the instant query currentTime := time.Now().Unix() // Make the Prometheus instant query resp, err := MakePromInstantAPIQuery(ctx, client, query, currentTime, cfg) if err != nil { return "", fmt.Errorf("failed to fetch physical index: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { bodyBytes, _ := json.Marshal(resp.Body) return "", fmt.Errorf("physical index API returned status %d: %s", resp.StatusCode, string(bodyBytes)) } // Parse the response to extract the first index var physicalIndexResponse []struct { Metric map[string]string `json:"metric"` Value []interface{} `json:"value"` } if err := json.NewDecoder(resp.Body).Decode(&physicalIndexResponse); err != nil { return "", fmt.Errorf("failed to decode physical index response: %w", err) } if len(physicalIndexResponse) == 0 { // Continue without index if it is not available return "", nil } // Extract the index name from the first result firstResult := physicalIndexResponse[0] if indexName, exists := firstResult.Metric["name"]; exists { return fmt.Sprintf("physical_index:%s", indexName), nil } return "", fmt.Errorf("no index name found in physical index response") }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/last9/last9-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server