Skip to main content
Glama
Octodet

octodet-elasticsearch-mcp

bulk

Efficiently execute multiple Elasticsearch document operations—create, update, delete—in one API call, reducing requests and optimizing performance.

Instructions

Perform multiple document operations (create, update, delete) in a single API call

Input Schema

NameRequiredDescriptionDefault
operationsYesArray of operations to perform in bulk
pipelineNoOptional pipeline to use for preprocessing documents

Input Schema (JSON Schema)

{ "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": false, "properties": { "operations": { "description": "Array of operations to perform in bulk", "items": { "additionalProperties": false, "properties": { "action": { "description": "The action to perform: index (create/replace), create (fail if exists), update, or delete", "enum": [ "index", "create", "update", "delete" ], "type": "string" }, "document": { "additionalProperties": {}, "description": "Document body (required for index/create/update, not used for delete)", "type": "object" }, "id": { "description": "Document ID (required for update and delete, optional for index/create)", "type": "string" }, "index": { "description": "Name of the Elasticsearch index for this operation", "minLength": 1, "type": "string" } }, "required": [ "action", "index" ], "type": "object" }, "minItems": 1, "type": "array" }, "pipeline": { "description": "Optional pipeline to use for preprocessing documents", "type": "string" } }, "required": [ "operations" ], "type": "object" }

Implementation Reference

  • Main handler logic for the 'bulk' tool: validates input operations, builds Elasticsearch bulk operations array in correct format, calls esService.bulk, processes the response to summarize successes/failures, formats and returns text output.
    async ({ operations, pipeline }) => { try { // Validate operations operations.forEach((op, idx) => { if ((op.action === "update" || op.action === "delete") && !op.id) { throw new Error( `Operation #${idx + 1} (${op.action}): Document ID is required` ); } if ( (op.action === "index" || op.action === "create" || op.action === "update") && !op.document ) { throw new Error( `Operation #${idx + 1} (${op.action}): Document body is required` ); } }); // Build the bulk operations array const bulkOperations: any[] = []; operations.forEach((op) => { const actionMeta: any = { _index: op.index }; if (op.id) actionMeta._id = op.id; bulkOperations.push({ [op.action]: actionMeta }); if (op.action !== "delete") { if (op.action === "update") { bulkOperations.push({ doc: op.document }); } else { bulkOperations.push(op.document); } } }); // Execute the bulk operation const response = await esService.bulk(bulkOperations, pipeline); // Process the response const summary = { took: response.took, errors: response.errors, successes: 0, failures: 0, actionResults: [] as any[], }; // Count successes and failures response.items.forEach((item: any, idx: number) => { const actionType = Object.keys(item)[0]; const result = item[actionType as keyof typeof item] as any; if (!result) return; if (result.error) { summary.failures++; summary.actionResults.push({ operation: idx, action: actionType, id: result._id || "unknown", index: result._index || "unknown", status: result.status || 0, error: { type: result.error?.type || "unknown_error", reason: result.error?.reason || "Unknown error", }, }); } else { summary.successes++; summary.actionResults.push({ operation: idx, action: actionType, id: result._id || "unknown", index: result._index || "unknown", status: result.status || 0, result: result.result || "unknown", }); } }); // Format the response let resultText = `Bulk operation completed in ${summary.took}ms\n`; resultText += `- Total operations: ${operations.length}\n`; resultText += `- Successful: ${summary.successes}\n`; resultText += `- Failed: ${summary.failures}\n`; // Add failure details if (summary.failures > 0) { resultText += "\nFailures:\n"; const failures = summary.actionResults.filter((r) => r.error); failures.slice(0, 5).forEach((failure, idx) => { resultText += `${idx + 1}. Operation #${failure.operation} (${ failure.action }): ${failure.error.reason} [${failure.error.type}]\n`; }); if (failures.length > 5) { resultText += `...and ${failures.length - 5} more failures.\n`; } } return { content: [{ type: "text", text: resultText }], }; } catch (error) { console.error( `Bulk operation failed: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } }
  • Input schema using Zod for the 'bulk' tool parameters: operations array and optional pipeline.
    { operations: z .array( z.object({ action: z .enum(["index", "create", "update", "delete"]) .describe( "The action to perform: index (create/replace), create (fail if exists), update, or delete" ), index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the Elasticsearch index for this operation"), id: z .string() .optional() .describe( "Document ID (required for update and delete, optional for index/create)" ), document: z .record(z.any()) .optional() .describe( "Document body (required for index/create/update, not used for delete)" ), }) ) .min(1, "At least one operation is required") .describe("Array of operations to perform in bulk"), pipeline: z .string() .optional() .describe("Optional pipeline to use for preprocessing documents"), },
  • src/index.ts:770-934 (registration)
    Registration of the 'bulk' MCP tool via server.tool, specifying name, description, input schema, and handler function.
    server.tool( "bulk", "Perform multiple document operations (create, update, delete) in a single API call", { operations: z .array( z.object({ action: z .enum(["index", "create", "update", "delete"]) .describe( "The action to perform: index (create/replace), create (fail if exists), update, or delete" ), index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the Elasticsearch index for this operation"), id: z .string() .optional() .describe( "Document ID (required for update and delete, optional for index/create)" ), document: z .record(z.any()) .optional() .describe( "Document body (required for index/create/update, not used for delete)" ), }) ) .min(1, "At least one operation is required") .describe("Array of operations to perform in bulk"), pipeline: z .string() .optional() .describe("Optional pipeline to use for preprocessing documents"), }, async ({ operations, pipeline }) => { try { // Validate operations operations.forEach((op, idx) => { if ((op.action === "update" || op.action === "delete") && !op.id) { throw new Error( `Operation #${idx + 1} (${op.action}): Document ID is required` ); } if ( (op.action === "index" || op.action === "create" || op.action === "update") && !op.document ) { throw new Error( `Operation #${idx + 1} (${op.action}): Document body is required` ); } }); // Build the bulk operations array const bulkOperations: any[] = []; operations.forEach((op) => { const actionMeta: any = { _index: op.index }; if (op.id) actionMeta._id = op.id; bulkOperations.push({ [op.action]: actionMeta }); if (op.action !== "delete") { if (op.action === "update") { bulkOperations.push({ doc: op.document }); } else { bulkOperations.push(op.document); } } }); // Execute the bulk operation const response = await esService.bulk(bulkOperations, pipeline); // Process the response const summary = { took: response.took, errors: response.errors, successes: 0, failures: 0, actionResults: [] as any[], }; // Count successes and failures response.items.forEach((item: any, idx: number) => { const actionType = Object.keys(item)[0]; const result = item[actionType as keyof typeof item] as any; if (!result) return; if (result.error) { summary.failures++; summary.actionResults.push({ operation: idx, action: actionType, id: result._id || "unknown", index: result._index || "unknown", status: result.status || 0, error: { type: result.error?.type || "unknown_error", reason: result.error?.reason || "Unknown error", }, }); } else { summary.successes++; summary.actionResults.push({ operation: idx, action: actionType, id: result._id || "unknown", index: result._index || "unknown", status: result.status || 0, result: result.result || "unknown", }); } }); // Format the response let resultText = `Bulk operation completed in ${summary.took}ms\n`; resultText += `- Total operations: ${operations.length}\n`; resultText += `- Successful: ${summary.successes}\n`; resultText += `- Failed: ${summary.failures}\n`; // Add failure details if (summary.failures > 0) { resultText += "\nFailures:\n"; const failures = summary.actionResults.filter((r) => r.error); failures.slice(0, 5).forEach((failure, idx) => { resultText += `${idx + 1}. Operation #${failure.operation} (${ failure.action }): ${failure.error.reason} [${failure.error.type}]\n`; }); if (failures.length > 5) { resultText += `...and ${failures.length - 5} more failures.\n`; } } return { content: [{ type: "text", text: resultText }], }; } catch (error) { console.error( `Bulk operation failed: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } );
  • Helper method in ElasticsearchService that executes the Elasticsearch bulk API call with automatic refresh and optional pipeline.
    async bulk(operations: any[], pipeline?: string): Promise<any> { return await this.client.bulk({ refresh: true, pipeline, operations, }); }
  • TypeScript interface defining the structure of the bulk operation result from Elasticsearch.
    export interface BulkOperationResult { took: number; errors: boolean; items: Array<Record<string, any>>; }

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Octodet/elasticsearch-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server