// elasticsearchService.ts • 7.75 kB
import {
  Client,
  ClientOptions,
  Transport,
  TransportRequestOptions,
  TransportRequestParams,
} from "@elastic/elasticsearch";
import fs from "fs";
import {
  ElasticsearchConfig,
  IndexInfo,
  ShardInfo,
  ClusterHealth,
} from "./models.js";
import { VERSION } from "../index.js";
// Transport subclass that puts a fixed prefix in front of the path of every
// outgoing request before delegating to the base Transport.
class CustomTransport extends Transport {
  // Prefix concatenated verbatim (no separator is inserted or removed).
  private readonly pathPrefix: string;

  constructor(
    opts: ConstructorParameters<typeof Transport>[0],
    pathPrefix: string
  ) {
    super(opts);
    this.pathPrefix = pathPrefix;
  }

  // Copies the request params with the prefixed path and forwards them,
  // together with the untouched options, to the base implementation.
  async request(
    params: TransportRequestParams,
    options?: TransportRequestOptions
  ): Promise<any> {
    const prefixedPath = this.pathPrefix + params.path;
    const prefixedParams = { ...params, path: prefixedPath };
    const result = await super.request(prefixedParams, options);
    return result;
  }
}
// Wrapper around the Elasticsearch client. Builds a client from an
// ElasticsearchConfig and exposes helpers for index, document and cluster
// calls. Errors raised by the client are not caught here; they propagate to
// the caller of each helper.
export class ElasticsearchService {
  private readonly client: Client;
  private readonly config: ElasticsearchConfig;

  constructor(config: ElasticsearchConfig) {
    this.config = config;
    this.client = this.createClient(config);
  }

  // Create the Elasticsearch client with appropriate configuration.
  // Reads url, auth, TLS, path prefix and version settings from `config`
  // and translates them into ClientOptions.
  private createClient(config: ElasticsearchConfig): Client {
    const {
      url,
      apiKey,
      username,
      password,
      caCert,
      pathPrefix,
      version,
      sslSkipVerify,
    } = config;

    const clientOptions: ClientOptions = {
      node: url,
      headers: {
        "user-agent": `octodet-elasticsearch-mcp/${VERSION}`,
      },
    };

    // Set up custom transport with path prefix if needed. The anonymous
    // subclass exists only to bind the prefix, since the client constructs
    // the Transport itself with a single options argument.
    if (pathPrefix != null) {
      const verifiedPathPrefix = pathPrefix;
      clientOptions.Transport = class extends CustomTransport {
        constructor(opts: ConstructorParameters<typeof Transport>[0]) {
          super(opts, verifiedPathPrefix);
        }
      };
    }

    // Configure authentication: an API key takes precedence over
    // username/password; with neither, no auth is configured.
    if (apiKey != null) {
      clientOptions.auth = { apiKey };
    } else if (username != null && password != null) {
      clientOptions.auth = { username, password };
    }

    // Configure TLS/SSL settings
    clientOptions.tls = {};
    if (caCert != null && caCert.length > 0) {
      try {
        clientOptions.tls.ca = fs.readFileSync(caCert);
      } catch (error) {
        // Best effort: a missing/unreadable CA file is logged and the client
        // is still created without a custom CA.
        console.error(
          `Failed to read certificate file: ${
            error instanceof Error ? error.message : String(error)
          }`
        );
      }
    }

    // Configure version-specific settings
    if (version === "8") {
      clientOptions.maxRetries = 5;
      clientOptions.requestTimeout = 30000;
      // Merge the compatibility headers into the existing headers instead of
      // replacing them, so the user-agent set above is not lost.
      clientOptions.headers = {
        ...clientOptions.headers,
        accept: "application/vnd.elasticsearch+json;compatible-with=8",
        "content-type": "application/vnd.elasticsearch+json;compatible-with=8",
      };
    } else if (version === "9") {
      clientOptions.maxRetries = 5;
      clientOptions.requestTimeout = 30000;
    }

    // Skip SSL verification if requested
    if (sslSkipVerify === true) {
      clientOptions.tls.rejectUnauthorized = false;
    }

    return new Client(clientOptions);
  }

  // Get the Elasticsearch client instance
  getClient(): Client {
    return this.client;
  }

  // List indices matching a pattern. Each row of the cat-indices response is
  // mapped to an IndexInfo; numeric columns are parsed as base-10 integers
  // and missing/empty values fall back to 0 (or "0" for the store size).
  async listIndices(indexPattern: string): Promise<IndexInfo[]> {
    const response = await this.client.cat.indices({
      index: indexPattern,
      format: "json",
      h: "index,health,status,docs.count,store.size,pri,rep",
    });
    return response.map((index: any) => ({
      index: index.index,
      health: index.health,
      status: index.status,
      docsCount: parseInt(index["docs.count"] || "0", 10),
      storeSize: index["store.size"] || "0",
      primaryShards: parseInt(index.pri || "0", 10),
      replicaShards: parseInt(index.rep || "0", 10),
    }));
  }

  // Get mappings for an index. Returns the `mappings` entry stored under the
  // key equal to `index` in the response, or {} when that key is absent.
  // NOTE(review): for an alias or wildcard the response is presumably keyed
  // by concrete index names, in which case this returns {} - confirm.
  async getMappings(index: string): Promise<any> {
    const response = await this.client.indices.getMapping({
      index,
    });
    return response[index]?.mappings || {};
  }

  // Get cluster health information, renamed to the ClusterHealth field names.
  async getClusterHealth(): Promise<ClusterHealth> {
    const response = await this.client.cluster.health();
    return {
      status: response.status,
      nodeCount: response.number_of_nodes,
      datanodeCount: response.number_of_data_nodes,
      activePrimaryShards: response.active_primary_shards,
      activeShards: response.active_shards,
      relocatingShards: response.relocating_shards,
      initializingShards: response.initializing_shards,
      unassignedShards: response.unassigned_shards,
      pendingTasks: response.number_of_pending_tasks,
    };
  }

  // Get shard information. `index` is optional and is passed to the
  // cat-shards call as given (possibly undefined).
  async getShards(index?: string): Promise<ShardInfo[]> {
    const response = await this.client.cat.shards({
      index,
      format: "json",
    });
    return response.map((shard: any) => ({
      index: shard.index,
      shard: shard.shard,
      prirep: shard.prirep,
      state: shard.state,
      docs: shard.docs,
      store: shard.store,
      ip: shard.ip,
      node: shard.node,
    }));
  }

  // Perform a search with flexible query parameters. The properties of
  // `queryBody` are spread into the request after `index`, so an `index` key
  // inside `queryBody` overrides the `index` argument.
  async search(index: string, queryBody: any): Promise<any> {
    return await this.client.search({
      index,
      ...queryBody,
    });
  }

  // Add a document to an index. `id` is only sent when it is truthy, so an
  // omitted or empty id leaves id assignment to the server.
  async addDocument(index: string, document: any, id?: string): Promise<any> {
    const params: any = { index, document };
    if (id) params.id = id;
    return await this.client.index(params);
  }

  // Update a document; `document` is sent as the partial `doc` of the update.
  async updateDocument(index: string, id: string, document: any): Promise<any> {
    return await this.client.update({
      index,
      id,
      doc: document,
    });
  }

  // Delete a document
  async deleteDocument(index: string, id: string): Promise<any> {
    return await this.client.delete({ index, id });
  }

  // Update documents by query; `params` is forwarded unchanged.
  async updateByQuery(params: any): Promise<any> {
    return await this.client.updateByQuery(params);
  }

  // Delete documents by query; `params` is forwarded unchanged.
  async deleteByQuery(params: any): Promise<any> {
    return await this.client.deleteByQuery(params);
  }

  // Perform bulk operations. Always requests `refresh: true`; `pipeline` is
  // optional and passed through as given.
  async bulk(operations: any[], pipeline?: string): Promise<any> {
    return await this.client.bulk({
      refresh: true,
      pipeline,
      operations,
    });
  }

  // Get index templates, optionally restricted by `name`.
  async getIndexTemplates(name?: string): Promise<any> {
    return await this.client.indices.getIndexTemplate({
      name,
    });
  }

  // Get index aliases, optionally restricted by `name`.
  async getAliases(name?: string): Promise<any> {
    return await this.client.indices.getAlias({
      name,
    });
  }

  // Refresh an index
  async refreshIndex(index: string): Promise<any> {
    return await this.client.indices.refresh({
      index,
    });
  }

  // Create an index with optional settings and mappings. Only truthy
  // `settings` / `mappings` are sent. They are passed as top-level request
  // parameters (not wrapped in `body`), matching the flattened request style
  // used by the other helpers in this class.
  async createIndex(
    index: string,
    settings?: any,
    mappings?: any
  ): Promise<any> {
    return await this.client.indices.create({
      index,
      ...(settings ? { settings } : {}),
      ...(mappings ? { mappings } : {}),
    });
  }

  // Delete an index
  async deleteIndex(index: string): Promise<any> {
    return await this.client.indices.delete({
      index,
    });
  }

  // Get ingest pipelines, optionally restricted by `id`.
  async getPipelines(id?: string): Promise<any> {
    return await this.client.ingest.getPipeline({
      id,
    });
  }

  // Count documents in an index matching a query. Without a (truthy) query
  // no `query` parameter is sent. Returns the `count` field of the response.
  async countDocuments(index: string, query?: any): Promise<number> {
    const response = await this.client.count({
      index,
      ...(query ? { query } : {}),
    });
    return response.count;
  }
}