4.3. 部署 Debezium JDBC 连接器
要部署 Debezium JDBC 连接器,请安装 Debezium JDBC 连接器存档,配置连接器,并通过将其配置添加到 Kafka Connect 来启动连接器。
先决条件
- 已安装 Apache ZooKeeper、Apache Kafka 和 Kafka Connect。
- 已安装目标数据库并配置为接受 JDBC 连接。
流程
- 下载 Debezium JDBC 连接器插件存档。
- 将文件提取到 Kafka Connect 环境中。
(可选)从 Maven Central 下载 JDBC 驱动程序,并将下载的驱动程序文件提取到包含 JDBC sink 连接器 JAR 文件的目录。
注意JDBC sink 连接器不包括 Oracle 和 Db2 的驱动程序。您必须下载驱动程序并手动安装它们。
- 将驱动程序 JAR 文件添加到安装了 JDBC sink 连接器的路径中。
-
确保安装 JDBC sink 连接器的路径是 Kafka Connect
plugin.path
的一部分。 - 重启 Kafka Connect 进程以获取新的 JAR 文件。
4.3.1. Debezium JDBC 连接器配置
通常,您可以通过提交指定连接器的配置属性来注册 Debezium JDBC 连接器。以下示例显示了注册 Debezium JDBC sink 连接器实例的 JSON 请求,该连接器使用来自名为 orders
的事件,以及最常见的配置设置:
示例: Debezium JDBC 连接器配置
{ "name": "jdbc-connector", 1 "config": { "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", 2 "tasks.max": "1", 3 "connection.url": "jdbc:postgresql://localhost/db", 4 "connection.username": "pguser", 5 "connection.password": "pgpassword", 6 "insert.mode": "upsert", 7 "delete.enabled": "true", 8 "primary.key.mode": "record_key", 9 "schema.evolution": "basic", 10 "database.time_zone": "UTC" 11 } }
项 | 描述 |
---|---|
1 | 使用 Kafka Connect 服务注册时分配给连接器的名称。 |
2 | JDBC sink 连接器类的名称。 |
3 | 为此连接器创建的最大任务数量。 |
4 | 连接器用来连接到它写入的 sink 数据库的 JDBC URL。 |
5 | 用于身份验证的数据库用户的名称。 |
6 | 用于身份验证的数据库用户的密码。 |
7 | 连接器使用的 insert.mode。 |
8 | 启用删除数据库中的记录。如需更多信息,请参阅 delete.enabled 配置属性。 |
9 | 指定用于解析主键列的方法。如需更多信息,请参阅 primary.key.mode 配置属性。 |
10 | 启用连接器来演进目标数据库的模式。如需更多信息,请参阅 schema.evolution 配置属性。 |
11 | 指定编写临时字段类型时使用的时区。 |
12 | 要使用的主题列表,用逗号分开。 |
有关您可以为 Debezium JDBC 连接器设置的配置属性的完整列表,请参阅 JDBC 连接器属性。
您可以使用 POST
命令将此配置发送到正在运行的 Kafka Connect 服务。服务记录配置并启动执行以下操作的接收器连接器任务:
- 连接到数据库。
- 使用订阅的 Kafka 主题的事件。
- 将事件写入配置的数据库。