Skip to main content
Glama

Elasticsearch MCP Server

by rishab2404
index.ts (22.1 kB)
#!/usr/bin/env node

/*
 * Copyright Elasticsearch B.V. and contributors
 * SPDX-License-Identifier: Apache-2.0
 */

import { z } from "zod";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { Client, estypes, ClientOptions } from "@elastic/elasticsearch";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import fs from "fs";
import redis from "./redisClient.js";

// [DEBUG] Script startup
console.error("[DEBUG] MCP Server script started");

// Configuration schema with auth options
const ConfigSchema = z
  .object({
    url: z
      .string()
      .trim()
      .min(1, "Elasticsearch URL cannot be empty")
      .url("Invalid Elasticsearch URL format")
      .describe("Elasticsearch server URL"),

    apiKey: z
      .string()
      .optional()
      .describe("API key for Elasticsearch authentication"),

    username: z
      .string()
      .optional()
      .describe("Username for Elasticsearch authentication"),

    password: z
      .string()
      .optional()
      .describe("Password for Elasticsearch authentication"),

    caCert: z
      .string()
      .optional()
      .describe("Path to custom CA certificate for Elasticsearch"),
  })
  .refine(
    // Username and password must be supplied together. An API key alone, or
    // no credentials at all (local development), is also accepted.
    (data) => Boolean(data.username) === Boolean(data.password),
    {
      message:
        "Either ES_API_KEY or both ES_USERNAME and ES_PASSWORD must be provided, or no auth for local development",
      path: ["username", "password"],
    }
  );

type ElasticsearchConfig = z.infer<typeof ConfigSchema>;

/**
 * Query DSL body as produced by the `search` tool's zod schema
 * (`z.record(z.any())`).
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- mirrors the zod-inferred type of the tool input
type QueryBody = Record<string, any>;

/** A single Elasticsearch query clause built by this file. */
type QueryClause = Record<string, unknown>;

/** Per-user permission document IDs read from Redis, one list per category. */
type AllowedIds = {
  header_section_doc_ids: string[];
  line_item_section_doc_ids: string[];
  header_clause_doc_ids: string[];
  line_item_clause_doc_ids: string[];
  attachment_doc_ids: string[];
  meta_doc_ids: string[];
};

/** Returns a printable message for any thrown value. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** Builds the tool result that reports a failure back to the MCP client. */
function errorResult(error: unknown) {
  return {
    content: [
      {
        type: "text" as const,
        text: `Error: ${describeError(error)}`,
      },
    ],
  };
}

/**
 * Turns the parsed Redis payload into a fully populated {@link AllowedIds}.
 *
 * Any list that is missing or is not an array becomes an empty list, so a
 * user without a permission record ends up with no grants instead of
 * triggering a `TypeError` further down.
 */
function normalizeAllowedIds(parsed: unknown): AllowedIds {
  const source: Record<string, unknown> =
    parsed !== null && typeof parsed === "object"
      ? (parsed as Record<string, unknown>)
      : {};

  const toIdList = (value: unknown): string[] =>
    Array.isArray(value)
      ? value
          .filter(
            (id): id is string | number =>
              typeof id === "string" || typeof id === "number"
          )
          .map(String)
      : [];

  return {
    header_section_doc_ids: toIdList(source["header_section_doc_ids"]),
    line_item_section_doc_ids: toIdList(source["line_item_section_doc_ids"]),
    header_clause_doc_ids: toIdList(source["header_clause_doc_ids"]),
    line_item_clause_doc_ids: toIdList(source["line_item_clause_doc_ids"]),
    attachment_doc_ids: toIdList(source["attachment_doc_ids"]),
    meta_doc_ids: toIdList(source["meta_doc_ids"]),
  };
}

/** Builds a `terms` clause that looks its values up in another document. */
function termsLookup(
  field: string,
  lookupIndex: string,
  id: string,
  path: string
): QueryClause {
  return { terms: { [field]: { index: lookupIndex, id, path } } };
}

/**
 * Combines clauses so that at least one of them must match.
 *
 * An empty clause list yields `match_none`. A `bool` query with no clauses
 * matches every document, so emitting `should: []` here would grant a user
 * with no permissions access to the whole index.
 */
function anyOf(clauses: QueryClause[]): QueryClause {
  if (clauses.length === 0) {
    return { match_none: {} };
  }
  return { bool: { should: clauses, minimum_should_match: 1 } };
}

/**
 * Builds the permission clause for `index` from the user's allowed IDs.
 *
 * Known indices use terms lookups against their dedicated permission index;
 * every other index falls back to matching `AGREEMENT_ID.keyword` against the
 * union of all allowed IDs.
 */
function buildPermissionFilter(index: string, allowed: AllowedIds): QueryClause {
  switch (index) {
    case "cdc_agreement_list":
      // Permission index is "permitted_agreement_for_meta", keyed by meta_doc_ids
      return anyOf(
        allowed.meta_doc_ids.map((id) =>
          termsLookup(
            "AGREEMENT_ID.keyword",
            "permitted_agreement_for_meta",
            id,
            "agreement_ids"
          )
        )
      );

    case "cms_documents":
      // Permission index is "permitted_agreement_for_attachment", keyed by attachment_doc_ids
      return anyOf(
        allowed.attachment_doc_ids.map((id) =>
          termsLookup(
            "AGREEMENT_ID.keyword",
            "permitted_agreement_for_attachment",
            id,
            "agreement_ids"
          )
        )
      );

    case "cdc_line_items":
      // Both the agreement and the section must be permitted by the same doc
      return anyOf(
        allowed.line_item_section_doc_ids.map((docId) => ({
          bool: {
            must: [
              termsLookup(
                "AGREEMENT_ID.keyword",
                "permitted_line_item_section",
                docId,
                "sections.agreement_id"
              ),
              termsLookup(
                "AGREEMENT_SECTION_ID.keyword",
                "permitted_line_item_section",
                docId,
                "sections.section_id"
              ),
            ],
          },
        }))
      );

    case "cdc_field_data_agreements":
      // Both the agreement and the section must be permitted by the same doc
      return anyOf(
        allowed.header_section_doc_ids.map((docId) => ({
          bool: {
            must: [
              termsLookup(
                "AGREEMENT_ID.keyword",
                "permitted_header_section",
                docId,
                "sections.agreement_id"
              ),
              termsLookup(
                "SECTION_ID.keyword",
                "permitted_header_section",
                docId,
                "sections.section_id"
              ),
            ],
          },
        }))
      );

    case "cdc_clauses_data":
      return anyOf(
        allowed.header_clause_doc_ids.map((docId) =>
          termsLookup(
            "AGREEMENT_ID.keyword",
            "permitted_agreement_for_clause",
            docId,
            "agreement_ids"
          )
        )
      );

    default: {
      // Default (old) logic for agreement permissions only
      const allowedIds = [
        ...allowed.header_section_doc_ids,
        ...allowed.line_item_section_doc_ids,
        ...allowed.header_clause_doc_ids,
        ...allowed.line_item_clause_doc_ids,
        ...allowed.attachment_doc_ids,
        ...allowed.meta_doc_ids,
      ];
      return {
        terms: {
          "AGREEMENT_ID.keyword":
            allowedIds.length > 0 ? allowedIds : ["__none__"],
        },
      };
    }
  }
}

/**
 * Returns a copy of `queryBody` whose query is restricted by `permissionFilter`.
 *
 * The caller's query is always wrapped rather than edited in place. Adding a
 * clause to an existing `bool` would change its meaning: a `should`-only
 * query stops requiring any `should` match once a `must`/`filter` sibling
 * appears, and `must` may legally be a single object rather than an array.
 * The permission clause goes into `filter` so it does not affect scoring.
 *
 * NOTE(review): only `query` is constrained. Other top-level search sections
 * the caller may send (for example `knn`) are passed through unchanged —
 * confirm whether they need the same restriction.
 */
function withPermissionFilter(
  queryBody: QueryBody,
  permissionFilter: QueryClause
): QueryBody {
  const originalQuery: unknown = queryBody.query;
  const bool: Record<string, unknown> = { filter: [permissionFilter] };
  if (originalQuery) {
    bool.must = [originalQuery];
  }
  return { ...queryBody, query: { bool } };
}

/** True when `value` looks like a bucket aggregation or a single-value metric. */
function isRenderableAgg(value: unknown): boolean {
  if (!value || typeof value !== "object") {
    return false;
  }
  const record = value as Record<string, unknown>;
  return Array.isArray(record["buckets"]) || record["value"] !== undefined;
}

/**
 * Renders an aggregations object as indented text lines.
 *
 * Handles bucket aggregations (recursing into sub-aggregations found inside
 * each bucket), single-value metrics (`{ value }`) and multi-value metrics
 * (printed as JSON). Anything else is searched one level down for renderable
 * aggregations.
 */
function formatAggs(aggsObj: Record<string, unknown>, prefix = ""): string[] {
  const lines: string[] = [];

  for (const [aggName, aggData] of Object.entries(aggsObj)) {
    if (!aggData || typeof aggData !== "object") {
      continue;
    }
    const agg = aggData as Record<string, unknown>;
    const maybeBuckets = agg["buckets"];

    if (Array.isArray(maybeBuckets)) {
      lines.push(`${prefix}Aggregation "${aggName}" (buckets):`);
      if (maybeBuckets.length === 0) {
        lines.push(`${prefix} (no buckets)`);
      }
      for (const bucket of maybeBuckets) {
        if (!bucket || typeof bucket !== "object") {
          continue;
        }
        const bucketRecord = bucket as Record<string, unknown>;
        lines.push(
          `${prefix} ${String(bucketRecord["key"])}: ${String(
            bucketRecord["doc_count"]
          )}`
        );
        // Recursively print nested aggs in buckets
        for (const [k, v] of Object.entries(bucketRecord)) {
          if (isRenderableAgg(v)) {
            lines.push(...formatAggs({ [k]: v }, prefix + " "));
          }
        }
      }
    } else if (Object.prototype.hasOwnProperty.call(agg, "value")) {
      // Single metric: e.g., { value: 123 }
      lines.push(`${prefix}Aggregation "${aggName}": ${agg["value"]}`);
    } else if (
      // Multi-metric: e.g., { count, min, max, avg, sum }
      Object.keys(agg).some((k) =>
        ["values", "avg", "sum", "min", "max", "count"].includes(k)
      )
    ) {
      lines.push(`${prefix}Aggregation "${aggName}": ${JSON.stringify(agg)}`);
    } else {
      // Recursively process other nested aggs (if any)
      for (const [k, v] of Object.entries(agg)) {
        if (isRenderableAgg(v)) {
          lines.push(...formatAggs({ [k]: v }, prefix + " "));
        }
      }
    }
  }

  return lines;
}

/**
 * Creates an MCP server exposing the `list_indices`, `get_mappings`, `search`
 * and `get_shards` tools backed by an Elasticsearch client.
 *
 * @param config - Connection settings; validated against `ConfigSchema`.
 * @returns The configured (not yet connected) MCP server.
 * @throws If `config` fails schema validation (`ConfigSchema.parse`).
 */
export async function createElasticsearchMcpServer(
  config: ElasticsearchConfig
) {
  // [DEBUG] Entered createElasticsearchMcpServer
  console.error("[DEBUG] createElasticsearchMcpServer() called");
  const validatedConfig = ConfigSchema.parse(config);
  const { url, apiKey, username, password, caCert } = validatedConfig;

  const clientOptions: ClientOptions = {
    node: url,
  };

  // Set up authentication (an API key takes precedence over basic auth)
  if (apiKey) {
    clientOptions.auth = { apiKey };
  } else if (username && password) {
    clientOptions.auth = { username, password };
  }

  // Set up SSL/TLS certificate if provided. A read failure is logged and the
  // client is still created without the custom CA.
  if (caCert) {
    try {
      const ca = fs.readFileSync(caCert);
      clientOptions.tls = { ca };
      console.error("[DEBUG] Loaded CA certificate for TLS");
    } catch (error) {
      console.error(
        `Failed to read certificate file: ${describeError(error)}`
      );
    }
  }

  const esClient = new Client(clientOptions);

  const server = new McpServer({
    name: "elasticsearch-mcp-server",
    version: "0.1.1",
  });

  // [DEBUG] Registering tools
  console.error(
    "[DEBUG] Registering tools: list_indices, get_mappings, search, get_shards"
  );

  // Tool 1: List indices
  server.tool(
    "list_indices",
    "List all available Elasticsearch indices",
    {
      indexPattern: z
        .string()
        .trim()
        .min(1, "Index pattern is required")
        .describe("Index pattern of Elasticsearch indices to list"),
    },
    async ({ indexPattern }) => {
      console.error("[DEBUG] list_indices tool called", indexPattern);
      try {
        const response = await esClient.cat.indices({
          index: indexPattern,
          format: "json",
        });

        const indicesInfo = response.map((index) => ({
          index: index.index,
          health: index.health,
          status: index.status,
          // The cat API's JSON output names this column "docs.count"; the
          // camel-case alias is kept as a fallback.
          docsCount: index["docs.count"] ?? index.docsCount,
        }));

        return {
          content: [
            {
              type: "text" as const,
              text: `Found ${indicesInfo.length} indices`,
            },
            {
              type: "text" as const,
              text: JSON.stringify(indicesInfo, null, 2),
            },
          ],
        };
      } catch (error) {
        console.error(`Failed to list indices: ${describeError(error)}`);
        return errorResult(error);
      }
    }
  );

  // Tool 2: Get mappings for an index
  server.tool(
    "get_mappings",
    "Get field mappings for a specific Elasticsearch index",
    {
      index: z
        .string()
        .trim()
        .min(1, "Index name is required")
        .describe("Name of the Elasticsearch index to get mappings for"),
    },
    async ({ index }) => {
      console.error("[DEBUG] get_mappings tool called", index);
      try {
        const mappingResponse = await esClient.indices.getMapping({
          index,
        });

        return {
          content: [
            {
              type: "text" as const,
              text: `Mappings for index: ${index}`,
            },
            {
              type: "text" as const,
              text: `Mappings for index ${index}: ${JSON.stringify(
                mappingResponse[index]?.mappings || {},
                null,
                2
              )}`,
            },
          ],
        };
      } catch (error) {
        console.error(`Failed to get mappings: ${describeError(error)}`);
        return errorResult(error);
      }
    }
  );

  // Tool 3: Search an index with simplified parameters
  server.tool(
    "search",
    "Perform an Elasticsearch search with the provided query DSL. Highlights are always enabled.",
    {
      index: z
        .string()
        .trim()
        .min(1, "Index name is required")
        .describe("Name of the Elasticsearch index to search"),

      queryBody: z
        .record(z.any())
        .refine(
          (val) => {
            try {
              JSON.parse(JSON.stringify(val));
              return true;
            } catch (e) {
              return false;
            }
          },
          {
            message: "queryBody must be a valid Elasticsearch query DSL object",
          }
        )
        .describe(
          "Complete Elasticsearch query DSL object that can include query, size, from, sort, etc."
        ),

      userId: z.string().min(1).describe("User ID for permission filtering"),
    },
    async ({ index, queryBody, userId }) => {
      console.error("[DEBUG] search tool called", index, userId, queryBody);
      try {
        // NOTE(review): userId arrives as a tool argument; confirm the caller
        // is trusted to supply the real user, since it selects the permissions.
        const redisKey = `GLOBAL_SEARCH_INDEX_ID_MAPPING:${userId}`;
        const raw = await redis.get(redisKey);
        console.error("[DEBUG] Redis key fetched:", redisKey, "->", raw);

        // A missing key parses as "{}", i.e. a user with no grants.
        const allowed = normalizeAllowedIds(JSON.parse(raw || "{}"));
        const permissionFilter = buildPermissionFilter(index, allowed);
        const securedBody = withPermissionFilter(queryBody, permissionFilter);

        console.error(
          "[DEBUG] Final queryBody to send to ES:",
          JSON.stringify(securedBody, null, 2)
        );

        // Get mappings to identify text fields for highlighting
        const mappingResponse = await esClient.indices.getMapping({
          index,
        });
        const indexMappings = mappingResponse[index]?.mappings || {};

        const searchRequest = {
          index: index,
          body: securedBody,
        };

        // Always do highlighting (replaces any highlight section sent by the caller)
        if (indexMappings.properties) {
          const textFields: Record<string, estypes.SearchHighlightField> = {};

          for (const [fieldName, fieldData] of Object.entries(
            indexMappings.properties
          )) {
            if (fieldData.type === "text" || "dense_vector" in fieldData) {
              textFields[fieldName] = {};
            }
          }

          searchRequest.body.highlight = {
            fields: textFields,
            pre_tags: ["<em>"],
            post_tags: ["</em>"],
          };
        }

        // DEBUG: print the final searchRequest object
        console.error(
          "[DEBUG] ES SearchRequest:",
          JSON.stringify(searchRequest, null, 2)
        );

        const result = await esClient.search(searchRequest);

        // Extract the 'from' parameter from queryBody, defaulting to 0 if not provided
        const from = queryBody.from ?? 0;

        const aggregationFragments: { type: "text"; text: string }[] = [];
        if (result.aggregations) {
          const aggLines = formatAggs(
            result.aggregations as Record<string, unknown>
          );
          if (aggLines.length > 0) {
            aggregationFragments.push({
              type: "text" as const,
              text: aggLines.join("\n"),
            });
          }
        }

        // One fragment per hit: highlighted fields first, then the remaining
        // source fields as JSON.
        const contentFragments = result.hits.hits.map((hit) => {
          const highlightedFields = hit.highlight || {};
          const sourceData = hit._source || {};

          let content = "";

          for (const [field, highlights] of Object.entries(
            highlightedFields
          )) {
            if (highlights && highlights.length > 0) {
              content += `${field} (highlighted): ${highlights.join(
                " ... "
              )}\n`;
            }
          }

          for (const [field, value] of Object.entries(sourceData)) {
            if (!(field in highlightedFields)) {
              content += `${field}: ${JSON.stringify(value)}\n`;
            }
          }

          return {
            type: "text" as const,
            text: content.trim(),
          };
        });

        const metadataFragment = {
          type: "text" as const,
          text: `Total results: ${
            typeof result.hits.total === "number"
              ? result.hits.total
              : result.hits.total?.value || 0
          }, showing ${result.hits.hits.length} from position ${from}`,
        };

        return {
          content: [
            ...aggregationFragments,
            metadataFragment,
            ...contentFragments,
          ],
        };
      } catch (error) {
        console.error(`Search failed: ${describeError(error)}`);
        return errorResult(error);
      }
    }
  );

  // Tool 4: Get shard information
  server.tool(
    "get_shards",
    "Get shard information for all or specific indices",
    {
      index: z
        .string()
        .optional()
        .describe("Optional index name to get shard information for"),
    },
    async ({ index }) => {
      console.error("[DEBUG] get_shards tool called", index);
      try {
        const response = await esClient.cat.shards({
          index,
          format: "json",
        });

        const shardsInfo = response.map((shard) => ({
          index: shard.index,
          shard: shard.shard,
          prirep: shard.prirep,
          state: shard.state,
          docs: shard.docs,
          store: shard.store,
          ip: shard.ip,
          node: shard.node,
        }));

        const metadataFragment = {
          type: "text" as const,
          text: `Found ${shardsInfo.length} shards${
            index ? ` for index ${index}` : ""
          }`,
        };

        return {
          content: [
            metadataFragment,
            {
              type: "text" as const,
              text: JSON.stringify(shardsInfo, null, 2),
            },
          ],
        };
      } catch (error) {
        console.error(
          `Failed to get shard information: ${describeError(error)}`
        );
        return errorResult(error);
      }
    }
  );

  // [DEBUG] Returning MCP server instance
  console.error("[DEBUG] Returning server from createElasticsearchMcpServer()");
  return server;
}

// Connection settings come from the environment; unset variables become empty
// strings, which the auth/TLS setup treats as "not provided".
const config: ElasticsearchConfig = {
  url: process.env.ES_URL || "",
  apiKey: process.env.ES_API_KEY || "",
  username: process.env.ES_USERNAME || "",
  password: process.env.ES_PASSWORD || "",
  caCert: process.env.ES_CA_CERT || "",
};

/** Builds the server, attaches it to stdio and closes it on SIGINT. */
async function main() {
  console.error("[DEBUG] main() called");
  const transport = new StdioServerTransport();
  const server = await createElasticsearchMcpServer(config);

  console.error("[DEBUG] Connecting server...");
  await server.connect(transport);
  console.error("[DEBUG] Server connected and waiting for input...");

  process.on("SIGINT", async () => {
    await server.close();
    process.exit(0);
  });
}

main().catch((error) => {
  console.error("Server error:", describeError(error));
  console.error("[DEBUG] Exiting due to error");
  process.exit(1);
});

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rishab2404/mcp_es'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.