utils.go•19.8 kB
package utils
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"flag"
"fmt"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
"last9-mcp/internal/models"
"github.com/peterbourgon/ff/v3"
last9mcp "github.com/last9/mcp-go-sdk/mcp"
)
// ExtractOrgSlugFromToken extracts organization slug from JWT token
func ExtractOrgSlugFromToken(accessToken string) (string, error) {
claims, err := ExtractClaimsFromToken(accessToken)
if err != nil {
return "", fmt.Errorf("failed to extract claims from token: %w", err)
}
orgSlug, ok := claims["organization_slug"].(string)
if !ok {
return "", errors.New("organization slug not found in token")
}
return orgSlug, nil
}
func ExtractClaimsFromToken(accessToken string) (map[string]interface{}, error) {
// Split the token into parts
parts := strings.Split(accessToken, ".")
if len(parts) != 3 {
return nil, errors.New("invalid JWT token format")
}
// Decode the payload (second part)
payload, err := base64.RawURLEncoding.DecodeString(parts[1])
if err != nil {
return nil, fmt.Errorf("failed to decode token payload: %w", err)
}
// Parse the JSON payload
var claims map[string]interface{}
if err := json.Unmarshal(payload, &claims); err != nil {
return nil, fmt.Errorf("failed to parse token claims: %w", err)
}
return claims, nil
}
func ExtractActionURLFromToken(accessToken string) (string, error) {
// Extract ActionURL from token claims
claims, err := ExtractClaimsFromToken(accessToken)
if err != nil {
return "", fmt.Errorf("failed to extract claims from token: %w", err)
}
// Get ActionURL from aud field
aud, ok := claims["aud"].([]interface{})
if !ok || len(aud) == 0 {
return "", errors.New("no audience found in token claims")
}
// Handle case where audience already includes https:// protocol
audStr := aud[0].(string)
if strings.HasPrefix(audStr, "https://") || strings.HasPrefix(audStr, "http://") {
return audStr, nil
}
return fmt.Sprintf("https://%s", audStr), nil
}
// RefreshAccessToken gets a new access token using the refresh token
func RefreshAccessToken(ctx context.Context, client *http.Client, cfg models.Config) (string, error) {
data := map[string]string{
"refresh_token": cfg.RefreshToken,
}
jsonData, err := json.Marshal(data)
if err != nil {
return "", fmt.Errorf("failed to marshal request: %w", err)
}
// Extract ActionURL from token claims
actionURL, err := ExtractActionURLFromToken(cfg.RefreshToken)
if err != nil {
return "", fmt.Errorf("failed to extract action URL from refresh token: %w", err)
}
// Handle case where actionURL already includes /api path
oauthURL := actionURL
if strings.HasSuffix(actionURL, "/api") {
oauthURL = strings.TrimSuffix(actionURL, "/api")
}
u, err := url.Parse(oauthURL + "/api/v4/oauth/access_token")
if err != nil {
return "", fmt.Errorf("failed to parse URL: %w", err)
}
req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewReader(jsonData))
if err != nil {
return "", fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("request failed: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
var result struct {
AccessToken string `json:"access_token"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return "", fmt.Errorf("failed to decode response: %w", err)
}
return result.AccessToken, nil
}
// GetDefaultRegion determines the region based on the Last9 BASE URL.
// This function extracts the hostname from URLs like "https://otlp-aps1.last9.io:443"
// and maps them to the correct AWS regions for API routing.
func GetDefaultRegion(baseURL string) string {
// Extract hostname from URL (remove protocol and port)
// Transform: "https://otlp-aps1.last9.io:443" → "otlp-aps1.last9.io"
hostname := baseURL
if strings.HasPrefix(hostname, "https://") {
hostname = strings.TrimPrefix(hostname, "https://")
}
if strings.HasPrefix(hostname, "http://") {
hostname = strings.TrimPrefix(hostname, "http://")
}
// Remove port if present (:443, :80, etc.)
if colonIndex := strings.Index(hostname, ":"); colonIndex != -1 {
hostname = hostname[:colonIndex]
}
switch hostname {
case "otlp.last9.io":
return "us-east-1"
case "otlp-aps1.last9.io":
return "ap-south-1"
case "otlp-apse1.last9.io":
return "ap-southeast-1"
default:
return "us-east-1" // default to us-east-1 if URL pattern doesn't match
}
}
// GetTimeRange returns start and end times based on lookback minutes
// If start_time_iso and end_time_iso are provided, they take precedence
// Otherwise, returns now and now - lookbackMinutes
//
// IMPORTANT: All ISO timestamps are parsed as UTC to ensure consistent behavior
// across different server timezones. This prevents timezone-related bugs where
// queries return data from unexpected time periods.
func GetTimeRange(params map[string]interface{}, defaultLookbackMinutes int) (startTime, endTime time.Time, err error) {
// Always use UTC to ensure consistent behavior across timezones
endTime = time.Now().UTC()
// First check if lookback_minutes is provided
lookbackMinutes := defaultLookbackMinutes
if l, ok := params["lookback_minutes"].(float64); ok {
lookbackMinutes = int(l)
if lookbackMinutes < 1 {
return time.Time{}, time.Time{}, fmt.Errorf("lookback_minutes must be at least 1")
}
if lookbackMinutes > 1440 { // 24 hours
return time.Time{}, time.Time{}, fmt.Errorf("lookback_minutes cannot exceed 1440 (24 hours)")
}
}
// Default start time based on lookback
startTime = endTime.Add(time.Duration(-lookbackMinutes) * time.Minute)
// Override with explicit timestamps if provided
if startTimeStr, ok := params["start_time_iso"].(string); ok && startTimeStr != "" {
t, err := time.Parse("2006-01-02 15:04:05", startTimeStr)
if err != nil {
return time.Time{}, time.Time{}, fmt.Errorf("invalid start_time_iso format: %w", err)
}
// Force UTC timezone to prevent server timezone from affecting timestamp interpretation
// This ensures "2025-06-23 16:00:00" is always treated as UTC, not local time
startTime = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
// If start_time is provided but no end_time, use start_time + lookback_minutes
if endTimeStr, ok := params["end_time_iso"].(string); !ok || endTimeStr == "" {
endTime = startTime.Add(time.Duration(lookbackMinutes) * time.Minute)
}
}
if endTimeStr, ok := params["end_time_iso"].(string); ok && endTimeStr != "" {
t, err := time.Parse("2006-01-02 15:04:05", endTimeStr)
if err != nil {
return time.Time{}, time.Time{}, fmt.Errorf("invalid end_time_iso format: %w", err)
}
// Force UTC timezone to prevent server timezone from affecting timestamp interpretation
// This ensures "2025-06-23 16:30:00" is always treated as UTC, not local time
endTime = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
}
// Validate time range
if startTime.After(endTime) {
return time.Time{}, time.Time{}, fmt.Errorf("start_time cannot be after end_time")
}
// Ensure time range doesn't exceed 24 hours
if endTime.Sub(startTime) > 24*time.Hour {
return time.Time{}, time.Time{}, fmt.Errorf("time range cannot exceed 24 hours")
}
return startTime, endTime, nil
}
// Version information
var (
Version = "dev" // Set by goreleaser
CommitSHA = "unknown" // Set by goreleaser
BuildTime = "unknown" // Set by goreleaser
)
// setupConfig initializes and parses the configuration
func SetupConfig(defaults models.Config) (models.Config, error) {
fs := flag.NewFlagSet("last9-mcp", flag.ExitOnError)
var cfg models.Config
fs.StringVar(&cfg.AuthToken, "auth", os.Getenv("LAST9_AUTH_TOKEN"), "Last9 API auth token")
fs.StringVar(&cfg.BaseURL, "url", os.Getenv("LAST9_BASE_URL"), "Last9 API URL")
fs.StringVar(&cfg.RefreshToken, "refresh_token", os.Getenv("LAST9_REFRESH_TOKEN"), "Last9 refresh token for authentication")
fs.Float64Var(&cfg.RequestRateLimit, "rate", 1, "Requests per second limit")
fs.IntVar(&cfg.RequestRateBurst, "burst", 1, "Request burst capacity")
fs.BoolVar(&cfg.HTTPMode, "http", false, "Run as HTTP server instead of STDIO")
fs.StringVar(&cfg.Port, "port", "8080", "HTTP server port")
fs.StringVar(&cfg.Host, "host", "localhost", "HTTP server host")
versionFlag := fs.Bool("version", false, "Print version information")
var configFile string
fs.StringVar(&configFile, "config", "", "config file path")
err := ff.Parse(fs, os.Args[1:],
ff.WithEnvVarPrefix("LAST9"),
ff.WithConfigFileFlag("config"),
ff.WithConfigFileParser(ff.JSONParser),
)
if err != nil {
return cfg, fmt.Errorf("failed to parse configuration: %w", err)
}
if *versionFlag {
fmt.Printf("Version: %s\nCommit: %s\nBuild Time: %s\n", Version, CommitSHA, BuildTime)
os.Exit(0)
}
if cfg.AuthToken == "" {
if defaults.AuthToken != "" {
cfg.AuthToken = defaults.AuthToken
} else {
return cfg, errors.New("Last9 auth token must be provided via LAST9_AUTH_TOKEN env var")
}
}
// Set default base URL if not provided
if cfg.BaseURL == "" {
if defaults.BaseURL != "" {
cfg.BaseURL = defaults.BaseURL
} else {
return cfg, errors.New("Last9 base URL must be provided via LAST9_BASE_URL env var")
}
}
if cfg.RefreshToken == "" {
if defaults.RefreshToken != "" {
cfg.RefreshToken = defaults.RefreshToken
} else {
return cfg, errors.New("Last9 refresh token must be provided via LAST9_REFRESH_TOKEN env var")
}
}
return cfg, nil
}
func PopulateAPICfg(cfg *models.Config) error {
client := last9mcp.WithHTTPTracing(&http.Client{Timeout: 30 * time.Second})
accessToken, err := RefreshAccessToken(context.Background(), client, *cfg)
if err != nil {
return fmt.Errorf("failed to refresh access token: %w", err)
}
cfg.AccessToken = accessToken
orgSlug, err := ExtractOrgSlugFromToken(cfg.AccessToken)
if err != nil {
return fmt.Errorf("failed to extract organization slug from access token: %w", err)
}
cfg.OrgSlug = orgSlug
cfg.APIBaseURL = fmt.Sprintf("https://%s/api/v4/organizations/%s", "app.last9.io", cfg.OrgSlug)
// make a GET call to /datasources and iterate over the response array
// find the element with is_default set to true and extract url, properties.username, properties.password
// add bearer token auth to the request header
req, err := http.NewRequestWithContext(context.Background(), "GET", cfg.APIBaseURL+"/datasources", nil)
if err != nil {
return fmt.Errorf("failed to create request for datasources: %w", err)
}
req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken)
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to get metrics datasource: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to get metrics datasource: %s", resp.Status)
}
var datasources []struct {
IsDefault bool `json:"is_default"`
URL string `json:"url"`
Properties struct {
Username string `json:"username"`
Password string `json:"password"`
} `json:"properties"`
}
if err := json.NewDecoder(resp.Body).Decode(&datasources); err != nil {
return fmt.Errorf("failed to decode metrics datasources response: %w", err)
}
for _, ds := range datasources {
if ds.IsDefault {
cfg.PrometheusReadURL = ds.URL
cfg.PrometheusUsername = ds.Properties.Username
cfg.PrometheusPassword = ds.Properties.Password
break
}
}
if cfg.PrometheusReadURL == "" || cfg.PrometheusUsername == "" || cfg.PrometheusPassword == "" {
return errors.New("default datasource not found or missing required properties")
}
return nil
}
func MakePromInstantAPIQuery(ctx context.Context, client *http.Client, promql string, endTimeParam int64, cfg models.Config) (*http.Response, error) {
promInstantParam := struct {
Query string `json:"query"`
Timestamp int64 `json:"timestamp"`
ReadURL string `json:"read_url"`
Username string `json:"username"`
Password string `json:"password"`
}{promql, endTimeParam, cfg.PrometheusReadURL, cfg.PrometheusUsername, cfg.PrometheusPassword}
bodyBytes, err := json.Marshal(promInstantParam)
if err != nil {
return nil, err
}
reqUrl := fmt.Sprintf("%s/prom_query_instant", cfg.APIBaseURL)
req, err := http.NewRequestWithContext(ctx, "POST", reqUrl, strings.NewReader(string(bodyBytes)))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken)
return client.Do(req)
}
func MakePromRangeAPIQuery(ctx context.Context, client *http.Client, promql string, startTimeParam, endTimeParam int64, cfg models.Config) (*http.Response, error) {
promRangeParam := struct {
Query string `json:"query"`
Timestamp int64 `json:"timestamp"`
Window int64 `json:"window"`
ReadURL string `json:"read_url"`
Username string `json:"username"`
Password string `json:"password"`
}{
Query: promql,
Timestamp: startTimeParam,
Window: endTimeParam - startTimeParam,
ReadURL: cfg.PrometheusReadURL,
Username: cfg.PrometheusUsername,
Password: cfg.PrometheusPassword,
}
bodyBytes, err := json.Marshal(promRangeParam)
if err != nil {
return nil, err
}
reqUrl := fmt.Sprintf("%s/prom_query", cfg.APIBaseURL)
req, err := http.NewRequestWithContext(ctx, "POST", reqUrl, bytes.NewReader(bodyBytes))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken)
return client.Do(req)
}
// function to get the values of a particular label, for a given query filter
// path: /prom_label_values
func MakePromLabelValuesAPIQuery(ctx context.Context, client *http.Client, label string, matches string, startTimeParam, endTimeParam int64, cfg models.Config) (*http.Response, error) {
promLabelValuesParam := struct {
Label string `json:"label"`
Timestamp int64 `json:"timestamp"`
Window int64 `json:"window"`
ReadURL string `json:"read_url"`
Username string `json:"username"`
Password string `json:"password"`
Matches []string `json:"matches"`
}{
Label: label,
Timestamp: startTimeParam,
Window: endTimeParam - startTimeParam,
ReadURL: cfg.PrometheusReadURL,
Username: cfg.PrometheusUsername,
Password: cfg.PrometheusPassword,
Matches: []string{matches},
}
bodyBytes, err := json.Marshal(promLabelValuesParam)
if err != nil {
return nil, err
}
reqUrl := fmt.Sprintf("%s/prom_label_values", cfg.APIBaseURL)
req, err := http.NewRequestWithContext(ctx, "POST", reqUrl, bytes.NewReader(bodyBytes))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken)
return client.Do(req)
}
func MakePromLabelsAPIQuery(ctx context.Context, client *http.Client, metric string, startTimeParam, endTimeParam int64, cfg models.Config) (*http.Response, error) {
promLabelsParam := struct {
Timestamp int64 `json:"timestamp"`
Window int64 `json:"window"`
ReadURL string `json:"read_url"`
Username string `json:"username"`
Password string `json:"password"`
Metric string `json:"metric"`
}{
Timestamp: startTimeParam,
Window: endTimeParam - startTimeParam,
ReadURL: cfg.PrometheusReadURL,
Username: cfg.PrometheusUsername,
Password: cfg.PrometheusPassword,
Metric: metric,
}
bodyBytes, err := json.Marshal(promLabelsParam)
if err != nil {
return nil, err
}
reqUrl := fmt.Sprintf("%s/apm/labels", cfg.APIBaseURL)
req, err := http.NewRequestWithContext(ctx, "POST", reqUrl, bytes.NewReader(bodyBytes))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken)
return client.Do(req)
}
// ConvertTimestamp converts a timestamp from the API response to RFC3339 format
func ConvertTimestamp(timestamp any) string {
switch ts := timestamp.(type) {
case string:
// Try to parse as Unix nanoseconds timestamp
if nsTimestamp, err := strconv.ParseInt(ts, 10, 64); err == nil {
// Convert nanoseconds to time.Time and format as RFC3339
return time.Unix(0, nsTimestamp).UTC().Format(time.RFC3339)
}
// If it's already a formatted timestamp, return as is
return ts
case float64:
// Convert to int64 and treat as nanoseconds
return time.Unix(0, int64(ts)).UTC().Format(time.RFC3339)
case int64:
// Treat as nanoseconds
return time.Unix(0, ts).UTC().Format(time.RFC3339)
default:
// Fallback to current time if we can't parse
return time.Now().UTC().Format(time.RFC3339)
}
}
// ParseStringArray safely extracts string array from interface{}
func ParseStringArray(value interface{}) []string {
var result []string
if array, ok := value.([]interface{}); ok {
for _, item := range array {
if str, ok := item.(string); ok && str != "" {
result = append(result, str)
}
}
}
return result
}
// BuildOrFilter creates an optimized filter for single or multiple values of the same field.
// For single values, returns a simple $eq filter. For multiple values, returns an $or filter.
// This optimization reduces query complexity and improves performance for single-value filters.
func BuildOrFilter(fieldName string, values []string) map[string]interface{} {
if len(values) == 1 {
return map[string]interface{}{
"$eq": []interface{}{fieldName, values[0]},
}
}
orConditions := make([]map[string]interface{}, 0, len(values))
for _, value := range values {
orConditions = append(orConditions, map[string]interface{}{
"$eq": []interface{}{fieldName, value},
})
}
return map[string]interface{}{"$or": orConditions}
}
// FetchPhysicalIndex retrieves the physical index for logs queries using the provided service name and environment
// Uses an instant query for data from the last 1 day
func FetchPhysicalIndex(ctx context.Context, client *http.Client, cfg models.Config, serviceName, env string) (string, error) {
// Build the PromQL query with a 2-hour window
query := fmt.Sprintf("sum by (name, destination) (physical_index_service_count{service_name='%s'", serviceName)
if env != "" {
query += fmt.Sprintf(",env=~'%s'", env)
}
query += "}[1d])"
// Get current time for the instant query
currentTime := time.Now().Unix()
// Make the Prometheus instant query
resp, err := MakePromInstantAPIQuery(ctx, client, query, currentTime, cfg)
if err != nil {
return "", fmt.Errorf("failed to fetch physical index: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := json.Marshal(resp.Body)
return "", fmt.Errorf("physical index API returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
// Parse the response to extract the first index
var physicalIndexResponse []struct {
Metric map[string]string `json:"metric"`
Value []interface{} `json:"value"`
}
if err := json.NewDecoder(resp.Body).Decode(&physicalIndexResponse); err != nil {
return "", fmt.Errorf("failed to decode physical index response: %w", err)
}
if len(physicalIndexResponse) == 0 {
// Continue without index if it is not available
return "", nil
}
// Extract the index name from the first result
firstResult := physicalIndexResponse[0]
if indexName, exists := firstResult.Metric["name"]; exists {
return fmt.Sprintf("physical_index:%s", indexName), nil
}
return "", fmt.Errorf("no index name found in physical index response")
}