搜索

8.2. Debezium PostgreSQL 连接器的工作方式

download PDF

为了优化配置和运行 Debezium PostgreSQL 连接器,了解连接器如何执行快照、流更改事件、决定 Kafka 主题名称并使用元数据非常有用。

详情包括在以下主题中:

8.2.1. PostgreSQL 连接器的安全性

要使用 Debezium 连接器从 PostgreSQL 数据库流更改,连接器必须使用数据库中的特定权限运行。虽然授予必要的特权的一种方法是为用户提供 超级用户特权,这样做可能会将您的 PostgreSQL 数据暴露给未经授权的访问。最好创建一个专用的 Debezium 复制用户,而不是为 Debezium 用户授予特定的特权。

有关为 Debezium PostgreSQL 用户配置特权的更多信息,请参阅 设置权限。有关 PostgreSQL 逻辑复制安全性的更多信息,请参阅 PostgreSQL 文档

8.2.2. Debezium PostgreSQL 连接器如何执行数据库快照

大多数 PostgreSQL 服务器都配置为不保留 WAL 段中数据库的完整历史记录。这意味着 PostgreSQL 连接器无法通过只读取 WAL 来查看数据库的完整历史记录。因此,连接器首次启动时,它会执行数据库的初始 一致快照

您可以在以下部分找到有关快照的更多信息:

初始快照的默认工作流行为

执行快照的默认行为由以下步骤组成。您可以通过将 snapshot.mode 连接器配置属性设置为 初始 以外的值来更改此行为。

  1. 使用 SERIALIZABLE、READ ONLY、DEFERRABLE 隔离级别启动事务,以确保此事务中的后续读取会根据一个一致的数据版本进行。由于后续 INSERTUPDATEDELETE 操作而对数据的任何更改都不对这个事务可见。
  2. 阅读服务器的事务日志中的当前位置。
  3. 扫描数据库表和模式,为每个行生成一个 READ 事件,并将该事件写入适当的表特定 Kafka 主题。
  4. 提交事务。
  5. 在连接器偏移中记录快照成功完成。

如果连接器失败,会在步骤 1 开始后重新平衡或停止,但在重启连接器开始新快照前。连接器完成其初始快照后,PostgreSQL 连接器会从在第 2 步中读取的位置继续流。这样可确保连接器不会错过任何更新。如果连接器因为任何原因而再次停止,则在重启时,连接器会继续从之前关闭的位置进行流更改。

表 8.1. snapshot.mode 连接器配置属性的选项
选项描述

always

连接器总是在启动时执行快照。快照完成后,连接器将继续在上述序列中从第 3 步进行流更改。这个模式在以下情况下很有用:

  • 已知一些 WAL 段已被删除,且不再可用。
  • 集群失败后,会提升新的主设备。always 快照模式可确保连接器不会错过在新主提升后所做的任何更改,但在连接器在新主上重启前。

never

连接器永远不会执行快照。当以这种方式配置连接器时,其启动时的行为如下。如果 Kafka offsets 主题中存在之前存储的 LSN,则连接器将继续从该位置流更改。如果没有存储 LSN,则连接器会在服务器上创建 PostgreSQL 逻辑复制插槽时从时间点开始流更改。只有在您知道所有关注的数据仍然反映在 WAL 中时,never 快照模式很有用。

初始 (默认)

当没有 Kafka offsets 主题时,连接器会执行数据库快照。数据库快照完成后,将写入 Kafka offsets 主题。如果 Kafka offsets 主题中存在之前存储的 LSN,则连接器将继续从该位置流更改。

initial_only

连接器执行数据库快照并在流任何更改事件记录前停止。如果连接器已启动但在停止前没有完成快照,连接器会重启快照过程并在快照完成后停止。

exported

弃用,所有模式都是没有锁定的。

8.2.3. 临时快照

默认情况下,连接器仅在首次启动后运行初始快照操作。在正常情况下,在这个初始快照后,连接器不会重复快照过程。连接器捕获的任何更改事件数据都只通过流处理。

然而,在某些情况下,连接器在初始快照期间获得的数据可能会过时、丢失或不完整。为了提供总结表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能会导致执行临时快照:

  • 连接器配置会被修改为捕获不同的表集合。
  • Kafka 主题已删除,必须重建。
  • 由于配置错误或某些其他问题导致数据损坏。

您可以通过启动所谓的 临时快照来为之前捕获的表重新运行快照。临时快照需要使用 信号表。您可以通过向 Debezium 信号表发送信号请求来发起临时快照。

当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了之前存在的主题,如果启用了 自动主题创建,Debezium 可以自动创建主题。

临时快照信号指定要包含在快照中的表。快照可以捕获整个数据库的内容,或者仅捕获数据库中表的子集。另外,快照也可以捕获数据库中表的内容子集。

您可以通过将 execute-snapshot 消息发送到信号表来指定要捕获的表。将 execute-snapshot 信号类型设置为 增量,并提供快照中包含的表名称,如下表所述:

表 8.2. 临时 execute-snapshot 信号记录的示例
字段默认

type

incremental

指定您要运行的快照类型。
设置类型是可选的。目前,您只能请求 增量 快照。

data-collections

N/A

包含与要快照的表的完全限定域名匹配的正则表达式的数组。
名称的格式与 signal.data.collection 配置选项的格式相同。

additional-condition

N/A

可选字符串,根据表的列指定条件,用于捕获表的内容的子集。

surrogate-key

N/A

可选字符串,指定连接器在快照过程中用作表的主键的列名称。

触发临时快照

您可以通过向信号表中添加 execute-snapshot 信号类型的条目来发起临时快照。连接器处理消息后,它会开始快照操作。快照进程读取第一个和最后一个主密钥值,并使用这些值作为每个表的开头和结束点。根据表中的条目数量以及配置的块大小,Debezium 会将表划分为块,并一次性执行每个块的快照。

目前,execute-snapshot 操作类型仅触发 增量快照。如需更多信息,请参阅 增加快照

8.2.4. 增量快照

为了提供管理快照的灵活性,Debezium 包含附加快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号

在增量快照中,除了一次捕获数据库的完整状态,就像初始快照一样,Debebe 会在一系列可配置的块中捕获每个表。您可以指定您希望快照捕获的表 以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小为 1024 行。

当增量快照进行时,Debebe 使用 watermarks 跟踪其进度,维护它捕获的每个表行的记录。与标准初始快照过程相比,捕获数据的阶段方法具有以下优点:

  • 您可以使用流化数据捕获并行运行增量快照,而不是在快照完成前进行后流。连接器会在快照过程中从更改日志中捕获接近实时事件,且操作都不会阻止其他操作。
  • 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从停止的点开始,而不是从开始计算表。
  • 您可以随时根据需要运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将表添加到其 table.include.list 属性中。

增量快照过程

当您运行增量快照时,Debezium 会按主键对每个表进行排序,然后根据 配置的块大小 将表分成块。然后,按块的工作块会捕获块中的每个表行。对于它捕获的每行,快照会发出 READ 事件。该事件代表块的快照开始时的行值。

当快照继续进行时,其他进程可能会继续访问数据库,可能会修改表记录。为了反映此类更改,INSERTUPDATEDELETE 操作会按照常常提交到事务日志。同样,持续 Debezium 流进程将继续检测这些更改事件,并将相应的更改事件记录发送到 Kafka。

Debezium 如何使用相同的主密钥在记录间解决冲突

在某些情况下,streaming 进程发出的 UPDATEDELETE 事件会停止序列。也就是说,流流过程可能会发出一个修改表行的事件,该事件捕获包含该行的 READ 事件的块。当快照最终为行发出对应的 READ 事件时,其值已被替换。为确保以正确的逻辑顺序处理到达序列的增量快照事件,Debebe 使用缓冲方案来解析冲突。仅在快照事件和流化事件之间发生冲突后,De Debezium 会将事件记录发送到 Kafka。

快照窗口

为了帮助解决修改同一表行的后期事件和流化事件之间的冲突,Debebe 会使用一个所谓的 快照窗口快照窗口分解了增量快照捕获指定表块数据的间隔。在块的快照窗口打开前,Debebe 会使用其常见行为,并将事件从事务日志直接下游发送到目标 Kafka 主题。但从特定块的快照打开后,直到关闭为止,De-duplication 步骤会在具有相同主密钥的事件之间解决冲突。

对于每个数据收集,Debezium 会发出两种类型的事件,并将其存储在单个目标 Kafka 主题中。从表直接捕获的快照记录作为 READ 操作发送。同时,当用户继续更新数据收集中的记录,并且会更新事务日志来反映每个提交,Debezium 会为每个更改发出 UPDATEDELETE 操作。

当快照窗口打开时,Debezium 开始处理快照块,它会向内存缓冲区提供快照记录。在快照窗口期间,缓冲区中 READ 事件的主密钥与传入流事件的主键进行比较。如果没有找到匹配项,则流化事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ 事件,并将流化记录写入目标主题,因为流的事件逻辑地取代静态快照事件。在块关闭的快照窗口后,缓冲区仅包含 READ 事件,这些事件不存在相关的事务日志事件。Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。

连接器为每个快照块重复这个过程。

警告

PostgreSQL 的 Debezium 连接器不支持增量快照运行时的模式更改。如果在增量快照启动执行 schema 更改,但在以后发送信号,passthrough 配置选项 database.autosave 被设置为 conservative 以正确处理 schema 的更改。

8.2.4.1. 触发增量快照

目前,启动增量快照的唯一方法是向源数据库上的 信号表发送临时快照 信号。

作为 SQL INSERT 查询,您将向信号提交信号。

在 Debezium 检测到信号表中的更改后,它会读取信号并运行请求的快照操作。

您提交的查询指定要包含在快照中的表,并可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值 incremental

要指定快照中包含的表,请提供列出表或用于匹配表的正则表达式数组的 数据集合,例如:

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

增量快照信号的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debezium 会检测到不需要任何操作,且不会执行快照。

注意

如果要包含在快照中的表的名称在数据库、模式或表的名称中包含句点(.),以将表添加到 data-collections 数组中,您必须使用双引号转义名称的每个部分。

例如,要包含一个存在于 公共 模式的表,其名称为 My.Table,请使用以下格式 :"public"."My.Table "。

先决条件

使用源信号频道来触发增量快照

  1. 发送 SQL 查询,将临时增量快照请求添加到信号表中:

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

    例如,

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'execute-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental"}, 5
        "additional-condition":"color=blue"}'); 6

    命令中的 idtypedata 参数的值对应于 信号表 的字段

    下表描述了示例中的参数:

    表 8.3. SQL 命令中字段的描述,用于将增量快照信号发送到信号表
    描述

    1

    myschema.debezium_signal

    指定源数据库上信号表的完全限定名称。

    2

    ad-hoc-1

    id 参数指定一个任意字符串,它被分配为信号请求的 id 标识符。
    使用此字符串识别信号表中的条目的日志记录消息。Debezium 不使用此字符串。相反,Debebe 会在快照期间生成自己的 id 字符串作为水位线信号。

    3

    execute-snapshot

    type 参数指定信号旨在触发的操作。

    4

    data-collections

    信号的 data 字段所需的组件,用于指定表名称或正则表达式数组,以匹配快照中包含的表名称。
    数组列出了按照完全限定名称匹配表的正则表达式,其格式与您在 signal.data.collection 配置属性中指定连接器信号表的名称相同。

    5

    incremental

    信号的 data 字段的可选 类型 组件,用于指定要运行的快照操作类型。
    目前,唯一有效的选项是默认值 incremental
    如果没有指定值,连接器将运行增量快照。

    6

    additional-condition

    可选字符串,根据表的列指定条件,用于捕获表的内容的子集。有关 additional-condition 参数的更多信息,请参阅 带有额外条件的临时增量快照

带有额外条件的临时增量快照

如果您希望快照只包含表中的内容子集,您可以通过向快照信号附加 additional-condition 参数来修改信号请求。

典型的快照的 SQL 查询采用以下格式:

SELECT * FROM <tableName> ....

通过添加 additional-condition 参数,您可以将 WHERE 条件附加到 SQL 查询中,如下例所示:

SELECT * FROM <tableName> WHERE <additional-condition> ....

以下示例显示了向信号表发送带有额外条件的临时增量快照请求的 SQL 查询:

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

例如,假设您有一个包含以下列的 products 表:

  • ID (主键)
  • color
  • quantity

如果您需要 product 表的增量快照,其中只包含 color=blue 的数据项,您可以使用以下 SQL 语句来触发快照:

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');

additional-condition 参数还允许您传递基于多个列的条件。例如,使用上例中的 product 表,您可以提交查询来触发增量快照,该快照仅包含 color=bluequantity>10 的项数据:

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');

以下示例显示了连接器捕获的增量快照事件的 JSON。

示例:增加快照事件消息

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "transaction":null
}

字段名称描述

1

snapshot

指定要运行的快照操作类型。
目前,唯一有效的选项是默认值 incremental
在 SQL 查询中指定 type 值,您提交到信号表是可选的。
如果没有指定值,连接器将运行增量快照。

2

op

指定事件类型。
快照事件的值是 r,表示 READ 操作。

8.2.4.2. 使用 Kafka 信号频道来触发增量快照

您可以向 配置的 Kafka 主题 发送消息,以请求连接器来运行临时增量快照。

Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。

message 的值是带有 typedata 字段的 JSON 对象。

信号类型是 execute-snapshotdata 字段必须具有以下字段:

表 8.4. 执行快照数据字段
字段默认

type

incremental

要执行的快照的类型。目前,Debeium 仅支持 增量 类型。
详情请查看下一节。

data-collections

N/A

以逗号分隔的正则表达式数组,与快照中包含的表的完全限定域名匹配。
使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。

additional-condition

N/A

可选字符串,指定连接器评估为指定要包含在快照中的列子集的条件。

execute-snapshot Kafka 消息示例:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

带有额外条件的临时增量快照

Debezium 使用 additional-condition 字段来选择表内容的子集。

通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:

SELECT * FROM <tableName> …​.

当快照请求包含 additional-condition 时,extra-condition 会附加到 SQL 查询中,例如:

SELECT * FROM <tableName> WHERE <additional-condition> …​.

例如,如果一个 product table with the column id (主键)、colorbrand,如果您希望快照只包含 color='blue' 的内容,当您请求快照时,您可以附加一个 additional-condition 语句来过滤内容:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue'"}}`

您可以使用 additional-condition 语句根据多个列传递条件。例如,如果您希望快照只包含 color='blue' products 表中,以及 brand='MyBrand',则您可以发送以下请求:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue' AND brand='MyBrand'"}}`

8.2.4.3. 停止增量快照

您还可以通过向源数据库上的表发送信号来停止增量快照。您可以通过发送 SQL INSERT 查询向表提交停止快照信号。

在 Debezium 检测到信号表中的更改后,它会读取信号,并在正在进行时停止增量快照操作。

您提交的查询指定 增量 的快照操作,以及要删除的当前运行快照的表。

先决条件

使用源信号频道停止增量快照

  1. 发送 SQL 查询以停止临时增量快照到信号表:

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');

    例如,

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'stop-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental"}'); 5

    signal 命令中的 idtypedata 参数的值对应于 信号表 的字段

    下表描述了示例中的参数:

    表 8.5. SQL 命令中字段的描述,用于将停止增量快照信号发送到信号表
    描述

    1

    myschema.debezium_signal

    指定源数据库上信号表的完全限定名称。

    2

    ad-hoc-1

    id 参数指定一个任意字符串,它被分配为信号请求的 id 标识符。
    使用此字符串识别信号表中的条目的日志记录消息。Debezium 不使用此字符串。

    3

    stop-snapshot

    指定 type 参数指定信号要触发的操作。

    4

    data-collections

    信号的 data 字段的可选组件,用于指定表名称或正则表达式数组,以匹配要从快照中删除的表名称。
    数组列出了按照完全限定名称匹配表的正则表达式,其格式与您在 signal.data.collection 配置属性中指定连接器信号表的名称相同。如果省略了 data 字段的这一组件,信号将停止正在进行的整个增量快照。

    5

    incremental

    信号的 data 字段所需的组件,用于指定要停止的快照操作类型。
    目前,唯一有效的选项是 增量的
    如果没有指定 类型 值,信号将无法停止增量快照。

8.2.4.4. 使用 Kafka 信号频道停止增量快照

您可以将信号消息发送到 配置的 Kafka 信号主题,以停止临时增量快照。

Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。

message 的值是带有 typedata 字段的 JSON 对象。

信号类型是 stop-snapshotdata 字段必须具有以下字段:

表 8.6. 执行快照数据字段
字段默认

type

incremental

要执行的快照的类型。目前,Debeium 仅支持 增量 类型。
详情请查看下一节。

data-collections

N/A

可选数组,以逗号分隔的正则表达式,与表的完全限定域名匹配,以包含在快照中。
使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。

以下示例显示了典型的 stop-snapshot Kafka 信息:

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

8.2.5. Debezium PostgreSQL 连接器流更改事件记录

PostgreSQL 连接器通常将其大部分的时间流更改从 PostgreSQL 服务器连接到其中。这种机制依赖于 PostgreSQL 的复制协议。这个协议可让客户端从服务器接收更改,因为它们在服务器的事务日志中提交,这些位置被称为 Log Sequence Numbers (LSN)。

每当服务器提交事务时,单独的服务器进程会从 逻辑解码插件 调用回调功能。此功能处理事务中的更改,将其转换为特定格式(如果是 Debezium 插件则为Protobuf 或 JSON),并在输出流上写入它们,然后可以被客户端使用。

Debezium PostgreSQL 连接器充当 PostgreSQL 客户端。当连接器收到更改时,它会将事件转换为 Debezium 的 create, update, 或 delete 事件,包含该事件的 LSN 的事件。PostgreSQL 连接器将记录中的这些更改事件转发到 Kafka Connect 框架,该框架在同一进程中运行。Kafka Connect 进程异步写入更改事件记录,其顺序与生成到适当的 Kafka 主题相同。

定期,Kafka Connect 在另一个 Kafka 主题中记录最新的 偏移量。偏移表示 Debezium 包含在每个事件中的特定于源的位置信息。对于 PostgreSQL 连接器,在每次更改事件中记录的 LSN 是偏移量。

当 Kafka Connect 正常关闭时,它会停止连接器,将所有事件记录刷新到 Kafka,并记录从每个连接器接收的最后偏移。当 Kafka Connect 重启时,它会为每个连接器读取最后记录的偏移,并在其最后记录的偏移上启动每个连接器。当连接器重启时,它会将请求发送到 PostgreSQL 服务器,来仅在该位置后开始发送事件。

注意

PostgreSQL 连接器检索模式信息,作为逻辑解码插件发送的事件的一部分。但是,连接器不会检索有关构成主键的列的信息。连接器从 JDBC 元数据(在频道侧)获取此信息。如果表的主键定义有变化(通过添加、删除或重命名主键列),当来自 JDBC 的主密钥信息与逻辑解码插件生成的更改事件没有同步时,会有一个小时段。在此小期间内,可以创建带有不一致的密钥结构的消息。要防止这种不一致,请按如下所示更新主键结构:

  1. 将数据库或应用程序置于只读模式。
  2. 让 Debezium 处理所有剩余的事件。
  3. 停止 Debezium。
  4. 更新相关表中的主密钥定义。
  5. 将数据库或应用程序置于读/写模式。
  6. 重启 Debezium。

PostgreSQL 10+ 逻辑解码支持(pgoutput)

从 PostgreSQL 10+ 开始,有一个逻辑复制流模式,称为 pgoutput,它被 PostgreSQL 原生支持。这意味着 Debezium PostgreSQL 连接器可以消耗该复制流,而无需额外的插件。这对不支持或不支持或不支持插件的环境特别有用。

如需更多信息,请参阅设置 PostgreSQL

8.2.6. 接收 Debezium PostgreSQL 更改事件记录的默认 Kafka 主题名称

默认情况下,PostgreSQL 连接器将所有 INSERTUPDATEDELETE 操作的更改事件写入特定于该表的单一 Apache Kafka 主题。连接器使用以下惯例来命名更改事件主题:

topicPrefix.schemaName.tableName

以下列表为默认名称的组件提供定义:

topicPrefix
topic.prefix 配置属性指定的主题前缀。
schemaName
发生更改事件的数据库模式的名称。
tableName
发生更改事件的数据库表的名称。

例如,假设 fulfillment 是连接器中的逻辑服务器名称,该连接器捕获 PostgreSQL 安装中的更改,该安装具有一个 postgres 数据库和一个 inventory schem,它包含四个表:products, products_on_hand, customers, 和 orders连接器会将记录流传输到这四个 Kafka 主题:

  • fulfillment.inventory.products
  • fulfillment.inventory.products_on_hand
  • fulfillment.inventory.customers
  • fulfillment.inventory.orders

现在假设表不是特定架构的一部分,但在默认的 公共 PostgreSQL 模式中创建。Kafka 主题的名称为:

  • fulfillment.public.products
  • fulfillment.public.products_on_hand
  • fulfillment.public.customers
  • fulfillment.public.orders

连接器应用类似的命名约定,以标记其 事务元数据主题

如果默认主题名称不满足您的要求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 来自定义主题命名的更多信息,请参阅 主题路由

8.2.7. Debezium PostgreSQL 连接器生成的事件代表事务边界

Debezium 可以生成代表事务边界的事件,以及丰富的数据更改事件消息。

Debezium 接收事务元数据时的限制

Debezium 注册并只针对部署连接器后发生的事务接收元数据。部署连接器前发生的事务元数据不可用。

对于每个事务 BEGINEND,Debezium 会生成一个包含以下字段的事件:

status
BEGINEND.
id
由 Postgres 事务 ID 本身和给定操作的 LSN 组成的唯一事务标识符的字符串,即格式为 txID:LSN
ts_ms
数据源的事务边界事件(BEGINEND 事件)的时间。如果数据源没有向事件时间提供 Debezium,则该字段代表 Debezium 处理事件的时间。
event_count (用于 END 事件)
事务提供的事件总数。
data_collections (用于 END 事件)
data_collectionevent_count 元素的数组,用于指示连接器发出来自数据收集的更改的事件数量。

示例

{
  "status": "BEGIN",
  "id": "571:53195829",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "571:53195832",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "s1.a",
      "event_count": 1
    },
    {
      "data_collection": "s2.a",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项覆盖,否则事务事件将写入名为 <topic. prefix>.transaction 的主题

更改数据事件增强

启用事务元数据后,数据消息 Envelope 通过新的 transaction 字段进行了增强。此字段以字段复合的形式提供有关每个事件的信息:

id
唯一事务标识符的字符串。
total_order
事件在事务生成的所有事件中绝对位置。
data_collection_order
在事务发出的所有事件间,按数据收集位置。

以下是消息的示例:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
   ...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "571:53195832",
    "total_order": "1",
    "data_collection_order": "1"
  }
}
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.