Skip to main content
Glama
index.ts33.6 kB
#!/usr/bin/env node import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { z } from "zod"; import { ElasticsearchService } from "./utils/elasticsearchService.js"; import { ElasticsearchConfig } from "./utils/models.js"; import { TextContent, ResponseContent } from "./types.js"; import pkg from '../package.json' with { type: 'json' }; // Import version from package.json export const VERSION = pkg.version; // Configuration schema with auth options const ConfigSchema = z .object({ url: z .string() .trim() .min(1, "Elasticsearch URL cannot be empty") .describe("Elasticsearch server URL"), apiKey: z .string() .optional() .describe("API key for Elasticsearch authentication"), username: z .string() .optional() .describe("Username for Elasticsearch authentication"), password: z .string() .optional() .describe("Password for Elasticsearch authentication"), caCert: z .string() .optional() .describe("Path to custom CA certificate for Elasticsearch"), pathPrefix: z.string().optional().describe("Path prefix for Elasticsearch"), version: z .string() .optional() .transform((val) => (["8", "9"].includes(val || "") ? val : "8")) .describe("Elasticsearch version (8 or 9)"), sslSkipVerify: z .boolean() .optional() .describe("Skip SSL certificate verification"), }) .refine( (data) => { // If apiKey is provided, it's valid if (data.apiKey != null) return true; // If username is provided, password must be provided if (data.username != null) { return data.password != null; } // No auth is also valid (for local development) return true; }, { message: "Either ES_API_KEY or both ES_USERNAME and ES_PASSWORD must be provided, or no auth for local development", path: ["username", "password"], } ); // Function to create and configure the MCP server export async function createOctodetElasticsearchMcpServer( config: ElasticsearchConfig ): Promise<McpServer> { const validatedConfig = ConfigSchema.parse(config); // Create Elasticsearch service instance const esService = new ElasticsearchService(validatedConfig); // Create server instance const server = new McpServer({ name: "octodet-elasticsearch-mcp", version: VERSION, capabilities: { resources: {}, tools: {}, }, }); // Tool 1: List indices server.tool( "list_indices", "List all available Elasticsearch indices with detailed information", { indexPattern: z .string() .trim() .min(1, "Index pattern is required") .describe('Pattern of Elasticsearch indices to list (e.g., "logs-*")'), }, async ({ indexPattern }) => { try { const indicesInfo = await esService.listIndices(indexPattern); return { content: [ { type: "text", text: `Found ${indicesInfo.length} indices matching pattern '${indexPattern}'`, }, { type: "text", text: JSON.stringify(indicesInfo, null, 2), }, ], }; } catch (error) { console.error( `Failed to list indices: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 2: Get mappings for an index server.tool( "get_mappings", "Get field mappings for a specific Elasticsearch index", { index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the Elasticsearch index to get mappings for"), }, async ({ index }) => { try { const mappings = await esService.getMappings(index); return { content: [ { type: "text", text: `Mappings for index: ${index}`, }, { type: "text", text: JSON.stringify(mappings, null, 2), }, ], }; } catch (error) { console.error( `Failed to get mappings: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 3: Search an index server.tool( "search", "Perform an Elasticsearch search with the provided query DSL, highlighting, and script fields", { index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the Elasticsearch index to search"), queryBody: z .record(z.any()) .describe( "Complete Elasticsearch query DSL object (can include query, size, from, sort, etc.)" ), scriptFields: z .record( z.object({ script: z.object({ source: z .string() .min(1, "Script source is required") .describe("Painless script source code"), params: z .record(z.any()) .optional() .describe("Optional parameters for the script"), lang: z .string() .optional() .default("painless") .describe("Script language (defaults to painless)"), }) }) ) .optional() .describe("Script fields to evaluate and include in the response"), }, async ({ index, queryBody, scriptFields }, extra) => { try { // Add script_fields to the query body if provided const enhancedQueryBody = { ...queryBody }; if (scriptFields && Object.keys(scriptFields).length > 0) { enhancedQueryBody.script_fields = scriptFields; } const result = await esService.search(index, enhancedQueryBody); // Extract the 'from' parameter from queryBody, defaulting to 0 if not provided const from = queryBody.from ?? 0; const contentFragments: TextContent[] = []; // Add metadata about the search results contentFragments.push({ type: "text", text: `Total results: ${ typeof result.hits.total === "number" ? result.hits.total : result.hits.total?.value ?? 0 }, showing ${result.hits.hits.length} from position ${from}`, }); // Add aggregation results if present if (result.aggregations) { contentFragments.push({ type: "text", text: `Aggregations: ${JSON.stringify( result.aggregations, null, 2 )}`, }); } // Process and add individual hit results result.hits.hits.forEach((hit: any) => { const highlightedFields = hit.highlight ?? {}; const sourceData = hit._source ?? {}; const scriptFieldsData = hit.fields ?? {}; let content = `Document ID: ${hit._id}\nScore: ${hit._score}\n\n`; // Add script fields results for (const [field, value] of Object.entries(scriptFieldsData)) { content += `${field} (script): ${JSON.stringify(value)}\n`; } // Add highlighted fields for (const [field, highlights] of Object.entries(highlightedFields)) { if (Array.isArray(highlights) && highlights.length > 0) { content += `${field} (highlighted): ${( highlights as string[] ).join(" ... ")}\n`; } } // Add source fields that weren't highlighted for (const [field, value] of Object.entries(sourceData)) { if (!(field in highlightedFields)) { content += `${field}: ${JSON.stringify(value)}\n`; } } contentFragments.push({ type: "text", text: content.trim(), }); }); const response: ResponseContent = { content: contentFragments, }; return response; } catch (error) { console.error( `Search failed: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 4: Get cluster health server.tool( "get_cluster_health", "Get health information about the Elasticsearch cluster", {}, async () => { try { const health = await esService.getClusterHealth(); return { content: [ { type: "text", text: `Elasticsearch Cluster Health:`, }, { type: "text", text: JSON.stringify(health, null, 2), }, ], }; } catch (error) { console.error( `Failed to get cluster health: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 5: Get shard information server.tool( "get_shards", "Get shard information for all or specific indices", { index: z .string() .optional() .describe("Optional index name to get shard information for"), }, async ({ index }) => { try { const shardsInfo = await esService.getShards(index); return { content: [ { type: "text", text: `Found ${shardsInfo.length} shards${ index ? ` for index ${index}` : "" }`, }, { type: "text", text: JSON.stringify(shardsInfo, null, 2), }, ], }; } catch (error) { console.error( `Failed to get shard information: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 6: Add a document to an index server.tool( "add_document", "Add a new document to a specific Elasticsearch index", { index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the Elasticsearch index"), id: z .string() .optional() .describe( "Optional document ID (if not provided, Elasticsearch will generate one)" ), document: z.record(z.any()).describe("Document body to index"), }, async ({ index, id, document }) => { try { const response = await esService.addDocument(index, document, id); return { content: [ { type: "text", text: `Document added to index '${index}' with ID: ${response._id}`, }, ], }; } catch (error) { return { content: [ { type: "text", text: `Error adding document: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 7: Update a document in an index server.tool( "update_document", "Update an existing document in a specific Elasticsearch index", { index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the Elasticsearch index"), id: z .string() .min(1, "Document ID is required") .describe("Document ID to update"), document: z .record(z.any()) .describe("Partial document body to update (fields to change)"), }, async ({ index, id, document }) => { try { await esService.updateDocument(index, id, document); return { content: [ { type: "text", text: `Document with ID '${id}' updated in index '${index}'.`, }, ], }; } catch (error) { return { content: [ { type: "text", text: `Error updating document: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 8: Delete a document from an index server.tool( "delete_document", "Delete a document from a specific Elasticsearch index", { index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the Elasticsearch index"), id: z .string() .min(1, "Document ID is required") .describe("Document ID to delete"), }, async ({ index, id }) => { try { await esService.deleteDocument(index, id); return { content: [ { type: "text", text: `Document with ID '${id}' deleted from index '${index}'.`, }, ], }; } catch (error) { return { content: [ { type: "text", text: `Error deleting document: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 9: Update documents by query server.tool( "update_by_query", "Update documents in an Elasticsearch index based on a query", { index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the Elasticsearch index to update documents in"), query: z .record(z.any()) .describe("Elasticsearch query to select documents for updating"), script: z .object({ source: z .string() .min(1, "Script source is required") .describe("Painless script source for the update operation"), params: z .record(z.any()) .optional() .describe("Optional parameters for the script"), }) .describe("Script to execute on matching documents"), conflicts: z .enum(["abort", "proceed"]) .optional() .describe("What to do when version conflicts occur during the update"), maxDocs: z .number() .int() .positive() .optional() .describe("Limit the number of documents to update"), refresh: z .boolean() .optional() .default(true) .describe( "Should the index be refreshed after the update (defaults to true)" ), }, async ({ index, query, script, conflicts, maxDocs, refresh }) => { try { // Create the params object with the correct type structure const params: Record<string, any> = { index, body: { query, script: { source: script.source, params: script.params, }, }, refresh: refresh !== false, // true by default unless explicitly set to false }; // Add optional parameters if (conflicts) params.conflicts = conflicts; if (maxDocs) params.max_docs = maxDocs; const response = await esService.updateByQuery(params); // Format the response for better readability let resultText = `Update by query completed successfully in index '${index}':\n`; resultText += `- Total documents processed: ${response.total}\n`; resultText += `- Documents updated: ${response.updated}\n`; resultText += `- Documents that failed: ${ response.failures?.length || 0 }\n`; resultText += `- Time taken: ${response.took}ms`; // Add more detailed information if there were failures if (response.failures && response.failures.length > 0) { resultText += "\n\nFailures:"; response.failures .slice(0, 5) .forEach((failure: any, index: number) => { resultText += `\n${index + 1}. ID: ${failure.id}, Reason: ${ failure.cause?.reason || "Unknown" }`; }); if (response.failures.length > 5) { resultText += `\n...and ${ response.failures.length - 5 } more failures.`; } } return { content: [ { type: "text", text: resultText, }, ], }; } catch (error) { console.error( `Update by query failed: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 10: Delete documents by query server.tool( "delete_by_query", "Delete documents in an Elasticsearch index based on a query", { index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the Elasticsearch index to delete documents from"), query: z .record(z.any()) .describe("Elasticsearch query to select documents for deletion"), conflicts: z .enum(["abort", "proceed"]) .optional() .describe( "What to do when version conflicts occur during the deletion" ), maxDocs: z .number() .int() .positive() .optional() .describe("Limit the number of documents to delete"), refresh: z .boolean() .optional() .default(true) .describe( "Should the index be refreshed after the deletion (defaults to true)" ), }, async ({ index, query, conflicts, maxDocs, refresh }) => { try { const params: Record<string, any> = { index, body: { query, }, refresh: refresh !== false, // true by default unless explicitly set to false }; if (conflicts) params.conflicts = conflicts; if (maxDocs) params.max_docs = maxDocs; const response = await esService.deleteByQuery(params); // Format the response for better readability let resultText = `Delete by query completed successfully in index '${index}':\n`; resultText += `- Total documents processed: ${response.total}\n`; resultText += `- Documents deleted: ${response.deleted}\n`; resultText += `- Deletion failures: ${ response.failures?.length || 0 }\n`; resultText += `- Time taken: ${response.took}ms`; // Add version conflicts if any occurred if (response.version_conflicts && response.version_conflicts > 0) { resultText += `\n- Version conflicts: ${response.version_conflicts}`; } // Add detailed failure information if (response.failures && response.failures.length > 0) { resultText += "\n\nFailures:"; response.failures.slice(0, 5).forEach((failure: any, idx: number) => { resultText += `\n${idx + 1}. ID: ${ failure.id || "unknown" }, Reason: ${failure.cause?.reason || "Unknown"}`; }); if (response.failures.length > 5) { resultText += `\n...and ${ response.failures.length - 5 } more failures.`; } } return { content: [ { type: "text", text: resultText, }, ], }; } catch (error) { console.error( `Delete by query failed: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 11: Bulk operations server.tool( "bulk", "Perform multiple document operations (create, update, delete) in a single API call", { operations: z .array( z.object({ action: z .enum(["index", "create", "update", "delete"]) .describe( "The action to perform: index (create/replace), create (fail if exists), update, or delete" ), index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the Elasticsearch index for this operation"), id: z .string() .optional() .describe( "Document ID (required for update and delete, optional for index/create)" ), document: z .record(z.any()) .optional() .describe( "Document body (required for index/create/update, not used for delete)" ), }) ) .min(1, "At least one operation is required") .describe("Array of operations to perform in bulk"), pipeline: z .string() .optional() .describe("Optional pipeline to use for preprocessing documents"), }, async ({ operations, pipeline }) => { try { // Validate operations operations.forEach((op, idx) => { if ((op.action === "update" || op.action === "delete") && !op.id) { throw new Error( `Operation #${idx + 1} (${op.action}): Document ID is required` ); } if ( (op.action === "index" || op.action === "create" || op.action === "update") && !op.document ) { throw new Error( `Operation #${idx + 1} (${op.action}): Document body is required` ); } }); // Build the bulk operations array const bulkOperations: any[] = []; operations.forEach((op) => { const actionMeta: any = { _index: op.index }; if (op.id) actionMeta._id = op.id; bulkOperations.push({ [op.action]: actionMeta }); if (op.action !== "delete") { if (op.action === "update") { bulkOperations.push({ doc: op.document }); } else { bulkOperations.push(op.document); } } }); // Execute the bulk operation const response = await esService.bulk(bulkOperations, pipeline); // Process the response const summary = { took: response.took, errors: response.errors, successes: 0, failures: 0, actionResults: [] as any[], }; // Count successes and failures response.items.forEach((item: any, idx: number) => { const actionType = Object.keys(item)[0]; const result = item[actionType as keyof typeof item] as any; if (!result) return; if (result.error) { summary.failures++; summary.actionResults.push({ operation: idx, action: actionType, id: result._id || "unknown", index: result._index || "unknown", status: result.status || 0, error: { type: result.error?.type || "unknown_error", reason: result.error?.reason || "Unknown error", }, }); } else { summary.successes++; summary.actionResults.push({ operation: idx, action: actionType, id: result._id || "unknown", index: result._index || "unknown", status: result.status || 0, result: result.result || "unknown", }); } }); // Format the response let resultText = `Bulk operation completed in ${summary.took}ms\n`; resultText += `- Total operations: ${operations.length}\n`; resultText += `- Successful: ${summary.successes}\n`; resultText += `- Failed: ${summary.failures}\n`; // Add failure details if (summary.failures > 0) { resultText += "\nFailures:\n"; const failures = summary.actionResults.filter((r) => r.error); failures.slice(0, 5).forEach((failure, idx) => { resultText += `${idx + 1}. Operation #${failure.operation} (${ failure.action }): ${failure.error.reason} [${failure.error.type}]\n`; }); if (failures.length > 5) { resultText += `...and ${failures.length - 5} more failures.\n`; } } return { content: [{ type: "text", text: resultText }], }; } catch (error) { console.error( `Bulk operation failed: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 12: Create an index server.tool( "create_index", "Create a new Elasticsearch index with optional settings and mappings", { index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the new Elasticsearch index to create"), settings: z .record(z.any()) .optional() .describe( "Optional index settings like number of shards, replicas, etc." ), mappings: z .record(z.any()) .optional() .describe( "Optional index mappings defining field types and properties" ), }, async ({ index, settings, mappings }) => { try { const response = await esService.createIndex(index, settings, mappings); return { content: [ { type: "text", text: `Index '${index}' created successfully.`, }, ], }; } catch (error) { console.error( `Failed to create index: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 13: Delete an index server.tool( "delete_index", "Delete an Elasticsearch index", { index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the Elasticsearch index to delete"), }, async ({ index }) => { try { await esService.deleteIndex(index); return { content: [ { type: "text", text: `Index '${index}' deleted successfully.` }, ], }; } catch (error) { console.error( `Failed to delete index: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 14: Count documents server.tool( "count_documents", "Count documents in an index, optionally filtered by a query", { index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the Elasticsearch index to count documents in"), query: z .record(z.any()) .optional() .describe("Optional Elasticsearch query to filter documents to count"), }, async ({ index, query }) => { try { const count = await esService.countDocuments(index, query); return { content: [ { type: "text", text: `Count of documents in index '${index}'${ query ? " matching the provided query" : "" }: ${count}`, }, ], }; } catch (error) { console.error( `Failed to count documents: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 15: Get index templates server.tool( "get_templates", "Get index templates from Elasticsearch", { name: z.string().optional().describe("Optional template name filter"), }, async ({ name }) => { try { const templates = await esService.getIndexTemplates(name); return { content: [ { type: "text", text: `Index Templates:` }, { type: "text", text: JSON.stringify(templates, null, 2) }, ], }; } catch (error) { console.error( `Failed to get templates: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); // Tool 16: Get index aliases server.tool( "get_aliases", "Get index aliases from Elasticsearch", { name: z.string().optional().describe("Optional alias name filter"), }, async ({ name }) => { try { const aliases = await esService.getAliases(name); return { content: [ { type: "text", text: `Index Aliases:` }, { type: "text", text: JSON.stringify(aliases, null, 2) }, ], }; } catch (error) { console.error( `Failed to get aliases: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } ); return server; } // Get Elasticsearch configuration from environment variables const config: ElasticsearchConfig = { url: process.env.ES_URL || "http://localhost:9200", apiKey: process.env.ES_API_KEY, username: process.env.ES_USERNAME, password: process.env.ES_PASSWORD, caCert: process.env.ES_CA_CERT, version: process.env.ES_VERSION || "8", sslSkipVerify: process.env.ES_SSL_SKIP_VERIFY === "1" || process.env.ES_SSL_SKIP_VERIFY === "true", pathPrefix: process.env.ES_PATH_PREFIX, }; // Main function to start the server async function main(): Promise<void> { try { const transport = new StdioServerTransport(); const server = await createOctodetElasticsearchMcpServer(config); await server.connect(transport); console.error("Octodet Elasticsearch MCP server running on stdio"); // Handle termination signals process.on("SIGINT", async () => { await server.close(); process.exit(0); }); } catch (error) { console.error( "Server error:", error instanceof Error ? error.message : String(error) ); process.exit(1); } } main().catch((error) => { console.error("Unhandled error:", error); process.exit(1); });

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Octodet/elasticsearch-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server