第 7 章 在 Java 客户端中使用 serializers/deserializers 验证 Kafka 信息
Service Registry 为 Kafka producer 和使用 Java 编写的使用者应用程序提供客户端 serializers/deserializers (SerDes)。Kafka producer 应用使用 serializers 对符合特定事件架构的消息进行编码。Kafka 使用者应用程序使用反序列化程序根据特定的模式 ID 来验证已使用正确模式序列化消息。这样可确保一致的模式使用,并帮助防止运行时出现数据错误。
本章论述了如何在制作者和消费者应用程序中使用 Kafka 客户端 SerDes:
前提条件
- 您已读取 第 1 章 Service Registry 简介
- 已安装 Service Registry
您已创建了 Kafka producer 和使用者客户端应用程序
有关 Kafka 客户端应用程序的详情,请参阅在 OpenShift 中部署和升级 AMQ Streams。
7.1. Kafka 客户端应用程序和 Service Registry
Service Registry 将 schema 管理与客户端应用程序配置分离。您可以通过在客户端代码中指定其 URL,使 Java 客户端应用程序使用 Service Registry 的 schema。
您可以将架构存储在 registry 中,以序列化和反序列化消息,从客户端应用程序引用它们,以确保它们发送和接收的消息与这些架构兼容。Kafka 客户端应用程序可以在运行时从 Service Registry 中推送或拉取其模式。
架构可以演进,以便您可以在 Service Registry 中定义规则,例如确保 schema 更改有效,且不会破坏应用程序使用的之前版本。Service Registry 通过比较之前的 schema 版本来检查兼容性。
Service Registry 模式技术
服务 Registry 为 schema 技术提供架构 registry 支持,例如:
- Avro
- protobuf
- JSON Schema
客户端应用程序可通过服务 Registry 提供的 SerDes (SerDes)服务使用这些模式技术。Service Registry 提供的 SerDes 类的成熟度和使用可能有所不同。以下章节提供了有关每个模式类型的更多详情。
制作者模式配置
制作者客户端应用使用 serializer 将其发送到特定代理主题的消息放入正确的数据格式。
启用制作者以使用 Service Registry 进行序列化:
- 定义并将您的 schema 注册到 Service Registry (如果尚未存在)。
- Service Registry 的 URL
- 用于消息的 Service Registry 序列化器
- 将 Kafka 消息映射到 Service Registry 中的 schema 工件的策略
- 在 Service Registry 中查找或注册用于序列化的模式的策略
在注册了 schema 后,当启动 Kafka 和 Service Registry 时,您可以访问 schema,以格式化发送到 Kafka 代理主题的消息。另外,根据配置,生产者也可以在第一次使用时自动注册该架构。
如果 schema 已存在,您可以根据 Service Registry 中定义的兼容规则,使用 registry REST API 创建一个新版本。随着架构的演进,版本用于兼容性检查。组 ID、构件 ID 和版本表示标识架构的唯一元组。
消费者模式配置
消费者客户端应用程序使用反序列化器将所使用的消息从特定的代理主题获取到正确的数据格式。
启用使用者使用 Service Registry 进行反序列化:
- 在 Service Registry 中定义并注册您的 schema (如果尚未存在)
- Service Registry 的 URL
- 用于消息的 Service Registry deserializer
- 用于反序列化的输入数据流
使用全局 ID 检索 schema
默认情况下,该架构使用全局 ID 从 Service Registry 检索,该 ID 在被使用的消息中指定。架构全局 ID 可以位于消息标题或消息有效负载中,具体取决于制作者应用的配置。
在查找消息有效负载中的全局 ID 时,数据格式从 magic 字节开始,用作消费者的信号,然后是全局 ID,消息数据正常。例如:
# ... [MAGIC_BYTE] [GLOBAL_ID] [MESSAGE DATA]
然后,当启动 Kafka 和 Service Registry 时,您可以访问 schema 来格式化从 Kafka 代理主题接收的消息。
使用内容 ID 检索 schema
另外,您还可以根据内容 ID (这是工件内容的唯一 ID)从 Service Registry 检索 schema。全局 ID 是工件版本的唯一 ID。
内容 ID 不唯一标识版本,而是仅标识版本内容。如果多个版本共享相同的内容,则它们具有不同的全局 ID,但内容 ID 相同。Confluent Schema Registry 默认使用内容 ID。