12.5. 向 Debezium 连接器发送信号


Debezium 信号机制提供了一种修改连接器行为的方法,或者触发一次性操作,如启动表 的临时快照。要使用信号来触发连接器来执行指定操作,您可以将连接器配置为使用以下一个或多个频道:

SourceSignalChannel
您可以发出 SQL 命令,来向专用信号数据收集添加信号消息。在源数据库中创建的信号数据收集被指定为与 Debezium 通信。
KafkaSignalChannel
您可以将信号信息提交至可配置的 Kafka 主题。
JmxSignalChannel
您可以通过 JMX 信号操作提交 信号。当 Debezium 检测到新的 日志记录 或临时快照记录 被添加到频道时,它会读取信号并启动请求的操作。

信号可用于以下 Debezium 连接器:

  • Db2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

您可以通过设置 signal.enabled.channels 配置属性来指定启用哪个频道。属性列出启用的频道的名称。默认情况下,Debebebe 提供以下频道: sourcekafka 频道会被默认启用,因为增量快照信号需要它。

12.5.1. 启用 Debezium 源信号频道

默认情况下启用 Debezium 源信号频道。

您必须明确为您要使用它的每个连接器配置信号。

流程

  1. 在源数据库中,创建一个信号数据收集表来向连接器发送信号。有关信号数据收集所需的结构的详情,请参考 信号数据收集的结构
  2. 对于实现原生更改数据捕获 (CDC) 机制的源数据库,请为信号表启用 CDC。
  3. 将信号数据收集的名称添加到 Debezium 连接器配置中。
    在连接器配置中,添加属性 signal.data.collection,并将其值设置为在第 1 步中创建的信号数据收集的完全限定名称。

    例如, signal.data.collection = inventory.debezium_signals

    信号集合的完全限定名称的格式取决于连接器。
    以下示例显示了每个连接器使用的命名格式:

    Db2
    <schemaName>.<tableName>
    MongoDB
    <databaseName>.<collectionName>
    MySQL
    <databaseName>.<tableName>
    Oracle
    <databaseName>.<schemaName>.<tableName>
    PostgreSQL
    <schemaName>.<tableName>
    SQL Server
    <databaseName>.<schemaName>.<tableName>

    如需有关设置 signal.data.collection 属性的信息,请参阅您的连接器的配置属性列表。

12.5.1.1. Debezium 信号数据收集的必要结构

信号数据收集或信号表会存储您发送到连接器的信号,以触发指定操作。信号表的结构必须符合以下标准格式:

  • 包含三个字段(列)。
  • 字段按特定顺序排列,如 表 1 所示。
表 12.4. 信号数据收集的必要结构
字段类型描述

ID
(必需)

字符串

标识信号实例的任意唯一字符串。
您可以为提交到信号表的每个信号分配一个 id
通常,ID 是一个 UUID 字符串。
您可以使用信号实例来记录、调试或重复数据删除。
当信号触发 Debezium 来执行增量快照时,它会生成带有任意 id 字符串的信号消息。生成的消息包含的 id 字符串与提交信号中的 id 字符串无关。

类型
(必需)

字符串

指定要发送的信号类型。
您可以将一些信号类型与任何连接器搭配使用,而其他信号类型则仅适用于特定的连接器。

数据
(可选)

字符串

指定 JSON 格式的参数以传递给信号操作。
每种信号类型都需要一组特定的数据。

注意

数据收集中的字段名称是任意的。前面的表中提供了推荐的名称。如果您使用不同的命名约定,请确保每个字段中的值与预期内容一致。

12.5.1.2. 创建 Debezium 信号数据收集

您可以通过向源数据库提交标准 SQL DDL 查询来创建信号表。

前提条件

  • 您有足够的权限在源数据库中创建表。

流程

  • 向源数据库提交一个 SQL 查询以创建一个表,它与 required structure 一致,如以下示例所示:

    CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);
注意

您分配给 id 变量的 VARCHAR 参数的空间量必须足够,以适应发送到信号表的信号 ID 字符串的大小。
如果 ID 的大小超过可用空间,则连接器无法处理信号。

以下示例显示了一个 CREATE TABLE 命令,它会创建一个三列 debezium_signal 表:

CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);

12.5.2. 启用 Debezium Kafka 信号频道

您可以通过将其添加到 signal.enabled.channels 配置属性来启用 Kafka 信号频道,然后将接收信号的主题名称添加到 signal.kafka.topic 属性中。启用信号频道后,会创建一个 Kafka 使用者来消耗发送到配置的信号主题的信号。

注意

要使用 Kafka 信号为连接器触发临时增量快照,您必须首先在连接器配置 中启用 信号频道。源频道实施一个水位线机制,用于去除被增量快照捕获的事件,然后在流恢复后再次捕获。

消息格式

Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。

该值是一个带有 typedata 字段的 JSON 对象。

当信号类型设置为 execute-snapshot 时,data 字段必须包含下表中列出的字段:

表 12.5. 执行快照数据字段
字段默认

type

incremental

要运行的快照的类型。目前,Debeium 仅支持 增量 类型。

data-collections

N/A

以逗号分隔的正则表达式数组,与要包含在快照中的表的完全限定名称匹配。
使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。

additional-condition

N/A

可选字符串,指定连接器评估为指定要包含在快照中的记录子集的条件。

以下示例显示了典型的 execute-snapshot Kafka 信息:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

12.5.3. 启用 Debezium JMX 信号频道

您可以通过在连接器配置中的 signal.enabled.channels 属性中添加 JMX 信号来启用 JMX 信号,然后启用 JMX MBean 服务器 来公开信号 bean。

流程

  1. 使用您首选的 JMX 客户端(例如:JConsole 或 JDK Mission Control,以连接到 MBean 服务器。
  2. 搜索 Mbean debezium. <connector-type>.management.signals. <server>。Mbean 公开接受以下输入参数 的信号 操作:

    p0
    信号的 id。
    p1
    信号的类型,如 execute-snapshot
    p2
    包含指定信号类型的附加信息的 JSON 数据字段。
  3. 通过提供输入参数的值来发送 execute-snapshot 信号。
    在 JSON 数据字段中,包含下表中列出的信息:

    表 12.6. 执行快照数据字段
    字段默认

    type

    incremental

    要运行的快照的类型。目前,Debeium 仅支持 增量 类型。

    data-collections

    N/A

    以逗号分隔的正则表达式数组,与要包含在快照中的表的完全限定名称匹配。
    使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。

    additional-condition

    N/A

    可选字符串,指定连接器评估为指定要包含在快照中的记录子集的条件。

    下图显示了如何使用 JConsole 发送信号的示例:

    使用 JConsole 发送"execute-snapshot"信号

12.5.4. Debezium 信号操作的类型

您可以使用信号启动以下操作:

有些信号与所有连接器不兼容。

12.5.4.1. 日志记录信号

您可以通过创建带有日志信号类型的信号表条目来请求连接器来向 日志 添加条目。处理信号后,连接器会将指定的消息输出到日志。另外,您可以配置信号,以便生成的消息包含流协调。

表 12.7. 添加日志消息的信号记录示例
column描述

id

924e3ff8-2245-43ca-ba77-2af9af02fa07

 

type

log

信号的操作类型。

data

{"message": "Signal message at offset {}"}

message 参数指定要打印到日志的字符串。
如果您在消息中添加占位符({}),它将被流传输协调替换。

12.5.4.2. 临时快照信号

您可以通过创建一个带有 execute-snapshot 信号类型的信号来请求连接器来启动临时快照。处理信号后,连接器运行请求的快照操作。

与连接器首次启动后运行的初始快照不同,在连接器已经开始流更改事件后会在运行时发生临时快照。您可以随时启动临时快照。

临时快照可用于以下 Debezium 连接器:

  • Db2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
表 12.8. 临时快照信号记录示例
column

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

execute-snapshot

data

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

表 12.9. 临时快照信号消息示例

test_connector

{"type":"execute-snapshot","data": {"data-collections": ["public.MyFirstTable"], "type": "INCREMENTAL", "additional-condition":"color='blue' AND brand='MyBrand'"}}

有关临时快照的更多信息,请参阅您的连接器文档中的 Snapshots 主题。

临时快照停止信号

您可以通过创建一个带有 stop-snapshot 信号类型的信号表条目来请求连接器来停止 in-progress 临时快照。处理信号后,连接器将停止当前的 in-progress 快照操作。

您可以停止以下 Debezium 连接器的临时快照:

  • Db2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
表 12.10. 停止临时快照信号记录的示例
column

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

stop-snapshot

data

{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]}

您必须指定信号 的类型data-collections 字段是可选的。将 data-collections 字段留空,以请求连接器停止当前快照中的所有活动。如果您希望增量快照继续,但您想要从快照中排除特定的集合,请提供要排除的集合或正则表达式的名称列表。连接器处理信号后,增量快照会进行,但它从您指定的集合中排除数据。

12.5.4.3. 增量快照

增量快照是特定类型的临时快照。在增量快照中,连接器捕获您指定的表的基准状态,类似于初始快照。但是,与初始快照不同,增量快照会捕获块中的表,而不是一次捕获表。连接器使用水位线方法来跟踪快照的进度。

通过捕获块中指定表的初始状态,而不是在单个单体操作中捕获,与初始快照进程相比,增量快照具有以下优势:

  • 虽然连接器捕获指定表的基准状态,但事务日志中接近实时事件流将继续不间断。
  • 如果增量快照进程中断,可以从其停止的时间点恢复。
  • 您可以随时启动增量快照。

增量快照暂停信号

您可以通过创建一个带有 pause-snapshot 信号类型的信号表条目来请求连接器来暂停 in-progress 增量快照。处理信号后,连接器将停止暂停当前 in-progress 快照操作。因此,无法指定数据收集,因为快照处理将在处理信号时的位置暂停。

您可以暂停以下 Debezium 连接器的增量快照:

  • Db2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
表 12.11. 暂停增量快照信号记录的示例
column

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

pause-snapshot

您必须指定信号 的类型data 字段将被忽略。

增量快照恢复信号

您可以通过创建带有 resume-snapshot 信号类型的信号表条目来请求连接器来恢复暂停的增量快照。处理信号后,连接器将恢复之前暂停的快照操作。

您可以为以下 Debezium 连接器恢复增量快照:

  • Db2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
表 12.12. 恢复增量快照信号记录的示例
column

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

resume-snapshot

您必须指定信号 的类型data 字段将被忽略。

有关增量快照的更多信息,请参阅您的连接器文档中的 Snapshots 主题。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.