Skip to main content
Glama
confluentinc

mcp-confluent

Official
by confluentinc

produce-message

Generate and send messages to a specified Kafka topic. Automatically creates the topic if it doesn’t exist, ensuring reliable message storage for user conversations or history.

Instructions

Produce records to a Kafka topic. For saving user messages/history they should be saved to a kafka topic named claude-conversations unless otherwise specified. If the topic does not exist, it will be created via the create-topics tool.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
messageYesThe content of the message to produce
topicNameYesName of the kafka topic to produce the message to

Implementation Reference

  • The main handler function that parses arguments, performs schema registry checks and serialization for key/value, produces the message to Kafka, and returns delivery report.
    async handle(
      clientManager: ClientManager,
      toolArguments: Record<string, unknown>,
    ): Promise<CallToolResult> {
      const { topicName, value, key }: ProduceKafkaMessageArguments =
        produceKafkaMessageArguments.parse(toolArguments);
    
      // Only create registry if needed
      const needsRegistry =
        value.useSchemaRegistry || (key && key.useSchemaRegistry);
      const registry: SchemaRegistryClient | undefined = needsRegistry
        ? clientManager.getSchemaRegistryClient()
        : undefined;
    
      // Check for latest schema if needed (value)
      const valueSchemaCheck = await checkSchemaNeeded(
        topicName,
        value as MessageOptions,
        SerdeType.VALUE,
        registry,
      );
      const valueSchemaResult = this.handleSchemaCheckResult(valueSchemaCheck);
      if (valueSchemaResult) return valueSchemaResult;
    
      // Check for latest schema if needed (key)
      if (key) {
        const keySchemaCheck = await checkSchemaNeeded(
          topicName,
          key as MessageOptions,
          SerdeType.KEY,
          registry,
        );
        const keySchemaResult = this.handleSchemaCheckResult(keySchemaCheck);
        if (keySchemaResult) return keySchemaResult;
      }
    
      let valueToSend: Buffer | string;
      let keyToSend: Buffer | string | undefined;
      try {
        valueToSend = await serializeMessage(
          topicName,
          value as MessageOptions,
          SerdeType.VALUE,
          registry,
        );
        if (key) {
          keyToSend = await serializeMessage(
            topicName,
            key as MessageOptions,
            SerdeType.KEY,
            registry,
          );
        }
      } catch (err) {
        return this.createResponse(
          `Failed to serialize: ${err instanceof Error ? err.message : err}`,
          true,
        );
      }
    
      // Send the message
      let deliveryReport: RecordMetadata[];
      try {
        deliveryReport = await (
          await clientManager.getProducer()
        ).send({
          topic: topicName,
          messages: [
            typeof keyToSend !== "undefined"
              ? { key: keyToSend, value: valueToSend }
              : { value: valueToSend },
          ],
        });
      } catch (err) {
        return this.createResponse(
          `Failed to produce message: ${err instanceof Error ? err.message : err}`,
          true,
        );
      }
      const formattedResponse = deliveryReport
        .map((metadata) => {
          if (metadata.errorCode !== 0) {
            return `Error producing message to [Topic: ${metadata.topicName}, Partition: ${metadata.partition}, Offset: ${metadata.offset} with ErrorCode: ${metadata.errorCode}]`;
          }
          return `Message produced successfully to [Topic: ${metadata.topicName}, Partition: ${metadata.partition}, Offset: ${metadata.offset}]`;
        })
        .join("\n");
      const isError = deliveryReport.some((metadata) => metadata.errorCode !== 0);
      return this.createResponse(formattedResponse, isError);
    }
  • Zod input schema definition for the tool arguments: topicName (required), value (with schema options), optional key.
    const produceKafkaMessageArguments = z.object({
      topicName: z
        .string()
        .nonempty()
        .describe("Name of the kafka topic to produce the message to"),
      value: valueOptions,
      key: keyOptions.optional(),
    });
    type ProduceKafkaMessageArguments = z.infer<
      typeof produceKafkaMessageArguments
    >;
  • Registration of the 'produce-message' tool name to its handler instance in the ToolFactory's static handlers map, used by the MCP server.
    [ToolName.PRODUCE_MESSAGE, new ProduceKafkaMessageHandler()],
  • Enum constant defining the tool name string 'produce-message'.
    PRODUCE_MESSAGE = "produce-message",

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/confluentinc/mcp-confluent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server