12.5. 向 Debezium 连接器发送信号
Debezium 信号机制提供了一种修改连接器行为的方法,或者触发一次性操作,如启动表 的临时快照。要使用信号来触发连接器来执行指定操作,您可以将连接器配置为使用以下一个或多个频道:
信号可用于以下 Debezium 连接器:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
您可以通过设置 signal.enabled.channels
配置属性来指定启用哪个频道。属性列出启用的频道的名称。默认情况下,Debebebe 提供以下频道: source
和 kafka
。源
频道会被默认启用,因为增量快照信号需要它。
12.5.1. 启用 Debezium 源信号频道
默认情况下启用 Debezium 源信号频道。
您必须明确为您要使用它的每个连接器配置信号。
流程
- 在源数据库中,创建一个信号数据收集表来向连接器发送信号。有关信号数据收集所需的结构的详情,请参考 信号数据收集的结构。
- 对于实现原生更改数据捕获 (CDC) 机制的源数据库,请为信号表启用 CDC。
将信号数据收集的名称添加到 Debezium 连接器配置中。
在连接器配置中,添加属性signal.data.collection
,并将其值设置为在第 1 步中创建的信号数据收集的完全限定名称。
例如,signal.data.collection = inventory.debezium_signals
。
信号集合的完全限定名称的格式取决于连接器。
以下示例显示了每个连接器使用的命名格式:- Db2
-
<schemaName>.<tableName>
- MongoDB
-
<databaseName>.<collectionName>
- MySQL
-
<databaseName>.<tableName>
- Oracle
-
<databaseName>.<schemaName>.<tableName>
- PostgreSQL
-
<schemaName>.<tableName>
- SQL Server
-
<databaseName>.<schemaName>.<tableName>
如需有关设置signal.data.collection
属性的信息,请参阅您的连接器的配置属性列表。
12.5.1.1. Debezium 信号数据收集的必要结构
信号数据收集或信号表会存储您发送到连接器的信号,以触发指定操作。信号表的结构必须符合以下标准格式:
- 包含三个字段(列)。
- 字段按特定顺序排列,如 表 1 所示。
字段 | 类型 | 描述 |
---|---|---|
|
|
标识信号实例的任意唯一字符串。 |
|
|
指定要发送的信号类型。 |
|
|
指定 JSON 格式的参数以传递给信号操作。 |
数据收集中的字段名称是任意的。前面的表中提供了推荐的名称。如果您使用不同的命名约定,请确保每个字段中的值与预期内容一致。
12.5.1.2. 创建 Debezium 信号数据收集
您可以通过向源数据库提交标准 SQL DDL 查询来创建信号表。
前提条件
- 您有足够的权限在源数据库中创建表。
流程
-
向源数据库提交一个 SQL 查询以创建一个表,它与 required structure 一致,如以下示例所示:
CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);
您分配给 id
变量的 VARCHAR
参数的空间量必须足够,以适应发送到信号表的信号 ID 字符串的大小。
如果 ID 的大小超过可用空间,则连接器无法处理信号。
以下示例显示了一个 CREATE TABLE
命令,它会创建一个三列 debezium_signal
表:
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
12.5.2. 启用 Debezium Kafka 信号频道
您可以通过将其添加到 signal.enabled.channels
配置属性来启用 Kafka 信号频道,然后将接收信号的主题名称添加到 signal.kafka.topic
属性中。启用信号频道后,会创建一个 Kafka 使用者来消耗发送到配置的信号主题的信号。
可供消费者使用的额外配置
要使用 Kafka 信号为连接器触发临时增量快照,您必须首先在连接器配置 中启用 源
信号频道。源频道实施一个水位线机制,用于去除被增量快照捕获的事件,然后在流恢复后再次捕获。
消息格式
Kafka 消息的键必须与 topic.prefix
连接器配置选项的值匹配。
该值是一个带有 type
和 data
字段的 JSON 对象。
当信号类型设置为 execute-snapshot
时,data
字段必须包含下表中列出的字段:
字段 | 默认 | 值 |
---|---|---|
|
|
要运行的快照的类型。目前,Debeium 仅支持 |
| N/A |
以逗号分隔的正则表达式数组,与要包含在快照中的表的完全限定名称匹配。 |
| N/A | 可选字符串,指定连接器评估为指定要包含在快照中的记录子集的条件。 |
以下示例显示了典型的 execute-snapshot
Kafka 信息:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
12.5.3. 启用 Debezium JMX 信号频道
您可以通过在连接器配置中的 signal.enabled.channels
属性中添加 JMX 信号来启用 JMX 信号,然后启用 JMX MBean 服务器 来公开信号 bean。
流程
- 使用您首选的 JMX 客户端(例如:JConsole 或 JDK Mission Control,以连接到 MBean 服务器。
搜索 Mbean
debezium. <connector-type>.management.signals. <server>
。Mbean 公开接受以下输入参数的信号
操作:- p0
- 信号的 id。
- p1
-
信号的类型,如
execute-snapshot
。 - p2
- 包含指定信号类型的附加信息的 JSON 数据字段。
通过提供输入参数的值来发送
execute-snapshot
信号。
在 JSON 数据字段中,包含下表中列出的信息:表 12.6. 执行快照数据字段 字段 默认 值 type
incremental
要运行的快照的类型。目前,Debeium 仅支持
增量
类型。data-collections
N/A
以逗号分隔的正则表达式数组,与要包含在快照中的表的完全限定名称匹配。
使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。additional-condition
N/A
可选字符串,指定连接器评估为指定要包含在快照中的记录子集的条件。
下图显示了如何使用 JConsole 发送信号的示例:
12.5.4. Debezium 信号操作的类型
您可以使用信号启动以下操作:
有些信号与所有连接器不兼容。
12.5.4.1. 日志记录信号
您可以通过创建带有日志信号类型的信号表条目来请求连接器来向 日志
添加条目。处理信号后,连接器会将指定的消息输出到日志。另外,您可以配置信号,以便生成的消息包含流协调。
column | 值 | 描述 |
---|---|---|
id |
| |
type |
| 信号的操作类型。 |
data |
|
|
12.5.4.2. 临时快照信号
您可以通过创建一个带有 execute-snapshot
信号类型的信号来请求连接器来启动临时快照。处理信号后,连接器运行请求的快照操作。
与连接器首次启动后运行的初始快照不同,在连接器已经开始流更改事件后会在运行时发生临时快照。您可以随时启动临时快照。
临时快照可用于以下 Debezium 连接器:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
column | 值 |
---|---|
id |
|
type |
|
data |
|
键 | 值 |
---|---|
test_connector |
|
有关临时快照的更多信息,请参阅您的连接器文档中的 Snapshots 主题。
临时快照停止信号
您可以通过创建一个带有 stop-snapshot
信号类型的信号表条目来请求连接器来停止 in-progress 临时快照。处理信号后,连接器将停止当前的 in-progress 快照操作。
您可以停止以下 Debezium 连接器的临时快照:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
column | 值 |
---|---|
id |
|
type |
|
data |
|
您必须指定信号 的类型
。data-collections
字段是可选的。将 data-collections
字段留空,以请求连接器停止当前快照中的所有活动。如果您希望增量快照继续,但您想要从快照中排除特定的集合,请提供要排除的集合或正则表达式的名称列表。连接器处理信号后,增量快照会进行,但它从您指定的集合中排除数据。
12.5.4.3. 增量快照
增量快照是特定类型的临时快照。在增量快照中,连接器捕获您指定的表的基准状态,类似于初始快照。但是,与初始快照不同,增量快照会捕获块中的表,而不是一次捕获表。连接器使用水位线方法来跟踪快照的进度。
通过捕获块中指定表的初始状态,而不是在单个单体操作中捕获,与初始快照进程相比,增量快照具有以下优势:
- 虽然连接器捕获指定表的基准状态,但事务日志中接近实时事件流将继续不间断。
- 如果增量快照进程中断,可以从其停止的时间点恢复。
- 您可以随时启动增量快照。
增量快照暂停信号
您可以通过创建一个带有 pause-snapshot
信号类型的信号表条目来请求连接器来暂停 in-progress 增量快照。处理信号后,连接器将停止暂停当前 in-progress 快照操作。因此,无法指定数据收集,因为快照处理将在处理信号时的位置暂停。
您可以暂停以下 Debezium 连接器的增量快照:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
column | 值 |
---|---|
id |
|
type |
|
您必须指定信号 的类型
。data
字段将被忽略。
增量快照恢复信号
您可以通过创建带有 resume-snapshot
信号类型的信号表条目来请求连接器来恢复暂停的增量快照。处理信号后,连接器将恢复之前暂停的快照操作。
您可以为以下 Debezium 连接器恢复增量快照:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
column | 值 |
---|---|
id |
|
type |
|
您必须指定信号 的类型
。data
字段将被忽略。
有关增量快照的更多信息,请参阅您的连接器文档中的 Snapshots 主题。