在 RHEL 上安装 Debezium


Red Hat build of Debezium 3.0.8

在 Red Hat Enterprise Linux (RHEL)上与红帽构建的 Debezium 3.0.8 一起使用

Red Hat build of Debezium Documentation Team

摘要

本指南论述了如何在使用 AMQ Streams 的 RHEL 上安装红帽构建的 Debezium。

前言

对红帽文档提供反馈

我们感谢您对我们文档的反馈。

要提供反馈,请选择文档中的相关文本并添加注释。

先决条件

  • 已登陆到红帽客户门户网站。
  • 在红帽客户门户网站中,文档采用 多页 HTML 查看格式。

流程

要提供反馈,请执行以下步骤:

  1. 点文档右上角的反馈按钮查看现有的反馈。

    注意

    反馈功能仅在多页 HTML 格式中启用。

  2. 高亮标记您要提供反馈的文档中的部分。
  3. 点在高亮文本旁弹出的 Add Feedback

    文本框将在页面右侧的"反馈"部分中打开。

  4. 在文本框中输入您的反馈,然后点 Submit

    创建了一个与文档相关的问题。

  5. 要查看问题,请点反馈视图中的问题跟踪器链接。

第 1 章 Debezium 概述

红帽构建的 Debezium 是一个捕获数据库操作的分布式平台,为行级操作创建数据更改事件记录,并将事件记录流传输到 Apache Kafka 主题。Debezium 基于 Apache Kafka 构建,并集成了 Apache Kafka 的 Streams。

Debezium 捕获到数据库表的行级更改,并将对应的更改事件传递给 Apache Kafka 的 Streams。应用程序可以读取 这些更改事件流,并按发生更改事件的顺序访问更改事件。

Debezium 是红帽构建的 Debezium 的上游社区项目。

Debezium 具有多个用途,包括:

  • 数据复制
  • 更新缓存和搜索索引
  • 简化单体式应用程序
  • 数据集成
  • 启用流查询

Debezium 为以下通用数据库提供 Apache Kafka 连接连接器:

第 2 章 在 RHEL 上安装 Debezium 连接器

通过扩展使用连接器插件的 Kafka 连接,通过 Streams for Apache Kafka 安装 Debezium 连接器。部署 Apache Kafka 后,您可以通过 Kafka Connect 将 Debezium 部署为连接器配置。

2.1. Kafka 主题创建建议

Debezium 将数据存储在多个 Apache Kafka 主题中。主题必须由管理员提前创建,或者您可以将 Kafka Connect 配置为自动配置主题

以下列表描述了创建主题时需要考虑的限制和建议:

Debezium Db2、MySQL、Oracle 和 SQL Server 连接器的数据库架构历史记录主题

对于前面的每个连接器,都需要一个数据库 schema 历史记录主题。无论您手动创建数据库模式历史记录主题,使用 Kafka 代理自动创建主题,或使用 Kafka Connect 创建主题,请确保主题配置了以下设置:

  • 无限或非常长的保留。
  • 在生产环境中至少有三个复制因子。
  • 单个分区.
其他主题
  • 当您启用 Kafka 日志压缩,以便只保存给定记录 的最后 更改事件,在 Apache Kafka 中设置以下主题属性:

    • min.compaction.lag.ms
    • delete.retention.ms

      为确保主题用户有足够的时间接收所有事件和删除标记,请为之前的属性指定值,超过您预期用于接收器连接器的最大停机时间。例如,请考虑将更新应用到接收器连接器时可能会出现的停机时间。

  • 在生产环境中复制。
  • 单个分区.

    您可以放松单个分区规则,但您的应用程序必须处理数据库中不同行的边界事件。一行的事件仍然完全排序。如果您使用多个分区,则默认行为是 Kafka 通过对密钥进行哈希来确定分区。其他分区策略需要使用单一消息转换(SMT)来为每个记录设置分区号。

2.2. 规划 Debezium 连接器配置

在部署 Debezium 连接器前,请确定如何配置连接器。该配置提供了指定连接器行为的信息,并允许 Debezium 连接到源数据库。

您可以将连接器配置指定为 JSON,当准备好注册连接器时,您可以使用 curl 将配置提交到 Kafka Connect API 端点。

先决条件

  • 部署源数据库,Debezium 连接器可以访问数据库。
  • 您知道以下信息,连接器需要访问源数据库:

    • 数据库主机的名称或 IP 地址。
    • 用于连接到数据库的端口号。
    • 连接器可用于登录到数据库的帐户名称。
    • 数据库用户帐户的密码。
    • 数据库的名称。
    • 您希望连接器捕获信息的表名称。
    • 您希望连接器发出更改事件的 Kafka 代理的名称。
    • 您希望连接器将数据库历史记录信息发送到的 Kafka 主题的名称。

流程

  • 以 JSON 格式指定要应用到 Debezium 连接器的配置。

    以下示例显示了 Debezium MySQL 连接器的简单配置:

    {
      "name": "inventory-connector",  1
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 2
        "tasks.max": "1",  3
        "database.hostname": "mysql",  4
        "database.port": "3306", 5
        "database.user": "debezium", 6
        "database.password": "dbz", 7
        "database.server.id": "184054",  8
        "topic.prefix": "dbserver1",   9
        "table.include.list": "public.inventory",  10
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",  11
        "schema.history.internal.kafka.topic": "dbhistory.inventory"  12
      }
    }
    1
    要注册到 Kafka Connect 集群的连接器名称。
    2
    连接器类的名称。
    3
    可同时操作的任务数量。一次只能运行一个任务。
    4
    主机数据库实例的主机名或 IP 地址。
    5
    数据库实例的端口号。
    6
    Debezium 连接到数据库的用户帐户的名称。
    7
    数据库用户帐户的密码。
    8
    连接器的唯一数字 ID。
    此属性仅用于 MySQL 连接器。
    9
    用作数据库服务器的逻辑标识符或连接器捕获更改的服务器集群的逻辑标识符的字符串。指定的字符串指定一个命名空间。当使用 Avro converter 时,Debezium 会将这个名称添加到连接器写入的每个 Kafka 主题,以及 Kafka Connect 模式的名称,以及对应的 Avro 模式的命名空间。
    10
    连接器捕获更改事件的表列表。
    11
    连接器发送数据库 schema 历史记录的 Kafka 代理的名称。指定代理还会接收连接器发出的更改事件。
    12
    存储 schema 历史记录的 Kafka 主题的名称。
    连接器重启后,连接器会从它停止的点恢复数据库日志,为它离线时发生的任何事务发出事件。在连接器将未读取事务的更改事件写入 Kafka 之前,它会检查 schema 历史记录,然后应用在原始事务发生时生效的 schema。

附加信息

2.3. 在 Red Hat Enterprise Linux 中使用 Apache Kafka 部署 Debezium

此流程描述了如何在 Red Hat Enterprise Linux 中为 Debezium 设置连接器。连接器使用 Apache Kafka Connect 部署到 Apache Kafka 集群的流,它是一个在 Apache Kafka 和外部系统间流传输数据的框架。Kafka Connect 必须以分布式模式运行,而不是以独立模式运行。

先决条件

注意

如果您正在运行早期版本的 AMQ Streams,您必须首先升级到 Apache Kafka 2.9 的 Streams。有关升级过程的详情,请参考 Apache Kafka 和 Kafka 升级流

  • 主机上具有管理特权(sudo 访问权限)。
  • Apache ZooKeeper 和 Apache Kafka 代理正在运行。
  • Kafka Connect 以分布式模式运行,而不是以独立模式运行。
  • 您知道在安装 Streams for Apache Kafka 时创建的 kafka 用户的凭证。
  • 部署源数据库以及部署 Debezium 的主机可以访问数据库。
  • 您知道如何配置连接器

流程

  1. 软件下载 站点下载您要使用的 Debezium 连接器或连接器。例如,要将 Debezium 与 MySQL 数据库搭配使用,请下载 Debezium 3.0.8 MySQL Connector
  2. 在您为 Apache Kafka 部署 Streams 的 Red Hat Enterprise Linux 主机上,打开终端窗口并在 /opt/kafka 中创建 connector-plugins 目录(如果它尚不存在):

    $ sudo mkdir /opt/kafka/connector-plugins
  3. 输入以下命令提取您下载到 /opt/kafka/connector-plugins 目录中的 Debezium 连接器存档的内容。

    $ sudo unzip debezium-connector-mysql-3.0.8.Final.zip -d /opt/kafka/connector-plugins
  4. 对您要安装的每个连接器重复步骤 1 -3。
  5. 在一个终端窗口中,以 kafka 用户身份进行登录:

    $ su - kafka
    $ Password:
  6. 如果 Kafka Connect 进程正在运行,请停止。

    1. 输入以下命令检查 Kafka Connect 是否在分布式模式下运行:

      $ jcmd | grep ConnectDistributed

      如果进程正在运行,命令会返回进程 ID,例如:

      18514 org.apache.kafka.connect.cli.ConnectDistributed /opt/kafka/config/connect-distributed.properties
    2. 使用进程 ID 输入 kill 命令来停止进程,例如:

      $ kill 18514
  7. 编辑 /opt/kafka/config/ 中的 connect-distributed.properties 文件,并将 plugin.path 的值设置为 Debezium 连接器插件的父目录的位置:

    plugin.path=/opt/kafka/connector-plugins
  8. 在分布式模式下启动 Kafka 连接。

    $ /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
  9. 在 Kafka Connect 运行后,使用 Kafka Connect API 注册连接器。
    输入 curl 命令提交 POST 请求,将您在 第 2.2 节 “规划 Debezium 连接器配置” 中指定的连接器配置 JSON 发送到 localhost:8083/connectors 的 Kafka Connect REST API 端点。
    例如:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name": "inventory-connector", "config":
    {"connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "table.include.list": "public.inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "dbhistory.inventory" }
    }'

    要注册多个连接器,请为每个连接器提交单独的请求。

  10. 重启 Kafka Connect 来实现您的更改。

    当 Kafka Connect 启动时,它会从 connector-plugins 目录中加载配置的 Debezium 连接器。

    完成配置后,部署的连接器连接到源数据库,并为每个插入、更新或删除行或文档生成事件。

  11. 为每个 Kafka Connect worker 节点重复第 5-10 步。

后续步骤

验证部署

其他资源

2.4. 验证部署

连接器启动后,它会执行配置的数据库的快照,并为您指定的每个表创建主题。

先决条件

流程

  1. 在主机上的终端窗口中,输入以下命令从 Kafka Connect API 请求连接器列表:

    $ curl -H "Accept:application/json" localhost:8083/connectors/

    查询返回部署的连接器的名称,例如:

    ["inventory-connector"]
  2. 在主机上的终端窗口中输入以下命令来查看连接器运行的任务:

    $ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

    该命令返回类似以下示例的输出:

    HTTP/1.1 200 OK
    Date: Thu, 06 Feb 2020 22:12:03 GMT
    Content-Type: application/json
    Content-Length: 531
    Server: Jetty(9.4.20.v20190813)
    
    {
      "name": "inventory-connector",
      ...
      "tasks": [
        {
          "connector": "inventory-connector",
          "task": 0
        }
      ]
    }
  3. 显示 Kafka 集群中的主题列表。
    在终端窗口中进入 /opt/kafka/bin/ 并运行以下 shell 脚本:

    ./kafka-topics.sh --bootstrap-server=localhost:9092 --list

    Kafka 代理返回连接器创建的主题列表。可用的主题取决于连接器的 snapshot.modesnapshot.include.collection.listtable.include.list 配置属性的设置。默认情况下,连接器会为数据库中的每个非系统表创建一个主题。

  4. 查看主题的内容。
    在终端窗口中,导航到 /opt/kafka/bin/,然后运行 kafka-console-consumer.sh shell 脚本来显示上一命令返回的一个主题的内容:

    例如:

    ./kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=dbserver1.inventory.products_on_hand

    对于主题中的每个事件,命令会返回类似以下输出的信息:

    例 2.1. Debezium 更改事件的内容

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}

    在前面的示例中,有效负载 值显示连接器快照从表 inventory.products_on_hand 生成一个读取("op" ="r")事件。product_id 记录的 "before" 状态为 null,表示记录没有之前的值。"after" 状态对于 product_id101 的项目的 quantity 显示为 3

后续步骤

有关每个连接器可用的配置设置的详情,并了解如何配置源数据库以启用更改数据捕获,请参阅 Debezium 用户指南

2.5. 更新 Kafka Connect 集群中的 Debezium 连接器插件

要替换在 Red Hat Enterprise Linux 上部署的 Debezium 连接器版本,您需要更新连接器插件。

流程

  1. Software Downloads 下载您要替换的 Debezium 连接器插件的副本。
  2. 将 Debezium 连接器存档的内容提取到 /opt/kafka/connector-plugins 目录中。

    $ sudo unzip debezium-connector-mysql-3.0.8.Final.zip -d /opt/kafka/connector-plugins
  3. 重启 Kafka Connect。

附录 A. 使用您的订阅

Debezium 通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。

访问您的帐户

  1. 转至 access.redhat.com
  2. 如果您还没有帐户,请创建一个帐户。
  3. 登录到您的帐户。

激活订阅

  1. 转至 access.redhat.com
  2. 导航到 Subscriptions
  3. 导航到 激活订阅 并输入您的 16 位激活号。

下载 zip 和 tar 文件

要访问 zip 或 tar 文件,请使用客户门户网站查找下载的相关文件。如果您使用 RPM 软件包,则不需要这一步。

  1. 打开浏览器并 登录到红帽客户门户网站 产品下载页面
  2. 向下滚动到 INTEGRATION AND AUTOMATION
  3. 点组件的名称,以显示可用于下载的工件列表。
  4. 点您想要的工件的 Download 链接。

更新于 2025-04-11

法律通告

Copyright © 2025 Red Hat, Inc.
The text of and illustrations in this document are licensed by Red Hat under a Creative Commons Attribution–Share Alike 3.0 Unported license ("CC-BY-SA"). An explanation of CC-BY-SA is available at http://creativecommons.org/licenses/by-sa/3.0/. In accordance with CC-BY-SA, if you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, Red Hat Enterprise Linux, the Shadowman logo, the Red Hat logo, JBoss, OpenShift, Fedora, the Infinity logo, and RHCE are trademarks of Red Hat, Inc., registered in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
Java® is a registered trademark of Oracle and/or its affiliates.
XFS® is a trademark of Silicon Graphics International Corp. or its subsidiaries in the United States and/or other countries.
MySQL® is a registered trademark of MySQL AB in the United States, the European Union and other countries.
Node.js® is an official trademark of Joyent. Red Hat is not formally related to or endorsed by the official Joyent Node.js open source or commercial project.
The OpenStack® Word Mark and OpenStack logo are either registered trademarks/service marks or trademarks/service marks of the OpenStack Foundation, in the United States and other countries and are used with the OpenStack Foundation's permission. We are not affiliated with, endorsed or sponsored by the OpenStack Foundation, or the OpenStack community.
All other trademarks are the property of their respective owners.
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.