在 RHEL 上安装 Debezium
用于 Red Hat Enterprise Linux (RHEL)上的 Red Hat Integration 2.1.4
摘要
前言 复制链接链接已复制到粘贴板!
使开源包含更多
红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。我们从这四个术语开始:master、slave、黑名单和白名单。由于此项工作十分艰巨,这些更改将在即将推出的几个发行版本中逐步实施。有关更多详情,请参阅我们的首席技术官 Chris Wright 提供的消息。
第 1 章 Debezium 概述 复制链接链接已复制到粘贴板!
Red Hat Integration 的 Debezium 是一个分布式平台,它捕获数据库操作,为行级操作创建数据更改事件记录,并将事件记录更改为 Apache Kafka 主题。Debezium 基于 Apache Kafka 构建,并部署并与 AMQ Streams 集成。
Debezium 捕获对数据库表的行级更改,并将对应的更改事件传递给 AMQ Streams。应用程序可以读取 这些更改事件流,并按发生更改事件的顺序访问更改事件。
Debezium 是 Debezium for Red Hat Integration 的上游社区项目。
Debezium 具有多个用途,包括:
- 数据复制
- 更新缓存和搜索索引
- 简化单体式应用程序
- 数据集成
- 启用流查询
Debezium 为以下通用数据库提供 Apache Kafka Connect 连接器:
第 2 章 在 RHEL 上安装 Debezium 连接器 复制链接链接已复制到粘贴板!
通过 AMQ Streams 安装 Debezium 连接器,方法是使用连接器插件扩展 Kafka 连接。部署 AMQ Streams 后,您可以通过 Kafka Connect 将 Debezium 部署为连接器配置。
2.1. Kafka 主题创建建议 复制链接链接已复制到粘贴板!
Debezium 将数据存储到多个 Apache Kafka 主题中。主题必须提前由管理员创建,或者您可以配置 Kafka Connect 以 自动配置主题。
以下列表描述了创建主题时要考虑的限制和建议:
- Debezium Db2、MySQL、Oracle 和 SQL Server 连接器的数据库架构历史记录主题
对于前面的每个连接器,都需要一个数据库架构历史记录主题。无论您手动创建数据库架构历史记录主题,请使用 Kafka 代理自动创建主题,或使用 Kafka Connect 创建主题,请确保使用以下设置配置该主题:
- 无限或非常长的保留。
- 在生产环境中至少有三个复制因素。
- 单个分区。
- 其他主题
当您启用 Kafka 日志压缩时,以便只保存给定记录 的最后 更改事件,请在 Apache Kafka 中设置以下主题属性:
-
min.compaction.lag.ms
为确保主题使用者有足够的时间接收所有事件并删除标记,请指定之前属性的值大于您预期的接收器连接器的最大停机时间。例如,考虑将更新应用到接收器连接器时可能会出现的停机时间。
-
- 在生产环境中复制。
单个分区。
您可以放松单个分区规则,但您的应用程序必须为数据库中的不同行处理超出顺序的事件。一行的事件仍然被完全排序。如果您使用多个分区,则默认行为是 Kafka 通过哈希密钥来确定分区。其他分区策略需要使用单个消息转换(SMT)来为每个记录设置分区号。
2.2. 规划 Debezium 连接器配置 复制链接链接已复制到粘贴板!
在部署 Debezium 连接器前,请确定如何配置连接器。配置提供指定连接器行为的信息,并允许 Debezium 连接到源数据库。
您可以将连接器配置指定为 JSON,并准备好注册连接器时,您可以使用 curl
将配置提交到 Kafka Connect API 端点。
先决条件
- 部署了源数据库,Debezium 连接器可以访问数据库。
您知道以下信息,连接器需要访问源数据库:
- 数据库主机的名称或 IP 地址。
- 用于连接到数据库的端口号。
- 连接器可用于登录到数据库的帐户名称。
- 数据库用户帐户的密码。
- 数据库的名称。
- 您希望连接器从中捕获信息的表名称。
- 希望连接器向其发送更改事件的 Kafka 代理名称。
- 您希望连接器向发送数据库历史记录信息的 Kafka 主题名称。
流程
指定您要以 JSON 格式应用到 Debezium 连接器的配置。
以下示例显示了 Debezium MySQL 连接器的简单配置:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- 使用 Kafka Connect 集群注册的连接器名称。
- 2
- 连接器类的名称。
- 3
- 可同时运行的任务数量。一次只能运行一个任务。
- 4
- 主机数据库实例的主机名或 IP 地址。
- 5
- 数据库实例的端口号。
- 6
- Debezium 连接到数据库的用户帐户名称。
- 7
- 数据库用户帐户的密码。
- 8
- 连接器的唯一数字 ID。
此属性仅用于 MySQL 连接器。 - 9
- 用作数据库服务器的逻辑标识符的字符串,或连接器从中捕获更改的服务器集群。指定命名空间的字符串。Debezium 将此名称添加到连接器写入的每个 Kafka 主题,以及 Kafka Connect 模式的名称,以及 Avro 模式的命名空间(在使用 Avro converter 时)。
- 10
- 连接器从中捕获更改事件的表列表。
- 11
- 连接器发送数据库架构历史记录的 Kafka 代理名称。指定代理还会接收连接器发出的更改事件。
- 12
- 存储 schema 历史记录的 Kafka 主题的名称。
连接器重启后,连接器会从停止的时间恢复读取数据库日志,并为它离线时发生的任何事务发出事件。在连接器为 Kafka 写入未读取事务的更改事件前,它会检查架构历史记录,然后应用在原始事务发生时生效的模式。
附加信息
- 有关您可以为每种连接器设置的配置属性的详情,请参考 Debezium 用户指南中的 连接器的部署文档。
这个步骤描述了如何在 Red Hat Enterprise Linux 中为 Debezium 设置连接器。连接器使用 Apache Kafka Connect 部署到 AMQ Streams 集群,这是在 Apache Kafka 和外部系统间流传输数据的框架。Kafka Connect 必须以分布式模式运行,而不是独立模式。
先决条件
用于您需要部署 Debezium 的主机环境运行 Red Hat Enterprise Linux、AMQ Streams 和 Java(以一个支持的配置)。
- 有关如何安装 AMQ Streams 的详情,请参阅安装 AMQ Streams。
- 有关如何安装包含单个 ZooKeeper 节点和单个 Kafka 节点的基本非生产环境 AMQ Streams 集群的详情,请参考 运行单一节点 AMQ Streams 集群。
如果您正在运行 AMQ Streams 的早期版本,您必须首先升级到 AMQ Streams 2.3。有关升级过程的详情,请参考 AMQ Streams 和 Kafka 升级。
流程
- 从 Red Hat Integration 下载要使用的 Debezium 连接器或连接器。例如,要将 Debezium 与 MySQL 数据库搭配使用,请下载 Debezium 2.1.4 MySQL Connector。
在您部署 AMQ Streams 的 Red Hat Enterprise Linux 主机上,打开终端窗口,并在
/opt/kafka
中创建一个connector-plugins
目录(如果尚不存在):sudo mkdir /opt/kafka/connector-plugins
$ sudo mkdir /opt/kafka/connector-plugins
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 输入以下命令提取您下载到
/opt/kafka/connector-plugins
目录中的 Debezium 连接器存档的内容。sudo unzip debezium-connector-mysql-2.1.4.Final.zip -d /opt/kafka/connector-plugins
$ sudo unzip debezium-connector-mysql-2.1.4.Final.zip -d /opt/kafka/connector-plugins
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 对要安装的每个连接器重复步骤 1 -3。
在一个终端窗口中,以
kafka
用户身份进行登录:su - kafka Password:
$ su - kafka $ Password:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 如果 Kafka Connect 进程正在运行,请停止它。
输入以下命令检查 Kafka Connect 是否在分布式模式下运行:
jcmd | grep ConnectDistributed
$ jcmd | grep ConnectDistributed
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 如果进程正在运行,命令会返回进程 ID,例如:
18514 org.apache.kafka.connect.cli.ConnectDistributed /opt/kafka/config/connect-distributed.properties
18514 org.apache.kafka.connect.cli.ConnectDistributed /opt/kafka/config/connect-distributed.properties
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 输入带有进程 ID 的
kill
命令来停止进程,例如:kill 18514
$ kill 18514
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
编辑
/opt/kafka/config/
中的connect-distributed.properties
文件,并将plugin.path
的值设置为 Debezium 连接器插件的父目录的位置:plugin.path=/opt/kafka/connector-plugins
plugin.path=/opt/kafka/connector-plugins
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以分布式模式启动 Kafka 连接。
/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
$ /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka Connect 运行后,使用 Kafka Connect API 注册连接器。
输入curl
命令提交POST
请求,该请求将您在 第 2.2 节 “规划 Debezium 连接器配置” 中指定的连接器配置 JSON 发送到localhost:8083/connectors
的 Kafka Connect REST API 端点。
例如:Copy to Clipboard Copied! Toggle word wrap Toggle overflow 要注册多个连接器,请为每个连接器提交单独的请求。
重启 Kafka Connect 来实现您的更改。
当 Kafka Connect 启动时,它会从
connector-plugins
目录中加载配置的 Debezium 连接器。完成配置后,部署的连接器连接到源数据库,并为每个插入、更新或删除的行或文档生成事件。
- 为每个 Kafka Connect worker 节点重复步骤 5-10。
后续步骤
验证部署。
其他资源
2.4. 验证部署 复制链接链接已复制到粘贴板!
连接器启动后,它会对配置的数据库执行快照,并为您指定的每个表创建主题。
先决条件
您已根据 第 2.3 节 “在 Red Hat Enterprise Linux 上使用 AMQ Streams 部署 Debezium” 中的说明在 Red Hat Enterprise Linux 上部署了一个连接器。
在主机上的终端窗口中输入以下命令从 Kafka Connect API 请求连接器列表:
curl -H "Accept:application/json" localhost:8083/connectors/
$ curl -H "Accept:application/json" localhost:8083/connectors/
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 查询返回部署的连接器的名称,例如:
["inventory-connector"]
["inventory-connector"]
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 在主机的终端窗口中,输入以下命令查看连接器运行的任务:
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 该命令返回类似以下示例的输出:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 显示 Kafka 集群中的主题列表。
在终端窗口中进入/opt/kafka/bin/
并运行以下 shell 脚本:./kafka-topics.sh --bootstrap-server=localhost:9092 --list
./kafka-topics.sh --bootstrap-server=localhost:9092 --list
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka 代理返回连接器创建的主题列表。可用的主题取决于连接器的
snapshot.mode
、snapshot.include.collection.list
和table.include.list
配置属性的设置。默认情况下,连接器会为数据库中的每个非系统表创建一个主题。查看主题的内容。
在终端窗口中进入/opt/kafka/bin/
,并运行kafka-console-consumer.sh
shell 脚本来显示上一命令返回的其中一个主题的内容:
例如:
./kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=dbserver1.inventory.products_on_hand
./kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=dbserver1.inventory.products_on_hand
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 对于主题中的每个事件,命令会返回类似以下输出的信息:
例 2.1. 集成更改事件的内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.1.4.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.1.4.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 在上例中,
有效负载
值显示连接器快照从表inventory.products_on_hand
中生成一个读取("op" ="r"
)事件。product_id
记录的"before"
状态为null
,这表示记录没有之前的值。"after"
状态对于product_id
为101
的项目的quantity
显示为3
。
后续步骤
有关每个连接器可用的配置设置的信息,以及如何配置源数据库以启用更改数据捕获,请参阅 Debezium 用户指南。
2.5. 在 Kafka Connect 集群中更新 Debezium 连接器插件 复制链接链接已复制到粘贴板!
要替换在 Red Hat Enterprise Linux 上部署的 Debezium 连接器版本,您需要更新连接器插件。
流程
- 从 Red Hat Integration 下载站点下载 您要替换的 Debezium 连接器插件的副本。
将 Debezium 连接器存档的内容提取到
/opt/kafka/connector-plugins
目录中。sudo unzip debezium-connector-mysql-2.1.4.Final.zip -d /opt/kafka/connector-plugins
$ sudo unzip debezium-connector-mysql-2.1.4.Final.zip -d /opt/kafka/connector-plugins
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 重启 Kafka Connect。
附录 A. 使用您的订阅 复制链接链接已复制到粘贴板!
集成通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。
访问您的帐户
- 转至 access.redhat.com。
- 如果您还没有帐户,请创建一个帐户。
- 登录到您的帐户。
激活订阅
- 转至 access.redhat.com。
- 导航到 My Subscriptions。
- 导航到 激活订阅 并输入您的 16 位激活号。
下载 zip 和 tar 文件
要访问 zip 或 tar 文件,请使用客户门户网站查找下载的相关文件。如果您使用 RPM 软件包,则不需要这一步。
- 打开浏览器并登录红帽客户门户网站 产品下载页面,网址为 access.redhat.com/downloads。
- 向下滚动到 INTEGRATION AND AUTOMATION。
- 点 Red Hat Integration 以显示 Red Hat Integration 下载页面。
- 单击组件的 Download 链接。
更新于 2023-04-21