6.5. 将信号发送到 Debezium 连接器


Debezium 信号机制提供了一种方式,可以修改连接器的行为,或触发一次性操作,如启动表 的临时快照。要使用信号触发连接器执行指定操作,您可以将连接器配置为使用以下一个或多个频道:

SourceSignalChannel
您可以发出 SQL 命令,来向专用的信号数据收集添加信号消息。您在源数据库中创建的信号数据收集被指定,专门用于与 Debezium 通信。
KafkaSignalChannel
您可以将信号信息提交到可配置的 Kafka 主题。
JmxSignalChannel
您可以通过 JMX 信号操作提交 信号
FileSignalChannel
您可以使用文件来发送信号。当 Debezium 检测到一个新的 日志记录 或临时快照记录添加到 频道中时,它会读取信号,并启动请求的操作。

信号可用于以下 Debezium 连接器:

  • Db2
  • MariaDB (技术预览)
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

您可以通过设置 signal.enabled.channels 配置属性来指定启用哪个频道。属性列出了启用的频道的名称。默认情况下,Debebe 提供以下频道: sourcekafka 频道会被默认启用,因为增量快照信号需要它。

6.5.1. 启用 Debezium 源信号频道

默认情况下启用 Debezium 源信号频道。

您必须为您要使用它的每个连接器明确配置信号。

流程

  1. 在源数据库中,创建一个信号数据收集表,用于将信号发送到连接器。有关信号数据收集所需的结构的详情,请参考 信号数据收集的结构
  2. 对于实现原生更改数据捕获 (CDC) 机制的源数据库,请为信号表启用 CDC。
  3. 将信号数据收集的名称添加到 Debezium 连接器配置中。
    在连接器配置中,添加属性 signal.data.collection,并将其值设置为在第 1 步中创建的信号数据收集的完全限定名称。

    例如,signal.data.collection = inventory.debezium_signals

    信号集合的完全限定域名格式取决于连接器。
    以下示例显示了每个连接器使用的命名格式:

完全限定表名称

Db2
<schemaName>.<tableName>
MariaDB (技术预览)
<databaseName>.<tableName>
MongoDB
<databaseName>.<collectionName>
MySQL
<databaseName>.<tableName>
Oracle
<databaseName>.<schemaName>.<tableName>
PostgreSQL
<schemaName>.<tableName>
SQL Server
<databaseName>.<schemaName>.<tableName>

如需有关设置 signal.data.collection 属性的信息,请参阅您的连接器的配置属性列表。

6.5.1.1. Debezium 信号数据收集所需的结构

信号数据收集或信号表存储您发送到连接器的信号,以触发指定的操作。信号表的结构必须符合以下标准格式:

  • 包含三个字段(列)。
  • 字段按特定顺序排列,如 表 1 所示。
表 6.7. 信号数据收集所需的结构
字段类型描述

ID
(必需)

字符串

标识信号实例的任意唯一字符串。
您可以为提交给信号表的每个信号分配一个 id
通常,ID 是一个 UUID 字符串。
您可以使用信号实例进行日志记录、调试或重复数据删除。
当信号触发 Debezium 执行增量快照时,它会生成带有任意 id 字符串的信号消息。生成的消息包含的 id 字符串与提交的信号中的 id 字符串无关。

类型
(必需)

字符串

指定要发送的信号类型。
您可以使用一些信号类型以及信号可用的连接器,其他信号类型则仅适用于特定的连接器。

数据
(可选)

字符串

指定要传递给信号操作的 JSON 格式参数。
每个信号类型都需要特定的一组数据。

注意

数据收集中的字段名称是任意的。上表中提供了建议的名称。如果您使用不同的命名约定,请确保每个字段中的值与预期内容一致。

6.5.1.2. 创建 Debezium 信号数据收集

您可以通过将标准 SQL DDL 查询提交到源数据库来创建信号表。

先决条件

  • 您有足够的访问权限来在源数据库中创建表。

流程

  • 向源数据库提交一个 SQL 查询以创建一个表,它与 required structure 一致,如以下示例所示:

    CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);
注意

分配给 id 变量的 VARCHAR 参数的空间量必须足够,以适应发送到信号表的信号的 ID 字符串的大小。
如果 ID 的大小超过可用空间,连接器无法处理信号。

以下示例显示了一个 CREATE TABLE 命令,它创建一个三列 debezium_signal 表:

CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);

6.5.2. 启用 Debezium Kafka 信号频道

您可以通过将信号添加到 signal.enabled.channels 配置属性来启用 Kafka 信号频道,然后添加接收信号的主题名称到 signal.kafka.topic 属性。启用信号频道后,会创建一个 Kafka 使用者,以使用发送到配置的信号主题的信号。

注意

要使用 Kafka 信号为大多数连接器触发临时增量快照,您必须首先在连接器配置 中启用 信号频道。源频道实施一个水位线机制来去除重复数据的事件,这些事件可能会被增量快照捕获,然后在流恢复后再次捕获。当使用信号频道来触发 启用了 GTID 的只读 MySQL 数据库的增量快照时,不需要启用源频道。如需更多信息,请参阅 MySQL 只读增量快照

消息格式

Kafka 消息的密钥必须与 topic.prefix 连接器配置选项的值匹配。

该值是一个带有 typedata 字段的 JSON 对象。

当 signal type 设为 execute-snapshot 时,data 字段必须包含下表中列出的字段:

表 6.8. 执行快照数据字段
字段默认

type

incremental

要运行的快照类型。目前 Debezium 支持 incrementalblocking 类型。

data-collections

N/A

以逗号分隔的正则表达式,与快照中包含的数据收集的完全限定名称匹配。
命名格式取决于 数据库。

additional-conditions

N/A

可选数组,指定连接器评估的一组额外条件,以确定要包含在快照中的记录子集。
每个额外的条件都是一个对象,用于指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下属性:

data-collection
过滤器应用到的数据收集的完全限定名称。您可以为每个数据收集应用不同的过滤器。
filter

指定在数据库记录中必须存在的列值,以便快照包含它,例如 "color='blue' "。
快照进程会根据 过滤器 值评估数据收集中的记录,并只捕获包含匹配值的记录。

分配给 过滤器 属性的特定值取决于临时快照的类型:

  • 对于增量快照,您可以指定一个搜索条件片段,如 "color='blue'",快照会附加到查询的 condition 子句中。
  • 要阻止快照,您可以指定完整的 SELECT 语句,例如您可以在 snapshot.select.statement.overrides 属性中设置的一个。

以下示例显示了典型的 execute-snapshot Kafka 信息:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

6.5.3. 启用 Debezium JMX 信号频道

您可以通过在连接器配置中的 signal.enabled.channels 属性中添加 JMX 信号来启用 JMX 信号,然后启用 JMX MBean Server 来公开信号 bean。

流程

  1. 使用您首选的 JMX 客户端(例如:用于连接到 MBean 服务器的 JConsole 或 JDK Mission Control。
  2. 搜索 Mbean debezium. <connector-type>.management.signals. <server>。Mbean 会公开接受以下输入参数 的信号 操作:

    p0
    信号的 id。
    p1
    信号的类型,如 execute-snapshot
    p2
    包含指定信号类型的附加信息的 JSON 数据字段。
  3. 通过为输入参数提供值,发送 execute-snapshot 信号。
    在 JSON data 字段中,包含下表中列出的信息:

    表 6.9. 执行快照数据字段
    字段默认

    type

    incremental

    要运行的快照类型。目前 Debezium 支持 incrementalblocking 类型。

    data-collections

    N/A

    以逗号分隔的正则表达式,与快照中包含的 表的完全限定域名 匹配。

    additional-conditions

    N/A

    可选数组,指定连接器评估的一组额外条件,以确定要包含在快照中的记录子集。
    每个额外的条件都是一个对象,用于指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下属性:

    data-collection
    过滤器应用到的数据收集的完全限定名称。您可以为每个数据收集应用不同的过滤器。
    filter

    指定在数据库记录中必须存在的列值,以便快照包含它,例如 "color='blue' "。
    快照进程会根据 过滤器 值评估数据收集中的记录,并只捕获包含匹配值的记录。

    分配给 过滤器 属性的特定值取决于临时快照的类型:

    • 对于增量快照,您可以指定一个搜索条件片段,如 "color='blue'",快照会附加到查询的 condition 子句中。
    • 要阻止快照,您可以指定完整的 SELECT 语句,例如您可以在 snapshot.select.statement.overrides 属性中设置的一个。

下图显示了如何使用 JConsole 发送信号的示例:

使用 JConsole 发送 'execute-snapshot' 信号

6.5.4. Debezium 信号操作类型

您可以使用信号启动以下操作:

有些信号并不适用于所有连接器。

6.5.4.1. 日志记录信号

您可以通过创建带有 日志 信号类型的信号表条目来请求连接器在日志中添加条目。处理信号后,连接器将指定的消息输出到日志。另外,您可以配置信号,以便生成的消息包含流协调。

表 6.10. 用于添加日志消息的信号记录示例
描述

id

924e3ff8-2245-43ca-ba77-2af9af02fa07

 

type

log

信号的操作类型。

data

{"message": "Signal message at offset {}"}

message 参数指定要打印到日志的字符串。
如果您在消息中添加占位符({}),它将被流协调替代。

6.5.4.2. 临时快照信号

您可以通过创建带有 execute-snapshot 信号类型的信号来请求连接器来启动临时快照。处理信号后,连接器运行请求的快照操作。

与连接器在首次启动后运行的初始快照不同,在连接器已经开始流更改数据库后,临时快照会在运行时发生。您可以随时启动临时快照。

临时快照可用于以下 Debezium 连接器:

  • Db2
  • MariaDB (技术预览)
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
表 6.11. 临时快照信号记录示例

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

execute-snapshot

data

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
表 6.12. 临时快照信号消息示例
value

test_connector

{"type":"execute-snapshot","data": {"data-collections": ["public.MyFirstTable"], "type": "INCREMENTAL", "additional-conditions":[{"data-collection": "public.MyFirstTable", "filter":"color='blue' AND brand='MyBrand'"}]}}

有关临时快照的更多信息,请参阅您的连接器文档中的 Snapshots 主题。

临时快照停止信号

您可以通过创建一个带有 stop-snapshot 信号类型的信号表条目,请求连接器来停止 in-progress 临时快照。处理信号后,连接器将停止当前的 in-progress 快照操作。

您可以停止以下 Debezium 连接器的临时快照:

  • Db2
  • MariaDB (技术预览)
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
表 6.13. 停止临时快照信号记录的示例

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

stop-snapshot

data

{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]}

您必须指定信号 的类型data-collections 字段是可选的。将 data-collections 字段留空,请求连接器来停止当前快照中的所有活动。如果您希望增量快照继续,但您要从快照中排除特定的集合,请提供要排除的集合或正则表达式名称的逗号分隔列表。连接器处理信号后,增量快照将继续,但它不包括您指定的集合中的数据。

6.5.4.3. 增量快照

增量快照是特定类型的临时快照。在增量快照中,连接器捕获您指定的表的基准状态,类似于初始快照。但是,与初始快照不同,增量快照会以块的形式捕获表,而不是一次捕获所有。连接器使用水位线方法来跟踪快照的进度。

通过在块而不是单一单一操作中捕获指定表的初始状态,与初始快照过程相比,增量快照具有以下优点:

  • 虽然连接器捕获指定表的基准状态,但事务日志中近实时事件流将继续不间断。
  • 如果增量快照进程中断,可以从它停止的时间点恢复。
  • 您可以随时启动增量快照。

增量快照暂停信号

您可以通过创建一个带有 pause-snapshot 信号类型的信号表条目,请求连接器来暂停 in-progress 增量快照。处理信号后,连接器将停止暂停当前正在进行的快照操作。因此,无法指定数据收集,因为快照处理将在处理信号时暂停。

您可以暂停以下 Debezium 连接器的增量快照:

  • Db2
  • MariaDB (技术预览)
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
表 6.14. 暂停增量快照信号记录示例

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

pause-snapshot

您必须指定信号 的类型data 字段将被忽略。

增量快照恢复信号

您可以通过创建带有 resume-snapshot 信号类型的信号表条目,请求连接器来恢复暂停的增量快照。处理信号后,连接器将恢复之前暂停的快照操作。

您可以恢复以下 Debezium 连接器的增量快照:

  • Db2
  • MariaDB (技术预览)
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
表 6.15. 恢复增量快照信号记录示例

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

resume-snapshot

您必须指定信号 的类型data 字段将被忽略。

有关增量快照的更多信息,请参阅您的连接器文档中的 快照 主题。

6.5.4.4. 阻塞快照信号

您可以通过创建带有 execute-snapshot 信号类型和 data.type 值来请求连接器来启动临时阻止快照。处理信号后,连接器运行请求的快照操作。

与连接器在首次启动后运行的初始快照不同,在连接器停止到从数据库更改事件后,临时阻止快照发生。您可以在任何时间启动临时阻塞快照。

阻塞快照可用于以下 Debezium 连接器:

  • Db2
  • MariaDB (技术预览)
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
表 6.16. 阻塞快照信号记录的示例

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

execute-snapshot

data

  {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
表 6.17. 阻塞快照信号消息的示例
value

test_connector

{"type":"execute-snapshot","data": {"type": "blocking"}

有关阻塞快照的更多信息,请参阅您的连接器文档中的 快照 主题。

6.5.4.5. 定义自定义信号操作

自定义操作允许您扩展 Debezium 信号框架,以触发默认实现中不可用的操作。您可以使用带有多个连接器的自定义操作。

要定义自定义信号操作,您必须定义以下接口:

@FunctionalInterface
public interface SignalAction<P extends Partition> {

    /**
     * @param signalPayload the content of the signal
     * @return true if the signal was processed
     */
    boolean arrived(SignalPayload<P> signalPayload) throws InterruptedException;
}

io.debezium.pipeline.signal.actions.SignalAction 使用一个参数公开单一方法,它代表通过信号频道发送的消息有效负载。

在定义了自定义信号操作后,使用以下 SPI 接口使自定义操作可供信号机制使用: io.debezium.pipeline.signal.actions.SignalActionProvider

public interface SignalActionProvider {

    /**
     * Create a map of signal action where the key is the name of the action.
     *
     * @param dispatcher the event dispatcher instance
     * @param connectorConfig the connector config
     * @return a concrete action
     */

    <P extends Partition> Map<String, SignalAction<P>> createActions(EventDispatcher<P, ? extends DataCollectionId> dispatcher, CommonConnectorConfig connectorConfig);
}

您的实施必须返回信号操作的映射。将 map 键设置为操作的名称。密钥用作信号 的类型

6.5.4.6. Debezium 核心模块依赖项

自定义操作 Java 项目已在 Debezium 核心模块上编译依赖项。在项目的 pom.xml 文件中包含以下编译依赖项:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version> 1
</dependency>
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 2 3 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
${version.debezium} 代表 Debezium 连接器的版本。

META-INF/services/io.debezium.pipeline.signal.actions.SignalActionProvider 文件中声明您的供应商实现。

6.5.4.7. 部署自定义信号操作

先决条件

  • 您有自定义操作 Java 程序。

流程

  • 要将自定义操作与 Debezium 连接器搭配使用,请将 Java 项目导出到 JAR 文件,并将文件复制到包含您要使用它的每个 Debezium 连接器的 JAR 文件的目录。

    例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录(/kafka/connect)的子目录中,每个连接器 JAR 都有自己的子目录(/kafka/connect/debezium-connector-db2/kafka/connect/debezium-connector-mysql 等等)。
注意

要将自定义操作与多个连接器搭配使用,您必须将自定义信号频道 JAR 文件的副本放在每个连接器的子目录中。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.