2.3. 在 Red Hat Enterprise Linux 中使用 Apache Kafka 部署 Debezium


此流程描述了如何在 Red Hat Enterprise Linux 中为 Debezium 设置连接器。连接器使用 Apache Kafka Connect 部署到 Apache Kafka 集群的流,它是一个在 Apache Kafka 和外部系统间流传输数据的框架。Kafka Connect 必须以分布式模式运行,而不是以独立模式运行。

先决条件

注意

如果您正在运行早期版本的 AMQ Streams,您必须首先升级到 Apache Kafka 2.7 的 Streams。有关升级过程的详情,请参考 Apache Kafka 和 Kafka 升级流

  • 主机上具有管理特权(sudo 访问权限)。
  • Apache ZooKeeper 和 Apache Kafka 代理正在运行。
  • Kafka Connect 以分布式模式运行,而不是以独立模式运行。
  • 您知道在安装 Streams for Apache Kafka 时创建的 kafka 用户的凭证。
  • 部署源数据库以及部署 Debezium 的主机可以访问数据库。
  • 您知道如何配置连接器

流程

  1. 软件下载 站点下载您要使用的 Debezium 连接器或连接器。例如,要将 Debezium 与 MySQL 数据库搭配使用,请下载 Debezium 2.7.3 MySQL Connector
  2. 在您为 Apache Kafka 部署 Streams 的 Red Hat Enterprise Linux 主机上,打开终端窗口并在 /opt/kafka 中创建 connector-plugins 目录(如果它尚不存在):

    $ sudo mkdir /opt/kafka/connector-plugins
  3. 输入以下命令提取您下载到 /opt/kafka/connector-plugins 目录中的 Debezium 连接器存档的内容。

    $ sudo unzip debezium-connector-mysql-2.7.3.Final.zip -d /opt/kafka/connector-plugins
  4. 对您要安装的每个连接器重复步骤 1 -3。
  5. 在一个终端窗口中,以 kafka 用户身份进行登录:

    $ su - kafka
    $ Password:
  6. 如果 Kafka Connect 进程正在运行,请停止。

    1. 输入以下命令检查 Kafka Connect 是否在分布式模式下运行:

      $ jcmd | grep ConnectDistributed

      如果进程正在运行,命令会返回进程 ID,例如:

      18514 org.apache.kafka.connect.cli.ConnectDistributed /opt/kafka/config/connect-distributed.properties
    2. 使用进程 ID 输入 kill 命令来停止进程,例如:

      $ kill 18514
  7. 编辑 /opt/kafka/config/ 中的 connect-distributed.properties 文件,并将 plugin.path 的值设置为 Debezium 连接器插件的父目录的位置:

    plugin.path=/opt/kafka/connector-plugins
  8. 在分布式模式下启动 Kafka 连接。

    $ /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
  9. 在 Kafka Connect 运行后,使用 Kafka Connect API 注册连接器。
    输入 curl 命令提交 POST 请求,将您在 第 2.2 节 “规划 Debezium 连接器配置” 中指定的连接器配置 JSON 发送到 localhost:8083/connectors 的 Kafka Connect REST API 端点。
    例如:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name": "inventory-connector", "config":
    {"connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "table.include.list": "public.inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "dbhistory.inventory" }
    }'

    要注册多个连接器,请为每个连接器提交单独的请求。

  10. 重启 Kafka Connect 来实现您的更改。

    当 Kafka Connect 启动时,它会从 connector-plugins 目录中加载配置的 Debezium 连接器。

    完成配置后,部署的连接器连接到源数据库,并为每个插入、更新或删除行或文档生成事件。

  11. 为每个 Kafka Connect worker 节点重复第 5-10 步。

后续步骤

验证部署

其他资源

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.