8.3. 如何配置不同的客户端序列化r/deserializer 类型
当在 Kafka 客户端应用程序中使用 schema 时,必须根据您的用例选择要使用的特定模式类型。Service Registry 为 Apache Avro、JSON 架构和 Google Protobuf 提供 SerDe Java 类。以下小节解释了如何配置 Kafka 应用程序以使用每种类型。
您还可以使用 Kafka 来实现自定义序列化和反序列化器类,并使用 Service Registry REST Java 客户端利用 Service Registry REST Java 客户端利用 Service Registry 功能。
用于 serializers/deserializers 的 Kafka 应用程序配置
使用 Kafka 应用程序中的 Service Registry 提供的 SerDe 类涉及设置正确的配置属性。以下简单的 Avro 示例演示了如何在 Kafka producer 应用程序中配置序列化器,以及如何在 Kafka 消费者应用程序中配置反序列化器。
Kafka producer 中的序列化器配置示例
Kafka 消费者中的 deserializer 配置示例
8.3.1. 使用 Service Registry 配置 Avro SerDes 复制链接链接已复制到粘贴板!
本主题解释了如何使用 Apache Avro 的 Kafka 客户端序列化器(SerDes)类。
Service Registry 为 Avro 提供以下 Kafka 客户端 SerDes 类:
-
io.apicurio.registry.serde.avro.AvroKafkaSerializer -
io.apicurio.registry.serde.avro.AvroKafkaDeserializer
配置 Avro serializer
您可以使用以下方法配置 Avro serializer 类:
- Service Registry URL
- 工件解析器策略
- ID 位置
- ID 编码
- Avro datum 供应商
- Avro 编码
ID 位置
作为 Kafka 消息的一部分,序列化器会传递 schema 的唯一 ID,以便消费者使用正确的模式进行反序列化。ID 可以位于消息有效负载中,也可以在消息标头中进行。默认位置是消息有效负载。要在消息标头中发送 ID,请设置以下配置属性:
props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")
props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")
属性名称为 apicurio.registry.headers.enabled。
ID 编码
您可以在 Kafka 消息正文中传递时,自定义 schema ID 的编码方式。将 apicurio.registry.id-handler 配置属性设置为实施 io.apicurio.registry.serde.IdHandler 接口的类。Service Registry 提供以下实现:
-
io.apicurio.registry.serde.DefaultIdHandler: 将 ID 存储为 8 字节长 -
io.apicurio.registry.serde.Legacy4ByteIdHandler: 将 ID 保存为 4 字节整数
Service Registry 代表 schema ID,但出于旧原因,或者出于与其它 registry 或 SerDe 类的兼容性,您可能想在发送 ID 时使用 4 字节。
Avro datum 供应商
Avro 提供不同的 datum 写入和读取数据。Service Registry 支持三种不同的类型:
- 通用
- 特定
- reflect
Service Registry AvroDatumProvider 是使用其类型的抽象,其中默认使用 DefaultAvroDatumProvider。
您可以设置以下配置选项:
-
apicurio.registry.avro-datum-provider: 指定AvroDatumProvider实施的完全限定域名,如io.apicurio.registry.serde.avro.ReflectAvroDatumProvider -
apicurio.registry.use- specific-avro-reader: 设置为true,在使用DefaultAvroDatumProvider时使用特定类型。
Avro 编码
当使用 Avro 序列化数据时,您可以使用 Avro 二进制编码格式来确保数据以高效格式编码。Avro 还支持将数据编码为 JSON,这有助于检查每条消息的有效负载,例如日志记录或调试。
您可以通过配置值为 JSON 或 BINARY 的 apicurio.registry.avro.encoding 属性来设置 Avro 编码。默认值为 BINARY。
配置 Avro deserializer
您必须将 Avro deserializer 类配置为与 serializer 的以下配置设置匹配:
- Service Registry URL
- ID 编码
- Avro datum 供应商
- Avro 编码
有关这些配置选项,请参阅 serializer 部分。属性名称和值相同。
配置 deserializer 时不需要以下选项:
- 工件解析器策略
- ID 位置
deserializer 类可以从消息中决定这些选项的值。策略并不是必需的,因为序列化器负责发送 ID 作为消息的一部分。
ID 位置是通过检查消息有效负载开始时的 magic 字节来确定的。如果找到该字节,则使用配置的处理程序从消息有效负载中读取 ID。如果没有找到音量字节,则 ID 会从邮件标头读取。
Avro SerDes 和 artifact 引用
在使用 Avro 消息和带有嵌套记录的模式时,会为每个嵌套记录注册一个新的工件。例如,以下 TradeKey 模式包括嵌套的 Exchange 模式:
带有嵌套交换模式的 TradeKey 模式
Exchange schema
当将这些方案与 Avro SerDes 搭配使用时,在 Service Registry 中创建两个工件,一个用于 TradeKey 模式,另一个用于 Exchange 模式。每当使用 TradeKey 模式的消息被序列化或反序列化时,都会检索这两个模式,以便您将定义分成不同的文件中。