
octodet-elasticsearch-mcp

bulk

Efficiently execute multiple Elasticsearch document operations (index, create, update, delete) in a single API call, reducing request overhead and improving indexing throughput.

Instructions

Perform multiple document operations (create, update, delete) in a single API call

Input Schema

Name        Required   Description                                            Default
operations  Yes        Array of operations to perform in bulk                 (none)
pipeline    No         Optional pipeline to use for preprocessing documents   (none)
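
As a sketch, a valid input for the tool might look like the following. The index name, document fields, and pipeline name are hypothetical; the shape follows the schema above (an `id` is required for update and delete, and a `document` for index, create, and update):

```typescript
// Illustrative shape of the 'bulk' tool input (names and values are made up).
interface BulkToolOperation {
  action: "index" | "create" | "update" | "delete";
  index: string;
  id?: string; // required for update/delete
  document?: Record<string, unknown>; // required for index/create/update
}

const bulkInput: { operations: BulkToolOperation[]; pipeline?: string } = {
  operations: [
    { action: "index", index: "products", id: "p1", document: { name: "Widget", price: 9.99 } },
    { action: "create", index: "products", document: { name: "Gadget", price: 19.99 } },
    { action: "update", index: "products", id: "p1", document: { price: 8.99 } },
    { action: "delete", index: "products", id: "p2" },
  ],
  pipeline: "timestamp-pipeline", // optional ingest pipeline
};
```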

Implementation Reference

  • Main handler logic for the 'bulk' tool: validates the input operations, builds the Elasticsearch bulk operations array in the alternating action/document format, calls esService.bulk, summarizes successes and failures from the response, and returns the result as formatted text.
    async ({ operations, pipeline }) => {
      try {
        // Validate operations
        operations.forEach((op, idx) => {
          if ((op.action === "update" || op.action === "delete") && !op.id) {
            throw new Error(
              `Operation #${idx + 1} (${op.action}): Document ID is required`
            );
          }
          if (
            (op.action === "index" ||
              op.action === "create" ||
              op.action === "update") &&
            !op.document
          ) {
            throw new Error(
              `Operation #${idx + 1} (${op.action}): Document body is required`
            );
          }
        });
    
        // Build the bulk operations array
        const bulkOperations: any[] = [];
    
        operations.forEach((op) => {
          const actionMeta: any = { _index: op.index };
          if (op.id) actionMeta._id = op.id;
    
          bulkOperations.push({ [op.action]: actionMeta });
    
          if (op.action !== "delete") {
            if (op.action === "update") {
              bulkOperations.push({ doc: op.document });
            } else {
              bulkOperations.push(op.document);
            }
          }
        });
    
        // Execute the bulk operation
        const response = await esService.bulk(bulkOperations, pipeline);
    
        // Process the response
        const summary = {
          took: response.took,
          errors: response.errors,
          successes: 0,
          failures: 0,
          actionResults: [] as any[],
        };
    
        // Count successes and failures
        response.items.forEach((item: any, idx: number) => {
          const actionType = Object.keys(item)[0];
          const result = item[actionType as keyof typeof item] as any;
    
          if (!result) return;
    
          if (result.error) {
            summary.failures++;
            summary.actionResults.push({
              operation: idx + 1, // 1-based, matching the validation error messages
              action: actionType,
              id: result._id || "unknown",
              index: result._index || "unknown",
              status: result.status || 0,
              error: {
                type: result.error?.type || "unknown_error",
                reason: result.error?.reason || "Unknown error",
              },
            });
          } else {
            summary.successes++;
            summary.actionResults.push({
              operation: idx + 1, // 1-based, matching the validation error messages
              action: actionType,
              id: result._id || "unknown",
              index: result._index || "unknown",
              status: result.status || 0,
              result: result.result || "unknown",
            });
          }
        });
    
        // Format the response
        let resultText = `Bulk operation completed in ${summary.took}ms\n`;
        resultText += `- Total operations: ${operations.length}\n`;
        resultText += `- Successful: ${summary.successes}\n`;
        resultText += `- Failed: ${summary.failures}\n`;
    
        // Add failure details
        if (summary.failures > 0) {
          resultText += "\nFailures:\n";
          const failures = summary.actionResults.filter((r) => r.error);
          failures.slice(0, 5).forEach((failure, idx) => {
            resultText += `${idx + 1}. Operation #${failure.operation} (${
              failure.action
            }): ${failure.error.reason} [${failure.error.type}]\n`;
          });
    
          if (failures.length > 5) {
            resultText += `...and ${failures.length - 5} more failures.\n`;
          }
        }
    
        return {
          content: [{ type: "text", text: resultText }],
        };
      } catch (error) {
        console.error(
          `Bulk operation failed: ${
            error instanceof Error ? error.message : String(error)
          }`
        );
        return {
          content: [
            {
              type: "text",
              text: `Error: ${
                error instanceof Error ? error.message : String(error)
              }`,
            },
          ],
        };
      }
    }
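
The array-building step above can be sketched in isolation: each operation becomes an action-metadata line, followed (except for delete) by a document line, which is wrapped in `{ doc: ... }` for updates and sent raw for index/create. A minimal standalone version, with simplified types:

```typescript
type BulkAction = "index" | "create" | "update" | "delete";

interface Operation {
  action: BulkAction;
  index: string;
  id?: string;
  document?: Record<string, unknown>;
}

// Builds the alternating action/document array expected by the
// Elasticsearch bulk API (the NDJSON body, expressed as JS objects).
function toBulkOperations(operations: Operation[]): unknown[] {
  const out: unknown[] = [];
  for (const op of operations) {
    const meta: Record<string, unknown> = { _index: op.index };
    if (op.id) meta._id = op.id;
    out.push({ [op.action]: meta }); // action metadata line
    if (op.action === "update") {
      out.push({ doc: op.document }); // partial-document update
    } else if (op.action !== "delete") {
      out.push(op.document); // full document for index/create
    }
  }
  return out;
}
```

Note the asymmetry: `update` bodies are wrapped in `{ doc: ... }` because the bulk API treats them as partial updates, while `index` and `create` documents are sent as-is, and `delete` contributes only its metadata line.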
  • Input schema using Zod for the 'bulk' tool parameters: operations array and optional pipeline.
    {
      operations: z
        .array(
          z.object({
            action: z
              .enum(["index", "create", "update", "delete"])
              .describe(
                "The action to perform: index (create/replace), create (fail if exists), update, or delete"
              ),
            index: z
              .string()
              .trim()
              .min(1, "Index name is required")
              .describe("Name of the Elasticsearch index for this operation"),
            id: z
              .string()
              .optional()
              .describe(
                "Document ID (required for update and delete, optional for index/create)"
              ),
            document: z
              .record(z.any())
              .optional()
              .describe(
                "Document body (required for index/create/update, not used for delete)"
              ),
          })
        )
        .min(1, "At least one operation is required")
        .describe("Array of operations to perform in bulk"),
      pipeline: z
        .string()
        .optional()
        .describe("Optional pipeline to use for preprocessing documents"),
    },
  • src/index.ts:770-934 (registration)
    Registration of the 'bulk' MCP tool via server.tool, specifying name, description, input schema, and handler function.
    server.tool(
      "bulk",
      "Perform multiple document operations (create, update, delete) in a single API call",
      {
        /* Input schema — identical to the Zod schema listed above */
      },
      async ({ operations, pipeline }) => {
        /* Handler body — identical to the main handler logic listed above */
      }
    );
  • Helper method in ElasticsearchService that executes the Elasticsearch bulk API call, passing the optional ingest pipeline and forcing an index refresh (refresh: true) so that changes become searchable immediately; convenient for interactive tooling, though costly for high-volume ingestion.
    async bulk(operations: any[], pipeline?: string): Promise<any> {
      return await this.client.bulk({
        refresh: true,
        pipeline,
        operations,
      });
    }
  • TypeScript interface defining the structure of the bulk operation result from Elasticsearch.
    export interface BulkOperationResult {
      took: number;
      errors: boolean;
      items: Array<Record<string, any>>;
    }
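
Given this interface, the per-item success/failure counting in the handler can be sketched as a standalone helper. This is a simplification of the handler's logic: each response item is keyed by its action type (e.g. `{ index: { status: 201, ... } }`), and an `error` field on the detail marks a failure:

```typescript
interface BulkOperationResult {
  took: number;
  errors: boolean;
  items: Array<Record<string, any>>;
}

// Counts successful and failed operations in a bulk response.
function countResults(result: BulkOperationResult): { successes: number; failures: number } {
  let successes = 0;
  let failures = 0;
  for (const item of result.items) {
    const action = Object.keys(item)[0]; // "index" | "create" | "update" | "delete"
    const detail = item[action];
    if (!detail) continue;
    if (detail.error) failures++;
    else successes++;
  }
  return { successes, failures };
}
```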