第 3 章 sink 连接器


Debezium 提供接收器连接器,可以使用 Apache Kafka 主题等源的事件。接收器(sink)连接器标准化数据格式,然后将事件数据保留给配置的 sink 存储库。其它系统、应用程序或用户可以从数据 sink 访问事件。

因为接收器连接器对消耗的事件数据应用一致的结构,所以从数据 sink 读取的下游应用程序可以更轻松地解释和处理这些数据。

目前,Debebe 提供以下接收器连接器:

3.1. JDBC 的 Debezium 连接器

Debezium JDBC 连接器是一个 Kafka Connect sink 连接器实现,它可以消耗来自多个源主题的事件,然后使用 JDBC 驱动程序将这些事件写入关系数据库。此连接器支持各种数据库 dialects,包括 Db2、MySQL、Oracle、PostgreSQL 和 SQL Server。

3.1.1. Debezium JDBC 连接器如何工作

Debezium JDBC 连接器是一个 Kafka Connect sink 连接器,因此需要 Kafka Connect 运行时。连接器定期轮询它订阅的 Kafka 主题,消耗这些主题的事件,然后将事件写入配置的关系数据库。连接器使用 upsert 语义和基本模式演进支持幂等写入操作。

Debezium JDBC 连接器提供以下功能:

3.1.1.1. Debezium JDBC 连接器如何使用复杂的更改事件的描述

默认情况下,Debezium 源连接器生成复杂的分层更改事件。当 Debezium 连接器与其他 JDBC 接收器连接器实现一起使用时,您可能需要应用 ExtractNewRecordState 单消息转换(SMT)来扁平化更改事件有效负载,以便它们可以被接收器实现使用。如果您运行 Debezium JDBC sink 连接器,则不需要部署 SMT,因为 Debezium sink 连接器可以直接消耗原生 Debezium 更改事件,而无需使用转换。

当 JDBC sink 连接器消耗 Debezium 源连接器的复杂更改事件时,它会从原始 插入 或更新 事件的 after 部分中提取值。当接收器连接器消耗 delete 事件时,不会查询事件有效负载的一部分。

重要

Debezium JDBC sink 连接器不是从 schema 更改主题读取的。如果您的源连接器被配置为捕获模式更改,在 JDBC 连接器配置中设置 topicstopics.regex 属性,以便连接器不会消耗模式更改主题。

3.1.1.2. 在-least-once 交付中 Debezium JDBC 连接器的描述

Debezium JDBC sink 连接器保证至少处理来自 Kafka 主题的事件。

3.1.1.3. Debezium JDBC 使用多个任务的描述

您可以在多个 Kafka Connect 任务中运行 Debezium JDBC sink 连接器。要在多个任务中运行连接器,请将 tasks.max 配置属性设置为您希望连接器使用的任务数量。Kafka Connect 运行时启动指定数量的任务,并为每个任务运行一个连接器实例。多个任务可以通过并行读取和处理来自多个源主题的更改来提高性能。

3.1.1.4. Debezium JDBC 连接器数据和列类型映射的描述

要启用 Debezium JDBC 接收器连接器,将数据类型从入站消息字段正确映射到出站消息字段,连接器需要有关源事件中存在的每种字段的数据类型的信息。连接器支持跨不同数据库 dialects 范围内的列类型映射。要从 event 字段中的类型元数据正确转换目的地列类型,连接器应用为源数据库定义的数据类型映射。您可以通过在源连接器配置中设置 column.propagate.source.typedatatype.propagate.source.type 选项来增强连接器为列解析数据类型的方式。当您启用这些选项时,Debebe 包含额外的参数元数据,它可帮助 JDBC sink 连接器更准确地解析目的地列的数据类型。

要使 Debezium JDBC sink 连接器处理 Kafka 主题的事件,存在 Kafka 主题消息密钥时,必须为原语数据类型或 Struct。另外,源消息的有效负载必须是 Struct,它有一个没有嵌套结构类型的扁平化结构,或符合 Debezium 的复杂分层结构的嵌套结构。

如果 Kafka 主题中事件的结构不遵循这些规则,您必须实施自定义单一消息转换,以将源事件的结构转换为可用的格式。

3.1.1.5. Debezium JDBC 连接器如何处理源事件中的主键的描述

默认情况下,Debebe JDBC sink 连接器不会将源事件中的任何字段转换为事件的主键。不幸的是,缺少稳定的主密钥可能会使事件处理复杂,具体取决于您的业务需求,或者当接收器连接器使用 upsert 语义时。要定义一致的主密钥,您可以将连接器配置为使用下表中描述的主密钥模式之一:

模式描述

none

创建表时没有指定主键字段。

kafka

主密钥由以下三列组成:

  • __connect_topic
  • __connect_partition
  • __connect_offset

这些列的值源自 Kafka 事件协调。

record_key

主密钥由 Kafka 事件的密钥组成。

如果主键是原语类型,请通过设置 primary.key.fields 属性来指定要使用的列名称。如果主键是 struct 类型,则 struct 中的字段将映射为主密钥的列。您可以使用 primary.key.fields 属性将主键限制为列的子集。

record_value

主键由 Kafka 事件的值组成。

因为 Kafka 事件的值始终是一个 Struct,因此默认情况下,值中的所有字段都是主键的列。要使用主键中的字段子集,请设置 primary.key.fields 属性,以指定您要从中派生主键列的值中以逗号分隔的字段列表。

record_header

主密钥由 Kafka 事件的标头组成。

Kafka 事件的标头可以包含多个标头,每个标头可以是 Struct 或 primitives 数据类型,连接器会对这些标头进行 Struct。因此,此 Struct 中的所有字段成为主密钥的列。要使用主键中的字段子集,请设置 primary.key.fields 属性,以指定您要从中派生主键列的值中以逗号分隔的字段列表。

重要

如果将 primary.key.mode 设置为 kafka,并将 schema.evolution 设置为 basic,则一些数据库分离可能会抛出异常。当检测将 STRING 数据类型映射为变量长度字符串数据类型(如 TEXTCLOB )时,会发生此例外情况,且 dialect 不允许主键列具有未绑定长度的长度。要避免这个问题,请在您的环境中应用以下设置:

  • 不要将 schema.evolution 设置为 basic
  • 事先创建数据库表和主要密钥映射。
重要

如果列映射到不被视为目标数据库主密钥的数据类型,则 primary.key.fields 中需要明确的列列表,不包括此类列。请参考您的特定数据库厂商的文档,了解哪些数据类型是允许的。

3.1.1.6. 配置 Debezium JDBC 连接器以在使用 DELETEtombstone 事件时删除行

当消耗 DELETEtombstone 事件时,Debebe JDBC sink 连接器可以删除目标数据库中的行。默认情况下,jdbc sink 连接器不启用删除模式。

如果要删除行,您必须在连接器配置中明确设置 delete.enabled=true。要使用此模式,还必须将 primary.key.fields 设置为没有值以外的值。前面的配置是必需的,因为根据主键映射删除,因此如果目标表没有主键映射,连接器将无法删除行。

3.1.1.7. 启用连接器执行幂等写入

Debezium JDBC sink 连接器可以执行幂等写入,使其重复重新执行相同的记录,而不会更改最终的数据库状态。

要启用连接器执行幂等写入,您必须将连接器的 insert.mode 明确设置为 upsert根据指定的 主密钥是否已存在,作为更新或 插入 应用 upsert 操作。

如果主键值已存在,则操作会更新行中的值。如果指定的主密钥值不存在,则 插入 会添加一个新行。

每个数据库清理都会以不同的方式处理幂等写入,因为没有用于 upsert 操作的 SQL 标准。下表显示了 Debezium 支持的数据库偏移的 upsert DML 语法:

dialectupsert 语法

Db2

MERGE …​

MySQL

在重复密钥更新 …​ 上插入 …​

Oracle

MERGE …​

PostgreSQL

INSERT …​ ON CONFLICT …​ DO UPDATE SET …​

SQL Server

MERGE …​

3.1.1.8. Debezium JDBC 连接器的 schema evolution 模式

您可以在 Debezium JDBC sink 连接器中使用以下架构演进模式:

模式描述

none

连接器不执行任何 DDL 模式演进。

基本的

连接器自动检测事件有效负载(但目标表中不存在)中的字段。连接器会更改目的地表来添加新字段。

当将 schema.evolution 设置为 basic 时,连接器会根据传入事件的结构自动创建或修改目标数据库表。

当首次从主题收到事件时,并且 destination 表尚不存在,Debezium JDBC sink 连接器使用事件的密钥,或者记录的模式结构来解析表的列结构。如果启用了架构演进,连接器先准备并执行 CREATE TABLE SQL 语句,然后再将 DML 事件应用到目标表。

当 Debezium JDBC 连接器从主题接收事件时,如果记录的 schema 结构与目标表的 schema 结构不同,连接器会使用事件的密钥或其架构结构来识别哪些列是新的,且必须添加到数据库表中。如果启用了架构演进,连接器先准备并执行 ALTER TABLE SQL 语句,然后再将 DML 事件应用到目标表。因为更改列数据类型、丢弃列和调整主键可被视为危险操作,所以连接器会被禁止执行这些操作。

每个字段的 schema 确定列是否为 NULLNOT NULL。模式还定义每个列的默认值。如果连接器尝试使用 nullability 设置或不需要的默认值创建表,则必须手动创建表、提前创建表,或者在 sink 连接器处理事件前调整相关字段的 schema。要调整 nullability 设置或默认值,您可以引入一个自定义单一消息转换,该转换在管道中应用更改,或修改源数据库中定义的列状态。

字段的数据类型根据预定义的映射集解析。如需更多信息,请参阅 第 3.1.2 节 “Debezium JDBC 连接器如何映射数据类型”

重要

当您向目标数据库中已存在的表的事件结构引入新字段时,您必须将新字段定义为可选,或者字段必须在数据库 schema 中指定默认值。如果要从目标表中删除字段,请使用以下选项之一:

  • 手动删除字段。
  • 丢弃列。
  • 为字段分配一个默认值。
  • 定义可为空的字段。

3.1.1.9. 指定用于定义目标表和列名称字母大小写的选项

Debezium JDBC 接收器连接器通过构建在目标数据库上执行的 DDL (schema 更改)或 DML (数据更改)SQL 语句来消耗 Kafka 信息。默认情况下,连接器使用源主题的名称和事件字段作为目标表中表和列名称的基础。构建的 SQL 不会自动 delimit 标识符,使用引号来保留原始字符串的大小写。因此,默认情况下,目标数据库中表或列名称的文本大小写完全取决于在未指定问题单时数据库如何处理名称字符串。

例如,如果目标数据库分离是 Oracle 且事件的主题为,则目的地表将创建为 ORDERS,因为当名称未加引号时,Oracle 默认为大写名称。同样,如果目标数据库分离是 PostgreSQL,并且事件的主题为 ORDERS,则目的地表将按照顺序创建,因为当名称未加引号时 PostgreSQL 默认为小写名称。

要明确保留 Kafka 事件中存在的表和字段名称的情况,在连接器配置中,将 quote.identifiers 属性的值设置为 true。当设置此选项时,当传入的事件针对一个名为 orders 的主题时,目标数据库 dialect 是 Oracle,连接器会创建一个带有名称 顺序的表表,因为构建的 SQL 将表名称定义为 "orders"。当连接器创建列名称时,启用引用会导致相同的行为。

连接空闲超时

Debezium 的 JDBC sink 连接器利用连接池来提高性能。连接池设计为建立初始连接集,保持指定数量的连接,并根据需要高效地分配与应用程序的连接。但是,当池中连接闲置时会出现一个质询,如果它们保持不活动状态超过数据库配置的空闲超时阈值,则可能会触发超时。

为了缓解闲置连接线程触发超时的可能性,连接池提供了一个定期验证每个连接活动的机制。此验证可确保连接保持活动状态,并防止数据库将它们标记为空闲。如果网络中断,如果 Debezium 尝试使用终止的连接,连接器会提示池生成新的连接。

默认情况下,Debezium JDBC sink 连接器不会执行闲置超时测试。但是,您可以通过设置 hibernate.c3p0.idle_test_period 属性,将连接器配置为请求池以指定间隔执行超时测试。例如:

超时配置示例

{
  "hibernate.c3p0.idle_test_period": "300"
}

Debezium JDBC sink 连接器使用 Hibernate C3P0 连接池。您可以通过在 hibernate.c3p0 configured 命名空间中设置属性来自定义 CP30 连接池。在前面的示例中,hibernate.c3p0.idle_test_period 属性的设置会将连接池配置为每 300 秒执行闲置超时测试。应用配置后,连接池每五分钟开始评估未使用的连接。

3.1.2. Debezium JDBC 连接器如何映射数据类型

Debezium JDBC sink 连接器使用逻辑或原语 type-mapping 系统解析列的数据类型。原语类型包括值,如整数、浮动点、布尔值、字符串和字节数。通常,这些类型只使用特定的 Kafka Connect Schema 类型代码来表示。逻辑数据类型比较复杂的类型,包括基于 Struct的类型(包括一组固定字段名称和模式)的值,或者以特定编码表示的值,如自 epoch 后的天数。

以下示例演示了原语和逻辑数据类型的代表结构:

原语字段模式

{
  "schema": {
    "type": "INT64"
  }
}

逻辑字段模式

[
  "schema": {
    "type": "INT64",
    "name": "org.apache.kafka.connect.data.Date"
  }
]

Kafka Connect 不是这些复杂逻辑类型的唯一源。实际上,Debezium 源连接器生成更改事件,它们带有类似逻辑类型的字段代表各种不同的数据类型,包括但不限于时间戳、日期甚至 JSON 数据。

Debezium JDBC sink 连接器使用这些原语和逻辑类型将列的类型解析为 JDBC SQL 代码,它代表列的类型。然后,底层 Hibernate 持久性框架使用这些 JDBC SQL 代码,将列的类型解析为逻辑数据类型,以供使用。下表演示了 Kafka Connect 和 JDBC SQL 类型之间的原语和逻辑映射,以及 Debezium 和 JDBC SQL 类型之间的逻辑映射。实际的最终列类型因每种数据库类型而异。

表 3.1. Kafka Connect Primitives 和 Column 数据类型之间的映射
原语类型JDBC SQL 类型

INT8

type.TINYINT

INT16

类型.SMALLINT

INT32

type.INTEGER

INT64

type.BIGINT

FLOAT32

类型.FLOAT

FLOAT64

类型.DOUBLE

布尔值

Types.BOOLEAN

字符串

type.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR

BYTES

type.VARBINARY

表 3.2. Kafka Connect 逻辑类型和列数据类型之间的映射
逻辑类型JDBC SQL 类型

org.apache.kafka.connect.data.Decimal

Types.DECIMAL

org.apache.kafka.connect.data.Date

Types.DATE

org.apache.kafka.connect.data.Time

Type.TIMESTAMP

org.apache.kafka.connect.data.Timestamp

Type.TIMESTAMP

表 3.3. Debezium 逻辑类型和列数据类型之间的映射
逻辑类型JDBC SQL 类型

io.debezium.time.Date

Types.DATE

io.debezium.time.Time

Type.TIMESTAMP

io.debezium.time.MicroTime

Type.TIMESTAMP

io.debezium.time.NanoTime

Type.TIMESTAMP

io.debezium.time.ZonedTime

Types.TIME_WITH_TIMEZONE

io.debezium.time.Timestamp

Type.TIMESTAMP

io.debezium.time.MicroTimestamp

Type.TIMESTAMP

io.debezium.time.NanoTimestamp

Type.TIMESTAMP

io.debezium.time.ZonedTimestamp

Types.TIMESTAMP_WITH_TIMEZONE

io.debezium.data.VariableScaleDecimal

类型.DOUBLE

重要

如果数据库不支持带有时区的时间或时间戳,则映射会在没有时区的情况下解析为对应的时间戳。

表 3.4. Debezium 特定逻辑类型和列数据类型之间的映射
逻辑类型MySQL SQL 类型PostgreSQL SQL 类型SQL Server SQL 类型

io.debezium.data.Bits

bit(n)

bit (n)bit varying

varbinary(n)

io.debezium.data.Enum

enum

type.VARCHAR

不适用

io.debezium.data.Json

json

json

不适用

io.debezium.data.EnumSet

set

不适用

不适用

io.debezium.time.Year

year(n)

不适用

不适用

io.debezium.time.MicroDuration

不适用

interval

不适用

io.debezium.data.Ltree

不适用

ltree

不适用

io.debezium.data.Uuid

不适用

uuid

不适用

io.debezium.data.Xml

不适用

xml

xml

除了上面的原语和逻辑映射外,如果更改事件的来源是 Debezium 源连接器,则列类型的解析及其长度、精度和规模可以通过启用列或数据类型传播来进一步影响。要强制实施传播,必须在源连接器配置中设置以下属性之一:

  • column.propagate.source.type
  • datatype.propagate.source.type

Debezium JDBC sink 连接器应用优先级更高的值。

例如,假设更改事件中包含以下字段模式:

启用列或数据类型传播的 Debezium 更改事件字段模式

{
  "schema": {
    "type": "INT8",
    "parameters": {
      "__debezium.source.column.type": "TINYINT",
      "__debezium.source.column.length": "1"
    }
  }
}

在前面的示例中,如果没有设置 schema 参数,Debezium JDBC sink 连接器会将此字段映射到类型 Types.SMALLINTType.SMALLINT 可以有不同的逻辑数据库类型,具体取决于数据库问题。对于 MySQL,示例中的列类型转换为没有指定长度的 TINYINT 列类型。如果为源连接器启用了列或数据类型传播,Debezium JDBC sink 连接器使用映射信息来优化数据类型映射过程,并使用类型 TINYINT (1) 创建一个列。

注意

通常,当相同类型的数据库用于源和接收器数据库时,使用列或数据类型传播的效果会很高。

3.1.3. 部署 Debezium JDBC 连接器

要部署 Debezium JDBC 连接器,您要安装 Debezium JDBC 连接器存档,配置连接器,并通过将配置添加到 Kafka Connect 来启动连接器。

先决条件

流程

  1. 下载 Debezium JDBC 连接器插件存档
  2. 将文件提取到 Kafka Connect 环境中。
  3. (可选)从 Maven Central 下载 JDBC 驱动程序,并将下载的驱动程序文件提取到包含 JDBC sink 连接器 JAR 文件的目录。

    注意

    JDBC sink 连接器不包含 Oracle 和 Db2 的驱动程序。您必须下载驱动程序并手动安装它们。

  4. 将驱动程序 JAR 文件添加到安装了 JDBC sink 连接器的路径。
  5. 确保安装 JDBC sink 连接器的路径是 Kafka Connect plugin.path 的一部分。
  6. 重启 Kafka Connect 进程以获取新的 JAR 文件。

3.1.3.1. Debezium JDBC 连接器配置

通常,您可以通过提交指定连接器配置属性的 JSON 请求来注册 Debezium JDBC 连接器。以下示例显示了注册 Debezium JDBC sink 连接器实例的 JSON 请求,该连接器使用最常见的配置设置使用来自名为 order 的主题的事件:

示例: Debezium JDBC 连接器配置

{
    "name": "jdbc-connector",  1
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",  2
        "tasks.max": "1",  3
        "connection.url": "jdbc:postgresql://localhost/db",  4
        "connection.username": "pguser",  5
        "connection.password": "pgpassword",  6
        "insert.mode": "upsert",  7
        "delete.enabled": "true",  8
        "primary.key.mode": "record_key",  9
        "schema.evolution": "basic",  10
        "database.time_zone": "UTC",  11
        "topics": "orders" 12
    }
}

JDBC 连接器配置设置的描述

描述

1

使用 Kafka Connect 服务注册时分配给连接器的名称。

2

JDBC sink 连接器类的名称。

3

为此连接器创建的最大任务数量。

4

连接器用来连接到它写入的 sink 数据库的 JDBC URL。

5

用于身份验证的数据库用户的名称。

6

用于身份验证的数据库用户的密码。

7

连接器使用的 insert.mode

8

启用删除数据库中的记录。如需更多信息,请参阅 delete.enabled 配置属性。

9

指定用于解析主键列的方法。如需更多信息,请参阅 primary.key.mode 配置属性。

10

启用连接器来改变目标数据库的模式。如需更多信息,请参阅 schema.evolution 配置属性。

11

指定编写 temporal 字段类型时使用的时区。

12

要使用的主题列表,用逗号分开。

有关您可以为 Debezium JDBC 连接器设置的配置属性的完整列表,请参阅 JDBC 连接器属性

您可以使用 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。该服务记录了配置并启动执行以下操作的接收器连接器任务:

  • 连接到数据库。
  • 使用订阅的 Kafka 主题的事件。
  • 将事件写入配置的数据库。

3.1.4. Debezium JDBC 连接器配置属性的描述

Debezium JDBC sink 连接器有几个配置属性,可用于实现满足您需求的连接器行为。许多属性具有默认值。有关属性的信息按如下方式进行组织:

表 3.5. JDBC 连接器通用属性
属性默认描述

name

没有默认值

连接器的唯一名称。如果您在注册连接器时尝试重复使用此名称,失败结果。所有 Kafka Connect 连接器都需要此属性。

connector.class

没有默认值

连接器的 Java 类的名称。对于 Debezium JDBC 连接器,请指定 io.debezium.connector.jdbc.JdbcSinkConnector 的值。

tasks.max

1

用于此连接器的最大任务数量。

topics

没有默认值

要使用的主题列表,用逗号分开。不要将此属性与 topics.regex 属性结合使用。

topics.regex

没有默认值

指定要使用的主题的正则表达式。在内部,正则表达式被编译到 java.util.regex.Pattern。不要将此属性与 topics 属性结合使用。

表 3.6. JDBC 连接器连接属性
属性默认描述

connection.provider

org.hibernate.c3p0.internal.C3P0ConnectionProvider

要使用的连接供应商实施。

connection.url

没有默认值

用于连接到数据库的 JDBC 连接 URL。

connection.username

没有默认值

连接器用来连接到数据库的数据库用户帐户名称。

connection.password

没有默认值

连接器用来连接到数据库的密码。

connection.pool.min_size

5

指定池中的最小连接数。

connection.pool.max_size

32

指定池维护的最大并发连接数。

connection.pool.acquire_increment

32

指定当连接池超过其最大大小时连接器尝试获取的连接数量。

connection.pool.timeout

1800

指定在丢弃未使用连接前保留了未使用的连接的秒数。

表 3.7. JDBC 连接器运行时属性
属性默认描述

database.time_zone

UTC

指定插入 JDBC 临时值时使用的时区。

delete.enabled

false

指定连接器处理 DELETEtombstone 事件,并从数据库中删除对应的行。使用这个选项时,您需要将 primary.key.mode 设置为 record.key

truncate.enabled

false

指定连接器是否处理 TRUNCATE 事件,并从数据库截断对应的表。

注意

虽然 从版本 9.7 开始,Db2 中提供了对 TRUNCATE 语句的支持,但目前 JDBC 连接器无法处理 Db2 连接器发出的标准 TRUNCATE 事件。

为确保 JDBC 连接器可以处理从 Db2 收到的 TRUNCATE 事件,方法是使用标准 TRUNCATE TABLE 语句的替代方式来执行截断。例如:

ALTER TABLE & lt;table_name& gt; ACTIVATE NOT LOGGED INITIALLY WITH EMPTY TABLE

提交上述查询的用户帐户需要截断表中的 ALTER 特权。

insert.mode

insert

指定用于将事件插入到数据库中的策略。可用的选项如下:

insert
指定所有事件都应构建基于 INSERT的 SQL 语句。仅在没有使用主键时使用这个选项,或者当您确定没有更新到现有主键值时。
update
指定所有事件都应构建基于 UPDATE的 SQL 语句。只有在确定连接器只接收应用到现有行的事件时才使用这个选项。
upsert
指定连接器使用 upsert 语义将事件添加到表中。也就是说,如果主密钥不存在,连接器将执行 INSERT 操作,如果密钥存在,连接器将执行 UPDATE 操作。需要幂等写入时,连接器应配置为使用此选项。

primary.key.mode

none

指定连接器如何解析来自事件的主键列。

none
指定没有创建主键列。
kafka

指定连接器使用 Kafka 协调作为主键列。键协调由事件的主题名称、分区和偏移定义,并使用以下名称映射到列:

  • __connect_topic
  • __connect_partition
  • __connect_offset
record_key
指定主键列来自事件的记录键。如果记录键是原语类型,则需要 primary.key.fields 属性来指定主键列的名称。如果记录键是一个 struct 类型,primary.key.fields 属性是可选的,可用于将事件键中的列的子集指定为表的主键。
record_value
指定主键列来自事件的值。您可以设置 primary.key.fields 属性,将主键定义为来自事件值的字段子集;否则,将所有字段都默认使用。

primary.key.fields

没有默认值

要从中派生主键列或以逗号分隔的字段列表。

primary.key.mode 设为 record_key 且事件的密钥是 primitive 类型时,这个属性预期会指定用于键的列名称。

primary.key.mode 设为 record_key 时,使用非原始键或 record_value,此属性指定来自 key 或 value 的以逗号分隔的字段名称列表。如果 primary.key.mode 设为 record_key,使用非原始键或 record_value,且未指定此属性,则连接器从记录键或记录值的所有字段生成主键,具体取决于指定的模式。

quote.identifiers

false

指定生成的 SQL 语句是否使用引号来分隔符表和列名称。详情请查看 第 3.1.1.9 节 “指定用于定义目标表和列名称字母大小写的选项” 部分。

schema.evolution

none

指定连接器如何演变目的地表模式。如需更多信息,请参阅 第 3.1.1.8 节 “Debezium JDBC 连接器的 schema evolution 模式”。可用的选项如下:

none
指定连接器不会演进 destination 模式。
基本的
指定发生基本演变。连接器通过将传入事件的记录模式与数据库表结构进行比较,为表添加缺少的列。

table.name.format

${topic}

指定连接器用来构造目标表名称的字符串模式。
当属性设置为默认值 ${topic} 时,连接器从 Kafka 读取事件后,它会将事件记录写入与源主题名称匹配的目的地表中。

您还可以将此属性配置为从传入事件记录中的特定字段提取值,然后使用这些值动态生成目标表的名称。此功能从消息源中的值生成表名称,否则需要使用自定义 Kafka Connect 单消息转换(SMT)。

要将属性配置为动态生成目标表的名称,请将其值设为 ${source._field_} 等模式。当您指定这种类型的模式时,连接器从 Debezium 更改事件 的源 块中提取值,然后使用这些值来构建表名称。例如,您可以将 属性的值设置为模式 ${source.schema}_${source.table}。基于此模式,如果连接器读取 source 块中的 schema 字段包含值、usertable 字段的事件,则连接器将事件记录写入名为 user_ tab 的表。

dialect.postgres.postgis.schema

public

指定安装 PostgreSQL PostGIS 扩展的架构名称。默认为 public ;但是,如果在不同的架构中安装了 PostGIS 扩展,则应使用此属性来指定备用模式名称。

dialect.sqlserver.identity.insert

false

指定连接器是否在 INSERTUPSERT 操作前自动将 IDENTITY_INSERT 设置为 SQL Server 表的身份列,然后在操作后立即取消设置它。当默认设置(false)生效时,INSERTUPSERT 操作到表的 IDENTITY 列中会导致 SQL 异常。

batch.size

500

指定尝试一起将批处理到目标表中的记录数量。

注意

请注意,如果您在 Connect worker 属性中将 consumer.max.poll.records 设置为一个小于 batch.size 的值,则批处理将受 consumer.max.poll.records 的上限,并且不会达到所需的 batch.size。您还可以使用连接器配置中的 consumer.override. max.poll.records 来配置连接器的底层消费者的 max.poll.records。

use.reduction.buffer

false

指定是否启用 Debezium JDBC 连接器的缩减缓冲。

选择以下设置之一:

false
(默认)连接器会写入消耗 Kafka 的每个更改事件,作为单独的逻辑 SQL 更改。
true
连接器使用缩减缓冲来减少更改事件,然后再将其写入 sink 数据库。也就是说,如果多个事件引用同一主键,连接器会整合 SQL 查询并只写入一个逻辑 SQL 更改,具体取决于最近偏移记录中报告的行状态。
选择这个选项来减少目标数据库上的 SQL 负载。

要在启用缩减缓冲区时优化 PostgreSQL sink 数据库中的查询处理,还必须通过将 reWriteBatchedInserts 参数添加到 JDBC 连接 URL 来启用批处理查询。

field.include.list

空字符串

一个可选的、以逗号分隔的字段名称列表,与要从 change 事件值包含的字段的完全限定域名匹配。字段的完全限定域名格式为 fieldNametopicName:_fieldName_

如果您在配置中包含此属性,请不要设置 field.exclude.list 属性。

field.exclude.list

空字符串

一个可选的、以逗号分隔的字段名称列表,与要从更改事件值中排除的字段名称匹配。字段的完全限定域名格式为 fieldNametopicName:_fieldName_

如果您在配置中包含此属性,请不要设置 field.include.list 属性。

表 3.8. JDBC 连接器可扩展属性
属性默认描述

column.naming.strategy

i.d.c.j.n.DefaultColumnNamingStrategy

指定 ColumnNamingStrategy 实现的完全限定类名称,连接器用于从事件字段名称解析列名称。

默认情况下,连接器使用字段名称作为列名称。

table.naming.strategy

i.d.c.j.n.DefaultTableNamingStrategy

指定 表NamingStrategy 实现的完全限定类名称,连接器用于解析传入事件主题名称中的表名称。

默认行为是:

  • table.name.format 配置属性中的 ${topic} 占位符替换为事件的主题。
  • 将点(.)替换为下划线(_)来清理表名称。

JDBC 连接器 hibernate 透传属性

Kafka Connect 支持 passthrough 配置,允许您通过直接从连接器配置传递某些属性来修改底层系统的行为。默认情况下,一些 Hibernate 属性通过 JDBC 连接器 连接属性(如 connection.url,connection.username, 和 connection.pool unix_size)和通过连接器的 运行时属性 (如 database.time_zone,quote.identifiers)公开。

如果要自定义其他 Hibernate 行为,您可以通过在连接器配置中添加使用 hibernate decisions 命名空间的属性来利用 passthrough 机制。例如,要帮助 Hibernate 解析目标数据库的类型和版本,您可以添加 hibernate.dialect 属性并将其设置为数据库的完全限定域名,例如 org.hibernate.dialect.MariaDBDialect.

3.1.5. JDBC 连接器常见问题

ExtractNewRecordState 单一消息转换 是否需要
否,实际上这是 Debezium JDBC 连接器与其他实施的不同因素之一。虽然连接器能够像其竞争者一样消耗扁平化的事件,但它还可使 Debezium 的复杂更改事件结构原生进行,而无需任何特定类型的转换。
如果列的类型已更改,或者列被重命名或丢弃,这是否由架构演进处理?
否,Debezium JDBC 连接器不会对现有列进行任何更改。连接器支持的 schema 演进非常基本的。它只是将事件结构中的字段与表的列列表进行比较,然后添加表中尚未定义为列的任何字段。如果列的类型或默认值更改,连接器不会在目标数据库中调整它们。如果重新命名了列,旧的列保留为原样,连接器会将带有新名称的列附加到表中;但是,在旧列中带有数据的现有行保持不变。应手动处理这些类型的架构更改。
如果列的类型没有解析为我想要的类型,如何强制映射到不同的数据类型?
Debezium JDBC 连接器使用复杂的类型系统来解析列的数据类型。有关此类型系统如何将特定字段的 schema 定义解析为 JDBC 类型的详情,请参考 第 3.1.1.4 节 “Debezium JDBC 连接器数据和列类型映射的描述” 部分。如果要应用不同的数据类型映射,请手动定义表来显式获取首选列类型。
如何在不更改 Kafka 主题名称的情况下为表名称指定前缀或后缀?
要在目标表名称中添加前缀或后缀,请调整 table.name.format 连接器配置属性以应用您想要的前缀或后缀。例如,若要使用 jdbc_ 前缀所有表名称,请使用值 jdbc_${topic} 指定 table.name.format 配置属性。如果连接器订阅了名为 orders 的主题,则生成的表将创建为 jdbc_orders
为什么一些列自动加引号,即使标识符引用未启用?
在某些情况下,特定的列或表名称可能会被明确加引号,即使未启用 quote.identifiers。当列或表名称以 开头或使用其他被视为非法语法的特定约定时,这通常是必要的。例如,当 primary.key.mode 设为 kafka 时,如果列的名称加引号,一些数据库只允许列名称以下划线开头。引用行为是特定于不同的数据库,不同的数据库类型不同。
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.