awesimon

Elasticsearch MCP Server

bulk

Import large volumes of documents into an Elasticsearch index, specifying the target index and optional document ID field for efficient data handling.

Instructions

Bulk data into an Elasticsearch index

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| documents | Yes | Array of documents to import | |
| idField | No | Optional document ID field name; if specified, the value of this field will be used as the document ID | |
| index | Yes | Target Elasticsearch index name | |
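As a concrete illustration, the arguments for a bulk call might look like the following; the index name, sample documents, and `sku` field are invented for this sketch:

```typescript
// Hypothetical arguments for the bulk tool; "products" and the
// sample documents are invented for illustration.
const args = {
  index: "products",
  documents: [
    { sku: "a-1", name: "Widget", price: 9.99 },
    { sku: "a-2", name: "Gadget", price: 19.99 }
  ],
  // Each document's "sku" value will be used as its Elasticsearch _id
  idField: "sku"
};
```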

Implementation Reference

  • Core implementation of the bulk tool: prepares bulk operations, executes esClient.bulk(), processes response with success/failure counts and details.
    ```typescript
    export async function bulk(
      esClient: Client,
      index: string,
      documents: Record<string, any>[],
      idField?: string
    ) {
      try {
        if (!documents || documents.length === 0) {
          return {
            content: [
              { type: "text" as const, text: "Error: No documents provided for import" }
            ]
          };
        }

        // Prepare the bulk operation payload
        const operations: Record<string, any>[] = [];
        for (const doc of documents) {
          const action: Record<string, any> = { index: { _index: index } };
          // If an ID field was specified and the document contains it,
          // use that field's value as the document ID
          if (idField && doc[idField]) {
            action.index._id = doc[idField];
          }
          operations.push(action);
          operations.push(doc);
        }

        // Execute the bulk operation
        const response = await esClient.bulk({
          refresh: true, // refresh the index immediately so the data is searchable
          operations
        });

        // Process the result
        const content: { type: "text"; text: string }[] = [];

        // Count successful and failed operations
        const successCount = response.items.filter(item => !item.index?.error).length;
        const failureCount = response.items.filter(item => item.index?.error).length;

        content.push({
          type: "text" as const,
          text: `Bulk import completed:\nTotal documents: ${documents.length}\nSuccessfully imported: ${successCount}\nFailed: ${failureCount}\nProcessing time: ${response.took}ms`
        });

        // If any operations failed, append the details
        if (failureCount > 0) {
          const errors = response.items
            .filter(item => item.index?.error)
            .map(item => {
              const error = item.index?.error;
              const id = item.index?._id || 'unknown';
              return `ID: ${id} - Error type: ${error?.type}, Reason: ${error?.reason}`;
            });
          content.push({
            type: "text" as const,
            text: `Failed details:\n${errors.join('\n')}`
          });
        }

        return { content };
      } catch (error) {
        console.error(`Bulk import failed: ${error instanceof Error ? error.message : String(error)}`);
        return {
          content: [
            { type: "text" as const, text: `Error: ${error instanceof Error ? error.message : String(error)}` }
          ]
        };
      }
    }
    ```
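The action/document interleaving performed by the loop above can be sketched in isolation; `buildOperations` is a hypothetical helper written for this example, not part of the server:

```typescript
// Standalone sketch of the interleaving used by the bulk tool: each
// document is preceded by an { index: ... } action entry, matching the
// pairwise layout the Elasticsearch bulk API expects.
type Doc = Record<string, any>;

function buildOperations(index: string, documents: Doc[], idField?: string): Doc[] {
  const operations: Doc[] = [];
  for (const doc of documents) {
    const action: Doc = { index: { _index: index } };
    // Use the document's ID field as _id when one is configured
    if (idField && doc[idField]) {
      action.index._id = doc[idField];
    }
    operations.push(action, doc);
  }
  return operations;
}

const ops = buildOperations("products", [{ sku: "a-1", name: "Widget" }], "sku");
// ops alternates action/document pairs:
// [{ index: { _index: "products", _id: "a-1" } }, { sku: "a-1", name: "Widget" }]
```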
  • src/server.ts:169-192 (registration)
    Registers the 'bulk' tool with the MCP server, including description, input schema, and handler.
    ```typescript
    server.tool(
      "bulk",
      "Bulk data into an Elasticsearch index",
      {
        index: z
          .string()
          .trim()
          .min(1, "Index name is required")
          .describe("Target Elasticsearch index name"),
        documents: z
          .array(z.record(z.any()))
          .min(1, "At least one document is required")
          .describe("Array of documents to import"),
        idField: z
          .string()
          .optional()
          .describe("Optional document ID field name, if specified, the value of this field will be used as the document ID")
      },
      async ({ index, documents, idField }) => {
        return await bulk(esClient, index, documents, idField);
      }
    );
    ```
  • Zod input schema for the bulk tool parameters: index (required string), documents (array of objects, min 1), idField (optional string).
    ```typescript
    {
      index: z
        .string()
        .trim()
        .min(1, "Index name is required")
        .describe("Target Elasticsearch index name"),
      documents: z
        .array(z.record(z.any()))
        .min(1, "At least one document is required")
        .describe("Array of documents to import"),
      idField: z
        .string()
        .optional()
        .describe("Optional document ID field name, if specified, the value of this field will be used as the document ID")
    },
    ```


MCP directory API

We provide all the information about MCP servers via our MCP API.

```shell
curl -X GET 'https://glama.ai/api/mcp/v1/servers/awesimon/elasticsearch-mcp'
```

If you have feedback or need assistance with the MCP directory API, please join our Discord server.