11.5. 向 Debezium 连接器发送信号
Debezium 信号机制提供了一种修改连接器行为的方法,或者触发一次性操作,如启动表 的临时快照。要使用信号来触发连接器来执行指定操作,您可以将连接器配置为使用以下一个或多个频道:
信号可用于以下 Debezium 连接器:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
您可以通过设置 signal.enabled.channels
配置属性来指定启用哪个频道。属性列出启用的频道的名称。默认情况下,Debebebe 提供以下频道: source
和 kafka
。源
频道会被默认启用,因为增量快照信号需要它。
11.5.1. 启用 Debezium 源信号频道
默认情况下启用 Debezium 源信号频道。
您必须明确为您要使用它的每个连接器配置信号。
步骤
- 在源数据库中,创建一个信号数据收集表来向连接器发送信号。有关信号数据收集所需的结构的详情,请参考 信号数据收集的结构。
- 对于实现原生更改数据捕获 (CDC) 机制的源数据库,请为信号表启用 CDC。
将信号数据收集的名称添加到 Debezium 连接器配置中。
在连接器配置中,添加属性signal.data.collection
,并将其值设置为在第 1 步中创建的信号数据收集的完全限定名称。
例如,signal.data.collection = inventory.debezium_signals
。
信号集合的完全限定名称的格式取决于连接器。
以下示例显示了每个连接器使用的命名格式:- Db2
-
<schemaName>.<tableName>
- MongoDB
-
<databaseName>.<collectionName>
- MySQL
-
<databaseName>.<tableName>
- Oracle
-
<databaseName>.<schemaName>.<tableName>
- PostgreSQL
-
<schemaName>.<tableName>
- SQL Server
-
<databaseName>.<schemaName>.<tableName>
如需有关设置signal.data.collection
属性的信息,请参阅您的连接器的配置属性列表。
11.5.1.1. Debezium 信号数据收集的必要结构
信号数据收集或信号表会存储您发送到连接器的信号,以触发指定操作。信号表的结构必须符合以下标准格式:
- 包含三个字段(列)。
- 字段按特定顺序排列,如 表 1 所示。
字段 | 类型 | 描述 |
---|---|---|
|
|
标识信号实例的任意唯一字符串。 |
|
|
指定要发送的信号类型。 |
|
|
指定 JSON 格式的参数以传递给信号操作。 |
数据收集中的字段名称是任意的。前面的表中提供了推荐的名称。如果您使用不同的命名约定,请确保每个字段中的值与预期内容一致。
11.5.1.2. 创建 Debezium 信号数据收集
您可以通过向源数据库提交标准 SQL DDL 查询来创建信号表。
先决条件
- 您有足够的权限在源数据库中创建表。
步骤
-
向源数据库提交一个 SQL 查询以创建一个表,它与 required structure 一致,如以下示例所示:
CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);
您分配给 id
变量的 VARCHAR
参数的空间量必须足够,以适应发送到信号表的信号 ID 字符串的大小。
如果 ID 的大小超过可用空间,则连接器无法处理信号。
以下示例显示了一个 CREATE TABLE
命令,它会创建一个三列 debezium_signal
表:
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
11.5.2. 启用 Debezium Kafka 信号频道
您可以通过将其添加到 signal.enabled.channels
配置属性来启用 Kafka 信号频道,然后将接收信号的主题名称添加到 signal.kafka.topic
属性中。启用信号频道后,会创建一个 Kafka 使用者来消耗发送到配置的信号主题的信号。
可供消费者使用的额外配置
要使用 Kafka 信号为大多数连接器触发临时增量快照,您必须首先在连接器配置 中启用 源
信号频道。源频道实施一个水位线机制,用于去除被增量快照捕获的事件,然后在流恢复后再次捕获。当使用信号频道触发 启用了 GTID 的只读 MySQL 数据库的增量快照时,不需要启用源频道。如需更多信息,请参阅 MySQL 只读增量快照
消息格式
Kafka 消息的键必须与 topic.prefix
连接器配置选项的值匹配。
该值是一个带有 type
和 data
字段的 JSON 对象。
当信号类型设置为 execute-snapshot
时,data
字段必须包含下表中列出的字段:
字段 | 默认 | 值 |
---|---|---|
|
|
要运行的快照的类型。目前,Debezium 支持 |
| N/A |
以逗号分隔的正则表达式数组,与要包含在快照中的数据收集的完全限定名称匹配。 |
| N/A |
可选字符串,指定连接器评估为指定要包含在快照中的记录子集的条件。 注意
此属性已弃用,应该被 |
| N/A |
可选数组,用于指定连接器评估的一组额外条件,以确定快照中包含的记录子集。
|
以下示例显示了典型的 execute-snapshot
Kafka 信息:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
11.5.3. 启用 Debezium JMX 信号频道
您可以通过在连接器配置中的 signal.enabled.channels
属性中添加 JMX 信号来启用 JMX 信号,然后启用 JMX MBean 服务器 来公开信号 bean。
步骤
- 使用您首选的 JMX 客户端(例如:JConsole 或 JDK Mission Control,以连接到 MBean 服务器。
搜索 Mbean
debezium. <connector-type>.management.signals. <server>
。Mbean 公开接受以下输入参数的信号
操作:- p0
- 信号的 id。
- p1
-
信号的类型,如
execute-snapshot
。 - p2
- 包含指定信号类型的附加信息的 JSON 数据字段。
通过提供输入参数的值来发送
execute-snapshot
信号。
在 JSON 数据字段中,包含下表中列出的信息:表 11.9. 执行快照数据字段 字段 默认 值 type
incremental
要运行的快照的类型。目前,Debezium 支持
增量
和阻止
类型。data-collections
N/A
以逗号分隔的正则表达式数组,与要包含在快照中的表的完全限定名称匹配。
使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。additional-condition
N/A
可选字符串,指定连接器评估为指定要包含在快照中的记录子集的条件。
注意此属性已弃用,应该被
additional-conditions
属性替代。additional-conditions
N/A
可选数组,用于指定连接器评估的一组额外条件,以确定快照中包含的记录子集。
每个额外条件都是一个对象,用于指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下属性:data-collection
- 过滤器应用到的 {data-collection} 的完全限定名称。您可以将不同的过滤器应用到每个 {data-collection}。
filter
指定数据库记录中必须存在的列值,以便包含快照,例如
"color='blue'
"。
快照进程根据过滤器
值评估 {data-collection} 中的记录,并只捕获包含匹配值的记录。
分配给filter
属性的特定值取决于临时快照的类型:-
对于增量快照,您可以指定搜索条件片段,如
"color='blue'"
,快照会附加到查询的 condition 子句中。 -
对于阻止快照,您可以指定完整的
SELECT
语句,例如您可以在snapshot.select.statement.overrides
属性中设置的声明。
-
对于增量快照,您可以指定搜索条件片段,如
下图显示了如何使用 JConsole 发送信号的示例:
11.5.4. Debezium 信号操作的类型
您可以使用信号启动以下操作:
有些信号与所有连接器不兼容。
11.5.4.1. 日志记录信号
您可以通过创建带有日志信号类型的信号表条目来请求连接器来向 日志
添加条目。处理信号后,连接器会将指定的消息输出到日志。另外,您可以配置信号,以便生成的消息包含流协调。
column | 值 | 描述 |
---|---|---|
id |
| |
type |
| 信号的操作类型。 |
data |
{"message": "Signal message at offset {}"} |
|
11.5.4.2. 临时快照信号
您可以通过创建一个带有 execute-snapshot
信号类型的信号来请求连接器来启动临时快照。处理信号后,连接器运行请求的快照操作。
与连接器首次启动后运行的初始快照不同,在连接器已经开始流更改事件后会在运行时发生临时快照。您可以随时启动临时快照。
临时快照可用于以下 Debezium 连接器:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
column | 值 |
---|---|
id |
|
type |
|
data |
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]} |
键 | 值 |
---|---|
test_connector |
{"type":"execute-snapshot","data": {"data-collections": ["public.MyFirstTable"], "type": "INCREMENTAL", "additional-conditions":[{"data-collection": "public.MyFirstTable", "filter":"color='blue' AND brand='MyBrand'"]}} |
有关临时快照的更多信息,请参阅您的连接器文档中的 Snapshots 主题。
临时快照停止信号
您可以通过创建一个带有 stop-snapshot
信号类型的信号表条目来请求连接器来停止 in-progress 临时快照。处理信号后,连接器将停止当前的 in-progress 快照操作。
您可以停止以下 Debezium 连接器的临时快照:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
column | 值 |
---|---|
id |
|
type |
|
data |
{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]} |
您必须指定信号 的类型
。data-collections
字段是可选的。将 data-collections
字段留空,以请求连接器停止当前快照中的所有活动。如果您希望增量快照继续,但您想要从快照中排除特定的集合,请提供要排除的集合或正则表达式的名称列表。连接器处理信号后,增量快照会进行,但它从您指定的集合中排除数据。
11.5.4.3. 增量快照
增量快照是特定类型的临时快照。在增量快照中,连接器捕获您指定的表的基准状态,类似于初始快照。但是,与初始快照不同,增量快照会捕获块中的表,而不是一次捕获表。连接器使用水位线方法来跟踪快照的进度。
通过捕获块中指定表的初始状态,而不是在单个单体操作中捕获,与初始快照进程相比,增量快照具有以下优势:
- 虽然连接器捕获指定表的基准状态,但事务日志中接近实时事件流将继续不间断。
- 如果增量快照进程中断,可以从其停止的时间点恢复。
- 您可以随时启动增量快照。
增量快照暂停信号
您可以通过创建一个带有 pause-snapshot
信号类型的信号表条目来请求连接器来暂停 in-progress 增量快照。处理信号后,连接器将停止暂停当前 in-progress 快照操作。因此,无法指定数据收集,因为快照处理将在处理信号时的位置暂停。
您可以暂停以下 Debezium 连接器的增量快照:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
column | 值 |
---|---|
id |
|
type |
|
您必须指定信号 的类型
。data
字段将被忽略。
增量快照恢复信号
您可以通过创建带有 resume-snapshot
信号类型的信号表条目来请求连接器来恢复暂停的增量快照。处理信号后,连接器将恢复之前暂停的快照操作。
您可以为以下 Debezium 连接器恢复增量快照:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
column | 值 |
---|---|
id |
|
type |
|
您必须指定信号 的类型
。data
字段将被忽略。
有关增量快照的更多信息,请参阅您的连接器文档中的 Snapshots 主题。
11.5.4.4. 阻塞快照信号
您可以通过创建一个带有 execute-snapshot
信号类型和 data.type
的 signal 来请求连接器来启动临时 阻塞
快照。处理信号后,连接器运行请求的快照操作。
与连接器首次启动后运行的初始快照不同,在连接器停止以流传输数据库更改事件后,会在运行时发生临时阻止快照。您可以随时启动临时阻止快照。
以下 Debezium 连接器可以使用阻止快照:
- Db2
- MySQL
- Oracle
- PostgreSQL
- SQL Server
column | 值 |
---|---|
id |
|
type |
|
data |
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]} |
键 | 值 |
---|---|
test_connector |
{"type":"execute-snapshot","data": {"type": "blocking"} |
有关阻塞快照的更多信息,请参阅您的连接器文档中的 Snapshots 主题。
11.5.4.5. 定义自定义信号操作
自定义操作允许您扩展 Debezium 信号框架,以触发默认实现中不可用的操作。您可以将自定义操作与多个连接器一起使用。
要定义自定义信号操作,您必须定义以下接口:
@FunctionalInterface public interface SignalAction<P extends Partition> { /** * @param signalPayload the content of the signal * @return true if the signal was processed */ boolean arrived(SignalPayload<P> signalPayload) throws InterruptedException; }
io.debezium.pipeline.signal.actions.SignalAction
会公开一个参数,它表示通过信号频道发送的消息有效负载。
在定义了自定义信号操作后,使用以下 SPI 接口使自定义操作可供信号机制使用: io.debezium.pipeline.signal.actions.SignalActionProvider
.
public interface SignalActionProvider { /** * Create a map of signal action where the key is the name of the action. * * @param dispatcher the event dispatcher instance * @param connectorConfig the connector config * @return a concrete action */ <P extends Partition> Map<String, SignalAction<P>> createActions(EventDispatcher<P, ? extends DataCollectionId> dispatcher, CommonConnectorConfig connectorConfig); }
您的实施必须返回信号操作的映射。将 map 键设置为操作的名称。密钥用作信号 的类型
。
11.5.4.6. Debezium 核心模块依赖项
自定义操作 Java 项目对 Debezium 核心模块有编译依赖项。在项目的 pom.xml
文件中包含以下编译依赖项:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version> 1
</dependency>
在 META-INF/services/io.debezium.pipeline.signal.actions.SignalActionProvider
文件中声明您的供应商实现。
11.5.4.7. 部署自定义信号操作
先决条件
- 您有一个自定义操作 Java 程序。
步骤
-
要将自定义操作与 Debezium 连接器一起使用,请将 Java 项目导出到 JAR 文件,并将文件复制到包含您要使用它的每个 Debezium 连接器的 JAR 文件的目录中。
例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录的子目录中(/kafka/connect
),每个连接器 JAR 在其自己的子目录中(/kafka/connect/debezium-connector-db2、
/kafka/connect/debezium-connector-mysql
等)。
要将自定义操作与多个连接器搭配使用,您必须将自定义信号频道 JAR 文件的副本放在每个连接器的子目录中。