8.3. 如何配置不同的客户端序列化r/deserializer 类型


当在 Kafka 客户端应用程序中使用 schema 时,必须根据您的用例选择要使用的特定模式类型。Service Registry 为 Apache Avro、JSON 架构和 Google Protobuf 提供 SerDe Java 类。以下小节解释了如何配置 Kafka 应用程序以使用每种类型。

您还可以使用 Kafka 来实现自定义序列化和反序列化器类,并使用 Service Registry REST Java 客户端利用 Service Registry REST Java 客户端利用 Service Registry 功能。

用于 serializers/deserializers 的 Kafka 应用程序配置

使用 Kafka 应用程序中的 Service Registry 提供的 SerDe 类涉及设置正确的配置属性。以下简单的 Avro 示例演示了如何在 Kafka producer 应用程序中配置序列化器,以及如何在 Kafka 消费者应用程序中配置反序列化器。

Kafka producer 中的序列化器配置示例

// Create the Kafka producer
private static Producer<Object, Object> createKafkaProducer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");

    // Use Service Registry-provided Kafka serializer for Avro
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());

    // Configure the Service Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // Register the schema artifact if not found in the registry.
    props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);

    // Create the Kafka producer
    Producer<Object, Object> producer = new KafkaProducer<>(props);
    return producer;
}
Copy to Clipboard Toggle word wrap

Kafka 消费者中的 deserializer 配置示例

// Create the Kafka consumer
private static KafkaConsumer<Long, GenericRecord> createKafkaConsumer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Use Service Registry-provided Kafka deserializer for Avro
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());

    // Configure the Service Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // No other configuration needed because the schema globalId the deserializer uses is sent
    // in the payload. The deserializer extracts the globalId and uses it to look up the schema
    // from the registry.

    // Create the Kafka consumer
    KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
    return consumer;
}
Copy to Clipboard Toggle word wrap

8.3.1. 使用 Service Registry 配置 Avro SerDes

本主题解释了如何使用 Apache Avro 的 Kafka 客户端序列化器(SerDes)类。

Service Registry 为 Avro 提供以下 Kafka 客户端 SerDes 类:

  • io.apicurio.registry.serde.avro.AvroKafkaSerializer
  • io.apicurio.registry.serde.avro.AvroKafkaDeserializer

配置 Avro serializer

您可以使用以下方法配置 Avro serializer 类:

  • Service Registry URL
  • 工件解析器策略
  • ID 位置
  • ID 编码
  • Avro datum 供应商
  • Avro 编码

ID 位置

作为 Kafka 消息的一部分,序列化器会传递 schema 的唯一 ID,以便消费者使用正确的模式进行反序列化。ID 可以位于消息有效负载中,也可以在消息标头中进行。默认位置是消息有效负载。要在消息标头中发送 ID,请设置以下配置属性:

props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")
Copy to Clipboard Toggle word wrap

属性名称为 apicurio.registry.headers.enabled

ID 编码

您可以在 Kafka 消息正文中传递时,自定义 schema ID 的编码方式。将 apicurio.registry.id-handler 配置属性设置为实施 io.apicurio.registry.serde.IdHandler 接口的类。Service Registry 提供以下实现:

  • io.apicurio.registry.serde.DefaultIdHandler: 将 ID 存储为 8 字节长
  • io.apicurio.registry.serde.Legacy4ByteIdHandler: 将 ID 保存为 4 字节整数

Service Registry 代表 schema ID,但出于旧原因,或者出于与其它 registry 或 SerDe 类的兼容性,您可能想在发送 ID 时使用 4 字节。

Avro datum 供应商

Avro 提供不同的 datum 写入和读取数据。Service Registry 支持三种不同的类型:

  • 通用
  • 特定
  • reflect

Service Registry AvroDatumProvider 是使用其类型的抽象,其中默认使用 DefaultAvroDatumProvider

您可以设置以下配置选项:

  • apicurio.registry.avro-datum-provider: 指定 AvroDatumProvider 实施的完全限定域名,如 io.apicurio.registry.serde.avro.ReflectAvroDatumProvider
  • apicurio.registry.use- specific-avro-reader: 设置为 true,在使用 DefaultAvroDatumProvider时使用特定类型。

Avro 编码

当使用 Avro 序列化数据时,您可以使用 Avro 二进制编码格式来确保数据以高效格式编码。Avro 还支持将数据编码为 JSON,这有助于检查每条消息的有效负载,例如日志记录或调试。

您可以通过配置值为 JSONBINARYapicurio.registry.avro.encoding 属性来设置 Avro 编码。默认值为 BINARY

配置 Avro deserializer

您必须将 Avro deserializer 类配置为与 serializer 的以下配置设置匹配:

  • Service Registry URL
  • ID 编码
  • Avro datum 供应商
  • Avro 编码

有关这些配置选项,请参阅 serializer 部分。属性名称和值相同。

注意

配置 deserializer 时不需要以下选项:

  • 工件解析器策略
  • ID 位置

deserializer 类可以从消息中决定这些选项的值。策略并不是必需的,因为序列化器负责发送 ID 作为消息的一部分。

ID 位置是通过检查消息有效负载开始时的 magic 字节来确定的。如果找到该字节,则使用配置的处理程序从消息有效负载中读取 ID。如果没有找到音量字节,则 ID 会从邮件标头读取。

Avro SerDes 和 artifact 引用

在使用 Avro 消息和带有嵌套记录的模式时,会为每个嵌套记录注册一个新的工件。例如,以下 TradeKey 模式包括嵌套的 Exchange 模式:

带有嵌套交换模式的 TradeKey 模式

{
  "namespace": "com.kubetrade.schema.trade",
  "type": "record",
  "name": "TradeKey",
  "fields": [
    {
      "name": "exchange",
      "type": "com.kubetrade.schema.common.Exchange"
    },
    {
      "name": "key",
      "type": "string"
    }
  ]
}
Copy to Clipboard Toggle word wrap

Exchange schema

{
  "namespace": "com.kubetrade.schema.common",
  "type": "enum",
  "name": "Exchange",
  "symbols" : ["GEMINI"]
}
Copy to Clipboard Toggle word wrap

当将这些方案与 Avro SerDes 搭配使用时,在 Service Registry 中创建两个工件,一个用于 TradeKey 模式,另一个用于 Exchange 模式。每当使用 TradeKey 模式的消息被序列化或反序列化时,都会检索这两个模式,以便您将定义分成不同的文件中。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat