8.3. 如何配置不同的客户端 serializer/deserializer 类型


在 Kafka 客户端应用程序中使用 schema 时,您必须根据自己的用例选择要使用的特定模式类型。Apicurio Registry 为 Apache Avro、JSON Schema 和 Google Protobuf 提供 SerDe Java 类。以下小节解释了如何配置 Kafka 应用程序以使用每种类型。

您还可以使用 Kafka 实现自定义 serializer 和 deserializer 类,并使用 Apicurio Registry REST Java 客户端使用 Apicurio Registry 功能。

用于 serializers/deserializers 的 Kafka 应用程序配置

使用 Kafka 应用程序中的 Apicurio Registry 提供的 SerDe 类涉及设置正确的配置属性。以下简单的 Avro 示例演示了如何在 Kafka producer 应用程序中配置序列化器,以及如何在 Kafka 使用者应用程序中配置反序列化器。

Kafka producer 中的 serializer 配置示例

// Create the Kafka producer
private static Producer<Object, Object> createKafkaProducer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");

    // Use Apicurio Registry-provided Kafka serializer for Avro
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());

    // Configure the Apicurio Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // Register the schema artifact if not found in the registry.
    props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);

    // Create the Kafka producer
    Producer<Object, Object> producer = new KafkaProducer<>(props);
    return producer;
}
Copy to Clipboard Toggle word wrap

Kafka consumer 中的 deserializer 配置示例

// Create the Kafka consumer
private static KafkaConsumer<Long, GenericRecord> createKafkaConsumer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Use Apicurio Registry-provided Kafka deserializer for Avro
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());

    // Configure the Apicurio Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // No other configuration needed because the schema globalId the deserializer uses is sent
    // in the payload. The deserializer extracts the globalId and uses it to look up the schema
    // from the registry.

    // Create the Kafka consumer
    KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
    return consumer;
}
Copy to Clipboard Toggle word wrap

8.3.1. 使用 Apicurio Registry 配置 Avro SerDe

Apicurio Registry 为 Apache Avro 提供以下 Kafka 客户端序列化器和反序列化类:

  • io.apicurio.registry.serde.avro.AvroKafkaSerializer
  • io.apicurio.registry.serde.avro.AvroKafkaDeserializer

配置 Avro serializer

您可以使用以下方法配置 Avro serializer 类:

  • Apicurio Registry URL
  • 工件解析器策略
  • ID 位置
  • ID 编码
  • Avro datum 供应商
  • Avro 编码

ID 位置

序列化器将模式的唯一 ID 作为 Kafka 消息的一部分传递,以便消费者使用正确的模式进行反序列化。ID 可以位于消息有效负载中,也可以在邮件标头中使用。默认位置是消息有效负载。要在消息标头中发送 ID,请设置以下配置属性:

props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")
Copy to Clipboard Toggle word wrap

属性名称是 apicurio.registry.headers.enabled

ID 编码

您可以在 Kafka 消息正文中传递时自定义如何编码 schema ID。将 apicurio.registry.id-handler 配置属性设置为实现 io.apicurio.registry.serde.IdHandler 接口的类。Apicurio Registry 提供以下实现:

  • io.apicurio.registry.serde.DefaultIdHandler: 将 ID 存储为 8 字节长
  • io.apicurio.registry.serde.Legacy4ByteIdHandler: 将 ID 存储为 4 字节整数

Apicurio Registry 代表一个长的模式 ID,但出于传统原因,或者出于与其他 registry 或 SerDe 类的兼容性,您可能需要在发送 ID 时使用 4 字节。

Avro datum 供应商

Avro 提供不同的 datum 写入器和读取器来写入和读取数据。Apicurio Registry 支持三种不同的类型:

  • generic
  • 具体
  • 反射

Apicurio Registry AvroDatumProvider 是使用哪个类型的抽象,默认为使用 DefaultAvroDatumProvider

您可以设置以下配置选项:

  • apicurio.registry.avro-datum-provider :指定 AvroDatumProvider 实施的完全限定域名,如 io.apicurio.registry.serde.avro.ReflectAvroDatumProvider
  • apicurio.registry.use-specific-avro-reader: Set to true 以使用 DefaultAvroDatumProvider特定类型

Avro 编码

使用 Avro 对数据进行序列化时,您可以使用 Avro 二进制编码格式确保以尽可能高效的方式编码数据。Avro 还支持将数据编码为 JSON,这有助于检查每条消息的载荷,例如用于日志记录或调试。

您可以通过配置 apicurio.registry.avro.encoding 属性来设置 Avro 编码,值为 JSONBINARY。默认值为 BINARY

配置 Avro deserializer

您必须配置 Avro deserializer 类,以匹配 serializer 的以下配置设置:

  • Apicurio Registry URL
  • ID 编码
  • Avro datum 供应商
  • Avro 编码

有关这些配置选项,请参阅 serializer 部分。属性名称和值相同。

注意

配置 deserializer 时不需要以下选项:

  • 工件解析器策略
  • ID 位置

deserializer 类可以决定消息中这些选项的值。不需要该策略,因为 serializer 负责将 ID 作为消息的一部分发送。

ID 位置通过检查消息有效负载开头的 magic 字节来确定。如果找到了字节,则 ID 会使用配置的处理程序从消息有效负载中读取。如果没有找到 magic byte,则会从邮件标头中读取该 ID。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat