2.3. 在 Red Hat Enterprise Linux 上使用 AMQ Streams 部署 Debezium


这个步骤描述了如何在 Red Hat Enterprise Linux 中为 Debezium 设置连接器。连接器使用 Apache Kafka Connect 部署到 AMQ Streams 集群,这是在 Apache Kafka 和外部系统间流传输数据的框架。Kafka Connect 必须以分布式模式运行,而不是独立模式。

先决条件

  • 用于您需要部署 Debezium 的主机环境运行 Red Hat Enterprise Linux、AMQ Streams 和 Java(以一个支持的配置)。

注意

如果您正在运行 AMQ Streams 的早期版本,您必须首先升级到 AMQ Streams 2.3。有关升级过程的详情,请参考 AMQ Streams 和 Kafka 升级

  • 您在主机上具有管理特权(sudo 访问权限)。
  • Apache ZooKeeper 和 Apache Kafka 代理正在运行。
  • Kafka Connect 在 分布式模式下运行,而不是以独立模式运行。
  • 您知道安装 AMQ Streams 时创建的 kafka 用户的凭证。
  • 部署源数据库以及部署 Debezium 的主机可以访问数据库。
  • 您知道如何配置连接器

流程

  1. Red Hat Integration 下载要使用的 Debezium 连接器或连接器。例如,要将 Debezium 与 MySQL 数据库搭配使用,请下载 Debezium 2.1.4 MySQL Connector
  2. 在您部署 AMQ Streams 的 Red Hat Enterprise Linux 主机上,打开终端窗口,并在 /opt/kafka 中创建一个 connector-plugins 目录(如果尚不存在):

    $ sudo mkdir /opt/kafka/connector-plugins
  3. 输入以下命令提取您下载到 /opt/kafka/connector-plugins 目录中的 Debezium 连接器存档的内容。

    $ sudo unzip debezium-connector-mysql-2.1.4.Final.zip -d /opt/kafka/connector-plugins
  4. 对要安装的每个连接器重复步骤 1 -3。
  5. 在一个终端窗口中,以 kafka 用户身份进行登录:

    $ su - kafka
    $ Password:
  6. 如果 Kafka Connect 进程正在运行,请停止它。

    1. 输入以下命令检查 Kafka Connect 是否在分布式模式下运行:

      $ jcmd | grep ConnectDistributed

      如果进程正在运行,命令会返回进程 ID,例如:

      18514 org.apache.kafka.connect.cli.ConnectDistributed /opt/kafka/config/connect-distributed.properties
    2. 输入带有进程 ID 的 kill 命令来停止进程,例如:

      $ kill 18514
  7. 编辑 /opt/kafka/config/ 中的 connect-distributed.properties 文件,并将 plugin.path 的值设置为 Debezium 连接器插件的父目录的位置:

    plugin.path=/opt/kafka/connector-plugins
  8. 以分布式模式启动 Kafka 连接。

    $ /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
  9. Kafka Connect 运行后,使用 Kafka Connect API 注册连接器。
    输入 curl 命令提交 POST 请求,该请求将您在 第 2.2 节 “规划 Debezium 连接器配置” 中指定的连接器配置 JSON 发送到 localhost:8083/connectors 的 Kafka Connect REST API 端点。
    例如:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ \
    -d '{"name": "inventory-connector", "config": \
    { "connector.class": "io.debezium.connector.mysql.MySqlConnector", \
    "tasks.max": "1", \
    "database.hostname": "mysql", \
    "database.port": "3306", \
    "database.user": "debezium", \
    "database.password": "dbz", \
    "database.server.id": "184054", \
    "topic.prefix": "dbserver1", \
    "table.include.list": "public.inventory", \
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", \
    "schema.history.internal.kafka.topic": "dbhistory.inventory" } }'

    要注册多个连接器,请为每个连接器提交单独的请求。

  10. 重启 Kafka Connect 来实现您的更改。

    当 Kafka Connect 启动时,它会从 connector-plugins 目录中加载配置的 Debezium 连接器。

    完成配置后,部署的连接器连接到源数据库,并为每个插入、更新或删除的行或文档生成事件。

  11. 为每个 Kafka Connect worker 节点重复步骤 5-10。

后续步骤

验证部署

其他资源

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.