8.3. 如何配置不同的客户端序列化器/反序列化器类型
当在 Kafka 客户端应用程序中使用 schema 时,您必须根据您的用例选择要使用的特定模式类型。Service Registry 为 Apache Avro、JSON Schema 和 Google Protobuf 提供 SerDe Java 类。以下小节解释了如何将 Kafka 应用程序配置为使用每种类型。
您还可以使用 Kafka 实现自定义序列化器和反序列化器类,并使用 Service Registry REST Java 客户端利用 Service Registry 功能。
serializers/deserializers 的 Kafka 应用程序配置
使用 Kafka 应用程序中的 Service Registry 提供的 SerDe 类涉及设置正确的配置属性。以下简单的 Avro 示例演示了如何在 Kafka producer 应用程序中配置序列化器以及如何在 Kafka 消费者应用程序中配置反序列化器。
Kafka producer 中的序列化器配置示例
Kafka consumer 中的 deserializer 配置示例
8.3.1. 使用 Service Registry 配置 Avro SerDe 复制链接链接已复制到粘贴板!
本节解释了如何对 Apache Avro 使用 Kafka 客户端序列化器和反序列化器(SerDes)类。
Service Registry 为 Avro 提供以下 Kafka 客户端 SerDes 类:
-
io.apicurio.registry.serde.avro.AvroKafkaSerializer
-
io.apicurio.registry.serde.avro.AvroKafkaDeserializer
配置 Avro serializer
您可以使用以下方法配置 Avro serializer 类:
- Service Registry URL
- 工件解析器策略
- ID 位置
- ID 编码
- avro datum 供应商
- avro 编码
ID 位置
serializer 将 schema 的唯一 ID 作为 Kafka 消息的一部分传递,以便消费者可以使用正确的模式进行反序列化。ID 可以在消息有效负载或消息标头中。默认位置是消息有效负载。要在消息标头中发送 ID,请设置以下配置属性:
props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")
props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")
属性名称为 apicurio.registry.headers.enabled
。
ID 编码
您可以在 Kafka 消息正文中传递模式 ID 时对其进行编码。将 apicurio.registry.id-handler
配置属性设置为实施 io.apicurio.registry.serde.IdHandler
接口的类。Service Registry 提供以下实现:
-
io.apicurio.registry.serde.DefaultIdHandler
: 将 ID 存储为 8 字节长 -
io.apicurio.registry.serde.Legacy4ByteIdHandler
: 将 ID 存储为 4 字节整数
Service Registry 代表模式 ID,但由于旧原因,或者由于与其他 registry 或 SerDe 类兼容,您可能需要在发送 ID 时使用 4 字节。
avro datum 供应商
avro 提供不同的 datum writers 和 readers 来写入和读取数据。Service Registry 支持三种不同类型的:
- generic
- specific
- reflect
Service Registry AvroDatumProvider
是使用哪个类型的抽象,默认为使用 DefaultAvroDatumProvider
。
您可以设置以下配置选项:
-
apicurio.registry.avro-datum-provider
:指定AvroDatumProvider
实现的完全限定 Java 类名称,如io.apicurio.registry.serde.avro.ReflectAvroDatumProvider
-
apicurio.registry.use-specific-avro-reader
: 设置为true
,在使用DefaultAvroDatumProvider
时使用特定类型的
avro 编码
当使用 Avro 来序列化数据时,您可以使用 Avro 二进制编码格式来确保数据以尽可能有效的格式进行编码。avro 还支持将数据编码为 JSON,这有助于检查每个消息的有效负载,如日志记录或调试。
您可以通过将 apicurio.registry.avro.encoding
属性配置为 JSON
或 BINARY
来设置 Avro 编码。默认值为 BINARY
。
配置 Avro deserializer
您必须配置 Avro deserializer 类,以匹配序列化器的以下配置设置:
- Service Registry URL
- ID 编码
- avro datum 供应商
- avro 编码
有关这些配置选项,请参阅 serializer 部分。属性名称和值相同。
配置反序列化器时不需要以下选项:
- 工件解析器策略
- ID 位置
deserializer 类可以从消息确定这些选项的值。该策略不是必需的,因为 serializer 负责发送 ID 作为消息的一部分。
ID 位置是通过在消息有效负载开始时检查 magic 字节来确定的。如果找到该字节,则使用配置的处理程序从消息有效负载中读取该 ID。如果没有找到 magic 字节,则从消息标头中读取 ID。
avro SerDes 和工件引用
当使用 Avro 消息和带有嵌套记录的模式时,会为每个嵌套记录注册一个新的工件。例如,以下 TradeKey
模式包含一个嵌套 交换
模式:
带有嵌套交换模式的 TradeKey 模式
Exchange 模式
当将这些模式与 Avro SerDes 搭配使用时,会在 Service Registry 中创建两个工件,一个用于 TradeKey
模式,一个用于 Exchange
schema。每当使用 TradeKey
模式的消息被序列化或反序列化时,都会检索这两个模式,允许您将定义分成不同的文件。
8.3.2. 使用 Service Registry 配置 JSON 架构 SerDe 复制链接链接已复制到粘贴板!
本节解释了如何对 JSON Schema 使用 Kafka 客户端序列化器和反序列化器(SerDes)类。
Service Registry 为 JSON Schema 提供以下 Kafka 客户端 SerDes 类:
-
io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer
-
io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer
与 Apache Avro 不同,JSON Schema 不是序列化技术,而是使用验证技术。因此,JSON 架构的配置选项非常不同。例如,没有编码选项,因为数据始终编码为 JSON。
配置 JSON 架构序列化器
您可以配置 JSON Schema serializer 类,如下所示:
- Service Registry URL
- 工件解析器策略
- 模式验证
唯一的非标准配置属性是 JSON Schema 验证,默认是启用的。您可以通过将 apicurio.registry.serde.validation-enabled
设置为 "false"
来禁用此功能。例如:
props.putIfAbsent(SerdeConfig.VALIDATION_ENABLED, Boolean.FALSE)
props.putIfAbsent(SerdeConfig.VALIDATION_ENABLED, Boolean.FALSE)
配置 JSON 架构反序列化器
您可以配置 JSON Schema 反序列化器类,如下所示:
- Service Registry URL
- 模式验证
- 用于取消序列化数据的类
您必须提供 Service Registry 的位置,以便可以加载 schema。其他配置是可选的。
只有在 serializer 传递了 Kafka 消息中的全局 ID 时,反序列化器验证才能正常工作,这只有在 serializer 中启用了验证时才有效。
JSON 架构 SerDes 和工件引用
JSON Schema SerDes 无法从消息有效负载发现模式,因此必须先注册 schema 工件,这也应用工件引用。
根据架构的内容,如果 $ref
值是一个 URL,SerDes 会尝试使用该 URL 解析引用的模式,然后验证可以正常工作,针对主模式验证数据,并根据嵌套模式验证嵌套值。另外还实现了对在 Service Registry 中引用工件的支持。
例如,以下 Federal .json
模式引用 city.json
模式:
为 city.json schema 的引用,并引用 city.json schema
city.json schema
在本例中,给定的公民具有城市。在 Service Registry 中,使用名称 city.json
创建对城市工件的引用。在 SerDes 中,当获取统计形模式时,也会获取城市模式,因为它被有意引用。序列化/解码数据时,引用名称用于解析嵌套模式,允许根据大众模式和嵌套城市模式进行验证。
8.3.3. 使用 Service Registry 配置 Protobuf SerDes 复制链接链接已复制到粘贴板!
本节解释了如何对 Google Protobuf 使用 Kafka 客户端序列化器和反序列化器(SerDes)类。
Service Registry 为 Protobuf 提供以下 Kafka 客户端 SerDes 类:
-
io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer
-
io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer
配置 Protobuf serializer
您可以配置 Protobuf serializer 类,如下所示:
- Service Registry URL
- 工件解析器策略
- ID 位置
- ID 编码
- 模式验证
有关这些配置选项的详情,请查看以下部分:
配置 Protobuf deserializer
您必须配置 Protobuf deserializer 类,以匹配序列化器中的以下配置设置:
- Service Registry URL
- ID 编码
配置属性名称和值与序列化器相同。
配置反序列化器时不需要以下选项:
- 工件解析器策略
- ID 位置
deserializer 类可以从消息确定这些选项的值。该策略不是必需的,因为 serializer 负责发送 ID 作为消息的一部分。
ID 位置是通过在消息有效负载开始时检查 magic 字节来确定的。如果找到该字节,则使用配置的处理程序从消息有效负载中读取该 ID。如果没有找到 magic 字节,则从消息标头中读取 ID。
Protobuf deserializer 不会反序列化到确切的 Protobuf 消息实现,而是切换到 DynamicMessage
实例。否则,没有适当的 API。
protobuf SerDes 和工件引用
当使用带有 import
语句的复杂 Protobuf 消息时,导入的 Protobuf 消息会作为单独的工件存储在 Service Registry 中。然后,当 Service Registry 获取检查 Protobuf 消息时,也会检索引用的方案,以便可以检查和序列化完整的消息模式。
例如,以下 table_info.proto
模式文件包含导入的 mode.proto
模式文件:
带有导入 mode.proto 文件的 table_info.proto 文件
mode.proto 文件
在本例中,两个 Protobuf 工件存储在 Service Registry 中,一个用于 TableInfo
,一个用于 Mode
。但是,由于
是 Mode
TableInfo
的一部分,因此当获取每个 TableInfo
以检查 SerDes 的消息时,mode 也返回为 TableInfo
引用的工件。