produce-message
Generate and send messages to a specified Kafka topic. Automatically creates the topic if it doesn’t exist, ensuring reliable message storage for user conversations or history.
Instructions
Produce records to a Kafka topic. To save user messages or history, produce them to a Kafka topic named claude-conversations unless otherwise specified. If the topic does not exist, it will be created via the create-topics tool.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| message | Yes | The content of the message to produce | |
| topicName | Yes | Name of the Kafka topic to produce the message to | |
Implementation Reference
- The main handler function, which parses the arguments, performs schema registry checks and serialization for the key and value, produces the message to Kafka, and returns the delivery report.

  ```typescript
  async handle(
    clientManager: ClientManager,
    toolArguments: Record<string, unknown>,
  ): Promise<CallToolResult> {
    const { topicName, value, key }: ProduceKafkaMessageArguments =
      produceKafkaMessageArguments.parse(toolArguments);

    // Only create registry if needed
    const needsRegistry =
      value.useSchemaRegistry || (key && key.useSchemaRegistry);
    const registry: SchemaRegistryClient | undefined = needsRegistry
      ? clientManager.getSchemaRegistryClient()
      : undefined;

    // Check for latest schema if needed (value)
    const valueSchemaCheck = await checkSchemaNeeded(
      topicName,
      value as MessageOptions,
      SerdeType.VALUE,
      registry,
    );
    const valueSchemaResult = this.handleSchemaCheckResult(valueSchemaCheck);
    if (valueSchemaResult) return valueSchemaResult;

    // Check for latest schema if needed (key)
    if (key) {
      const keySchemaCheck = await checkSchemaNeeded(
        topicName,
        key as MessageOptions,
        SerdeType.KEY,
        registry,
      );
      const keySchemaResult = this.handleSchemaCheckResult(keySchemaCheck);
      if (keySchemaResult) return keySchemaResult;
    }

    let valueToSend: Buffer | string;
    let keyToSend: Buffer | string | undefined;
    try {
      valueToSend = await serializeMessage(
        topicName,
        value as MessageOptions,
        SerdeType.VALUE,
        registry,
      );
      if (key) {
        keyToSend = await serializeMessage(
          topicName,
          key as MessageOptions,
          SerdeType.KEY,
          registry,
        );
      }
    } catch (err) {
      return this.createResponse(
        `Failed to serialize: ${err instanceof Error ? err.message : err}`,
        true,
      );
    }

    // Send the message
    let deliveryReport: RecordMetadata[];
    try {
      deliveryReport = await (
        await clientManager.getProducer()
      ).send({
        topic: topicName,
        messages: [
          typeof keyToSend !== "undefined"
            ? { key: keyToSend, value: valueToSend }
            : { value: valueToSend },
        ],
      });
    } catch (err) {
      return this.createResponse(
        `Failed to produce message: ${err instanceof Error ? err.message : err}`,
        true,
      );
    }

    const formattedResponse = deliveryReport
      .map((metadata) => {
        if (metadata.errorCode !== 0) {
          return `Error producing message to [Topic: ${metadata.topicName}, Partition: ${metadata.partition}, Offset: ${metadata.offset} with ErrorCode: ${metadata.errorCode}]`;
        }
        return `Message produced successfully to [Topic: ${metadata.topicName}, Partition: ${metadata.partition}, Offset: ${metadata.offset}]`;
      })
      .join("\n");
    const isError = deliveryReport.some((metadata) => metadata.errorCode !== 0);
    return this.createResponse(formattedResponse, isError);
  }
  ```
- Zod input schema definition for the tool arguments: topicName (required), value (with schema options), and an optional key.

  ```typescript
  const produceKafkaMessageArguments = z.object({
    topicName: z
      .string()
      .nonempty()
      .describe("Name of the kafka topic to produce the message to"),
    value: valueOptions,
    key: keyOptions.optional(),
  });

  type ProduceKafkaMessageArguments = z.infer<
    typeof produceKafkaMessageArguments
  >;
  ```
- src/confluent/tools/tool-factory.ts:46-46 (registration): Registration of the 'produce-message' tool name to its handler instance in the ToolFactory's static handlers map, used by the MCP server.

  ```typescript
  [ToolName.PRODUCE_MESSAGE, new ProduceKafkaMessageHandler()],
  ```
- src/confluent/tools/tool-name.ts:5-5 (helper): Enum constant defining the tool name string 'produce-message'.

  ```typescript
  PRODUCE_MESSAGE = "produce-message",
  ```
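
The last step of the handler, turning the delivery report into a text response plus an error flag, can be tried in isolation. The following is a minimal sketch rather than the project's code: `DeliveryRecord`, `FormattedReport`, and `formatDeliveryReport` are hypothetical names introduced here, and `DeliveryRecord` models only the four fields the handler reads (`topicName`, `partition`, `offset`, `errorCode`), with `offset` assumed to be a string.

```typescript
// Hypothetical stand-in for the delivery report entries used by the handler.
// Only the fields the handler reads are modeled; the offset type is an assumption.
interface DeliveryRecord {
  topicName: string;
  partition: number;
  offset: string;
  errorCode: number;
}

// Hypothetical result shape: the response text and whether any record failed.
interface FormattedReport {
  text: string;
  isError: boolean;
}

// Builds one line per record, joined by newlines, and flags the whole report
// as an error if any record carries a non-zero errorCode.
function formatDeliveryReport(report: DeliveryRecord[]): FormattedReport {
  const text = report
    .map((m) => {
      const location = `Topic: ${m.topicName}, Partition: ${m.partition}, Offset: ${m.offset}`;
      return m.errorCode !== 0
        ? `Error producing message to [${location} with ErrorCode: ${m.errorCode}]`
        : `Message produced successfully to [${location}]`;
    })
    .join("\n");
  const isError = report.some((m) => m.errorCode !== 0);
  return { text, isError };
}

// Example input: one successful record and one failed record (made-up values).
const example = formatDeliveryReport([
  { topicName: "claude-conversations", partition: 0, offset: "42", errorCode: 0 },
  { topicName: "claude-conversations", partition: 1, offset: "-1", errorCode: 3 },
]);
console.log(example.text);
console.log(example.isError);
```

Because a single failed record sets the flag for the whole response, a caller sees `isError` as true even when other records in the same report were delivered; the per-record lines in the text show which ones succeeded.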