Skip to main content
Glama

CentralMind/Gateway

connector.go11.9 kB
package elasticsearch import ( "bytes" "context" "encoding/json" "fmt" "io" "strings" "github.com/centralmind/gateway/castx" "github.com/centralmind/gateway/connectors" "github.com/centralmind/gateway/model" "github.com/elastic/go-elasticsearch/v8" "golang.org/x/xerrors" ) const limitOfDocuments = 5 func init() { connectors.Register(func(cfg Config) (connectors.Connector, error) { config, err := cfg.MakeConfig() if err != nil { return nil, xerrors.Errorf("unable to prepare Elasticsearch config: %w", err) } client, err := elasticsearch.NewClient(*config) if err != nil { return nil, xerrors.Errorf("unable to create Elasticsearch client: %w", err) } return &Connector{ config: cfg, client: client, }, nil }) } var _ connectors.Connector = (*Connector)(nil) // Connector implements the connectors.Connector interface for Elasticsearch type Connector struct { config Config client *elasticsearch.Client } func (c *Connector) Config() connectors.Config { return &c.config } // Ping checks if Elasticsearch is reachable func (c *Connector) Ping(ctx context.Context) error { res, err := c.client.Ping(c.client.Ping.WithContext(ctx)) if err != nil { return fmt.Errorf("failed to ping Elasticsearch: %w", err) } defer res.Body.Close() return nil } // Query executes a search query in Elasticsearch func (c *Connector) Query(ctx context.Context, endpoint model.Endpoint, params map[string]any) ([]map[string]any, error) { processed, err := castx.ParamsE(endpoint, params) if err != nil { return nil, xerrors.Errorf("unable to process params: %w", err) } finalQuery := map[string]interface{}{ "source": endpoint.Query, "params": processed, } var buf bytes.Buffer err = json.NewEncoder(&buf).Encode(finalQuery) if err != nil { return nil, xerrors.Errorf("unable to encode query: %w", err) } res, err := c.client.API.SearchTemplate( &buf, c.client.SearchTemplate.WithContext(ctx), ) if err != nil { return nil, xerrors.Errorf("failed to execute search query: %w", err) } defer res.Body.Close() // Read response body body, err := io.ReadAll(res.Body) if err != nil { return nil, xerrors.Errorf("failed to read Elasticsearch response: %w", err) } // Check for errors if res.IsError() { return nil, xerrors.Errorf("Elasticsearch returned an error: %s", body) } // Parse JSON response var result map[string]interface{} err = json.Unmarshal(body, &result) if err != nil { return nil, xerrors.Errorf("failed to parse Elasticsearch response: %w", err) } var hits []interface{} if hitsMap, ok := result["hits"].(map[string]interface{}); ok { if hitsList, ok := hitsMap["hits"].([]interface{}); ok { hits = hitsList } else { return nil, xerrors.Errorf("'hits' key is missing or not a list") } } else { return nil, xerrors.Errorf("'hits' key is missing or not a map") } // Process the results results := make([]map[string]interface{}, 0) for _, hit := range hits { hitMap, ok := hit.(map[string]interface{}) if !ok { continue } source, ok := hitMap["_source"].(map[string]interface{}) if !ok { continue } results = append(results, source) } return results, nil } // Discovery retrieves available indices in Elasticsearch func (c *Connector) Discovery(ctx context.Context, tablesList []string) ([]model.Table, error) { // Create a map for quick lookups if tablesList is provided tableSet := make(map[string]bool) if len(tablesList) > 0 { for _, table := range tablesList { tableSet[table] = true } } // Get a list of indices, but limit processing for large indices res, err := c.client.Cat.Indices( c.client.Cat.Indices.WithContext(ctx), c.client.Cat.Indices.WithFormat("json"), ) if err != nil { return nil, xerrors.Errorf("failed to get indices: %w", err) } defer res.Body.Close() // Read response body body, err := io.ReadAll(res.Body) if err != nil { return nil, xerrors.Errorf("failed to read indices response: %w", err) } // Parse JSON response var indices []map[string]interface{} if err := json.Unmarshal(body, &indices); err != nil { return nil, xerrors.Errorf("failed to parse indices response: %w", err) } // Process only a subset of indices (if necessary) var tables []model.Table for _, index := range indices { indexName, ok := index["index"].(string) if !ok { continue } // Skip indices not in the list if a list was provided if len(tablesList) > 0 && !tableSet[indexName] { continue } // Instead of fetching all mappings, **sample documents** to infer fields columns, err := c.sampleIndexFields(ctx, indexName) if err != nil { return nil, xerrors.Errorf("failed to infer schema for index %s: %w", indexName, err) } // Get document count efficiently rowCount, err := c.getDocumentCount(ctx, indexName) if err != nil { return nil, xerrors.Errorf("failed to get row count for index %s: %w", indexName, err) } tables = append(tables, model.Table{ Name: indexName, Columns: columns, RowCount: rowCount, }) } return tables, nil } func (c *Connector) getDocumentCount(ctx context.Context, indexName string) (int, error) { // Execute _count API request res, err := c.client.Count( c.client.Count.WithContext(ctx), c.client.Count.WithIndex(indexName), ) if err != nil { return 0, xerrors.Errorf("failed to execute _count API: %w", err) } defer res.Body.Close() // Read response body body, err := io.ReadAll(res.Body) if err != nil { return 0, xerrors.Errorf("failed to read _count response: %w", err) } // Parse JSON response var countResponse map[string]interface{} if err := json.Unmarshal(body, &countResponse); err != nil { return 0, xerrors.Errorf("failed to parse _count response: %w", err) } // Extract document count count, ok := countResponse["count"].(float64) if !ok { return 0, xerrors.Errorf("unexpected _count response format") } return int(count), nil } // Sample retrieves a few sample documents from an index func (c *Connector) Sample(ctx context.Context, table model.Table) ([]map[string]any, error) { query := map[string]interface{}{ "size": limitOfDocuments, "query": map[string]interface{}{ "match_all": map[string]interface{}{}, }, } // Convert query to JSON queryBytes, err := json.Marshal(query) if err != nil { return nil, xerrors.Errorf("failed to marshal sample query: %w", err) } // Execute the search request res, err := c.client.Search( c.client.Search.WithContext(ctx), c.client.Search.WithBody(bytes.NewReader(queryBytes)), ) if err != nil { return nil, xerrors.Errorf("failed to fetch sample documents: %w", err) } defer res.Body.Close() // Read response body body, err := io.ReadAll(res.Body) if err != nil { return nil, xerrors.Errorf("failed to read sample response: %w", err) } // Parse JSON response var result map[string]interface{} err = json.Unmarshal(body, &result) if err != nil { return nil, xerrors.Errorf("failed to parse sample response: %w", err) } // Extract search hits var hits []interface{} if hitsMap, ok := result["hits"].(map[string]interface{}); ok { if hitsList, ok := hitsMap["hits"].([]interface{}); ok { hits = hitsList } else { return nil, xerrors.Errorf("'hits' key is missing or not a list") } } else { return nil, xerrors.Errorf("'hits' key is missing or not a map") } // Process the results results := make([]map[string]interface{}, len(hits)) for i, hit := range hits { hitMap := hit.(map[string]interface{}) results[i] = hitMap["_source"].(map[string]interface{}) } return results, nil } func (c *Connector) InferQuery(ctx context.Context, query string) ([]model.ColumnSchema, error) { // Query multiple documents for better inference // Ensure the query is properly formatted JSON var esQuery map[string]interface{} err := json.Unmarshal([]byte(query), &esQuery) if err != nil { return nil, xerrors.Errorf("invalid query format: %w", err) } // Execute the query in Elasticsearch res, err := c.client.Search( c.client.Search.WithContext(ctx), c.client.Search.WithBody(strings.NewReader(query)), ) if err != nil { return nil, xerrors.Errorf("failed to execute query: %w", err) } defer res.Body.Close() // Read response body body, err := io.ReadAll(res.Body) if err != nil { return nil, xerrors.Errorf("failed to read Elasticsearch response: %w", err) } // Parse JSON response var result map[string]interface{} err = json.Unmarshal(body, &result) if err != nil { return nil, xerrors.Errorf("failed to parse Elasticsearch response: %w", err) } // Extract "hits" (documents) var hits []interface{} if hitsMap, ok := result["hits"].(map[string]interface{}); ok { if hitsList, ok := hitsMap["hits"].([]interface{}); ok { hits = hitsList } else { return nil, xerrors.Errorf("'hits' key is missing or not a list") } } else { return nil, xerrors.Errorf("'hits' key is missing or not a map") } // Use the helper function return c.extractColumnsFromHits(hits) } func (c *Connector) GuessColumnType(sqlType string) model.ColumnType { switch strings.ToLower(sqlType) { case "text", "keyword": return model.TypeString case "long", "integer", "short", "byte": return model.TypeInteger case "float", "double", "half_float", "scaled_float": return model.TypeNumber case "boolean": return model.TypeBoolean case "date": return model.TypeDatetime case "object", "nested": return model.TypeObject case "array": return model.TypeArray default: return model.TypeString // Default fallback } } func (c *Connector) sampleIndexFields(ctx context.Context, indexName string) ([]model.ColumnSchema, error) { N := 100 // Sample up to 100 documents query := map[string]interface{}{ "size": N, "query": map[string]interface{}{ "match_all": map[string]interface{}{}, }, } // Convert query to JSON queryBytes, err := json.Marshal(query) if err != nil { return nil, xerrors.Errorf("failed to marshal sample query: %w", err) } // Execute search request res, err := c.client.Search( c.client.Search.WithContext(ctx), c.client.Search.WithIndex(indexName), c.client.Search.WithBody(bytes.NewReader(queryBytes)), ) if err != nil { return nil, xerrors.Errorf("failed to execute sample query: %w", err) } defer res.Body.Close() // Read response body body, err := io.ReadAll(res.Body) if err != nil { return nil, xerrors.Errorf("failed to read search response: %w", err) } // Parse JSON response var result map[string]interface{} if err := json.Unmarshal(body, &result); err != nil { return nil, xerrors.Errorf("failed to parse search response: %w", err) } // Extract search hits (sample documents) hits, ok := result["hits"].(map[string]interface{})["hits"].([]interface{}) if !ok { return nil, xerrors.Errorf("unexpected response structure from Elasticsearch") } // Use the helper function return c.extractColumnsFromHits(hits) } func (c *Connector) extractColumnsFromHits(hits []interface{}) ([]model.ColumnSchema, error) { // Track field types across multiple documents fieldTypeMap := make(map[string]string) // Iterate through sample documents for _, hit := range hits { doc, ok := hit.(map[string]interface{})["_source"].(map[string]interface{}) if !ok { continue } for field, value := range doc { fieldType := fmt.Sprintf("%T", value) // Get Go type as a string if _, exists := fieldTypeMap[field]; !exists { fieldTypeMap[field] = fieldType } } } // Convert detected types to ColumnSchema var columns []model.ColumnSchema for field, detectedType := range fieldTypeMap { columnType := c.GuessColumnType(detectedType) // Convert to SQL-like type columns = append(columns, model.ColumnSchema{ Name: field, Type: columnType, }) } return columns, nil }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/centralmind/gateway'

If you have feedback or need assistance with the MCP directory API, please join our Discord server