bulk
Import multiple documents into a specified Elasticsearch index in a single operation, with optional document ID field for identification.
Instructions
Bulk data into an Elasticsearch index
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| index | Yes | Target Elasticsearch index name | |
| documents | Yes | Array of documents to import | |
| idField | No | Optional document ID field name, if specified, the value of this field will be used as the document ID |
Implementation Reference
- src/tools/bulk.ts:3-86 (handler)The `bulk` function is the core handler that performs bulk import of documents into an Elasticsearch index. It accepts esClient, index name, an array of documents, and an optional idField. It constructs bulk operations with index actions, executes them via esClient.bulk(), and returns a summary with success/failure counts and error details.
export async function bulk( esClient: Client, index: string, documents: Record<string, any>[], idField?: string ) { try { if (!documents || documents.length === 0) { return { content: [ { type: "text" as const, text: "Error: No documents provided for import" } ] }; } // 准备批量操作的数据 const operations: Record<string, any>[] = []; for (const doc of documents) { const action: Record<string, any> = { index: { _index: index } }; // 如果指定了ID字段且文档中存在该字段,使用该值作为文档ID if (idField && doc[idField]) { action.index._id = doc[idField]; } operations.push(action); operations.push(doc); } // 执行批量操作 const response = await esClient.bulk({ refresh: true, // 立即刷新索引,使数据可搜索 operations }); // 处理结果 const content: { type: "text"; text: string }[] = []; // 统计成功和失败的操作 const successCount = response.items.filter(item => !item.index?.error).length; const failureCount = response.items.filter(item => item.index?.error).length; content.push({ type: "text" as const, text: `Bulk import completed:\nTotal documents: ${documents.length}\nSuccessfully imported: ${successCount}\nFailed: ${failureCount}\nProcessing time: ${response.took}ms` }); // 如果有失败的操作,添加详细信息 if (failureCount > 0) { const errors = response.items .filter(item => item.index?.error) .map(item => { const error = item.index?.error; const id = item.index?._id || 'unknown'; return `ID: ${id} - Error type: ${error?.type}, Reason: ${error?.reason}`; }); content.push({ type: "text" as const, text: `Failed details:\n${errors.join('\n')}` }); } return { content }; } catch (error) { console.error(`Bulk import failed: ${error instanceof Error ? error.message : String(error)}`); return { content: [ { type: "text" as const, text: `Error: ${error instanceof Error ? error.message : String(error)}` } ] }; } } - src/server.ts:168-192 (registration)The 'bulk' tool is registered as an MCP tool using server.tool() with name 'bulk'. It defines input schema: index (required string), documents (required array of objects), and idField (optional string). The handler calls the bulk() function from tools/bulk.ts.
// Bulk import data into an Elasticsearch index server.tool( "bulk", "Bulk data into an Elasticsearch index", { index: z .string() .trim() .min(1, "Index name is required") .describe("Target Elasticsearch index name"), documents: z .array(z.record(z.any())) .min(1, "At least one document is required") .describe("Array of documents to import"), idField: z .string() .optional() .describe("Optional document ID field name, if specified, the value of this field will be used as the document ID") }, async ({ index, documents, idField }) => { return await bulk(esClient, index, documents, idField); } ); - src/server.ts:172-188 (schema)Input schema for the bulk tool: index (z.string, required), documents (z.array(z.record(z.any())), at least 1 required), idField (z.string, optional). These are Zod schemas used for input validation.
{ index: z .string() .trim() .min(1, "Index name is required") .describe("Target Elasticsearch index name"), documents: z .array(z.record(z.any())) .min(1, "At least one document is required") .describe("Array of documents to import"), idField: z .string() .optional() .describe("Optional document ID field name, if specified, the value of this field will be used as the document ID") }, - src/server.ts:15-27 (registration)The bulk function is exported from server.ts, making it available as part of the module's public API.
export { listIndices, getMappings, search, getClusterHealth, createIndex, createMapping, bulk, reindex, createIndexTemplate, getIndexTemplate, deleteIndexTemplate }; - src/server.ts:11-11 (registration)The bulk function is imported from './tools/bulk.js' into server.ts for registration and use.
import { bulk } from "./tools/bulk.js";