Skip to main content
Glama
confluentinc

mcp-confluent

Official
by confluentinc

produce-message

Generate and send messages to a specified Kafka topic. Automatically creates the topic if it doesn’t exist, ensuring reliable message storage for user conversations or history.

Instructions

Produce records to a Kafka topic. For saving user messages/history they should be saved to a kafka topic named claude-conversations unless otherwise specified. If the topic does not exist, it will be created via the create-topics tool.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
messageYesThe content of the message to produce
topicNameYesName of the kafka topic to produce the message to

Implementation Reference

  • The main handler function that parses arguments, performs schema registry checks and serialization for key/value, produces the message to Kafka, and returns delivery report.
    async handle(
      clientManager: ClientManager,
      toolArguments: Record<string, unknown>,
    ): Promise<CallToolResult> {
      const { topicName, value, key }: ProduceKafkaMessageArguments =
        produceKafkaMessageArguments.parse(toolArguments);
    
      // Only create registry if needed
      const needsRegistry =
        value.useSchemaRegistry || (key && key.useSchemaRegistry);
      const registry: SchemaRegistryClient | undefined = needsRegistry
        ? clientManager.getSchemaRegistryClient()
        : undefined;
    
      // Check for latest schema if needed (value)
      const valueSchemaCheck = await checkSchemaNeeded(
        topicName,
        value as MessageOptions,
        SerdeType.VALUE,
        registry,
      );
      const valueSchemaResult = this.handleSchemaCheckResult(valueSchemaCheck);
      if (valueSchemaResult) return valueSchemaResult;
    
      // Check for latest schema if needed (key)
      if (key) {
        const keySchemaCheck = await checkSchemaNeeded(
          topicName,
          key as MessageOptions,
          SerdeType.KEY,
          registry,
        );
        const keySchemaResult = this.handleSchemaCheckResult(keySchemaCheck);
        if (keySchemaResult) return keySchemaResult;
      }
    
      let valueToSend: Buffer | string;
      let keyToSend: Buffer | string | undefined;
      try {
        valueToSend = await serializeMessage(
          topicName,
          value as MessageOptions,
          SerdeType.VALUE,
          registry,
        );
        if (key) {
          keyToSend = await serializeMessage(
            topicName,
            key as MessageOptions,
            SerdeType.KEY,
            registry,
          );
        }
      } catch (err) {
        return this.createResponse(
          `Failed to serialize: ${err instanceof Error ? err.message : err}`,
          true,
        );
      }
    
      // Send the message
      let deliveryReport: RecordMetadata[];
      try {
        deliveryReport = await (
          await clientManager.getProducer()
        ).send({
          topic: topicName,
          messages: [
            typeof keyToSend !== "undefined"
              ? { key: keyToSend, value: valueToSend }
              : { value: valueToSend },
          ],
        });
      } catch (err) {
        return this.createResponse(
          `Failed to produce message: ${err instanceof Error ? err.message : err}`,
          true,
        );
      }
      const formattedResponse = deliveryReport
        .map((metadata) => {
          if (metadata.errorCode !== 0) {
            return `Error producing message to [Topic: ${metadata.topicName}, Partition: ${metadata.partition}, Offset: ${metadata.offset} with ErrorCode: ${metadata.errorCode}]`;
          }
          return `Message produced successfully to [Topic: ${metadata.topicName}, Partition: ${metadata.partition}, Offset: ${metadata.offset}]`;
        })
        .join("\n");
      const isError = deliveryReport.some((metadata) => metadata.errorCode !== 0);
      return this.createResponse(formattedResponse, isError);
    }
  • Zod input schema definition for the tool arguments: topicName (required), value (with schema options), optional key.
    const produceKafkaMessageArguments = z.object({
      topicName: z
        .string()
        .nonempty()
        .describe("Name of the kafka topic to produce the message to"),
      value: valueOptions,
      key: keyOptions.optional(),
    });
    type ProduceKafkaMessageArguments = z.infer<
      typeof produceKafkaMessageArguments
    >;
  • Registration of the 'produce-message' tool name to its handler instance in the ToolFactory's static handlers map, used by the MCP server.
    [ToolName.PRODUCE_MESSAGE, new ProduceKafkaMessageHandler()],
  • Enum constant defining the tool name string 'produce-message'.
    PRODUCE_MESSAGE = "produce-message",
Behavior3/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden. It discloses that topics are created automatically if they don't exist, which is a key behavioral trait. However, it lacks details on permissions, error handling, or rate limits. The description adds some context but doesn't fully cover all behavioral aspects expected for a mutation tool without annotations.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is appropriately sized and front-loaded: the first sentence states the core purpose, followed by specific guidelines. Every sentence adds value without redundancy, making it efficient and well-structured for quick understanding.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness3/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given no annotations and no output schema, the description provides basic context but is incomplete. It covers the main action and a behavioral trait (topic creation), but lacks details on permissions, response format, or error scenarios. For a mutation tool with 2 parameters and no structured support, this is adequate but has clear gaps.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 100%, so the schema already documents both parameters ('message' and 'topicName') with descriptions. The description doesn't add any meaning beyond what the schema provides, such as format examples or constraints. With high schema coverage, the baseline score is 3, as the description doesn't compensate with extra parameter insights.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose: 'Produce records to a Kafka topic.' It specifies the verb ('produce') and resource ('Kafka topic'), making the action explicit. However, it doesn't distinguish this tool from its siblings (e.g., create-topics, list-topics) beyond the core function, which prevents a perfect score.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines4/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides clear usage guidance: 'For saving user messages/history they should be saved to a kafka topic named claude-conversations unless otherwise specified.' This gives a specific context and default behavior. It also mentions an alternative tool ('create-topics') for topic creation if needed, but it doesn't explicitly state when not to use this tool or compare it to all siblings, keeping it from a score of 5.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/confluentinc/mcp-confluent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server