awesimon

Elasticsearch MCP Server

bulk

Import multiple documents into an Elasticsearch index in a single operation to optimize data ingestion and improve performance.

Instructions

Bulk data into an Elasticsearch index

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| `index` | Yes | Target Elasticsearch index name | |
| `documents` | Yes | Array of documents to import | |
| `idField` | No | Optional document ID field name; if specified, the value of this field is used as the document ID | |
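As a sketch, a call to this tool might pass arguments shaped like the following. The index name, field names, and document values here are hypothetical, chosen only to illustrate the schema:

```typescript
// Hypothetical arguments for the `bulk` tool.
const args = {
  index: "products",
  documents: [
    { sku: "A-100", name: "Keyboard", price: 49.9 },
    { sku: "A-101", name: "Mouse", price: 19.9 }
  ],
  // Each document's `sku` value becomes its Elasticsearch _id.
  idField: "sku"
};

console.log(args.documents.length); // number of documents to import
```

Because `idField` is optional, omitting it lets Elasticsearch auto-generate document IDs instead.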

Implementation Reference

  • The core handler function for the 'bulk' tool, implementing bulk document import into an Elasticsearch index using esClient.bulk, with error handling and success/failure reporting.
```typescript
export async function bulk(
  esClient: Client,
  index: string,
  documents: Record<string, any>[],
  idField?: string
) {
  try {
    if (!documents || documents.length === 0) {
      return {
        content: [
          { type: "text" as const, text: "Error: No documents provided for import" }
        ]
      };
    }

    // Prepare the bulk operation payload
    const operations: Record<string, any>[] = [];
    for (const doc of documents) {
      const action: Record<string, any> = { index: { _index: index } };
      // If an ID field is specified and present in the document,
      // use its value as the document ID
      if (idField && doc[idField]) {
        action.index._id = doc[idField];
      }
      operations.push(action);
      operations.push(doc);
    }

    // Execute the bulk operation
    const response = await esClient.bulk({
      refresh: true, // Refresh the index immediately so the data is searchable
      operations
    });

    // Process the result
    const content: { type: "text"; text: string }[] = [];

    // Count successful and failed operations
    const successCount = response.items.filter(item => !item.index?.error).length;
    const failureCount = response.items.filter(item => item.index?.error).length;

    content.push({
      type: "text" as const,
      text: `Bulk import completed:\nTotal documents: ${documents.length}\nSuccessfully imported: ${successCount}\nFailed: ${failureCount}\nProcessing time: ${response.took}ms`
    });

    // If any operations failed, append the details
    if (failureCount > 0) {
      const errors = response.items
        .filter(item => item.index?.error)
        .map(item => {
          const error = item.index?.error;
          const id = item.index?._id || 'unknown';
          return `ID: ${id} - Error type: ${error?.type}, Reason: ${error?.reason}`;
        });
      content.push({
        type: "text" as const,
        text: `Failed details:\n${errors.join('\n')}`
      });
    }

    return { content };
  } catch (error) {
    console.error(`Bulk import failed: ${error instanceof Error ? error.message : String(error)}`);
    return {
      content: [
        { type: "text" as const, text: `Error: ${error instanceof Error ? error.message : String(error)}` }
      ]
    };
  }
}
```
  • src/server.ts:168-192 (registration)
    Registration of the 'bulk' MCP tool, including name, description, input schema, and wrapper handler that delegates to the bulk function.
```typescript
// Bulk import data into an Elasticsearch index
server.tool(
  "bulk",
  "Bulk data into an Elasticsearch index",
  {
    index: z
      .string()
      .trim()
      .min(1, "Index name is required")
      .describe("Target Elasticsearch index name"),
    documents: z
      .array(z.record(z.any()))
      .min(1, "At least one document is required")
      .describe("Array of documents to import"),
    idField: z
      .string()
      .optional()
      .describe("Optional document ID field name, if specified, the value of this field will be used as the document ID")
  },
  async ({ index, documents, idField }) => {
    return await bulk(esClient, index, documents, idField);
  }
);
```
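For each document, the handler pushes an action entry followed by the document itself, mirroring the alternating action/source format of the Elasticsearch bulk API. A minimal standalone sketch of that pairing (the `buildOperations` helper and the sample documents are illustrative, not part of the server):

```typescript
type Doc = Record<string, any>;

// Build the alternating [action, document, action, document, ...] array
// that the Elasticsearch client's bulk() call expects in `operations`.
function buildOperations(index: string, documents: Doc[], idField?: string): Doc[] {
  const operations: Doc[] = [];
  for (const doc of documents) {
    const action: Doc = { index: { _index: index } };
    if (idField && doc[idField]) {
      action.index._id = doc[idField]; // use the document's own field as _id
    }
    operations.push(action, doc);
  }
  return operations;
}

const ops = buildOperations("products", [{ sku: "A-100" }, { sku: "A-101" }], "sku");
console.log(ops.length); // two entries per document
console.log(ops[0]);     // action line: { index: { _index: "products", _id: "A-100" } }
```

Pairing each action with its source in a single flat array is what lets Elasticsearch index the whole batch in one round trip instead of one request per document.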


MCP directory API

We provide all the information about MCP servers via our MCP API.

```shell
curl -X GET 'https://glama.ai/api/mcp/v1/servers/awesimon/elasticsearch-mcp'
```

If you have feedback or need assistance with the MCP directory API, please join our Discord server.