搜索

第 4 章 JDBC 的 Debezium 连接器(开发者预览)

download PDF

Debezium JDBC 连接器是一个 Kafka Connect sink 连接器实现,它可以使用多个源主题的事件,然后使用 JDBC 驱动程序将这些事件写入关系数据库。这个连接器支持各种数据库划分,包括 Db2、MySQL、Oracle、PostgreSQL 和 SQL Server。

重要

Debezium JDBC 连接器是开发者预览软件。红帽以任何方式支持开发人员预览软件,且功能不完整或生产就绪。对于生产环境或关键业务工作负载,不要使用开发人员预览软件。开发人员预览软件可提前访问即将发布的产品软件。客户可以使用此软件测试功能并在开发过程中提供反馈。此软件可能会随时更改或删除,并收到有限的测试。

有关 Red Hat Developer Preview 软件支持范围的更多信息,请参阅 开发人员预览支持范围

4.1. Debezium JDBC 连接器的工作方式

Debezium JDBC 连接器是一个 Kafka Connect sink 连接器,因此需要 Kafka Connect 运行时。连接器会定期轮询订阅的 Kafka 主题,使用来自这些主题的事件,然后将事件写入配置的关联数据库。连接器使用 upsert 语义和基本模式演进来支持幂等写入操作。

Debezium JDBC 连接器提供以下功能:

4.1.1. Debezium JDBC 连接器如何使用复杂更改事件的描述

默认情况下,Debezium 源连接器会生成复杂的分层更改事件。当 Debezium 连接器与其他 JDBC sink 连接器实现一起使用时,您可能需要应用 ExtractNewRecordState 单一消息转换(SMT)来扁平化更改事件的有效负载,以便 sink 实现可以消耗它们。如果您运行 Debezium JDBC sink 连接器,则不需要部署 SMT,因为 Debezium sink 连接器可以直接使用原生 Debezium 更改事件,而无需使用转换。

当 JDBC sink 连接器使用来自 Debezium 源连接器的复杂更改事件时,它会从原始 插入 或更新 事件的 after 部分中提取值。当接收器(sink)连接器消耗 delete 事件时,不会查询事件有效负载的一部分。

重要

Debezium JDBC sink 连接器没有设计为从架构更改主题中读取。如果您的源连接器被配置为捕获模式更改,在 JDBC 连接器配置中,设置 topicstopics.regex 属性,以便连接器不会使用来自架构更改主题。

4.1.2. Debezium JDBC 连接器在交付时描述

Debezium JDBC sink 连接器保证了从 Kafka 主题使用的事件至少被处理一次。

4.1.3. Debezium JDBC 使用多个任务的描述

您可以在多个 Kafka Connect 任务中运行 Debezium JDBC sink 连接器。要在多个任务中运行连接器,请将 tasks.max 配置属性设置为您希望连接器使用的任务数量。Kafka Connect 运行时启动指定任务数量,并为每个任务运行一个连接器实例。多个任务可以通过从并行多个源主题读取和处理更改来提高性能。

4.1.4. Debezium JDBC 连接器数据和列类型映射的描述

要启用 Debezium JDBC sink 连接器,可以正确地将数据类型从入站消息字段映射到出站消息字段,连接器需要源事件中存在的每种字段的数据类型信息。连接器在不同的数据库间支持广泛的列类型映射。要正确从事件字段中的 类型 元数据转换 destination 列类型,连接器会应用为源数据库定义的数据类型映射。您可以通过在源连接器配置中设置 column.propagate.source.typedatatype.propagate.source.type 选项来提高连接器为列解析数据类型的方式。当您启用这些选项时,Debezium 包含额外的参数元数据,它可帮助 JDBC sink 连接器更准确地解析目的地列的数据类型。

要使 Debezium JDBC sink 连接器处理 Kafka 主题的事件,当存在时 Kafka 主题消息密钥必须是原语数据类型或 Struct。此外,源消息的有效负载必须是 Struct,它具有没有嵌套结构类型的扁平化结构,或者符合 Debezium 复杂分层结构的嵌套结构布局。

如果 Kafka 主题中的事件结构没有遵循这些规则,您必须实现自定义单个消息转换,将源事件的结构转换为可用格式。

4.1.5. 有关 Debezium JDBC 连接器如何处理源事件中主密钥的描述

默认情况下,Debezium JDBC sink 连接器不会将源事件中的任何字段转换为事件的主键。不幸的是,缺少 stable 主键可能会使事件处理复杂,具体取决于您的业务需求,或者 sink 连接器使用 upsert 语义。要定义一致的主密钥,您可以将连接器配置为使用下表中描述的主要密钥模式之一:

模式描述

none

创建表时没有指定主键字段。

kafka

主键由以下三列组成:

  • __connect_topic
  • __connect_partition
  • __connect_offset

这些列的值来自 Kafka 事件的协调。

record_key

主键由 Kafka 事件的密钥组成。

如果 primary 键是一个原语类型,请通过设置 primary.key.fields 属性指定要使用的列的名称。如果主键是 struct 类型,则 struct 中的字段将映射为主密钥的列。您可以使用 primary.key.fields 属性将主键限制为列的子集。

record_value

主键由 Kafka 事件的值组成。

因为 Kafka 事件的值始终是一个 Struct,所以默认情况下,值中的所有字段将变为主键的列。要使用主键中的字段子集,请设置 primary.key.fields 属性,以指定您要从中派生主键列的值中以逗号分隔的字段列表。

重要

如果将 primary.key.mode 设置为 kafka,并将 schema.evolution 设置为 basic,则一些数据库中断可能会抛出异常。当 dialect 将 STRING 数据类型映射到变量长度字符串数据类型时(如 TEXTCLOB ),而 dialect 不允许主键列具有未绑定长度。要避免这个问题,请在您的环境中应用以下设置:

  • 不要将 schema.evolution 设置为 basic
  • 事先创建数据库表和主密钥映射。

4.1.6. 将 Debezium JDBC 连接器配置为在消耗 DELETEtombstone 事件时删除行

当消耗 DELETEtombstone 事件时,Debezium JDBC sink 连接器可以删除目标数据库中的行。默认情况下,JDBC sink 连接器不会启用删除模式。

如果要连接器删除行,则必须在连接器配置中明确设置 delete.enabled=true。要使用此模式,还必须将 primary.key.fields 设置为 none 以外的值。上述配置是必要的,因为根据主密钥映射执行删除,因此如果目标表没有主键映射,则连接器无法删除行。

4.1.7. 启用连接器来执行幂等写入

Debezium JDBC sink 连接器可以执行幂等写入,使其可以重复重新执行相同的记录,而不更改最终数据库状态。

要启用连接器来执行幂等写入,您必须将连接器的 insert.mode 明确设置为 upsert根据指定的 主密钥是否已存在,upsert 操作将作为更新或 插入 来应用。

如果主键值已存在,则操作会更新行中的值。如果指定的主键值不存在,则 插入 会添加一个新行。

每个数据库划分都以不同的方式处理幂等写入,因为 upsert 操作没有 SQL 标准。下表显示了 Debezium 支持的 数据库的 upsert DML 语法:

dialectUPSERT 语法

Db2

MERGE …​

MySQL

在重复的密钥更新 …​ 上插入 …​

Oracle

MERGE …​

PostgreSQL

在冲突的 …​ DO UPDATE SET …​

SQL Server

MERGE …​

4.1.8. Debezium JDBC 连接器的 schema evolution 模式

您可以将以下模式演进模式与 Debezium JDBC sink 连接器一起使用:

模式描述

none

连接器不执行任何 DDL 模式演进。

基本的

连接器自动检测事件有效负载中的字段,但目标表中不存在。连接器更改目标表以添加新字段。

schema.evolution 设置为 basic 时,连接器会根据传入事件的结构自动创建或修改目标数据库表。

当第一次从主题收到事件时,目标表尚不存在,Debezium JDBC sink 连接器使用事件的密钥,或记录的 schema 结构来解决表的列结构。如果启用了 schema evolution,连接器会在将 DML 事件应用到目标表前准备并执行 CREATE TABLE SQL 语句。

当 Debezium JDBC 连接器从主题接收事件时,如果记录的模式结构与目标表的 schema 结构不同,则连接器使用事件的密钥或其架构结构来识别哪些列是新的列,且必须添加到数据库表中。如果启用了 schema evolution,连接器会在将 DML 事件应用到目标表前准备并执行 ALTER TABLE SQL 语句。由于更改列数据类型、丢弃列和调整主键可以被视为危险的操作,因此禁止连接器执行这些操作。

每个字段的 schema 确定列是否为 NULL 还是 NOT NULL。架构还定义每个列的默认值。如果连接器试图创建带有 nullability 设置的表或不需要的默认值,则必须在接收器连接器处理事件前手动创建表,或者调整相关字段的 schema。要调整 nullability 设置或默认值,您可以引入一个自定义单一消息转换来应用管道中的更改,或修改源数据库中定义的列状态。

字段的数据类型根据预定义的映射集合来解决。如需更多信息,请参阅 第 4.2 节 “Debezium JDBC 连接器如何映射数据类型”

重要

当您向目标数据库中已存在的表的事件结构引入新字段时,您必须将新字段定义为可选,或者字段必须在数据库 schema 中指定默认值。如果要从目标表中删除字段,请使用以下选项之一:

  • 手动删除字段。
  • 丢弃列。
  • 为字段分配默认值。
  • 将字段定义为可为空。

4.1.9. 指定选项来定义目标表和列名称的字母大小写

Debezium JDBC sink 连接器通过构建 DDL (schema 更改)或 DML (数据更改)在目标数据库上执行的 SQL 语句来消耗 Kafka 信息。默认情况下,连接器使用源主题的名称和事件字段作为目标表中的表和列名称的基础。构建的 SQL 不会自动使用引号限定标识符,以保留原始字符串的大小写情况。因此,默认情况下,目标数据库中表或列名称的文本大小完全取决于在未指定问题单时数据库如何处理名称字符串。

例如,如果目标数据库划分是 Oracle,且事件的主题为,则目标 表将创建为 ORDERS,因为 Oracle 在名称没有加引号时默认为大写名称。同样,如果目标数据库划分是 PostgreSQL,且事件的主题为 ORDERS,则目标表将以 订购 的形式创建,因为 PostgreSQL 在名称没有加引号时默认为小写名称。

要明确保留 Kafka 事件中存在的表和字段名称的大小写,在连接器配置中,将 quote.identifiers 属性的值设置为 true。当设定此选项时,当传入的事件用于名为 orders 的主题,目标数据库划分是 Oracle 时,连接器会创建一个名称 orders 的表,因为构建的 SQL 将表的名称定义为 "orders "。当连接器创建列名称时,启用引用会导致行为相同。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.