Skip to main content
Glama

kubernetes-mcp-server

sts_test.go6.89 kB
package http import ( "encoding/base64" "fmt" "net/http" "strings" "testing" "github.com/containers/kubernetes-mcp-server/internal/test" "github.com/coreos/go-oidc/v3/oidc" "golang.org/x/oauth2" ) func TestIsEnabled(t *testing.T) { disabledCases := []SecurityTokenService{ {}, {Provider: nil}, {Provider: &oidc.Provider{}}, {Provider: &oidc.Provider{}, ClientId: "test-client-id", ClientSecret: "test-client-secret"}, {ClientId: "test-client-id", ClientSecret: "test-client-secret", ExternalAccountAudience: "test-audience"}, {Provider: &oidc.Provider{}, ClientSecret: "test-client-secret", ExternalAccountAudience: "test-audience"}, } for _, sts := range disabledCases { t.Run(fmt.Sprintf("SecurityTokenService{%+v}.IsEnabled() = false", sts), func(t *testing.T) { if sts.IsEnabled() { t.Errorf("SecurityTokenService{%+v}.IsEnabled() = true; want false", sts) } }) } enabledCases := []SecurityTokenService{ {Provider: &oidc.Provider{}, ClientId: "test-client-id", ExternalAccountAudience: "test-audience"}, {Provider: &oidc.Provider{}, ClientId: "test-client-id", ExternalAccountAudience: "test-audience", ClientSecret: "test-client-secret"}, {Provider: &oidc.Provider{}, ClientId: "test-client-id", ExternalAccountAudience: "test-audience", ClientSecret: "test-client-secret", ExternalAccountScopes: []string{"test-scope"}}, } for _, sts := range enabledCases { t.Run(fmt.Sprintf("SecurityTokenService{%+v}.IsEnabled() = true", sts), func(t *testing.T) { if !sts.IsEnabled() { t.Errorf("SecurityTokenService{%+v}.IsEnabled() = false; want true", sts) } }) } } func TestExternalAccountTokenExchange(t *testing.T) { mockServer := test.NewMockServer() authServer := mockServer.Config().Host var tokenExchangeRequest *http.Request mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { if req.URL.Path == "/.well-known/openid-configuration" { w.Header().Set("Content-Type", "application/json") _, _ = fmt.Fprintf(w, `{ "issuer": "%s", "authorization_endpoint": "https://mock-oidc-provider/authorize", "token_endpoint": "%s/token" }`, authServer, authServer) return } if req.URL.Path == "/token" { tokenExchangeRequest = req _ = tokenExchangeRequest.ParseForm() if tokenExchangeRequest.PostForm.Get("subject_token") != "the-original-access-token" { http.Error(w, "Invalid subject_token", http.StatusUnauthorized) return } w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{"access_token":"exchanged-access-token","token_type":"Bearer","expires_in":253402297199}`)) return } })) t.Cleanup(mockServer.Close) provider, err := oidc.NewProvider(t.Context(), authServer) if err != nil { t.Fatalf("oidc.NewProvider() error = %v; want nil", err) } // With missing Token Source information _, err = (&SecurityTokenService{Provider: provider}).ExternalAccountTokenExchange(t.Context(), &oauth2.Token{}) t.Run("ExternalAccountTokenExchange with missing token source returns error", func(t *testing.T) { if err == nil { t.Fatalf("ExternalAccountTokenExchange() error = nil; want error") } if !strings.Contains(err.Error(), "must be set") { t.Errorf("ExternalAccountTokenExchange() error = %v; want missing required field", err) } }) // With valid Token Source information sts := SecurityTokenService{ Provider: provider, ClientId: "test-client-id", ClientSecret: "test-client-secret", ExternalAccountAudience: "test-audience", ExternalAccountScopes: []string{"test-scope"}, } // With Invalid token _, err = sts.ExternalAccountTokenExchange(t.Context(), &oauth2.Token{ AccessToken: "invalid-access-token", TokenType: "Bearer", }) t.Run("ExternalAccountTokenExchange with invalid token returns error", func(t *testing.T) { if err == nil { t.Fatalf("ExternalAccountTokenExchange() error = nil; want error") } if !strings.Contains(err.Error(), "status code 401: Invalid subject_token") { t.Errorf("ExternalAccountTokenExchange() error = %v; want invalid_grant: Invalid subject_token", err) } }) // With Valid token exchangeToken, err := sts.ExternalAccountTokenExchange(t.Context(), &oauth2.Token{ AccessToken: "the-original-access-token", TokenType: "Bearer", }) t.Run("ExternalAccountTokenExchange with valid token returns new token", func(t *testing.T) { if err != nil { t.Errorf("ExternalAccountTokenExchange() error = %v; want nil", err) } if exchangeToken == nil { t.Fatal("ExternalAccountTokenExchange() = nil; want token") } if exchangeToken.AccessToken != "exchanged-access-token" { t.Errorf("exchangeToken.AccessToken = %s; want exchanged-access-token", exchangeToken.AccessToken) } }) t.Run("ExternalAccountTokenExchange with valid token sends POST request", func(t *testing.T) { if tokenExchangeRequest == nil { t.Fatal("tokenExchangeRequest is nil; want request") } if tokenExchangeRequest.Method != "POST" { t.Errorf("tokenExchangeRequest.Method = %s; want POST", tokenExchangeRequest.Method) } }) t.Run("ExternalAccountTokenExchange with valid token has correct form data", func(t *testing.T) { if tokenExchangeRequest.Header.Get("Content-Type") != "application/x-www-form-urlencoded" { t.Errorf("tokenExchangeRequest.Content-Type = %s; want application/x-www-form-urlencoded", tokenExchangeRequest.Header.Get("Content-Type")) } if tokenExchangeRequest.PostForm.Get("audience") != "test-audience" { t.Errorf("tokenExchangeRequest.PostForm[audience] = %s; want test-audience", tokenExchangeRequest.PostForm.Get("audience")) } if tokenExchangeRequest.PostForm.Get("subject_token_type") != "urn:ietf:params:oauth:token-type:access_token" { t.Errorf("tokenExchangeRequest.PostForm[subject_token_type] = %s; want urn:ietf:params:oauth:token-type:access_token", tokenExchangeRequest.PostForm.Get("subject_token_type")) } if tokenExchangeRequest.PostForm.Get("subject_token") != "the-original-access-token" { t.Errorf("tokenExchangeRequest.PostForm[subject_token] = %s; want the-original-access-token", tokenExchangeRequest.PostForm.Get("subject_token")) } if len(tokenExchangeRequest.PostForm["scope"]) == 0 || tokenExchangeRequest.PostForm["scope"][0] != "test-scope" { t.Errorf("tokenExchangeRequest.PostForm[scope] = %v; want [test-scope]", tokenExchangeRequest.PostForm["scope"]) } }) t.Run("ExternalAccountTokenExchange with valid token sends correct client credentials header", func(t *testing.T) { if tokenExchangeRequest.Header.Get("Authorization") != "Basic "+base64.StdEncoding.EncodeToString([]byte("test-client-id:test-client-secret")) { t.Errorf("tokenExchangeRequest.Header[Authorization] = %s; want Basic base64(test-client-id:test-client-secret)", tokenExchangeRequest.Header.Get("Authorization")) } }) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/containers/kubernetes-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server