7.2. Debezium PostgreSQL 连接器的工作方式
为了最佳配置和运行 Debezium PostgreSQL 连接器,了解连接器如何执行快照、流更改事件、决定 Kafka 主题名称以及使用元数据会很有帮助。
详情包括在以下主题中:
7.2.1. PostgreSQL 连接器的安全性
要使用 Debezium 连接器从 PostgreSQL 数据库流更改,连接器必须使用数据库中的特定特权进行操作。虽然授予所需特权的一种方法是为用户授予 超级用户特权
的一种方法,但这样做可能会公开您的 PostgreSQL 数据到未授权的访问。最好创建一个您授予特定特权的专用 Debezium 复制用户,而不是为 Debezium 用户授予过量特权。
有关为 Debezium PostgreSQL 用户配置特权的更多信息,请参阅 设置权限。有关 PostgreSQL 逻辑复制安全性的更多信息,请参阅 PostgreSQL 文档。
7.2.2. Debezium PostgreSQL 连接器如何执行数据库快照
大多数 PostgreSQL 服务器都配置为在 WAL 段中保留数据库的完整历史记录。这意味着 PostgreSQL 连接器无法通过读取 WAL 来查看数据库的整个历史记录。因此,连接器首次启动时,它会执行数据库的初始 一致性快照。执行快照的默认行为由以下步骤组成。您可以通过将 snapshot.mode
连接器配置属性设置为 初始
以外的值来更改此行为。
-
使用 SERIALIZABLE、READ ONLY、DEFERRABLE 隔离级别启动事务,以确保后续事务中的读取是针对数据的单个一致版本。由于后续
INSERT
、UPDATE
和DELETE
操作而对数据的任何更改都对这个事务不可见。 - 阅读服务器事务日志中的当前位置。
-
扫描数据库表和模式,为每个行生成
READ
事件,并将事件写入适当的表特定的 Kafka 主题。 - 提交事务。
- 在连接器偏移中记录成功完成快照。
如果连接器失败,会在步骤 1 开始后重新平衡或停止,但在重启连接器开始新快照前。连接器完成其初始快照后,PostgreSQL 连接器将继续从第 2 步中读取的位置流。这样可确保连接器不会丢失任何更新。如果连接器因为某种原因再次停止,则连接器会在之前离开的地方继续流传输更改。
选项 | 描述 |
---|---|
|
连接器总是在启动时执行快照。快照完成后,连接器将继续在上述序列中从第 3 步进行流传输更改。这个模式在以下情况下很有用:
|
|
连接器永远不会执行快照。当连接器以这种方式配置时,其行为如下。如果 Kafka offsets 主题中存在之前存储的 LSN,则连接器将继续从该位置流更改。如果没有存储 LSN,则连接器在服务器上创建 PostgreSQL 逻辑复制插槽时从点开始流更改。只有在您知道所关注的所有数据仍然反映在 WAL 中时, |
| 连接器执行数据库快照,并在流传输任何更改事件记录前停止。如果连接器已启动但没有在停止前完成快照,则连接器会重启快照进程并在快照完成后停止。 |
| 弃用,所有模式都锁定。 |
7.2.2.1. 临时快照
默认情况下,连接器仅在首次启动后运行初始快照操作。在正常情况下,遵循这个初始快照,连接器不会重复快照过程。连接器捕获的任何将来更改事件数据都仅通过流传输过程。
然而,在某些情况下,连接器在初始快照期间获取的数据可能会变得过时、丢失或不完整。为了提供总结表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能是执行临时快照:
- 连接器配置会被修改为捕获不同的表集合。
- Kafka 主题已删除,必须重建。
- 由于配置错误或某些其他问题导致数据损坏。
您可以通过启动所谓的 临时命令快照,为之前捕获的快照重新运行快照。临时快照需要使用 信号表。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。
当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了之前存在的主题,如果启用自动主题创建,Debezium 可以自动创建主题。https://access.redhat.com/documentation/zh-cn/red_hat_integration/2023.q2/html-single/debezium_user_guide/index#customization-of-kafka-connect-automatic-topic-creation
临时快照信号指定要包含在快照中的表。快照可以捕获数据库的整个内容,或者只捕获数据库中表的子集。另外,快照也可以捕获数据库中表内容的子集。
您可以通过向信号表发送 execute-snapshot
消息来指定要捕获的表。将 execute-snapshot
信号的类型设置为增量
,并提供快照中包含的表名称,如下表所述:
字段 | 默认 | 值 |
---|---|---|
|
|
指定您要运行的快照类型。 |
| 不适用 |
包含与要快照的表的完全限定域名匹配的正则表达式的数组。 |
| 不适用 | 可选字符串,它根据表的列指定条件,用于捕获表内容的子集。 |
触发临时快照
您可以通过在信号表中添加带有 execute-snapshot
信号类型的条目来启动临时快照。连接器处理消息后,它会开始快照操作。快照过程读取第一个和最后一个主密钥值,并使用这些值作为每个表的开头和端点。根据表中的条目数量以及配置的块大小,Debezium 将表分成块,并一次为每个块进行快照。
7.2.2.2. 增量快照
为了提供管理快照的灵活性,Debezium 包括一个补充的快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号。
在增量快照中,而不是一次性捕获数据库的完整状态,如初始快照,Debezium 以一系列可配置的块的形式捕获每个表。您可以指定您希望快照捕获的表,以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小为 1 KB。
当增量快照进行时,Debezium 使用水位线来跟踪其进度,维护它捕获的每一个表行的记录。这个阶段捕获数据的方法比标准初始快照过程提供以下优点:
- 您可以使用流化数据捕获并行运行增量快照,而不是经过发布流,直到快照完成为止。连接器会继续在整个快照过程中从更改日志捕获接近实时事件,且操作都不会阻断其他事件。
- 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从停止的时间开始,而不是从开始重新定义表。
-
您可以随时根据需要运行增量快照,并根据需要重复这个过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将表添加到其
table.include.list
属性中。
增量快照过程
当您运行增量快照时,Debezium 按主密钥对表进行排序,然后根据 配置的块大小 将表分成块。然后,按块使用块,然后在块中捕获每个表行。对于它捕获的每行,快照会发出 READ
事件。该事件代表了块开始快照时的行值。
当快照继续进行时,其他进程可能会继续访问数据库,从而可能会修改表记录。要反映此类更改,INSERT
、UPDATE
或 DELETE
操作会照常提交到事务日志中。同样,持续 Debezium 流过程还会继续检测这些更改事件,并将对应的更改事件记录发送到 Kafka。
Debezium 如何使用相同的主密钥解决记录间的冲突
在某些情况下,流传输进程发出的 UPDATE
或 DELETE
事件会耗尽序列。也就是说,流过程可能会发出一个事件,它会在快照捕获包含该行的 READ
事件之前修改表行。当快照最终会为行发出对应的 READ
事件时,其值已经被替换。为确保以正确的逻辑顺序处理到达序列的增量快照事件,Debezium 会使用一种缓冲区方案来解决冲突。只有在快照事件和流事件间冲突时才解决后,Debezium 会向 Kafka 发送事件记录。
快照窗口
为了帮助解决后续 READ
事件和修改同一表行的流事件之间的冲突,Debezium 采用所谓的 快照窗口。快照窗口分离了增量快照捕获指定表块的数据的时间间隔。在打开块的快照窗口前,Debebe 会遵循其常见的行为,并将事件直接从事务日志直接降级到目标 Kafka 主题。但是,从打开特定块的快照时,Debebe 会执行重复数据删除步骤,以解决具有相同主密钥的事件之间冲突。
对于每个数据收集,Debezium 会发出两种类型的事件,并将其记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ
操作发出。同时,当用户继续更新数据收集中的记录,并更新事务日志以反映每个提交,Debezium 会为每个更改发出 UPDATE
或 DELETE
操作。
当快照窗口打开时,Debezium 开始处理快照块,它会将快照记录提供给内存缓冲区。在快照窗口中,缓冲区中 READ
事件的主键将与传入流事件的主键进行比较。如果没有找到匹配项,流的事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ
事件,并将流的记录写入目标主题,因为流的事件逻辑地替换静态快照事件。在块的快照窗口关闭后,缓冲区仅包含没有相关事务日志事件的 READ
事件。Debezium 将这些剩余的 READ
事件发送到表的 Kafka 主题。
连接器重复每个快照块的进程。
PostgreSQL 的 Debezium 连接器不支持增量快照运行时的 schema 更改。如果在增量快照启动前执行模式更改,但在发送信号 后,passthrough 配置选项 database.autosave
被设置为 conservative
以正确处理架构更改。
7.2.2.3. 触发增量快照
目前,启动增量快照的唯一方法是向源数据库上的 信号发送临时快照 信号。
您可以以 SQL INSERT
查询形式向信号表提交信号。
在 Debezium 检测到信号表中的更改后,它会读取信号,并运行请求的快照操作。
您提交的查询指定要包含在快照中的表,以及可选的指定快照操作类型。目前,快照操作的唯一有效选项是默认值 增量
。
要指定快照中包含的表,请提供一个 data-collections
数组,该数组列出了用于匹配表的表或一组正则表达式,例如:
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
增量快照信号的 data-collections
数组没有默认值。如果 data-collections
数组为空,Debezium 会检测到不需要任何操作,且不执行快照。
如果要包含在快照中的表名称包含数据库、模式或表名称的句点(.
),要将表添加到 data-collections
数组中,您必须用双引号转义名称的每个部分。
例如,若要包含 公共
模式中存在的表,并且名为 My.Table
,请使用以下格式:" public"."My.Table
"。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
流程
发送 SQL 查询,将临时增量快照请求添加到信号表中:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
例如,
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'execute-snapshot', 3 '{"data-collections": ["schema1.table1", "schema2.table2"], 4 "type":"incremental"}, 5 "additional-condition":"color=blue"}'); 6
命令中的
id
、type
和data
参数的值对应于 信号表的字段。下表描述了示例中的参数:
表 7.3. 向信号表发送增量快照信号的 SQL 命令中的字段描述 项 值 描述 1
myschema.debezium_signal
指定源数据库上信号表的完全限定名称。
2
ad-hoc-1
id
参数指定分配给信号请求的id
标识符的任意字符串。
使用此字符串来标识到信号表中条目的日志消息。Debezium 不使用此字符串。相反,在快照中,Debebe 会生成自己的id
字符串作为水位信号。3
execute-snapshot
type
参数指定信号要触发的操作。
4
data-collections
信号的
data
字段所需的组件,用于指定表名称或正则表达式数组,以匹配快照中包含的表名称。
数组列出了根据完全限定名称匹配表的正则表达式,其格式与您用来指定signal.data.collection
配置属性中的连接器信号表的名称相同。5
增量
信号的
data
字段的可选类型
组件,用于指定要运行的快照操作类型。
目前,唯一有效选项是默认值增量
。
如果没有指定值,连接器将运行一个增量快照。6
additional-condition
可选字符串,它根据表的列指定条件,用于捕获表内容的子集。有关
additional-condition
参数的详情请参考带有额外条件
的临时增量快照。
带有额外条件
的临时增量快照
如果您希望快照在表中仅包含内容子集,您可以通过将 additional-condition
参数附加到快照信号来修改信号请求。
典型的快照的 SQL 查询采用以下格式:
SELECT * FROM <tableName> ....
通过添加 additional-condition
参数,您可以将 WHERE
条件附加到 SQL 查询中,如下例所示:
SELECT * FROM <tableName> WHERE <additional-condition> ....
以下示例显示了向信号表发送带有额外条件的临时增量快照请求的 SQL 查询:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
例如,假设您有一个包含以下列的产品表:
-
id
(主密钥) -
color
-
quantity
如果您希望 product 表的增量快照只包含 color=blue
的数据项,您可以使用以下 SQL 语句来触发快照:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');
additional-condition
参数还允许您传递基于多个列的条件。例如,使用上例中的 product 表,您可以提交查询来触发增量快照,该快照仅包含那些项目的 data =blue
和 quantity>10
:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例:增加快照事件信息
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" 1 }, "op":"r", 2 "ts_ms":"1620393591654", "transaction":null }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定要运行的快照操作类型。 |
2 |
|
指定事件类型。 |
7.2.2.4. 停止增量快照
您还可以通过向源数据库上的表发送信号来停止增量快照。您可以通过发送 SQL INSERT
查询,向表提交停止快照信号。
在 Debezium 检测到信号表中的更改后,它会读取信号,并在进行时停止增量快照操作。
您提交的查询指定了 增量
的快照操作,以及可选的、要删除的当前运行快照的表。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
流程
发送 SQL 查询以停止临时增量快照到信号表:
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');
例如,
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'stop-snapshot', 3 '{"data-collections": ["schema1.table1", "schema2.table2"], 4 "type":"incremental"}'); 5
signal 命令中的
id
、type
和data
参数的值对应于 信号表的字段。下表描述了示例中的参数:
表 7.4. 向信号表发送停止增量快照信号的 SQL 命令中的字段描述 项 值 描述 1
myschema.debezium_signal
指定源数据库上信号表的完全限定名称。
2
ad-hoc-1
id
参数指定分配给信号请求的id
标识符的任意字符串。
使用此字符串来标识到信号表中条目的日志消息。Debezium 不使用此字符串。3
stop-snapshot
指定
type
参数指定信号要触发的操作。
4
data-collections
信号的
data
字段的可选组件,用于指定表名称或正则表达式数组,以匹配要从快照中删除的表名称。
数组列出了根据完全限定名称匹配表的正则表达式,其格式与您用来指定signal.data.collection
配置属性中的连接器信号表的名称相同。如果省略了data
字段的此组件,则信号将停止正在进行的整个增量快照。5
增量
信号的
data
字段所需的组件,用于指定要停止的快照操作类型。
目前,唯一有效选项是增量的
。
如果没有指定类型
值,则信号无法停止增量快照。
7.2.3. Debezium PostgreSQL 连接器流更改事件记录
PostgreSQL 连接器通常占用从 PostgreSQL 服务器连接到的大多数时间流更改。这种机制依赖于 PostgreSQL 的复制协议。此协议可让客户端在服务器的特定位置(称为 Log Sequence Numbers (LSN)在服务器事务日志中提交时从服务器接收更改。
每当服务器提交事务时,单独的服务器进程都会从 逻辑解码插件中调用回调功能。此功能处理事务的更改,将其转换为特定格式(在 Debezium 插件时为Protobuf 或 JSON),并在输出流上写入它们,然后客户端可以消耗它。
Debezium PostgreSQL 连接器充当 PostgreSQL 客户端。当连接器收到更改时,它会将事件转换为 Debezium 的 create, update, 或 delete 事件,包含该事件的 LSN 的事件。PostgreSQL 连接器将记录中的这些更改事件转发到 Kafka Connect 框架,该框架在同一进程中运行。Kafka Connect 进程异步写入更改事件记录,其顺序与生成到适当的 Kafka 主题的顺序相同。
Kafka Connect 会在另一个 Kafka 主题中记录最新的 偏移量。offset 表示 Debezium 包含在每个事件中的特定于源的位置信息。对于 PostgreSQL 连接器,每次更改事件中记录的 LSN 是偏移。
当 Kafka Connect 正常关闭时,它会停止连接器,将所有事件记录刷新到 Kafka,并记录从每个连接器接收的最后偏移。当 Kafka Connect 重启时,它会读取每个连接器的最后记录的偏移量,并在最后记录的偏移后启动每个连接器。当连接器重启时,它会向 PostgreSQL 服务器发送请求,以便在该位置后才启动事件。
PostgreSQL 连接器检索模式信息,作为逻辑解码插件发送的事件的一部分。但是,连接器不会检索有关哪些列组成主密钥的信息。连接器从 JDBC 元数据(频道)获取此信息。如果表的主键定义有变化(通过添加、删除或重命名主键列),则当 JDBC 的主密钥信息没有与逻辑解码插件生成的更改事件不同步时,会有一个小的时间。在这个小时间内,可以使用不一致的密钥结构来创建消息。要防止这个问题不一致,请更新主键结构,如下所示:
- 将数据库或应用程序置于只读模式。
- 让 Debezium 处理所有剩余的事件。
- 停止 Debezium。
- 更新相关表中的主密钥定义。
- 将数据库或应用程序置于读/写模式。
- 重启 Debezium。
PostgreSQL 10+ 逻辑解码支持(pgoutput
)
从 PostgreSQL 10+ 开始,有一个逻辑复制流模式,称为 pgoutput
,它被 PostgreSQL 原生支持。这意味着 Debezium PostgreSQL 连接器可以消耗该复制流,而无需额外的插件。对于不支持或不允许安装插件的环境,这尤其重要。
如需更多信息,请参阅设置 PostgreSQL。
7.2.4. 接收 Debezium PostgreSQL 更改事件记录的 Kafka 主题默认名称
默认情况下,PostgreSQL 连接器将所有 INSERT
、UPDATE
和 DELETE
操作的更改事件写入特定于该表的单个 Apache Kafka 主题。连接器使用以下惯例命名更改事件主题:
topicPrefix.schemaName.tableName
以下列表提供默认名称组件的定义:
- topicPrefix
-
由
topic.prefix
配置属性指定的主题前缀。 - schemaName
- 发生更改事件的数据库模式的名称。
- tableName
- 发生更改事件的数据库表的名称。
例如,假设 fulfillment
是连接器中的逻辑服务器名称,该连接器捕获 PostgreSQL 安装中的更改,该安装具有一个 postgres
数据库和一个 inventory
schem,它包含四个表:products
, products_on_hand
, customers
, 和 orders
连接器会将记录流传输到这四个 Kafka 主题:
-
fulfillment.inventory.products
-
fulfillment.inventory.products_on_hand
-
fulfillment.inventory.customers
-
fulfillment.inventory.orders
现在假设表不是特定模式的一部分,而是在默认 公共
PostgreSQL 模式中创建。Kafka 主题的名称将是:
-
fulfillment.public.products
-
fulfillment.public.products_on_hand
-
fulfillment.public.customers
-
fulfillment.public.orders
连接器应用类似的命名约定来标记其 事务元数据主题。
如果默认主题名称不满足您的要求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由。
7.2.5. Debezium PostgreSQL 连接器生成的事件代表事务边界
Debezium 可以生成代表事务边界的事件,并增强数据更改事件消息。
Debezium 仅在部署连接器后为发生的事务注册并接收元数据。部署连接器前发生的事务元数据。
对于每个事务 BEGIN
和 END
,Debezium 生成一个包含以下字段的事件:
status
-
BEGIN
或END
。 id
-
由 Postgres 事务 ID 本身和给定操作的 LSN 组成的唯一事务标识符的字符串表示,即
txID:LSN
。 ts_ms
-
数据源上事务边界事件(
BEGIN
或END
事件)的时间。如果数据源没有向 Debezium 提供事件时间,则该字段代表 Debezium 处理事件的时间。 event_count
(用于END
事件)- 事务所设计的事件总数。
data_collections
(用于END
事件)-
data_collection
和event_count
元素的一组对,指示连接器为来自数据收集的更改发出的事件数。
示例
{ "status": "BEGIN", "id": "571:53195829", "ts_ms": 1486500577125, "event_count": null, "data_collections": null } { "status": "END", "id": "571:53195832", "ts_ms": 1486500577691, "event_count": 2, "data_collections": [ { "data_collection": "s1.a", "event_count": 1 }, { "data_collection": "s2.a", "event_count": 1 } ] }
除非通过 topic.transaction
选项覆盖,否则事务事件将写入名为 <topic. prefix>
。
.transaction
的主题
更改数据事件增强
当启用事务元数据时,可通过新的 事务
字段增强数据消息 Envelope
。此字段以字段复合的形式提供有关每个事件的信息:
id
- 唯一事务标识符的字符串表示。
total_order
- 事件在事务生成的所有事件的绝对路径。
data_collection_order
- 事件在事务发送的所有事件中每个数据收集位置。
以下是消息的示例:
{ "before": null, "after": { "pk": "2", "aa": "1" }, "source": { ... }, "op": "c", "ts_ms": "1580390884335", "transaction": { "id": "571:53195832", "total_order": "1", "data_collection_order": "1" } }