Skip to main content
Glama

mcp-confluent

Official
by confluentinc
consume-kafka-messages-handler.ts•8.47 kB
import { KafkaJS } from "@confluentinc/kafka-javascript"; import { KafkaMessage } from "@confluentinc/kafka-javascript/types/kafkajs.js"; import { SchemaRegistryClient, SerdeType } from "@confluentinc/schemaregistry"; import { ClientManager } from "@src/confluent/client-manager.js"; import { deserializeMessage, getLatestSchemaIfExists, } from "@src/confluent/schema-registry-helper.js"; import { CallToolResult } from "@src/confluent/schema.js"; import { BaseToolHandler, ToolConfig, } from "@src/confluent/tools/base-tools.js"; import { ToolName } from "@src/confluent/tools/tool-name.js"; import { EnvVar } from "@src/env-schema.js"; import { logger } from "@src/logger.js"; import { z } from "zod"; const messageOptions = z.object({ useSchemaRegistry: z .boolean() .optional() .default(false) .describe( "Whether to use schema registry for deserialization. If false, messages will be returned as raw.", ), subject: z .string() .optional() .describe( "Schema registry subject. Defaults to 'topicName-value' or 'topicName-key'.", ), }); const valueOptions = z.object({}).extend(messageOptions.shape); const keyOptions = z.object({}).extend(messageOptions.shape); type ValueOptions = z.infer<typeof valueOptions>; type KeyOptions = z.infer<typeof keyOptions>; export const consumeKafkaMessagesArgs = z.object({ topicNames: z .array(z.string()) .nonempty() .describe("Names of the Kafka topics to consume from."), maxMessages: z .number() .int() .positive() .optional() .default(10) .describe("Maximum number of messages to consume before stopping."), timeoutMs: z .number() .int() .positive() .optional() .default(10000) .describe( "Maximum time in milliseconds to wait for messages before stopping.", ), value: valueOptions, key: keyOptions.optional(), }); interface ProcessedMessage { key: unknown; value: unknown; timestamp: string; offset: string; headers?: Record<string, string>; topic: string; partition: number; } /** * Handler for consuming messages from Kafka topics with support for Schema Registry deserialization. * This handler allows consuming messages from one or more topics with configurable message limits and timeouts. * It supports automatic deserialization of Schema Registry encoded messages (AVRO, JSON, PROTOBUF). */ export class ConsumeKafkaMessagesHandler extends BaseToolHandler { /** * Processes a single Kafka message, handling deserialization of both key and value. * @param topic - The topic the message was consumed from * @param partition - The partition the message was consumed from * @param message - The raw Kafka message * @param registry - Optional Schema Registry client for deserialization * @param valueOptions - Options for value deserialization * @param keyOptions - Optional options for key deserialization * @returns A processed message with deserialized key and value */ async processMessage( topic: string, partition: number, message: KafkaMessage, registry: SchemaRegistryClient | undefined, valueOptions: ValueOptions, keyOptions?: KeyOptions, ): Promise<ProcessedMessage> { let processedKey: unknown = message.key?.toString(); let processedValue: unknown = message.value?.toString(); const deserializeWithOptions = async ( buffer: Buffer | undefined, options: ValueOptions | KeyOptions, serdeType: SerdeType, ): Promise<unknown> => { if (!options.useSchemaRegistry || !registry) { return buffer?.toString(); } const subject = options.subject || `${topic}-${serdeType === SerdeType.KEY ? "key" : "value"}`; const schema = await getLatestSchemaIfExists(registry, subject); if (!schema || !schema.schemaType) { return buffer?.toString(); } try { return await deserializeMessage( topic, buffer as Buffer, schema.schemaType, registry, serdeType, ); } catch (err) { logger.error( { error: err, topic, schemaType: schema.schemaType, serdeType }, `Error deserializing message ${serdeType} for topic ${topic}`, ); return buffer?.toString(); } }; processedValue = await deserializeWithOptions( message.value as Buffer, valueOptions, SerdeType.VALUE, ); if (message.key && keyOptions) { processedKey = await deserializeWithOptions( message.key as Buffer, keyOptions, SerdeType.KEY, ); } return { key: processedKey, value: processedValue, timestamp: message.timestamp, offset: message.offset, headers: message.headers ? Object.fromEntries( Object.entries(message.headers).map(([key, value]) => [ key, value?.toString() || "", ]), ) : undefined, topic, partition, }; } /** * Main handler for consuming messages from Kafka topics. * @param clientManager - The client manager for Kafka and registry clients * @param toolArguments - The arguments for the tool, including topics, message limits, and deserialization options * @param sessionId - Optional session ID for Kafka consumer * @returns A CallToolResult containing the consumed messages or error information */ async handle( clientManager: ClientManager, toolArguments: z.infer<typeof consumeKafkaMessagesArgs>, sessionId?: string, ): Promise<CallToolResult> { const { topicNames, maxMessages, timeoutMs, value, key } = consumeKafkaMessagesArgs.parse(toolArguments); const consumedMessages: ProcessedMessage[] = []; let timeoutReached = false; let consumer: KafkaJS.Consumer | undefined; const registry: SchemaRegistryClient | undefined = value.useSchemaRegistry || (key && key.useSchemaRegistry) ? clientManager.getSchemaRegistryClient() : undefined; try { consumer = await clientManager.getConsumer(sessionId); await consumer.connect(); await consumer.subscribe({ topics: topicNames }); const consumePromise = new Promise<void>((resolve, reject) => { if (!consumer) { reject(new Error("Consumer was unexpectedly undefined")); return; } consumer .run({ eachMessage: async ({ topic, partition, message }) => { if (timeoutReached) return; const processed = await this.processMessage( topic, partition, message, registry, value, key, ); consumedMessages.push(processed); if (consumedMessages.length >= maxMessages) { timeoutReached = true; resolve(); } }, }) .catch((error) => { reject(error); }); setTimeout(() => { timeoutReached = true; resolve(); }, timeoutMs); }); await consumePromise; return this.createResponse( `Consumed ${consumedMessages.length} messages from topics ${topicNames.join(", ")}.\nConsumed messages: ${JSON.stringify(consumedMessages, null, 2)}`, false, ); } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : error instanceof KafkaJS.KafkaJSError ? `Kafka error (${error.code}): ${error.message}` : String(error); return this.createResponse( `Failed to consume messages: ${errorMessage}`, true, ); } finally { if (consumer) { try { await consumer.disconnect(); } catch (error) { logger.error({ error }, "Error cleaning up consumer"); } } } } getToolConfig(): ToolConfig { return { name: ToolName.CONSUME_MESSAGES, description: "Consumes messages from one or more Kafka topics. Supports automatic deserialization of Schema Registry encoded messages (AVRO, JSON, PROTOBUF).", inputSchema: consumeKafkaMessagesArgs.shape, }; } getRequiredEnvVars(): EnvVar[] { return ["KAFKA_API_KEY", "KAFKA_API_SECRET", "BOOTSTRAP_SERVERS"]; } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/confluentinc/mcp-confluent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server