utils.go•19.8 kB
package utils
import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"net/http"
	"net/url"
	"os"
	"strconv"
	"strings"
	"time"
	"last9-mcp/internal/models"
	"github.com/peterbourgon/ff/v3"
	last9mcp "github.com/last9/mcp-go-sdk/mcp"
)
// ExtractOrgSlugFromToken extracts organization slug from JWT token
func ExtractOrgSlugFromToken(accessToken string) (string, error) {
	claims, err := ExtractClaimsFromToken(accessToken)
	if err != nil {
		return "", fmt.Errorf("failed to extract claims from token: %w", err)
	}
	orgSlug, ok := claims["organization_slug"].(string)
	if !ok {
		return "", errors.New("organization slug not found in token")
	}
	return orgSlug, nil
}
func ExtractClaimsFromToken(accessToken string) (map[string]interface{}, error) {
	// Split the token into parts
	parts := strings.Split(accessToken, ".")
	if len(parts) != 3 {
		return nil, errors.New("invalid JWT token format")
	}
	// Decode the payload (second part)
	payload, err := base64.RawURLEncoding.DecodeString(parts[1])
	if err != nil {
		return nil, fmt.Errorf("failed to decode token payload: %w", err)
	}
	// Parse the JSON payload
	var claims map[string]interface{}
	if err := json.Unmarshal(payload, &claims); err != nil {
		return nil, fmt.Errorf("failed to parse token claims: %w", err)
	}
	return claims, nil
}
func ExtractActionURLFromToken(accessToken string) (string, error) {
	// Extract ActionURL from token claims
	claims, err := ExtractClaimsFromToken(accessToken)
	if err != nil {
		return "", fmt.Errorf("failed to extract claims from token: %w", err)
	}
	// Get ActionURL from aud field
	aud, ok := claims["aud"].([]interface{})
	if !ok || len(aud) == 0 {
		return "", errors.New("no audience found in token claims")
	}
	// Handle case where audience already includes https:// protocol
	audStr := aud[0].(string)
	if strings.HasPrefix(audStr, "https://") || strings.HasPrefix(audStr, "http://") {
		return audStr, nil
	}
	return fmt.Sprintf("https://%s", audStr), nil
}
// RefreshAccessToken gets a new access token using the refresh token
func RefreshAccessToken(ctx context.Context, client *http.Client, cfg models.Config) (string, error) {
	data := map[string]string{
		"refresh_token": cfg.RefreshToken,
	}
	jsonData, err := json.Marshal(data)
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}
	// Extract ActionURL from token claims
	actionURL, err := ExtractActionURLFromToken(cfg.RefreshToken)
	if err != nil {
		return "", fmt.Errorf("failed to extract action URL from refresh token: %w", err)
	}
	// Handle case where actionURL already includes /api path
	oauthURL := actionURL
	if strings.HasSuffix(actionURL, "/api") {
		oauthURL = strings.TrimSuffix(actionURL, "/api")
	}
	u, err := url.Parse(oauthURL + "/api/v4/oauth/access_token")
	if err != nil {
		return "", fmt.Errorf("failed to parse URL: %w", err)
	}
	req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewReader(jsonData))
	if err != nil {
		return "", fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := client.Do(req)
	if err != nil {
		return "", fmt.Errorf("request failed: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}
	var result struct {
		AccessToken string `json:"access_token"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}
	return result.AccessToken, nil
}
// GetDefaultRegion determines the region based on the Last9 BASE URL.
// This function extracts the hostname from URLs like "https://otlp-aps1.last9.io:443"
// and maps them to the correct AWS regions for API routing.
func GetDefaultRegion(baseURL string) string {
	// Extract hostname from URL (remove protocol and port)
	// Transform: "https://otlp-aps1.last9.io:443" → "otlp-aps1.last9.io"
	hostname := baseURL
	if strings.HasPrefix(hostname, "https://") {
		hostname = strings.TrimPrefix(hostname, "https://")
	}
	if strings.HasPrefix(hostname, "http://") {
		hostname = strings.TrimPrefix(hostname, "http://")
	}
	// Remove port if present (:443, :80, etc.)
	if colonIndex := strings.Index(hostname, ":"); colonIndex != -1 {
		hostname = hostname[:colonIndex]
	}
	switch hostname {
	case "otlp.last9.io":
		return "us-east-1"
	case "otlp-aps1.last9.io":
		return "ap-south-1"
	case "otlp-apse1.last9.io":
		return "ap-southeast-1"
	default:
		return "us-east-1" // default to us-east-1 if URL pattern doesn't match
	}
}
// GetTimeRange returns start and end times based on lookback minutes
// If start_time_iso and end_time_iso are provided, they take precedence
// Otherwise, returns now and now - lookbackMinutes
//
// IMPORTANT: All ISO timestamps are parsed as UTC to ensure consistent behavior
// across different server timezones. This prevents timezone-related bugs where
// queries return data from unexpected time periods.
func GetTimeRange(params map[string]interface{}, defaultLookbackMinutes int) (startTime, endTime time.Time, err error) {
	// Always use UTC to ensure consistent behavior across timezones
	endTime = time.Now().UTC()
	// First check if lookback_minutes is provided
	lookbackMinutes := defaultLookbackMinutes
	if l, ok := params["lookback_minutes"].(float64); ok {
		lookbackMinutes = int(l)
		if lookbackMinutes < 1 {
			return time.Time{}, time.Time{}, fmt.Errorf("lookback_minutes must be at least 1")
		}
		if lookbackMinutes > 1440 { // 24 hours
			return time.Time{}, time.Time{}, fmt.Errorf("lookback_minutes cannot exceed 1440 (24 hours)")
		}
	}
	// Default start time based on lookback
	startTime = endTime.Add(time.Duration(-lookbackMinutes) * time.Minute)
	// Override with explicit timestamps if provided
	if startTimeStr, ok := params["start_time_iso"].(string); ok && startTimeStr != "" {
		t, err := time.Parse("2006-01-02 15:04:05", startTimeStr)
		if err != nil {
			return time.Time{}, time.Time{}, fmt.Errorf("invalid start_time_iso format: %w", err)
		}
		// Force UTC timezone to prevent server timezone from affecting timestamp interpretation
		// This ensures "2025-06-23 16:00:00" is always treated as UTC, not local time
		startTime = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
		// If start_time is provided but no end_time, use start_time + lookback_minutes
		if endTimeStr, ok := params["end_time_iso"].(string); !ok || endTimeStr == "" {
			endTime = startTime.Add(time.Duration(lookbackMinutes) * time.Minute)
		}
	}
	if endTimeStr, ok := params["end_time_iso"].(string); ok && endTimeStr != "" {
		t, err := time.Parse("2006-01-02 15:04:05", endTimeStr)
		if err != nil {
			return time.Time{}, time.Time{}, fmt.Errorf("invalid end_time_iso format: %w", err)
		}
		// Force UTC timezone to prevent server timezone from affecting timestamp interpretation
		// This ensures "2025-06-23 16:30:00" is always treated as UTC, not local time
		endTime = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
	}
	// Validate time range
	if startTime.After(endTime) {
		return time.Time{}, time.Time{}, fmt.Errorf("start_time cannot be after end_time")
	}
	// Ensure time range doesn't exceed 24 hours
	if endTime.Sub(startTime) > 24*time.Hour {
		return time.Time{}, time.Time{}, fmt.Errorf("time range cannot exceed 24 hours")
	}
	return startTime, endTime, nil
}
// Version information
var (
	Version   = "dev"     // Set by goreleaser
	CommitSHA = "unknown" // Set by goreleaser
	BuildTime = "unknown" // Set by goreleaser
)
// setupConfig initializes and parses the configuration
func SetupConfig(defaults models.Config) (models.Config, error) {
	fs := flag.NewFlagSet("last9-mcp", flag.ExitOnError)
	var cfg models.Config
	fs.StringVar(&cfg.AuthToken, "auth", os.Getenv("LAST9_AUTH_TOKEN"), "Last9 API auth token")
	fs.StringVar(&cfg.BaseURL, "url", os.Getenv("LAST9_BASE_URL"), "Last9 API URL")
	fs.StringVar(&cfg.RefreshToken, "refresh_token", os.Getenv("LAST9_REFRESH_TOKEN"), "Last9 refresh token for authentication")
	fs.Float64Var(&cfg.RequestRateLimit, "rate", 1, "Requests per second limit")
	fs.IntVar(&cfg.RequestRateBurst, "burst", 1, "Request burst capacity")
	fs.BoolVar(&cfg.HTTPMode, "http", false, "Run as HTTP server instead of STDIO")
	fs.StringVar(&cfg.Port, "port", "8080", "HTTP server port")
	fs.StringVar(&cfg.Host, "host", "localhost", "HTTP server host")
	versionFlag := fs.Bool("version", false, "Print version information")
	var configFile string
	fs.StringVar(&configFile, "config", "", "config file path")
	err := ff.Parse(fs, os.Args[1:],
		ff.WithEnvVarPrefix("LAST9"),
		ff.WithConfigFileFlag("config"),
		ff.WithConfigFileParser(ff.JSONParser),
	)
	if err != nil {
		return cfg, fmt.Errorf("failed to parse configuration: %w", err)
	}
	if *versionFlag {
		fmt.Printf("Version: %s\nCommit: %s\nBuild Time: %s\n", Version, CommitSHA, BuildTime)
		os.Exit(0)
	}
	if cfg.AuthToken == "" {
		if defaults.AuthToken != "" {
			cfg.AuthToken = defaults.AuthToken
		} else {
			return cfg, errors.New("Last9 auth token must be provided via LAST9_AUTH_TOKEN env var")
		}
	}
	// Set default base URL if not provided
	if cfg.BaseURL == "" {
		if defaults.BaseURL != "" {
			cfg.BaseURL = defaults.BaseURL
		} else {
			return cfg, errors.New("Last9 base URL must be provided via LAST9_BASE_URL env var")
		}
	}
	if cfg.RefreshToken == "" {
		if defaults.RefreshToken != "" {
			cfg.RefreshToken = defaults.RefreshToken
		} else {
			return cfg, errors.New("Last9 refresh token must be provided via LAST9_REFRESH_TOKEN env var")
		}
	}
	return cfg, nil
}
func PopulateAPICfg(cfg *models.Config) error {
	client := last9mcp.WithHTTPTracing(&http.Client{Timeout: 30 * time.Second})
	accessToken, err := RefreshAccessToken(context.Background(), client, *cfg)
	if err != nil {
		return fmt.Errorf("failed to refresh access token: %w", err)
	}
	cfg.AccessToken = accessToken
	orgSlug, err := ExtractOrgSlugFromToken(cfg.AccessToken)
	if err != nil {
		return fmt.Errorf("failed to extract organization slug from access token: %w", err)
	}
	cfg.OrgSlug = orgSlug
	cfg.APIBaseURL = fmt.Sprintf("https://%s/api/v4/organizations/%s", "app.last9.io", cfg.OrgSlug)
	// make a GET call to /datasources and iterate over the response array
	// find the element with is_default set to true and extract url, properties.username, properties.password
	// add bearer token auth to the request header
	req, err := http.NewRequestWithContext(context.Background(), "GET", cfg.APIBaseURL+"/datasources", nil)
	if err != nil {
		return fmt.Errorf("failed to create request for datasources: %w", err)
	}
	req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken)
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to get metrics datasource: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("failed to get metrics datasource: %s", resp.Status)
	}
	var datasources []struct {
		IsDefault  bool   `json:"is_default"`
		URL        string `json:"url"`
		Properties struct {
			Username string `json:"username"`
			Password string `json:"password"`
		} `json:"properties"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&datasources); err != nil {
		return fmt.Errorf("failed to decode metrics datasources response: %w", err)
	}
	for _, ds := range datasources {
		if ds.IsDefault {
			cfg.PrometheusReadURL = ds.URL
			cfg.PrometheusUsername = ds.Properties.Username
			cfg.PrometheusPassword = ds.Properties.Password
			break
		}
	}
	if cfg.PrometheusReadURL == "" || cfg.PrometheusUsername == "" || cfg.PrometheusPassword == "" {
		return errors.New("default datasource not found or missing required properties")
	}
	return nil
}
func MakePromInstantAPIQuery(ctx context.Context, client *http.Client, promql string, endTimeParam int64, cfg models.Config) (*http.Response, error) {
	promInstantParam := struct {
		Query     string `json:"query"`
		Timestamp int64  `json:"timestamp"`
		ReadURL   string `json:"read_url"`
		Username  string `json:"username"`
		Password  string `json:"password"`
	}{promql, endTimeParam, cfg.PrometheusReadURL, cfg.PrometheusUsername, cfg.PrometheusPassword}
	bodyBytes, err := json.Marshal(promInstantParam)
	if err != nil {
		return nil, err
	}
	reqUrl := fmt.Sprintf("%s/prom_query_instant", cfg.APIBaseURL)
	req, err := http.NewRequestWithContext(ctx, "POST", reqUrl, strings.NewReader(string(bodyBytes)))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken)
	return client.Do(req)
}
func MakePromRangeAPIQuery(ctx context.Context, client *http.Client, promql string, startTimeParam, endTimeParam int64, cfg models.Config) (*http.Response, error) {
	promRangeParam := struct {
		Query     string `json:"query"`
		Timestamp int64  `json:"timestamp"`
		Window    int64  `json:"window"`
		ReadURL   string `json:"read_url"`
		Username  string `json:"username"`
		Password  string `json:"password"`
	}{
		Query:     promql,
		Timestamp: startTimeParam,
		Window:    endTimeParam - startTimeParam,
		ReadURL:   cfg.PrometheusReadURL,
		Username:  cfg.PrometheusUsername,
		Password:  cfg.PrometheusPassword,
	}
	bodyBytes, err := json.Marshal(promRangeParam)
	if err != nil {
		return nil, err
	}
	reqUrl := fmt.Sprintf("%s/prom_query", cfg.APIBaseURL)
	req, err := http.NewRequestWithContext(ctx, "POST", reqUrl, bytes.NewReader(bodyBytes))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken)
	return client.Do(req)
}
// function to get the values of a particular label, for a given query filter
// path: /prom_label_values
func MakePromLabelValuesAPIQuery(ctx context.Context, client *http.Client, label string, matches string, startTimeParam, endTimeParam int64, cfg models.Config) (*http.Response, error) {
	promLabelValuesParam := struct {
		Label     string   `json:"label"`
		Timestamp int64    `json:"timestamp"`
		Window    int64    `json:"window"`
		ReadURL   string   `json:"read_url"`
		Username  string   `json:"username"`
		Password  string   `json:"password"`
		Matches   []string `json:"matches"`
	}{
		Label:     label,
		Timestamp: startTimeParam,
		Window:    endTimeParam - startTimeParam,
		ReadURL:   cfg.PrometheusReadURL,
		Username:  cfg.PrometheusUsername,
		Password:  cfg.PrometheusPassword,
		Matches:   []string{matches},
	}
	bodyBytes, err := json.Marshal(promLabelValuesParam)
	if err != nil {
		return nil, err
	}
	reqUrl := fmt.Sprintf("%s/prom_label_values", cfg.APIBaseURL)
	req, err := http.NewRequestWithContext(ctx, "POST", reqUrl, bytes.NewReader(bodyBytes))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken)
	return client.Do(req)
}
func MakePromLabelsAPIQuery(ctx context.Context, client *http.Client, metric string, startTimeParam, endTimeParam int64, cfg models.Config) (*http.Response, error) {
	promLabelsParam := struct {
		Timestamp int64  `json:"timestamp"`
		Window    int64  `json:"window"`
		ReadURL   string `json:"read_url"`
		Username  string `json:"username"`
		Password  string `json:"password"`
		Metric    string `json:"metric"`
	}{
		Timestamp: startTimeParam,
		Window:    endTimeParam - startTimeParam,
		ReadURL:   cfg.PrometheusReadURL,
		Username:  cfg.PrometheusUsername,
		Password:  cfg.PrometheusPassword,
		Metric:    metric,
	}
	bodyBytes, err := json.Marshal(promLabelsParam)
	if err != nil {
		return nil, err
	}
	reqUrl := fmt.Sprintf("%s/apm/labels", cfg.APIBaseURL)
	req, err := http.NewRequestWithContext(ctx, "POST", reqUrl, bytes.NewReader(bodyBytes))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-LAST9-API-TOKEN", "Bearer "+cfg.AccessToken)
	return client.Do(req)
}
// ConvertTimestamp converts a timestamp from the API response to RFC3339 format
func ConvertTimestamp(timestamp any) string {
	switch ts := timestamp.(type) {
	case string:
		// Try to parse as Unix nanoseconds timestamp
		if nsTimestamp, err := strconv.ParseInt(ts, 10, 64); err == nil {
			// Convert nanoseconds to time.Time and format as RFC3339
			return time.Unix(0, nsTimestamp).UTC().Format(time.RFC3339)
		}
		// If it's already a formatted timestamp, return as is
		return ts
	case float64:
		// Convert to int64 and treat as nanoseconds
		return time.Unix(0, int64(ts)).UTC().Format(time.RFC3339)
	case int64:
		// Treat as nanoseconds
		return time.Unix(0, ts).UTC().Format(time.RFC3339)
	default:
		// Fallback to current time if we can't parse
		return time.Now().UTC().Format(time.RFC3339)
	}
}
// ParseStringArray safely extracts string array from interface{}
func ParseStringArray(value interface{}) []string {
	var result []string
	if array, ok := value.([]interface{}); ok {
		for _, item := range array {
			if str, ok := item.(string); ok && str != "" {
				result = append(result, str)
			}
		}
	}
	return result
}
// BuildOrFilter creates an optimized filter for single or multiple values of the same field.
// For single values, returns a simple $eq filter. For multiple values, returns an $or filter.
// This optimization reduces query complexity and improves performance for single-value filters.
func BuildOrFilter(fieldName string, values []string) map[string]interface{} {
	if len(values) == 1 {
		return map[string]interface{}{
			"$eq": []interface{}{fieldName, values[0]},
		}
	}
	orConditions := make([]map[string]interface{}, 0, len(values))
	for _, value := range values {
		orConditions = append(orConditions, map[string]interface{}{
			"$eq": []interface{}{fieldName, value},
		})
	}
	return map[string]interface{}{"$or": orConditions}
}
// FetchPhysicalIndex retrieves the physical index for logs queries using the provided service name and environment
// Uses an instant query for data from the last 1 day
func FetchPhysicalIndex(ctx context.Context, client *http.Client, cfg models.Config, serviceName, env string) (string, error) {
	// Build the PromQL query with a 2-hour window
	query := fmt.Sprintf("sum by (name, destination) (physical_index_service_count{service_name='%s'", serviceName)
	if env != "" {
		query += fmt.Sprintf(",env=~'%s'", env)
	}
	query += "}[1d])"
	// Get current time for the instant query
	currentTime := time.Now().Unix()
	// Make the Prometheus instant query
	resp, err := MakePromInstantAPIQuery(ctx, client, query, currentTime, cfg)
	if err != nil {
		return "", fmt.Errorf("failed to fetch physical index: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		bodyBytes, _ := json.Marshal(resp.Body)
		return "", fmt.Errorf("physical index API returned status %d: %s", resp.StatusCode, string(bodyBytes))
	}
	// Parse the response to extract the first index
	var physicalIndexResponse []struct {
		Metric map[string]string `json:"metric"`
		Value  []interface{}     `json:"value"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&physicalIndexResponse); err != nil {
		return "", fmt.Errorf("failed to decode physical index response: %w", err)
	}
	if len(physicalIndexResponse) == 0 {
		// Continue without index if it is not available
		return "", nil
	}
	// Extract the index name from the first result
	firstResult := physicalIndexResponse[0]
	if indexName, exists := firstResult.Metric["name"]; exists {
		return fmt.Sprintf("physical_index:%s", indexName), nil
	}
	return "", fmt.Errorf("no index name found in physical index response")
}