第 7 章 在 Java 客户端中使用序列化器/反序列化器验证 Kafka 信息
Apicurio Registry 为使用 Java 编写的 Kafka 生成者和消费者应用程序提供客户端序列化器/反序列化器(SerDes)。Kafka 制作者应用程序使用 serializers 对符合特定事件模式的信息进行编码。Kafka 消费者应用程序使用 deserializers 来验证信息是否使用正确的模式序列化,具体取决于特定的模式 ID。这样可确保使用一致的模式,并有助于防止运行时的数据错误。
本章解释了如何在生成者和消费者客户端应用程序中使用 Kafka 客户端 SerDe :
先决条件
- 您已阅了读 第 1 章 Apicurio Registry 简介。
- 已安装 Apicurio Registry。
您已创建了 Kafka producer 和消费者客户端应用程序。
有关 Kafka 客户端应用程序的详情,请参阅在 OpenShift 中部署和管理 AMQ Streams。
7.1. Kafka 客户端应用程序和 Apicurio Registry 复制链接链接已复制到粘贴板!
Apicurio Registry 将模式管理与客户端应用程序配置分离。您可以通过在客户端代码中指定 URL 来启用 Java 客户端应用程序以使用 Apicurio Registry 中的模式。
您可以将 schema 存储在 Apicurio Registry 中,以序列化和反序列化消息,这些消息从客户端应用程序引用,以确保它们发送和接收的消息与这些模式兼容。Kafka 客户端应用程序可以在运行时从 Apicurio Registry 中推送或拉取其模式。
模式可以演进,以便在 Apicurio Registry 中定义规则,例如,确保 schema 更改有效,且不会破坏应用程序使用以前的版本。Apicurio Registry 通过将修改的模式与以前的模式进行比较来检查兼容性。
Apicurio Registry 模式技术
Apicurio Registry 为模式技术提供模式 registry 支持,例如:
- Avro
- protobuf
- JSON 架构
这些模式技术可通过 Apicurio Registry 提供的 Kafka 客户端序列化/反序列化器(SerDes)服务使用。Apicurio Registry 提供的 SerDes 类的成熟度和使用可能不同。以下章节详细介绍了每种模式类型。
producer 模式配置
制作者客户端应用程序使用序列化器将发送到特定代理主题的消息放入正确的数据格式。
启用制作者以使用 Apicurio Registry 进行序列化:
- 使用 Apicurio Registry 定义并注册您的 schema (如果它尚不存在)。
使用以下命令配置制作者 客户端代码:
- Apicurio Registry 的 URL
- Apicurio Registry serializer 用于消息
- 将 Kafka 消息映射到 Apicurio Registry 中的模式工件的策略
- 在 Apicurio Registry 中查找或注册用于序列化的模式的策略
注册 schema 后,当您启动 Kafka 和 Apicurio Registry 时,您可以访问 schema 来格式化发送到 Kafka 代理主题的信息。或者,根据配置,生成者也可以在首次使用时自动注册模式。
如果 schema 已存在,您可以根据 Apicurio Registry 中定义的兼容性规则使用 registry REST API 创建新版本。随着模式的演进,版本用于兼容性检查。组 ID、工件 ID 和版本代表用于标识模式的唯一元组。
consumer 模式配置
消费者客户端应用程序使用反序列化器将来自特定代理主题使用的消息提取到正确的数据格式。
启用消费者以使用 Apicurio Registry 进行反序列化:
- 使用 Apicurio Registry 定义并注册您的 schema (如果不存在)
- Apicurio Registry 的 URL
- 用于消息的 Apicurio Registry deserializer
- 用于序列化的输入数据流
使用内容 ID 检索模式
默认情况下,模式使用内容 ID 从 Apicurio Registry 检索(这是对工件版本 内容 的唯一 ID,但不对版本本身是唯一的),它在被消耗的消息中指定。架构内容 ID 可以位于消息标头或消息有效负载中,具体取决于制作者应用的配置。默认情况下,内容 ID 将位于消息正文中。
在消息有效负载中查找内容 ID 时,数据的格式以 magic 字节开头,用作消费者的信号,后跟内容 ID,以及消息数据。例如:
...
# ...
[MAGIC_BYTE]
[CONTENT_ID]
[MESSAGE DATA]
然后,当您启动 Kafka 和 Apicurio Registry 时,您可以访问 schema 来格式化从 Kafka 代理主题收到的消息。
使用全局 ID 检索模式
或者,您可以将 配置为根据全局 ID 从 Apicurio Registry 检索模式,这是工件版本的唯一 ID。使用全局 ID 而不是 contentID 时,可以使用相同的选项。您可以在消息标头或消息正文(默认)中发送全局 ID。
在消息有效负载中查找全局 ID 时,数据的格式以魔法字节开头,用作消费者的信号,后跟全局 ID,以及消息数据正常。例如:
...
# ...
[MAGIC_BYTE]
[GLOBAL_ID]
[MESSAGE DATA]