2.3. 在 Red Hat Enterprise Linux 中使用 AMQ Streams 部署 Debezium
这个步骤描述了如何在 Red Hat Enterprise Linux 中为 Debezium 设置连接器。连接器通过 Apache Kafka Connect 部署到 AMQ Streams 集群,这是在 Apache Kafka 和外部系统之间传输数据的框架。Kafka 连接必须以分布式模式运行,而不是独立模式运行。
前提条件
要部署 Debezium 的主机环境在 受支持的配置 中运行 Red Hat Enterprise Linux、AMQ Streams 和 Java。
- 有关如何安装 AMQ Streams 的详情,请参考 安装 AMQ Streams。
- 有关如何安装包含单一 ZooKeeper 节点的基本、非生产 AMQ Streams 集群以及单个 Kafka 节点的详情,请参考 运行单一节点 AMQ Streams 集群。
如果您运行的 AMQ Streams 以前的版本,您必须首先升级到 AMQ Streams 2.2。有关升级过程的详情,请查看 AMQ Streams 和 Kafka 升级。
流程
- 下载您希望在 Red Hat Integration 下载站点 中使用的 Debezium 连接器或连接器。例如,要将 Debezium 与 MySQL 数据库搭配使用,请下载 Debezium 1.9.7 MySQL Connector。
在部署了 AMQ Streams 的 Red Hat Enterprise Linux 主机上,打开终端窗口并在
/opt/kafka
中创建connector-plugins
目录(如果它尚不存在):$ sudo mkdir /opt/kafka/connector-plugins
输入以下命令提取您下载到
/opt/kafka/connector-plugins
目录的 Debezium connector 存档的内容。$ sudo unzip debezium-connector-mysql-1.9.7.Final.zip -d /opt/kafka/connector-plugins
- 对您要安装的每个连接器重复步骤 1 -3。
在终端窗口中以
kafka
用户身份登录:$ su - kafka $ Password:
如果 Kafka Connect 进程正在运行,停止它。
输入以下命令检查 Kafka Connect 是否在分布式模式下运行:
$ jcmd | grep ConnectDistributed
如果进程正在运行,命令会返回进程 ID,例如:
18514 org.apache.kafka.connect.cli.ConnectDistributed /opt/kafka/config/connect-distributed.properties
使用进程 ID 输入
kill
命令停止该进程,例如:$ kill 18514
编辑
/opt/kafka/config/
中的connect-distributed.properties
文件,并将plugin.path
的值设置为 Debezium connector 插件的父目录位置:plugin.path=/opt/kafka/connector-plugins
在分布式模式下启动 Kafka 连接。
$ /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Kafka Connect 运行后,使用 Kafka Connect API 注册连接器。
输入curl
命令,以提交POST
请求,该请求会将您在 第 2.2 节 “规划 Debezium 连接器配置” 中指定的连接器配置 JSON 发送到localhost:8083/connectors
上的 Kafka Connect REST API 端点。
例如:curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ \ -d '{"name": "inventory-connector", "config": \ { "connector.class": "io.debezium.connector.mysql.MySqlConnector", \ "tasks.max": "1", \ "database.hostname": "mysql", \ "database.port": "3306", \ "database.user": "debezium", \ "database.password": "dbz", \ "database.server.id": "184054", \ "database.server.name": "dbserver1", \ "database.include.list": "public.inventory", \ "database.history.kafka.bootstrap.servers": "kafka:9092", \ "database.history.kafka.topic": "dbhistory.inventory" } }'
要注册多个连接器,请为每个连接器提交单独的请求。
重启 Kafka Connect 以实施您的更改。
当 Kafka Connect 启动时,它会从
connector-plugins
目录中加载配置的 Debezium 连接器。完成配置后,部署的连接器将连接到源数据库,并为每个插入、更新或删除行或文档生成事件。
- 为每个 Kafka Connect worker 节点重复第 5-10 步。
后续步骤
验证部署。
其他资源