在 OpenShift 上安装 Debezium
在 OpenShift Container Platform 中使用 Debezium 1.9.5
摘要
前言 复制链接链接已复制到粘贴板!
使开源包含更多
红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。我们从这四个术语开始:master、slave、黑名单和白名单。由于此项工作十分艰巨,这些更改将在即将推出的几个发行版本中逐步实施。有关更多详情,请参阅我们的首席技术官 Chris Wright 提供的消息。
第 1 章 Debezium 概述 复制链接链接已复制到粘贴板!
Red Hat Integration 的 Debezium 是捕获数据库操作的分布式平台,为行级操作创建数据更改事件记录,并将事件记录发送到 Apache Kafka 主题。Debezium 基于 Apache Kafka 构建,并被部署并与 AMQ Streams 集成。
Debezium 将行级更改捕获到数据库表,并将对应的更改事件传递给 AMQ Streams。应用程序可以读取 这些更改事件流,并按发生更改事件的顺序访问更改事件。
Debezium 是 Debezium for Red Hat Integration 的上游社区项目。
Debezium 具有多个用途,包括:
- 数据复制
- 更新缓存和搜索索引
- 简化单体式应用程序
- 数据集成
- 启用流查询
Debezium 为以下常见数据库提供 Apache Kafka 连接连接器:
第 2 章 安装 Debezium 连接器 复制链接链接已复制到粘贴板!
通过使用连接器插件扩展 Kafka Connect,通过 AMQ Streams 安装 Debezium 连接器。在部署了 AMQ Streams 后,您可以通过 Kafka Connect 将 Debezium 部署为连接器配置。
2.1. Kafka 主题创建建议 复制链接链接已复制到粘贴板!
Debezium 在多个 Apache Kafka 主题中存储数据。这些主题必须由管理员提前创建,或者您可以将 Kafka Connect 配置为自动配置主题。
以下列表描述了创建主题时需要考虑的限制和建议:
- 用于 Debezium Db2、MySQL、Oracle 和 SQL Server 连接器的数据库历史记录主题
对于前面的每个连接器,都需要一个数据库历史记录主题。无论您手工创建数据库历史记录主题,使用 Kafka 代理自动创建主题,或使用 Kafka Connect 创建主题,以确保该主题配置了以下设置:
- 无限或非常长的保留。
- 在生产环境中至少三个复制因素。
- 单个分区。
- 其他主题
当您启用 Kafka 日志压缩 时,只保存给定记录 的最后 更改事件,在 Apache Kafka 中设置以下主题属性:
-
min.compaction.lag.ms 为确保主题消费者有足够的时间来接收所有事件和删除标记,请为前面的属性指定大于您预期用于接收器连接器的最大停机时间的值。例如,在对接收器连接器应用更新时可能会出现停机的情况。
-
- 在生产环境中复制。
单个分区。
您可以放松单一分区规则,但您的应用程序必须为数据库的不同行处理内存不足的事件。单个行的事件仍然被总排序。如果您使用多个分区,则 Kafka 通过对密钥进行哈希来确定分区。其他分区策略需要使用单一消息转换(SMT)来设置每个记录的分区号。
2.2. AMQ Streams 上的 Debezium 部署 复制链接链接已复制到粘贴板!
要在 Red Hat OpenShift Container Platform 上为 Debezium 设置连接器,请使用 AMQ Streams 构建 Kafka Connect 容器镜像,其中包含您要使用的连接器插件。连接器开始后,它会连接到已配置的数据库,并为插入、更新和删除行或文档生成更改事件记录。
从 Debezium 1.7 开始,部署 Debezium 连接器的首选方法是使用 AMQ Streams 构建包括连接器插件的 Kafka Connect 容器镜像。
在部署过程中,您要创建并使用以下自定义资源(CR):
-
一个
KafkaConnectCR,用于定义 Kafka Connect 实例,其中包含有关在镜像中包括连接器工件的信息。 -
KafkaConnectorCR,提供包括连接器用于访问源数据库的信息。在 AMQ Streams 启动 Kafka Connect pod 后,您可通过应用KafkaConnectorCR 来启动连接器。
在 Kafka Connect 镜像的构建规格中,您可以指定可供部署的连接器。对于每个连接器插件,您还可以指定您要用于部署的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。当 AMQ Streams 构建 Kafka Connect 镜像时,它会下载指定的工件,并将其合并到镜像中。
KafkaConnect CR 中的 spec.build.output 参数指定存储生成的 Kafka Connect 容器镜像的位置。容器镜像可以存储在 Docker 注册表中,也可以存储在 OpenShift ImageStream 中。要将镜像存储在 ImageStream 中,您必须在部署 Kafka Connect 前创建 ImageStream。镜像流不会被自动创建。
如果您使用 KafkaConnect 资源来创建集群,之后您无法使用 Kafka Connect REST API 创建或更新连接器。您仍然可以使用 REST API 检索信息。
其他资源
- 在 OpenShift 上使用 AMQ Streams 中配置 Kafka 连接。
- 使用 OpenShift 上部署和升级 AMQ Streams 中的 AMQ Streams 自动创建新的容器镜像。
2.2.1. 使用 AMQ Streams 部署 Debezium 复制链接链接已复制到粘贴板!
您可以执行相同的步骤来部署每种类型的 Debezium 连接器。下面的部分论述了如何部署 Debezium MySQL 连接器。
使用早期的 AMQ Streams 版本,要在 OpenShift 上部署 Debezium 连接器,您需要首先为连接器构建 Kafka Connect 镜像。在 OpenShift 中部署连接器的当前首选方法是在 AMQ Streams 中使用构建配置,以自动构建包括您要使用的 Debezium 连接器插件的 Kafka Connect 容器镜像。
在构建过程中,AMQ Streams Operator 将 KafkaConnect 自定义资源(包括 Debezium connector 定义)中的输入参数转换为 Kafka Connect 容器镜像。构建会从红帽 Maven 存储库或其他配置的 HTTP 服务器下载必要的工件。
新创建的容器被推送到 .spec.build.output 中指定的容器 registry 中,用于部署 Kafka Connect 集群。在 AMQ Streams 构建 Kafka Connect 镜像后,您可以创建 KafkaConnector 自定义资源来启动构建中包含的连接器。
先决条件
- 您可以访问安装集群 Operator 的 OpenShift 集群。
- AMQ Streams Operator 正在运行。
- 部署了 Apache Kafka 集群,如在 OpenShift 中部署和升级 AMQ Streams。
- Kafka Connect 在 AMQ Streams 上部署
- 您有 Red Hat Integration 许可证。
-
已安装 OpenShift
ocCLI 客户端,或有权访问 OpenShift Container Platform Web 控制台。 根据您要存储 Kafka Connect 构建镜像的方式,您需要 registry 权限,或者您必须创建 ImageStream 资源:
- 将镜像存储在镜像 registry 中,如 Red Hat Quay.io 或 Docker Hub
- 在 registry 中创建和管理镜像的帐户和权限。
- 将构建镜像存储为原生 OpenShift ImageStream
- ImageStream 资源部署到集群中。您必须为集群明确创建 ImageStream。默认情况下,镜像流不可用。
流程
- 登录 OpenShift 集群。
为连接器创建 Debezium
KafkaConnect自定义资源(CR),或修改现有资源。例如,创建一个KafkaConnectCR,用于指定metadata.annotations和spec.build属性,如下例所示。使用名称(如dbz-connect.yaml)保存文件。例 2.1. 一个
dbz-connect.yaml文件,它定义了一个KafkaConnect自定义资源,其中包含一个 Debezium 连接器在以下示例中,自定义资源配置为下载以下工件:
- Debezium 连接器存档。
- Service Registry 归档。Service Registry 是一个可选组件。只有在打算使用带有连接器的 Avro 序列化时,才会添加 Service Registry 组件。
- Debezium 脚本 SMT 归档以及您要与 Debezium 连接器一起使用的相关脚本引擎。SMT 归档和脚本语言依赖项是可选的组件。只有在打算使用 基于 Debezium 内容的路由 SMT 或 过滤 SMT 时,才添加这些组件。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表 2.1. Kafka Connect 配置设置的描述 项 描述 1
将
strimzi.io/use-connector-resources注解设置为"true",使 Cluster Operator 使用KafkaConnector资源在此 Kafka Connect 集群中配置连接器。2
spec.build配置指定存储构建镜像的位置,并列出镜像中包含的插件以及插件工件的位置。3
build.output指定在其中存储新构建的镜像的 registry。4
指定镜像输出的名称和镜像名称。
output.type的有效值为docker,可推送到容器 registry,如 Docker Hub 或 Quay,或将镜像推送到内部 OpenShift ImageStream 的镜像流。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定build.output的更多信息,请参阅 AMQ Streams Build schema 参考文档。5
插件配置列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,请指定插件名称,以及构建连接器所需的工件的信息。另外,对于每个连接器插件,您可以包括您要与连接器搭配使用的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。6
artifacts.type的值指定artifacts.url中指定的构件的文件名。有效类型为zip、tgz或jar。Debezium 连接器存档以.zip文件格式提供。type值必须与url字段中引用的文件类型匹配。7
artifacts.url的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。Debezium connector 工件可在 Red Hat Maven 软件仓库中找到。OpenShift 集群必须有权访问指定的服务器。8
(可选)指定用于下载 Service Registry 组件的工件
类型和url。只有在您希望连接器使用 Apache Avro 将事件键和值与 Service Registry 序列化,而不使用默认的 JSON 转换程序时,才包含 Service Registry 工件。9
(可选)指定 Debezium 脚本 SMT 归档的工件
类型和url,用于 Debezium 连接器。只有在打算使用 基于 Debezium 内容的路由 SMT 或 过滤 SMT 时使用脚本 SMT,还必须部署 JSR 223 兼容脚本实施,如 groovy。10
(可选)指定 JSR 223 兼容脚本实施的 JAR 文件的工件
类型和url,这是 Debezium 脚本 SMT 所需的。重要如果您使用 AMQ Streams 将连接器插件合并到 Kafka Connect 镜像中,对于每个所需脚本语言组件
工件。url必须指定 JAR 文件的位置,并且artifacts.type的值还必须设置为jar。无效的值会导致连接器在运行时失败。要启用在脚本 SMT 中使用 Apache Groovy 语言,示例中的自定义资源会检索以下库的 JAR 文件:
-
groovy -
Groovy-jsr223(描述符代理) -
Groovy-json(用于解析 JSON 字符串的模块)
另外,Debezium 脚本 SMT 也支持使用 JSR 223 实现 GraalVM JavaScript。
输入以下命令将
KafkaConnect构建规格应用到 OpenShift 集群:oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 根据自定义资源中指定的配置,Streams Operator 会准备要部署的 Kafka Connect 镜像。
构建完成后,Operator 会将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。集群中提供了您在配置中列出的连接器工件。创建一个
KafkaConnector资源,以定义您要部署的每个连接器的实例。
例如,创建以下KafkaConnectorCR,并将它保存为mysql-inventory-connector.yaml例 2.2.
MySQL-inventory-connector.yaml文件,为 Debezium 连接器定义KafkaConnector自定义资源Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表 2.2. 连接器配置设置描述 项 描述 1
要注册到 Kafka Connect 集群的连接器名称。
2
连接器类的名称。
3
可以同时操作的任务数量。
4
连接器的配置。
5
主机数据库实例的地址。
6
数据库实例的端口号。
7
Debezium 连接到数据库的用户帐户的名称。
8
数据库用户帐户的密码。
9
捕获更改的数据库的名称。
10
数据库实例或集群的逻辑名称。
指定的名称只能从字母数字字符或下划线组成。
由于逻辑名称用作从此连接器接收更改事件的任何 Kafka 主题的前缀,所以该名称在集群连接器中必须是唯一的。
如果您将连接器与 Avro 连接器集成,命名空间也用于相关的 Kafka 连接模式的名称,以及对应的 Avro 模式的命名空间。11
连接器捕获更改事件的表列表。
运行以下命令来创建连接器资源:
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 例如,
oc create -n debezium -f {context}-inventory-connector.yamloc create -n debezium -f {context}-inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 连接器被注册到 Kafka Connect 集群,然后开始针对由
KafkaConnectorCR 中的spec.config.database.dbname指定的数据库运行。在连接器 Pod 就绪后,Dbezium 正在运行。
您现在已准备好 验证 Debezium 部署。
2.2.2. 验证 Debezium 连接器是否正在运行 复制链接链接已复制到粘贴板!
如果连接器正确启动且没有错误,它会为每个连接器配置为捕获的表创建一个主题。下游应用程序可以订阅这些主题,以检索源数据库中发生的信息事件。
要验证连接器是否正在运行,您可以从 OpenShift Container Platform Web 控制台或 OpenShift CLI 工具(oc)执行以下操作:
- 验证连接器状态。
- 验证连接器是否生成主题。
- 验证主题是否填充了用于读取操作的事件("op":"r")中连接器在各个表的初始快照期间生成的事件。
先决条件
- 在 OpenShift 中将一个 Debezium 连接器部署到 AMQ Streams 中。
-
已安装 OpenShift
ocCLI 客户端。 - 访问 OpenShift Container Platform web 控制台。
流程
使用以下方法之一检查
KafkaConnector资源的状态:通过 OpenShift Container Platform Web 控制台:
- 导航到 Home → Search。
-
在 Search 页面中,点 Resources 打开 Select Resource 框,然后键入
KafkaConnector。 - 在 KafkaConnectors 列表中,点您要检查的连接器名称,如 inventory-connector-mysql。
- 在 Conditions 部分,验证 Type 和 Status 列中的值是否已设置为 Ready 和 True。
在终端窗口中:
使用以下命令:
oc describe KafkaConnector <connector-name> -n <project>
oc describe KafkaConnector <connector-name> -n <project>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 例如,
oc describe KafkaConnector inventory-connector-mysql -n debezium
oc describe KafkaConnector inventory-connector-mysql -n debeziumCopy to Clipboard Copied! Toggle word wrap Toggle overflow 该命令返回类似以下示例的状态信息:
例 2.3.
KafkaConnector资源状态Copy to Clipboard Copied! Toggle word wrap Toggle overflow
验证连接器是否创建了 Kafka 主题:
从 OpenShift Container Platform Web 控制台。
- 导航到 Home → Search。
-
在 Search 页面中,点 Resources 打开 Select Resource 框,然后键入
KafkaTopic。 - 在 KafkaTopics 列表中,点您要检查的主题名称,例如 inventory-connector-mysql.inventory.orders--ac5e98ac6a5d91e91e04d8ec0dc78a1ece439081d。
- 在 Conditions 部分,验证 Type 和 Status 列中的值是否已设置为 Ready 和 True。
在终端窗口中:
使用以下命令:
oc get kafkatopics
oc get kafkatopicsCopy to Clipboard Copied! Toggle word wrap Toggle overflow 该命令返回类似以下示例的状态信息:
例 2.4.
KafkaTopic资源状态Copy to Clipboard Copied! Toggle word wrap Toggle overflow
检查主题内容。
- 在终端窗口中输入以下命令:
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 例如,
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory_connector_mysql.inventory.products_on_hand
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory_connector_mysql.inventory.products_on_handCopy to Clipboard Copied! Toggle word wrap Toggle overflow 指定主题名称的格式与步骤 1 中介绍的
oc describe命令相同,例如inventory_connector_mysql.inventory.addresses。对于主题中的每个事件,命令会返回类似如下的信息:
例 2.5. Debezium 更改事件的内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector_mysql.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_mysql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_mysql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector_mysql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.9.5.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector_mysql.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_mysql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_mysql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector_mysql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.9.5.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}Copy to Clipboard Copied! Toggle word wrap Toggle overflow 在前面的示例中,
有效负载值显示,连接器快照从表清单中生成读(."op" ="r")事件。product_on_handproduct_id记录的"前面"状态为null,表示记录不存在之前的值。"after"状态显示带有product_id101的项目的数量3。
您可以使用多个 Kafka Connect 服务集群和多个 Kafka 集群运行 Debezium。您可以部署到 Kafka Connect 集群的连接器数量取决于数据库事件的卷和速率。
后续步骤
有关部署特定连接器的更多信息,请参阅 Debezium 用户指南中的以下主题:
附录 A. 使用您的订阅 复制链接链接已复制到粘贴板!
Debezium 通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。
访问您的帐户
- 转至 access.redhat.com。
- 如果您还没有帐户,请创建一个帐户。
- 登录到您的帐户。
激活订阅
- 转至 access.redhat.com。
- 导航到 My Subscriptions。
- 导航到 激活订阅 并输入您的 16 位激活号。
下载 zip 和 tar 文件
要访问 zip 或 tar 文件,请使用客户门户网站查找要下载的相关文件。如果您使用 RPM 软件包,则不需要这一步。
- 打开浏览器并登录红帽客户门户网站 产品下载页面,网址为 access.redhat.com/downloads。
- 向下滚动到 INTEGRATION and AUTOMATION。
- 点 Red Hat Integration 显示 Red Hat Integration 下载页面。
- 单击组件的 Download 链接。
修订 2022-11-12 21:39:39 +1000