Skip to main content
Glama

mcp-confluent

Official
by confluentinc
schema-registry-helper.ts•10.3 kB
/**
 * This module provides helper functions for working with Confluent Schema Registry.
 * It handles schema registration, serialization, and deserialization of messages
 * using various schema formats (AVRO, JSON, PROTOBUF).
 */
import {
  AvroDeserializer,
  AvroSerializer,
  Deserializer,
  DeserializerConfig,
  JsonDeserializer,
  JsonSerializer,
  ProtobufDeserializer,
  ProtobufSerializer,
  SchemaRegistryClient,
  SerdeType,
  Serializer,
  SerializerConfig,
} from "@confluentinc/schemaregistry";
import { logger } from "@src/logger.js";

/**
 * Supported schema types for Confluent Schema Registry.
 * AVRO: Apache Avro binary format with schema evolution support
 * JSON: JSON Schema format with validation
 * PROTOBUF: Protocol Buffers format with backward compatibility
 */
export type SchemaType = "AVRO" | "JSON" | "PROTOBUF";

/**
 * Common options for schema registry operations.
 * These options control how schemas are registered and used for serialization.
 */
export interface SchemaRegistryOptions {
  useSchemaRegistry?: boolean;
  schemaType?: SchemaType;
  schema?: string;
  subject?: string;
  normalize?: boolean;
}

/**
 * Options for message serialization/deserialization.
 * Combines the message payload with the schema registry configuration.
 */
export type MessageOptions = SchemaRegistryOptions & {
  message: Buffer | object | string;
};

/**
 * Result of checking if a schema is needed for a message.
 * Can indicate:
 * - A schema is needed and provides the latest schema details
 * - No schema exists and needs to be provided
 * - No schema action is needed
 */
export type SchemaCheckResult =
  | {
      type: "schema-needed";
      latestSchema: string;
      subject: string;
      schemaType: string;
    }
  | { type: "no-schema"; subject: string }
  | null;

/**
 * Resolves the registry subject for a message: the explicit subject when one
 * is given (and non-empty), otherwise `<topic>-key` or `<topic>-value`
 * depending on the serde type.
 */
function resolveSubject(
  topicName: string,
  serdeType: SerdeType,
  subject: string | undefined,
): string {
  return (
    subject || `${topicName}-${serdeType === SerdeType.KEY ? "key" : "value"}`
  );
}

/**
 * Reports whether a thrown value is an object carrying `status === 404`.
 */
function isNotFoundError(err: unknown): boolean {
  return (
    typeof err === "object" &&
    err !== null &&
    "status" in err &&
    (err as { status?: number }).status === 404
  );
}

/**
 * Creates and returns the appropriate serializer instance based on schema type.
 * The serializer is configured to use either a specific schema ID or the latest version.
 *
 * @param schemaType - The type of schema (AVRO, JSON, PROTOBUF)
 * @param registry - The schema registry client instance
 * @param serdeType - Whether this is for key or value serialization
 * @param schemaId - Optional schema ID to use for serialization
 * @returns The appropriate Serializer instance
 * @throws Error if the schema type is unknown or unsupported
 */
export function getSerializer(
  schemaType: SchemaType | undefined,
  registry: SchemaRegistryClient,
  serdeType: SerdeType,
  schemaId?: number,
): Serializer {
  const serializerConfig: SerializerConfig =
    typeof schemaId === "number"
      ? { useSchemaId: schemaId }
      : { useLatestVersion: true };

  // A switch on the literal values (instead of an `in` lookup on an object)
  // guarantees that inherited property names such as "constructor" or
  // "toString" are rejected rather than being invoked.
  switch (schemaType) {
    case "AVRO":
      return new AvroSerializer(registry, serdeType, serializerConfig);
    case "JSON":
      return new JsonSerializer(registry, serdeType, serializerConfig);
    case "PROTOBUF":
      return new ProtobufSerializer(registry, serdeType, serializerConfig);
    default:
      throw new Error(`Unknown schemaType: ${schemaType}`);
  }
}

/**
 * Creates and returns the appropriate deserializer instance based on schema type.
 * The deserializer is created with an empty configuration.
 *
 * @param schemaType - The type of schema (AVRO, JSON, PROTOBUF)
 * @param registry - The schema registry client instance
 * @param serdeType - Whether this is for key or value deserialization
 * @returns The appropriate Deserializer instance
 * @throws Error if the schema type is unknown or unsupported
 */
export function getDeserializer(
  schemaType: SchemaType | undefined,
  registry: SchemaRegistryClient,
  serdeType: SerdeType,
): Deserializer {
  const deserializerConfig: DeserializerConfig = {};

  // See getSerializer: a switch avoids matching inherited property names.
  switch (schemaType) {
    case "AVRO":
      return new AvroDeserializer(registry, serdeType, deserializerConfig);
    case "JSON":
      return new JsonDeserializer(registry, serdeType, deserializerConfig);
    case "PROTOBUF":
      return new ProtobufDeserializer(registry, serdeType, deserializerConfig);
    default:
      throw new Error(`Unknown schemaType: ${schemaType}`);
  }
}

/**
 * Checks if a schema is needed for the given message options.
 * This function determines if:
 * 1. A schema is already registered and should be used
 * 2. No schema exists and needs to be provided
 * 3. No schema action is needed
 *
 * @param topicName - The Kafka topic name
 * @param options - The message options including schema, type, and payload
 * @param serdeType - Whether this is for key or value serialization
 * @param registry - The schema registry client instance (if used)
 * @returns An object describing the schema state, or null if no schema action is needed
 */
export async function checkSchemaNeeded(
  topicName: string,
  options: MessageOptions,
  serdeType: SerdeType,
  registry: SchemaRegistryClient | undefined,
): Promise<SchemaCheckResult> {
  // Nothing to check when the registry is not in use or the caller already
  // supplied a schema.
  if (!options.useSchemaRegistry || options.schema) {
    return null;
  }

  const subject = resolveSubject(topicName, serdeType, options.subject);
  // Without a registry client there is nothing to look up; treated the same
  // as "no schema registered".
  const latest = registry
    ? await getLatestSchemaIfExists(registry, subject)
    : null;

  if (!latest) {
    return { type: "no-schema", subject };
  }
  return {
    type: "schema-needed",
    latestSchema: latest.schema,
    subject,
    schemaType: latest.schemaType,
  };
}

/**
 * Fetches the latest schema string and schema type for a given subject from the schema registry.
 * Handles 404 errors gracefully by returning null when no schema exists.
 *
 * @param registry - The schema registry client instance
 * @param subject - The subject to look up in the registry
 * @returns An object with the latest schema string and schema type, or null if not found
 * @throws Rethrows any registry error whose `status` is not 404
 */
export async function getLatestSchemaIfExists(
  registry: SchemaRegistryClient,
  subject: string,
): Promise<{ schema: string; schemaType: SchemaType } | null> {
  try {
    const latest = await registry.getLatestSchemaMetadata(subject);
    // The docs say that when no schemaType is supplied, it's assumed to be AVRO
    const schemaType = (latest.schemaType || "AVRO") as SchemaType;
    return { schema: latest.schema!, schemaType };
  } catch (err: unknown) {
    if (isNotFoundError(err)) {
      return null;
    }
    throw err;
  }
}

/**
 * Serializes a message using the provided options and schema registry configuration.
 *
 * Without schema registry: strings and Buffers are returned unchanged, any
 * other value is JSON-stringified (a warning is logged for non-string input).
 *
 * With schema registry, this function:
 * 1. Validates the configuration and the message type
 * 2. Registers the schema if provided
 * 3. Creates the appropriate serializer
 * 4. Serializes the message
 *
 * @param topicName - The Kafka topic name
 * @param options - The message options including schema, type, and payload
 * @param serdeType - Whether this is for key or value serialization
 * @param registry - The schema registry client instance (if used)
 * @returns The serialized message as a Buffer or string
 * @throws Error if serialization fails, schema registration fails, or message type is invalid
 */
export async function serializeMessage(
  topicName: string,
  options: MessageOptions,
  serdeType: SerdeType,
  registry: SchemaRegistryClient | undefined,
): Promise<Buffer | string> {
  const { message } = options;

  if (!options.useSchemaRegistry) {
    if (typeof message === "string") {
      return message;
    }
    logger.warn(
      "Warning: Sending non-string message without schema registry. This may fail if the topic expects a schema.",
    );
    // Raw bytes are passed through untouched; JSON.stringify on a Buffer
    // would emit {"type":"Buffer","data":[...]} instead of the payload.
    return Buffer.isBuffer(message) ? message : JSON.stringify(message);
  }

  if (!options.schemaType) {
    throw new Error("schemaType is required when useSchemaRegistry is true");
  }
  if (!registry) {
    throw new Error("Schema Registry client is required for serialization");
  }
  // Validate the payload before touching the registry so an unusable message
  // never causes a schema to be registered as a side effect.
  if (typeof message !== "object" || message === null) {
    throw new Error(
      "When using schema registry, message must be an object matching the schema.",
    );
  }

  // Default subject naming
  const subject = resolveSubject(topicName, serdeType, options.subject);

  // Register schema if provided; the returned ID pins the serializer to it.
  let schemaId: number | undefined;
  if (options.schema) {
    try {
      schemaId = await registry.register(
        subject,
        {
          schema: options.schema,
          schemaType: options.schemaType,
        },
        options.normalize,
      );
    } catch (err) {
      throw new Error(
        `Failed to register schema for subject '${subject}': ${err}`,
      );
    }
  }

  let serializer: Serializer;
  try {
    serializer = getSerializer(
      options.schemaType,
      registry,
      serdeType,
      schemaId,
    );
  } catch (err) {
    throw new Error(`Failed to get serializer: ${err}`);
  }

  try {
    return await serializer.serialize(topicName, message);
  } catch (err) {
    throw new Error(
      `Failed to serialize message for subject '${subject}': ${err}`,
    );
  }
}

/**
 * Deserializes a message using Schema Registry.
 * This function:
 * 1. Creates the appropriate deserializer
 * 2. Deserializes the message using the schema from the registry
 *
 * @param topic - The Kafka topic name
 * @param message - The message buffer to deserialize
 * @param schemaType - The schema type (AVRO, JSON, PROTOBUF)
 * @param registry - The schema registry client
 * @param serdeType - Whether this is key or value
 * @returns The deserialized object
 * @throws Error if the deserializer cannot be created or deserialization fails
 */
export async function deserializeMessage(
  topic: string,
  message: Buffer,
  schemaType: SchemaType,
  registry: SchemaRegistryClient,
  serdeType: SerdeType,
): Promise<unknown> {
  try {
    const deserializer = getDeserializer(schemaType, registry, serdeType);
    return await deserializer.deserialize(topic, message);
  } catch (err) {
    throw new Error(`Failed to deserialize message: ${err}`);
  }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/confluentinc/mcp-confluent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.