8.3. 如何配置不同的客户端序列化器/反序列化器类型


在 Kafka 客户端应用程序中使用模式时,您必须选择要使用的特定模式类型,具体取决于您的用例。Apicurio Registry 为 Apache Avro、JSON Schema 和 Google Protobuf 提供 SerDe Java 类。以下小节解释了如何将 Kafka 应用程序配置为使用每种类型。

您还可以使用 Kafka 实现自定义序列化器和反序列化器类,并使用 Apicurio Registry REST Java 客户端利用 Apicurio Registry 功能。

serializers/deserializers 的 Kafka 应用程序配置

在 Kafka 应用程序中使用 Apicurio Registry 提供的 SerDe 类涉及设置正确的配置属性。以下简单的 Avro 示例演示了如何在 Kafka producer 应用程序中配置序列化器,以及如何在 Kafka 消费者应用程序中配置反序列化器。

Kafka producer 中的序列化器配置示例

// Create the Kafka producer
private static Producer<Object, Object> createKafkaProducer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");

    // Use Apicurio Registry-provided Kafka serializer for Avro
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());

    // Configure the Apicurio Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // Register the schema artifact if not found in the registry.
    props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);

    // Create the Kafka producer
    Producer<Object, Object> producer = new KafkaProducer<>(props);
    return producer;
}
Copy to Clipboard Toggle word wrap

Kafka consumer 中的 deserializer 配置示例

// Create the Kafka consumer
private static KafkaConsumer<Long, GenericRecord> createKafkaConsumer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Use Apicurio Registry-provided Kafka deserializer for Avro
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());

    // Configure the Apicurio Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // No other configuration needed because the schema globalId the deserializer uses is sent
    // in the payload. The deserializer extracts the globalId and uses it to look up the schema
    // from the registry.

    // Create the Kafka consumer
    KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
    return consumer;
}
Copy to Clipboard Toggle word wrap

8.3.1. 使用 Apicurio Registry 配置 Avro SerDe

本主题解释了如何为 Apache Avro 使用 Kafka 客户端序列化器和反序列化器(SerDes)类。

Apicurio Registry 为 Avro 提供以下 Kafka 客户端 SerDes 类:

  • io.apicurio.registry.serde.avro.AvroKafkaSerializer
  • io.apicurio.registry.serde.avro.AvroKafkaDeserializer

配置 Avro serializer

您可以使用以下方法配置 Avro serializer 类:

  • Apicurio Registry URL
  • 工件解析器策略
  • ID 位置
  • ID 编码
  • avro datum 供应商
  • avro 编码

ID 位置

serializer 传递模式的唯一 ID 作为 Kafka 消息的一部分,以便使用者可以使用正确的模式来反序列化。ID 可以在消息有效负载或消息标头中。默认位置是消息有效负载。要在消息标头中发送 ID,请设置以下配置属性:

props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")
Copy to Clipboard Toggle word wrap

属性名称为 apicurio.registry.headers.enabled

ID 编码

您可以在 Kafka 消息正文传递时自定义模式 ID 的编码方式。将 apicurio.registry.id-handler 配置属性设置为实现 io.apicurio.registry.serde.IdHandler 接口的类。Apicurio Registry 提供以下实现:

  • io.apicurio.registry.serde.Default4ByteIdHandler: 将 ID 存储为 4 字节长
  • io.apicurio.registry.serde.Legacy8ByteIdHandler: 将 ID 存储为 8 字节整数

Apicurio Registry 长期代表模式 ID,但出于旧原因,或者出于与其他 registry 或 SerDe 类兼容,您可能需要在发送 ID 时使用 4 字节。

avro datum 供应商

avro 提供不同的 datum 写入器和读者,用于写入和读取数据。Apicurio Registry 支持三种不同类型的类型:

  • generic
  • 特定
  • reflect

Apicurio Registry AvroDatumProvider 是使用类型的抽象,其中 DefaultAvroDatumProvider 被默认使用。

您可以设置以下配置选项:

  • apicurio.registry.avro-datum-provider: 指定 AvroDatumProvider 实现的完全限定的 Java 类名称,如 io.apicurio.registry.serde.avro.ReflectAvroDatumProvider
  • apicurio.registry.use-specific-avro-reader: 设置为 true,以便在使用 DefaultAvroDatumProvider时使用特定类型的

avro 编码

当使用 Avro 序列化数据时,您可以使用 Avro 二进制编码格式来确保数据编码为尽可能有效的格式。avro 还支持将数据编码为 JSON,这有助于检查每个消息的有效负载,例如用于日志记录或调试。

您可以通过将 apicurio.registry.avro.encoding 属性配置为 JSONBINARY来设置 Avro 编码。默认值为 BINARY

配置 Avro deserializer

您必须配置 Avro deserializer 类,以匹配序列化器的以下配置设置:

  • Apicurio Registry URL
  • ID 编码
  • avro datum 供应商
  • avro 编码

有关这些配置选项,请参阅 serializer 部分。属性名称和值是相同的。

注意

配置反序列化器时不需要以下选项:

  • 工件解析器策略
  • ID 位置

deserializer 类可以从消息中决定这些选项的值。不需要该策略,因为 serializer 负责发送 ID 作为消息的一部分。

ID 位置是通过在消息有效负载开始时检查 magic 字节来确定的。如果找到该字节,则使用配置的处理程序从消息有效负载中读取 ID。如果没有找到 magic 字节,则会从消息标头中读取 ID。

avro SerDes 和工件引用

在使用 Avro 消息和带有嵌套记录的模式时,会为每个嵌套记录注册一个新的工件。例如,以下 TradeKey 模式包括嵌套的 Exchange 模式:

带有嵌套交换模式的 TradeKey 模式

{
  "namespace": "com.kubetrade.schema.trade",
  "type": "record",
  "name": "TradeKey",
  "fields": [
    {
      "name": "exchange",
      "type": "com.kubetrade.schema.common.Exchange"
    },
    {
      "name": "key",
      "type": "string"
    }
  ]
}
Copy to Clipboard Toggle word wrap

Exchange 模式

{
  "namespace": "com.kubetrade.schema.common",
  "type": "enum",
  "name": "Exchange",
  "symbols" : ["GEMINI"]
}
Copy to Clipboard Toggle word wrap

当将这些模式与 Avro SerDes 搭配使用时,会在 Apicurio Registry 中创建两个工件,一个用于 TradeKey 模式,一个用于 Exchange 模式。每当使用 TradeKey 模式的消息被序列化或反序列化时,会检索这两个模式,允许您将定义分成不同的文件中。

8.3.2. 使用 Apicurio Registry 配置 JSON 架构 SerDe

本主题解释了如何将 Kafka 客户端序列化器和反序列化器(SerDes)类用于 JSON Schema。

Apicurio Registry 为 JSON Schema 提供以下 Kafka 客户端 SerDes 类:

  • io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer
  • io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer

与 Apache Avro 不同,JSON 架构不是序列化技术,而是验证技术。因此,JSON 架构的配置选项略有不同。例如,没有编码选项,因为数据始终编码为 JSON。

配置 JSON 架构序列化器

您可以配置 JSON 架构 serializer 类,如下所示:

  • Apicurio Registry URL
  • 工件解析器策略
  • 模式验证

唯一的非标准配置属性是 JSON Schema 验证,默认是启用的。您可以通过将 apicurio.registry.serde.validation-enabled 设置为 "false" 来禁用此功能。例如:

props.putIfAbsent(SerdeConfig.VALIDATION_ENABLED, Boolean.FALSE)
Copy to Clipboard Toggle word wrap

配置 JSON 架构反序列化器

您可以配置 JSON Schema deserializer 类,如下所示:

  • Apicurio Registry URL
  • 模式验证
  • 用于取消序列化数据的类

您必须提供 Apicurio Registry 的位置,以便可以载入 schema。其他配置是可选的。

注意

只有在序列化器通过 Kafka 消息中的全局 ID 时,反序列化器验证才能正常工作,这只有在 serializer 中启用了验证时才会发生。

JSON 架构 SerDe 和工件引用

JSON Schema SerDes 无法从消息有效负载发现架构,因此必须事先注册 schema 工件,这也应用工件引用。

根据架构的内容,如果 $ref 值是一个 URL,SerDes 会尝试使用该 URL 解析引用的模式,然后验证可以正常工作,根据主模式验证数据,并根据嵌套模式验证嵌套值。还实现了对 Apicurio Registry 中引用工件的支持。

例如,以下对 city .json 模式的引用是 city.json 模式:

带有 city.json schema 的参考信息.json 模式

{
 "$id": "https://example.com/citizen.schema.json",
 "$schema": "http://json-schema.org/draft-07/schema#",
 "title": "Citizen",
 "type": "object",
 "properties": {
   "firstName": {
     "type": "string",
     "description": "The citizen's first name."
   },
   "lastName": {
     "type": "string",
     "description": "The citizen's last name."
   },
   "age": {
     "description": "Age in years which must be equal to or greater than zero.",
     "type": "integer",
     "minimum": 0
   },
   "city": {
     "$ref": "city.json"
   }
 }
}
Copy to Clipboard Toggle word wrap

city.json schema

{
 "$id": "https://example.com/city.schema.json",
 "$schema": "http://json-schema.org/draft-07/schema#",
 "title": "City",
 "type": "object",
 "properties": {
   "name": {
     "type": "string",
     "description": "The city's name."
   },
   "zipCode": {
     "type": "integer",
     "description": "The zip code.",
     "minimum": 0
   }
 }
}
Copy to Clipboard Toggle word wrap

在本例中,给定的会面体上有一个城市。在 Apicurio Registry 中,使用名称 city.json 创建对城市工件的引用。在 SerDes (SerDes)模式被获取时,也会获取 city 模式,因为它是从depoint 模式引用的。当序列化/撤销数据时,引用名称用于解析嵌套的模式,允许对量级模式和嵌套城市模式进行验证。

8.3.3. 使用 Apicurio Registry 配置 Protobuf SerDe

本主题解释了如何为 Google Protobuf 使用 Kafka 客户端序列化器和反序列化器(SerDes)类。

Apicurio Registry 为 Protobuf 提供以下 Kafka 客户端 SerDes 类:

  • io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer
  • io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer

配置 Protobuf serializer

您可以配置 Protobuf serializer 类,如下所示:

  • Apicurio Registry URL
  • 工件解析器策略
  • ID 位置
  • ID 编码
  • 模式验证

有关这些配置选项的详情,请查看以下部分:

配置 Protobuf deserializer

您必须配置 Protobuf deserializer 类,以匹配序列化器中的以下配置设置:

  • Apicurio Registry URL
  • ID 编码

配置属性名称和值与 serializer 相同。

注意

配置反序列化器时不需要以下选项:

  • 工件解析器策略
  • ID 位置

deserializer 类可以从消息中决定这些选项的值。不需要该策略,因为 serializer 负责发送 ID 作为消息的一部分。

ID 位置是通过在消息有效负载开始时检查 magic 字节来确定的。如果找到该字节,则使用配置的处理程序从消息有效负载中读取 ID。如果没有找到 magic 字节,则会从消息标头中读取 ID。

注意

Protobuf deserializer 不会反序列化到确切的 Protobuf 消息实现,而是切换到 DynamicMessage 实例。否则,没有适当的 API 来做。

protobuf SerDes 和工件引用

当使用 import 语句的复杂 Protobuf 消息时,导入的 Protobuf 消息作为单独的工件存储在 Apicurio Registry 中。然后,当 Apicurio Registry 获取检查 Protobuf 消息的主要模式时,也会检索引用的方案,以便检查并序列化完整的消息模式。

例如,以下 table_info.proto 模式文件包括导入的 mode.proto schema 文件:

带有导入的 mode.proto 文件的 table_info.proto 文件

syntax = "proto3";
package sample;
option java_package = "io.api.sample";
option java_multiple_files = true;

import "sample/mode.proto";

message TableInfo {

 int32 winIndex = 1;
 Mode mode = 2;
 int32 min = 3;
 int32 max = 4;
 string id = 5;
 string dataAdapter = 6;
 string schema = 7;
 string selector = 8;
 string subscription_id = 9;
}
Copy to Clipboard Toggle word wrap

mode.proto file

syntax = "proto3";
package sample;
option java_package = "io.api.sample";
option java_multiple_files = true;

enum Mode {

MODE_UNKNOWN = 0;
RAW = 1;
MERGE = 2;
DISTINCT = 3;
COMMAND = 4;
}
Copy to Clipboard Toggle word wrap

在本例中,两个 Protobuf 工件存储在 Apicurio Registry 中,一个用于 TableInfo,一个用于 Mode。但是,由于 ModeTableInfo 的一部分,因此当获取每个 TableInfo 以检查 SerDes 中的消息时,Mode 也作为 TableInfo 引用的构件返回。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat