bulk
Efficiently execute multiple Elasticsearch document operations—create, update, delete—in one API call, reducing requests and optimizing performance.
Instructions
Perform multiple document operations (create, update, delete) in a single API call
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| operations | Yes | Array of operations to perform in bulk | |
| pipeline | No | Optional pipeline to use for preprocessing documents |
Input Schema (JSON Schema)
{
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": false,
"properties": {
"operations": {
"description": "Array of operations to perform in bulk",
"items": {
"additionalProperties": false,
"properties": {
"action": {
"description": "The action to perform: index (create/replace), create (fail if exists), update, or delete",
"enum": [
"index",
"create",
"update",
"delete"
],
"type": "string"
},
"document": {
"additionalProperties": {},
"description": "Document body (required for index/create/update, not used for delete)",
"type": "object"
},
"id": {
"description": "Document ID (required for update and delete, optional for index/create)",
"type": "string"
},
"index": {
"description": "Name of the Elasticsearch index for this operation",
"minLength": 1,
"type": "string"
}
},
"required": [
"action",
"index"
],
"type": "object"
},
"minItems": 1,
"type": "array"
},
"pipeline": {
"description": "Optional pipeline to use for preprocessing documents",
"type": "string"
}
},
"required": [
"operations"
],
"type": "object"
}
Implementation Reference
- src/index.ts:808-933 (handler)Main handler logic for the 'bulk' tool: validates input operations, builds Elasticsearch bulk operations array in correct format, calls esService.bulk, processes the response to summarize successes/failures, formats and returns text output.async ({ operations, pipeline }) => { try { // Validate operations operations.forEach((op, idx) => { if ((op.action === "update" || op.action === "delete") && !op.id) { throw new Error( `Operation #${idx + 1} (${op.action}): Document ID is required` ); } if ( (op.action === "index" || op.action === "create" || op.action === "update") && !op.document ) { throw new Error( `Operation #${idx + 1} (${op.action}): Document body is required` ); } }); // Build the bulk operations array const bulkOperations: any[] = []; operations.forEach((op) => { const actionMeta: any = { _index: op.index }; if (op.id) actionMeta._id = op.id; bulkOperations.push({ [op.action]: actionMeta }); if (op.action !== "delete") { if (op.action === "update") { bulkOperations.push({ doc: op.document }); } else { bulkOperations.push(op.document); } } }); // Execute the bulk operation const response = await esService.bulk(bulkOperations, pipeline); // Process the response const summary = { took: response.took, errors: response.errors, successes: 0, failures: 0, actionResults: [] as any[], }; // Count successes and failures response.items.forEach((item: any, idx: number) => { const actionType = Object.keys(item)[0]; const result = item[actionType as keyof typeof item] as any; if (!result) return; if (result.error) { summary.failures++; summary.actionResults.push({ operation: idx, action: actionType, id: result._id || "unknown", index: result._index || "unknown", status: result.status || 0, error: { type: result.error?.type || "unknown_error", reason: result.error?.reason || "Unknown error", }, }); } else { summary.successes++; summary.actionResults.push({ operation: idx, action: actionType, id: result._id || "unknown", index: result._index || "unknown", status: result.status || 0, result: result.result || "unknown", }); } }); // Format the response let resultText = `Bulk operation completed in ${summary.took}ms\n`; resultText += `- Total operations: ${operations.length}\n`; resultText += `- Successful: ${summary.successes}\n`; resultText += `- Failed: ${summary.failures}\n`; // Add failure details if (summary.failures > 0) { resultText += "\nFailures:\n"; const failures = summary.actionResults.filter((r) => r.error); failures.slice(0, 5).forEach((failure, idx) => { resultText += `${idx + 1}. Operation #${failure.operation} (${ failure.action }): ${failure.error.reason} [${failure.error.type}]\n`; }); if (failures.length > 5) { resultText += `...and ${failures.length - 5} more failures.\n`; } } return { content: [{ type: "text", text: resultText }], }; } catch (error) { console.error( `Bulk operation failed: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } }
- src/index.ts:773-807 (schema)Input schema using Zod for the 'bulk' tool parameters: operations array and optional pipeline.{ operations: z .array( z.object({ action: z .enum(["index", "create", "update", "delete"]) .describe( "The action to perform: index (create/replace), create (fail if exists), update, or delete" ), index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the Elasticsearch index for this operation"), id: z .string() .optional() .describe( "Document ID (required for update and delete, optional for index/create)" ), document: z .record(z.any()) .optional() .describe( "Document body (required for index/create/update, not used for delete)" ), }) ) .min(1, "At least one operation is required") .describe("Array of operations to perform in bulk"), pipeline: z .string() .optional() .describe("Optional pipeline to use for preprocessing documents"), },
- src/index.ts:770-934 (registration)Registration of the 'bulk' MCP tool via server.tool, specifying name, description, input schema, and handler function.server.tool( "bulk", "Perform multiple document operations (create, update, delete) in a single API call", { operations: z .array( z.object({ action: z .enum(["index", "create", "update", "delete"]) .describe( "The action to perform: index (create/replace), create (fail if exists), update, or delete" ), index: z .string() .trim() .min(1, "Index name is required") .describe("Name of the Elasticsearch index for this operation"), id: z .string() .optional() .describe( "Document ID (required for update and delete, optional for index/create)" ), document: z .record(z.any()) .optional() .describe( "Document body (required for index/create/update, not used for delete)" ), }) ) .min(1, "At least one operation is required") .describe("Array of operations to perform in bulk"), pipeline: z .string() .optional() .describe("Optional pipeline to use for preprocessing documents"), }, async ({ operations, pipeline }) => { try { // Validate operations operations.forEach((op, idx) => { if ((op.action === "update" || op.action === "delete") && !op.id) { throw new Error( `Operation #${idx + 1} (${op.action}): Document ID is required` ); } if ( (op.action === "index" || op.action === "create" || op.action === "update") && !op.document ) { throw new Error( `Operation #${idx + 1} (${op.action}): Document body is required` ); } }); // Build the bulk operations array const bulkOperations: any[] = []; operations.forEach((op) => { const actionMeta: any = { _index: op.index }; if (op.id) actionMeta._id = op.id; bulkOperations.push({ [op.action]: actionMeta }); if (op.action !== "delete") { if (op.action === "update") { bulkOperations.push({ doc: op.document }); } else { bulkOperations.push(op.document); } } }); // Execute the bulk operation const response = await esService.bulk(bulkOperations, pipeline); // Process the response const summary = { took: response.took, errors: response.errors, successes: 0, failures: 0, actionResults: [] as any[], }; // Count successes and failures response.items.forEach((item: any, idx: number) => { const actionType = Object.keys(item)[0]; const result = item[actionType as keyof typeof item] as any; if (!result) return; if (result.error) { summary.failures++; summary.actionResults.push({ operation: idx, action: actionType, id: result._id || "unknown", index: result._index || "unknown", status: result.status || 0, error: { type: result.error?.type || "unknown_error", reason: result.error?.reason || "Unknown error", }, }); } else { summary.successes++; summary.actionResults.push({ operation: idx, action: actionType, id: result._id || "unknown", index: result._index || "unknown", status: result.status || 0, result: result.result || "unknown", }); } }); // Format the response let resultText = `Bulk operation completed in ${summary.took}ms\n`; resultText += `- Total operations: ${operations.length}\n`; resultText += `- Successful: ${summary.successes}\n`; resultText += `- Failed: ${summary.failures}\n`; // Add failure details if (summary.failures > 0) { resultText += "\nFailures:\n"; const failures = summary.actionResults.filter((r) => r.error); failures.slice(0, 5).forEach((failure, idx) => { resultText += `${idx + 1}. Operation #${failure.operation} (${ failure.action }): ${failure.error.reason} [${failure.error.type}]\n`; }); if (failures.length > 5) { resultText += `...and ${failures.length - 5} more failures.\n`; } } return { content: [{ type: "text", text: resultText }], }; } catch (error) { console.error( `Bulk operation failed: ${ error instanceof Error ? error.message : String(error) }` ); return { content: [ { type: "text", text: `Error: ${ error instanceof Error ? error.message : String(error) }`, }, ], }; } } );
- Helper method in ElasticsearchService that executes the Elasticsearch bulk API call with automatic refresh and optional pipeline.async bulk(operations: any[], pipeline?: string): Promise<any> { return await this.client.bulk({ refresh: true, pipeline, operations, }); }
- src/types.ts:44-48 (schema)TypeScript interface defining the structure of the bulk operation result from Elasticsearch.export interface BulkOperationResult { took: number; errors: boolean; items: Array<Record<string, any>>; }