6.5. 将信号发送到 Debezium 连接器
Debezium 信号机制提供了一种方式,可以修改连接器的行为,或触发一次性操作,如启动表 的临时快照。要使用信号触发连接器执行指定操作,您可以将连接器配置为使用以下一个或多个频道:
- SourceSignalChannel
- 您可以发出 SQL 命令,来向专用的信号数据收集添加信号消息。您在源数据库中创建的信号数据收集被指定,专门用于与 Debezium 通信。
- KafkaSignalChannel
- 您可以将信号信息提交到可配置的 Kafka 主题。
- JmxSignalChannel
-
您可以通过 JMX 信号操作提交
信号
。 - FileSignalChannel
- 您可以使用文件来发送信号。当 Debezium 检测到一个新的 日志记录 或临时快照记录添加到 频道中时,它会读取信号,并启动请求的操作。
信号可用于以下 Debezium 连接器:
- Db2
- MariaDB (技术预览)
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
您可以通过设置 signal.enabled.channels
配置属性来指定启用哪个频道。属性列出了启用的频道的名称。默认情况下,Debebe 提供以下频道: source
和 kafka
。源
频道会被默认启用,因为增量快照信号需要它。
6.5.1. 启用 Debezium 源信号频道
默认情况下启用 Debezium 源信号频道。
您必须为您要使用它的每个连接器明确配置信号。
流程
- 在源数据库中,创建一个信号数据收集表,用于将信号发送到连接器。有关信号数据收集所需的结构的详情,请参考 信号数据收集的结构。
- 对于实现原生更改数据捕获 (CDC) 机制的源数据库,请为信号表启用 CDC。
-
将信号数据收集的名称添加到 Debezium 连接器配置中。
在连接器配置中,添加属性signal.data.collection
,并将其值设置为在第 1 步中创建的信号数据收集的完全限定名称。
例如,signal.data.collection = inventory.debezium_signals
。
信号集合的完全限定域名格式取决于连接器。
以下示例显示了每个连接器使用的命名格式:
完全限定表名称
- Db2
-
<schemaName>.<tableName>
- MariaDB (技术预览)
-
<databaseName>.<tableName>
- MongoDB
-
<databaseName>.<collectionName>
- MySQL
-
<databaseName>.<tableName>
- Oracle
-
<databaseName>.<schemaName>.<tableName>
- PostgreSQL
-
<schemaName>.<tableName>
- SQL Server
-
<databaseName>.<schemaName>.<tableName>
如需有关设置signal.data.collection
属性的信息,请参阅您的连接器的配置属性列表。
6.5.1.1. Debezium 信号数据收集所需的结构
信号数据收集或信号表存储您发送到连接器的信号,以触发指定的操作。信号表的结构必须符合以下标准格式:
- 包含三个字段(列)。
- 字段按特定顺序排列,如 表 1 所示。
字段 | 类型 | 描述 |
---|---|---|
|
|
标识信号实例的任意唯一字符串。 |
|
|
指定要发送的信号类型。 |
|
|
指定要传递给信号操作的 JSON 格式参数。 |
数据收集中的字段名称是任意的。上表中提供了建议的名称。如果您使用不同的命名约定,请确保每个字段中的值与预期内容一致。
6.5.1.2. 创建 Debezium 信号数据收集
您可以通过将标准 SQL DDL 查询提交到源数据库来创建信号表。
先决条件
- 您有足够的访问权限来在源数据库中创建表。
流程
-
向源数据库提交一个 SQL 查询以创建一个表,它与 required structure 一致,如以下示例所示:
CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);
分配给 id
变量的 VARCHAR
参数的空间量必须足够,以适应发送到信号表的信号的 ID 字符串的大小。
如果 ID 的大小超过可用空间,连接器无法处理信号。
以下示例显示了一个 CREATE TABLE
命令,它创建一个三列 debezium_signal
表:
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
6.5.2. 启用 Debezium Kafka 信号频道
您可以通过将信号添加到 signal.enabled.channels
配置属性来启用 Kafka 信号频道,然后添加接收信号的主题名称到 signal.kafka.topic
属性。启用信号频道后,会创建一个 Kafka 使用者,以使用发送到配置的信号主题的信号。
可用于消费者的额外配置
要使用 Kafka 信号为大多数连接器触发临时增量快照,您必须首先在连接器配置 中启用 源
信号频道。源频道实施一个水位线机制来去除重复数据的事件,这些事件可能会被增量快照捕获,然后在流恢复后再次捕获。当使用信号频道来触发 启用了 GTID 的只读 MySQL 数据库的增量快照时,不需要启用源频道。如需更多信息,请参阅 MySQL 只读增量快照
消息格式
Kafka 消息的密钥必须与 topic.prefix
连接器配置选项的值匹配。
该值是一个带有 type
和 data
字段的 JSON 对象。
当 signal type 设为 execute-snapshot
时,data
字段必须包含下表中列出的字段:
字段 | 默认 | 值 |
---|---|---|
|
|
要运行的快照类型。目前 Debezium 支持 |
| N/A |
以逗号分隔的正则表达式,与快照中包含的数据收集的完全限定名称匹配。 |
| N/A |
可选数组,指定连接器评估的一组额外条件,以确定要包含在快照中的记录子集。
|
以下示例显示了典型的 execute-snapshot
Kafka 信息:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
6.5.3. 启用 Debezium JMX 信号频道
您可以通过在连接器配置中的 signal.enabled.channels
属性中添加 JMX 信号来启用 JMX 信号,然后启用 JMX MBean Server 来公开信号 bean。
流程
- 使用您首选的 JMX 客户端(例如:用于连接到 MBean 服务器的 JConsole 或 JDK Mission Control。
搜索 Mbean
debezium. <connector-type>.management.signals. <server>
。Mbean 会公开接受以下输入参数的信号
操作:- p0
- 信号的 id。
- p1
-
信号的类型,如
execute-snapshot
。 - p2
- 包含指定信号类型的附加信息的 JSON 数据字段。
通过为输入参数提供值,发送
execute-snapshot
信号。
在 JSON data 字段中,包含下表中列出的信息:表 6.9. 执行快照数据字段 字段 默认 值 type
incremental
要运行的快照类型。目前 Debezium 支持
incremental
和blocking
类型。data-collections
N/A
以逗号分隔的正则表达式,与快照中包含的 表的完全限定域名 匹配。
additional-conditions
N/A
可选数组,指定连接器评估的一组额外条件,以确定要包含在快照中的记录子集。
每个额外的条件都是一个对象,用于指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下属性:data-collection
- 过滤器应用到的数据收集的完全限定名称。您可以为每个数据收集应用不同的过滤器。
filter
指定在数据库记录中必须存在的列值,以便快照包含它,例如
"color='blue'
"。
快照进程会根据过滤器
值评估数据收集中的记录,并只捕获包含匹配值的记录。
分配给过滤器
属性的特定值取决于临时快照的类型:-
对于增量快照,您可以指定一个搜索条件片段,如
"color='blue'"
,快照会附加到查询的 condition 子句中。 -
要阻止快照,您可以指定完整的
SELECT
语句,例如您可以在snapshot.select.statement.overrides
属性中设置的一个。
-
对于增量快照,您可以指定一个搜索条件片段,如
下图显示了如何使用 JConsole 发送信号的示例:
6.5.4. Debezium 信号操作类型
您可以使用信号启动以下操作:
有些信号并不适用于所有连接器。
6.5.4.1. 日志记录信号
您可以通过创建带有 日志
信号类型的信号表条目来请求连接器在日志中添加条目。处理信号后,连接器将指定的消息输出到日志。另外,您可以配置信号,以便生成的消息包含流协调。
列 | 值 | 描述 |
---|---|---|
id |
| |
type |
| 信号的操作类型。 |
data |
{"message": "Signal message at offset {}"} |
|
6.5.4.2. 临时快照信号
您可以通过创建带有 execute-snapshot
信号类型的信号来请求连接器来启动临时快照。处理信号后,连接器运行请求的快照操作。
与连接器在首次启动后运行的初始快照不同,在连接器已经开始流更改数据库后,临时快照会在运行时发生。您可以随时启动临时快照。
临时快照可用于以下 Debezium 连接器:
- Db2
- MariaDB (技术预览)
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列 | 值 |
---|---|
id |
|
type |
|
data |
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]} |
键 | value |
---|---|
test_connector |
{"type":"execute-snapshot","data": {"data-collections": ["public.MyFirstTable"], "type": "INCREMENTAL", "additional-conditions":[{"data-collection": "public.MyFirstTable", "filter":"color='blue' AND brand='MyBrand'"}]}} |
有关临时快照的更多信息,请参阅您的连接器文档中的 Snapshots 主题。
临时快照停止信号
您可以通过创建一个带有 stop-snapshot
信号类型的信号表条目,请求连接器来停止 in-progress 临时快照。处理信号后,连接器将停止当前的 in-progress 快照操作。
您可以停止以下 Debezium 连接器的临时快照:
- Db2
- MariaDB (技术预览)
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列 | 值 |
---|---|
id |
|
type |
|
data |
{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]} |
您必须指定信号 的类型
。data-collections
字段是可选的。将 data-collections
字段留空,请求连接器来停止当前快照中的所有活动。如果您希望增量快照继续,但您要从快照中排除特定的集合,请提供要排除的集合或正则表达式名称的逗号分隔列表。连接器处理信号后,增量快照将继续,但它不包括您指定的集合中的数据。
6.5.4.3. 增量快照
增量快照是特定类型的临时快照。在增量快照中,连接器捕获您指定的表的基准状态,类似于初始快照。但是,与初始快照不同,增量快照会以块的形式捕获表,而不是一次捕获所有。连接器使用水位线方法来跟踪快照的进度。
通过在块而不是单一单一操作中捕获指定表的初始状态,与初始快照过程相比,增量快照具有以下优点:
- 虽然连接器捕获指定表的基准状态,但事务日志中近实时事件流将继续不间断。
- 如果增量快照进程中断,可以从它停止的时间点恢复。
- 您可以随时启动增量快照。
增量快照暂停信号
您可以通过创建一个带有 pause-snapshot
信号类型的信号表条目,请求连接器来暂停 in-progress 增量快照。处理信号后,连接器将停止暂停当前正在进行的快照操作。因此,无法指定数据收集,因为快照处理将在处理信号时暂停。
您可以暂停以下 Debezium 连接器的增量快照:
- Db2
- MariaDB (技术预览)
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列 | 值 |
---|---|
id |
|
type |
|
您必须指定信号 的类型
。data
字段将被忽略。
增量快照恢复信号
您可以通过创建带有 resume-snapshot
信号类型的信号表条目,请求连接器来恢复暂停的增量快照。处理信号后,连接器将恢复之前暂停的快照操作。
您可以恢复以下 Debezium 连接器的增量快照:
- Db2
- MariaDB (技术预览)
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列 | 值 |
---|---|
id |
|
type |
|
您必须指定信号 的类型
。data
字段将被忽略。
有关增量快照的更多信息,请参阅您的连接器文档中的 快照 主题。
6.5.4.4. 阻塞快照信号
您可以通过创建带有 execute-snapshot
信号类型和 data.type
值来请求连接器来启动临时阻止快照。处理信号后,连接器运行请求的快照操作。
与连接器在首次启动后运行的初始快照不同,在连接器停止到从数据库更改事件后,临时阻止快照发生。您可以在任何时间启动临时阻塞快照。
阻塞快照可用于以下 Debezium 连接器:
- Db2
- MariaDB (技术预览)
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列 | 值 |
---|---|
id |
|
type |
|
data |
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]} |
键 | value |
---|---|
test_connector |
{"type":"execute-snapshot","data": {"type": "blocking"} |
有关阻塞快照的更多信息,请参阅您的连接器文档中的 快照 主题。
6.5.4.5. 定义自定义信号操作
自定义操作允许您扩展 Debezium 信号框架,以触发默认实现中不可用的操作。您可以使用带有多个连接器的自定义操作。
要定义自定义信号操作,您必须定义以下接口:
@FunctionalInterface public interface SignalAction<P extends Partition> { /** * @param signalPayload the content of the signal * @return true if the signal was processed */ boolean arrived(SignalPayload<P> signalPayload) throws InterruptedException; }
io.debezium.pipeline.signal.actions.SignalAction
使用一个参数公开单一方法,它代表通过信号频道发送的消息有效负载。
在定义了自定义信号操作后,使用以下 SPI 接口使自定义操作可供信号机制使用: io.debezium.pipeline.signal.actions.SignalActionProvider
。
public interface SignalActionProvider { /** * Create a map of signal action where the key is the name of the action. * * @param dispatcher the event dispatcher instance * @param connectorConfig the connector config * @return a concrete action */ <P extends Partition> Map<String, SignalAction<P>> createActions(EventDispatcher<P, ? extends DataCollectionId> dispatcher, CommonConnectorConfig connectorConfig); }
您的实施必须返回信号操作的映射。将 map 键设置为操作的名称。密钥用作信号 的类型
。
6.5.4.6. Debezium 核心模块依赖项
自定义操作 Java 项目已在 Debezium 核心模块上编译依赖项。在项目的 pom.xml
文件中包含以下编译依赖项:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version> 1
</dependency>
在 META-INF/services/io.debezium.pipeline.signal.actions.SignalActionProvider
文件中声明您的供应商实现。
6.5.4.7. 部署自定义信号操作
先决条件
- 您有自定义操作 Java 程序。
流程
-
要将自定义操作与 Debezium 连接器搭配使用,请将 Java 项目导出到 JAR 文件,并将文件复制到包含您要使用它的每个 Debezium 连接器的 JAR 文件的目录。
例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录(/kafka/connect
)的子目录中,每个连接器 JAR 都有自己的子目录(/kafka/connect/debezium-connector-db2
、/kafka/connect/debezium-connector-mysql
等等)。
要将自定义操作与多个连接器搭配使用,您必须将自定义信号频道 JAR 文件的副本放在每个连接器的子目录中。