Debezium 入门
用于 Debezium 1.9.5
摘要
前言 复制链接链接已复制到粘贴板!
本教程演示如何使用 Debezium 在 MySQL 数据库中捕获更新。随着数据库中的数据发生变化,您可以看到生成的事件流。
教程包括以下步骤:
- 使用简单示例数据库将 MySQL 数据库服务器部署到 OpenShift。
- 在 AMQ Streams 中应用自定义资源,以自动构建包括 Debezium MySQL Connector 插件的 Kafka Connect 容器镜像。
- 创建 Debezium MySQL 连接器资源,以捕获数据库中的更改。
- 验证连接器部署。
- 查看连接器从数据库中发送到 Kafka 主题的更改事件。
先决条件
- 熟悉 OpenShift 和 AMQ Streams。
- 您可以访问安装集群 Operator 的 OpenShift 集群。
- AMQ Streams Operator 正在运行。
- 部署了 Apache Kafka 集群,如在 OpenShift 中部署和升级 AMQ Streams。
- 您有 Red Hat Integration 许可证。
-
您将了解如何使用 OpenShift 管理工具。已安装 OpenShift
ocCLI 客户端,或有权访问 OpenShift Container Platform Web 控制台。 根据您要存储 Kafka Connect 构建镜像的方式,您必须有权限来访问容器 registry,或者您必须在 OpenShift 中创建 ImageStream 资源:
- 将镜像存储在镜像 registry 中,如 Red Hat Quay.io 或 Docker Hub
- 在 registry 中创建和管理镜像的帐户和权限。
- 将构建镜像存储为原生 OpenShift ImageStream
- ImageStream 资源部署到集群中。您必须为集群明确创建 ImageStream。默认情况下,镜像流不可用。
使开源包含更多
红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。我们从这四个术语开始:master、slave、黑名单和白名单。由于此项工作十分艰巨,这些更改将在即将推出的几个发行版本中逐步实施。有关更多详情,请参阅我们的首席技术官 Chris Wright 提供的消息。
第 1 章 Debezium 简介 复制链接链接已复制到粘贴板!
Debezium 是一个分布式平台,将来自您现有数据库的信息转换为事件流,支持应用程序检测,并可立即响应数据库中的行级更改。
Debezium 基于 Apache Kafka 构建,提供一组与 Kafka 连接 兼容的连接器。每个连接器都用于特定的数据库管理系统(DBMS)。连接器记录 DBMS 中数据更改历史记录,在发生时检测更改,并将每个更改事件的记录流传输到 Kafka 主题。然后,消耗的应用程序可以从 Kafka 主题读取生成的事件记录。
通过利用 Kafka 的可靠流平台,Debezium 使应用程序能够正确且完全使用数据库中出现的更改。即使应用程序意外停止或者丢失了其连接,也不会错过中断期间发生的事件。应用程序重启后,它会恢复从其离开的地方的主题读取。
下面的教程显示了如何通过简单的配置部署和使用 Debezium MySQL 连接器。有关部署和使用 Debezium 连接器的详情,请查看连接器文档。
第 2 章 启动服务 复制链接链接已复制到粘贴板!
使用 Debezium 需要带有 Kafka 和 Kafka Connect、数据库和 Debezium 连接器服务的 AMQ Streams。要运行本教程的服务,您必须:
2.1. 部署 MySQL 数据库 复制链接链接已复制到粘贴板!
部署包含示例 inventory 数据库的 MySQL 数据库服务器,其中包含了数据预先填充的多个表。Debezium MySQL 连接器将捕获样本表中发生的更改,并将更改事件记录传输到 Apache Kafka 主题。
流程
运行以下命令启动 MySQL 数据库,该命令启动配置了
示例清单数据库的 MySQL 数据库服务器:oc new-app --name=mysql quay.io/debezium/example-mysql:latest
$ oc new-app --name=mysql quay.io/debezium/example-mysql:latestCopy to Clipboard Copied! Toggle word wrap Toggle overflow 运行以下命令,为 MySQL 数据库配置凭证,该命令更新 MySQL 数据库的部署配置以添加用户名和密码:
oc set env dc/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw
$ oc set env dc/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpwCopy to Clipboard Copied! Toggle word wrap Toggle overflow 通过调用以下命令来验证 MySQL 数据库是否正在运行,该命令随后显示 MySQL 数据库是否正在运行,并且 pod 已就绪:
oc get pods -l app=mysql NAME READY STATUS RESTARTS AGE mysql-1-2gzx5 1/1 Running 1 23s
$ oc get pods -l app=mysql NAME READY STATUS RESTARTS AGE mysql-1-2gzx5 1/1 Running 1 23sCopy to Clipboard Copied! Toggle word wrap Toggle overflow 打开一个新终端,再登录示例
inventory数据库。此命令在运行 MySQL 数据库的 pod 中打开 MySQL 命令行客户端。客户端使用您之前配置的用户名和密码:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 列出
inventory数据库中的表:Copy to Clipboard Copied! Toggle word wrap Toggle overflow 浏览数据库并查看它包含的数据,例如查看
客户表:Copy to Clipboard Copied! Toggle word wrap Toggle overflow
2.2. 部署 Kafka 连接 复制链接链接已复制到粘贴板!
部署 MySQL 数据库后,使用 AMQ Streams 构建包括 Debezium MySQL Connector 插件的 Kafka Connect 容器镜像。在部署过程中,您要创建并使用以下自定义资源(CR):
-
一个
KafkaConnectCR,用于定义 Kafka Connect 实例,并包含镜像中要包含的 MySQL 连接器工件的信息。 -
KafkaConnectorCR 提供了包括 MySQL 连接器用于访问源数据库的信息。在 AMQ Streams 启动 Kafka Connect pod 后,您可通过应用KafkaConnectorCR 来启动连接器。
在构建过程中,AMQ Streams Operator 会将 KafkaConnect 自定义资源(包括 Debezium connector 定义)中的输入参数转换为 Kafka Connect 容器镜像。构建会从红帽 Maven 存储库下载所需的工件,并将它们合并到镜像中。新创建的容器被推送到 .spec.build.output 中指定的容器 registry 中,用于部署 Kafka Connect pod。在 AMQ Streams 构建 Kafka Connect 镜像后,使用 KafkaConnector 自定义资源启动连接器。
流程
-
登录 OpenShift 集群并创建或创建项目,例如
debezium。 为连接器创建 Debezium
KafkaConnect自定义资源(CR),或修改现有资源。例如,创建一个KafkaConnectCR,用于指定metadata.annotations和spec.build属性,如下例所示。使用名称(如dbz-connect.yaml)保存文件。例 2.1. 一个
dbz-connect.yaml文件,它定义了一个KafkaConnect自定义资源,其中包含一个 Debezium 连接器Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表 2.1. Kafka Connect 配置设置的描述 项 描述 1
将
strimzi.io/use-connector-resources注解设置为"true",使 Cluster Operator 使用KafkaConnector资源在此 Kafka Connect 集群中配置连接器。2
spec.build配置指定存储构建镜像的位置,并列出镜像中包含的插件以及插件工件的位置。3
build.output指定在其中存储新构建的镜像的 registry。4
指定镜像输出的名称和镜像名称。
output.type的有效值为docker,可推送到容器 registry,如 Docker Hub 或 Quay,或将镜像推送到内部 OpenShift ImageStream 的镜像流。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定build.output的更多信息,请参阅 AMQ Streams Build schema 参考文档。5
插件配置列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,请指定插件名称,以及构建连接器所需的工件的信息。另外,对于每个连接器插件,您可以包括您要与连接器搭配使用的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。6
artifacts.type的值指定artifacts.url中指定的构件的文件名。有效类型为zip、tgz或jar。Debezium 连接器存档以.zip文件格式提供。JDBC 驱动程序文件采用.jar格式。type值必须与url字段中引用的文件类型匹配。7
artifacts.url的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。OpenShift 集群必须有权访问指定的服务器。输入以下命令将
KafkaConnect构建规格应用到 OpenShift 集群:oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 根据自定义资源中指定的配置,AMQ Streams Operator 会准备要部署的 Kafka Connect 镜像。
构建完成后,Operator 会将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。集群中提供了您在配置中列出的连接器工件。创建一个
KafkaConnector资源来定义 MySQL 连接器实例。
例如,创建以下KafkaConnectorCR,并将其保存为debezium-inventory-connector.yaml例 2.2.
mysql-inventory-connector.yaml文件,为 Debezium 连接器定义KafkaConnector自定义资源Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表 2.2. 连接器配置设置描述 项 描述 1
要注册到 Kafka Connect 集群的连接器名称。
2
连接器类的名称。
3
只有一个任务应该在任何一个时间运行。使用单一连接器任务来确保订单和事件处理,因为 MySQL 连接器可读取 MySQL 服务器的
binlog。Kafka Connect 服务使用连接器启动一个或多个任务来完成工作。它会在 Kafka Connect 服务的集群中自动分发正在运行的任务。如果服务停止或崩溃,则会将任务重新分发到运行的服务。4
连接器的配置。
5
数据库主机,这是运行 MySQL 服务器(mysql)的容器的名称。
6
数据库实例的端口号。
7
Debezium 连接到数据库的用户帐户的名称。
8
数据库用户帐户的密码。
9
捕获更改的数据库的名称。
10
数据库实例或集群的逻辑名称。服务器名称是 MySQL 服务器或服务器集群的逻辑标识符。此名称用作所有 Kafka 主题的前缀。
11
连接器捕获更改事件的表列表。连接器只会检测 inventory 数据库中的更改。
12
指定 Kafka 代理和主题,连接器用来存储数据库模式的历史记录(您要将事件发送到的同一代理)。重启后,当连接器恢复读取时,连接器会恢复在 binlog 存在的数据库模式。
运行以下命令来创建连接器资源:
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 例如,
oc create -n debezium -f mysql-inventory-connector.yaml
oc create -n debezium -f mysql-inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 连接器被注册到 Kafka Connect 集群,然后开始针对由
KafkaConnectorCR 中的spec.config.database.dbname指定的数据库运行。在连接器 Pod 就绪后,Dbezium 正在运行。
您现在已准备好 验证连接器是否已创建 并已开始捕获 inventory 数据库中的更改。
2.3. 验证连接器部署 复制链接链接已复制到粘贴板!
如果连接器正确启动且没有错误,它会为每个连接器配置为捕获的表创建一个主题。下游应用程序可以订阅这些主题,以检索源数据库中发生的信息事件。
要验证连接器是否正在运行,您可以从 OpenShift Container Platform Web 控制台或 OpenShift CLI 工具(oc)执行以下操作:
- 验证连接器状态。
- 验证连接器是否生成主题。
- 验证主题是否填充了用于读取操作的事件("op":"r")中连接器在各个表的初始快照期间生成的事件。
先决条件
- 在 OpenShift 中将一个 Debezium 连接器部署到 AMQ Streams 中。
-
已安装 OpenShift
ocCLI 客户端。 - 访问 OpenShift Container Platform web 控制台。
流程
使用以下方法之一检查
KafkaConnector资源的状态:通过 OpenShift Container Platform Web 控制台:
- 导航到 Home → Search。
-
在 Search 页面中,点 Resources 打开 Select Resource 框,然后键入
KafkaConnector。 - 在 KafkaConnectors 列表中,点您要检查的连接器名称,如 inventory-connector。
- 在 Conditions 部分,验证 Type 和 Status 列中的值是否已设置为 Ready 和 True。
在终端窗口中:
使用以下命令:
oc describe KafkaConnector <connector-name> -n <project>
oc describe KafkaConnector <connector-name> -n <project>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 例如,
oc describe KafkaConnector inventory-connector -n debezium
oc describe KafkaConnector inventory-connector -n debeziumCopy to Clipboard Copied! Toggle word wrap Toggle overflow 该命令返回类似以下示例的状态信息:
例 2.3.
KafkaConnector资源状态Copy to Clipboard Copied! Toggle word wrap Toggle overflow
验证连接器是否创建了 Kafka 主题:
从 OpenShift Container Platform Web 控制台。
- 导航到 Home → Search。
-
在 Search 页面中,点 Resources 打开 Select Resource 框,然后键入
KafkaTopic。 - 在 KafkaTopics 列表中,点您要检查的主题名称,例如 inventory-connector.inventory.orders--ac5e98ac6a5d91e04d8ec0dc78a1e439081d。
- 在 Conditions 部分,验证 Type 和 Status 列中的值是否已设置为 Ready 和 True。
在终端窗口中:
使用以下命令:
oc get kafkatopics
oc get kafkatopicsCopy to Clipboard Copied! Toggle word wrap Toggle overflow 该命令返回类似以下示例的状态信息:
例 2.4.
KafkaTopic资源状态Copy to Clipboard Copied! Toggle word wrap Toggle overflow
检查主题内容。
- 在终端窗口中输入以下命令:
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 例如,
oc exec -n debezium -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory_connector.inventory.products_on_hand
oc exec -n debezium -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory_connector.inventory.products_on_handCopy to Clipboard Copied! Toggle word wrap Toggle overflow 指定主题名称的格式与步骤 1 中介绍的
oc describe命令相同,如inventory_connector.inventory.addresses。对于主题中的每个事件,命令会返回类似如下的信息:
例 2.5. Debezium 更改事件的内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.9.5.Final-redhat-00001","connector":"mysql","name":"inventory_connector","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.9.5.Final-redhat-00001","connector":"mysql","name":"inventory_connector","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}Copy to Clipboard Copied! Toggle word wrap Toggle overflow 在前面的示例中,
有效负载值显示,连接器快照从表清单中生成读(."op" ="r")事件。product_on_handproduct_id记录的"前面"状态为null,表示记录不存在之前的值。"after"状态显示带有product_id101的项目的数量3。
第 3 章 查看更改事件 复制链接链接已复制到粘贴板!
部署 Debezium MySQL 连接器后,它会开始捕获对 inventory 数据库的更改。
当连接器启动时,它会将事件写入一组 Apache Kafka 主题,每个主题代表 MySQL 数据库中一个表。每个主题的名称以数据库服务器的名称开头,dbserver1。
连接器写入以下 Kafka 主题:
dbserver1- 架构更改主题,其中 DDL 语句适用于正在捕获更改的表。
dbserver1.inventory.products-
接收
inventory数据库中 product表的更改事件记录。 dbserver1.inventory.products_on_hand-
接收
inventory数据库中 product_on_hand表的更改事件记录。 dbserver1.inventory.customers-
接收
inventory数据库中客户表的更改事件记录。 dbserver1.inventory.orders-
接收
清单数据库中订单表的变更事件记录。
本教程的其余部分检查 dbserver1.inventory.customers Kafka 主题。当您仔细查看主题时,您会看到它如何代表不同类型的更改事件,并查找有关每个事件捕获的连接器的信息。
教程包含以下部分:
3.1. 查看 创建事件 复制链接链接已复制到粘贴板!
查看 dbserver1.inventory.customers 主题,您可以看到 MySQL 连接器如何在 inventory 数据库中捕获创建事件。在这种情况下,创建 的事件捕获正添加到数据库中的新客户。
流程
打开一个新终端,再使用
kafka-console-consumer从主题开始使用 dbserver1.inventory.customers 主题。此命令在运行 Kafka(
my-cluster-kafka-0)的 Pod 中运行一个简单的使用者(kafka-console-consumer.sh):oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customers
$ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customersCopy to Clipboard Copied! Toggle word wrap Toggle overflow 使用者返回四个消息(采用 JSON 格式),每个消息对应
客户表中的每一行。每个消息都包含相应表行的事件记录。每个事件都有两个 JSON 文档: 一个键 和 一个值。键与行的主键对应,值显示行的详细信息(行包含的字段、每个字段的值以及行上执行的操作类型)。
对于最后一个事件,请查看 键 的详细信息。
以下是最后一次事件 的密钥 的详细信息(便于阅读):
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 该事件有两个部分:
schema和payload。该模式包含一个 Kafka 连接模式,描述有效负载中的内容。在本例中,有效负载是名为dbserver1.inventory.customers.Key的构造,它是可选的,并且有一个必填字段(类型为int32)。有效负载具有单个id字段,值设为1004。通过查看事件 的密钥,您可以看到此事件适用于
inventory.customers表(其id主键列的值为1004的行)。检查同一事件 的值 的详细信息。
事件 的值 显示创建了行,并描述它包含的内容(在本例中,
id、first_name、last_name以及插入行的电子邮件)。以下是最后一次事件 的值 的详细信息(便于阅读):
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 这个部分事件比,但与事件 的密钥 一样,它也具有
schema和有效负载。该架构包含名为dbserver1.inventory.customers.Envelope(版本 1)的 Kafka Connect 模式,它包含五个字段:op-
包含描述操作类型的字符串值的必需字段。MySQL 连接器的值为
c,用于创建(或插入)、u表示 update、d表示删除,以及r以便读取(在快照中)。 before-
可选字段(如果存在)包含事件 发生前 行的状态。其结构将由
dbserver1.Kafka Connect 模式进行描述,该模式在 inventory.customers 表中使用inventory.customers.Valuedbserver1连接器。 之后-
可选字段(如果存在)包含事件发生 后 行的状态。结构由
前面使用的相同dbserver1.inventory.customers.ValueKafka Connect 模式描述。 source-
一个必需字段,其中包含描述事件的源元数据的结构,该元素是 MySQL 的源元数据,包含多个字段:连接器名称、记录事件的
binlog文件的名称、该binlog文件中的位置,即事件内的行(如果有多个字段),受影响的数据库和表的名称, 进行更改的 MySQL 线程 ID,无论此事件是快照的一部分,如果可用,则 MySQL 服务器 ID 以及以秒为单位的时间戳。 ts_ms- 一个可选字段(如果存在)包含时间(在运行 Kafka Connect 任务的 JVM 中)以及连接器处理事件的 JVM 中。
注意事件的 JSON 表示要比它们描述的行要长。这是因为,对于每个事件键和值,Kafka Connect 提供了描述 有效负载的 schema。随着时间的推移,这个结构可能会改变。但是,在事件本身中使用键和值的 schema 使得使用应用程序更容易理解消息,特别是随着时间推移而变化。
Debezium MySQL 连接器根据数据库表的结构构建这些模式。如果您使用 DDL 语句更改 MySQL 数据库中的表定义,连接器会读取这些 DDL 声明并更新其 Kafka Connect 模式。这是构建每个事件的唯一方法,与在事件发生时所源自的表完全相同。但是,包含单一表的所有事件的 Kafka 主题可能会具有与表定义的每个状态对应的事件。
JSON converter 在每个消息中包含键和值模式,因此它会生成非常详细的事件。
将事件 的键和值 模式与
inventory数据库的状态进行比较。在运行 MySQL 命令行客户端的终端中,运行以下语句:Copy to Clipboard Copied! Toggle word wrap Toggle overflow 这表明您检查的事件记录与数据库中的记录相匹配。
3.2. 更新数据库并查看 更新 事件 复制链接链接已复制到粘贴板!
现在您已经了解了 Debezium MySQL 连接器如何在 inventory 数据库中捕获创建事件,现在您将更改其中一个记录,并查看连接器如何捕获它。
通过完成此步骤,您将了解如何查找数据库提交中更改的详细信息,以及如何比较更改事件,以确定与其他更改相关的更改。
流程
在运行 MySQL 命令行客户端的终端中,运行以下语句:
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004; Query OK, 1 row affected (0.05 sec) Rows matched: 1 Changed: 1 Warnings: 0
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004; Query OK, 1 row affected (0.05 sec) Rows matched: 1 Changed: 1 Warnings: 0Copy to Clipboard Copied! Toggle word wrap Toggle overflow 查看更新的
客户表:Copy to Clipboard Copied! Toggle word wrap Toggle overflow 切换到运行
kafka-console-consumer的终端,以查看第五个事件。通过在
客户表中更改记录,Debezium MySQL 连接器会生成新的事件。您应该会看到两个新的 JSON 文档:一个用于事件 的密钥,另一个用于新事件 的值。以下是 更新 事件 的密钥 详情(便于阅读):
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 这个密钥 与上一个事件 的密钥 相同。
这里是新事件 的值。
架构部分没有更改,因此仅显示有效负载部分(为可读性格式化):Copy to Clipboard Copied! Toggle word wrap Toggle overflow 通过查看
有效负载部分,您可以了解与 更新 事件相关的几个重要内容:-
通过比较
before和after结构,您可以确定因为提交而实际在受影响的行中更改的内容。 -
通过检查
源结构,您可以查找有关更改的 MySQL 记录的信息(提供可追溯性)。 -
通过将事件的
payload部分与同一主题(或不同的主题)中的其他事件进行比较,您可以确定事件在之前、之后还是作为与另一个事件相同的 MySQL 提交的一部分。
-
通过比较
3.3. 删除数据库中的记录并查看 删除 事件 复制链接链接已复制到粘贴板!
现在您已经了解了 Debezium MySQL 连接器如何在 inventory 数据库中捕获 创建或更新 事件,现在您将删除其中一个记录并查看连接器如何捕获它。
通过完成这个步骤,您将了解如何查找有关 删除 事件的详情,以及 Kafka 如何使用 日志压缩 来减少 删除事件的数量,同时仍然启用使用者来获取所有事件。
流程
在运行 MySQL 命令行客户端的终端中,运行以下语句:
mysql> DELETE FROM customers WHERE id=1004; Query OK, 1 row affected (0.00 sec)
mysql> DELETE FROM customers WHERE id=1004; Query OK, 1 row affected (0.00 sec)Copy to Clipboard Copied! Toggle word wrap Toggle overflow 注意如果上述命令失败并显示外键约束,则必须使用以下声明从地址表中删除客户 地址 的引用:
mysql> DELETE FROM addresses WHERE customer_id=1004;
mysql> DELETE FROM addresses WHERE customer_id=1004;Copy to Clipboard Copied! Toggle word wrap Toggle overflow 切换到运行
kafka-console-consumer的终端,以查看 两个新 事件。通过删除
客户表中的一行,Debezium MySQL 连接器会生成两个新事件。检查第一个新事件的键和值。
以下是第一个新事件 的密钥 详情(便于阅读):
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 此密钥 与您在前两个事件中的键相同。
以下是第一个新事件 的值 (格式化以便便于阅读):
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 因此,此事件提供了一个消费者,其中包含需要处理行删除所需的信息。另外还会提供旧的值,因为有些用户可能需要它们来正确处理删除。
检查第二个新事件的键和值。
以下是第二个新事件 的密钥 (格式用于可读性):
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 再次说明,这个密钥 与您在前面的三个事件中完全相同。
以下是同一事件 的值 (格式化以便便于阅读):
{ "schema": null, "payload": null }{ "schema": null, "payload": null }Copy to Clipboard Copied! Toggle word wrap Toggle overflow 如果将 Kafka 设置为 日志压缩,则如果以后在同一键中至少有一个信息,它将从主题中删除旧的信息。这个最后一个事件称为 tombstone 事件,因为它有一个键和一个空值。这意味着 Kafka 将删除所有具有相同键的之前的信息。尽管删除了之前的消息,但 tombstone 事件表示消费者仍然可以从开始读取该主题,而不会丢失任何事件。
3.4. 重启 Kafka Connect 服务 复制链接链接已复制到粘贴板!
您已看到 Debezium MySQL 连接器如何捕获创建、更新和删除事件,现在它也可以在没有运行时捕获更改事件。
Kafka Connect 服务自动管理其已注册连接器的任务。因此,如果它重启时,它将启动任何非运行的任务。这意味着,即使 Debezium 没有运行,它仍然可以报告数据库中的更改。
在此过程中,您将停止 Kafka Connect,更改数据库中的一些数据,然后重新启动 Kafka Connect 以查看更改事件。
流程
停止 Kafka 连接服务。
打开 Kafka Connect 服务的部署配置:
oc edit dc/my-connect-cluster-connect
$ oc edit dc/my-connect-cluster-connectCopy to Clipboard Copied! Toggle word wrap Toggle overflow 部署配置会打开:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
将
spec.replicas值更改为 0。 - 保存部署配置。
验证 Kafka Connect 服务是否已停止。
这个命令显示 Kafka Connect 服务已完成,且没有 pod 正在运行:
oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-1-dxcs9 0/1 Completed 0 7h
$ oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-1-dxcs9 0/1 Completed 0 7hCopy to Clipboard Copied! Toggle word wrap Toggle overflow
当 Kafka Connect 服务停机时,切换到运行 MySQL 客户端的终端,并在数据库中添加新记录。
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");Copy to Clipboard Copied! Toggle word wrap Toggle overflow 重启 Kafka 连接服务。
打开 Kafka Connect 服务的部署配置。
oc edit dc/my-connect-cluster-connect
$ oc edit dc/my-connect-cluster-connectCopy to Clipboard Copied! Toggle word wrap Toggle overflow 部署配置会打开:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
将
spec.replicas值更改为1。 - 保存部署配置。
验证 Kafka Connect 服务是否已重启。
这个命令显示 Kafka Connect 服务正在运行,且 pod 已就绪:
oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-2-q9kkl 1/1 Running 0 74s
$ oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-2-q9kkl 1/1 Running 0 74sCopy to Clipboard Copied! Toggle word wrap Toggle overflow
-
切换到正在运行
kafka-console-consumer.sh的终端。新事件在到达时弹出。 检查您在 Kafka 连接离线时所创建的记录(便于阅读):
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
第 4 章 后续步骤 复制链接链接已复制到粘贴板!
完成教程后,请考虑以下步骤:
进一步浏览教程。
使用 MySQL 命令行客户端添加、修改和删除数据库表中的行,并查看对主题的影响。请记住,您无法删除外部密钥引用的一行。
规划 Debezium 部署。
您可以在 OpenShift 或 Red Hat Enterprise Linux 中安装 Debezium。如需更多信息,请参阅以下:
修订 2022-11-12 21:38:54 +1000