Skip to main content
Glama
confluentinc

mcp-confluent

Official
by confluentinc

produce-message

Generate and send messages to a specified Kafka topic. Automatically creates the topic if it doesn’t exist, ensuring reliable message storage for user conversations or history.

Instructions

Produce records to a Kafka topic. For saving user messages/history they should be saved to a kafka topic named claude-conversations unless otherwise specified. If the topic does not exist, it will be created via the create-topics tool.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
messageYesThe content of the message to produce
topicNameYesName of the kafka topic to produce the message to

Implementation Reference

  • The main handler function that parses arguments, performs schema registry checks and serialization for key/value, produces the message to Kafka, and returns delivery report.
    async handle( clientManager: ClientManager, toolArguments: Record<string, unknown>, ): Promise<CallToolResult> { const { topicName, value, key }: ProduceKafkaMessageArguments = produceKafkaMessageArguments.parse(toolArguments); // Only create registry if needed const needsRegistry = value.useSchemaRegistry || (key && key.useSchemaRegistry); const registry: SchemaRegistryClient | undefined = needsRegistry ? clientManager.getSchemaRegistryClient() : undefined; // Check for latest schema if needed (value) const valueSchemaCheck = await checkSchemaNeeded( topicName, value as MessageOptions, SerdeType.VALUE, registry, ); const valueSchemaResult = this.handleSchemaCheckResult(valueSchemaCheck); if (valueSchemaResult) return valueSchemaResult; // Check for latest schema if needed (key) if (key) { const keySchemaCheck = await checkSchemaNeeded( topicName, key as MessageOptions, SerdeType.KEY, registry, ); const keySchemaResult = this.handleSchemaCheckResult(keySchemaCheck); if (keySchemaResult) return keySchemaResult; } let valueToSend: Buffer | string; let keyToSend: Buffer | string | undefined; try { valueToSend = await serializeMessage( topicName, value as MessageOptions, SerdeType.VALUE, registry, ); if (key) { keyToSend = await serializeMessage( topicName, key as MessageOptions, SerdeType.KEY, registry, ); } } catch (err) { return this.createResponse( `Failed to serialize: ${err instanceof Error ? err.message : err}`, true, ); } // Send the message let deliveryReport: RecordMetadata[]; try { deliveryReport = await ( await clientManager.getProducer() ).send({ topic: topicName, messages: [ typeof keyToSend !== "undefined" ? { key: keyToSend, value: valueToSend } : { value: valueToSend }, ], }); } catch (err) { return this.createResponse( `Failed to produce message: ${err instanceof Error ? err.message : err}`, true, ); } const formattedResponse = deliveryReport .map((metadata) => { if (metadata.errorCode !== 0) { return `Error producing message to [Topic: ${metadata.topicName}, Partition: ${metadata.partition}, Offset: ${metadata.offset} with ErrorCode: ${metadata.errorCode}]`; } return `Message produced successfully to [Topic: ${metadata.topicName}, Partition: ${metadata.partition}, Offset: ${metadata.offset}]`; }) .join("\n"); const isError = deliveryReport.some((metadata) => metadata.errorCode !== 0); return this.createResponse(formattedResponse, isError); }
  • Zod input schema definition for the tool arguments: topicName (required), value (with schema options), optional key.
    const produceKafkaMessageArguments = z.object({ topicName: z .string() .nonempty() .describe("Name of the kafka topic to produce the message to"), value: valueOptions, key: keyOptions.optional(), }); type ProduceKafkaMessageArguments = z.infer< typeof produceKafkaMessageArguments >;
  • Registration of the 'produce-message' tool name to its handler instance in the ToolFactory's static handlers map, used by the MCP server.
    [ToolName.PRODUCE_MESSAGE, new ProduceKafkaMessageHandler()],
  • Enum constant defining the tool name string 'produce-message'.
    PRODUCE_MESSAGE = "produce-message",

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/confluentinc/mcp-confluent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server