Apicurio Registry 用户指南


Red Hat build of Apicurio Registry 2.0

使用 Apicurio Registry 2.0

摘要

本指南介绍了 Apicurio Registry,并解释了如何使用 Apicurio Registry Web 控制台、REST API、Maven 插件或 Java 客户端管理事件 schema 和 API 设计。本指南还介绍了如何在 Java 用户和制作者应用程序中使用 Kafka 客户端序列化器和反序列化程序。它还描述了支持的 Apicurio Registry 内容类型以及可选的规则配置。

前言

使开源包含更多

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。我们从这四个术语开始:master、slave、黑名单和白名单。由于此项工作十分艰巨,这些更改将在即将推出的几个发行版本中逐步实施。有关更多详情,请参阅我们的首席技术官 Chris Wright 提供的消息

第 1 章 Apicurio Registry 简介

本章论述了 Apicurio Registry 概念和功能,并提供有关存储在 registry 中的受支持构件类型的详情:

1.1. Apicurio Registry 概述

Apicurio Registry 是用于跨 API 和事件驱动的架构共享标准事件模式和 API 设计的数据存储。您可以使用 Apicurio Registry 将数据的结构与客户端应用程序分离,并使用 REST 接口在运行时共享和管理您的数据类型和 API 描述。

例如,客户端应用程序可在运行时动态推送或拉取最新的 schema 更新到 Apicurio Registry,而无需重新部署。开发人员团队可以查询注册表中已在生产中部署的服务所需的现有模式,并可注册开发新服务所需的新模式。

您可以通过在客户端应用程序代码中指定 registry URL,启用客户端应用程序使用存储在 Apicurio Registry 中的 schema 和 API 设计。例如,registry 可以存储用于序列化和反序列化消息的 schema,然后可从客户端应用程序引用这些消息,以确保它们发送和接收的消息与这些架构兼容。

使用 Apicurio 注册表将数据结构与您的应用程序分离,通过减少总体消息大小来降低成本,并通过增加组织内的模式和 API 设计的一致性重复使用来提高效率。Apicurio Registry 提供了一个 Web 控制台,可供开发人员和管理员轻松管理 registry 内容。

另外,您可以配置可选规则来管理 registry 内容的演进。例如,这些规则包括确保上传的内容语法和语义有效的规则,或者向后兼容其他版本。任何配置的规则都必须在将新版本上传到 registry 之前进行,这样可确保不会产生无效或不兼容的 schema 或 API 设计的时间。

Apicurio Registry 基于 Apicurio Registry 开源社区项目。详情请查看 https://github.com/apicurio/apicurio-registry

Apicurio Registry 功能
  • 标准事件模式和 API 规格的多个有效负载格式
  • AMQ Streams 或 PostgreSQL 数据库中的可插拔 registry 存储选项
  • 使用 Web 控制台、REST API 命令、Maven 插件或 Java 客户端进行注册表内容管理
  • 管理 registry 内容随时间变化的内容验证和版本兼容性的规则
  • 完整的 Apache Kafka 模式 registry 支持,包括与外部系统的 Kafka 连接集成
  • Kafka 客户端 serializers/deserializers (Serdes)在运行时验证消息类型
  • 云原生 Quarkus Java 运行时,用于低内存占用和快速部署速度
  • 与现有的 Confluent 或 IBM schema registry 客户端应用程序兼容
  • OpenShift 中基于 Operator 的 Apicurio Registry 安装
  • 使用 Red Hat Single Sign-On 的 OpenID Connect (OIDC)验证

1.2. Apicurio Registry 中的模式和 API 工件和组

存储在 Apicurio Registry (如事件架构和 API 设计)中的项目称为 registry 工件。以下显示了简单共享价格应用程序的 JSON 格式的 Apache Avro schema 构件示例:

{
   "type": "record",
   "name": "price",
   "namespace": "com.example",
   "fields": [
       {
           "name": "symbol",
           "type": "string"
       },
       {
           "name": "price",
           "type": "string"
       }
   ]
}
Copy to Clipboard Toggle word wrap

当架构或 API 设计作为 registry 中的工件添加时,客户端应用可以使用该架构或 API 设计验证客户端消息在运行时是否符合正确的数据结构。

Apicurio Registry 为标准事件 schema 和 API 规格支持广泛的消息有效负载格式。例如,支持的格式包括 Apache Avro、Google Protobuf、GraphQL、AsyncAPI、OpenAPI 等。如需了解更多详细信息,请参阅 第 9 章 Apicurio Registry artifact 引用

模式和 API 组

工件组 是可选的 schema 或 API 工件集合。每个组包含一组逻辑相关的模式或 API 设计,通常由单个实体管理,属于特定应用程序或组织。

您可以在添加 schema 和 API 设计时创建可选的工件组,以将其组织到 Apicurio Registry 中。例如,您可以创建组来匹配您的 开发和生产 应用程序环境,或您的 销售 和工程 组织。

模式和 API 组可以包含多个工件类型。例如,您可以有 Protobuf、Avro、JSON Schema、OpenAPI 和 AsyncAPI 架构以及 API 工件在同一组中。

您可以使用 Apicurio Registry Web 控制台、核心 REST API、Maven 插件或 Java 客户端应用程序创建架构和 API 工件和可选组。以下简单示例显示了使用 REST API:

$ curl -X POST -H "Content-type: application/json; artifactType=AVRO" \
  -H "X-Registry-ArtifactId: share-price" \
  --data '{"type":"record","name":"price","namespace":"com.example", \
   "fields":[{"name":"symbol","type":"string"},{"name":"price","type":"string"}]}' \
  https://my-registry.example.com/apis/registry/v2/groups/my-group/artifacts
Copy to Clipboard Toggle word wrap

本例在名为 my-group 的工件组中添加了一个 Avro 模式,其工件 ID 为 share-price

注意

在使用 Apicurio Registry Web 控制台时,指定组是可选的,其中会自动创建默认组。当使用 v2 REST API 或 Maven 插件时,如果您不想创建唯一的组,您可以在 API 路径中指定 default 组。

1.3. 使用 Apicurio Registry Web 控制台管理内容

您可以使用 Apicurio Registry Web 控制台浏览和搜索 registry 中存储的 schema 和 API 工件和可选组,并添加新的 schema 和 API 工件、组和版本。您可以根据标签、名称、组和描述搜索工件。您可以查看工件的内容或其可用版本,或者在本地下载工件文件。

您还可以使用 Web 控制台为 registry 内容(包括全局)和 API 工件配置可选规则。当新的 schema 和 API 工件或版本上传到 registry 时,会应用这些可选规则用于内容验证和兼容性。如需了解更多详细信息,请参阅 第 9 章 Apicurio Registry artifact 引用

图 1.1. Apicurio Registry web 控制台

Apicurio Registry web 控制台

Apicurio Registry Web 控制台包括在 Apicurio Registry 部署的主端点中,例如在 http://MY-REGISTRY-URL/ui 上。

1.4. registry 核心 REST API 概述

使用 Apicurio Registry 核心 REST API,客户端应用程序可以管理 Apicurio Registry 中的 schema 和 API 工件。此 API 为以下对象提供创建、读取、更新和删除操作:

工件
管理存储在 registry 中的架构和 API 工件。您还可以管理工件的生命周期状态: enabled、disabled 或 deprecated。
工件版本
管理在更新 schema 或 API 工件时所创建的版本。您还可以管理工件版本的生命周期状态: enabled、disabled 或 deprecated。
工件元数据
管理架构或 API 工件的详细信息,如创建或修改的时间及其当前状态。您可以编辑工件名称、描述或标签。工件组和工件创建和修改的时间是只读的。
工件规则
配置规则,以管理特定模式或 API 构件的内容演进,以防止将无效或不兼容的内容添加到 registry 中。工件规则覆盖配置的任何全局规则。
全局规则
配置规则,以管理所有模式和 API 工件的内容演进,以防止将无效或不兼容的内容添加到 registry 中。只有在工件没有配置自己的特定工件规则时,才会应用全局规则。
搜索
浏览或搜索 schema 和 API 工件和版本,例如名称、组、描述或标签。
Admin
.zip 文件中导出或导入 registry 内容,并在运行时管理 registry 服务器实例的日志级别。
与其他 schema registry REST API 兼容

Apicurio Registry 版本 2 通过包括其对应的 REST API 实现提供与以下模式 registry 的 API 兼容性:

  • Apicurio Registry 版本 1
  • Confluent Schema Registry 版本 6
  • IBM Event Streams schema registry 版本 1
  • CNCF CloudEvents Schema Registry 版本 0

使用 Confluent 客户端库的应用程序可以使用 Apicurio Registry 作为简易替代品。如需了解更多详细信息,请参阅 Replacing Confluent Schema Registry

1.5. Apicurio Registry 存储选项

Apicurio Registry 为 registry 数据的底层存储提供以下选项:

  • PostgreSQL 12 数据库
  • AMQ Streams 1.8

Kafka producer 应用可以使用 serializers 对符合特定事件架构的消息进行编码。然后,Kafka 使用者应用程序可以使用反序列化者根据特定的模式 ID 来验证消息已被序列化。

图 1.2. Apicurio Registry 和 Kafka 客户端 SerDe 架构

registry SerDe 架构

Apicurio Registry 提供 Kafka 客户端 serializers/deserializers (SerDes)在运行时验证以下消息类型:

  • Apache Avro
  • Google 协议缓冲
  • JSON Schema

Apicurio Registry Maven 存储库和源代码发行版本包括用于这些消息类型的 Kafka SerDe 实施,Kafka 客户端开发人员可以与 registry 集成。这些实施包括用于每个支持的消息类型的自定义 Java 类,例如 io.apicurio.registry.serde.avro,客户端应用程序可以在运行时从 registry 中拉取 schema。

您可以将 Apicurio Registry 与 Apache Kafka Connect 搭配使用,以在 Kafka 和外部系统间流传输数据。使用 Kafka Connect,您可以为不同的系统定义连接器,将大量数据移入和移出基于 Kafka 的系统。

图 1.3. Apicurio Registry 和 Kafka Connect 架构

registry 和 Kafka Connect 架构

Apicurio Registry 为 Kafka Connect 提供以下功能:

  • Kafka Connect 模式的存储
  • Apache Avro 和 JSON Schema 的 Kafka Connect 转换器
  • 用于管理 schema 的 registry REST API

您可以使用 Avro 和 JSON Schema 转换器将 Kafka Connect 模式映射到 Avro 或 JSON 模式。然后这些架构可将消息键和值序列化为紧凑 Avro 二进制格式或人类可读 JSON 格式。转换的 JSON 也比较详细,因为消息不包含架构信息,只是架构 ID。

Apicurio Registry 可以管理和跟踪 Kafka 主题中使用的 Avro 和 JSON 模式。由于该架构存储在 Apicurio 注册表中,并与消息内容分离,因此每条消息必须只包含 tiny 模式标识符。对于 Kafka 等 I/O 绑定系统,这意味着生产者和消费者的总吞吐量。

在这个用例中,Apicurio Registry 提供的 Avro 和 JSON Schema serializers (SerDes)也被 Kafka producer 和使用者使用。您使用更改事件进行写入的 Kafka 消费者应用程序可以使用 Avro 或 JSON Serdes 对这些更改事件进行反序列化处理。您可以将这些 Serdes 安装到任何基于 Kafka 的系统,并将它们与 Kafka Connect 一起使用,或者基于 Kafka Connect 的系统,如 Debezium 和 Camel Kafka Connector。

1.8. Apicurio Registry 演示示例

Apicurio Registry 提供开源示例应用程序,它演示了如何在不同用例中使用 registry。例如,这包括存储 Kafka 序列化器和反序列化器(SerDe)类使用的模式。这些 Java 类从 registry 获取模式,以便在生成或消耗操作来序列化、反序列化或验证 Kafka 消息有效负载时使用。

这些示例应用程序包括以下内容:

  • 简单 Avro
  • 简单 JSON Schema
  • Confluent SerDes 集成
  • Avro bean
  • 自定义 ID 策略
  • 简单 Avro Maven
  • REST 客户端
  • 混合 Avro 模式
  • 云事件

如需了解更多详细信息,请参阅 https://github.com/Apicurio/apicurio-registry-examples

1.9. Apicurio Registry 可用发行版本

Expand
表 1.1. Apicurio Registry Operator 和镜像
分发位置发行类别

Apicurio Registry Operator

OpenShift Web 控制台在 OperatorsOperatorHub

公开发行

Apicurio Registry Operator 的容器镜像

Red Hat Ecosystem Catalog

公开发行

AMQ Streams 中 Kafka 存储的容器镜像

Red Hat Ecosystem Catalog

公开发行

PostgreSQL 中数据库存储的容器镜像

Red Hat Ecosystem Catalog

公开发行

Expand
表 1.2. Apicurio Registry zip downloads
分发位置发行类别

安装自定义资源定义示例

Red Hat Software Downloads

公开发行

Apicurio Registry v1 到 v2 迁移工具

Red Hat Software Downloads

公开发行

Maven 软件仓库

Red Hat Software Downloads

公开发行

源代码

Red Hat Software Downloads

公开发行

Kafka Connect converters

Red Hat Software Downloads

公开发行

注意

您必须有 Red Hat Integration 的订阅并登录到红帽客户门户网站,才能访问可用的 Apicurio Registry 发行版本。

第 2 章 Apicurio Registry 内容规则

本章介绍了用于管理 registry 内容的可选规则,并提供有关可用规则配置的详情:

2.1. 使用规则管理 registry 内容

要监管 registry 内容的演进,您可以为添加到 registry 中的工件内容配置可选的规则。所有配置的全局规则或工件规则都必须传递,然后才能将新的工件版本上传到 registry。配置的工件规则覆盖任何配置的全局规则。

这些规则的目标是防止将无效的内容添加到 registry 中。例如,因为以下原因,内容可能无效:

  • 给定工件类型的语法无效(例如,AVROPROTOBUF
  • 有效语法,但语义违反了规格
  • 不兼容时,当新内容包含与当前工件版本相关的破坏更改时

您可以使用 Apicurio Registry Web 控制台、REST API 命令或 Java 客户端应用程序添加这些可选内容规则。

2.2. 应用规则时

只有在将内容添加到 registry 时,才会应用规则。这包括以下 REST 操作:

  • 添加工件
  • 更新工件
  • 添加工件版本

如果违反规则,Apicurio Registry 会返回 HTTP 错误。响应正文包含违反的规则,以及显示错误情况的消息。

注意

如果没有为工件配置任何规则,则应用当前配置的全局规则集合(若有)。

2.3. 规则如何工作

每个规则都有一个名称和可选配置信息。registry 存储维护每个工件和全局规则列表的规则列表。列表中的每个规则由一个名称和一组配置属性组成,它们特定于规则实施。

提供了规则,其中包含当前版本的工件(如果存在)和要添加的构件的新版本。规则实现返回为 true 或 false,具体取决于工件是否通过该规则。如果没有,registry 会报告 HTTP 错误响应中原因。有些规则可能不使用内容的先前版本。例如,兼容性规则使用之前的版本,但语法或语义有效规则并不。

其他资源

如需了解更多详细信息,请参阅 第 9 章 Apicurio Registry artifact 引用

2.4. 内容规则配置

您可以单独为每个工件以及全局配置规则。Apicurio Registry 应用为特定工件配置的规则。如果没有在该级别上配置任何规则,Apicurio Registry 将应用全局配置的规则。如果没有配置全局规则,则不会应用任何规则。

配置工件规则

您可以使用 Apicurio Registry Web 控制台或 REST API 配置工件规则。详情请查看以下内容:

配置全局规则

您可以使用以下方法配置全局规则:

  • 在 REST API 中使用 /rules 操作
  • 使用 Apicurio Registry Web 控制台
  • 使用 Apicurio Registry 应用程序属性设置默认全局规则

配置默认全局规则

您可以在应用程序级别配置 Apicurio Registry,以启用或禁用全局规则。您可以在安装时配置默认全局规则,而无需使用以下应用程序属性格式进行安装后配置:

registry.rules.global.<ruleName>
Copy to Clipboard Toggle word wrap

当前支持以下规则名称:

  • 兼容性
  • 有效期

application 属性的值必须是有效的配置选项,它特定于所配置的规则。下表显示了每个规则的有效值:

Expand
表 2.1. Apicurio Registry 内容规则
规则

有效期

FULL

 

SYNTAX_ONLY

 

NONE

兼容性

向后

 

BACKWARD_TRANSITIVE

 

向前

 

FORWARD_TRANSITIVE

 

FULL

 

FULL_TRANSITIVE

 

NONE

注意

您可以将这些应用程序属性配置为 Java 系统属性,或者将它们包含在 Quarkus application.properties 文件中。如需了解更多详细信息,请参阅 Quarkus 文档

本章介绍了如何使用 Apicurio Registry Web 控制台管理存储在 registry 中的 schema 和 API 工件。这包括上传和浏览 registry 内容,以及配置可选规则:

3.1. 使用 Apicurio Registry Web 控制台添加工件

您可以使用 Apicurio Registry Web 控制台将事件 schema 和 API 设计工件上传到 registry。有关您可以上传的工件类型的详情,请参阅 第 9 章 Apicurio Registry artifact 引用。本节演示了上传 Apicurio Registry 工件、应用工件规则和添加新的工件版本的简单示例。

前提条件

  • 必须在自己的环境中安装并运行 Apicurio Registry。

流程

  1. 连接到 Apicurio Registry web 控制台:

    http://MY_REGISTRY_URL/ui

  2. Upload artifact,并指定以下内容:

    • group 和 ID :使用默认空设置自动生成 ID 和 默认组,或者输入可选的工件组或 ID。
    • 类型 :使用默认 Auto-Detect 设置来自动检测工件类型,或者从下拉菜单中选择工件类型,例如 Avro SchemaOpenAPI

      注意

      Apicurio Registry 服务器无法自动检测 Kafka Connect Schema 工件类型。您必须手动选择此工件类型。

    • 工件: Drag 和 drop or click Browse 以上传文件,如 my-schema.jsonmy-openapi.json
  3. Upload,查看 Artifact Details

    图 3.1. Apicurio Registry Web 控制台中的工件详情

    Registry Web 控制台中的工件详情
    • info :显示工件名称和可选的组、描述、生命周期状态、创建和最后一次修改。点击 Edit Artifact Metadata 铅笔图标编辑工件名称和描述或添加标签,然后点 Download 向本地下载工件文件。另外,还显示您可以启用和配置的构件 内容规则
    • 文档 (仅限OpenAPI):显示自动生成的 REST API 文档。
    • 内容 :显示完整工件内容的只读视图。
  4. 内容 规则中,单击 Enable 来配置 Validity RuleCompatibility Rule,然后从下拉菜单中选择相应的规则配置。如需了解更多详细信息,请参阅 第 9 章 Apicurio Registry artifact 引用
  5. Upload new version 添加新工件版本,然后拖放或点击 Browse 上传该文件,如 my-schema.jsonmy-openapi.json
  6. 要删除工件,点 Upload new version 旁边的垃圾桶图标。

    警告

    删除工件会删除工件及其所有版本,无法撤消。工件版本是不可变的,不能单独删除。

3.2. 使用 Apicurio Registry Web 控制台查看工件

您可以使用 Apicurio Registry Web 控制台浏览 registry 中存储的事件 schema 和 API 设计工件。本节显示了查看 Apicurio Registry 工件、组、版本和工件规则的简单示例。有关 registry 中存储的构件类型的更多详情,请参阅 第 9 章 Apicurio Registry artifact 引用

前提条件

  • 必须在自己的环境中安装并运行 Apicurio Registry。
  • 工件必须使用 Apicurio Web 控制台、REST API 命令、Maven 插件或 Java 客户端应用程序添加到注册表中。

流程

  1. 连接到 Apicurio Registry web 控制台:

    http://MY_REGISTRY_URL/ui

  2. 浏览存储在 registry 中的工件列表,或者输入搜索字符串来查找工件。您可以选择根据特定 名称描述标签搜索

    图 3.2. 在 Apicurio Registry web 控制台中浏览工件

    在 Registry web 控制台中浏览工件
  3. 点击 View artifact 查看 Artifact Details

    • info :显示工件名称和可选的组、描述、生命周期状态、创建和最后一次修改。点击 Edit Artifact Metadata 铅笔图标编辑工件名称和描述或添加标签,然后点 Download 向本地下载工件文件。另外,还显示您可以启用和配置的构件 内容规则
    • 文档 (仅限OpenAPI):显示自动生成的 REST API 文档。
    • 内容 :显示完整工件内容的只读视图。
  4. 如果添加了其他版本,可选择 从下拉菜单查看不同的工件版本。

您可以使用 Apicurio Registry Web 控制台配置可选规则,以防止将无效的内容添加到 registry 中。所有配置的工件规则或全局规则都必须传递,然后才能将新的工件版本上传到 registry。配置的工件规则覆盖任何配置的全局规则。如需了解更多详细信息,请参阅 第 2 章 Apicurio Registry 内容规则

本节演示了配置全局和工件规则的简单示例。有关您可以选择的不同规则类型和相关配置设置的详情,请参考 第 9 章 Apicurio Registry artifact 引用

前提条件

  • 必须在自己的环境中安装并运行 Apicurio Registry。
  • 对于工件规则,工件必须使用 Apicurio Registry Web 控制台、REST API 命令、Maven 插件或 Java 客户端应用程序添加到注册表中。

流程

  1. 连接到 Apicurio Registry web 控制台:

    http://MY_REGISTRY_URL/ui

  2. 对于工件规则,浏览 registry 中存储的工件列表,或者输入搜索字符串来查找工件。您可以选择根据特定的工件 名称描述标签搜索
  3. 单击 View artifact,以查看 Artifact Details
  4. Content Rules 中,点 Enable 来配置 Validity RuleCompatibility Rule,然后从下拉菜单中选择相应的规则配置。如需了解更多详细信息,请参阅 第 9 章 Apicurio Registry artifact 引用

    图 3.3. 在 Apicurio Registry web 控制台中配置内容规则

    在 Registry web 控制台中配置规则
  5. 对于全局规则,请单击工具栏右上角的 Manage global rules,再单击 Enable 以配置全局 Validity RuleCompatibility Rule,然后从下拉菜单选择适当的规则配置。如需了解更多详细信息,请参阅 第 9 章 Apicurio Registry artifact 引用
  6. 要禁用工件规则或全局规则,请点击规则旁边的垃圾箱图标。

第 4 章 使用 REST API 管理 Apicurio Registry 内容

客户端应用程序可以使用 Registry REST API 操作来管理 Apicurio Registry 中的 schema 和 API 工件,例如在生产环境中部署的 CI/CD 管道。Registry REST API 为存储在 registry 中的工件、版本、元数据和规则提供创建、读取、更新和删除操作。如需更多信息,请参阅 Apicurio Registry REST API 文档

本章论述了 Apicurio Registry 核心 REST API,并演示了如何使用它来管理 registry 中存储的架构和 API 工件:

本节演示了一个简单的基于 curl 的示例,它使用了 registry v2 核心 REST API 在 registry 中添加并检索 Apache Avro schema 工件。

前提条件

  • 必须在自己的环境中安装并运行 Apicurio Registry。

流程

  1. 使用 /groups/{group}/artifacts 操作向注册表添加工件。以下示例 curl 命令为共享价格应用程序添加简单的工件:

    $ curl -X POST -H "Content-type: application/json; artifactType=AVRO" \
      -H "X-Registry-ArtifactId: share-price" \ 
    1
    
      --data '{"type":"record","name":"price","namespace":"com.example", \
       "fields":[{"name":"symbol","type":"string"},{"name":"price","type":"string"}]}' \ 
    2
    
      http://MY-REGISTRY-HOST/apis/registry/v2/groups/my-group/artifacts 
    3
    Copy to Clipboard Toggle word wrap
    1
    这个示例添加了一个 Avro schema 工件,其工件 ID 为 share-price。如果您没有指定唯一的工件 ID,Apicurio Registry 会自动生成一个作为 UUID。
    2
    MY-REGISTRY-HOST 是部署 Apicurio Registry 的主机名。例如: my-cluster-service-registry-myproject.example.com
    3
    本例在 API 路径中指定 my-group 的组 ID。如果没有指定唯一组 ID,则必须在 API 路径中指定 ./groups/default
  2. 验证响应是否包含预期的 JSON 正文,以确认添加了构件。例如:

    {"createdBy":"","createdOn":"2021-04-16T09:07:51+0000","modifiedBy":"",
    "modifiedOn":"2021-04-16T09:07:51+0000","id":"share-price","version":"1", 
    1
    
    "type":"AVRO","globalId":2,"state":"ENABLED","groupId":"my-group","contentId":2} 
    2
    Copy to Clipboard Toggle word wrap
    1
    在添加工件时没有指定版本,因此会自动创建默认版本 1
    2
    这是添加到 registry 的第二个工件,因此全局 ID 和内容 ID 的值为 2
  3. 使用 API 路径中的工件 ID,从 registry 检索工件内容。在这个示例中,指定的 ID 是 share-price

    $ curl http://MY-REGISTRY-URL/apis/registry/v2/groups/my-group/artifacts/share-price \
    {"type":"record","name":"price","namespace":"com.example", \
      "fields":[{"name":"symbol","type":"string"},{"name":"price","type":"string"}]}
    Copy to Clipboard Toggle word wrap

如果您没有在使用 v2 REST API 添加 schema 和 API 工件到 Apicurio Registry 时指定工件版本,Apicurio Registry 会自动生成一个。创建新工件时的默认版本是 1

Apicurio Registry 还支持自定义版本,您可以在其中使用 X-Registry-Version HTTP 请求标头作为字符串来指定版本。指定自定义版本值将覆盖创建或更新工件时通常分配的默认版本。然后,在执行需要版本的 REST API 操作时,您可以使用此版本值。

本节演示了一个简单的基于 curl 的示例,它使用了 registry v2 核心 REST API 在 registry 中添加并检索自定义 Apache Avro schema 版本。在使用 REST API 添加或更新工件或添加工件版本时,您可以指定自定义版本。

前提条件

  • 必须在自己的环境中安装并运行 Apicurio Registry。

流程

  1. 使用 /groups/{group}/artifacts 操作在注册表中添加工件版本。以下示例 curl 命令为共享价格应用程序添加简单的工件:

    $ curl -X POST -H "Content-type: application/json; artifactType=AVRO" \
      -H "X-Registry-ArtifactId: my-share-price" -H "X-Registry-Version: 1.1.1" \ 
    1
    
      --data '{"type":"record","name":" p","namespace":"com.example", \
       "fields":[{"name":"symbol","type":"string"},{"name":"price","type":"string"}]}' \ 
    2
    
       http://MY-REGISTRY-HOST/apis/registry/v2/groups/my-group/artifacts 
    3
    Copy to Clipboard Toggle word wrap
    1
    这个示例添加了一个 Avro schema 工件,其工件 ID 为 my-share-price1.1.1 版本。如果没有指定版本,Apicurio Registry 会自动生成默认版本 1
    2
    MY-REGISTRY-HOST 是部署 Apicurio Registry 的主机名。例如: my-cluster-service-registry-myproject.example.com
    3
    本例在 API 路径中指定 my-group 的组 ID。如果没有指定唯一组 ID,则必须在 API 路径中指定 ./groups/default
  2. 验证响应是否包含预期的 JSON 正文,以确认是否添加了自定义工件版本。例如:

    {"createdBy":"","createdOn":"2021-04-16T10:51:43+0000","modifiedBy":"",
    "modifiedOn":"2021-04-16T10:51:43+0000","id":"my-share-price","version":"1.1.1", 
    1
    
    "type":"AVRO","globalId":3,"state":"ENABLED","groupId":"my-group","contentId":3} 
    2
    Copy to Clipboard Toggle word wrap
    1
    在添加工件时,指定了自定义 1.1.1 版本。
    2
    这是添加到 registry 中的第三个工件,因此全局 ID 和内容 ID 的值为 3
  3. 使用 API 路径中的工件 ID 和版本从 registry 检索工件内容。在这个示例中,指定的 ID 是 my-share-price,版本是 1.1.1

    $ curl http://MY-REGISTRY-URL/apis/registry/v2/groups/my-group/artifacts/my-share-price/versions/1.1.1 \
    {"type":"record","name":"price","namespace":"com.example", \
      "fields":[{"name":"symbol","type":"string"},{"name":"price","type":"string"}]}
    Copy to Clipboard Toggle word wrap

本节演示了一个简单的基于 curl 的示例,它使用了 registry v2 核心 REST API 以 .zip 格式导出并导入现有的 registry 数据。例如,当从一个 Apicurio Registry v2.x 实例迁移到另一个 Apicurio Registry v2.x 实例时,这很有用。

前提条件

  • 必须在自己的环境中安装并运行 Apicurio Registry。

流程

  1. 从现有源 Apicurio Registry 实例导出 registry 数据:

    $ curl http://MY-REGISTRY-HOST/apis/registry/v2/admin/export \
      --output my-registry-data.zip
    Copy to Clipboard Toggle word wrap

    MY-REGISTRY-HOST 是部署源 Apicurio Registry 的主机名。例如: my-cluster-source-registry-myproject.example.com

  2. 将 registry 数据导入到目标 Apicurio Registry 实例中:

    $ curl -X POST "http://MY-REGISTRY-HOST/apis/registry/v2/admin/import" \
      -H "Content-Type: application/zip" --data-binary @my-registry-data.zip
    Copy to Clipboard Toggle word wrap

    MY-REGISTRY-HOST 是部署目标 Apicurio Registry 的主机名。例如: my-cluster-target-registry-myproject.example.com

本章论述了如何使用 Apicurio Registry Maven 插件管理存储在 registry 中的架构和 API 工件:

前提条件

5.1. 使用 Maven 插件添加 schema 和 API 工件

Maven 插件的最常见用例是在构建期间添加工件。您可以使用 注册 执行目标来实现此目的。

流程

  • 更新 Maven pom.xml 文件,以使用 apicurio-registry-maven-plugin 注册工件。以下示例显示了注册 Apache Avro 和 GraphQL 模式:

    <plugin>
      <groupId>io.apicurio</groupId>
      <artifactId>apicurio-registry-maven-plugin</artifactId>
      <version>${apicurio.version}</version>
      <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>register</goal>  
    1
    
            </goals>
            <configuration>
                <registryUrl>http://REGISTRY-URL/apis/registry/v2</registryUrl> 
    2
    
                <artifacts>
                    <artifact>
                        <groupId>TestGroup</groupId> 
    3
    
                        <artifactId>FullNameRecord</artifactId>
                        <file>${project.basedir}/src/main/resources/schemas/record.avsc</file>
                        <ifExists>FAIL</ifExists>
                    </artifact>
                    <artifact>
                        <groupId>TestGroup</groupId>
                        <artifactId>ExampleAPI</artifactId> 
    4
    
                        <type>GRAPHQL</type>
                        <file>${project.basedir}/src/main/resources/apis/example.graphql</file>
                        <ifExists>RETURN_OR_UPDATE</ifExists>
                        <canonicalize>true</canonicalize>
                    </artifact>
                </artifacts>
            </configuration>
        </execution>
      </executions>
     </plugin>
    Copy to Clipboard Toggle word wrap
    1
    指定 register 作为执行目标,以便将 schema 构件上传到 registry。
    2
    使用 ../apis/registry/v2 端点指定 Apicurio Registry URL。
    3
    指定 Apicurio Registry 工件组 ID。如果您不想使用唯一组,可以指定 default 组。
    4
    您可以使用指定的组 ID、工件 ID 和位置上传多个工件。

5.2. 使用 Maven 插件下载 schema 和 API 工件

您可以使用 Maven 插件从 Apicurio Registry 下载工件。例如,从注册的模式生成代码时,这通常很有用。

流程

  • 更新您的 Maven pom.xml 文件,以使用 apicurio-registry-maven-plugin 下载工件。以下示例显示了下载 Apache Avro 和 GraphQL 模式。

    <plugin>
      <groupId>io.apicurio</groupId>
      <artifactId>apicurio-registry-maven-plugin</artifactId>
      <version>${apicurio.version}</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals>
            <goal>download</goal> 
    1
    
          </goals>
          <configuration>
              <registryUrl>http://REGISTRY-URL/apis/registry/v2</registryUrl> 
    2
    
              <artifacts>
                  <artifact>
                      <groupId>TestGroup</groupId> 
    3
    
                      <artifactId>FullNameRecord</artifactId> 
    4
    
                      <file>${project.build.directory}/classes/record.avsc</file>
                      <overwrite>true</overwrite>
                  </artifact>
                  <artifact>
                      <groupId>TestGroup</groupId>
                      <artifactId>ExampleAPI</artifactId>
                      <version>1</version>
                      <file>${project.build.directory}/classes/example.graphql</file>
                      <overwrite>true</overwrite>
                  </artifact>
              </artifacts>
          </configuration>
        </execution>
      </executions>
    </plugin>
    Copy to Clipboard Toggle word wrap
    1
    指定 下载 作为执行目标。
    2
    使用 ../apis/registry/v2 端点指定 Apicurio Registry URL。
    3
    指定 Apicurio Registry 工件组 ID。如果您不想使用唯一组,可以指定 default 组。
    4
    您可以使用工件 ID 将多个工件下载到指定目录中。

5.3. 使用 Maven 插件测试架构和 API 工件

您可能需要验证是否可以注册工件,而无需实际进行任何更改。这在 Apicurio Registry 中配置规则时通常很有用。如果工件内容违反任何配置的规则,测试工件会导致失败。

注意

即使工件通过测试,也不会将任何内容添加到 Apicurio Registry。

流程

  • 更新您的 Maven pom.xml 文件,以使用 apicurio-registry-maven-plugin 来测试工件。以下示例显示了测试 Apache Avro 模式:

    <plugin>
      <groupId>io.apicurio</groupId>
      <artifactId>apicurio-registry-maven-plugin</artifactId>
      <version>${apicurio.version}</version>
      <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>test-update</goal>  
    1
    
            </goals>
            <configuration>
                <registryUrl>http://REGISTRY-URL/apis/registry/v2</registryUrl> 
    2
    
                <artifacts>
                    <artifact>
                        <groupId>TestGroup</groupId> 
    3
    
                        <artifactId>FullNameRecord</artifactId>
                        <file>${project.basedir}/src/main/resources/schemas/record.avsc</file> 
    4
    
                    </artifact>
                    <artifact>
                        <groupId>TestGroup</groupId>
                        <artifactId>ExampleAPI</artifactId>
                        <type>GRAPHQL</type>
                        <file>${project.basedir}/src/main/resources/apis/example.graphql</file>
                    </artifact>
                </artifacts>
            </configuration>
        </execution>
      </executions>
     </plugin>
    Copy to Clipboard Toggle word wrap
    1
    指定 test-update 作为测试 schema 构件的执行目标。
    2
    使用 ../apis/registry/v2 端点指定 Apicurio Registry URL。
    3
    指定 Apicurio Registry 工件组 ID。如果您不想使用唯一组,可以指定 default 组。
    4
    您可以使用工件 ID 从指定目录中测试多个工件。

本章论述了如何使用 Apicurio Registry Java 客户端:

6.1. Apicurio Registry Java 客户端

您可以使用 Java 客户端应用程序管理存储在 Apicurio Registry 中的工件。您可以使用 Apicurio Registry Java 客户端类创建、读取、更新或删除存储在 registry 中的工件。您还可以使用客户端执行管理功能,如管理全局规则或导入和导出 registry 数据。

您可以通过在项目中添加正确的依赖关系来访问 Apicurio Registry Java 客户端。如需了解更多详细信息,请参阅 第 6.2 节 “编写 Apicurio Registry 客户端应用程序”

Apicurio Registry 客户端使用 JDK 提供的 HTTP 客户端来实施。这可让您自定义其使用,例如添加自定义标头或为传输层安全(TLS)身份验证启用选项。如需了解更多详细信息,请参阅 第 6.3 节 “Apicurio Registry Java 客户端配置”

6.2. 编写 Apicurio Registry 客户端应用程序

本节介绍如何使用 Java 客户端应用程序管理存储在 Apicurio Registry 中的工件。

前提条件

流程

  1. 在 Maven 项目中添加以下依赖项:

    <dependency>
        <groupId>io.apicurio</groupId>
        <artifactId>apicurio-registry-client</artifactId>
        <version>${apicurio-registry.version}</version>
    </dependency>
    Copy to Clipboard Toggle word wrap
  2. 按照以下方法创建 registry 客户端:

    public class ClientExample {
    
        private static final RegistryRestClient client;
    
         public static void main(String[] args) throws Exception {
            // Create a registry client
            String registryUrl = "https://my-registry.my-domain.com/apis/registry/v2"; 
    1
    
            RegistryClient client = RegistryClientFactory.create(registryUrl); 
    2
    
        }
    }
    Copy to Clipboard Toggle word wrap
    1
    如果您指定了 https://my-registry.my-domain.com 的示例 registry URL,客户端将自动附加 /apis/registry/v2
    2
    有关创建 Apicurio Registry 客户端时的更多选项,请参见下一节中的 Java 客户端配置。
  3. 创建客户端后,您可以通过客户端使用 Apicurio Registry REST API 中的所有操作。如需了解更多详细信息,请参阅 Apicurio Registry REST API 文档

6.3. Apicurio Registry Java 客户端配置

Apicurio Registry Java 客户端根据客户端工厂包含以下配置选项:

Expand
表 6.1. Apicurio Registry Java 客户端配置选项
选项描述参数

普通客户端

用于与正在运行的 registry 交互的基本 REST 客户端。

baseUrl

带有自定义配置的客户端

使用用户提供的配置 registry 客户端。

baseurl,Map<String Object> 配置

带有自定义配置和身份验证的客户端

接受包含自定义配置的 map 的 registry 客户端。这很有用,例如,为调用添加自定义标头。这还需要提供用于对请求进行身份验证的 auth 实例。

baseurl, Map<String Object> 配置, Auth auth

自定义标头配置

要配置自定义标头,您必须将 apicurio.registry.request.request.headers 前缀添加到 配置映射键。例如,apicurio.registry.request.headers.headers.Authorization 的键为 Basic: xxxxx 的标题,它的值Basic: xxxxx

TLS 配置选项

您可以使用以下属性为 Apicurio Registry Java 客户端配置传输层安全性(TLS)身份验证:

  • apicurio.registry.request.ssl.truststore.location
  • apicurio.registry.request.ssl.truststore.password
  • apicurio.registry.request.ssl.truststore.type
  • apicurio.registry.request.ssl.keystore.location
  • apicurio.registry.request.ssl.keystore.password
  • apicurio.registry.request.ssl.keystore.type
  • apicurio.registry.request.ssl.key.password

Apicurio Registry 为 Kafka producer 以及以 Java 语言编写的消费者应用程序提供客户端序列化/反序列化器(SerDes)。Kafka producer 应用使用 serializers 对符合特定事件架构的消息进行编码。Kafka 使用者应用程序使用反序列化程序根据特定的模式 ID 来验证已使用正确模式序列化消息。这样可确保一致的模式使用,并帮助防止运行时出现数据错误。

本章论述了如何在生产者和消费者应用程序中使用 Kafka 客户端 SerDe:

前提条件

7.1. Kafka 客户端应用程序和 Apicurio Registry

Apicurio Registry 将 schema 管理与客户端应用程序配置分离。您可以通过在客户端代码中指定其 URL,启用 Java 客户端应用程序以使用 Apicurio Registry 的 schema。

您可以将架构存储在 registry 中,以序列化和反序列化消息,从客户端应用程序引用它们,以确保它们发送和接收的消息与这些架构兼容。Kafka 客户端应用程序可在运行时从 Apicurio Registry 中推送或拉取其模式。

架构可以演进,以便您可以在 Apicurio Registry 中定义规则,例如,确保 schema 更改有效,且不会破坏以前由应用程序使用的版本。Apicurio 注册表通过将修改的模式与之前的 schema 版本进行比较,检查兼容性。

Apicurio Registry schema 技术

Apicurio Registry 为 schema 技术提供 schema registry 支持,例如:

  • Avro
  • protobuf
  • JSON Schema

客户端应用程序可通过 Apicurio Registry 提供的 SerDe)服务使用这些模式技术。Apicurio Registry 提供的 SerDe 类的成熟度和使用可能有所不同。以下章节提供了有关每个模式类型的更多详情。

制作者模式配置

制作者客户端应用使用 serializer 将其发送到特定代理主题的消息放入正确的数据格式。

启用制作者使用 Apicurio Registry 进行序列化:

在注册模式后,当启动 Kafka 和 Apicurio Registry 时,您可以访问 schema,以格式化发送到 Kafka 代理主题的消息。另外,根据配置,生产者也可以在第一次使用时自动注册该架构。

如果 schema 已存在,您可以根据 Apicurio Registry 中定义的兼容规则,使用 registry REST API 创建一个新版本。随着架构的演进,版本用于兼容性检查。组 ID、构件 ID 和版本表示标识架构的唯一元组。

消费者模式配置

消费者客户端应用程序使用反序列化器将所使用的消息从特定的代理主题获取到正确的数据格式。

启用一个消费者使用 Apicurio Registry 进行反序列化:

使用全局 ID 检索 schema

默认情况下,schema 由使用全局 ID 的 deserializer 从 Apicurio Registry 检索,后者在被使用的消息中指定。架构全局 ID 可以位于消息标题或消息有效负载中,具体取决于制作者应用的配置。

在查找消息有效负载中的全局 ID 时,数据格式从 magic 字节开始,用作消费者的信号,然后是全局 ID,消息数据正常。例如:

# ...
[MAGIC_BYTE]
[GLOBAL_ID]
[MESSAGE DATA]
Copy to Clipboard Toggle word wrap

然后,当启动 Kafka 和 Apicurio Registry 时,您可以访问 schema 来格式化从 Kafka 代理主题获取的消息。

使用内容 ID 检索 schema

另外,您还可以根据内容 ID (这是工件内容的唯一 ID)从 Apicurio Registry 检索 schema。全局 ID 是工件版本的唯一 ID。

内容 ID 不唯一标识版本,而是仅标识版本内容。如果多个版本共享相同的内容,则它们具有不同的全局 ID,但内容 ID 相同。Confluent Schema Registry 默认使用内容 ID。

7.2. 在 Apicurio Registry 中查找 schema 的策略

Kafka 客户端 serializer 使用 lookup 策略来 决定在 Apicurio Registry 中注册消息 schema 的工件 ID 和全局 ID。对于给定的主题和消息,您可以使用 ArtifactResolverStrategy Java 接口的不同实现来返回对 registry 中工件的引用。

每个策略的类都位于 io.apicurio.registry.serde.strategy 软件包中。Avro SerDe 的特定策略类位于 io.apicurio.registry.serde.avro.strategy 软件包中。默认策略是 TopicIdStrategy,它查找与 Kafka 主题相同的名称与 Kafka 主题接收消息的 Apicurio Registry 工件。

Example

public ArtifactReference artifactReference(String topic, boolean isKey, T schema) {
        return ArtifactReference.builder()
                .groupId(null)
                .artifactId(String.format("%s-%s", topic, isKey ? "key" : "value"))
                .build();
Copy to Clipboard Toggle word wrap

  • topic 参数是接收消息的 Kafka 主题的名称。
  • 当消息键被序列化时,isKey 参数为 true,当消息值被序列化时为 false
  • schema 参数是消息序列化或反序列化的模式。
  • 返回的 ArtifactReference 包含从中注册架构的工件 ID。

您使用的查找策略取决于如何和存储 schema。例如,如果您具有不同的 Kafka 主题和相同 Avro 消息类型,则可以使用使用 记录 ID 的策略。

ArtifaceResolverStrategy 接口

工件解析器策略提供了一个将 Kafka 主题和消息信息映射到 Apicurio Registry 中的工件的方法。映射的常见惯例是将 Kafka 主题名称与 相结合,具体取决于 serializer 是否用于 Kafka 消息键或值。

但是,您可以使用 Apicurio Registry 提供的策略,或创建一个实现 io. apicurio.registry.registry.serde.strategy.ArtifactResolverStrategy 提供的策略来映射映射。

返回工件引用的策略

Apicurio Registry 提供以下策略,以根据 ArtifaceResolverStrategy 的实现返回工件引用:

RecordIdStrategy
使用 schema 完整名称的 Avro 特定策略。
TopicRecordIdStrategy
使用主题名称和架构名的 Avro 特定策略。
TopicIdStrategy
使用主题名称和 键或 值后缀的默认策略
SimpleTopicIdStrategy
仅使用主题名称的简单策略。
DefaultSchemaResolver 接口

默认架构解析器查找并标识在工件解析器策略提供的工件引用下注册的 schema 的特定版本。每个工件的每个版本都有一个全局唯一标识符,可用于检索该工件的内容。每个 Kafka 消息中包含此全局 ID,这样一个 deserializer 可以从 Apicurio Registry 正确获取 schema。

默认架构解析器可以查找现有的工件版本,或者在找不到的工件版本时可以注册,具体取决于使用哪个策略。您也可以通过创建一个实现 io.apicurio.registry.serde.SchemaResolver 的自定义 Java 类来提供自己的策略。但是,建议您使用 DefaultSchemaResolver 并指定配置属性。

配置 registry 查找选项

使用 DefaultSchemaResolver 时,您可以使用应用属性配置其行为。下表显示了一些常用的示例:

Expand
表 7.1. Apicurio Registry 查找配置选项
属性类型描述默认

apicurio.registry.find-latest

布尔值

指定 serializer 是否在 registry 中查找对应组 ID 和工件 ID 的最新工件。

false

apicurio.registry.use-id

字符串

指示 serializer 将指定的 ID 写入 Kafka,并指示 deserializer 使用此 ID 来查找 schema。

apicurio.registry.auto-register

布尔值

指定 serializer 试图在 registry 中创建工件。JSON Schema serializer 不支持它。

false

apicurio.registry.check-period-ms

字符串

指定以毫秒为单位缓存全局 ID 的时长。如果没有配置,则每次都会获取全局 ID。

7.3. 在 Apicurio Registry 中注册模式

在定义了适当格式(如 Apache Avro)的模式后,您可以将 schema 添加到 Apicurio Registry。

您可以使用以下方法添加 schema:

  • Apicurio Registry web 控制台
  • 使用 Apicurio Registry REST API 的 curl 命令
  • Apicurio Registry 提供的 Maven 插件
  • 添加到客户端代码的 schema 配置

在注册了 schema 后,客户端应用程序无法使用 Apicurio Registry。

Apicurio Registry web 控制台

安装 Apicurio Registry 时,您可以从 ui 端点连接到 web 控制台:

http://MY-REGISTRY-URL/ui

在控制台中,您可以添加、查看和配置架构。您还可以创建防止将无效内容添加到 registry 中的规则。

curl 命令示例
 curl -X POST -H "Content-type: application/json; artifactType=AVRO" \
   -H "X-Registry-ArtifactId: share-price" \ 
1

   --data '{
     "type":"record",
     "name":"price",
     "namespace":"com.example",
     "fields":[{"name":"symbol","type":"string"},
     {"name":"price","type":"string"}]}'
   https://my-cluster-my-registry-my-project.example.com/apis/registry/v2/groups/my-group/artifacts -s 
2
Copy to Clipboard Toggle word wrap
1
简单 Avro 模式构件.
2
公开 Apicurio Registry 的 OpenShift 路由名称。
Maven 插件示例
<plugin>
  <groupId>io.apicurio</groupId>
  <artifactId>apicurio-registry-maven-plugin</artifactId>
  <version>${apicurio.version}</version>
  <executions>
      <execution>
        <phase>generate-sources</phase>
        <goals>
            <goal>register</goal>  
1

        </goals>
        <configuration>
            <registryUrl>http://REGISTRY-URL/apis/registry/v2</registryUrl> 
2

            <artifacts>
                <artifact>
                    <groupId>TestGroup</groupId> 
3

                    <artifactId>FullNameRecord</artifactId>
                    <file>${project.basedir}/src/main/resources/schemas/record.avsc</file>
                    <ifExists>FAIL</ifExists>
                </artifact>
                <artifact>
                    <groupId>TestGroup</groupId>
                    <artifactId>ExampleAPI</artifactId> 
4

                    <type>GRAPHQL</type>
                    <file>${project.basedir}/src/main/resources/apis/example.graphql</file>
                    <ifExists>RETURN_OR_UPDATE</ifExists>
                    <canonicalize>true</canonicalize>
                </artifact>
            </artifacts>
        </configuration>
    </execution>
  </executions>
 </plugin>
Copy to Clipboard Toggle word wrap
1
指定 register 作为执行目标,以便将 schema 构件上传到 registry。
2
使用 ../apis/registry/v2 端点指定 Apicurio Registry URL。
3
指定 Apicurio Registry 工件组 ID。
4
您可以使用指定的组 ID、工件 ID 和位置上传多个工件。
使用制作者客户端示例进行配置
String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
    "https://my-cluster-service-registry-myproject.example.com/apis/registry/v2"); 
1

try (RegistryService service = RegistryClient.create(registryUrl_node1)) {
    String artifactId = ApplicationImpl.INPUT_TOPIC + "-value";
    try {
        service.getArtifactMetaData(artifactId); 
2

    } catch (WebApplicationException e) {
        CompletionStage <ArtifactMetaData> csa = service.createArtifact(
            ArtifactType.AVRO,
            artifactId,
            new ByteArrayInputStream(LogInput.SCHEMA$.toString().getBytes())
        );
        csa.toCompletableFuture().get();
    }
}
Copy to Clipboard Toggle word wrap
1
您可以为多个 URL 节点注册属性。
2
检查以确认该架构是否已根据工件 ID 是否存在。

7.4. 使用 Kafka 使用者客户端的 schema

此流程描述了如何配置使用 Java 编写的 Kafka 使用者客户端,以使用 Apicurio Registry 中的 schema。

前提条件

  • 已安装 Apicurio Registry
  • 模式使用 Apicurio Registry 注册

流程

  1. 使用 Apicurio Registry 的 URL 配置客户端。例如:

    String registryUrl = "https://registry.example.com/apis/registry/v2";
    Properties props = new Properties();
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, registryUrl);
    Copy to Clipboard Toggle word wrap
  2. 使用 Apicurio Registry deserializer 配置客户端。例如:

    // Configure Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Configure deserializer settings
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        AvroKafkaDeserializer.class.getName()); 
    1
    
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        AvroKafkaDeserializer.class.getName()); 
    2
    Copy to Clipboard Toggle word wrap
    1
    由 Apicurio Registry 提供的 deserializer。
    2
    反序列化采用 Apache Avro JSON 格式。

7.5. 使用 Kafka producer 客户端的 schema

此流程描述了如何配置使用 Java 编写的 Kafka producer 客户端,以使用 Apicurio Registry 中的 schema。

前提条件

  • 已安装 Apicurio Registry
  • 模式使用 Apicurio Registry 注册

流程

  1. 使用 Apicurio Registry 的 URL 配置客户端。例如:

    String registryUrl = "https://registry.example.com/apis/registry/v2";
    Properties props = new Properties();
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, registryUrl);
    Copy to Clipboard Toggle word wrap
  2. 使用 serializer 和策略配置客户端,以在 Apicurio Registry 中查找 schema。例如:

    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); 
    1
    
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); 
    2
    
    props.put(SerdeConfig.FIND_LATEST_ARTIFACT, Boolean.TRUE); 
    3
    Copy to Clipboard Toggle word wrap
    1
    Apicurio Registry 提供的消息键的 serializer。
    2
    Apicurio Registry 提供的消息值的 serializer。
    3
    查找策略,用于查找 schema 的全局 ID。

7.6. 使用 Kafka Streams 应用程序的 schema

这个步骤描述了如何配置在 Java 中编写的 Kafka Streams 客户端,以使用 Apicurio Registry 中的 Apache Avro 模式。

前提条件

  • 已安装 Apicurio Registry
  • 模式使用 Apicurio Registry 注册

流程

  1. 使用 Apicurio Registry URL 创建并配置 Java 客户端:

    String registryUrl = "https://registry.example.com/apis/registry/v2";
    
    RegistryService client = RegistryClient.cached(registryUrl);
    Copy to Clipboard Toggle word wrap
  2. 配置 serializer 和 deserializer:

    Serializer<LogInput> serializer = new AvroKafkaSerializer<LogInput>(); 
    1
    
    
    Deserializer<LogInput> deserializer = new AvroKafkaDeserializer <LogInput>(); 
    2
    
    
    Serde<LogInput> logSerde = Serdes.serdeFrom(
        serializer,
        deserializer
    );
    
    Map<String, Object> config = new HashMap<>();
    config.put(SerdeConfig.REGISTRY_URL, registryUrl);
    config.put(AvroKafkaSerdeConfig.USE_SPECIFIC_AVRO_READER, true);
    logSerde.configure(config, false); 
    3
    Copy to Clipboard Toggle word wrap
    1
    Apicurio Registry 提供的 Avro serializer。
    2
    Apicurio Registry 提供的 Avro deserializer。
    3
    配置 Apicurio Registry URL 和 Avro 读取器以 Avro 格式进行反序列化。
  3. 创建 Kafka Streams 客户端:

    KStream<String, LogInput> input = builder.stream(
        INPUT_TOPIC,
        Consumed.with(Serdes.String(), logSerde)
    );
    Copy to Clipboard Toggle word wrap

本章介绍了如何在制作者和消费者 Java 客户端应用程序中配置 Kafka SerDes 的详细信息:

您可以使用本节所示的示例常量在客户端应用程序中直接配置特定的客户端 serializer/deserializer (SerDe)服务和 schema 查找策略。另外,您可以在文件或实例中配置对应的 Apicurio Registry 应用程序属性。

以下小节显示了常用的 SerDe constants 和 配置选项示例。

配置 SerDe 服务
public class SerdeConfig {

   public static final String REGISTRY_URL = "apicurio.registry.url"; 
1

   public static final String ID_HANDLER = "apicurio.registry.id-handler"; 
2

   public static final String ENABLE_CONFLUENT_ID_HANDLER = "apicurio.registry.as-confluent"; 
3
Copy to Clipboard Toggle word wrap
1
Apicurio Registry 所需的 URL。
2
扩展 ID 处理以支持其他 ID 格式,并使其与 Apicurio Registry SerDe 服务兼容。例如,将默认 ID 格式从 Long 更改为 Integer 支持 Confluent ID 格式。
3
简化 Confluent ID 的处理。如果设置为 true,则 Integer 用于全局 ID 查找。该设置不应与 ID_HANDLER 选项搭配使用。
配置 SerDe 查找策略
public class SerdeConfig {

   public static final String ARTIFACT_RESOLVER_STRATEGY = "apicurio.registry.artifact-resolver-strategy"; 
1

   public static final String SCHEMA_RESOLVER = "apicurio.registry.schema-resolver"; 
2

...
Copy to Clipboard Toggle word wrap
1
实现工件解析器策略并在 Kafka SerDe 和工件 ID 之间映射的 Java 类。默认为主题 ID 策略。这仅由 serializer 类使用。
2
实施架构解析器的 Java 类。默认为 DefaultSchemaResolver。这由 serializer 和 deserializer 类使用。
配置 Kafka 转换器
public class SerdeBasedConverter<S, T> extends SchemaResolverConfigurer<S, T> implements Converter, Closeable {

   public static final String REGISTRY_CONVERTER_SERIALIZER_PARAM = "apicurio.registry.converter.serializer"; 
1

   public static final String REGISTRY_CONVERTER_DESERIALIZER_PARAM = "apicurio.registry.converter.deserializer"; 
2
Copy to Clipboard Toggle word wrap
1
与 Apicurio Registry Kafka 转换器一起使用所需的 serializer。
2
与 Apicurio Registry Kafka 转换器一起使用所需的 deserializer。
配置不同架构类型

有关如何为不同的模式技术配置 SerDe 的详情,请查看以下操作:

8.2. Apicurio Registry serializer/deserializer 配置属性

本节提供有关 Apicurio Registry Kafka 序列化器/划分器(SerDes)的 Java 配置属性的引用信息。

SchemaResolver 接口

Apicurio Registry SerDes 基于 SchemaResolver 接口,后者提取对 registry 的访问,并为所有支持的格式的 SerDes 类应用相同的查找逻辑。

Expand
表 8.1. SchemaResolver 接口的配置属性
常数属性描述类型默认

SCHEMA_RESOLVER

apicurio.registry.schema-resolver

供 serializers 和 deserializers 使用。实施 SchemaResolver 的完全限定 Java 类名称。

字符串

io.apicurio.registry.serde.DefaultSchemaResolver

注意

建议使用 DefaultSchemaResolver,并为大多数用例提供有用的功能。对于某些高级用例,您可以使用 SchemaResolver 的自定义实现。

DefaultSchemaResolver 类

您可以使用 DefaultSchemaResolver 配置以下功能:

  • 访问 registry API
  • 如何查找 registry 中的工件
  • 如何向 Kafka 写入和读取工件信息
  • deserializers 的回退选项
配置 registry API 访问选项

DefaultSchemaResolver 提供以下属性来配置对核心 registry API 的访问:

Expand
表 8.2. 访问 registry API 的配置属性
常数属性描述类型默认

REGISTRY_URL

apicurio.registry.url

供 serializers 和 deserializers 使用。用于访问 registry API 的 URL。

字符串

AUTH_SERVICE_URL

apicurio.auth.service.url

供 serializers 和 deserializers 使用。身份验证服务的 URL。在使用 OAuth 客户端凭证流访问安全 registry 时是必需的。

字符串

AUTH_REALM

apicurio.auth.realm

供 serializers 和 deserializers 使用。用于访问身份验证服务的 realm。在使用 OAuth 客户端凭证流访问安全 registry 时是必需的。

字符串

AUTH_CLIENT_ID

apicurio.auth.client.id

供 serializers 和 deserializers 使用。用于访问身份验证服务的客户端 ID。在使用 OAuth 客户端凭证流访问安全 registry 时是必需的。

字符串

AUTH_CLIENT_SECRET

apicurio.auth.client.secret

供 serializers 和 deserializers 使用。用于访问身份验证服务的客户端机密。在使用 OAuth 客户端凭证流访问安全 registry 时是必需的。

字符串

AUTH_USERNAME

apicurio.auth.username

供 serializers 和 deserializers 使用。访问 registry 的用户名。使用 HTTP 基本身份验证访问安全 registry 时是必需的。

字符串

AUTH_PASSWORD

apicurio.auth.password

供 serializers 和 deserializers 使用。用于访问 registry 的密码。使用 HTTP 基本身份验证访问安全 registry 时是必需的。

字符串

配置 registry 查找选项

DefaultSchemaResolver 使用以下属性来配置如何查找 Apicurio Registry 中的工件。

Expand
表 8.3. registry 工件查询的配置属性
常数属性描述类型默认

ARTIFACT_RESOLVER_STRATEGY

apicurio.registry.artifact-resolver-strategy

仅供序列化商使用。实现 ArtifactResolverStrategy 的完全限定 Java 类名称并将每个 Kafka 消息映射到 ArtifactReference (groupIdartifactId 和 version)。例如,默认策略使用主题名称作为 schema artifactId

字符串

io.apicurio.registry.serde.strategy.TopicIdStrategy

EXPLICIT_ARTIFACT_GROUP_ID

apicurio.registry.artifact.group-id

仅供序列化商使用。设置用于查询或创建工件的 groupId。覆盖 ArtifactResolverStrategy 返回的 groupId

字符串

EXPLICIT_ARTIFACT_ID

apicurio.registry.artifact.artifact-id

仅供序列化商使用。设置用于查询或创建工件的 artifactId。覆盖 ArtifactResolverStrategy 返回的 artifactId

字符串

EXPLICIT_ARTIFACT_VERSION

apicurio.registry.artifact.version

仅供序列化商使用。设置用于查询或创建工件的工件版本。覆盖 ArtifactResolverStrategy 返回的版本。

字符串

FIND_LATEST_ARTIFACT

apicurio.registry.find-latest

仅供序列化商使用。指定 serializer 是否在 registry 中查找对应组 ID 和工件 ID 的最新工件。

布尔值

false

AUTO_REGISTER_ARTIFACT

apicurio.registry.auto-register

仅供序列化商使用。指定 serializer 试图在 registry 中创建工件。JSON Schema serializer 不支持此功能。

布尔值

false

AUTO_REGISTER_ARTIFACT_IF_EXISTS

apicurio.registry.auto-register.if-exists

仅供序列化商使用。在存在冲突创建工件时,配置客户端的行为,因为工件已存在。可用值包括 FAILUPDATERETURNRETURN_OR_UPDATE

字符串

RETURN_OR_UPDATE

CHECK_PERIOD_MS

apicurio.registry.check-period-ms

供 serializers 和 deserializers 使用。指定在 auto-eviction 前缓存工件的时长。如果没有设置,则每次都会获取工件。

字符串

USE_ID

apicurio.registry.use-id

供 serializers 和 deserializers 使用。配置,以使用指定的 IdOption 作为工件的标识符。选项为 globalIdcontentId。指示 serializer 将指定的 ID 写入 Kafka,并指示 deserializer 使用这个 ID 来查找 schema。

字符串

globalId

配置在 Kafka 中读/写 registry 工件

DefaultSchemaResolver 使用以下属性来配置工件信息是如何写入和从 Kafka 读取的。

Expand
表 8.4. Kafka 中读/写工件信息的配置属性
常数属性描述类型默认

ENABLE_HEADERS

apicurio.registry.headers.enabled

供 serializers 和 deserializers 使用。配置 以读取/写入工件标识符到 Kafka 消息标头,而不是在消息有效负载中。

布尔值

true

HEADERS_HANDLER

apicurio.registry.headers.handler

供 serializers 和 deserializers 使用。实现 HeadersHandler 的完全限定 Java 类名称,并将工件标识符写入/读取至 Kafka 消息标头。

字符串

io.apicurio.registry.serde.headers.DefaultHeadersHandler

ID_HANDLER

apicurio.registry.id-handler

供 serializers 和 deserializers 使用。实施 IdHandler 的类的完全限定 Java 类名称,并将工件标识符写入/读取消息有效负载。只有在 apicurio.registry.headers.enabled 被设置为 false 时才使用。

字符串

io.apicurio.registry.serde.DefaultIdHandler

ENABLE_CONFLUENT_ID_HANDLER

apicurio.registry.as-confluent

供 serializers 和 deserializers 使用。启用与 IdHandler 传统兼容实施的快捷方式。只有在 apicurio.registry.headers.enabled 被设置为 false 时才使用。

布尔值

true

配置 deserializer fall-back 选项

DefaultSchemaResolver 使用以下属性为所有 deserializers 配置回退提供程序。

Expand
表 8.5. deserializer fall-back provider 的配置属性
常数属性描述类型默认

FALLBACK_ARTIFACT_PROVIDER

apicurio.registry.fallback.provider

仅限由 deserializers 使用。设置 FallbackArtifactProvider 的自定义实现,用于解析用于反序列化的工件。FallbackArtifactProvider 配置一个回退工件,以便在查找失败时从 registry 获取。

字符串

io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider

DefaultFallbackArtifactProvider 使用以下属性来配置 deserializer fall-back 选项:

Expand
表 8.6. deserializer fall-back 选项的配置属性
常数属性描述类型默认

FALLBACK_ARTIFACT_ID

apicurio.registry.fallback.artifact-id

仅供反序列化商使用。设置用于解析用于反序列化的工件的 artifactId

字符串

FALLBACK_ARTIFACT_GROUP_ID

apicurio.registry.fallback.group-id

仅供反序列化商使用。设置用于解析用于反序列化的组的 groupId

字符串

FALLBACK_ARTIFACT_VERSION

apicurio.registry.fallback.version

仅供反序列化商使用。设置用于解析用于反序列化的工件的版本。

字符串

其他资源

  • 如需了解更多详细信息,请参阅 SerdeConfig Java 类
  • 您可以将应用程序属性配置为 Java 系统属性,或者在 Quarkus application.properties 文件中包含它们。如需了解更多详细信息,请参阅 Quarkus 文档

在 Kafka 客户端应用程序中使用 schema 时,您必须根据自己的用例选择要使用的特定模式类型。Apicurio Registry 为 Apache Avro、JSON Schema 和 Google Protobuf 提供 SerDe Java 类。以下小节解释了如何配置 Kafka 应用程序以使用每种类型。

您还可以使用 Kafka 实现自定义 serializer 和 deserializer 类,并使用 Apicurio Registry REST Java 客户端使用 Apicurio Registry 功能。

用于 serializers/deserializers 的 Kafka 应用程序配置

使用 Kafka 应用程序中的 Apicurio Registry 提供的 SerDe 类涉及设置正确的配置属性。以下简单的 Avro 示例演示了如何在 Kafka producer 应用程序中配置序列化器,以及如何在 Kafka 使用者应用程序中配置反序列化器。

Kafka producer 中的 serializer 配置示例

// Create the Kafka producer
private static Producer<Object, Object> createKafkaProducer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");

    // Use Apicurio Registry-provided Kafka serializer for Avro
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());

    // Configure the Apicurio Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // Register the schema artifact if not found in the registry.
    props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);

    // Create the Kafka producer
    Producer<Object, Object> producer = new KafkaProducer<>(props);
    return producer;
}
Copy to Clipboard Toggle word wrap

Kafka consumer 中的 deserializer 配置示例

// Create the Kafka consumer
private static KafkaConsumer<Long, GenericRecord> createKafkaConsumer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Use Apicurio Registry-provided Kafka deserializer for Avro
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());

    // Configure the Apicurio Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // No other configuration needed because the schema globalId the deserializer uses is sent
    // in the payload. The deserializer extracts the globalId and uses it to look up the schema
    // from the registry.

    // Create the Kafka consumer
    KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
    return consumer;
}
Copy to Clipboard Toggle word wrap

8.3.1. 使用 Apicurio Registry 配置 Avro SerDe

Apicurio Registry 为 Apache Avro 提供以下 Kafka 客户端序列化器和反序列化类:

  • io.apicurio.registry.serde.avro.AvroKafkaSerializer
  • io.apicurio.registry.serde.avro.AvroKafkaDeserializer

配置 Avro serializer

您可以使用以下方法配置 Avro serializer 类:

  • Apicurio Registry URL
  • 工件解析器策略
  • ID 位置
  • ID 编码
  • Avro datum 供应商
  • Avro 编码

ID 位置

序列化器将模式的唯一 ID 作为 Kafka 消息的一部分传递,以便消费者使用正确的模式进行反序列化。ID 可以位于消息有效负载中,也可以在邮件标头中使用。默认位置是消息有效负载。要在消息标头中发送 ID,请设置以下配置属性:

props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")
Copy to Clipboard Toggle word wrap

属性名称是 apicurio.registry.headers.enabled

ID 编码

您可以在 Kafka 消息正文中传递时自定义如何编码 schema ID。将 apicurio.registry.id-handler 配置属性设置为实现 io.apicurio.registry.serde.IdHandler 接口的类。Apicurio Registry 提供以下实现:

  • io.apicurio.registry.serde.DefaultIdHandler: 将 ID 存储为 8 字节长
  • io.apicurio.registry.serde.Legacy4ByteIdHandler: 将 ID 存储为 4 字节整数

Apicurio Registry 代表一个长的模式 ID,但出于传统原因,或者出于与其他 registry 或 SerDe 类的兼容性,您可能需要在发送 ID 时使用 4 字节。

Avro datum 供应商

Avro 提供不同的 datum 写入器和读取器来写入和读取数据。Apicurio Registry 支持三种不同的类型:

  • generic
  • 具体
  • 反射

Apicurio Registry AvroDatumProvider 是使用哪个类型的抽象,默认为使用 DefaultAvroDatumProvider

您可以设置以下配置选项:

  • apicurio.registry.avro-datum-provider :指定 AvroDatumProvider 实施的完全限定域名,如 io.apicurio.registry.serde.avro.ReflectAvroDatumProvider
  • apicurio.registry.use-specific-avro-reader: Set to true 以使用 DefaultAvroDatumProvider特定类型

Avro 编码

使用 Avro 对数据进行序列化时,您可以使用 Avro 二进制编码格式确保以尽可能高效的方式编码数据。Avro 还支持将数据编码为 JSON,这有助于检查每条消息的载荷,例如用于日志记录或调试。

您可以通过配置 apicurio.registry.avro.encoding 属性来设置 Avro 编码,值为 JSONBINARY。默认值为 BINARY

配置 Avro deserializer

您必须配置 Avro deserializer 类,以匹配 serializer 的以下配置设置:

  • Apicurio Registry URL
  • ID 编码
  • Avro datum 供应商
  • Avro 编码

有关这些配置选项,请参阅 serializer 部分。属性名称和值相同。

注意

配置 deserializer 时不需要以下选项:

  • 工件解析器策略
  • ID 位置

deserializer 类可以决定消息中这些选项的值。不需要该策略,因为 serializer 负责将 ID 作为消息的一部分发送。

ID 位置通过检查消息有效负载开头的 magic 字节来确定。如果找到了字节,则 ID 会使用配置的处理程序从消息有效负载中读取。如果没有找到 magic byte,则会从邮件标头中读取该 ID。

8.3.2. 使用 Apicurio Registry 配置 JSON Schema SerDe

Apicurio Registry 为 JSON Schema 提供以下 Kafka 客户端序列化r 和 deserializer 类:

  • io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer
  • io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer

与 Apache Avro 不同,JSON Schema 不是序列化技术,而是验证技术。因此,JSON Schema 的配置选项并不相同。例如,没有编码选项,因为数据始终被编码为 JSON。

配置 JSON Schema serializer

您可以配置 JSON Schema serializer 类,如下所示:

  • Apicurio Registry URL
  • 工件解析器策略
  • 模式验证

唯一非标准配置属性是 JSON Schema 验证(默认为启用)。您可以通过将 apicurio.registry.serde.validation-enabled 设置为 "false" 来禁用此功能。例如:

props.putIfAbsent(SerdeConfig.VALIDATION_ENABLED, Boolean.FALSE)
Copy to Clipboard Toggle word wrap

配置 JSON Schema deserializer

您可以配置 JSON Schema deserializer 类,如下所示:

  • Apicurio Registry URL
  • 模式验证
  • 用于对数据进行反序列化的类

您必须提供 Apicurio Registry 的位置,以便可以加载 schema。其他配置是可选的。

注意

只有在 serializer 传递 Kafka 消息中的全局 ID 时,仅当 serializer 中启用了验证时才会发生,反序列化验证才可以正常工作。

8.3.3. 使用 Apicurio Registry 配置 Protobuf SerDe

Apicurio Registry 为 Google Protobuf 提供以下 Kafka 客户端序列化r 和 deserializer 类:

  • io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer
  • io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer

配置 Protobuf serializer

您可以按照如下所示配置 Protobuf serializer 类:

  • Apicurio Registry URL
  • 工件解析器策略
  • ID 位置
  • ID 编码
  • 模式验证

有关这些配置选项的详情,请查看以下部分:

配置 Protobuf deserializer

您必须配置 Protobuf deserializer 类,以匹配 serializer 中的以下配置设置:

  • Apicurio Registry URL
  • ID 编码

配置属性名称和值与 serializer 相同。

注意

配置 deserializer 时不需要以下选项:

  • 工件解析器策略
  • ID 位置

deserializer 类可以决定消息中这些选项的值。不需要该策略,因为 serializer 负责将 ID 作为消息的一部分发送。

ID 位置通过检查消息有效负载开头的 magic 字节来确定。如果找到了字节,则 ID 会使用配置的处理程序从消息有效负载中读取。如果没有找到 magic byte,则会从邮件标头中读取该 ID。

注意

Protobuf deserializer 不会对您的确切的 Protobuf 消息实施进行反序列化,而是指向 DynamicMessage 实例。否则,没有适当的 API。

第 9 章 Apicurio Registry artifact 引用

本章详细介绍了存储在 Apicurio Registry 中支持的工件类型、状态、元数据和内容规则。

9.1. Apicurio Registry 工件类型

您可以在 Apicurio Registry 中存储和管理以下工件类型:

Expand
表 9.1. Apicurio Registry 工件类型
类型描述

ASYNCAPI

AsyncAPI 规格

AVRO

Apache Avro 模式

GRAPHQL

GraphQL 模式

JSON

JSON Schema

KCONNECT

Apache Kafka Connect schema

OPENAPI

OpenAPI 规格

PROTOBUF

Google 协议缓冲模式

WSDL

Web 服务定义语言

XSD

XML Schema 定义

9.2. Apicurio Registry 工件状态

这些是 Apicurio Registry 中的有效构件状态:

Expand
表 9.2. Apicurio Registry 工件状态
状态描述

ENABLED

基本状态,所有操作都可用。

DISABLED

工件及其元数据可以通过 Apicurio Registry Web 控制台查看和搜索,但其内容不能被任何客户端获取。

弃用

工件可以完全可用,但会在获取工件内容时向 REST API 响应中添加标头。Apicurio Registry Rest Client 会在它看到已弃用的内容时记录警告。

9.3. Apicurio Registry 工件元数据

当工件添加到 Apicurio Registry 时,一组元数据属性与工件内容一起存储。这个元数据由一组生成的只读属性组成,以及您可以设置的一些属性。

Expand
表 9.3. Apicurio Registry 元数据属性
属性类型可编辑

id

字符串

false

type

ArtifactType

false

state

ArtifactState

true

version

整数

false

createdBy

字符串

false

createdOn

date

false

modifiedBy

字符串

false

modifiedOn

date

false

name

字符串

true

description

字符串

true

labels

字符串数组

true

属性

map

true

更新工件元数据

  • 您可以使用 Apicurio Registry REST API 来更新使用元数据端点的可编辑属性集合。
  • 您只能通过使用状态转换 API 编辑 state 属性。例如,您可以将工件标记为 已弃用 或禁用

其他资源

如需了解更多详细信息,请参阅 Apicurio Registry REST API 文档中的 /artifacts/{artifactId}/meta 部分。

9.4. Apicurio Registry 内容规则类型

您可以指定以下规则类型,在 Apicurio Registry 中管理内容演变:

Expand
表 9.4. Apicurio Registry 内容规则类型
类型描述

有效期

在将其添加到 registry 之前验证数据。这个规则的可能配置值有:

  • FULL :验证都是语法和语义。
  • SYNTAX_ONLY :验证是仅语法。

兼容性

确保新添加的工件与之前添加的版本兼容。这个规则的可能配置值有:

  • FULL :新工件与最近添加的工件向前和后向兼容。
  • FULL_TRANSITIVE :新的工件是向前的,向后兼容所有之前添加的工件。
  • BACKWARD :使用新工件的客户端可以使用最新添加的构件读取写入的数据。
  • BACKWARD_TRANSITIVE :使用新工件的客户端可以使用所有之前添加的工件读取写入的数据。
  • FORWARD :使用最新添加的工件的客户端可以使用新的工件来读取写入的数据。
  • FORWARD_TRANSITIVE :使用之前添加的所有工件的客户端可以使用新的工件读取写入的数据。
  • NONE :所有向后兼容检查都被禁用。

9.5. Apicurio Registry 内容规则成熟度

不是所有内容规则都完全实施,适用于 Apicurio Registry 支持的每个工件类型。下表显示了每个规则和工件类型的当前成熟度。

Expand
表 9.5. Apicurio Registry 内容规则成熟度列表
工件类型有效期规则兼容性规则

Avro

full

full

protobuf

full

JSON Schema

full

full

OpenAPI

full

AsyncAPI

仅限语法

GraphQL

仅限语法

Kafka Connect

仅限语法

WSDL

仅限语法

XSD

仅限语法

附录 A. 使用您的订阅

Apicurio Registry 通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。

访问您的帐户

  1. 转至 access.redhat.com
  2. 如果您还没有帐户,请创建一个帐户。
  3. 登录到您的帐户。

激活订阅

  1. 转至 access.redhat.com
  2. 导航到 My Subscriptions
  3. 导航到 激活订阅 并输入您的 16 位激活号。

下载 ZIP 和 TAR 文件

要访问 ZIP 或 TAR 文件,请使用客户门户网站查找下载的相关文件。如果您使用 RPM 软件包,则不需要这一步。

  1. 打开浏览器并登录红帽客户门户网站 产品下载页面,网址为 access.redhat.com/downloads
  2. 找到 Integration and Automation 类别中的 Red Hat Integration 条目。
  3. 选择所需的 Apicurio Registry 产品。此时会打开 Software Downloads 页面。
  4. 单击组件的 Download 链接。

为系统注册软件包

要在 Red Hat Enterprise Linux 上安装 RPM 软件包,必须注册您的系统。如果您使用 ZIP 或 TAR 文件,则不需要这一步。

  1. 转至 access.redhat.com
  2. 进入 Registration Assistant
  3. 选择您的操作系统版本,再继续到下一页。
  4. 使用您的系统终端中列出的命令完成注册。

如需了解更多相关信息,请参阅 如何注册到红帽客户门户网站(Red Hat Customer Portal )

Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部