2.6. PostgreSQL 的 Debezium 连接器


Debezium PostgreSQL 连接器捕获 PostgreSQL 数据库模式中的行级更改。有关与连接器兼容的 PostgreSQL 版本的详情,请参考 Debezium 支持的配置 页面

第一次连接到 PostgreSQL 服务器或集群时,连接器会获取所有模式的一致性快照。完成该快照后,连接器会持续捕获行级更改,这些更改插入、更新和删除数据库内容,并提交到 PostgreSQL 数据库。连接器生成数据更改事件记录,并将其流传输到 Kafka 主题。对于每个表,默认行为是连接器将所有生成的事件流传输到该表的独立 Kafka 主题。应用程序和服务消耗来自该主题的数据更改事件记录。

使用 Debezium PostgreSQL 连接器的信息和流程组织如下:

2.6.1. Debezium PostgreSQL 连接器概述

PostgreSQL 的逻辑解码 功能是在版本 9.4 中引入的。是一种机制,允许提取提交至事务日志的更改,以及通过 输出插件 以用户友好的方式处理这些更改。输出插件可让客户端消耗更改。

PostgreSQL 连接器包含两个主要部分,它们一起用于读取和处理数据库更改:

  • pgoutput 是 PostgreSQL 10+ 中的标准逻辑解码输出插件。这是此 Debezium 发行版本中唯一支持的逻辑解码输出插件。此插件由 PostgreSQL 社区维护,供 PostgreSQL 本身用于 逻辑复制。此插件始终存在,因此不需要安装其他库。Debezium 连接器将原始复制事件流直接解释为更改事件。
  • Java 代码(实际的 Kafka Connect 连接器),它使用 PostgreSQL 的流 复制协议 和 PostgreSQL JDBC 驱动程序 来读取逻辑解码输出插件生成的更改。

连接器会为捕获的每行级别的插入、更新和删除操作生成更改事件,并在单独的 Kafka 主题中为每个表发送更改事件记录。客户端应用程序读取与感兴趣的数据库表对应的 Kafka 主题,并可对这些主题收到的每行事件做出反应。

PostgreSQL 通常会在一段时间后清除 write-ahead 日志(WAL)片段。这意味着连接器没有对数据库所做的任何更改的完整历史记录。因此,当 PostgreSQL 连接器首先连接到特定的 PostgreSQL 数据库时,它会首先执行每个数据库模式 的一致性快照。连接器完成快照后,它会继续从创建快照的确切点中流传输更改。这样,连接器会以所有数据的一致性视图开始,且不会省略拍摄快照时所做的任何更改。

连接器可以接受失败。当连接器读取更改并生成事件时,它会记录每个事件的 WAL 位置。如果连接器因任何原因(包括通信失败、网络问题或崩溃)停止,重启连接器将继续读取最后一个关闭的 WAL。这包括快照。如果连接器在快照过程中停止,连接器会在重启时启动新的快照。

重要

连接器依赖于并反映 PostgreSQL 逻辑解码功能,它有以下限制:

  • 逻辑解码不支持 DDL 更改。这意味着连接器无法向消费者报告 DDL 更改事件。
  • 因为逻辑解码复制插槽在 commit swig- swigand not post commit swig-wagonundesirable side-effects 过程中发布更改。当客户端可能会观察不一致的状态时,有两个主要场景。首先,在复制前的 master 结束时发布未提交的更改。其次,发布无法临时读取的更改(例如,读写一致性),因为它们正在复制。例如,EmbeddedEngine 使用者收到创建的行通知,但它不能被事务读取。

另外,pgoutput 逻辑解码输出插件不会捕获生成的列的值,从而导致连接器输出中缺少这些列的数据。

发生错误时的行为 描述了连接器如何响应是否有问题。

重要

Debezium 目前仅支持使用 UTF-8 字符编码的数据库。使用单字节字符编码时,无法正确处理包含扩展 ASCII 代码字符的字符串。

2.6.2. Debezium PostgreSQL 连接器如何工作

要优化地配置和运行 Debezium PostgreSQL 连接器,了解连接器如何执行快照、流更改事件、决定 Kafka 主题名称并使用元数据。

详情包括在以下主题中:

2.6.2.1. PostgreSQL 连接器的安全性

要使用 Debezium 连接器从 PostgreSQL 数据库流传输更改,连接器必须使用数据库中的特定权限操作。虽然授予所需的特权的一种方法是为用户授予 超级用户特权,但这样做可能会将您的 PostgreSQL 数据暴露给未经授权的访问权限。最好创建一个专用的 Debezium 复制用户,而不是为 Debezium 用户授予特权。

有关为 Debezium PostgreSQL 用户配置 权限的更多信息,请参阅设置权限。有关 PostgreSQL 逻辑复制安全性的更多信息,请参阅 PostgreSQL 文档

2.6.2.2. Debezium PostgreSQL 连接器如何执行数据库快照

大多数 PostgreSQL 服务器配置为不会在 WAL 段中保留数据库的完整历史记录。这意味着 PostgreSQL 连接器无法通过只读取 WAL 来查看数据库的完整历史记录。因此,当连接器首次启动时,它会执行数据库的初始 一致的快照

您可以在以下部分中找到有关快照的更多信息:

初始快照的默认工作流行为

以下步骤描述了连接器在初始快照过程中执行的默认步骤。您可以通过将 snapshot.mode 连接器配置属性设置为 初始 的值来更改此行为。

  1. 启动一个事务,它使用 snapshot.isolation.mode 属性指定的隔离级别。指定模式决定了此事务中的后续读取是否针对单个一致的数据版本。根据模式,更改导致其他客户端后续 INSERTUPDATEDELETE 操作的数据可能对此事务可见。
  2. 读取服务器事务日志中的当前位置。
  3. 扫描数据库表和模式,为每个行生成一个 READ 事件,并将该事件写入适当的表特定的 Kafka 主题。
  4. 提交事务。
  5. 在连接器偏移中记录成功完成快照。

如果连接器失败,会在步骤 1 开始后重新平衡或停止,但在重启连接器开始新快照前。连接器完成其初始快照后,PostgreSQL 连接器将继续从第 2 步中读取的位置流。这样可确保连接器不会丢失任何更新。如果连接器因任何原因再次停止,它会在重启后继续流传输更改。

表 2.128. snapshot.mode 连接器配置属性的选项
选项描述

always

连接器在启动时始终执行快照。快照完成后,连接器将继续从上述序列中的步骤 3 中流传输更改。这个模式在以下情况下很有用:

  • 众所周知,一些 WAL 段已被删除,并不再可用。
  • 集群失败后,如果提升了新的主系统。always 快照模式可确保连接器不会丢失在新主主被提升后所做的任何更改,但在连接器在新主上重启之前。

Initial (默认)

当没有 Kafka offsets 主题时,连接器会执行数据库快照。数据库快照完成后,将写入 Kafka 偏移主题。如果 Kafka offsets 主题中存在之前存储的 LSN,则连接器将继续从该位置流更改。

initial_only

连接器执行数据库快照并在流传输任何更改事件记录前停止。如果连接器已启动,但没有在停止前完成快照,则连接器会重启快照过程,并在快照完成后停止。

no_data

连接器永远不会执行快照。当以这种方式配置连接器时,启动后,它的行为如下:

如果 Kafka offsets 主题中存在之前存储的 LSN,则连接器将继续从该位置流更改。如果没有存储 LSN,则连接器从服务器上创建 PostgreSQL 逻辑复制插槽的时间点开始流更改。只有在您知道所有感兴趣的数据仍然反映在 WAL 中时,才使用此快照模式。

never

弃用,请参阅 no_data

when_needed

连接器启动后,只有在检测到以下情况之一时才执行快照:

  • 它无法检测任何主题偏移。
  • 之前记录的偏移量指定了服务器上不可用的日志位置。

2.6.2.3. 临时快照

默认情况下,连接器仅在首次启动后运行初始快照操作。按照这个初始快照,在正常情况下,连接器不会重复快照过程。任何以后更改连接器捕获的事件数据都会通过流处理。

然而,在某些情况下,在初始快照中获取的连接器可能会变得过时、丢失或不完整。为了提供总结表数据的机制,Debezium 包含一个执行临时快照的选项。您可能希望在 Debezium 环境中发生以下任何更改后执行临时快照:

  • 连接器配置会被修改为捕获不同的表集合。
  • Kafka 主题已删除,必须重建。
  • 由于配置错误或某些其他问题导致数据损坏。

您可以通过启动所谓的 临时快照,为之前捕获的快照重新运行快照。临时快照需要使用 信号表。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。

当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了之前存在的主题,如果启用了 自动主题创建,Debezium 可以自动创建主题。

临时快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,或者仅捕获数据库中表的子集。此外,快照也可捕获数据库中表的内容的子集。

您可以通过向信号表发送 execute-snapshot 消息来指定要捕获的表。将 execute-snapshot 信号的类型设置为 incrementalblocking,并提供快照中包含的表名称,如下表所述:

表 2.129. 临时 execute-snapshot 信号记录示例
字段默认

type

incremental

指定您要运行的快照类型。
目前,您可以请求 增量阻塞 快照。

data-collections

N/A

包含与快照中包含的表的完全限定域名匹配的正则表达式的数组。
对于 PostgreSQL 连接器,使用以下格式来指定表的完全限定名称: schema.table

additional-conditions

N/A

可选数组,指定连接器评估的一组额外条件,以确定要包含在快照中的记录子集。
每个额外的条件都是一个对象,用于指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下参数:

data-collection
过滤器应用到的表的完全限定域名。您可以对每个表应用不同的过滤器。
filter
指定在数据库记录中必须存在的列值,以便快照包含它,例如 "color='blue' "。

您分配给 filter 参数的值是您在为阻塞快照设置 snapshot.select.statement.overrides 属性时,在 SELECT 语句的 WHERE 子句中指定的值。

surrogate-key

N/A

可选字符串,用于指定连接器在快照过程中用作表的主键的列名称。

触发临时增量快照

您可以通过向信号表添加带有 execute-snapshot 信号类型的条目,或者向 Kafka 信号发送信号消息 来启动临时增量快照。连接器处理消息后,它会开始快照操作。快照进程读取第一个和最后一个主键值,并使用这些值作为每个表的开始和端点。根据表中的条目数量以及配置的块大小,Debezium 会将表分成块,并持续持续对每个块进行快照。

如需更多信息,请参阅 增加快照

触发临时阻塞快照

您可以通过在信号表或信号主题中添加带有 execute-snapshot 信号类型的条目来启动临时阻塞快照。连接器处理消息后,它会开始快照操作。连接器会临时停止流,然后启动指定表的快照,遵循它在初始快照过程中使用的同一进程。快照完成后,连接器会恢复流。

如需更多信息,请参阅 块快照

2.6.2.4. 增量快照

为了在管理快照方面提供灵活性,Debezium 包含一个补充快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号

在增量快照中,而不是像初始快照一样捕获数据库的完整状态,而是在一系列可配置的块中捕获每个表。您可以指定您希望快照捕获的表 以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小是 1024 行。

当增量快照进行时,Debezium 使用水位线来跟踪其进度,维护其捕获的每个表行的记录。与标准初始快照过程相比,这个分阶段方法捕获数据具有以下优点:

  • 您可以并行运行带有流数据捕获的增量快照,而不是延迟流,直到快照完成为止。连接器将继续在整个快照过程中从更改日志捕获接近实时事件,且操作都会阻止另一个事件。
  • 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从它停止的点开始,而不是从开始重新捕获表。
  • 您可以随时根据需要运行增量快照,并根据需要重复这个过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将表添加到其 table.include.list 属性中。

增量快照过程

当您运行增量快照时,Debezium 按主密钥对每个表进行排序,然后根据 配置的块大小 将表分成块。通过块工作的块,然后捕获块中的每个表行。对于它捕获的每行,快照会发出 READ 事件。该事件代表了块开始快照时所在行的值。

当快照进行时,其他进程可能会继续访问数据库,可能会修改表记录。要反映此类更改,INSERTUPDATEDELETE 操作会按预期提交到事务日志。同样,持续 Debezium 流过程会继续检测到这些更改事件,并将对应的更改事件记录发送到 Kafka。

Debezium 如何处理具有相同主键的记录冲突

在某些情况下,streaming 进程发出的 UPDATEDELETE 事件会按顺序接收。也就是说,流处理可能会发出一个事件,在快照捕获了包含该行的 READ 事件前修改表行。当快照最终为行发出对应的 READ 事件时,其值已经被取代。为确保以正确的逻辑顺序处理出序列的增量快照事件,Debezium 采用缓冲方案来解决冲突。只有在快照事件和流事件之间冲突后,才会解析 Debezium 向 Kafka 发出事件记录。

快照窗口

为了帮助解决 late-arriving READ 事件和修改同一表行之间的冲突,Debezium 会使用一个所谓的 快照窗口。快照窗口分离间隔,在此期间会捕获指定表块的数据。在块的快照窗口打开前,Debezium 会遵循其通常的行为,并将事件从下游直接发送到目标 Kafka 主题。但是,从为特定块的快照打开,直到它关闭为止,Deduplication 会执行重复数据删除步骤来解决具有相同主键的事件之间的冲突。

对于每个数据收集,Debebe 会发出两种类型的事件,并将它们的记录存储在单个目标 Kafka 主题中。它直接从表捕获的快照记录被发送为 READ 操作。同时,当用户继续更新数据收集中的记录,并且更新事务日志以反映每个提交,Debezium 会为每个更改发出 UPDATEDELETE 操作。

当快照窗口打开时,Debebe 开始处理快照块,它会向内存缓冲提供快照记录。在快照窗口中,缓冲区中 READ 事件的主键与传入流事件的主键进行比较。如果没有找到匹配项,则流的事件记录直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃 buffered READ 事件,并将流记录写入目标主题,因为以逻辑方式取代静态快照事件。在块的快照窗口关闭后,缓冲区仅包含没有相关事务日志事件的 READ 事件。Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。

连接器会为每个快照块重复这个过程。

目前,您可以使用以下任一方法启动增量快照:

警告

PostgreSQL 的 Debezium 连接器不支持增量快照运行时的 schema 更改。如果在增量快照启动执行 schema 更改,但在以后发送信号,passthrough 配置选项 database.autosave 被设置为 conservative 以正确处理 schema 的更改。

2.6.2.4.1. 触发增量快照

要启动增量快照,您可以发送 临时快照信号 到源数据库上的信号表。您可以提交快照信号,作为 SQL INSERT 查询。

在 Debezium 检测到信号表中的更改后,它会读取信号,并运行请求的快照操作。

您提交的查询指定要包含在快照中的表,并可选择性地指定快照操作的类型。Debezium 目前支持 增量阻塞 快照类型。

要指定要包含在快照中的表,提供一个列出表的 data-collections 数组,或用于匹配表的正则表达式数组,例如:

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

增量快照信号的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debebe 会解释空数组,意味着不需要任何操作,且不会执行快照。

注意

如果要包含在快照中的表的名称包含一个点(.)、空格或其它非字母数字字符,则必须使用双引号转义表名称。
例如,要包含 公共 模式中存在的表,并且名为 My.Table,请使用以下格式 :"public.\"My.Table\" "。

先决条件

使用源信号频道触发增量快照

  1. 发送 SQL 查询,将临时增量快照请求添加到信号表中:

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');

    例如,

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'execute-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema1.table2"], 4
        "type":"incremental", 5
        "additional-conditions":[{"data-collection": "schema1.table1" ,"filter":"color=\'blue\'"}]}'); 6

    命令中的 idtypedata 参数的值 与信号表的字段 相对应。
    下表描述了示例中的参数:

    表 2.130. SQL 命令中的字段描述,用于将增量快照信号发送到信号表
    描述

    1

    schema.debezium_signal

    指定源数据库上信号表的完全限定名称。

    2

    ad-hoc-1

    id 参数指定一个任意字符串,它被分配为信号请求的 id 标识符。
    使用此字符串来识别将日志消息记录到信号表中的条目。Debezium 不使用这个字符串。相反,在快照过程中,Debebe 会生成自己的 id 字符串作为水位线信号。

    3

    execute-snapshot

    type 参数指定信号要触发的操作。

    4

    data-collections

    信号的必需组件,用于指定表名称或正则表达式数组,以匹配快照中包含的表名称。
    数组列出了使用格式 schema.table 的正则表达式,以匹配表的完全限定名称。此格式与您用来指定连接器 信号表的名称相同

    5

    incremental

    data 字段的可选类型 组件,用于指定要运行的快照操作类型。
    有效值为 incrementalblocking 值。
    如果没有指定值,连接器默认为执行增量快照。

    6

    additional-conditions

    可选数组,指定连接器评估的一组额外条件,以确定要包含在快照中的记录子集。
    每个额外条件都是带有 data-collectionfilter 属性的对象。您可以为每个数据收集指定不同的过滤器。
    请参阅 data-collection 属性是过滤器应用到的数据收集的完全限定域名。有关 additional-conditions 参数的详情,请参考 第 2.6.2.4.2 节 “使用附加 条件运行临时增量快照”

2.6.2.4.2. 使用附加 条件运行临时增量快照

如果您希望快照只在表中包括内容子集,您可以通过将 additional-conditions 参数附加到快照信号来修改信号请求。

对典型快照的 SQL 查询采用以下格式:

SELECT * FROM <tableName> ....

通过添加 additional-conditions 参数,您可以在 SQL 查询中附加 WHERE 条件,如下例所示:

SELECT * FROM <data-collection> WHERE <filter> ....

以下示例显示了向信号表发送带有额外条件的临时增量快照请求的 SQL 查询:

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');

例如,假设您有一个包含以下列的 products 表:

  • ID (主密钥)
  • color
  • quantity

如果您需要 product 表的增量快照,其中只包含 color=blue 的数据项,您可以使用以下 SQL 语句来触发快照:

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=blue"}]}');

additional-conditions 参数还允许您传递基于多个列的条件。例如,使用上例中的 product 表,您可以提交查询来触发增量快照,该快照仅包含 color=bluequantity>10 的项数据:

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=blue AND quantity>10"}]}');

以下示例显示了连接器捕获的增量快照事件的 JSON。

例 2.40. 增量快照事件消息

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "ts_us":"1620393591654547",
    "ts_ns":"1620393591654547920",
    "transaction":null
}
表 2.131. 增量快照事件消息中的字段描述
字段名称描述

1

snapshot

指定要运行的快照操作类型。
目前,唯一有效的选项是 阻塞增量
在提交至信号表的 SQL 查询中指定 type 值是可选的。
如果没有指定值,连接器将运行一个增量快照。

2

op

指定事件类型。
快照事件的值是 r,表示 READ 操作。

2.6.2.4.3. 使用 Kafka 信号频道触发增量快照

您可以向 配置的 Kafka 主题 发送消息,以请求连接器来运行临时增量快照。

Kafka 消息的密钥必须与 topic.prefix 连接器配置选项的值匹配。

消息的值是带有 typedata 字段的 JSON 对象。

信号类型是 execute-snapshotdata 字段必须具有以下字段:

表 2.132. 执行快照数据字段
字段默认

type

incremental

要执行的快照的类型。目前 Debezium 支持 incrementalblocking 类型。
详情请查看下一部分。

data-collections

N/A

以逗号分隔的正则表达式,与快照中包含的表的完全限定域名匹配。
使用与 signal.data.collection 配置选项所需的格式相同的格式来指定名称。

additional-conditions

N/A

可选的附加条件数组,用于指定连接器评估以指定快照中包含的记录子集的条件。
每个额外的条件都是一个对象,用于指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下参数: data-collection:: 过滤器适用的表的完全限定域名。您可以对每个表应用不同的过滤器。过滤:: specifys 列值必须存在于数据库记录中才能包括快照,例如 "color='blue' "。

您分配给 filter 参数的值是您在为阻塞快照设置 snapshot.select.statement.overrides 属性时,在 SELECT 语句的 WHERE 子句中指定的值。

例 2.41. execute-snapshot Kafka 信息

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`

带有额外条件的临时增量快照

Debezium 使用 additional-conditions 字段来选择表内容的子集。

通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:

SELECT * FROM &lt ;tableName&gt; …​.

当快照请求包含 additional-conditions 属性时,属性的 data-collectionfilter 参数会附加到 SQL 查询中,例如:

SELECT * FROM &lt ;data-collection> WHERE & lt;filter&gt; …​.

例如,如果一个带有列 ID (主键)、颜色 和品牌products 表,如果您希望快照只包含 color='blue' 的内容,当请求快照时,您可以添加 additional-conditions 属性来过滤内容:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue'"}]}}`

您还可以使用 additional-conditions 属性来根据多个列传递条件。例如,使用与上例中的相同 product 表,如果您希望快照只包含 color='blue'brand='MyBrand' products 表中的内容,您可以发送以下请求:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
2.6.2.4.4. 停止增量快照

在某些情况下,可能需要停止增量快照。例如,您可能意识到快照没有被正确配置,或者您可能要确保资源可用于其他数据库操作。您可以通过向源数据库上的信号发送信号来停止已经运行的快照。

您可以通过在 SQL INSERT 查询中发送停止快照信号,向信号表提交停止快照信号。stop-snapshot 信号将快照操作的类型指定为 增量,并选择性地指定要从当前运行的快照中省略的表。在 Debezium 检测到信号表中的更改后,它会读取信号,并在进行中时停止增量快照操作。

其他资源

您还可以通过向 Kafka 信号发送 JSON 消息来停止增量快照

先决条件

使用源信号频道停止增量快照

  1. 发送 SQL 查询以停止临时增量快照到信号表:

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');

    例如,

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'stop-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema1.table2"], 4
        "type":"incremental"}'); 5

    signal 命令中的 idtypedata 参数的值与 信号表的字段相对应
    下表描述了示例中的参数:

    表 2.133. SQL 命令中的字段描述,用于将停止增量快照信号发送到信号表
    描述

    1

    schema.debezium_signal

    指定源数据库上信号表的完全限定名称。

    2

    ad-hoc-1

    id 参数指定一个任意字符串,它被分配为信号请求的 id 标识符。
    使用此字符串来识别将日志消息记录到信号表中的条目。Debezium 不使用这个字符串。

    3

    stop-snapshot

    指定 type 参数,指定信号要触发的操作。

    4

    data-collections

    信号的可选组件,用于指定表名称或正则表达式数组,以匹配要从快照中删除的表名称。
    数组以 schema.table格式按完全限定名称匹配表的正则表达式

    如果您从 data 字段省略这个组件,信号将停止正在进行的整个增量快照。

    5

    incremental

    信号的必需组件,用于指定要停止的快照操作类型。
    目前,唯一有效的选项为 增量
    如果没有指定 类型 值,信号将无法停止增量快照。

2.6.2.4.5. 使用 Kafka 信号频道停止增量快照

您可以向 配置的 Kafka 信号主题 发送信号消息,以停止临时增量快照。

Kafka 消息的密钥必须与 topic.prefix 连接器配置选项的值匹配。

消息的值是带有 typedata 字段的 JSON 对象。

signal 类型是 stop-snapshotdata 字段必须具有以下字段:

表 2.134. 执行快照数据字段
字段默认

type

incremental

要执行的快照的类型。目前 Debezium 只支持 incremental 类型。
详情请查看下一部分。

data-collections

N/A

可选的、以逗号分隔的正则表达式,与表的完全限定名称匹配,表名称或正则表达式,以匹配要从快照中删除的表名称。
使用格式 schema.table 指定表名称。

以下示例显示了典型的 stop-snapshot Kafka 信息:

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

2.6.2.5. 阻塞快照

为了在管理快照方面提供更多灵活性,Debezium 包含一个额外的临时快照机制,称为 阻塞快照。阻塞快照依赖于 Debezium 机制 向 Debezium 连接器发送信号

阻塞快照的行为与 初始快照 相似,但您可以在运行时触发快照。

您可能想要在以下情况下运行阻塞快照,而不是使用标准初始快照过程:

  • 您可以添加新表,并在连接器运行时完成快照。
  • 您可以添加大表,并且您希望快照在短时间内完成,而不是通过增量快照完成。

阻塞快照过程

当您运行阻塞快照时,Debebe 会停止流,然后启动指定表的快照,遵循它在初始快照过程中使用的同一进程。快照完成后,会恢复流。

配置快照

您可以在信号 的数据 组件中设置以下属性:

  • data-collections:指定哪个表必须是快照。
  • data-collections :指定您要包括快照的表。
    此属性接受与完全限定表名称匹配的正则表达式列表。属性的行为与 table.include.list 属性的行为类似,它指定要捕获在阻塞快照中的表。
  • additional-conditions :您可以为不同的表指定不同的过滤器。

    • data-collection 属性是要应用过滤器的表的完全限定名称,并可区分大小写或区分大小写,具体取决于数据库。
    • filter 属性将具有与 snapshot.select.statement.overrides 时使用的相同值,即条件应当匹配的表的完全限定域名。

例如:

  {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}

可能的副本

您发送信号触发快照的时间之间可能会有延迟,以及流停止和快照启动时的时间。因此,在快照完成后,连接器可能会发出一些由快照捕获的重复记录的事件记录。

2.6.2.6. Debezium PostgreSQL 连接器如何更改事件记录

PostgreSQL 连接器通常会花费大量时间从它连接到的 PostgreSQL 服务器流更改。这种机制依赖于 PostgreSQL 的复制协议。此协议使客户端能够接收服务器在服务器事务日志中提交的更改(在某些位置上,称为 Log Sequence Numbers (LSN))。

每当服务器提交事务时,单独的服务器进程都会调用来自 逻辑解码插件的回调功能。此功能处理事务的更改,将它们转换为特定格式(在 Debezium 插件中是Protobuf 或 JSON),并将它们写到输出流中,然后供客户端使用。

Debezium PostgreSQL 连接器充当 PostgreSQL 客户端。当连接器收到更改时,它会将事件转换为 Debezium 的 create, update, 或 delete 事件,包含该事件的 LSN 的事件。PostgreSQL 连接器将记录中的这些更改事件转发到 Kafka Connect 框架,该框架在同一进程中运行。Kafka Connect 进程异步写入更改事件记录,它们生成的顺序与正确的 Kafka 主题相同。

定期,Kafka Connect 会定期记录另一个 Kafka 主题中最新的偏移量。offset 表示 Debezium 包括每个事件的源特定位置信息。对于 PostgreSQL 连接器,在每个更改事件中记录的 LSN 是偏移量。

当 Kafka Connect 正常关闭时,它会停止连接器,将所有事件记录刷新到 Kafka,并记录从每个连接器接收的最后偏移量。当 Kafka Connect 重启时,它会读取每个连接器的最后记录偏移,并在最后记录的偏移处启动每个连接器。当连接器重启时,它会向 PostgreSQL 服务器发送请求,以发送仅在该位置后启动的事件。

注意

PostgreSQL 连接器检索模式信息,作为逻辑解码插件发送的事件的一部分。但是,连接器不会检索关于哪些列组成主密钥的信息。连接器从 JDBC 元数据(侧频道)获取此信息。如果表的主键定义有变化(通过添加、删除或重命名主键列),当 JDBC 的主密钥信息没有与逻辑解码插件生成的更改事件同步时,会有一个小的时间段。在此小期内,可以以不一致的关键结构创建消息。要防止这种不一致,请更新主密钥结构,如下所示:

  1. 将数据库或应用程序置于只读模式。
  2. 让 Debezium 处理所有剩余的事件。
  3. 停止 Debezium。
  4. 更新相关表中的主密钥定义。
  5. 将数据库或应用程序置于读/写模式。
  6. 重启 Debezium。

PostgreSQL 10+ 逻辑解码支持(pgoutput)

从 PostgreSQL 10+ 开始,有一个逻辑复制流模式,称为 pgoutput,它被 PostgreSQL 原生支持。这意味着 Debezium PostgreSQL 连接器可以使用该复制流,而无需额外的插件。这在不支持或不允许安装插件的环境中特别有用。

如需更多信息,请参阅设置 PostgreSQL

2.6.2.7. 接收 Debezium PostgreSQL 更改事件记录的 Kafka 主题的默认名称

默认情况下,PostgreSQL 连接器将所有 INSERTUPDATEDELETE 操作的更改事件写入一个特定于该表的单个 Apache Kafka 主题。连接器使用以下惯例命名更改事件主题:

topicPrefix.schemaName.tableName

以下列表提供了默认名称组件的定义:

topicPrefix
topic.prefix 配置属性指定的主题前缀。
schemaName
发生更改事件的数据库架构的名称。
tableName
发生更改事件的数据库表的名称。

例如,假设 fulfillment 是连接器中的逻辑服务器名称,该连接器捕获 PostgreSQL 安装中的更改,该安装具有一个 postgres 数据库和一个 inventory schem,它包含四个表:products, products_on_hand, customers, 和 orders连接器会将记录流传输到这四个 Kafka 主题:

  • fulfillment.inventory.products
  • fulfillment.inventory.products_on_hand
  • fulfillment.inventory.customers
  • fulfillment.inventory.orders

现在,假设表不是特定模式的一部分,但在默认的 公共 PostgreSQL 模式中创建。Kafka 主题的名称将是:

  • fulfillment.public.products
  • fulfillment.public.products_on_hand
  • fulfillment.public.customers
  • fulfillment.public.orders

连接器应用类似的命名约定来标记其 事务元数据主题

如果默认主题名称不满足您的要求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由

2.6.2.8. Debezium PostgreSQL 连接器生成的事件代表事务边界

Debezium 可以生成代表事务边界的事件,以及丰富的数据更改事件消息。

Debezium 接收事务元数据时的限制

Debezium 注册并接收部署连接器后发生的事务的元数据。部署连接器前发生的事务元数据不可用。

对于每个事务 BEGINEND,Debebe 会生成一个包含以下字段的事件:

status
BEGINEND.
id
由 Postgres 事务 ID 本身和以冒号分隔的给定操作的 LSN 组成的唯一事务标识符的字符串,即格式是 txID:LSN
ts_ms
数据源的事务边界事件(BEGINEND 事件)的时间。如果数据源没有向 Debezium 提供事件时间,则字段代表 Debezium 处理事件的时间。
event_count (用于 END 事件)
事务处理的事件总数。
data_collections (用于 END 事件)
一组 data_collectionevent_count 元素,用于指示连接器为来自数据收集的更改发出的事件数。

示例

{
  "status": "BEGIN",
  "id": "571:53195829",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "571:53195832",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "s1.a",
      "event_count": 1
    },
    {
      "data_collection": "s2.a",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项覆盖,否则事务事件将写入名为 <topic. prefix>.transaction 的主题

更改数据事件增强

当启用事务元数据时,数据消息 Envelope 会增加新的 transaction 字段。此字段以字段复合的形式提供有关每个事件的信息:

id
唯一事务标识符的字符串表示。
total_order
事件在事务生成的所有事件间的绝对位置。
data_collection_order
事件在事务发送的所有事件中的每个数据收集位置。

以下是消息示例:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
   ...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "ts_us": "1580390884335451",
  "ts_ns": "1580390884335451325",
  "transaction": {
    "id": "571:53195832",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

2.6.3. Debezium PostgreSQL 连接器数据更改事件的描述

Debezium PostgreSQL 连接器为每个行级 INSERTUPDATEDELETE 操作生成数据更改事件。每个事件包含一个键和值。键和值的结构取决于更改的表。

Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间变化,用户很难处理。要解决这个问题,每个事件都包含其内容的 schema,或者如果您使用 schema registry,消费者可以用来从 registry 获取 schema 的模式 ID。这使得每个事件自包含。

以下框架 JSON 显示了更改事件的基本四部分。但是,如何配置您选择在应用程序中使用的 Kafka Connect 转换器决定了更改事件中的这四个部分的表示。只有在将转换器配置为生成它时,schema 字段才会处于 change 事件中。同样,只有在将转换器配置为生成它时,事件键和事件有效负载才会处于更改事件中。如果您使用 JSON 转换程序,并将其配置为生成所有四个基本更改事件部分,则更改事件具有此结构:

{
 "schema": { 1
   ...
  },
 "payload": { 2
   ...
 },
 "schema": { 3
   ...
 },
 "payload": { 4
   ...
 },
}
表 2.135. 更改事件基本内容概述
字段名称描述

1

schema

第一个 schema 字段是事件键的一部分。它指定一个 Kafka Connect 模式,它描述了事件键的 payload 部分中的内容。换句话说,第一个 模式 字段描述了主键的结构,如果表没有主键,则描述主键的结构。

可以通过设置 message.key.columns 连接器配置属性 来覆盖表的主键。在这种情况下,第一个 schema 字段描述了该属性标识的密钥的结构。

2

payload

第一个 payload 字段是事件键的一部分。它有上一个 schema 字段描述的结构,其中包含更改的行的密钥。

3

schema

第二个 schema 字段是事件值的一部分。它指定 Kafka Connect 模式,它描述了事件值的 payload 部分中的内容。换句话说,第二个 模式 描述了更改的行的结构。通常,此模式包含嵌套的模式。

4

payload

第二个 payload 字段是事件值的一部分。它有上一个 schema 字段描述的结构,其中包含更改的行的实际数据。

默认情况下,连接器流将 事件记录更改为使用与事件原始表相同的名称的主题

注意

从 Kafka 0.10 开始,Kafka 可以选择使用创建消息 的时间戳 记录事件键和值(由制作者记录),或者由 Kafka 写入日志。

警告

PostgreSQL 连接器确保所有 Kafka Connect 模式名称遵循 Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称和模式和表名称中的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效的字符,它将被一个下划线字符替代。

如果逻辑服务器名称、模式名称或表名称包含无效字符,则这可能会导致意外冲突,并且区分名称的唯一字符无效,因此用下划线替换。

详情包括在以下主题中:

2.6.3.1. 关于 Debezium PostgreSQL 更改事件中的键

对于给定表,更改事件的键有一个结构,它在表创建时包含每个列的字段。或者,如果表将 REPLICA IDENTITY 设置为 FULLUSING INDEX,则每个唯一键约束都有一个字段。

考虑在 公共 数据库架构中定义的 customers 表,以及该表的更改事件密钥示例。

表示例

CREATE TABLE customers (
  id SERIAL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);

更改事件键示例

如果 topic.prefix 连接器配置属性的值 PostgreSQL_server,则 customers 表的每个更改事件都具有相同的关键结构,在 JSON 中如下所示:

{
  "schema": { 1
    "type": "struct",
    "name": "PostgreSQL_server.public.customers.Key", 2
    "optional": false, 3
    "fields": [ 4
          {
              "name": "id",
              "index": "0",
              "schema": {
                  "type": "INT32",
                  "optional": "false"
              }
          }
      ]
  },
  "payload": { 5
      "id": "1"
  },
}
表 2.136. 更改事件键的描述
字段名称描述

1

schema

键的 schema 部分指定一个 Kafka Connect 模式,它描述了键的 payload 部分中的内容。

2

PostgreSQL_server.inventory.customers.Key

定义键有效负载结构的 schema 名称。这个模式描述了更改的表的主密钥的结构。Key 模式名称的格式是 connector-name.database-name.table-name.Key。在本例中:

  • PostgreSQL_server 是生成此事件的连接器的名称。
  • Inventory 是包含已更改的表的数据库。
  • 客户 是更新的表。

3

optional

指明事件键是否在其 payload 字段中包含一个值。在本例中,需要键有效负载中的值。当表没有主键时,键有效负载字段中的值是可选的。

4

fields

指定 有效负载中 预期的每个字段,包括每个字段的名称、索引和模式。

5

payload

包含生成此更改事件的行的密钥。在本例中,键包含一个 id 字段,其值为 1

注意

虽然 column.exclude.listcolumn.include.list 连接器配置属性允许您只捕获表列的子集,但主键或唯一键中的所有列始终包含在事件的密钥中。

警告

如果表没有主键或唯一键,则更改事件的键为 null。没有主键约束或唯一键约束的表中的行无法唯一标识。

2.6.3.2. 关于 Debezium PostgreSQL 更改事件中的值

更改事件中的值比键复杂一些。与键一样,值也有一个 schema 部分和一个 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的 schema,包括其嵌套字段。更改创建、更新或删除数据的操作的事件,它们都有带有 envelope 结构的值 payload。

考虑用于显示更改事件键示例相同的示例表:

CREATE TABLE customers (
  id SERIAL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);

更改此表的更改事件的值部分因 REPLICA IDENTITY 设置和事件所针对的操作而异。

以下部分详情如下:

副本身份

REPLICA IDENTITY 是一个特定于 PostgreSQL 的表级设置,它决定了逻辑解码插件可用于 UPDATEDELETE 事件的信息量。更具体地说,设置 REPLICA IDENTITY 控制在发生 UPDATEDELETE 事件时所涉及的表列前面的值(若有)信息。

REPLICA IDENTITY 有 4 个可能的值:

  • DEFAULT - 默认行为是,如果该表有一个主键,则 UPDATEDELETE 事件包含表的主键列前面的值。对于 UPDATE 事件,只有带有更改值的主键列才会显示。

    如果表没有主键,连接器不会为该表发出 UPDATEDELETE 事件。对于没有主键的表,连接器只发出 创建事件。通常,使用没有主键的表将消息附加到表的末尾,这意味着 UPDATEDELETE 事件不可用。

  • NOTHING - Emitted 事件用于 UPDATEDELETE 操作,不包含任何表列之前值的信息。
  • FULL - Emitted 事件用于 UPDATEDELETE 操作,包含表中所有列的以前的值。
  • INDEX index-name - Emitted 事件用于 UPDATEDELETE 操作,包含指定索引中包含的列的先前值。UPDATE 事件还包含带有更新值的索引列。

创建 事件

以下示例显示了一个更改事件的值部分,连接器为在 customer 表中创建数据的操作生成的更改事件的值部分:

{
    "schema": { 1
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "PostgreSQL_server.inventory.customers.Value", 2
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "PostgreSQL_server.inventory.customers.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_us"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ns"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "default": false,
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "schema"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "lsn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "xmin"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.postgresql.Source", 3
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_us"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ns"
            }
        ],
        "optional": false,
        "name": "PostgreSQL_server.inventory.customers.Envelope" 4
    },
    "payload": { 5
        "before": null, 6
        "after": { 7
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { 8
            "version": "3.0.8.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863123,
            "ts_ns": 1559033904863123000,
            "snapshot": true,
            "db": "postgres",
            "sequence": "[\"24023119\",\"24023128\"]",
            "schema": "public",
            "table": "customers",
            "txId": 555,
            "lsn": 24023128,
            "xmin": null
        },
        "op": "c", 9
        "ts_ms": 1559033904863, 10
        "ts_us": 1559033904863841, 11
        "ts_ns": 1559033904863841257 12
    }
}
表 2.137. 创建 事件值字段的描述
字段名称描述

1

schema

值的 schema,它描述了值有效负载的结构。在连接器为特定表生成的更改事件时,更改事件的值 schema 都相同。

2

name

schema 部分中,每个 name 字段指定值有效负载中的字段的 schema。

PostgreSQL_server.inventory.customers.Value 是有效负载 beforeafter 字段的 schema。这个模式特定于 customers 表。

beforeafter 字段的模式名称格式为 logicalName.tableName.Value,这样可确保 schema 名称在数据库中是唯一的。这意味着,在使用 Avro converter 时,每个逻辑源中的每个表生成的 Avro 模式都有自己的演进和历史记录。

3

name

io.debezium.connector.postgresql.Source 是有效负载 source 字段的 schema。这个模式特定于 PostgreSQL 连接器。连接器用于它生成的所有事件。

4

name

PostgreSQL_server.inventory.customers.Envelope 是有效负载的整体结构的 schema,其中 PostgreSQL_server 是连接器名称,inventory 是数据库,customers 是表。

5

payload

值的实际数据。这是更改事件提供的信息。

可能会出现事件的 JSON 表示大于它们描述的行。这是因为 JSON 表示必须包含 schema 和 message 部分。但是,通过使用 Avro converter,您可以显著减少连接器流到 Kafka 主题的消息大小。

6

before

指定事件发生前行状态的可选字段。当 op 字段是 c 用于创建(如本例所示),before 字段为 null,因为此更改事件用于新内容。

注意

此字段是否可用是否取决于每个表的 REPLICA IDENTITY 设置。

7

after

指定事件发生后行状态的可选字段。在本例中,after 字段包含新行的 idfirst_namelast_nameemail 列的值。

8

source

描述事件源元数据的强制字段。此字段包含可用于将此事件与其他事件进行比较的信息,以及事件的来源、发生事件的顺序以及事件是否是同一事务的一部分。源元数据包括:

  • Debezium 版本
  • 连接器类型和名称
  • 包含新行的数据库和表
  • Stringified JSON 数组的额外偏移信息。第一个值始终是最后提交的 LSN,第二个值始终是当前的 LSN。任一值都 为空
  • 模式名称
  • 如果事件是快照的一部分
  • 执行操作的事务的 ID
  • 数据库日志中操作的偏移量
  • 在数据库中进行更改时的时间戳

9

op

描述导致连接器生成事件的操作类型的强制字符串。在本例中,c 表示操作创建了行。有效值为:

  • c = create
  • u = update
  • d = delete
  • r = 读取(仅适用于快照)
  • t = truncate
  • m = message

10

ts_ms,ts_us,ts_ns

显示连接器处理事件的时间字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示更改在数据库中的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

更新 事件

示例 customers 表中一个更新的改变事件的值有与那个表的 create 事件相同的模式。同样,事件值的有效负载具有相同的结构。但是,事件值 payload 在更新 事件中包含不同的值。以下是当连接器在 customers 表中为更新生成的更改事件值的示例:

{
    "schema": { ... },
    "payload": {
        "before": { 1
            "id": 1
        },
        "after": { 2
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { 3
            "version": "3.0.8.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863769,
            "ts_ns": 1559033904863769000,
            "snapshot": false,
            "db": "postgres",
            "schema": "public",
            "table": "customers",
            "txId": 556,
            "lsn": 24023128,
            "xmin": null
        },
        "op": "u", 4
        "ts_ms": 1465584025523,  5
        "ts_us": 1465584025523514,  6
        "ts_ns": 1465584025523514964,  7
    }
}
表 2.138. 更新 事件值字段的描述
字段名称描述

1

before

可选字段,其中包含数据库提交前所在行中的值。在本例中,仅存在主键列 id,因为表的 REPLICA IDENTITY 设置默认为 DEFAULT

对于一个 更新 事件,要包含行中的所有列的先前值,您必须通过运行 ALTER TABLE 客户 REPLICA IDENTITY FULL 来更改 customers 表

2

after

指定事件发生后行状态的可选字段。在本例中,first_name 值现在是 Anne Marie

3

source

描述事件源元数据的强制字段。source 字段结构与 create 事件中的字段相同,但某些值有所不同。源元数据包括:

  • Debezium 版本
  • 连接器类型和名称
  • 包含新行的数据库和表
  • 模式名称
  • 如果事件是快照的一部分(对于 更新 事件始终为 false
  • 执行操作的事务的 ID
  • 数据库日志中操作的偏移量
  • 在数据库中进行更改时的时间戳

4

op

描述操作类型的强制字符串。在 update 事件值中,op 字段值为 u,表示此行因为更新而改变。

5

ts_ms,ts_us,ts_ns

显示连接器处理事件的时间字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示更改在数据库中的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

注意

更新行主/唯一键的列会更改行键的值。当密钥更改时,Debezium 会输出 三个 事件:一个 DELETE 事件和一个带有行的旧键的 tombstone 事件,后跟一行的新键的事件。详情位于下一节中。

主密钥更新

更改行的主键字段的 UPDATE 操作被称为主键更改。对于主键更改,为了发送 UPDATE 事件记录,连接器会为旧键发送 DELETE 事件记录,并为新的(updated)密钥发送 CREATE 事件记录。这些事件具有常见的结构和内容,另外,每个事件都有一个与主键更改相关的消息标头:

  • DELETE 事件记录具有 __debezium.newkey 作为消息标头。此标头的值是更新行的新主键。
  • CREATE 事件记录具有 __debezium.oldkey 作为消息标头。此标头的值是更新行具有的上一个(折叠)主键。

删除 事件

delete 更改事件中的值与为同一表的 createupdate 事件相同的 schema 部分。示例 customer 表的 delete 事件中 payload 部分类似如下:

{
    "schema": { ... },
    "payload": {
        "before": { 1
            "id": 1
        },
        "after": null, 2
        "source": { 3
            "version": "3.0.8.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863852,
            "ts_ns": 1559033904863852000,
            "snapshot": false,
            "db": "postgres",
            "schema": "public",
            "table": "customers",
            "txId": 556,
            "lsn": 46523128,
            "xmin": null
        },
        "op": "d", 4
        "ts_ms": 1465581902461, 5
        "ts_us": 1465581902461496, 6
        "ts_ns": 1465581902461496187, 7
    }
}
表 2.139. 删除 事件值字段的描述
字段名称描述

1

before

指定事件发生前行的状态的可选字段。在一个 delete 事件值中,before 字段包含在使用数据库提交删除行前的值。

在本例中,before 字段仅包含主键列,因为表的 REPLICA IDENTITY 设置是 DEFAULT

2

after

可选字段,用于指定事件发生后行的状态。在一个 delete 事件值中,after 字段为 null,表示行不再存在。

3

source

描述事件源元数据的强制字段。在一个 delete 事件值中,source 字段结构与同一表的 createupdate 事件相同。许多 source 字段值也相同。在 delete 事件值中,ts_mslsn 字段值以及其他值可能已更改。但是 delete 事件值中的 source 字段提供相同的元数据:

  • Debezium 版本
  • 连接器类型和名称
  • 包含已删除行的数据库和表
  • 模式名称
  • 如果事件是快照的一部分(对 删除 事件始终为 false
  • 执行操作的事务的 ID
  • 数据库日志中操作的偏移量
  • 在数据库中进行更改时的时间戳

4

op

描述操作类型的强制字符串。op 字段值为 d,表示此行已被删除。

5

ts_ms,ts_us,ts_ns

显示连接器处理事件的时间字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示更改在数据库中的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

删除 更改事件记录为消费者提供处理删除此行所需的信息。

警告

要让消费者处理为没有主键的表生成的 删除 事件,请将表的 REPLICA IDENTITY 设置为 FULL。当表没有主键且表的 REPLICA IDENTITY 设置为 DEFAULTNOTHING 时,删除 事件没有 before 字段。

PostgreSQL 连接器事件设计为使用 Kafka 日志压缩。日志压缩支持删除一些旧的信息,只要至少保留每个密钥的最新消息。这可让 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。

tombstone 事件

删除行时,delete 事件值仍可用于日志压缩,因为 Kafka 您可以删除具有相同键的所有之前信息。但是,为了使 Kafka 删除具有相同键的所有信息,消息值必须是 null。为了实现此目的,PostgreSQL 连接器遵循一个 delete 事件,其中包含一个特殊的tombstone 事件,它有相同的键但值为 null

截断 事件

truncate 更改事件信号,提示表已被截断。message 键在本例中是 null,消息值类似如下:

{
    "schema": { ... },
    "payload": {
        "source": { 1
            "version": "3.0.8.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863112,
            "ts_ns": 1559033904863112000,
            "snapshot": false,
            "db": "postgres",
            "schema": "public",
            "table": "customers",
            "txId": 556,
            "lsn": 46523128,
            "xmin": null
        },
        "op": "t", 2
        "ts_ms": 1559033904961, 3
        "ts_us": 1559033904961654, 4
        "ts_ns": 1559033904961654789 5
    }
}
表 2.140. truncate 事件值字段的描述
字段名称描述

1

source

描述事件源元数据的强制字段。在 truncate 事件值中,source 字段结构与为同一表的 create, update, 和 delete 事件相同,提供此元数据:

  • Debezium 版本
  • 连接器类型和名称
  • 包含新行的数据库和表
  • 模式名称
  • 如果事件是快照的一部分(对 删除 事件始终为 false
  • 执行操作的事务的 ID
  • 数据库日志中操作的偏移量
  • 在数据库中进行更改时的时间戳

2

op

描述操作类型的强制字符串。op 字段值为 t,表示此表已被截断。

3

ts_ms,ts_us,ts_ns

显示连接器处理事件的时间字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示更改在数据库中的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

如果单个 TRUNCATE 操作影响多个表,连接器会为每个 截断 表发出一个更改事件记录。

注意

truncate 事件表示对整个表所做的更改,没有消息键。因此,对于具有多个分区的主题,对更改事件没有排序保证(创建更新 等),或 截断 与表相关的事件。例如,如果消费者从多个分区读取表的事件,它可能会在收到 截断 事件后从一个分区接收一个表 的更新 事件,该事件会从另一个分区中删除表中的所有数据。排序只针对使用单个分区的主题保证。

如果您不希望连接器捕获 截断 事件,请使用 skipped.operations 选项过滤它们。

消息 事件

此事件类型仅通过 Postgres 14+ 上的 pgoutput 插件支持(Postgres 文档)

一个 消息 事件提示,一个通用逻辑解码消息已被直接插入到 WAL 中,通常使用 pg_logical_emit_message 函数。message 键是 Struct,在本例中带有一个名为 prefix 的单个字段,执行插入消息时指定的前缀。message 值类似于用于事务消息:

{
    "schema": { ... },
    "payload": {
        "source": { 1
            "version": "3.0.8.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863879,
            "ts_ns": 1559033904863879000,
            "snapshot": false,
            "db": "postgres",
            "schema": "",
            "table": "",
            "txId": 556,
            "lsn": 46523128,
            "xmin": null
        },
        "op": "m", 2
        "ts_ms": 1559033904961, 3
        "ts_us": 1559033904961621, 4
        "ts_ns": 1559033904961621379, 5
        "message": { 6
            "prefix": "foo",
            "content": "Ymfy"
        }
    }
}

与其他事件类型不同,非事务消息将没有任何关联的 BEGINEND 事务事件。message 值与非事务信息类似:

{
    "schema": { ... },
    "payload": {
        "source": { 1
            "version": "3.0.8.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863762,
            "ts_ns": 1559033904863762000,
            "snapshot": false,
            "db": "postgres",
            "schema": "",
            "table": "",
            "lsn": 46523128,
            "xmin": null
        },
        "op": "m", 2
        "ts_ms": 1559033904961, 3
        "ts_us": 1559033904961741, 4
        "ts_ns": 1559033904961741698, 5
        "message": { 6
            "prefix": "foo",
            "content": "Ymfy"
    }
}
表 2.141. 消息 事件值字段的描述
字段名称描述

1

source

描述事件源元数据的强制字段。在 message 事件值中,source 字段结构将不会包括任何 message 事件的 tableschema 信息,并只有在 message 事件是事务性时才有 txId

  • Debezium 版本
  • 连接器类型和名称
  • 数据库名称
  • 架构名称(始终为 消息事件显示 ""
  • 表名称(始终为 消息 事件显示 ""
  • 如果事件是快照的一部分(对于 消息 事件始终为 false
  • 执行操作的事务的 ID (非事务 message 事件为 null
  • 数据库日志中操作的偏移量
  • 事务消息:当消息插入到 WAL 中时的 Timestamp
  • 非事务消息; 连接器遇到消息时的 Timestamp

2

op

描述操作类型的强制字符串。op 字段值为 m,表示这是一个 消息 事件。

3

ts_ms,ts_us,ts_ns

显示连接器处理事件的时间字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

对于事务 消息 事件,source 对象的 ts_ms 属性指示数据库中为事务 消息 事件进行更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

对于非事务 消息 事件,源对象的 ts_ms 表示连接器遇到 消息事件的时间,而 payload.ts_ms 则表示连接器处理事件的时间。这个区别在于,在 Postgres 的通用逻辑消息格式和非事务逻辑消息之前没有包括提交时间戳的事实(其具有时间戳信息)。

4

message

包含消息元数据的字段

2.6.4. Debezium PostgreSQL 连接器如何映射数据类型

PostgreSQL 连接器代表对带有类似行表的事件行的更改。事件包含每个列值的一个字段。事件中的该值取决于列的 PostgreSQL 数据类型。以下小节描述了连接器如何将 PostgreSQL 数据类型映射到 字面类型和 semantic 类型 (在 event 字段中)。

  • literal type 代表值如何被代表,使用 Kafka Connect schema 类型:INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, 和 STRUCT
  • 语义类型 描述了 Kafka Connect 模式如何使用字段名称来捕获字段 的含义

如果默认数据类型转换不满足您的需要,您可以为连接器 创建自定义转换器

详情包括在以下部分中:

基本类型

下表描述了连接器如何映射基本类型。

表 2.142. PostgreSQL 基本数据类型的映射
PostgreSQL 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

布尔值

布尔值

不适用

BIT(1)

布尔值

不适用

BIT( > 1)

BYTES

io.debezium.data.Bits

length schema 参数包含一个代表位数的整数。生成的 byte[] 以 little-endian 形式包含位,其大小为包含指定的位数。例如: numBytes = n/8 +(n % 8 == 0 ?0 : 1),其中 n 是位数。

位不同[(M)]

BYTES

io.debezium.data.Bits

length schema 参数包含一个整数,代表位数(2^31 - 1,如果没有为列提供长度)。生成的 byte[] 以 little-endian 形式包含位,并根据内容进行大小。指定的大小 (M) 存储在 io.debezium.data.Bits 类型的 length 参数中。

SMALLINT,SMALLSERIAL

INT16

不适用

整数, 串行

INT32

不适用

BIGINT,BIGSERIAL,OID

INT64

不适用

REAL

FLOAT32

不适用

双精度

FLOAT64

不适用

CHAR[(M)]

字符串

不适用

VARCHAR[(M)]

字符串

不适用

CHARACTER[(M)]

字符串

不适用

BPCHAR[(M)]

字符串

不适用

CHARACTER VARYING[(M)]

字符串

不适用

TIMESTAMPTZ , 带有时区的时间戳

字符串

io.debezium.time.ZonedTimestamp

带有时区信息的时间戳的字符串,其中时区是 GMT。

TIMETZ ,带有时区的时间

字符串

io.debezium.time.ZonedTime

包含时区信息的时间值的字符串,其中时区是 GMT。

INTERVAL [P]

INT64

io.debezium.time.MicroDuration
(默认)

每个月的 365.25 / 12.0 公式用于时间间隔的大约微秒数。

INTERVAL [P]

字符串

io.debezium.time.Interval
(当 interval.handling.mode 设置为 字符串

遵循特征 P<years>Y<months>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 的间隔值的字符串表示,例如 P1Y2M3DT4H5M6.78S

BYTEA

BYTESSTRING

n/a

根据连接器 的二进制处理模式 设置,使用 raw 字节(默认)、base64 编码的字符串或 base64url-safe-encoded 字符串,或十六进制编码的字符串。

Debezium 只支持值 hex 的 Postgres bytea_output 配置。有关 PostgreSQL 二进制数据类型的更多信息,请参阅 PostgreSQL 文档

JSON, JSONB

字符串

io.debezium.data.Json

包含 JSON 文档、数组或 scalar 的字符串表示。

XML

字符串

io.debezium.data.Xml

包含 XML 文档的字符串表示。

UUID

字符串

io.debezium.data.Uuid

包含 PostgreSQL UUID 值的字符串表示。

STRUCT

io.debezium.data.geometry.Point

包含两个 FLOAT64 字段的结构,(x,y)。每个字段代表 geometric 点的协调。

LTREE

字符串

io.debezium.data.Ltree

包含 PostgreSQL LTREE 值的字符串表示。

CITEXT

字符串

不适用

INET

字符串

不适用

INT4RANGE

字符串

N/a

整数的范围。

INT8RANGE

字符串

N/a

bigint 的范围.

NUMRANGE

字符串

N/a

数字范围.

TSRANGE

字符串

n/a

包含没有时区的时间戳范围的字符串表示。

TSTZRANGE

字符串

n/a

包含带有本地系统时区的时间戳范围的字符串表示。

DATERANGE

字符串

n/a

包括代表一个日期范围的字符串。它始终有一个独占的上限。

ENUM

字符串

io.debezium.data.Enum

包含 PostgreSQL ENUM 值的字符串表示。允许的值集合在 allowed schema 参数中维护。

临时类型

除了 PostgreSQL 的 TIMESTAMPTZTIMETZ 数据类型,它们包含时区信息,临时类型如何映射取决于 time.precision.mode 连接器配置属性的值。以下小节描述了这些映射:

time.precision.mode=adaptive

time.precision.mode 属性设为 adaptive 时,默认连接器根据列的数据类型定义决定字面类型和语义类型。这样可确保事件 准确 表示数据库中的值。

表 2.143. time.precision.mode 为 adaptive时的映射
PostgreSQL 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

DATE

INT32

io.debezium.time.Date

代表时期起的天数。

TIME (1), TIME (2), TIME (3)

INT32

io.debezium.time.Time

代表过去午夜的毫秒数,不包括时区信息。

TIME(4), TIME(5), TIME(6)

INT64

io.debezium.time.MicroTime

代表过去午夜的微秒数,不包括时区信息。

TIMESTAMP (1), TIMESTAMP (2), TIMESTAMP (3)

INT64

io.debezium.time.Timestamp

代表自 epoch 起的毫秒数,且不包含时区信息。

TIMESTAMP (4), TIMESTAMP (5) , TIMESTAMP (6 ), TIMESTAMP

INT64

io.debezium.time.MicroTimestamp

代表自时期起的微秒数,且不包含时区信息。

time.precision.mode=adaptive_time_microseconds

time.precision.mode 配置属性设置为 adaptive_time_microseconds 时,连接器根据列的数据类型决定 temporal 类型的字面类型和语义类型。这样可确保事件 准确 表示数据库中的值,但所有 TIME 字段都捕获为微秒。

表 2.144. time.precision.mode 为 adaptive_time_microseconds时的映射
PostgreSQL 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

DATE

INT32

io.debezium.time.Date

代表时期起的天数。

TIME([P])

INT64

io.debezium.time.MicroTime

代表时间值(以微秒为单位),且不包含时区信息。PostgreSQL 允许将精度 P 位于范围 0-6 中,以存储最多为微秒的精度。

TIMESTAMP (1) , TIMESTAMP (2), TIMESTAMP (3)

INT64

io.debezium.time.Timestamp

代表时期过去的毫秒数,不包括时区信息。

TIMESTAMP (4) , TIMESTAMP (5), TIMESTAMP (6 ), TIMESTAMP

INT64

io.debezium.time.MicroTimestamp

代表时期过去的微秒数,不包括时区信息。

time.precision.mode=connect

time.precision.mode 配置属性设置为 connect 时,连接器使用 Kafka Connect 逻辑类型。当消费者只能处理内置的 Kafka Connect 逻辑类型,且无法处理变量精度时间值时,这很有用。但是,由于 PostgreSQL 支持 microsecond 精度,因此由带有 connect 时间精度模式的连接器 生成的事件会在 数据库列具有大于 3 的 fractional second 精度 值时丢失。

表 2.145. time.precision.mode 为 connect时的映射
PostgreSQL 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

DATE

INT32

org.apache.kafka.connect.data.Date

代表 epoch 后的天数。

TIME([P])

INT64

org.apache.kafka.connect.data.Time

代表午夜起的毫秒数,但不包含时区信息。PostgreSQL 允许 P 在范围 0-6 中存储最多为微秒的精度,但这种模式会在 P 大于 3 时丢失精度。

TIMESTAMP([P])

INT64

org.apache.kafka.connect.data.Timestamp

代表自 epoch 起的毫秒数,但不包含时区信息。PostgreSQL 允许 P 在范围 0-6 中存储最多为微秒的精度,但这种模式会在 P 大于 3 时丢失精度。

TIMESTAMP 类型

TIMESTAMP 类型代表一个没有时区信息的时间戳。这些列根据 UTC 转换为等同的 Kafka Connect 值。例如,当 time.precision.mode 没有设置为 connect 时,TIMESTAMP 值 "2018-06-20 15:13:16.945104" 由一个带有值 "1529507596945104" 的 io.debezium.time.MicroTimestamp 代表。

运行 Kafka Connect 和 Debezium 的 JVM 时区不会影响此转换。

PostgreSQL 支持在 TIMESTAMP 列中使用 +/-infinite 值。这些特殊的值转换为时间戳,在正无限循环的情况下值为 9223372036825200000,在负无限循环的情况值为 -9223372036832400000。此行为模拟 PostgreSQL JDBC 驱动程序的标准行为。如需更多信息,请参阅 org.postgresql.PGStatement 接口。

十进制类型

PostgreSQL 连接器配置属性 decimal.handling.mode 的设置决定了连接器如何映射十进制类型。

当将 decimal.handling.mode 属性设为 precise 时,连接器使用 Kafka Connect org.apache.kafka.connect.data.Decimal logical type for all DECIMAL,NUMERICMONEY 列。这是默认的模式。

表 2.146. 当 decimal.handling.mode 为 精确时的映射
PostgreSQL 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

NUMERIC[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

scale schema 参数包含一个整数,代表十进制点被移动的数量。

DECIMAL[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

scale schema 参数包含一个整数,代表十进制点被移动的数量。

金钱[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

scale schema 参数包含一个整数,代表十进制点被移动的数量。scale schema 参数由 money.fraction.digits 连接器配置属性决定。

此规则存在例外情况。当在没有扩展限制的情况下使用 NUMERICDECIMAL 类型时,来自数据库的值会为每个值有不同的(variable)扩展。在这种情况下,连接器使用 io.debezium.data.VariableScaleDecimal,其中包含值和传输值的规模。

表 2.147. 当没有扩展限制时,DECIMAL 和 NUMERIC 类型的映射
PostgreSQL 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

数字

STRUCT

io.debezium.data.VariableScaleDecimal

包含两个字段的结构:type INT32,其中包含传输的值的 BYTES 的扩展,以未 扩展 的形式包含原始值。

十进制

STRUCT

io.debezium.data.VariableScaleDecimal

包含两个字段的结构:type INT32,其中包含传输的值的 BYTES 的扩展,以未 扩展 的形式包含原始值。

当将 decimal.handling.mode 属性设置为 double 时,连接器表示所有 DECIMALNUMERICMONEY 值作为 Java double 值,并根据下表所示进行编码。

表 2.148. 当 decimal.handling.mode 为双模式时 的映射
PostgreSQL 数据类型字面类型(schema 类型)语义类型(架构名称)

NUMERIC[(M[,D])]

FLOAT64

 

DECIMAL[(M[,D])]

FLOAT64

 

金钱[(M[,D])]

FLOAT64

 

decimal.handling.mode 配置属性的最后可能的设置 是字符串。在这种情况下,连接器代表 DECIMAL,NUMERICMONEY 值作为其格式化的字符串表示,并对它们进行编码,如下表中所示。

表 2.149. 当 decimal.handling.mode 为 字符串时的映射
PostgreSQL 数据类型字面类型(schema 类型)语义类型(架构名称)

NUMERIC[(M[,D])]

字符串

 

DECIMAL[(M[,D])]

字符串

 

金钱[(M[,D])]

字符串

 

decimal.handling.modestringdouble 时,PostgreSQL 支持将 NaN (不是数字)作为存储在 DECIMAL/NUMERIC 值中的特殊值。在这种情况下,连接器将 NaN 编码为 Double.NaN 或字符串常量 NAN

HSTORE 类型

PostgreSQL 连接器配置属性 hstore.handling.mode 的设置决定了连接器如何映射 HSTORE 值。

hstore.handling.mode 属性设为 json (默认值)时,连接器代表 HSTORE 值作为 JSON 值的字符串表示,并对它们进行编码,如下表所示。当 hstore.handling.mode 属性设为 map 时,连接器将 MAP 模式类型用于 HSTORE 值。

表 2.150. HSTORE 数据类型的映射
PostgreSQL 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

HSTORE

字符串

io.debezium.data.Json

示例:使用 JSON 转换的输出表示为 {"key" : "val"}

HSTORE

MAP

n/a

示例:使用 JSON 转换的输出表示为 {"key" : "val"}

域类型

PostgreSQL 支持用户定义的类型,这些类型基于其他底层类型。当使用此类列类型时,Debebe 根据完整的类型层次结构公开列的表示。

重要

捕获使用 PostgreSQL 域类型的列中的更改需要特殊考虑。当定义列使其包含扩展其中一个默认数据库类型的域类型并且域类型定义了自定义长度或扩展时,生成的模式会继承定义的长度或扩展。

当定义列使其包含扩展另一个域类型的域类型时,生成的模式 不会继承 定义的长度或扩展,因为该信息在 PostgreSQL 驱动程序列元数据中不可用。

网络地址类型

PostgreSQL 具有可存储 IPv4、IPv6 和 MAC 地址的数据类型。最好使用这些类型而不是纯文本类型来存储网络地址。网络地址类型提供输入错误检查和专用运算符和功能。

表 2.151. 网络地址类型的映射
PostgreSQL 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

INET

字符串

N/a

IPv4 和 IPv6 网络

CIDR

字符串

N/a

IPv4 和 IPv6 主机和网络

MACADDR

字符串

N/a

MAC 地址

MACADDR8

字符串

N/a

MAC 地址(EUI-64 格式)

PostGIS 类型

PostgreSQL 连接器支持所有 PostGIS 数据类型

表 2.152. PostGIS 数据类型映射
PostGIS 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

GEOMETRY
(planar)

STRUCT

io.debezium.data.geometry.Geometry

包含有两个字段的结构:

  • srid (INT32) - 空间参考系统标识符,用于定义存储在结构中的 geometry 对象的类型。
  • wkb (BYTES) - geometry 对象的二进制表示,以 Well-Known-Binary 格式编码。

有关格式详情,请参阅 Open Geospatial Consortium Simple Features Access 规格

GEOGRAPHY
(spherical)

STRUCT

io.debezium.data.geometry.Geography

包含有两个字段的结构:

  • srid (INT32) - 空间参考系统标识符,用于定义存储在结构中的geography 对象类型。
  • wkb (BYTES) - geometry 对象的二进制表示,以 Well-Known-Binary 格式编码。

有关格式详情,请参阅 Open Geospatial Consortium Simple Features Access 规格

pgvector 类型

PostgreSQL 连接器支持所有 pgvector 扩展数据类型

表 2.153. PostgreSQL pgvector 数据类型的映射
pgvector 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

VECTOR

数组(FLOAT64)

io.debezium.data.DoubleVector

HALFVEC

数组(FLOAT32)

io.debezium.data.FloatVector

SPARSEVEC

STRUCT

io.debezium.data.SparseVector

包含以下字段的结构:

维度(INT16)
稀疏向量的总长度。
Vector (MAP (INT16, FLOAT64))

代表稀疏向量的映射。
每个映射值包括以下元素:

  • 向量元素的索引号(通过 1开始)。
  • vector 元素的值。

要粘贴的值

PostgreSQL 在页面大小上有一个硬限制。这意味着,大于大约 8 KB 的值需要使用 TOAST 存储存储存储。这会影响来自数据库的复制消息。使用 TOAST 机制存储的值,且尚未更改的值不会包含在消息中,除非它们是表的副本身份的一部分。Debezium 没有安全的方式直接从数据库读取缺少的值,因为这可能导致竞争条件。因此,Debezium 会遵循这些规则来处理粘贴值:

  • 带有 REPLICA IDENTITY FULL -TOAST 列值的表是更改事件中的 beforeafter 字段的一部分,就像任何其他列一样。
  • 带有 REPLICA IDENTITY DEFAULT 的表 - 从数据库接收 UPDATE 事件时,任何不是副本身份一部分的 TOAST 列值都不会包含在事件中。同样,当收到 DELETE 事件时,没有 TOAST 列(如果有)位于 before 字段中。由于 Debezium 无法在此例中安全地提供列值,因此连接器会返回由连接器配置属性 unavailable.value.placeholder 定义的占位符值。

默认值

如果为数据库模式中的列指定默认值,PostgreSQL 连接器会在可能的情况下尝试将这个值传播到 Kafka 模式。最常见的数据类型包括:

  • 布尔值
  • 数字类型(INTFLOATNUMERIC 等)
  • 文本类型(CHARVARCHARTEXT 等)
  • 临时类型(DATE,TIME,INTERVAL,TIMESTAMP,TIMESTAMPTZ)
  • JSON,JSONB,XML
  • UUID

请注意,对于 temporal 类型,PostgreSQL 库提供了默认值解析;因此,PostgreSQL 通常支持的任何字符串表示也应该被连接器支持。

如果默认值由函数生成,而不是直接指定,则连接器将为给定的数据类型导出等效的 0。这些值包括:

  • 用于 BOOLEANFALSE
  • 0, 带有适当的精度,用于数字类型
  • text/XML 类型的空字符串
  • {} 用于 JSON 类型
  • 1970-01-01 用于 DATE,TIMESTAMP,TIMESTAMPTZ 类型
  • TIME 00:00
  • INTERVALEPOCH
  • UUID00000000-0000-0000-0000-000000000000

这个支持目前只适用于显式使用功能。例如,CURRENT_TIMESTAMP (6) 支持括号,但 CURRENT_TIMESTAMP 不被支持。

重要

对默认值传播的支持主要是为了允许在将 PostgreSQL 连接器与 schema registry 搭配使用时实现安全模式演进,该 registry 在 schema 版本之间强制实施兼容性。由于这个主要关注,以及不同插件的刷新行为,Kafka 模式中的默认值无法保证始终与数据库 schema 中的默认值同步。

  • 根据给定插件触发刷新内存模式的时间,默认值可能会在 Kafka 模式中显示 'late'。如果默认更改多次 in-between 刷新,则可能不会在 Kafka 模式中跳过值
  • 在连接器等待处理时,如果在 Kafka 模式中触发了 schema 刷新,默认值可能会出现 'early'。这是因为在刷新时从数据库读取列元数据,而不是在复制消息中显示。如果连接器位于且发生刷新,则会出现这种情况,如果连接器在一段时间后停止了连接器,则会出现这种情况,同时继续写入源数据库。

这个行为可能是意外的,但仍然是安全的。只有架构定义会受到影响,但消息中存在的实际值则与写入源数据库的内容保持一致。

自定义转换器

默认情况下,Debebe 不会使用自定义数据类型从列复制数据,如使用 SQL CREATE TYPE 语句创建的复合类型。要使用自定义数据类型复制列,请遵循 创建自定义转换器 的说明,但有几个重要的注意事项:

  • 将连接器配置中的 include.unknown.datatypes 属性设置为 true。默认 false 设置会导致自定义转换器始终返回 null 值。
  • 传递给转换器的值类型取决于为复制插槽配置的逻辑解码输出插件。

    • decoderbufs 传递列数据的字节数组(byte[])表示。
    • pgoutput 传递列数据的字符串表示。

2.6.5. 设置 PostgreSQL 以运行 Debezium 连接器

此 Debezium 发行版本只支持原生 pgoutput 逻辑复制流。要设置 PostgreSQL,使其使用 pgoutput 插件,您必须启用复制插槽,并配置具有足够特权的用户来执行复制。

详情包括在以下主题中:

2.6.5.1. 为 Debezium pgoutput 插件配置复制插槽

PostgreSQL 的逻辑解码使用复制插槽。要配置复制插槽,请在 postgresql.conf 文件中指定以下内容:

wal_level=logical
max_wal_senders=1
max_replication_slots=1

这些设置指示 PostgreSQL 服务器如下:

  • wal_level - 使用 write-ahead 日志的逻辑解码。
  • max_wal_senders - 使用最多一个单独的进程来处理 WAL 更改。
  • max_replication_slots - 允许为流 WAL 更改创建最多一个复制插槽。

可以保证复制插槽保留 Debezium 所需的所有 WAL 条目,即使在 Debezium 中断期间也是如此。因此,务必要密切监控复制插槽,以避免:

  • 磁盘消耗太多
  • 任何条件,如目录 bloat,当复制插槽保留太长时,可能会发生这种情况

如需更多信息,请参阅有关 复制插槽的 PostgreSQL 文档

注意

熟悉 PostgreSQL write-ahead 日志 的机制和配置有助于使用 Debezium PostgreSQL 连接器。

2.6.5.2. 为 Debezium 连接器设置 PostgreSQL 权限

设置 PostgreSQL 服务器以运行 Debezium 连接器需要能够执行复制的数据库用户。复制只能由具有适当权限的数据库用户执行,且只能针对配置的数量的主机执行。

虽然默认情况下,超级用户具有必要的 REPLICATIONLOGIN 角色,如 安全 中所述,最好不要为 Debezium 复制用户提供升级的特权。反之,创建一个具有最低所需权限的 Debezium 用户。

先决条件

  • PostgreSQL 管理权限。

流程

  1. 要为用户提供复制权限,请定义 至少具有 REPLICATIONLOGIN 权限的 PostgreSQL 角色,然后将该角色授予该用户。例如:

    CREATE ROLE <name> REPLICATION LOGIN;

2.6.5.3. 设置特权以启用 Debezium 创建 PostgreSQL 出版物

来自为表创建的 publications 的 PostgreSQL 源表的 Debezium 流改变事件。出版物包含一组经过过滤的更改事件,这些事件由一个或多个表生成。每个发布中的数据会根据发布规格过滤。该规格可由 PostgreSQL 数据库管理员或 Debezium 连接器创建。要允许 Debezium PostgreSQL 连接器创建发布并指定要复制到它们的数据,连接器必须使用数据库中的特定特权操作。

有几个选项可用于确定如何创建发布。通常,在设置连接器前,最好为您要捕获的表手动创建发布。但是,您可以将环境配置为允许 Debezium 自动创建发布的方式,并指定添加到它们中的数据。

Debezium 使用 include list 和 exclude list 属性来指定在发布中如何插入数据。有关启用 Debezium 创建发布的选项的更多信息,请参阅 publication.autocreate.mode

要使 Debezium 创建 PostgreSQL 出版物,它必须以具有以下特权的用户身份运行:

  • 数据库中的复制特权,将表添加到发布中。
  • 数据库上的 CREATE 特权来添加发布。
  • 对表进行 SELECT 特权,以复制初始表数据。表所有者会自动具有表的 SELECT 权限。

要向发布中添加表,用户必须是表的所有者。但是,由于源表已存在,您需要一种与原始所有者共享所有权的机制。要启用共享所有权,您可以创建一个 PostgreSQL 复制策略,然后将现有的表所有者和复制用户添加到组中。

流程

  1. 创建复制组。

    CREATE ROLE <replication_group>;
  2. 将表的原始所有者添加到组。

    GRANT REPLICATION_GROUP TO <original_owner>;
  3. 将 Debezium 复制用户添加到组中。

    GRANT REPLICATION_GROUP TO <replication_user>;
  4. 将表的所有权转让到 < replication_group>

    ALTER TABLE <table_name> OWNER TO REPLICATION_GROUP;

要使 Debezium 指定捕获配置,必须将 publication.autocreate.mode 的值设置为 filtered

2.6.5.4. 配置 PostgreSQL 以允许使用 Debezium 连接器主机复制

要启用 Debezium 来复制 PostgreSQL 数据,您必须配置数据库,以允许使用运行 PostgreSQL 连接器的主机复制。要指定允许使用数据库复制的客户端,请在基于 PostgreSQL 基于主机的身份验证文件 pg_hba.conf 中添加条目。有关 pg_hba.conf 文件的更多信息,请参阅 PostgreSQL 文档。

流程

  • pg_hba.conf 文件中添加条目,以指定可以使用数据库主机复制的 Debezium 连接器主机。例如,

    pg_hba.conf 文件示例:

    local   replication     <youruser>                          trust   1
    host    replication     <youruser>  127.0.0.1/32            trust   2
    host    replication     <youruser>  ::1/128                 trust   3

    表 2.154. pg_hba.conf 设置的描述
    描述

    1

    指示服务器允许在本地复制 & lt;youruser >,即在服务器机器上。

    2

    指示服务器允许 localhost 上的 & lt;youruser > 接收使用 IPV4 的复制更改。

    3

    指示服务器允许 localhost 上的 & lt;youruser > 接收使用 IPV6 的复制更改。

注意

有关网络掩码的更多信息,请参阅 PostgreSQL 文档

2.6.5.5. 支持的 PostgreSQL 拓扑

PostgreSQL 连接器可以与独立 PostgreSQL 服务器或 PostgreSQL 服务器集群一起使用。

2.6.5.5.1. PostgreSQL 15 或更早版本集群

当您在运行 PostgreSQL 15 或更早版本的环境中部署 Debezium 时,您只能在集群的主服务器上配置逻辑复制插槽。您无法在集群中的副本服务器上配置逻辑复制。

因此,Debezium PostgreSQL 连接器可以连接并只能与主服务器通信。如果主服务器失败,连接器将停止。要从失败中恢复,您必须修复集群,然后将原始主服务器提升到 主服务器,或将不同的 PostgreSQL 服务器提升到 主服务器。如需更多信息,请参阅 失败后从新主中提取数据

2.6.5.5.2. PostgreSQL 16 或更高版本集群

当使用 PostgreSQL 16 或更高版本集群部署 Debezium 时,您可以在副本服务器上设置逻辑复制插槽。此功能使 Debezium 从主服务器以外的服务器捕获更改事件。但请注意,Debezium 连接到副本服务器通常会遇到比到主服务器的连接更高的延迟。

另外,请记住,PostgreSQL 副本服务器上的复制插槽不会自动与主服务器上的对应插槽同步。要在 PostgreSQL 16 集群中失败后进行恢复,您应该定期执行手动同步,以推进待机服务器上的复制插槽的位置,以匹配主服务器中的位置。

2.6.5.5.3. 带有 PostgreSQL 17 或更高版本的集群的 Debezium

当使用 PostgreSQL 17 或更高版本部署 Debezium 时,您可以在主服务器上设置逻辑复制插槽,并为故障转移启用这些插槽。PostgreSQL 可以自动将故障转移插槽的状态传播到一个或多个副本服务器。在启用自动复制的环境中,如果发生失败,可用的副本会自动提升到主副本。Debezium 可以继续更改新的主服务器,而无需任何配置更改,从而帮助确保连接器不会丢失任何事件。

重要

将 Debezium 与 PostgreSQL 17 搭配使用,配置故障转移复制插槽的功能是技术预览功能。技术预览功能不受红帽产品服务等级协议(SLA)支持,且功能可能并不完整。红帽不推荐在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。有关红帽技术预览功能支持范围的更多信息,请参阅技术预览功能支持范围

2.6.5.6. 配置 PostgreSQL 以管理 Debezium WAL 磁盘空间消耗

在某些情况下,WAL 文件消耗的 PostgreSQL 磁盘空间可能会激增或增加通常的比例。这种情况有几个可能的原因:

  • 连接器接收数据的 LSN 最多可在服务器的 pg_replication_slots 视图的 confirmed_flush_lsn 列中提供。比这个 LSN 旧的数据不再可用,数据库负责重新声明磁盘空间。

    另外,在 pg_replication_slots 视图中,restart_lsn 列还包含连接器可能需要的最旧 WAL 的 LSN。如果 confirmed_flush_lsn 的值定期增加,并且 restart_lsn lags 的值需要回收空间。

    数据库通常会在批处理块中回收磁盘空间。这是预期的行为,用户不需要任何操作。

  • 在被跟踪的数据库中有很多更新,但只有少量更新与连接器捕获更改的表和模式相关。这种情形可以通过定期的心跳事件来轻松解决。设置 heartbeat.interval.ms 连接器配置属性。

    注意

    要使连接器从 heartbeat 表检测和处理事件,您必须将表添加到由 publication.name 属性指定的 PostgreSQL 出版物中。如果此发布规定了 Debezium 部署,则连接器将使用定义的发布。如果发布尚未配置为自动复制数据库中更改 FOR ALL TABLES,则必须明确将 heartbeat 表添加到发布中,例如:

    ALTER PUBLICATION < publicationName > ADD TABLE < heartbeatTableName > ;

  • PostgreSQL 实例包含多个数据库,其中一个是高流量数据库。Debezium 捕获另一个数据库(与其他数据库相比)的低流量更改。然后 Debezium 无法确认 LSN 作为每个数据库的复制插槽工作,并且没有调用 Debezium。由于 WAL 由所有数据库共享,因此所使用的数量往往会增长,直到 Debezium 捕获更改的数据库发出事件为止。要解决此问题,需要:

    • 使用 heartbeat.interval.ms 连接器配置属性启用定期心跳记录生成。
    • 定期从 Debezium 捕获更改的数据库发出更改事件。

    然后,单独的进程会通过插入新行或重复更新同一行来定期更新表。然后,PostgreSQL 会调用 Debezium,它会确认最新的 LSN,并允许数据库回收 WAL 空间。此任务可以通过 heartbeat.action.query 连接器配置属性自动执行。

为同一数据库服务器设置多个连接器

Debezium 使用复制插槽从数据库流更改。这些复制插槽以 LSN (Log Sequence Number)的形式维护当前位置,它指向 Debezium 连接器消耗的 WAL 中的位置。这有助于 PostgreSQL 保留 WAL 可用,直到 Debezium 处理为止。单个复制插槽只能用于单个消费者或进程 - 因为不同的消费者可能具有不同的状态,并可能需要来自不同位置的数据。

因为复制插槽只能被单个连接器使用,因此必须为每个 Debezium 连接器创建一个唯一的复制插槽。虽然当连接器不处于活跃状态时,Postgres 可能允许其他连接器消耗复制插槽 - 这可能会危险,因为它可能会导致数据丢失,因为插槽只会在 [请参阅更多] 后发出一次更改。

除了复制插槽外,Debezium 在使用 pgoutput 插件时使用发布来流传输事件。与复制插槽类似,发布位于数据库级别,并为一组表定义。因此,每个连接器都需要一个唯一的发布,除非连接器在同一个表中工作。有关启用 Debezium 创建发布的选项的更多信息,请参阅 publication.autocreate.mode

有关如何为每个连接器设置唯一的复制插槽名称和发布名称,请参阅 slot.namepublication.name

2.6.5.7. 升级 Debezium 从中捕获的 PostgreSQL 数据库

当您升级 Debezium 使用的 PostgreSQL 数据库时,您必须执行特定的步骤来防止数据丢失,并确保 Debezium 继续操作。通常,Debezium 可以应对因网络故障和其他中断造成的中断。例如,当连接器监控的数据库服务器停止或崩溃时,连接器重新建立与 PostgreSQL 服务器通信后,它将继续从日志序列号(LSN)偏移记录的最后位置读取。连接器从 Kafka Connect offsets 主题检索最后一次记录的偏移信息,并查询配置的 PostgreSQL 复制插槽以获取具有相同值的日志序列号(LSN)。

要使连接器从 PostgreSQL 数据库启动和捕获更改事件,必须存在复制插槽。但是,作为 PostgreSQL 升级过程的一部分,复制插槽会被删除,在升级完成后不会恢复原始插槽。因此,当连接器重启并从复制插槽请求最后已知的偏移时,PostgreSQL 无法返回信息。

您可以创建新的复制插槽,但您必须执行超过创建新插槽来保护数据丢失。新的复制插槽只能为创建插槽后发生的更改提供 LSN;它无法为升级前发生的事件提供偏移。当连接器重启时,它首先从 Kafka offsets 主题请求最后已知的偏移量。然后,它会向复制插槽发送请求,以返回从偏移主题检索的偏移信息。但是,新的复制插槽无法提供连接器从预期位置恢复流所需的信息。然后,连接器跳过日志中任何现有更改事件,它只从日志中的最新位置恢复流。这可能会导致静默的数据丢失:连接器不会发出跳过的事件的记录,也不会提供任何信息来指示事件被跳过。

有关如何执行 PostgreSQL 数据库升级的指导,以便 Debezium 能够继续捕获事件,同时尽量减少数据丢失的风险,请参阅以下步骤。

流程

  1. 临时停止写入数据库的应用程序,或者将其置于只读模式。
  2. 备份数据库。
  3. 临时禁用对数据库的写访问。
  4. 在您阻止写操作被保存到 write-ahead 日志(WAL)之前,验证数据库中发生的任何更改,并且 WAL LSN 是否反映在复制插槽上。
  5. 为连接器提供足够的时间,以捕获写入复制插槽的所有事件记录。
    此步骤可确保在考虑停机前发生的所有更改事件,并将其保存到 Kafka。
  6. 通过检查 flushed LSN 的值来验证连接器是否已消耗来自复制插槽的条目。
  7. 通过停止 Kafka Connect 来安全地关闭连接器。
    Kafka Connect 停止连接器,将所有事件记录刷新到 Kafka,并记录从每个连接器接收的最后偏移量。

    注意

    作为停止整个 Kafka Connect 集群的替代选择,您可以通过删除连接器来停止连接器。不要删除偏移主题,因为它可能由其他 Kafka 连接器共享。之后,在恢复对数据库的写入访问并准备好重启连接器后,您必须重新创建连接器。

  8. 作为 PostgreSQL 管理员,丢弃主数据库服务器上的复制插槽。不要使用 slot.drop.on.stop 属性来丢弃复制插槽。此属性仅用于测试。
  9. 停止数据库。
  10. 使用批准的 PostgreSQL 升级过程,如 pg_upgradepg_dumppg_restore 执行升级。
  11. (可选)使用标准 Kafka 工具从偏移存储主题中删除连接器偏移。
    有关如何删除连接器偏移的示例,请参阅 如何在 Debezium 社区常见问题解答中删除连接器偏移
  12. 重新启动数据库。
  13. 作为 PostgreSQL 管理员,在数据库上创建 Debezium 逻辑复制插槽。在启用对数据库的写操作前,您必须创建插槽。否则,Debebe 无法捕获更改,从而导致数据丢失。

    有关设置复制插槽的详情,请参考 第 2.6.5.1 节 “为 Debezium pgoutput 插件配置复制插槽”

  14. 验证升级后是否仍然存在定义 Debezium 要捕获的表的发布。如果发布不可用,请以 PostgreSQL 管理员身份连接到数据库,以创建新的发布。
  15. 如果需要在上一步中创建新发布,请更新 Debezium 连接器配置,将新发布的名称添加到 publication.name 属性中。
  16. 在连接器配置中,重命名连接器。
  17. 在连接器配置中,将 slot.name 设置为 Debezium 复制插槽的名称。
  18. 验证新复制插槽是否可用。
  19. 恢复对数据库的写入访问权限,然后重新启动任何写入数据库的应用程序。
  20. 在连接器配置中,将 snapshot.mode 属性设置为 never,然后重新启动连接器。

    注意

    如果您无法验证 Debezium 是否已读取步骤 6 中的所有数据库更改,您可以通过设置 snapshot.mode=initial 将连接器配置为执行新快照。如果需要,您可以通过检查升级前立即进行的数据库备份内容来确认连接器是否从复制插槽读取所有更改。

2.6.6. 部署 Debezium PostgreSQL 连接器

您可以使用以下任一方法部署 Debezium PostgreSQL 连接器:

2.6.6.1. 使用 Streams for Apache Kafka 进行 PostgreSQL 连接器部署

部署 Debezium 连接器的首选方法是使用 Streams for Apache Kafka 来构建包含连接器插件的 Kafka Connect 容器镜像。

在部署过程中,您要创建和使用以下自定义资源(CR):

  • 定义 Kafka Connect 实例的 KafkaConnect CR,并包含有关镜像中包含的连接器工件的信息。
  • 提供包括连接器用来访问源数据库的信息的 KafkaConnector CR。在 Apache Kafka 的 Streams 启动 Kafka Connect pod 后,您可以通过应用 KafkaConnector CR 来启动连接器。

在 Kafka Connect 镜像的构建规格中,您可以指定用于部署的连接器。对于每个连接器插件,您还可以指定您的部署可以使用的其他组件。例如,您可以添加 Apicurio Registry 工件或 Debezium 脚本组件。当 Apache Kafka 的 Streams 构建 Kafka Connect 镜像时,它会下载指定的工件,并将其合并到镜像中。

KafkaConnect CR 中的 spec.build.output 参数指定在存储生成的 Kafka Connect 容器镜像的位置。容器镜像可以存储在容器 registry 中,如 quay.io 或 OpenShift ImageStream 中。要将镜像存储在 ImageStream 中,您必须在部署 Kafka Connect 前创建 ImageStream。镜像流不会被自动创建。

注意

如果使用 KafkaConnect 资源创建集群,之后您无法使用 Kafka Connect REST API 创建或更新连接器。您仍然可以使用 REST API 来检索信息。

其他资源

2.6.6.2. 使用 Apache Kafka 的 Streams 部署 Debezium PostgreSQL 连接器

对于 Apache Kafka 的早期版本,要在 OpenShift 上部署 Debezium 连接器,首先需要为连接器构建 Kafka Connect 镜像。在 OpenShift 上部署连接器的当前首选方法是使用 Apache Kafka 的 Streams 中的构建配置,来自动构建包含您要使用的 Debezium 连接器插件的 Kafka Connect 容器镜像。

在构建过程中,Apache Kafka Operator 的 Streams 将 KafkaConnect 自定义资源中的输入参数(包括 Debezium 连接器定义)转换为 Kafka Connect 容器镜像。构建会从 Red Hat Maven 存储库或其他配置的 HTTP 服务器下载必要的工件。

新创建的容器被推送到 .spec.build.output 中指定的容器 registry,并用于部署 Kafka Connect 集群。在 Apache Kafka 的 Streams 构建 Kafka Connect 镜像后,您可以创建 KafkaConnector 自定义资源来启动构建中包含的连接器。

先决条件

  • 您可以访问安装了集群 Operator 的 OpenShift 集群。
  • Apache Kafka Operator 的 Streams 正在运行。
  • 部署了 Apache Kafka 集群,如 在 OpenShift 中部署和管理 Apache Kafka 的流 中所述。
  • Kafka Connect 部署在 Apache Kafka 的 Streams 中
  • 您有一个红帽构建的 Debezium 许可证。
  • OpenShift oc CLI 客户端已安装,或者您可以访问 OpenShift Container Platform Web 控制台。
  • 根据您要存储 Kafka Connect 构建镜像的方式,您需要 registry 权限或您必须创建 ImageStream 资源:

    将构建镜像存储在镜像 registry 中,如 Red Hat Quay.io 或 Docker Hub
    • 在 registry 中创建和管理镜像的帐户和权限。
    将构建镜像存储为原生 OpenShift ImageStream

流程

  1. 登录 OpenShift 集群。
  2. 为连接器创建 Debezium KafkaConnect 自定义资源(CR),或修改现有的资源。例如,使用名称 dbz-connect.yaml 创建 KafkaConnect CR,用于指定 metadata.annotationsspec.build 属性。以下示例显示了描述 KafkaConnect 自定义资源的 dbz-connect.yaml 文件摘录。

    例 2.42. 定义包含 Debezium 连接器的 KafkaConnect 自定义资源的 dbz-connect.yaml 文件

    在以下示例中,自定义资源被配置为下载以下工件:

    • Debezium PostgreSQL 连接器存档。
    • 红帽构建的 Apicurio Registry 归档。Apicurio Registry 是一个可选组件。只有在打算将 Avro serialization 与连接器一起使用时,才添加 Apicurio Registry 组件。
    • Debezium 脚本 SMT 归档以及您要用于 Debezium 连接器的相关脚本引擎。SMT 归档和脚本语言依赖项是可选组件。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时才添加这些组件。
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: debezium-kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 1
    spec:
      version: 3.9.0
      build: 2
        output: 3
          type: imagestream  4
          image: debezium-streams-connect:latest
        plugins: 5
          - name: debezium-connector-postgres
            artifacts:
              - type: zip 6
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-postgres/3.0.8.Final-redhat-00004/debezium-connector-postgres-3.0.8.Final-redhat-00004-plugin.zip  7
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.5.11.redhat-00001/apicurio-registry-distro-connect-converter-2.5.11.redhat-00001.zip  8
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/3.0.8.Final-redhat-00004/debezium-scripting-3.0.8.Final-redhat-00004.zip 9
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy/3.0.11/groovy-3.0.11.jar  10
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar
    
      bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
    
      ...
    表 2.155. Kafka Connect 配置设置的描述
    描述

    1

    strimzi.io/use-connector-resources 注解设置为 "true",以便 Cluster Operator 使用 KafkaConnector 资源在此 Kafka Connect 集群中配置连接器。

    2

    spec.build 配置指定存储构建镜像的位置,并列出镜像中包含的插件,以及插件工件的位置。

    3

    build.output 指定存储新构建的镜像的 registry。

    4

    指定镜像输出的名称和镜像名称。output.type 的有效值为 docker,可推送到容器 registry (如 Docker Hub 或 Quay)或 镜像流 (用于将镜像推送到内部 OpenShift ImageStream)。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定 build.output 的更多信息,请参阅 Streams for Apache Kafka API 参考中的 Build schema 参考

    5

    插件配置 列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,指定一个插件名称,以及有关构建连接器所需的工件的信息。另外,对于每个连接器插件,您可以包括要用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。

    6

    artifacts.type 的值指定 artifacts.url 中指定的工件的文件类型。有效类型是 ziptgz、或 jar。Debezium 连接器存档以 .zip 文件格式提供。type 值必须与 url 字段中引用的文件类型匹配。

    7

    artifacts.url 的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。Red Hat Maven 存储库中提供了 Debezium 连接器工件。OpenShift 集群必须有权访问指定的服务器。

    8

    (可选)指定下载 Apicurio Registry 组件的工件 类型和 url。包括 Apicurio Registry 工件,只有在您希望连接器使用 Apache Avro 来序列化事件键和值,使用红帽构建的 Apicurio Registry 的值,而不是使用默认的 JSON 转换。

    9

    (可选)指定 Debezium 脚本 SMT 归档的工件 类型和 url,以用于 Debezium 连接器。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时才包括脚本 SMT。要使用脚本 SMT,您还必须部署 JSR 223 兼容脚本实施,如 groovy。

    10

    (可选)指定与 JSR 223 脚本实施的 JAR 文件的工件 类型和 url,这是 Debezium 脚本 SMT 所需的。

    重要

    如果您使用 Streams for Apache Kafka 将连接器插件合并到 Kafka Connect 镜像中,对于每个所需的脚本语言组件 artifacts.url 必须指定 JAR 文件的位置,并且 artifacts.type 的值也必须设置为 jar。无效的值会导致连接器在运行时失败。

    要启用将 Apache Groovy 语言与脚本 SMT 搭配使用,示例中的自定义资源会检索以下库的 JAR 文件:

    • groovy
    • groovy-jsr223 (脚本代理)
    • groovy-json (用于解析 JSON 字符串的模块)

    作为替代方案,Debezium 脚本 SMT 还支持使用 GraalVM JavaScript 的 JSR 223 实施。

  3. 输入以下命令将 KafkaConnect 构建规格应用到 OpenShift 集群:

    oc create -f dbz-connect.yaml

    根据自定义资源中指定的配置,Streams Operator 准备要部署的 Kafka Connect 镜像。
    构建完成后,Operator 将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。您在配置中列出的连接器工件在集群中可用。

  4. 创建一个 KafkaConnector 资源来定义您要部署的每个连接器的实例。
    例如,创建以下 KafkaConnector CR,并将它保存为 postgresql-inventory-connector.yaml

    例 2.43. 为 Debezium 连接器定义 KafkaConnector 自定义资源的 PostgreSQL -inventory-connector.yaml 文件

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: debezium-kafka-connect-cluster
      name: inventory-connector-postgresql 1
    spec:
      class: io.debezium.connector.postgresql.PostgresConnector 2
      tasksMax: 1  3
      config:  4
        database.hostname: postgresql.debezium-postgresql.svc.cluster.local 5
        database.port: 5432   6
        database.user: debezium  7
        database.password: dbz  8
        database.dbname: mydatabase 9
        topic.prefix: inventory-connector-postgresql 10
        table.include.list: public.inventory  11
    
        ...
    表 2.156. 连接器配置设置的描述
    描述

    1

    要注册到 Kafka Connect 集群的连接器名称。

    2

    连接器类的名称。

    3

    可同时操作的任务数量。

    4

    连接器的配置。

    5

    主机数据库实例的地址。

    6

    数据库实例的端口号。

    7

    Debezium 用来连接到数据库的帐户名称。

    8

    Debezium 用来连接到数据库用户帐户的密码。

    9

    要从中捕获更改的数据库的名称。

    10

    数据库实例或集群的主题前缀。
    指定的名称只能从字母数字字符或下划线构成。
    因为主题前缀用作从此连接器接收更改事件的 Kafka 主题的前缀,因此该名称在集群中的连接器中必须是唯一的。
    如果您将连接器与 Avro 连接器集成,则此命名空间也用于相关的 Kafka Connect 模式的名称,以及对应的 Avro 模式的命名空间。

    11

    连接器捕获更改事件的表列表。

  5. 运行以下命令来创建连接器资源:

    oc create -n <namespace> -f <kafkaConnector>.yaml

    例如,

    oc create -n debezium -f postgresql-inventory-connector.yaml

    连接器注册到 Kafka Connect 集群,并开始针对 KafkaConnector CR 中的 spec.config.database.dbname 指定的数据库运行。连接器 pod 就绪后,Debezium 正在运行。

现在 ,您可以验证 Debezium PostgreSQL 部署

2.6.6.3. 通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium PostgreSQL 连接器

要部署 Debezium PostgreSQL 连接器,您需要构建包含 Debezium 连接器存档的自定义 Kafka Connect 容器镜像,并将此容器镜像推送到容器 registry。然后,您需要创建两个自定义资源(CR):

  • 定义 Kafka Connect 实例的 KafkaConnect CR。CR 中的 image 属性指定您创建的容器镜像的名称,以运行 Debezium 连接器。您可以将此 CR 应用到部署 Red Hat Streams for Apache Kafka 的 OpenShift 实例。Apache Kafka 的流提供将 Apache Kafka 到 OpenShift 的 operator 和镜像。
  • 定义 Debezium PostgreSQL 连接器的 KafkaConnector CR。将此 CR 应用到应用 KafkaConnect CR 的同一 OpenShift 实例。

先决条件

流程

  1. 为 Kafka Connect 创建 Debezium PostgreSQL 容器:

    1. 创建一个 Dockerfile,它使用 registry.redhat.io/amq-streams-kafka-39-rhel9:2.9.0 作为基础镜像。例如,在终端窗口中输入以下命令:

      cat <<EOF >debezium-container-for-postgresql.yaml 1
      FROM registry.redhat.io/amq-streams-kafka-39-rhel9:2.9.0
      USER root:root
      RUN mkdir -p /opt/kafka/plugins/debezium 2
      RUN cd /opt/kafka/plugins/debezium/ \
      && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-postgres/3.0.8.Final-redhat-00004/debezium-connector-postgres-3.0.8.Final-redhat-00004-plugin.zip \
      && unzip debezium-connector-postgres-3.0.8.Final-redhat-00004-plugin.zip \
      && rm debezium-connector-postgres-3.0.8.Final-redhat-00004-plugin.zip
      RUN cd /opt/kafka/plugins/debezium/
      USER 1001
      EOF
      描述

      1

      您可以指定您想要的任何文件名。

      2

      指定 Kafka Connect 插件目录的路径。如果您的 Kafka Connect 插件目录位于不同的位置,请将此路径替换为您的目录的实际路径。

      该命令在当前目录中创建一个名为 debezium-container-for-postgresql.yaml 的 Dockerfile。

    2. 从您在上一步中创建的 debezium-container-for-postgresql.yaml Docker 文件中构建容器镜像。在包含该文件的目录中,打开终端窗口并输入以下命令之一:

      podman build -t debezium-container-for-postgresql:latest .
      docker build -t debezium-container-for-postgresql:latest .

      build 命令使用名称 debezium-container-for-postgresql 构建容器镜像。

    3. 将自定义镜像推送到容器 registry 中,如 quay.io 或内部容器 registry。容器镜像仓库必须可供您要部署镜像的 OpenShift 实例使用。输入以下命令之一:

      podman push <myregistry.io>/debezium-container-for-postgresql:latest
      docker push <myregistry.io>/debezium-container-for-postgresql:latest
    4. 创建新的 Debezium PostgreSQL KafkaConnect 自定义资源(CR)。例如,使用名称 dbz-connect.yaml 创建 KafkaConnect CR,用于指定 注解和 镜像 属性。以下示例显示了描述 KafkaConnect 自定义资源的 dbz-connect.yaml 文件摘录。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true" 1
      spec:
        image: debezium-container-for-postgresql 2
      
        ...
      描述

      1

      metadata.annotations 表示 KafkaConnector 资源用于配置在这个 Kafka Connect 集群中使用的 Cluster Operator。

      2

      spec.image 指定为运行 Debezium 连接器而创建的镜像的名称。此属性覆盖 Cluster Operator 中的 STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 变量。

    5. 运行以下命令,将 KafkaConnect CR 应用到 OpenShift Kafka 实例:

      oc create -f dbz-connect.yaml

      这会更新 OpenShift 中的 Kafka Connect 环境,以添加 Kafka Connector 实例,以指定您为运行 Debezium 连接器而创建的镜像的名称。

  2. 创建一个 KafkaConnector 自定义资源,用于配置 Debezium PostgreSQL 连接器实例。

    您可以在 .yaml 文件中配置 Debezium PostgreSQL 连接器,该文件指定连接器的配置属性。连接器配置可能会指示 Debezium 为模式和表的子集生成事件,或者可能会设置属性,以便 Debezium 忽略、掩码或截断指定列中的值,这些值是敏感、太大或不需要的。有关您可以为 Debezium PostgreSQL 连接器设置的配置属性的完整列表,请参阅 PostgreSQL 连接器属性

    以下示例显示了一个自定义资源摘录,它配置一个 Debezium 连接器,该连接器连接到 PostgreSQL 服务器主机 192.168.99.100,在端口 5432 上。此主机有一个名为 sampledb 的数据库,名为 public 的 schema,inventory-connector-postgresql 是服务器的逻辑名称。

    PostgreSQL inventory-connector.yaml

    apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: inventory-connector-postgresql  1
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.postgresql.PostgresConnector
        tasksMax: 1  2
        config:  3
          database.hostname: 192.168.99.100   4
          database.port: 5432
          database.user: debezium
          database.password: dbz
          database.dbname: sampledb
          topic.prefix: inventory-connector-postgresql   5
          schema.include.list: public   6
          plugin.name: pgoutput    7
    
          ...

    表 2.157. PostgreSQL inventory-connector.yaml 示例中的设置描述
    描述

    1

    用于在 Kafka Connect 中注册连接器的名称。

    2

    为此连接器创建的最大任务数量。因为 PostgreSQL 连接器使用单个连接器任务读取 PostgreSQL 服务器 binlog,以确保正确顺序和事件处理,所以一次只能运行一个任务。Kafka Connect 服务使用连接器启动一个或多个任务来执行工作,并在 Kafka Connect 服务集群中自动分发正在运行的任务。如果有任何服务停止或崩溃,任务将重新分发到运行的服务。

    3

    连接器的配置。

    4

    运行 PostgreSQL 服务器的数据库主机的名称。在本例中,数据库主机名是 192.168.99.100

    5

    唯一的主题前缀。主题前缀是 PostgreSQL 服务器或服务器集群的逻辑标识符。这个字符串作为前缀放在所有 Kafka 主题的名称前面,这些主题从连接器接收更改事件记录。

    6

    连接器只捕获 public 模式中的更改。可以将连接器配置为只捕获您选择的表中的更改。如需更多信息,请参阅 table.include.list

    7

    PostgreSQL 逻辑解码插件在 PostgreSQL 服务器上安装的名称。虽然连接器只支持使用 pgoutput 插件,但您必须将 plugin.name 明确设置为 pgoutput

  3. 使用 Kafka Connect 创建连接器实例。例如,如果您在 inventory-connector.yaml 文件中保存 KafkaConnector 资源,您将运行以下命令:

    oc apply -f inventory-connector.yaml

    这会注册 inventory-connector,连接器开始针对 KafkaConnector CR 中定义的 sampledb 数据库运行。

结果

连接器启动后,它会对 为连接器配置的 PostgreSQL 服务器数据库执行一致的快照。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流传输到 Kafka 主题。

2.6.6.4. 验证 Debezium PostgreSQL 连接器是否正在运行

如果连接器正确启动且没有错误,它会为每个表创建一个主题,这些表配置为捕获连接器。下游应用程序可以订阅这些主题,以检索源数据库中发生的信息事件。

要验证连接器是否正在运行,您可以从 OpenShift Container Platform Web 控制台或 OpenShift CLI 工具(oc)执行以下操作:

  • 验证连接器状态。
  • 验证连接器是否生成主题。
  • 验证主题是否填充了用于读取操作的事件("op":"r"),连接器在每个表的初始快照过程中生成的。

先决条件

  • 在 OpenShift 中,Debezium 连接器部署到 Streams for Apache Kafka。
  • 已安装 OpenShift oc CLI 客户端。
  • 访问 OpenShift Container Platform web 控制台。

流程

  1. 使用以下方法之一检查 KafkaConnector 资源的状态:

    • 在 OpenShift Container Platform Web 控制台中:

      1. 导航到 Home Search
      2. Search 页面中,点 Resources 打开 Select Resource 框,然后键入 KafkaConnector
      3. KafkaConnectors 列表中,点您要检查的连接器名称,如 inventory-connector-postgresql
      4. Conditions 部分中,验证 TypeStatus 列中的值是否已设置为 ReadyTrue
    • 在终端窗口中:

      1. 使用以下命令:

        oc describe KafkaConnector <connector-name> -n <project>

        例如,

        oc describe KafkaConnector inventory-connector-postgresql -n debezium

        该命令返回与以下输出类似的状态信息:

        例 2.44. KafkaConnector 资源状态

        Name:         inventory-connector-postgresql
        Namespace:    debezium
        Labels:       strimzi.io/cluster=debezium-kafka-connect-cluster
        Annotations:  <none>
        API Version:  kafka.strimzi.io/v1beta2
        Kind:         KafkaConnector
        
        ...
        
        Status:
          Conditions:
            Last Transition Time:  2021-12-08T17:41:34.897153Z
            Status:                True
            Type:                  Ready
          Connector Status:
            Connector:
              State:      RUNNING
              worker_id:  10.131.1.124:8083
            Name:         inventory-connector-postgresql
            Tasks:
              Id:               0
              State:            RUNNING
              worker_id:        10.131.1.124:8083
            Type:               source
          Observed Generation:  1
          Tasks Max:            1
          Topics:
            inventory-connector-postgresql.inventory
            inventory-connector-postgresql.inventory.addresses
            inventory-connector-postgresql.inventory.customers
            inventory-connector-postgresql.inventory.geom
            inventory-connector-postgresql.inventory.orders
            inventory-connector-postgresql.inventory.products
            inventory-connector-postgresql.inventory.products_on_hand
        Events:  <none>
  2. 验证连接器是否已创建 Kafka 主题:

    • 通过 OpenShift Container Platform Web 控制台。

      1. 导航到 Home Search
      2. Search 页面上,单击 Resources 以打开 Select Resource 框,然后键入 KafkaTopic
      3. KafkaTopics 列表中,点您要检查的主题名称,如 inventory-connector-postgresql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d
      4. Conditions 部分中,验证 TypeStatus 列中的值是否已设置为 ReadyTrue
    • 在终端窗口中:

      1. 使用以下命令:

        oc get kafkatopics

        该命令返回与以下输出类似的状态信息:

        例 2.45. KafkaTopic 资源状态

        NAME                                                                    CLUSTER               PARTITIONS   REPLICATION FACTOR   READY
        connect-cluster-configs                                                 debezium-kafka-cluster   1            1                    True
        connect-cluster-offsets                                                 debezium-kafka-cluster   25           1                    True
        connect-cluster-status                                                  debezium-kafka-cluster   5            1                    True
        consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a             debezium-kafka-cluster   50           1                    True
        inventory-connector-postgresql--a96f69b23d6118ff415f772679da623fbbb99421                               debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480          debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b          debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5               debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d             debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.products---df0746db116844cee2297fab611c21b56f82dcef           debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5   debezium-kafka-cluster   1            1                    True
        schema-changes.inventory                                                debezium-kafka-cluster   1            1                    True
        strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55          debezium-kafka-cluster   1            1                    True
        strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b  debezium-kafka-cluster  1   1    True
  3. 检查主题内容。

    • 在终端窗口中输入以下命令:
    oc exec -n <project>  -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=<topic-name>

    例如,

    oc exec -n debezium  -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=inventory-connector-postgresql.inventory.products_on_hand

    指定主题名称的格式与步骤 1 中 oc describe 命令返回的格式(如 inventory-connector-postgresql.inventory.addresses )。

    对于主题中的每个事件,命令会返回类似以下输出的信息:

    例 2.46. Debezium 更改事件的内容

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"postgresql","name":"inventory-connector-postgresql","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"postgresql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}

    在前面的示例中,有效负载 值显示连接器快照从表 inventory.products_on_hand 生成一个读取("op" ="r")事件。product_id 记录的 "before" 状态为 null,表示记录没有之前的值。"after" 状态对于 product_id101 的项目的 quantity 显示为 3

2.6.6.5. Debezium PostgreSQL 连接器配置属性的描述

Debezium PostgreSQL 连接器有许多配置属性,您可以使用它们来实现应用程序的正确连接器行为。许多属性具有默认值。有关属性的信息按如下方式进行组织:

所需的 Debezium PostgreSQL 连接器配置属性

除非默认值可用 否则需要以下配置属性。

表 2.158. 所需的连接器配置属性
属性默认描述

name

没有默认值

连接器的唯一名称。尝试使用相同名称再次注册将失败。所有 Kafka Connect 连接器都需要此属性。

connector.class

没有默认值

连接器的 Java 类的名称。对于 PostgreSQL 连接器,始终使用 io.debezium.connector.postgresql.PostgresConnector 的值。

tasks.max

1

应该为此连接器创建的最大任务数量。PostgreSQL 连接器始终使用单个任务,因此不使用这个值,因此始终可以接受默认值。

plugin.name

decoderbufs

PostgreSQL 逻辑解码插件在 PostgreSQL 服务器上安装的名称。

唯一支持的值是 pgoutput。您必须将 plugin.name 明确设置为 pgoutput

slot.name

debezium

创建用于特定数据库/schema 的特定插件的 PostgreSQL 逻辑解码插槽的名称。服务器使用此插槽将事件流传输到您要配置的 Debezium 连接器。

插槽名称必须符合 PostgreSQL 复制插槽命名规则其状态:"每个复制插槽都有一个名称,可包含小写字母、数字和下划线字符"。

slot.drop.on.stop

false

当连接器以安全、预期的方式停止时,是否删除逻辑复制插槽。默认行为是,当连接器停止时,复制插槽仍然为连接器配置。当连接器重启时,具有相同的复制插槽可让连接器开始处理它的位置。

仅在测试或开发环境中设置为 true。丢弃插槽允许数据库丢弃 WAL 段。当连接器重启时,它会执行新的快照,或者从 Kafka Connect offsets 主题中的持久偏移继续。

slot.failover

false

指定连接器是否创建故障转移插槽。如果省略此设置,或者主服务器运行 PostgreSQL 16 或更早版本,则连接器不会创建故障转移插槽。

注意

PostgreSQL 使用 synchronized_standby_slots 参数来配置主服务器和待机服务器之间的复制插槽同步。在主服务器中设置此参数,以指定它在待机服务器上与它同步的物理复制插槽。

重要

将 Debezium 与 PostgreSQL 17 搭配使用,配置故障转移复制插槽的功能是技术预览功能。
如需更多信息,请参阅 PostgreSQL 支持的拓扑

publication.name

dbz_publication

在使用 pgoutput 时为流更改创建的 PostgreSQL 出版物的名称。

如果此发布不存在,则会在启动时创建,它将 包含所有表。然后,Debezium 应用自己的 include/exclude 列表过滤(如果已配置),将发布限制为更改感兴趣的特定表的事件。连接器用户必须具有创建此出版物的超级用户权限,因此通常最好在首次启动连接器前创建发布。

如果发布已存在,无论是所有表,或者通过表子集配置,Debezium 会使用定义的发布。

database.hostname

没有默认值

PostgreSQL 数据库服务器的 IP 地址或主机名。

database.port

5432

PostgreSQL 数据库服务器的整数端口号。

database.user

没有默认值

用于连接到 PostgreSQL 数据库服务器的 PostgreSQL 数据库用户的名称。

database.password

没有默认值

连接到 PostgreSQL 数据库服务器时要使用的密码。

database.dbname

没有默认值

从中流传输更改的 PostgreSQL 数据库的名称。

topic.prefix

没有默认值

为特定的 PostgreSQL 数据库服务器或集群提供命名空间的主题前缀,在其中捕获 Debezium。前缀应该在所有其他连接器之间唯一,因为它用作从这个连接器接收记录的所有 Kafka 主题的主题名称前缀。数据库服务器逻辑名称中只能使用字母数字字符、连字符、点和下划线。

警告

不要更改此属性的值。如果您更改了 name 值,重启后,而不是继续向原始主题发送事件,连接器会将后续事件发送到名称基于新值的主题。

schema.include.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与您要 捕获更改的模式名称匹配。没有包含在 schema. include.list 中的架构 名称都会被捕获。默认情况下,所有非系统模式都会捕获其更改。

要匹配模式的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与架构的整个标识符匹配,它与 schema 名称中可能存在的子字符串不匹配。
如果您在配置中包含此属性,不要设置 schema.exclude.list 属性。

schema.exclude.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与 您不想 捕获更改的模式名称匹配。任何名称不包含在 schema. exclude.list 中的模式 都会捕获其更改,但系统模式除外。

要匹配模式的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与架构的整个标识符匹配,它与 schema 名称中可能存在的子字符串不匹配。
如果您在配置中包含此属性,请不要设置 schema.include.list 属性。

table.include.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与您要捕获的表的完全限定表标识符匹配。当设置此属性时,连接器只从指定的表中捕获更改。每个标识符都是 schemaName.tableName 的形式。默认情况下,连接器捕获捕获更改的每个模式中的每个非系统表中的更改。

要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个标识符匹配;它不匹配表名称中可能存在的子字符串。
如果您在配置中包含此属性,不要设置 table.exclude.list 属性。

table.exclude.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与您不想捕获的表的完全限定表标识符匹配。每个标识符都是 schemaName.tableName 的形式。当设置此属性时,连接器会捕获您没有指定的每个表中的更改。

要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个标识符匹配;它不匹配表名称中可能存在的子字符串。
如果您在配置中包含此属性,请不要设置 table.include.list 属性。

column.include.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与更改事件记录值中包含的列的完全限定域名匹配。列的完全限定域名格式为 schemaName.tableName.columnName

要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,表达式用于与列中的整个名称字符串匹配;它不匹配列名称中可能存在的子字符串。
如果您在配置中包含此属性,不要设置 column.exclude.list 属性。

column.exclude.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与应从更改事件记录值中排除的列的完全限定名称匹配。列的完全限定域名格式为 schemaName.tableName.columnName

要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,表达式用于与列中的整个名称字符串匹配;它不匹配列名称中可能存在的子字符串。
如果您在配置中包含此属性,请不要设置 column.include.list 属性。

skip.messages.without.change

false

指定在包含的列中没有更改时跳过发布消息。如果列没有包括每个 column.include.listcolumn.exclude.list 属性的更改,这将过滤消息。

注意

只有在表的 REPLICA IDENTITY 被设置为 FULL 时,才会应用此属性。

time.precision.mode

自适应性

时间、日期和时间戳可以以不同的精度类型表示:

adaptive 捕获数据库中的时间和时间戳值,其使用 millisecond, microsecond, 或 nanosecond 精度值,根据数据库列的类型类型。

adaptive_time_microseconds 捕获日期、日期和时间戳值,使用 millisecond、microsecond 或 nanosecond 精度值(根据数据库类型)来捕获数据库类型。

adaptive_time_microseconds 以数据库类型表示的时间、日期和时间戳值。一个例外是 TIME 类型字段,它总是被捕获为微秒。

connect always 代表时间和时间戳值,使用 Kafka Connect 的内置表示 TimeDate, 和 Timestamp,无论数据库列的精度是什么。如需更多信息,请参阅 临时值

decimal.handling.mode

precise

指定连接器应该如何处理 DECIMALNUMERIC 列的值:

precise 代表使用 java.math.BigDecimal 来以二进制形式代表改变事件的值。

double 代表使用 double 值来代表值。它可能会降低一些精度,但更加容易使用。

string 以特定格式的字符串来对值进行编码。这容易使用,但其代表的真实类型的信息可能会丢失。如需更多信息,请参阅 Decimal 类型

hstore.handling.mode

json

指定连接器应该如何处理 hstore 列的值:

map 代表使用 MAP

json 代表使用 json 字符串 代表值。此设置将值编码为格式的字符串,如 {"key" : "val"}。如需更多信息,请参阅 PostgreSQL HSTORE 类型

interval.handling.mode

numeric

指定连接器如何处理 interval 列的值:

numeric 代表使用大约微秒数的间隔。

string 代表间隔,使用 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 代表。例如: P1Y2M3DT4H5M6.78S。如需更多信息,请参阅 PostgreSQL 基本类型

database.sslmode

prefer

是否使用到 PostgreSQL 服务器的加密连接。选项包括:

禁用 使用未加密的连接。

允许 首先尝试使用未加密的连接,失败,如果无法建立安全(加密)连接。

首先会尝试使用安全(加密)连接,失败,否则未加密连接。

需要使用 安全(加密)连接,如果无法建立,则失败。

verify-ca 的行为如 require,但也会根据配置的证书颁发机构(CA)证书验证服务器证书。或者,如果没有找到有效的匹配 CA 证书。

verify-
full 的行为类似于 verify-ca,但也验证服务器证书是否与连接器要连接的主机匹配。如需更多信息 ,请参阅 PostgreSQL 文档

database.sslcert

没有默认值

包含客户端的 SSL 证书的文件路径。如需更多信息 ,请参阅 PostgreSQL 文档

database.sslkey

没有默认值

包含客户端 SSL 私钥的文件的路径。如需更多信息 ,请参阅 PostgreSQL 文档

database.sslpassword

没有默认值

database.sslkey 指定的文件访问客户端私钥的密码。如需更多信息 ,请参阅 PostgreSQL 文档

database.sslrootcert

没有默认值

包含验证服务器的根证书的文件的路径。如需更多信息 ,请参阅 PostgreSQL 文档

database.sslfactory

没有默认值

创建 SSL 套接字的类名称。使用 org.postgresql.ssl.NonValidatingFactory 在开发环境中禁用 SSL 验证。

database.tcpKeepAlive

true

启用 TCP keep-alive 探测,以验证数据库连接是否仍处于活动状态。如需更多信息 ,请参阅 PostgreSQL 文档

tombstones.on.delete

true

控制 删除 事件是否后跟 tombstone 事件。

true - 一个 delete 事件表示一个 delete 事件和后续的 tombstone 事件。

false - 仅有一个 delete 事件被抛出。

删除源记录后,发出 tombstone 事件(默认行为)可让 Kafka 完全删除与删除行的密钥相关的所有事件,以防为主题启用了 日志压缩

column.truncate.to.length.chars

不适用

可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定域名匹配。如果在列中的数据超过了在属性名中的 length 指定的字符长度时删节数据,设置此属性。将 length 设置为正整数值,例如 column.truncate.to.20.chars

一个列的完全限定名称会观察以下格式:< schemaName> . <tableName &gt; . <columnName&gt;。要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。

您可以在单个配置中指定多个长度不同的属性。

column.mask.with.length.chars

不适用

可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定域名匹配。如果您希望连接器屏蔽一组列的值,例如,如果它们包含敏感数据,则设置此属性。将 length 设置为一个正整数,替换在属性名称中的 length 指定的星号(*)的数量列中的数据。将 length 设置为 0 (zero),将指定列中的数据替换为空字符串。

列的完全限定域名观察以下格式: schemaName.tableName.columnName。要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。

您可以在单个配置中指定多个长度不同的属性。

column.mask.hash.hashAlgorithm.with.salt.salt; column.mask.hash.v2.hashAlgorithm.with.salt.salt

不适用

可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定域名匹配。列的完全限定域名格式为 <schemaName>.<tableName>.<columnName>.
要匹配 Debezium 的名称,请使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。在生成的更改事件记录中,指定列的值替换为 pseudonyms。

一个 pseudonym,它包括了通过应用指定的 hashAlgorithmsalt 的结果的哈希值。根据使用的 hash 功能,引用完整性会被维护,而列值则替换为 pseudonyms。Java 加密架构标准算法 文档的 MessageDigest 部分中 描述了支持的哈希功能。

在以下示例中,CzQMA0cB5K 是一个随机选择的 salt。

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

如有必要,pseudonym 会自动缩短到列的长度。连接器配置可以包含多个属性,以指定不同的哈希算法和 salt。

根据使用的 hashAlgorithm,所选的 salt 和实际数据集,可能无法完全屏蔽。

应该使用哈希策略版本 2 来确保在不同位置或系统中对值进行哈希处理。

column.propagate.source.type

不适用

可选的、以逗号分隔的正则表达式列表,与您希望连接器发送代表列元数据的额外参数匹配。当设置此属性时,连接器将以下字段添加到事件记录的 schema 中:

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

这些参数分别传播列的原始类型名称和长度(用于变量宽度类型)。
启用连接器来发送此额外数据有助于在 sink 数据库中正确调整特定数字或基于字符的列。

列的完全限定域名观察以下格式之一: databaseName.tableName.columnName, 或 databaseName.schemaName.tableName.columnName
要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。

datatype.propagate.source.type

不适用

可选的、以逗号分隔的正则表达式列表,用于指定为数据库中列定义的数据类型的完全限定名称。当设置此属性时,对于带有匹配数据类型的列,连接器会发出事件记录,该记录在其 schema 中包含以下额外字段:

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

这些参数分别传播列的原始类型名称和长度(用于变量宽度类型)。
启用连接器来发送此额外数据有助于在 sink 数据库中正确调整特定数字或基于字符的列。

列的完全限定域名观察以下格式之一: databaseName.tableName.typeName, 或 databaseName.schemaName.tableName.typeName
要匹配数据类型的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与数据类型的整个名称字符串匹配;表达式不匹配类型名称中可能存在的子字符串。

有关特定于 PostgreSQL 的数据类型名称的列表,请参阅 PostgreSQL 数据类型映射

message.key.columns

空字符串

一个表达式列表,用于指定连接器用来组成自定义消息键的表达式列表,用于更改它发布到指定表的 Kafka 主题。

默认情况下,Debebe 使用表的主键列作为发出的记录的消息键。使用默认键,或者为缺少主密钥的表指定一个键,您可以根据一个或多个列配置自定义消息密钥。

要为表建立自定义消息密钥,请列出表,后跟要用作消息键的列。每个列表条目都采用以下格式:

<fully-qualified_tableName> : & lt;keyColumn&gt;, <keyColumn>

To base a table key on multiple column name, insert commas between the column name.

每个完全限定表名称都是以下格式的正则表达式:

<schemaName> . & lt;tableName>

属性可以包含多个表的条目。使用分号分隔列表中的表条目。

以下示例为表 inventory.customerspurchase.orders 设置了消息键:

inventory.customers:pk1,pk2; (configured).purchaseorders:pk3,pk4

在示例中,列 pk1pk2 被指定为表 inventory.customer 的消息键。对于任何模式中的 purchaseorders 表,列 pk3pk4 充当消息键。

对用来创建自定义消息键的列数量没有限制。但是,最好使用指定唯一密钥所需的最小数量。

重要

如果您为此属性指定的表达式匹配不是表主键一部分的列,请将表的 REPLICA IDENTITY 设置为 FULL。如果将 REPLICA IDENTITY 设置为另一个值,如 DEFAULT,在删除操作后,连接器无法生成带有预期的 null 值的 tombstone 事件。

publication.autocreate.mode

all_tables

指定连接器是否和方式创建 发布。此设置仅在使用 pgoutput 插件 的连接器流更改时应用。

注意

要创建发布,连接器必须通过具有特定权限的数据库帐户访问 PostgreSQL。如需更多信息,请参阅设置特权,使 Debezium 创建 PostgreSQL 出版物

指定以下值之一:

all_tables
如果存在发布,连接器会使用它。
如果发布不存在,连接器会为数据库中的所有表创建一个发布,连接器会捕获更改。连接器运行以下 SQL 命令来创建发布:

CREATE PUBLICATION <publication_name> FOR ALL TABLES;
disabled
连接器不会尝试创建发布。配置为执行复制的数据库管理员或用户必须已创建了发布,然后才能运行连接器。如果连接器无法找到发布,连接器会抛出异常并停止。
过滤
如果发布不存在,连接器会按照以下格式运行 SQL 命令创建一个 SQL 命令:

CREATE PUBLICATION <publication_name> FOR TABLE <tABLE <tbl2, tbl3>
生成的发布程序包括与当前过滤器配置匹配的表,如 schema.include.list
schema.exclude.list、s table.include.list、和table.include.list,以及 table.exclude.list 配置属性。
如果发布存在,连接器会按照以下格式运行 SQL 命令来更新与当前过滤器配置匹配的表的发布:

ALTER PUBLICATION <publication_name> SET TABLE <tbl1, tbl3>
no_tables
如果存在发布,连接器会使用它。如果发布不存在,连接器会创建一个发布程序,而无需以以下格式运行 SQL 命令来指定任何表:

CREATE PUBLICATION <publication_name>;

设置 no_tables 选项(如果您希望连接器只捕获逻辑解码消息),而不捕获任何其他更改事件,如 INSERT
UPDATEDELETE 操作的原因。

如果您选择这个选项,为了避免连接器发出和处理 READ 事件,您可以指定您不想捕获更改的模式或表,例如,使用 "table.exclude.list": "public.exclude" 或 " schema.exclude.list": "public"

replica.identity.autoset.values

空字符串

根据表名称,将此属性设置为连接器捕获的表的子集。https://www.postgresql.org/docs/current/sql-altertable.html#SQL-ALTERTABLE-REPLICA-IDENTITY属性设置的副本身份值覆盖数据库中设置的副本身份值。

属性接受以逗号分隔的键值对列表。每个键都是匹配完全限定表名称的正则表达式;对应的值指定副本身份类型。例如:

<fqTableNameA>:<replicaIdentity1>, &lt;fqTableNameB>: &lt;replicaIdentity2>, &lt;fqTableNameC>: &lt;replicaIdentity3>

使用以下格式指定完全限定表名称:
SchemaName.TableName

将副本身份设置为以下值之一:

DEFAULT
记录值(如果存在),在更改事件前为主键列设置。这是非系统表的默认设置。
INDEX indexName
记录在更改事件前为指定索引定义的所有列设置的值。索引必须是唯一的,而不是部分,不可延迟,且必须包含标记为 NOT NULL 的列。如果指定的索引被丢弃,则生成的行为与您将值设为 NOTHING 的行为相同。
FULL
记录更改事件之前为行中的所有列设置的值。
NOTHING
在更改事件前记录行状态的信息。这是系统表的默认值。

schema1.*:FULL,schema2.table2:NOTHING,schema2.table3:INDEX idx_name

重要

replica.identity.autoset.values 属性只适用于连接器捕获的表。其他表将被忽略,即使它们与指定的表达式匹配。使用以下连接器属性指定表来捕获:

binary.handling.mode

bytes

指定在更改事件中如何表示二进制(字节)列。指定以下值之一:

bytes
将二进制数据表示为字节数组。
base64
以 base64 编码的字符串表示二进制数据。
base64-url-safe
将二进制数据表示为 base64-url-safe-encoded 字符串。
hex
以十六进制编码(base16)字符串表示二进制数据。

schema.name.adjustment.mode

none

指定应该如何调整模式名称,以便与连接器使用的消息转换器兼容。设置以下值之一:

none
不应用任何调整。
avro
将无法在 Avro 类型名称中使用的字符替换为下划线。
avro_unicode
将 Avro 类型名称中使用的下划线或字符替换为对应的 Unicode 字符,如 _uxxxx
注意

在前面的示例中,下划线字符(_)代表转义序列,相当于 Java 中的反斜杠。

field.name.adjustment.mode

none

指定应如何调整字段名称,以便与连接器使用的消息转换器兼容。指定以下值之一:

none
不要应用任何调整。
avro
将 Avro 类型名称中使用的字符替换为下划线。
avro_unicode
将 Avro 类型名称中使用的下划线或字符替换为对应的 Unicode 字符,如 _uxxxx
注意

在前面的示例中,下划线字符(_)代表转义序列,相当于 Java 中的反斜杠。

如需更多信息,请参阅 Avro 命名

money.fraction.digits

2

指定在将 Postgres money 类型转换为 java.math.BigDecimal(它代表更改事件中的值)时应使用多少位的十进制数字。仅在将 decimal.handling.mode 设置为 precise 时才适用。

message.prefix.include.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与您希望连接器捕获的逻辑解码消息前缀的名称匹配。默认情况下,连接器会捕获所有逻辑解码信息。当设置此属性时,连接器只使用属性指定的前缀捕获逻辑解码消息。所有其他逻辑解码消息都排除。

要匹配消息前缀的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与整个消息前缀字符串匹配;表达式不匹配前缀中可能存在的子字符串。

如果您在配置中包含此属性,不要设置 message.prefix.exclude.list 属性。

有关 消息 事件结构及其排序语义的详情,请参考 消息 事件

message.prefix.exclude.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与您不希望连接器捕获的逻辑解码消息前缀匹配。当设置此属性时,连接器不会捕获使用指定前缀的逻辑解码消息。所有其他消息都会被捕获。
要排除所有逻辑解码信息,请将此属性的值设置为 aws

要匹配消息前缀的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与整个消息前缀字符串匹配;表达式不匹配前缀中可能存在的子字符串。

如果您在配置中包含此属性,不要设置 message.prefix.include.list 属性。

有关 消息 事件结构及其排序语义的详情,请参考 消息 事件

高级 Debezium PostgreSQL 连接器配置属性

以下 高级配置 属性在大多数情况下可以正常工作,因此很少需要在连接器配置中指定。

表 2.159. 高级连接器配置属性
属性默认描述

converters

没有默认值

枚举连接器可以使用的 自定义转换器 实例的符号链接名称列表。例如,

isbn

您必须设置 converters 属性,以便连接器可以使用自定义转换器。

对于您为连接器配置的每个转换器,还必须添加一个 .type 属性,它指定实现转换器接口的类的完全限定域名。.type 属性使用以下格式:

<converterSymbolicName>.type

例如,

isbn.type: io.debezium.test.IsbnConverter

如果要进一步控制配置的转换器的行为,您可以添加一个或多个配置参数来将值传递给转换器。要将任何其他配置参数与转换器关联,请为参数名称添加转换器符号名称作为前缀。
例如,

isbn.schema.name: io.debezium.postgresql.type.Isbn

snapshot.isolation.mode

serializable

指定事务隔离级别以及锁定类型(若有),连接器会在初始快照或临时阻塞快照期间读取数据时应用。

每个隔离级别在一次优化并发和性能之间都有不同的平衡,并将数据一致性和准确性最大化。使用更严格的隔离级别的快照会带来更高的质量、更一致的数据,但由于锁定时间和较少的并发事务,改进的成本会降低性能。更严格的隔离级别可以提高效率,但会牺牲不一致的数据。有关 PostgreSQL 中事务隔离级别的更多信息,请参阅 PostgreSQL 文档

指定以下隔离级别之一:

serializable
默认和限制性隔离级别。这个选项可防止序列化异常,并提供最高级别的数据完整性。

为确保捕获的表的数据一致性,快照在使用可重复读取隔离级别的事务中运行,阻止表上的并发 DDL 更改,并锁定数据库以索引创建。设置此选项时,用户或管理员无法执行某些操作,如创建表索引,直到快照结束为止。整个表密钥范围保持锁定,直到快照完成为止。这个选项与连接器中可用的快照行为匹配,在引入此属性前。
repeatable_read
防止其他事务在快照期间更新表行。快照捕获的新记录可能会出现两次;首先,作为初始快照的一部分,然后在流传输阶段再次显示。但是,对于数据库镜像,这种一致性级别是可以容忍的。确保扫描表和阻止所选表上的 DDL 之间的数据一致性,以及整个数据库中的并发索引创建。允许序列化异常。
read_committed
在 PostgreSQL 中,Read Uncommitted 和 Read Committed 隔离模式的行为之间没有区别。因此,对于此属性,read_committed 选项有效地提供最严格的隔离级别。设置此选项会影响初始和临时阻塞快照的一些一致性,但在快照期间为其他用户提供更好的数据库性能。

通常,此事务一致性级别适合数据镜像。其他事务无法更新快照中的表行。但是,当在初始快照中添加记录时,次要数据不一致可能会发生,连接器稍后会在流阶段开始后重新捕获记录。
read_uncommitted
通常,此选项提供最严格的隔离级别。但是,如 read-committed 选项的描述中所述,对于 Debezium PostgreSQL 连接器,这个选项提供与 read_committed 选项相同的隔离级别。

snapshot.mode

Initial

指定在连接器启动时执行快照的条件:

always
连接器在每次启动时都执行快照。快照包括捕获的表的结构和数据。指定此值,每次连接器启动时,使用从捕获的表中数据的完整表示来填充主题。快照完成后,连接器将开始流传输后续数据库更改的事件记录。
Initial
连接器仅在没有为逻辑服务器名称记录偏移时才执行快照。
initial_only
连接器执行初始快照,然后停止,而无需处理任何后续更改。
no_data
连接器永远不会执行快照。当以这种方式配置连接器时,启动后,它的行为如下:

如果 Kafka offsets 主题中存在之前存储的 LSN,则连接器将继续从该位置流更改。如果没有存储 LSN,则连接器会在服务器上创建 PostgreSQL 逻辑复制插槽时从点开始流更改。只有在您知道所有感兴趣的数据仍然反映在 WAL 中时,才使用此快照模式。

never
弃用了 no_data
when_needed

连接器启动后,只有在检测到以下情况之一时才执行快照:

  • 它无法检测任何主题偏移。
  • 之前记录的偏移量指定了服务器上不可用的日志位置。

如需更多信息,请参阅 snapshot.mode 选项表

snapshot.locking.mode

none

指定连接器在执行 schema 快照时如何在表上锁定。
设置以下选项之一:

shared
连接器保管表锁定,可防止读取数据库模式和其他元数据的快照的初始部分访问。在初始阶段后,快照不再需要表锁定。
none
连接器会完全避免锁定。
警告

如果在快照过程中可能会出现模式,请不要使用此模式。

snapshot.query.mode

select_all

指定连接器在执行快照时如何查询数据。
设置以下选项之一:

select_all
连接器默认执行 选择所有查询,可以选择根据列包含和排除列表配置调整所选列。

与使用 snapshot.select.statement.overrides 属性相比,此设置可让您以更灵活的方式管理快照内容。

snapshot.include.collection.list

table.include.list中指定的所有表

可选的、以逗号分隔的正则表达式列表,与表的完全限定名称(<schemaName>.<tableName&gt;)匹配,包括在快照中。指定的项目必须在连接器的 table.include.list 属性中命名。只有在连接器的 snapshot.mode 属性设置为 never 以外的值时,此属性才会生效。
此属性不会影响增量快照的行为。

要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个名称字符串匹配;它不匹配表名称中可能存在的子字符串。

snapshot.lock.timeout.ms

10000

正整数值,用于指定在执行快照时等待获取表锁定的最大时间(以毫秒为单位)。如果连接器无法在这个时间段内获取表锁定,则快照会失败。连接器如何执行快照 提供详细信息。

snapshot.select.statement.overrides

没有默认值

指定要包含在快照中的表行。如果您希望快照只包括表中的行的子集,请使用该属性。此属性仅影响快照。它不适用于连接器从日志读取的事件。

属性包含以逗号分隔的完全限定表名称列表,格式为 < schemaName>.<tableName&gt;。例如,

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

用于列表中的每个表,添加一个进一步的配置属性,指定连接器在获取快照时在表上运行的 SELECT 语句。指定的 SELECT 语句决定了快照中包含的表行子集。使用以下格式指定此 SELECT 语句属性的名称:

snapshot.select.statement.overrides. <schemaName> . < tableName>.例如,snapshot.select.statement.overrides.customers.orders

Example:

从包括软删除列 delete_flagcustomers.orders 表中,如果您希望快照只包含没有软删除的记录,请添加以下属性:

"snapshot.select.statement.overrides": "customer.orders",
"snapshot.select.statement.overrides.customer.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC"

在生成的快照中,连接器只包含 delete_flag = 0 的记录。

event.processing.failure.handling.mode

fail

指定连接器在处理事件时应如何响应异常:

失败 传播异常,表示有问题的事件的偏移,并导致连接器停止。

会警告 日志有问题的事件的偏移、跳过该事件并继续处理。

跳过 有问题的事件并继续处理。

max.batch.size

2048

正整数值,用于指定连接器进程的每个批处理事件的最大大小。

max.queue.size

8192

正整数值,用于指定阻塞队列可以保存的最大记录数。当 Debezium 从数据库读取事件时,它会在将事件写入 Kafka 前将事件放在阻塞队列中。当连接器比将信息写入 Kafka 的速度或 Kafka 不可用时,阻塞队列可能会提供从数据库读取更改事件的情况。当连接器定期记录偏移时,队列中保存的事件会被忽略。始终将 max.queue.size 的值设置为大于 max.batch.size 的值。

max.queue.size.in.bytes

0

指定阻塞队列的最大卷的长整数值,以字节为单位。默认情况下,没有为阻塞队列指定卷限制。要指定队列可以使用的字节数,请将此属性设置为正长值。
如果也设置了 max.queue.size,当队列的大小达到由任一属性指定的限制时,写入队列会被阻断。例如,如果您设置了 max.queue.size=1000max.queue.size.in.bytes=5000,则在队列包含 1000 个记录后写入队列会被阻断,或者在队列中的记录卷达到 5000 字节。

poll.interval.ms

500

正整数值指定连接器在开始处理一系列事件前应等待新的更改事件数。默认值为 500 毫秒。

include.unknown.datatypes

false

当连接器遇到数据类型为 unknown 的字段时,指定连接器行为。默认行为是连接器省略更改事件中的字段,并记录警告。

如果您希望更改事件包含字段的不透明二进制表示,请将此属性设置为 true。这允许消费者解码字段。您可以通过设置 二进制处理模式 属性来控制确切的表示。

注意

include.unknown.datatypes 设为 true 时,消费者会面临向后兼容性问题。不仅可以在发行版本之间更改特定于数据库的二进制表示,但是如果 Debezium 最终支持数据类型,则数据类型将在逻辑类型中发送下游,这需要用户调整。通常,当遇到不支持的数据类型时,请创建一个功能请求,以便可以添加支持。

database.initial.statements

没有默认值

在建立 JDBC 连接时,连接器执行的分号分隔的 SQL 语句列表。要将分号用作字符而不是分隔符,请指定两个连续分号 ;;

连接器可以自行决定建立 JDBC 连接。因此,此属性仅适用于配置会话参数,而不执行 DML 语句。

当创建用于读取事务日志的连接时,连接器不会执行这些语句。

status.update.interval.ms

10000

将复制连接状态更新发送到服务器的频率,以毫秒为单位。
属性还控制在数据库关闭时检查数据库状态来检测死连接的频率。

heartbeat.interval.ms

0

控制连接器将心跳信息发送到 Kafka 主题的频率。默认行为是连接器不发送心跳消息。

心跳消息可用于监控连接器是否接收来自数据库的更改事件。心跳消息可能会帮助减少连接器重启时需要重新更改事件的数量。要发送心跳消息,请将此属性设置为正整数,这表示心跳消息之间的毫秒数。

当数据库中有很多更新被跟踪,但只需要少量的更新与连接器捕获更改的表和模式相关,则需要心跳消息。在这种情况下,连接器会正常从数据库事务日志读取,但很少会向 Kafka 发出更改记录。这意味着没有偏移更新提交到 Kafka,连接器没有向数据库发送最新检索到的 LSN 的机会。数据库会保留 WAL 文件,其中包含连接器已处理的事件。发送心跳消息可让连接器向数据库发送最新检索到的 LSN,它允许数据库回收不再需要的 WAL 文件所使用的磁盘空间。

heartbeat.action.query

没有默认值

指定当连接器发送心跳消息时连接器在源数据库上执行的查询。

这可用于解决 WAL 磁盘空间消耗 中描述的情况,其中捕获来自与高流量数据库在同一主机上的低流量数据库的更改会阻止 Debezium 处理 WAL 记录,从而使用数据库确认 WAL 位置。要解决这种情况,请在 low-traffic 数据库中创建一个心跳表,并将此属性设置为插入该表的声明,例如:

INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')

,它允许连接器从低流量数据库接收更改并确认其 LSN,这可防止在数据库主机上无限的 WAL 增长。

schema.refresh.mode

columns_diff

指定触发表的内存中模式刷新的条件。

columns_diff 是最安全的模式。它确保内存模式始终与数据库表的 schema 同步。

columns_diff_exclude_unchanged_toast 指示连接器刷新内存模式缓存(如果对传入消息派生的 schema 存在,除非未更改的 TOASTable 数据完全帐户)。

如果有经常更新的表,则这个设置可能会显著提高连接器性能,这些表很少有更新一部分的数据。但是,如果表中丢弃了 TOASTable 列,则内存中模式可能会变得过时。

snapshot.delay.ms

没有默认值

连接器在连接器启动时执行快照前应等待的时间(以毫秒为单位)。如果您要在集群中启动多个连接器,此属性可用于避免快照中断,这可能会导致连接器重新平衡。

streaming.delay.ms

0

指定连接器在完成快照后延迟流过程启动的时间(以毫秒为单位)。设置延迟间隔有助于防止连接器在快照完成后马上重启快照,但在流传输过程开始前。设置一个延迟值,它高于为 Kafka Connect worker 设置的 offset.flush.interval.ms 属性的值。

snapshot.fetch.size

10240

在快照过程中,连接器在行的批处理中读取表内容。此属性指定批处理中的最大行数。

slot.stream.params

没有默认值

要传递给配置的逻辑解码插件的参数的分号分隔列表。例如: add-tables=public.table,public.table2;include-lsn=true.

slot.max.retries

6

如果连接到复制插槽失败,这是连续尝试连接的最大数量。

slot.retry.delay.ms

10000 (10 秒)

连接器无法连接到复制插槽时在重试尝试之间等待的毫秒数。

unavailable.value.placeholder

__debezium_unavailable_value

指定连接器提供的恒定,表示原始值是一个未由数据库提供的值。如果 unavailable.value.placeholder 的设置以 hex: 前缀开头,则预期字符串的其余部分代表十六进制编码的 octets。如需更多信息,请参阅 粘贴值

provide.transaction.metadata

false

决定连接器是否生成带有事务边界的事件,并使用事务元数据增强更改事件。如果您希望连接器进行此操作,请指定 true。如需更多信息,请参阅 事务元数据

flush.lsn.source

true

确定连接器是否应该提交源 PostgreSQL 数据库中已处理记录的 LSN,以便可以删除 WAL 日志。如果您不希望连接器提交已处理记录的 LSN,请指定 false

注意

如果将此属性的值设置为 false,Debezium 不会确认 LSN。未能确认 LSN 可能会导致 WAL 日志的不受控制的增长,这会压力测试存储容量,并可能导致性能下降,甚至数据丢失。要维护正常的服务,如果将此属性设置为 false,您必须配置一些其他机制来提交 LSN。

retriable.restart.connector.wait.ms

10000 (10 秒)

在发生 Retriable 错误后重启连接器前等待的毫秒数。

skipped.operations

t

以逗号分隔的操作类型列表,您希望连接器在流传输过程中跳过。您可以将连接器配置为跳过以下类型的操作:

  • c (插入/创建)
  • u (update)
  • D (删除)
  • T (截断)

如果您不希望连接器跳过任何操作,请将值设为 none

signal.data.collection

没有默认值

用于向连接器发送信号的数据收集的完全限定名称。
使用以下格式指定集合名称:
<schemaName> . < tableName>

signal.enabled.channels

source

为连接器启用的信号通道名称列表。默认情况下,以下频道可用:

  • source
  • kafka
  • file
  • jmx

notification.enabled.channels

没有默认值

为连接器启用的通知频道名称列表。默认情况下,以下频道可用:

  • sink
  • log
  • jmx

incremental.snapshot.chunk.size

1024

连接器在增量快照块期间获取并读取的最大行数。增加块大小可提供更高的效率,因为快照会减少对更大大小的快照查询。但是,较大的块大小还需要更多内存来缓冲快照数据。将块大小调整为可在您的环境中提供最佳性能的值。

incremental.snapshot.watermarking.strategy

insert_insert

指定连接器在增量快照中使用的水位线机制,以重复数据删除事件,这些事件可能会被增量快照捕获,然后在流恢复后重新捕获。
您可以指定以下选项之一:

insert_insert
当您发送一个信号来启动增量快照时,对于 Debezium 在快照期间读取的每个块,它会将条目写入信号数据收集来记录信号,以打开快照窗口。快照完成后,Debezium 会插入第二个条目来记录窗口的关闭。
insert_delete
当您发送一个信号来启动增量快照时,对于 Debezium 读取的每个块,它会将单个条目写入信号数据收集,以记录信号来打开快照窗口。快照完成后,会删除此条目。不会为关闭快照窗口的信号创建条目。设置这个选项以防止快速增长信号数据收集。

xmin.fetch.interval.ms

0

XMIN 将从复制插槽读取的频率(以毫秒为单位)。XMIN 值提供新复制插槽可从中开始的位置的低限。默认值 0 可禁用跟踪 XMIN 跟踪。

topic.naming.strategy

io.debezium.schema.SchemaTopicNamingStrategy

应用于确定数据更改的主题名称、模式更改、事务、心跳事件等主题名称,默认为 SchemaTopicNamingStrategy

topic.delimiter

.

指定主题名称的分隔符,默认为

topic.cache.size

10000

在绑定的并发散列映射中保存主题名称的大小。此缓存将有助于确定与给定数据收集对应的主题名称。

topic.heartbeat.prefix

__debezium-heartbeat

控制连接器发送心跳消息的主题名称。主题名称具有此模式:

topic.heartbeat.prefix.topic.prefix

例如,如果主题前缀是 fulfillment,则默认主题名称为 __debezium-heartbeat.fulfillment

topic.transaction

transaction

控制连接器发送事务元数据消息的主题名称。主题名称具有此模式:

topic.prefix.topic.transaction

例如,如果主题前缀是 fulfillment,则默认的主题名称是 fulfillment.transaction

snapshot.max.threads

1

指定连接器在执行初始快照时使用的线程数量。要启用并行初始快照,请将 属性设置为大于 1 的值。在并行初始快照中,连接器同时处理多个表。

注意

当您启用并行初始快照时,执行每个表快照的线程可能需要不同的时间来完成它们的工作。如果一个表的快照需要比其他表的快照完成的时间要长得多,则线程已完成其工作闲置。在某些环境中,网络设备(如负载均衡器或防火墙)会终止闲置以延长的时间间隔的连接。快照完成后,连接器无法关闭连接,从而导致异常和不完整的快照,即使连接器成功传输所有快照数据。

如果您遇到这个问题,请将 snapshot.max.threads 的值恢复到 1,然后重试快照。

custom.metric.tags

没有默认值

通过添加提供上下文信息的元数据来定义自定义 MBean 对象名称的标签。指定以逗号分隔的键值对列表。每个键代表 MBean 对象名称的标签,对应的值代表键的值,例如
k1=v1,k2=v2

连接器将指定的标签附加到基础 MBean 对象名称。标签可帮助您组织和分类指标数据。您可以定义标签来标识特定的应用程序实例、环境、区域、版本等。如需更多信息,请参阅自定义 MBean 名称

errors.max.retries

-1

指定连接器如何在生成 Retriable 错误的操作后响应,如连接错误。
设置以下选项之一:

-1
无限制。无论之前失败的数量如何,连接器总是自动重启,并重试操作。
0
disabled。连接器会立即失败,永远不会重试操作。重启连接器需要用户干预。
> 0
连接器会自动重启,直到达到指定的最大重试次数。下一次失败后,连接器会停止,用户需要干预才能重启它。

database.query.timeout.ms

600000 (10 分钟)

指定连接器等待查询完成的时间(以毫秒为单位)。将值设为 0 (零)以删除超时限制。

透传 PostgreSQL 连接器配置属性

连接器支持 通过传递 属性,使 Debezium 指定自定义配置选项来微调 Apache Kafka producer 和消费者的行为。有关 Kafka 生成者和消费者的完整配置属性范围的详情,请参考 Kafka 文档

用于通过属性配置 PostgreSQL 连接器如何与 Kafka 信号主题交互

Debezium 提供了一组 signal.* 属性,用于控制连接器如何与 Kafka 信号主题进行交互。

下表描述了 Kafka 信号 属性。

表 2.160. Kafka 信号配置属性
属性默认描述

signal.kafka.topic

<topic.prefix>-signal

连接器监控用于临时信号的 Kafka 主题的名称。

注意

如果禁用 自动主题创建,您必须手动创建所需的信号主题。需要信号主题来保留信号顺序。信号主题必须具有单个分区。

signal.kafka.groupId

kafka-signal

Kafka 用户使用的组 ID 的名称。

signal.kafka.bootstrap.servers

没有默认值

连接器用来建立到 Kafka 集群的初始连接的主机和端口对列表。每个对引用 Debezium Kafka Connect 进程使用的 Kafka 集群。

signal.kafka.poll.timeout.ms

100

整数值,用于指定连接器在轮询信号时等待的最大毫秒数。

为信号频道配置 Kafka 消费者客户端的属性

Debezium 连接器提供信号 Kafka 使用者的直通配置。透传信号属性以 signals.consumer.* 前缀开始。例如,连接器将 signal.consumer.security.protocol=SSL 等属性传递给 Kafka 使用者。

Debezium 在将属性传递给 Kafka 信号消费者前从属性中剥离前缀。

用于配置 PostgreSQL 连接器接收器通知频道的直通属性

下表描述了可用于配置 Debezium sink 通知 频道的属性。

表 2.161. sink 通知配置属性
属性默认描述

notification.sink.topic.name

没有默认值

从 Debezium 接收通知的主题名称。当您将 notification.enabled.channels 属性配置为包含 sink 作为启用的通知频道之一时,需要此属性。

Debezium 连接器传递数据库驱动程序配置属性

Debezium 连接器提供数据库驱动程序的直通配置。透传数据库属性以前缀 driverPROFILE 开头。例如,连接器将 driver.foobar=false 等属性传递给 JDBC URL。

Debezium 在将属性传递给数据库驱动程序前从属性中剥离前缀。

2.6.7. 监控 Debezium PostgreSQL 连接器性能

Debezium PostgreSQL 连接器提供两种类型的指标,除了对 Zookeeper、Kafka 和 Kafka Connect 提供的 JMX 指标的支持外。

  • 快照指标 提供在执行快照时有关连接器操作的信息。
  • 流指标 在连接器捕获更改和流更改事件记录时提供有关连接器操作的信息。

Debezium 监控文档 提供了如何使用 JMX 公开这些指标的详细信息。

2.6.7.1. PostgreSQL 连接器快照和流 MBean 对象的自定义名称

Debezium 连接器通过 MBean 名称为连接器公开指标。这些指标(特定于每个连接器实例)提供有关连接器快照、流和架构历史记录进程行为的数据。

默认情况下,当您部署正确配置的连接器时,Debezium 会为每个不同的连接器指标生成一个唯一的 MBean 名称。要查看连接器进程的指标,您可以将可观察性堆栈配置为监控其 MBean。但是,这些默认 MBean 名称取决于连接器配置,但配置更改可能会导致对 MBean 名称的更改。对 MBean 名称的更改会破坏连接器实例和 MBean 之间的链接,并破坏监控活动。在这种情况下,如果要恢复监控,您必须重新配置 observability 堆栈以使用新的 MBean 名称。

要防止监控 MBean 名称更改结果的中断,您可以配置自定义指标标签。您可以通过在连接器配置中添加 custom.metric.tags 属性来配置自定义指标。属性接受键值对,其中每个键代表 MBean 对象名称的标签,对应的值代表该标签的值。例如: k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称中。

为连接器配置 custom.metric.tags 属性后,您可以配置 observability 堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签,而不是可变 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了它如何构建 MBean 名称,或者连接器配置更改中的 topic.prefix,则指标集合不会中断,因为指标提取任务使用指定的标签模式来识别连接器。

使用自定义标签的更多优点是,您可以使用反映数据管道架构的标签,以便以适合您操作需求的方式组织指标。例如,您可以使用声明连接器活动类型、应用程序上下文或数据源的值来指定标签,如 db1-streaming-for-application-abc。如果您指定多个键值对,则所有指定的对都会附加到连接器的 MBean 名称中。

以下示例演示了标签如何修改默认的 MBean 名称。

例 2.47. 自定义标签如何修改连接器 MBean 名称

默认情况下,PostgreSQL 连接器使用以下 MBean 名称进行流传输指标:

debezium.postgresql:type=connector-metrics,context=streaming,server=<topic.prefix>

如果将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 会生成以下自定义 MBean 名称:

debezium.postgresql:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory

2.6.7.2. 在 PostgreSQL 数据库快照过程中监控 Debezium

MBeandebezium.postgres:type=connector-metrics,context=snapshot,server= <topic.prefix>

快照指标不会被公开,除非快照操作活跃,或者快照自上次连接器开始以来发生了某种情况。

下表列出了可用的快照指标。

属性类型描述

LastEvent

字符串

连接器读取的最后一个快照事件。

MilliSecondsSinceLastEvent

long

因为连接器已读取和处理最新事件,因此毫秒数。

TotalNumberOfEventsSeen

long

此连接器自上一次启动或重置以来看到的事件总数。

NumberOfEventsFiltered

long

已根据连接器上配置的 include/exclude 列表过滤规则过滤的事件数量。

CapturedTables

string[]

连接器捕获的表列表。

QueueTotalCapacity

int

在快照和主 Kafka Connect 循环之间传递事件的队列长度。

QueueRemainingCapacity

int

用于在快照和主 Kafka Connect 循环之间传递事件的队列的可用容量。

TotalTableCount

int

包括在快照中的表的总数。

RemainingTableCount

int

快照必须复制的表数。

SnapshotRunning

布尔值

快照是否已启动。

SnapshotPaused

布尔值

快照是否已暂停。

SnapshotAborted

布尔值

快照是中止的。

SnapshotCompleted

布尔值

快照是否完成。

SnapshotDurationInSeconds

long

快照到目前为止需要的秒数,即使未完成也是如此。也包括快照暂停的时间。

SnapshotPausedDurationInSeconds

long

快照暂停的秒数。如果快照暂停了多次,暂停的时间会增加。

RowsScanned

Map<String, Long>

包含快照中每个表扫描的行数的映射。在处理过程中,表会以递增方式添加到映射中。更新每 10,000 行扫描并完成表后。

MaxQueueSizeInBytes

long

队列的最大数量(以字节为单位)。如果将 max.queue.size.in.bytes 设置为正长值,则此指标可用。

CurrentQueueSizeInBytes

long

队列中记录的当前卷(以字节为单位)。

连接器还会在执行增量快照时提供以下附加快照指标:

属性类型描述

ChunkId

字符串

当前快照块的标识符。

ChunkFrom

字符串

定义当前块的主密钥集的低限。

ChunkTo

字符串

定义当前块的主密钥集的上限。

TableFrom

字符串

当前快照表的主密钥集合的低限。

TableTo

字符串

当前快照表的主键集合的上限。

2.6.7.3. 监控 Debezium PostgreSQL 连接器记录流

MBeandebezium.postgres:type=connector-metrics,context=streaming,server= <topic.prefix>

下表列出了可用的流指标。

属性类型描述

LastEvent

字符串

连接器读取的最后一个流事件。

MilliSecondsSinceLastEvent

long

因为连接器已读取和处理最新事件,因此毫秒数。

TotalNumberOfEventsSeen

long

源数据库自上次连接器启动后报告的数据更改事件总数,或者因为指标重置后。代表 Debezium 处理的数据更改工作负载。

TotalNumberOfCreateEventsSeen

long

自上次启动或指标重置以来,连接器处理的创建事件总数。

TotalNumberOfUpdateEventsSeen

long

自上次启动或指标重置以来,连接器处理的更新事件总数。

TotalNumberOfDeleteEventsSeen

long

自上次启动或指标重置后,连接器处理的删除事件总数。

NumberOfEventsFiltered

long

已根据连接器上配置的 include/exclude 列表过滤规则过滤的事件数量。

CapturedTables

string[]

连接器捕获的表列表。

QueueTotalCapacity

int

在流器和主 Kafka Connect 循环之间传递事件的队列长度。

QueueRemainingCapacity

int

用于在流程序和主 Kafka Connect 循环之间传递事件的队列的可用容量。

Connected

布尔值

表示连接器当前是否已连接到数据库服务器的标记。

MilliSecondsBehindSource

long

最后一次更改事件的时间戳和连接器处理之间的毫秒数。这些值将纳入运行数据库服务器和连接器的机器上时钟之间的差别。

NumberOfCommittedTransactions

long

已提交的已处理事务的数量。

SourceEventPosition

Map<String, String>

最后一次接收的事件的协调。

LastTransactionId

字符串

最后一次处理的事务的事务标识符。

MaxQueueSizeInBytes

long

队列的最大数量(以字节为单位)。如果将 max.queue.size.in.bytes 设置为正长值,则此指标可用。

CurrentQueueSizeInBytes

long

队列中记录的当前卷(以字节为单位)。

2.6.8. Debezium PostgreSQL 连接器如何处理错误和问题

Debezium 是一个分布式系统,它捕获了多个上游数据库中的所有更改,永远不会丢失或丢失某个事件。当系统正常运行或被仔细管理时,Debezium 会在每次更改事件记录的发送 一次

重要

PostgreSQL 更改事件记录的确切交付只是一个技术预览功能。红帽不支持开发人员预览软件,且功能完整或生产就绪。不要将开发人员预览软件用于生产环境或关键业务工作负载。开发人员预览软件提供早期对即将推出的产品软件的访问权限,以将其包括在红帽产品产品中。客户可以使用此软件来测试功能并在开发过程中提供反馈。此软件可能没有任何文档,可以随时更改或删除,并且已获得有限的测试。红帽可能会提供在没有关联 SLA 的情况下对开发者预览软件提交反馈的方法。

有关 Red Hat Developer Preview 软件的支持范围的更多信息,请参阅 开发人员预览支持范围

如果出现错误,则系统不会丢失任何事件。但是,当从错误中恢复时,连接器可能会发出一些重复的更改事件。在这些常规情况下,Debezium (如 Kafka) 至少提供一次 更改事件。

详情包括在以下部分中:

配置和启动错误

在以下情况下,连接器会在尝试启动时失败,在日志中报告错误/exception,并停止运行:

  • 连接器的配置无效。
  • 连接器无法使用指定的连接参数成功连接到 PostgreSQL。
  • 连接器会从 PostgreSQL WAL 中的之前记录的位置(使用 LSN)中重启,PostgreSQL 不再有该历史记录可用。

在这些情况下,错误消息详细介绍了此问题,并可能是一个推荐的临时解决方案。在更正配置或解决 PostgreSQL 问题后,重启连接器。

PostgreSQL 变得不可用

当连接器运行时,它连接的 PostgreSQL 服务器可能会因为任意数量的原因不可用。如果发生这种情况,连接器会失败并显示错误并停止。当服务器再次可用时,重启连接器。

PostgreSQL 连接器以 PostgreSQL LSN 的形式存储最后一次处理的偏移量。在连接器重启并连接到服务器实例后,连接器与服务器通信,以继续从该特定偏移流。只要 Debezium 复制插槽保持不变,这个偏移就会可用。永远不会在主服务器中丢弃复制插槽,否则您将丢失数据。有关删除插槽的失败情况的详情,请参考下一部分。

PostgreSQL 15 或更早版本

在 PostgreSQL 15 或更早版本集群中,您只能在主服务器上创建逻辑复制插槽。因此,在 PostgreSQL 15 环境中,Debezium PostgreSQL 连接器只能从集群中的活跃主服务器捕获事件。在 PostgreSQL 15 集群中,主节点上的复制插槽不会传播到副本服务器。如果主服务器停机,您必须将备用节点提升为主节点。

PostgreSQL 16 或更高版本

当您将 Debezium 与 PostgreSQL 16 或更高版本搭配使用时,您可以在副本上创建逻辑复制插槽,但您必须手动将副本上的复制插槽与主服务器上的对应插槽同步。副本插槽的同步不是自动的。

PostgreSQL 17 或更高版本

当您将 Debezium 与 PostgreSQL 17 或更高版本搭配使用时,您可以在主服务器上配置复制插槽来进行自动故障转移,因此 Debezium 不会丢失任何更改事件。当为故障转移配置复制插槽时,PostgreSQL 会自动将复制插槽从主位置同步到副本,使 Debezium 在副本提升后继续从插槽读取,并成为新的主设备。

重要

将 Debezium 与 PostgreSQL 17 搭配使用,配置故障转移复制插槽的功能是技术预览功能。技术预览功能不受红帽产品服务等级协议(SLA)支持,且功能可能并不完整。红帽不推荐在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。有关红帽技术预览功能支持范围的更多信息,请参阅技术预览功能支持范围

注意

有些管理的 PostgreSQL 服务(如 AWS RDS 和 GCP CloudSQL)使用磁盘复制来实现复制备用。因此,这些服务会自动复制复制插槽,以便在故障切换后可用。

新的主设备必须配置为使用 pgoutput 插件,且必须包含您要捕获更改的数据库。然后,您只能将连接器指向新的服务器并重启连接器。

2.6.8.1. 从 PostgreSQL 17 集群中的故障中恢复

运行 PostgreSQL 17 或更高版本的环境支持使用故障转移复制插槽。如果在 PostgreSQL 17 或更高版本集群中发生失败,且待机配置了故障转移复制插槽,请完成以下步骤,使 Debezium 能够恢复捕获:

  1. 暂停 Debezium,直到您能够验证您有一个没有丢失数据的复制插槽。
  2. 在允许应用程序写入 主之前,重新创建 Debezium 复制插槽。如果您在重新创建复制插槽前允许应用程序写入新主,则应用程序可能会丢失更改事件。
  3. 重启连接器。
  4. 验证 Debezium 是否可以从复制插槽读取 LSN,以便在原始主失败前发生的任何更改。
    例如,在失败前从点恢复失败主的备份,并识别插槽中记录的最后一个位置。虽然检索备份数据可能非常困难,但检查备份提供了一种机制来可靠地确定 Debezium 是否消耗了所有更改。

2.6.8.2. 在 PostgreSQL 15 或更早的集群中失败后,从新的主服务器捕获数据

在 PostgreSQL 15 或更早版本的主服务器失败后,您可以决定将 Debezium 配置为从前一个副本服务器而不是原始的主服务器捕获数据。要启用 Debezium 从以前的副本服务器中捕获数据,请完成以下步骤。

流程

  1. 修复导致集群失败的条件。
  2. 在连接器停止时,更新连接器配置中的属性值,以反映新服务器的详情。例如,验证配置是否包含以下属性的正确值:

  3. 通过完成以下任务,将新的主服务器配置为使用 Debezium:

  4. 将备用 PostgreSQL 节点提升为主
  5. 重启连接器。
  6. 将快照模式设置为 always,并在新的主服务器上执行快照,以捕获服务器上数据的初始状态,并确保没有数据丢失。

Kafka Connect 进程正常停止

假设 Kafka Connect 正在以分布式模式运行,并且 Kafka Connect 进程安全停止。在关闭该进程前,Kafka Connect 会将进程的连接器任务迁移到该组中的另一个 Kafka Connect 进程。新的连接器任务会准确开始处理之前任务停止的位置。处理过程中会有一个短暂的延迟,而连接器任务被安全停止并在新进程中重启。

Kafka Connect 进程崩溃

如果 Kafka Connector 进程意外停止,则它运行的连接器任务都会终止,而不会记录它们最近处理的偏移量。当 Kafka Connect 在分布式模式下运行时,Kafka Connect 会在其他进程中重启这些连接器任务。但是,PostgreSQL 连接器会从之前进程 记录 的最后偏移量中恢复。这意味着,新的替换任务可能会生成在崩溃前处理的一些相同的更改事件。重复事件的数量取决于偏移清除周期以及崩溃前数据更改的卷。

因为在从失败时恢复过程中可能会重复一些事件,所以消费者应始终预期一些重复的事件。Debezium 更改是幂等的,因此一系列事件始终产生相同的状态。

在每个更改事件记录中,Debezium 连接器会插入有关事件来源的特定于源的信息,包括 PostgreSQL 服务器事件的时间、服务器事务 ID 和写入事务更改的位置。消费者可以跟踪此信息,特别是 LSN,以确定事件是否重复。

Kafka 变得不可用

当连接器生成更改事件时,Kafka Connect 框架使用 Kafka producer API 在 Kafka 中记录这些事件。根据您在 Kafka Connect 配置中指定的频率,Kafka Connect 会定期记录这些更改事件中显示的最新偏移量。如果 Kafka 代理不可用,运行连接器的 Kafka Connect 进程会重复尝试重新连接到 Kafka 代理。换句话说,连接器任务会暂停,直到重新建立连接前,该连接会在连接器完全停止的地方恢复。

连接器在持续时间内停止

如果连接器安全停止,则数据库可以继续使用。所有更改都会记录在 PostgreSQL WAL 中。当连接器重启时,它会恢复其离开的位置流更改。也就是说,它会为所有在连接器停止期间进行的所有数据库更改生成更改事件记录。

正确配置的 Kafka 集群可以处理大量吞吐量。Kafka Connect 根据 Kafka 最佳实践编写,给定 Kafka Connect 连接器可以处理大量数据库更改事件。因此,在停止一段时间后,当 Debezium 连接器重启时,可能会捕获在停止期间进行的数据库更改。发生这种情况的速度取决于 Kafka 的功能和性能,以及对 PostgreSQL 中数据所做的更改卷。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.