11.5. 向 Debezium 连接器发送信号
Debezium 信号机制提供了一种修改连接器行为的方法,或者触发一次性操作,如启动表 的临时快照。要使用信号来触发连接器来执行指定操作,您可以将连接器配置为使用以下一个或多个频道:
信号可用于以下 Debezium 连接器:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
您可以通过设置 signal.enabled.channels 配置属性来指定启用哪个频道。属性列出启用的频道的名称。默认情况下,Debebebe 提供以下频道: source 和 kafka。源 频道会被默认启用,因为增量快照信号需要它。
11.5.1. 启用 Debezium 源信号频道 复制链接链接已复制到粘贴板!
默认情况下启用 Debezium 源信号频道。
您必须明确为您要使用它的每个连接器配置信号。
步骤
- 在源数据库中,创建一个信号数据收集表来向连接器发送信号。有关信号数据收集所需的结构的详情,请参考 信号数据收集的结构。
- 对于实现原生更改数据捕获 (CDC) 机制的源数据库,请为信号表启用 CDC。
将信号数据收集的名称添加到 Debezium 连接器配置中。
在连接器配置中,添加属性signal.data.collection,并将其值设置为在第 1 步中创建的信号数据收集的完全限定名称。
例如,signal.data.collection = inventory.debezium_signals。
信号集合的完全限定名称的格式取决于连接器。
以下示例显示了每个连接器使用的命名格式:- Db2
-
<schemaName>.<tableName> - MongoDB
-
<databaseName>.<collectionName> - MySQL
-
<databaseName>.<tableName> - Oracle
-
<databaseName>.<schemaName>.<tableName> - PostgreSQL
-
<schemaName>.<tableName> - SQL Server
-
<databaseName>.<schemaName>.<tableName>
如需有关设置signal.data.collection属性的信息,请参阅您的连接器的配置属性列表。
11.5.1.1. Debezium 信号数据收集的必要结构 复制链接链接已复制到粘贴板!
信号数据收集或信号表会存储您发送到连接器的信号,以触发指定操作。信号表的结构必须符合以下标准格式:
- 包含三个字段(列)。
- 字段按特定顺序排列,如 表 1 所示。
| 字段 | 类型 | 描述 |
|---|---|---|
|
|
|
标识信号实例的任意唯一字符串。 |
|
|
|
指定要发送的信号类型。 |
|
|
|
指定 JSON 格式的参数以传递给信号操作。 |
数据收集中的字段名称是任意的。前面的表中提供了推荐的名称。如果您使用不同的命名约定,请确保每个字段中的值与预期内容一致。
11.5.1.2. 创建 Debezium 信号数据收集 复制链接链接已复制到粘贴板!
您可以通过向源数据库提交标准 SQL DDL 查询来创建信号表。
先决条件
- 您有足够的权限在源数据库中创建表。
步骤
-
向源数据库提交一个 SQL 查询以创建一个表,它与 required structure 一致,如以下示例所示:
CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);
您分配给 id 变量的 VARCHAR 参数的空间量必须足够,以适应发送到信号表的信号 ID 字符串的大小。
如果 ID 的大小超过可用空间,则连接器无法处理信号。
以下示例显示了一个 CREATE TABLE 命令,它会创建一个三列 debezium_signal 表:
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
11.5.2. 启用 Debezium Kafka 信号频道 复制链接链接已复制到粘贴板!
您可以通过将其添加到 signal.enabled.channels 配置属性来启用 Kafka 信号频道,然后将接收信号的主题名称添加到 signal.kafka.topic 属性中。启用信号频道后,会创建一个 Kafka 使用者来消耗发送到配置的信号主题的信号。
可供消费者使用的额外配置
要使用 Kafka 信号为大多数连接器触发临时增量快照,您必须首先在连接器配置 中启用 源 信号频道。源频道实施一个水位线机制,用于去除被增量快照捕获的事件,然后在流恢复后再次捕获。当使用信号频道触发 启用了 GTID 的只读 MySQL 数据库的增量快照时,不需要启用源频道。如需更多信息,请参阅 MySQL 只读增量快照
消息格式
Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。
该值是一个带有 type 和 data 字段的 JSON 对象。
当信号类型设置为 execute-snapshot 时,data 字段必须包含下表中列出的字段:
| 字段 | 默认 | 值 |
|---|---|---|
|
|
|
要运行的快照的类型。目前,Debezium 支持 |
|
| N/A |
以逗号分隔的正则表达式数组,与要包含在快照中的数据收集的完全限定名称匹配。 |
|
| N/A |
可选字符串,指定连接器评估为指定要包含在快照中的记录子集的条件。 注意
此属性已弃用,应该被 |
|
| N/A |
可选数组,用于指定连接器评估的一组额外条件,以确定快照中包含的记录子集。
|
以下示例显示了典型的 execute-snapshot Kafka 信息:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
11.5.3. 启用 Debezium JMX 信号频道 复制链接链接已复制到粘贴板!
您可以通过在连接器配置中的 signal.enabled.channels 属性中添加 JMX 信号来启用 JMX 信号,然后启用 JMX MBean 服务器 来公开信号 bean。
步骤
- 使用您首选的 JMX 客户端(例如:JConsole 或 JDK Mission Control,以连接到 MBean 服务器。
搜索 Mbean
debezium. <connector-type>.management.signals. <server>。Mbean 公开接受以下输入参数的信号操作:- p0
- 信号的 id。
- p1
-
信号的类型,如
execute-snapshot。 - p2
- 包含指定信号类型的附加信息的 JSON 数据字段。
通过提供输入参数的值来发送
execute-snapshot信号。
在 JSON 数据字段中,包含下表中列出的信息:Expand 表 11.9. 执行快照数据字段 字段 默认 值 typeincremental要运行的快照的类型。目前,Debezium 支持
增量和阻止类型。data-collectionsN/A
以逗号分隔的正则表达式数组,与要包含在快照中的表的完全限定名称匹配。
使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。additional-conditionN/A
可选字符串,指定连接器评估为指定要包含在快照中的记录子集的条件。
注意此属性已弃用,应该被
additional-conditions属性替代。additional-conditionsN/A
可选数组,用于指定连接器评估的一组额外条件,以确定快照中包含的记录子集。
每个额外条件都是一个对象,用于指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下属性:data-collection- 过滤器应用到的 {data-collection} 的完全限定名称。您可以将不同的过滤器应用到每个 {data-collection}。
filter指定数据库记录中必须存在的列值,以便包含快照,例如
"color='blue'"。
快照进程根据过滤器值评估 {data-collection} 中的记录,并只捕获包含匹配值的记录。
分配给filter属性的特定值取决于临时快照的类型:-
对于增量快照,您可以指定搜索条件片段,如
"color='blue'",快照会附加到查询的 condition 子句中。 -
对于阻止快照,您可以指定完整的
SELECT语句,例如您可以在snapshot.select.statement.overrides属性中设置的声明。
-
对于增量快照,您可以指定搜索条件片段,如
下图显示了如何使用 JConsole 发送信号的示例:
11.5.4. Debezium 信号操作的类型 复制链接链接已复制到粘贴板!
您可以使用信号启动以下操作:
有些信号与所有连接器不兼容。
11.5.4.1. 日志记录信号 复制链接链接已复制到粘贴板!
您可以通过创建带有日志信号类型的信号表条目来请求连接器来向 日志 添加条目。处理信号后,连接器会将指定的消息输出到日志。另外,您可以配置信号,以便生成的消息包含流协调。
| column | 值 | 描述 |
|---|---|---|
| id |
| |
| type |
| 信号的操作类型。 |
| data |
{"message": "Signal message at offset {}"}
|
|
11.5.4.2. 临时快照信号 复制链接链接已复制到粘贴板!
您可以通过创建一个带有 execute-snapshot 信号类型的信号来请求连接器来启动临时快照。处理信号后,连接器运行请求的快照操作。
与连接器首次启动后运行的初始快照不同,在连接器已经开始流更改事件后会在运行时发生临时快照。您可以随时启动临时快照。
临时快照可用于以下 Debezium 连接器:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
| column | 值 |
|---|---|
| id |
|
| type |
|
| data |
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
|
| 键 | 值 |
|---|---|
| test_connector |
{"type":"execute-snapshot","data": {"data-collections": ["public.MyFirstTable"], "type": "INCREMENTAL", "additional-conditions":[{"data-collection": "public.MyFirstTable", "filter":"color='blue' AND brand='MyBrand'"]}}
|
有关临时快照的更多信息,请参阅您的连接器文档中的 Snapshots 主题。
临时快照停止信号
您可以通过创建一个带有 stop-snapshot 信号类型的信号表条目来请求连接器来停止 in-progress 临时快照。处理信号后,连接器将停止当前的 in-progress 快照操作。
您可以停止以下 Debezium 连接器的临时快照:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
| column | 值 |
|---|---|
| id |
|
| type |
|
| data |
{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]}
|
您必须指定信号 的类型。data-collections 字段是可选的。将 data-collections 字段留空,以请求连接器停止当前快照中的所有活动。如果您希望增量快照继续,但您想要从快照中排除特定的集合,请提供要排除的集合或正则表达式的名称列表。连接器处理信号后,增量快照会进行,但它从您指定的集合中排除数据。
11.5.4.3. 增量快照 复制链接链接已复制到粘贴板!
增量快照是特定类型的临时快照。在增量快照中,连接器捕获您指定的表的基准状态,类似于初始快照。但是,与初始快照不同,增量快照会捕获块中的表,而不是一次捕获表。连接器使用水位线方法来跟踪快照的进度。
通过捕获块中指定表的初始状态,而不是在单个单体操作中捕获,与初始快照进程相比,增量快照具有以下优势:
- 虽然连接器捕获指定表的基准状态,但事务日志中接近实时事件流将继续不间断。
- 如果增量快照进程中断,可以从其停止的时间点恢复。
- 您可以随时启动增量快照。
增量快照暂停信号
您可以通过创建一个带有 pause-snapshot 信号类型的信号表条目来请求连接器来暂停 in-progress 增量快照。处理信号后,连接器将停止暂停当前 in-progress 快照操作。因此,无法指定数据收集,因为快照处理将在处理信号时的位置暂停。
您可以暂停以下 Debezium 连接器的增量快照:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
| column | 值 |
|---|---|
| id |
|
| type |
|
您必须指定信号 的类型。data 字段将被忽略。
增量快照恢复信号
您可以通过创建带有 resume-snapshot 信号类型的信号表条目来请求连接器来恢复暂停的增量快照。处理信号后,连接器将恢复之前暂停的快照操作。
您可以为以下 Debezium 连接器恢复增量快照:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
| column | 值 |
|---|---|
| id |
|
| type |
|
您必须指定信号 的类型。data 字段将被忽略。
有关增量快照的更多信息,请参阅您的连接器文档中的 Snapshots 主题。
11.5.4.4. 阻塞快照信号 复制链接链接已复制到粘贴板!
您可以通过创建一个带有 execute-snapshot 信号类型和 data.type 的 signal 来请求连接器来启动临时 阻塞 快照。处理信号后,连接器运行请求的快照操作。
与连接器首次启动后运行的初始快照不同,在连接器停止以流传输数据库更改事件后,会在运行时发生临时阻止快照。您可以随时启动临时阻止快照。
以下 Debezium 连接器可以使用阻止快照:
- Db2
- MySQL
- Oracle
- PostgreSQL
- SQL Server
| column | 值 |
|---|---|
| id |
|
| type |
|
| data |
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
|
| 键 | 值 |
|---|---|
| test_connector |
{"type":"execute-snapshot","data": {"type": "blocking"}
|
有关阻塞快照的更多信息,请参阅您的连接器文档中的 Snapshots 主题。
11.5.4.5. 定义自定义信号操作 复制链接链接已复制到粘贴板!
自定义操作允许您扩展 Debezium 信号框架,以触发默认实现中不可用的操作。您可以将自定义操作与多个连接器一起使用。
要定义自定义信号操作,您必须定义以下接口:
io.debezium.pipeline.signal.actions.SignalAction 会公开一个参数,它表示通过信号频道发送的消息有效负载。
在定义了自定义信号操作后,使用以下 SPI 接口使自定义操作可供信号机制使用: io.debezium.pipeline.signal.actions.SignalActionProvider.
您的实施必须返回信号操作的映射。将 map 键设置为操作的名称。密钥用作信号 的类型。
11.5.4.6. Debezium 核心模块依赖项 复制链接链接已复制到粘贴板!
自定义操作 Java 项目对 Debezium 核心模块有编译依赖项。在项目的 pom.xml 文件中包含以下编译依赖项:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version>
</dependency>
在 META-INF/services/io.debezium.pipeline.signal.actions.SignalActionProvider 文件中声明您的供应商实现。
11.5.4.7. 部署自定义信号操作 复制链接链接已复制到粘贴板!
先决条件
- 您有一个自定义操作 Java 程序。
步骤
-
要将自定义操作与 Debezium 连接器一起使用,请将 Java 项目导出到 JAR 文件,并将文件复制到包含您要使用它的每个 Debezium 连接器的 JAR 文件的目录中。
例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录的子目录中(/kafka/connect),每个连接器 JAR 在其自己的子目录中(/kafka/connect/debezium-connector-db2、/kafka/connect/debezium-connector-mysql等)。
要将自定义操作与多个连接器搭配使用,您必须将自定义信号频道 JAR 文件的副本放在每个连接器的子目录中。