8.3. 如何配置不同的客户端 serializer/deserializer 类型
在 Kafka 客户端应用程序中使用 schema 时,您必须根据自己的用例选择要使用的特定模式类型。Apicurio Registry 为 Apache Avro、JSON Schema 和 Google Protobuf 提供 SerDe Java 类。以下小节解释了如何配置 Kafka 应用程序以使用每种类型。
您还可以使用 Kafka 实现自定义 serializer 和 deserializer 类,并使用 Apicurio Registry REST Java 客户端使用 Apicurio Registry 功能。
用于 serializers/deserializers 的 Kafka 应用程序配置
使用 Kafka 应用程序中的 Apicurio Registry 提供的 SerDe 类涉及设置正确的配置属性。以下简单的 Avro 示例演示了如何在 Kafka producer 应用程序中配置序列化器,以及如何在 Kafka 使用者应用程序中配置反序列化器。
Kafka producer 中的 serializer 配置示例
// Create the Kafka producer private static Producer<Object, Object> createKafkaProducer() { Properties props = new Properties(); // Configure standard Kafka settings props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME); props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all"); // Use Apicurio Registry-provided Kafka serializer for Avro props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); // Configure the Apicurio Registry location props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL); // Register the schema artifact if not found in the registry. props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE); // Create the Kafka producer Producer<Object, Object> producer = new KafkaProducer<>(props); return producer; }
Kafka consumer 中的 deserializer 配置示例
// Create the Kafka consumer private static KafkaConsumer<Long, GenericRecord> createKafkaConsumer() { Properties props = new Properties(); // Configure standard Kafka settings props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME); props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Use Apicurio Registry-provided Kafka deserializer for Avro props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName()); // Configure the Apicurio Registry location props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL); // No other configuration needed because the schema globalId the deserializer uses is sent // in the payload. The deserializer extracts the globalId and uses it to look up the schema // from the registry. // Create the Kafka consumer KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props); return consumer; }
其他资源
- 如需示例应用程序,请参阅 Simple Avro 示例
8.3.1. 使用 Apicurio Registry 配置 Avro SerDes
本主题解释了如何使用 Apache Avro 的 Kafka 客户端序列化器(SerDes)类。
Apicurio Registry 为 Avro 提供以下 Kafka 客户端 SerDes 类:
-
io.apicurio.registry.serde.avro.AvroKafkaSerializer
-
io.apicurio.registry.serde.avro.AvroKafkaDeserializer
配置 Avro serializer
您可以使用以下方法配置 Avro serializer 类:
- Apicurio Registry URL
- 工件解析器策略
- ID 位置
- ID 编码
- Avro datum 供应商
- Avro 编码
ID 位置
序列化器将模式的唯一 ID 作为 Kafka 消息的一部分传递,以便消费者使用正确的模式进行反序列化。ID 可以位于消息有效负载中,也可以在邮件标头中使用。默认位置是消息有效负载。要在消息标头中发送 ID,请设置以下配置属性:
props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")
属性名称是 apicurio.registry.headers.enabled
。
ID 编码
您可以在 Kafka 消息正文中传递时自定义如何编码 schema ID。将 apicurio.registry.id-handler
配置属性设置为实现 io.apicurio.registry.serde.IdHandler
接口的类。Apicurio Registry 提供以下实现:
-
io.apicurio.registry.serde.DefaultIdHandler
: 将 ID 存储为 8 字节长 -
io.apicurio.registry.serde.Legacy4ByteIdHandler
: 将 ID 存储为 4 字节整数
Apicurio Registry 代表一个长的模式 ID,但出于传统原因,或者出于与其他 registry 或 SerDe 类的兼容性,您可能需要在发送 ID 时使用 4 字节。
Avro datum 供应商
Avro 提供不同的 datum 写入器和读取器来写入和读取数据。Apicurio Registry 支持三种不同的类型:
- generic
- 具体
- 反射
Apicurio Registry AvroDatumProvider
是使用哪个类型的抽象,默认为使用 DefaultAvroDatumProvider
。
您可以设置以下配置选项:
-
apicurio.registry.avro-datum-provider
:指定AvroDatumProvider
实施的完全限定域名,如io.apicurio.registry.serde.avro.ReflectAvroDatumProvider
-
apicurio.registry.use-specific-avro-reader
: Set totrue
以使用DefaultAvroDatumProvider
特定类型
Avro 编码
使用 Avro 对数据进行序列化时,您可以使用 Avro 二进制编码格式确保以尽可能高效的方式编码数据。Avro 还支持将数据编码为 JSON,这有助于检查每条消息的载荷,例如用于日志记录或调试。
您可以通过配置 apicurio.registry.avro.encoding
属性来设置 Avro 编码,值为 JSON
或 BINARY
。默认值为 BINARY
。
配置 Avro deserializer
您必须配置 Avro deserializer 类,以匹配 serializer 的以下配置设置:
- Apicurio Registry URL
- ID 编码
- Avro datum 供应商
- Avro 编码
有关这些配置选项,请参阅 serializer 部分。属性名称和值相同。
配置 deserializer 时不需要以下选项:
- 工件解析器策略
- ID 位置
deserializer 类可以决定消息中这些选项的值。不需要该策略,因为 serializer 负责将 ID 作为消息的一部分发送。
ID 位置通过检查消息有效负载开头的 magic 字节来确定。如果找到了字节,则 ID 会使用配置的处理程序从消息有效负载中读取。如果没有找到 magic byte,则会从邮件标头中读取该 ID。
Avro SerDes 和 artifact 引用
在使用 Avro 消息和带有嵌套记录的模式时,会为每个嵌套记录注册一个新的工件。例如,以下 TradeKey
模式包括嵌套的 Exchange
模式:
带有嵌套交换模式的 TradeKey 模式
{ "namespace": "com.kubetrade.schema.trade", "type": "record", "name": "TradeKey", "fields": [ { "name": "exchange", "type": "com.kubetrade.schema.common.Exchange" }, { "name": "key", "type": "string" } ] }
Exchange schema
{ "namespace": "com.kubetrade.schema.common", "type": "enum", "name": "Exchange", "symbols" : ["GEMINI"] }
当将这些方案与 Avro SerDes 搭配使用时,在 Apicurio Registry 中创建两个工件,一个用于 TradeKey
模式,另一个用于 Exchange
模式。每当使用 TradeKey
模式的消息被序列化或反序列化时,都会检索这两个模式,以便您将定义分成不同的文件中。
其他资源
- 有关 Avro 配置的详情,请查看 AvroKafkaSerdeConfig Java 类
对于 Java 示例应用程序,请参阅:
8.3.2. 使用 Apicurio Registry 配置 JSON Schema SerDes
本主题解释了如何使用 Kafka 客户端序列化r 和 deserializer (SerDes)类用于 JSON Schema。
Apicurio Registry 为 JSON Schema 提供以下 Kafka 客户端 SerDes 类:
-
io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer
-
io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer
与 Apache Avro 不同,JSON Schema 不是序列化技术,而是验证技术。因此,JSON Schema 的配置选项并不相同。例如,没有编码选项,因为数据始终被编码为 JSON。
配置 JSON Schema serializer
您可以配置 JSON Schema serializer 类,如下所示:
- Apicurio Registry URL
- 工件解析器策略
- 模式验证
唯一非标准配置属性是 JSON Schema 验证(默认为启用)。您可以通过将 apicurio.registry.serde.validation-enabled
设置为 "false"
来禁用此功能。例如:
props.putIfAbsent(SerdeConfig.VALIDATION_ENABLED, Boolean.FALSE)
配置 JSON Schema deserializer
您可以配置 JSON Schema deserializer 类,如下所示:
- Apicurio Registry URL
- 模式验证
- 用于对数据进行反序列化的类
您必须提供 Apicurio Registry 的位置,以便可以加载 schema。其他配置是可选的。
只有在 serializer 传递 Kafka 消息中的全局 ID 时,仅当 serializer 中启用了验证时才会发生,反序列化验证才可以正常工作。
JSON Schema SerDes 和 artifact 引用
JSON Schema SerDes 无法从消息有效负载发现 schema,因此必须预先注册架构工件,这也会应用工件引用。
根据方案的内容,如果 $ref
值是 URL,则 SerDes 会尝试使用该 URL 解析引用的 schema,然后验证针对主架构的数据,并对主模式验证嵌套值。还实施了对在 Apicurio Registry 中引用工件的支持。
例如,以下 citizen.json
模式引用 city.json
模式:
公民.json 模式,并参考 city.json 模式
{ "$id": "https://example.com/citizen.schema.json", "$schema": "http://json-schema.org/draft-07/schema#", "title": "Citizen", "type": "object", "properties": { "firstName": { "type": "string", "description": "The citizen's first name." }, "lastName": { "type": "string", "description": "The citizen's last name." }, "age": { "description": "Age in years which must be equal to or greater than zero.", "type": "integer", "minimum": 0 }, "city": { "$ref": "city.json" } } }
City.json 模式
{ "$id": "https://example.com/city.schema.json", "$schema": "http://json-schema.org/draft-07/schema#", "title": "City", "type": "object", "properties": { "name": { "type": "string", "description": "The city's name." }, "zipCode": { "type": "integer", "description": "The zip code.", "minimum": 0 } } }
在本例中,给定公民具有城市。在 Apicurio Registry 中,使用名称 city.json
创建对城市工件的引用的公民工件。在 SerDes 中,当公民架构获取时,也会获取城市架构,因为它是从公民架构中引用的。在对数据进行序列化/反序列化数据时,使用引用名称来解决嵌套模式,从而允许针对市民和嵌套城市模式进行验证。
其他资源
- 如需了解更多详细信息,请参阅 JsonSchemaKafkaDeserializerConfig Java 类
对于 Java 示例应用程序,请参阅:
8.3.3. 使用 Apicurio Registry 配置 Protobuf SerDes
本主题解释了如何在 Google Protobuf 使用 Kafka 客户端序列化r 和 deserializer (SerDes)类。
Apicurio Registry 为 Protobuf 提供以下 Kafka 客户端 SerDes 类:
-
io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer
-
io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer
配置 Protobuf serializer
您可以按照如下所示配置 Protobuf serializer 类:
- Apicurio Registry URL
- 工件解析器策略
- ID 位置
- ID 编码
- 模式验证
有关这些配置选项的详情,请查看以下部分:
配置 Protobuf deserializer
您必须配置 Protobuf deserializer 类,以匹配 serializer 中的以下配置设置:
- Apicurio Registry URL
- ID 编码
配置属性名称和值与 serializer 相同。
配置 deserializer 时不需要以下选项:
- 工件解析器策略
- ID 位置
deserializer 类可以决定消息中这些选项的值。不需要该策略,因为 serializer 负责将 ID 作为消息的一部分发送。
ID 位置通过检查消息有效负载开头的 magic 字节来确定。如果找到了字节,则 ID 会使用配置的处理程序从消息有效负载中读取。如果没有找到 magic byte,则会从邮件标头中读取该 ID。
Protobuf deserializer 不会反序列化到确切的 Protobuf 消息实现,而是切换到 DynamicMessage
实例。否则,没有适当的 API。
protobuf SerDes 和 artifact 引用
当使用 导入
语句的复杂 Protobuf 消息时,导入的 Protobuf 消息会作为单独的工件存储在 Apicurio Registry 中。然后,当 Apicurio Registry 获取主要 schema 来检查 Protobuf 消息时,还会检索引用的方案,以便可以检查和序列化的完整消息模式。
例如,下表_info.proto
模式文件包含导入的 mode.proto
schema 文件:
带有导入的 .mode.proto 文件的 table_info.proto 文件
syntax = "proto3"; package sample; option java_package = "io.api.sample"; option java_multiple_files = true; import "sample/mode.proto"; message TableInfo { int32 winIndex = 1; Mode mode = 2; int32 min = 3; int32 max = 4; string id = 5; string dataAdapter = 6; string schema = 7; string selector = 8; string subscription_id = 9; }
mode.proto file
syntax = "proto3"; package sample; option java_package = "io.api.sample"; option java_multiple_files = true; enum Mode { MODE_UNKNOWN = 0; RAW = 1; MERGE = 2; DISTINCT = 3; COMMAND = 4; }
在本例中,两个 Protobuf 工件存储在 Apicurio Registry 中,一个用于 TableInfo
,另一个用于 Mode
。但是,由于 Mode
是 表Info
的一部分,因此当获取 表Info
来检查 SerDes 中的消息时,模式
也会作为 表Info
引用的工件返回。
其他资源
对于 Java 示例应用程序,请参阅: