8.3. 如何配置不同的客户端 serializer/deserializer 类型
在 Kafka 客户端应用程序中使用 schema 时,您必须根据自己的用例选择要使用的特定模式类型。Service Registry 为 Apache Avro、JSON Schema 和 Google Protobuf 提供 SerDe Java 类。以下小节解释了如何配置 Kafka 应用程序以使用每种类型。
您还可以使用 Kafka 实现自定义序列化器和反序列化类,并使用 Service Registry REST Java 客户端利用 Service Registry 功能。
用于 serializers/deserializers 的 Kafka 应用程序配置
使用 Kafka 应用程序中的 Service Registry 提供的 SerDe 类涉及设置正确的配置属性。以下简单的 Avro 示例演示了如何在 Kafka producer 应用程序中配置序列化器,以及如何在 Kafka 使用者应用程序中配置反序列化器。
Kafka producer 中的 serializer 配置示例
// Create the Kafka producer
private static Producer<Object, Object> createKafkaProducer() {
Properties props = new Properties();
// Configure standard Kafka settings
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
// Use Service Registry-provided Kafka serializer for Avro
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
// Configure the Service Registry location
props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
// Register the schema artifact if not found in the registry.
props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
// Create the Kafka producer
Producer<Object, Object> producer = new KafkaProducer<>(props);
return producer;
}
Kafka consumer 中的 deserializer 配置示例
// Create the Kafka consumer
private static KafkaConsumer<Long, GenericRecord> createKafkaConsumer() {
Properties props = new Properties();
// Configure standard Kafka settings
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use Service Registry-provided Kafka deserializer for Avro
props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
// Configure the Service Registry location
props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
// No other configuration needed because the schema globalId the deserializer uses is sent
// in the payload. The deserializer extracts the globalId and uses it to look up the schema
// from the registry.
// Create the Kafka consumer
KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
return consumer;
}
8.3.1. 使用 Service Registry 配置 Avro SerDes 复制链接链接已复制到粘贴板!
本主题解释了如何使用 Apache Avro 的 Kafka 客户端序列化器(SerDes)类。
Service Registry 为 Avro 提供以下 Kafka 客户端 SerDes 类:
-
io.apicurio.registry.serde.avro.AvroKafkaSerializer -
io.apicurio.registry.serde.avro.AvroKafkaDeserializer
配置 Avro serializer
您可以使用以下方法配置 Avro serializer 类:
- Service Registry URL
- 工件解析器策略
- ID 位置
- ID 编码
- Avro datum 供应商
- Avro 编码
ID 位置
序列化器将模式的唯一 ID 作为 Kafka 消息的一部分传递,以便消费者使用正确的模式进行反序列化。ID 可以位于消息有效负载中,也可以在邮件标头中使用。默认位置是消息有效负载。要在消息标头中发送 ID,请设置以下配置属性:
props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")
属性名称是 apicurio.registry.headers.enabled。
ID 编码
您可以在 Kafka 消息正文中传递时自定义如何编码 schema ID。将 apicurio.registry.id-handler 配置属性设置为实现 io.apicurio.registry.serde.IdHandler 接口的类。Service Registry 提供以下实现:
-
io.apicurio.registry.serde.DefaultIdHandler: 将 ID 存储为 8 字节长 -
io.apicurio.registry.serde.Legacy4ByteIdHandler: 将 ID 存储为 4 字节整数
Service Registry 长期代表模式 ID,但出于传统原因,或者出于与其他 registry 或 SerDe 类的兼容性,在发送 ID 时,您可能需要使用 4 字节。
Avro datum 供应商
Avro 提供不同的 datum 写入器和读取器来写入和读取数据。服务 Registry 支持三种不同的类型:
- 通用
- 具体
- 反射
Service Registry AvroDatumProvider 是使用哪个类型的抽象,默认为使用 DefaultAvroDatumProvider。
您可以设置以下配置选项:
-
apicurio.registry.avro-datum-provider:指定AvroDatumProvider实施的完全限定域名,如io.apicurio.registry.serde.avro.ReflectAvroDatumProvider -
apicurio.registry.use-specific-avro-reader: Set totrue以使用DefaultAvroDatumProvider特定类型
Avro 编码
使用 Avro 对数据进行序列化时,您可以使用 Avro 二进制编码格式确保以尽可能高效的方式编码数据。Avro 还支持将数据编码为 JSON,这有助于检查每条消息的载荷,例如用于日志记录或调试。
您可以通过配置 apicurio.registry.avro.encoding 属性来设置 Avro 编码,值为 JSON 或 BINARY。默认值为 BINARY。
配置 Avro deserializer
您必须配置 Avro deserializer 类,以匹配 serializer 的以下配置设置:
- Service Registry URL
- ID 编码
- Avro datum 供应商
- Avro 编码
有关这些配置选项,请参阅 serializer 部分。属性名称和值相同。
配置 deserializer 时不需要以下选项:
- 工件解析器策略
- ID 位置
deserializer 类可以决定消息中这些选项的值。不需要该策略,因为 serializer 负责将 ID 作为消息的一部分发送。
ID 位置通过检查消息有效负载开头的 magic 字节来确定。如果找到了字节,则 ID 会使用配置的处理程序从消息有效负载中读取。如果没有找到 magic byte,则会从邮件标头中读取该 ID。
Avro SerDes 和 artifact 引用
在使用 Avro 消息和带有嵌套记录的模式时,会为每个嵌套记录注册一个新的工件。例如,以下 TradeKey 模式包括嵌套的 Exchange 模式:
带有嵌套交换模式的 TradeKey 模式
{
"namespace": "com.kubetrade.schema.trade",
"type": "record",
"name": "TradeKey",
"fields": [
{
"name": "exchange",
"type": "com.kubetrade.schema.common.Exchange"
},
{
"name": "key",
"type": "string"
}
]
}
Exchange schema
{
"namespace": "com.kubetrade.schema.common",
"type": "enum",
"name": "Exchange",
"symbols" : ["GEMINI"]
}
当将这些方案与 Avro SerDes 搭配使用时,在 Service Registry 中创建两个工件,一个用于 TradeKey 模式,另一个用于 Exchange 模式。每当使用 TradeKey 模式的消息被序列化或反序列化时,都会检索这两个模式,以便您将定义分成不同的文件中。
8.3.2. 使用 Service Registry 配置 JSON Schema SerDes 复制链接链接已复制到粘贴板!
本主题解释了如何使用 Kafka 客户端序列化r 和 deserializer (SerDes)类用于 JSON Schema。
Service Registry 为 JSON Schema 提供以下 Kafka 客户端 SerDes 类:
-
io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer -
io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer
与 Apache Avro 不同,JSON Schema 不是序列化技术,而是验证技术。因此,JSON Schema 的配置选项并不相同。例如,没有编码选项,因为数据始终被编码为 JSON。
配置 JSON Schema serializer
您可以配置 JSON Schema serializer 类,如下所示:
- Service Registry URL
- 工件解析器策略
- 模式验证
唯一非标准配置属性是 JSON Schema 验证(默认为启用)。您可以通过将 apicurio.registry.serde.validation-enabled 设置为 "false" 来禁用此功能。例如:
props.putIfAbsent(SerdeConfig.VALIDATION_ENABLED, Boolean.FALSE)
配置 JSON Schema deserializer
您可以配置 JSON Schema deserializer 类,如下所示:
- Service Registry URL
- 模式验证
- 用于对数据进行反序列化的类
您必须提供 Service Registry 的位置,以便可以加载 schema。其他配置是可选的。
只有在 serializer 传递 Kafka 消息中的全局 ID 时,仅当 serializer 中启用了验证时才会发生,反序列化验证才可以正常工作。
JSON Schema SerDes 和 artifact 引用
JSON Schema SerDes 无法从消息有效负载发现 schema,因此必须预先注册架构工件,这也会应用工件引用。
根据方案的内容,如果 $ref 值是 URL,则 SerDes 会尝试使用该 URL 解析引用的 schema,然后验证针对主架构的数据,并对主模式验证嵌套值。还实施了对 Service Registry 中引用工件的支持。
例如,以下 citizen.json 模式引用 city.json 模式:
公民.json 模式,并参考 city.json 模式
{
"$id": "https://example.com/citizen.schema.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Citizen",
"type": "object",
"properties": {
"firstName": {
"type": "string",
"description": "The citizen's first name."
},
"lastName": {
"type": "string",
"description": "The citizen's last name."
},
"age": {
"description": "Age in years which must be equal to or greater than zero.",
"type": "integer",
"minimum": 0
},
"city": {
"$ref": "city.json"
}
}
}
city.json schema
{
"$id": "https://example.com/city.schema.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "City",
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The city's name."
},
"zipCode": {
"type": "integer",
"description": "The zip code.",
"minimum": 0
}
}
}
在本例中,给定公民具有城市。在 Service Registry 中,会使用名称 city.json 创建对城市工件的引用的公民工件。在 SerDes 中,当公民架构获取时,也会获取城市架构,因为它是从公民架构中引用的。在对数据进行序列化/反序列化数据时,使用引用名称来解决嵌套模式,从而允许针对市民和嵌套城市模式进行验证。
8.3.3. 使用 Service Registry 配置 Protobuf SerDes 复制链接链接已复制到粘贴板!
本主题解释了如何在 Google Protobuf 使用 Kafka 客户端序列化r 和 deserializer (SerDes)类。
Service Registry 为 Protobuf 提供以下 Kafka 客户端 SerDes 类:
-
io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer -
io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer
配置 Protobuf serializer
您可以按照如下所示配置 Protobuf serializer 类:
- Service Registry URL
- 工件解析器策略
- ID 位置
- ID 编码
- 模式验证
有关这些配置选项的详情,请查看以下部分:
配置 Protobuf deserializer
您必须配置 Protobuf deserializer 类,以匹配 serializer 中的以下配置设置:
- Service Registry URL
- ID 编码
配置属性名称和值与 serializer 相同。
配置 deserializer 时不需要以下选项:
- 工件解析器策略
- ID 位置
deserializer 类可以决定消息中这些选项的值。不需要该策略,因为 serializer 负责将 ID 作为消息的一部分发送。
ID 位置通过检查消息有效负载开头的 magic 字节来确定。如果找到了字节,则 ID 会使用配置的处理程序从消息有效负载中读取。如果没有找到 magic byte,则会从邮件标头中读取该 ID。
Protobuf deserializer 不会对您的确切的 Protobuf 消息实施进行反序列化,而是指向 DynamicMessage 实例。否则,没有适当的 API。
protobuf SerDes 和 artifact 引用
当使用 导入 语句的复杂 Protobuf 消息时,导入的 Protobuf 消息会作为单独的工件存储在 Service Registry 中。然后,当 Service Registry 获取主 schema 来检查 Protobuf 消息时,还会检索引用的方案,以便可以检查和序列化的完整消息模式。
例如,下表_info.proto 模式文件包含导入的 mode.proto schema 文件:
带有导入的 .mode.proto 文件的 table_info.proto 文件
syntax = "proto3";
package sample;
option java_package = "io.api.sample";
option java_multiple_files = true;
import "sample/mode.proto";
message TableInfo {
int32 winIndex = 1;
Mode mode = 2;
int32 min = 3;
int32 max = 4;
string id = 5;
string dataAdapter = 6;
string schema = 7;
string selector = 8;
string subscription_id = 9;
}
mode.proto 文件
syntax = "proto3";
package sample;
option java_package = "io.api.sample";
option java_multiple_files = true;
enum Mode {
MODE_UNKNOWN = 0;
RAW = 1;
MERGE = 2;
DISTINCT = 3;
COMMAND = 4;
}
在本例中,两个 Protobuf 工件存储在 Service Registry 中,一个用于 TableInfo,另一个用于 模式。但是,由于 Mode 是 表Info 的一部分,因此当获取 表Info 来检查 SerDes 中的消息时,模式 也会作为 表Info 引用的工件返回。