第 3 章 sink 连接器
Debezium 提供接收器连接器,可以使用 Apache Kafka 主题等源的事件。接收器(sink)连接器标准化数据格式,然后将事件数据保留给配置的 sink 存储库。其它系统、应用程序或用户可以从数据 sink 访问事件。
因为接收器连接器对消耗的事件数据应用一致的结构,所以从数据 sink 读取的下游应用程序可以更轻松地解释和处理这些数据。
目前,Debebe 提供以下接收器连接器:
3.1. JDBC 的 Debezium 连接器 复制链接链接已复制到粘贴板!
Debezium JDBC 连接器是一个 Kafka Connect sink 连接器实现,它可以消耗来自多个源主题的事件,然后使用 JDBC 驱动程序将这些事件写入关系数据库。此连接器支持各种数据库 dialects,包括 Db2、MySQL、Oracle、PostgreSQL 和 SQL Server。
3.1.1. Debezium JDBC 连接器如何工作 复制链接链接已复制到粘贴板!
Debezium JDBC 连接器是一个 Kafka Connect sink 连接器,因此需要 Kafka Connect 运行时。连接器定期轮询它订阅的 Kafka 主题,消耗这些主题的事件,然后将事件写入配置的关系数据库。连接器使用 upsert 语义和基本模式演进支持幂等写入操作。
Debezium JDBC 连接器提供以下功能:
3.1.1.1. Debezium JDBC 连接器如何使用复杂的更改事件的描述 复制链接链接已复制到粘贴板!
默认情况下,Debezium 源连接器生成复杂的分层更改事件。当 Debezium 连接器与其他 JDBC 接收器连接器实现一起使用时,您可能需要应用 ExtractNewRecordState
单消息转换(SMT)来扁平化更改事件有效负载,以便它们可以被接收器实现使用。如果您运行 Debezium JDBC sink 连接器,则不需要部署 SMT,因为 Debezium sink 连接器可以直接消耗原生 Debezium 更改事件,而无需使用转换。
当 JDBC sink 连接器消耗 Debezium 源连接器的复杂更改事件时,它会从原始 插入
或更新
事件的 after
部分中提取值。当接收器连接器消耗 delete 事件时,不会查询事件有效负载的一部分。
Debezium JDBC sink 连接器不是从 schema 更改主题读取的。如果您的源连接器被配置为捕获模式更改,在 JDBC 连接器配置中设置 topics
或 topics.regex
属性,以便连接器不会消耗模式更改主题。
3.1.1.2. 在-least-once 交付中 Debezium JDBC 连接器的描述 复制链接链接已复制到粘贴板!
Debezium JDBC sink 连接器保证至少处理来自 Kafka 主题的事件。
3.1.1.3. Debezium JDBC 使用多个任务的描述 复制链接链接已复制到粘贴板!
您可以在多个 Kafka Connect 任务中运行 Debezium JDBC sink 连接器。要在多个任务中运行连接器,请将 tasks.max
配置属性设置为您希望连接器使用的任务数量。Kafka Connect 运行时启动指定数量的任务,并为每个任务运行一个连接器实例。多个任务可以通过并行读取和处理来自多个源主题的更改来提高性能。
3.1.1.4. Debezium JDBC 连接器数据和列类型映射的描述 复制链接链接已复制到粘贴板!
要启用 Debezium JDBC 接收器连接器,将数据类型从入站消息字段正确映射到出站消息字段,连接器需要有关源事件中存在的每种字段的数据类型的信息。连接器支持跨不同数据库 dialects 范围内的列类型映射。要从 event 字段中的类型元数据正确转换目的地列类型,连接器应用为源数据库定义的数据类型映射。您可以通过在源连接器配置中设置
column.propagate.source.type
或 datatype.propagate.source.type
选项来增强连接器为列解析数据类型的方式。当您启用这些选项时,Debebe 包含额外的参数元数据,它可帮助 JDBC sink 连接器更准确地解析目的地列的数据类型。
要使 Debezium JDBC sink 连接器处理 Kafka 主题的事件,存在 Kafka 主题消息密钥时,必须为原语数据类型或 Struct
。另外,源消息的有效负载必须是 Struct
,它有一个没有嵌套结构类型的扁平化结构,或符合 Debezium 的复杂分层结构的嵌套结构。
如果 Kafka 主题中事件的结构不遵循这些规则,您必须实施自定义单一消息转换,以将源事件的结构转换为可用的格式。
3.1.1.5. Debezium JDBC 连接器如何处理源事件中的主键的描述 复制链接链接已复制到粘贴板!
默认情况下,Debebe JDBC sink 连接器不会将源事件中的任何字段转换为事件的主键。不幸的是,缺少稳定的主密钥可能会使事件处理复杂,具体取决于您的业务需求,或者当接收器连接器使用 upsert 语义时。要定义一致的主密钥,您可以将连接器配置为使用下表中描述的主密钥模式之一:
模式 | 描述 |
---|---|
| 创建表时没有指定主键字段。 |
| 主密钥由以下三列组成:
这些列的值源自 Kafka 事件协调。 |
|
主密钥由 Kafka 事件的密钥组成。 |
|
主键由 Kafka 事件的值组成。 |
|
主密钥由 Kafka 事件的标头组成。 |
如果将 primary.key.mode
设置为 kafka
,并将 schema.evolution
设置为 basic
,则一些数据库分离可能会抛出异常。当检测将 STRING
数据类型映射为变量长度字符串数据类型(如 TEXT
或 CLOB
)时,会发生此例外情况,且 dialect 不允许主键列具有未绑定长度的长度。要避免这个问题,请在您的环境中应用以下设置:
-
不要将
schema.evolution
设置为basic
。 - 事先创建数据库表和主要密钥映射。
如果列映射到不被视为目标数据库主密钥的数据类型,则 primary.key.fields
中需要明确的列列表,不包括此类列。请参考您的特定数据库厂商的文档,了解哪些数据类型是允许的。
当消耗 DELETE
或 tombstone 事件时,Debebe JDBC sink 连接器可以删除目标数据库中的行。默认情况下,jdbc sink 连接器不启用删除模式。
如果要删除行,您必须在连接器配置中明确设置 delete.enabled=true
。要使用此模式,还必须将 primary.key.fields
设置为没有值以外的值。前面的配置是必需的,因为根据主键映射删除,因此如果目标表没有主键映射,连接器将无法删除行。
3.1.1.7. 启用连接器执行幂等写入 复制链接链接已复制到粘贴板!
Debezium JDBC sink 连接器可以执行幂等写入,使其重复重新执行相同的记录,而不会更改最终的数据库状态。
要启用连接器执行幂等写入,您必须将连接器的 insert.mode
明确设置为 upsert
。根据指定的
主密钥是否已存在,作为更新或 插入
应用 upsert 操作。
如果主键值已存在,则操作会更新行中的值。如果指定的主密钥值不存在,则 插入
会添加一个新行。
每个数据库清理都会以不同的方式处理幂等写入,因为没有用于 upsert 操作的 SQL 标准。下表显示了 Debezium 支持的数据库偏移的 upsert
DML 语法:
dialect | upsert 语法 |
---|---|
Db2 |
|
MySQL |
|
Oracle |
|
PostgreSQL |
|
SQL Server |
|
3.1.1.8. Debezium JDBC 连接器的 schema evolution 模式 复制链接链接已复制到粘贴板!
您可以在 Debezium JDBC sink 连接器中使用以下架构演进模式:
模式 | 描述 |
---|---|
| 连接器不执行任何 DDL 模式演进。 |
| 连接器自动检测事件有效负载(但目标表中不存在)中的字段。连接器会更改目的地表来添加新字段。 |
当将 schema.evolution
设置为 basic
时,连接器会根据传入事件的结构自动创建或修改目标数据库表。
当首次从主题收到事件时,并且 destination 表尚不存在,Debezium JDBC sink 连接器使用事件的密钥,或者记录的模式结构来解析表的列结构。如果启用了架构演进,连接器先准备并执行 CREATE TABLE
SQL 语句,然后再将 DML 事件应用到目标表。
当 Debezium JDBC 连接器从主题接收事件时,如果记录的 schema 结构与目标表的 schema 结构不同,连接器会使用事件的密钥或其架构结构来识别哪些列是新的,且必须添加到数据库表中。如果启用了架构演进,连接器先准备并执行 ALTER TABLE
SQL 语句,然后再将 DML 事件应用到目标表。因为更改列数据类型、丢弃列和调整主键可被视为危险操作,所以连接器会被禁止执行这些操作。
每个字段的 schema 确定列是否为 NULL
或 NOT NULL
。模式还定义每个列的默认值。如果连接器尝试使用 nullability 设置或不需要的默认值创建表,则必须手动创建表、提前创建表,或者在 sink 连接器处理事件前调整相关字段的 schema。要调整 nullability 设置或默认值,您可以引入一个自定义单一消息转换,该转换在管道中应用更改,或修改源数据库中定义的列状态。
字段的数据类型根据预定义的映射集解析。有关更多信息,请参阅 JDBC 字段类型。
当您向目标数据库中已存在的表的事件结构引入新字段时,您必须将新字段定义为可选,或者字段必须在数据库 schema 中指定默认值。如果要从目标表中删除字段,请使用以下选项之一:
- 手动删除字段。
- 丢弃列。
- 为字段分配一个默认值。
- 定义可为空的字段。
3.1.1.9. 指定用于定义目标表和列名称字母大小写的选项 复制链接链接已复制到粘贴板!
Debezium JDBC 接收器连接器通过构建在目标数据库上执行的 DDL (schema 更改)或 DML (数据更改)SQL 语句来消耗 Kafka 信息。默认情况下,连接器使用源主题的名称和事件字段作为目标表中表和列名称的基础。构建的 SQL 不会自动 delimit 标识符,使用引号来保留原始字符串的大小写。因此,默认情况下,目标数据库中表或列名称的文本大小写完全取决于在未指定问题单时数据库如何处理名称字符串。
例如,如果目标数据库分离是 Oracle 且事件的主题为,则目的地表将创建为 ORDERS
,因为当名称未加引号时,Oracle 默认为大写名称。同样,如果目标数据库分离是 PostgreSQL,并且事件的主题为
ORDERS
,则目的地表将按照顺序创建,因为当名称未加引号时 PostgreSQL 默认为小写名称。
要明确保留 Kafka 事件中存在的表和字段名称的情况,在连接器配置中,将 quote.identifiers
属性的值设置为 true
。当设置此选项时,当传入的事件针对一个名为
的主题时,目标数据库 dialect 是 Oracle,连接器会创建一个带有名称 顺序的表表,因为构建的 SQL 将表名称定义为 orders
"orders"
。当连接器创建列名称时,启用引用会导致相同的行为。
连接空闲超时
Debezium 的 JDBC sink 连接器利用连接池来提高性能。连接池设计为建立初始连接集,保持指定数量的连接,并根据需要高效地分配与应用程序的连接。但是,当池中连接闲置时会出现一个质询,如果它们保持不活动状态超过数据库配置的空闲超时阈值,则可能会触发超时。
为了缓解闲置连接线程触发超时的可能性,连接池提供了一个定期验证每个连接活动的机制。此验证可确保连接保持活动状态,并防止数据库将它们标记为空闲。如果网络中断,如果 Debezium 尝试使用终止的连接,连接器会提示池生成新的连接。
默认情况下,Debezium JDBC sink 连接器不会执行闲置超时测试。但是,您可以通过设置 hibernate.c3p0.idle_test_period
属性,将连接器配置为请求池以指定间隔执行超时测试。例如:
超时配置示例
{ "hibernate.c3p0.idle_test_period": "300" }
{
"hibernate.c3p0.idle_test_period": "300"
}
Debezium JDBC sink 连接器使用 Hibernate C3P0 连接池。您可以通过在 hibernate.c3p0 configured 命名空间中设置属性来自定义 CP30 连接池。在前面的示例中,hibernate.c3p0.idle_test_period 属性的设置会将连接池配置为每 300 秒执行闲置超时测试。应用配置后,连接池每五分钟开始评估未使用的连接。
3.1.2. Debezium JDBC 连接器如何映射数据类型 复制链接链接已复制到粘贴板!
Debezium JDBC sink 连接器使用逻辑或原语 type-mapping 系统解析列的数据类型。原语类型包括值,如整数、浮动点、布尔值、字符串和字节数。通常,这些类型只使用特定的 Kafka Connect Schema
类型代码来表示。逻辑数据类型比较复杂的类型,包括基于 Struct
的类型(包括一组固定字段名称和模式)的值,或者以特定编码表示的值,如自 epoch 后的天数。
以下示例演示了原语和逻辑数据类型的代表结构:
原语字段模式
{ "schema": { "type": "INT64" } }
{
"schema": {
"type": "INT64"
}
}
逻辑字段模式
Kafka Connect 不是这些复杂逻辑类型的唯一源。实际上,Debezium 源连接器生成更改事件,它们带有类似逻辑类型的字段代表各种不同的数据类型,包括但不限于时间戳、日期甚至 JSON 数据。
Debezium JDBC sink 连接器使用这些原语和逻辑类型将列的类型解析为 JDBC SQL 代码,它代表列的类型。然后,底层 Hibernate 持久性框架使用这些 JDBC SQL 代码,将列的类型解析为逻辑数据类型,以供使用。下表演示了 Kafka Connect 和 JDBC SQL 类型之间的原语和逻辑映射,以及 Debezium 和 JDBC SQL 类型之间的逻辑映射。实际的最终列类型因每种数据库类型而异。
原语类型 | JDBC SQL 类型 |
---|---|
INT8 | type.TINYINT |
INT16 | 类型.SMALLINT |
INT32 | type.INTEGER |
INT64 | type.BIGINT |
FLOAT32 | 类型.FLOAT |
FLOAT64 | 类型.DOUBLE |
布尔值 | 类型.BOOLEAN |
字符串 | type.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR |
BYTES | type.VARBINARY |
逻辑类型 | JDBC SQL 类型 |
---|---|
org.apache.kafka.connect.data.Decimal | type.DECIMAL |
org.apache.kafka.connect.data.Date | type.DATE |
org.apache.kafka.connect.data.Time | Type.TIMESTAMP |
org.apache.kafka.connect.data.Timestamp | Type.TIMESTAMP |
逻辑类型 | JDBC SQL 类型 |
---|---|
io.debezium.time.Date | type.DATE |
io.debezium.time.Time | Type.TIMESTAMP |
io.debezium.time.MicroTime | Type.TIMESTAMP |
io.debezium.time.NanoTime | Type.TIMESTAMP |
io.debezium.time.ZonedTime | Type.TIME_WITH_TIMEZONE |
io.debezium.time.Timestamp | Type.TIMESTAMP |
io.debezium.time.MicroTimestamp | Type.TIMESTAMP |
io.debezium.time.NanoTimestamp | Type.TIMESTAMP |
io.debezium.time.ZonedTimestamp | Type.TIMESTAMP_WITH_TIMEZONE |
io.debezium.data.VariableScaleDecimal | 类型.DOUBLE |
如果数据库不支持带有时区的时间或时间戳,则映射会在没有时区的情况下解析为对应的时间戳。
逻辑类型 | MySQL SQL 类型 | PostgreSQL SQL 类型 | SQL Server SQL 类型 |
---|---|---|---|
io.debezium.data.Bits |
|
|
|
io.debezium.data.Enum |
| type.VARCHAR | 不适用 |
io.debezium.data.Json |
|
| 不适用 |
io.debezium.data.EnumSet |
| 不适用 | 不适用 |
io.debezium.time.Year |
| 不适用 | 不适用 |
io.debezium.time.MicroDuration | 不适用 |
| 不适用 |
io.debezium.data.Ltree | 不适用 |
| 不适用 |
io.debezium.data.Uuid | 不适用 |
| 不适用 |
io.debezium.data.Xml | 不适用 |
|
|
除了上面的原语和逻辑映射外,如果更改事件的来源是 Debezium 源连接器,则列类型的解析及其长度、精度和规模可以通过启用列或数据类型传播来进一步影响。要强制实施传播,必须在源连接器配置中设置以下属性之一:
-
column.propagate.source.type
-
datatype.propagate.source.type
Debezium JDBC sink 连接器应用优先级更高的值。
例如,假设更改事件中包含以下字段模式:
启用列或数据类型传播的 Debezium 更改事件字段模式
在前面的示例中,如果没有设置 schema 参数,Debezium JDBC sink 连接器会将此字段映射到类型 Types.SMALLINT
。Type.SMALLINT
可以有不同的逻辑数据库类型,具体取决于数据库问题。对于 MySQL,示例中的列类型转换为没有指定长度的 TINYINT
列类型。如果为源连接器启用了列或数据类型传播,Debezium JDBC sink 连接器使用映射信息来优化数据类型映射过程,并使用类型 TINYINT (1)
创建一个列。
通常,当相同类型的数据库用于源和接收器数据库时,使用列或数据类型传播的效果会很高。
3.1.3. 部署 Debezium JDBC 连接器 复制链接链接已复制到粘贴板!
您可以使用以下任一方法部署 Debezium JDBC 连接器:
使用 Streams for Apache Kafka 自动创建一个包含连接器插件的镜像。
这是首选的方法。
-
从 Containerfile 构建自定义 Kafka Connect 容器镜像。
这个 Containerfile 部署方法已弃用。计划在以后的文档中删除这个方法的说明。
由于许可证要求,Debezium JDBC 连接器存档不包括 Debezium 连接到 Db2 和 Oracle 数据库所需的驱动程序。要启用连接器访问这些数据库,您必须在连接器环境中添加驱动程序。有关如何获取连接器未提供的驱动程序的详情,请参考 获取连接器存档中没有包括的驱动程序。
3.1.3.1. 获取没有包含在连接器归档中的 JDBC 驱动程序 复制链接链接已复制到粘贴板!
由于许可证要求,Debezium 连接到 Db2 数据库和 Oracle 数据库所需的 JDBC 驱动程序文件不包括在 Debezium JDBC 连接器存档中。这些驱动程序可从 Maven Central 下载。根据您使用的部署方法,您可以使用以下之一来检索驱动程序:
- 您可以使用 Streams for Apache Kafka 将连接器添加到 Kafka Connect 镜像中
-
将驱动程序的 Maven Central 位置添加到
KafkaConnect
自定义资源中的builds.plugins.artifact.url
,如 第 3.1.3.3 节 “使用 Streams for Apache Kafka 部署 Debezium JDBC 连接器” 所示。 - 您可以使用 Containerfile 为连接器构建容器镜像
-
在 Containerfile 中,插入一个
curl
命令,用于指定从 Maven Central 下载驱动程序文件的 URL。如需更多信息,请参阅 第 3.1.3.4 节 “通过从 Containerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium JDBC 连接器”。
3.1.3.2. 使用 Streams for Apache Kafka 进行 JDBC 连接器部署 复制链接链接已复制到粘贴板!
部署 Debezium 连接器的首选方法是使用 Streams for Apache Kafka 来构建包含连接器插件的 Kafka Connect 容器镜像。
在部署过程中,您要创建和使用以下自定义资源(CR):
-
定义 Kafka Connect 实例的
KafkaConnect
CR,并包含有关镜像中包含的连接器工件的信息。 -
提供包括连接器用来访问源数据库的信息的
KafkaConnector
CR。在 Apache Kafka 的 Streams 启动 Kafka Connect pod 后,您可以通过应用KafkaConnector
CR 来启动连接器。
在 Kafka Connect 镜像的构建规格中,您可以指定用于部署的连接器。对于每个连接器插件,您还可以指定您的部署可以使用的其他组件。例如,您可以添加 Apicurio Registry 工件或 Debezium 脚本组件。当 Apache Kafka 的 Streams 构建 Kafka Connect 镜像时,它会下载指定的工件,并将其合并到镜像中。
KafkaConnect
CR 中的 spec.build.output
参数指定在存储生成的 Kafka Connect 容器镜像的位置。容器镜像可以存储在容器 registry 中,如 quay.io 或 OpenShift ImageStream 中。要将镜像存储在 ImageStream 中,您必须在部署 Kafka Connect 前创建 ImageStream。镜像流不会被自动创建。
如果使用 KafkaConnect
资源创建集群,之后您无法使用 Kafka Connect REST API 创建或更新连接器。您仍然可以使用 REST API 来检索信息。
其他资源
- 在 OpenShift 中部署和管理 Apache Kafka Streams 中的配置 Kafka 连接。
- 在 OpenShift 中部署和管理 Apache Kafka 的 Streams 中自动构建新容器镜像。
您可以使用 Apache Kafka 的 Streams 中的构建配置自动将 Kafka Connect 容器镜像构建到 OpenShift。构建镜像包括您指定的 Debezium 连接器插件。
在构建过程中,Apache Kafka Operator 的 Streams 将 KafkaConnect
自定义资源中的输入参数(包括 Debezium 连接器定义)转换为 Kafka Connect 容器镜像。构建会从 Red Hat Maven 存储库或其他配置的 HTTP 服务器下载必要的工件。
新创建的容器被推送到 .spec.build.output
中指定的容器 registry,并用于部署 Kafka Connect 集群。在 Apache Kafka 的 Streams 构建 Kafka Connect 镜像后,您可以创建 KafkaConnector
自定义资源来启动构建中包含的连接器。
先决条件
- 您可以访问安装了集群 Operator 的 OpenShift 集群。
- Apache Kafka Operator 的 Streams 正在运行。
- 部署了 Apache Kafka 集群,如 在 OpenShift 中部署和管理 Apache Kafka 的流 中所述。
- Kafka Connect 部署在 Apache Kafka 的 Streams 中
- 您有一个 Kafka 主题,连接器可从中读取更改事件记录。
- 目标数据库已安装,并配置为接受 JDBC 连接。
- 您有一个红帽构建的 Debezium 许可证。
-
OpenShift
oc
CLI 客户端已安装,或者您可以访问 OpenShift Container Platform Web 控制台。 根据您要存储 Kafka Connect 构建镜像的方式,您需要 registry 权限或您必须创建 ImageStream 资源:
- 将构建镜像存储在镜像 registry 中,如 Red Hat Quay.io 或 Docker Hub
- 在 registry 中创建和管理镜像的帐户和权限。
- 将构建镜像存储为原生 OpenShift ImageStream
- ImageStream 资源部署到集群中,以存储新的容器镜像。您必须为集群显式创建 ImageStream。默认情况下,镜像流不可用。如需有关 ImageStreams 的更多信息,请参阅在 OpenShift Container Platform 中管理镜像流。
流程
- 登录 OpenShift 集群。
为连接器创建 Debezium
KafkaConnect
自定义资源(CR),或修改现有的资源。
例如,使用名称dbz-jdbc-connect.yaml
创建一个KafkaConnect
CR,用于指定注解和
镜像
属性,如以下摘录所示:在以下示例中,自定义资源被配置为下载以下工件:- Debezium JDBC 连接器存档。
连接到 Oracle 或 Db2 sink 数据库所需的 JDBC 驱动程序。您可以为其他 sink 目的地省略此条目。
定义包含 Debezium 连接器的
KafkaConnect
自定义资源的dbz-jdbc-connect.yaml
文件Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表 3.5. Kafka Connect 配置设置的描述 项 描述 1
将
strimzi.io/use-connector-resources
注解设置为"true"
,以便 Cluster Operator 使用KafkaConnector
资源在此 Kafka Connect 集群中配置连接器。2
spec.build
配置指定存储构建镜像的位置,并列出镜像中包含的插件,以及插件工件的位置。3
build.output
指定存储新构建的镜像的 registry。4
指定镜像输出的名称和镜像名称。
output.type
的有效值为docker
,可推送到容器 registry (如 Quay)或镜像流
(用于将镜像推送到内部 OpenShift ImageStream)。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定build.output
的更多信息,请参阅 {Name configuringStreamsOpenShift} 中的 Apache Kafka Build schema 参考。5
插件配置
列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,指定一个插件名称
,以及有关构建连接器所需的工件的信息。另外,对于每个连接器插件,您可以包括要用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。6
artifacts.type
的值指定artifacts.url
中指定的工件的文件类型。有效类型是zip
、tgz
、或jar
。Debezium 连接器存档以.zip
文件格式提供。JDBC 驱动程序文件采用.jar
格式。type
值必须与url
字段中引用的文件类型匹配。7
artifacts.url
的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。OpenShift 集群必须有权访问指定的服务器。8
(仅用于 Db2 或 Oracle sinks)指定 Maven Central 中的 JDBC JDBC 驱动程序的位置。Debezium 连接到这些数据库所需的驱动程序不包括在 Debezium 连接器存档中。
这个示例为 Oracle Database JDBC 驱动程序提供 Maven URL。Db2 JDBC 驱动程序位于以下 Maven 位置
:https://repo1.maven.org/maven2/com/ibm/db2/jcc/11.5.9.0/jcc-11.5.9.0.jar
输入以下命令将
KafkaConnect
构建规格应用到 OpenShift 集群:oc create -f dbz-jdbc-connect.yaml
oc create -f dbz-jdbc-connect.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 根据自定义资源中指定的配置,Streams Operator 准备要部署的 Kafka Connect 镜像。
构建完成后,Operator 将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。您在配置中列出的连接器工件在集群中可用。创建一个
KafkaConnector
资源来定义您要部署的每个连接器的实例。
例如,创建以下KafkaConnector
CR,并将它保存为orders-to-postgresql-jdbc-connector.yaml
orders-to-postgresql-jdbc-connector.yaml
文件,该文件为 Debezium 连接器定义KafkaConnector
自定义资源Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表 3.6. 连接器配置设置的描述 项 描述 1
要注册到 Kafka Connect 集群的连接器名称。
2
连接器类的名称。
3
可同时操作的任务数量。
4
连接器的配置。
5
sink 数据库的 JDBC 连接 URL。URL 指定端口号以及连接到数据库所需的任何身份验证属性。例如,
jdbc:oracle:thin:@myhost.example.com:1521/myservice
6
Debezium 用来连接到数据库的帐户名称。
7
Debezium 用来连接到数据库用户帐户的密码。
8
指定连接器读取的 Kafka 主题的逗号分隔列表。每个主题中的事件被流传输到 sink 数据库中名称相同的表。
运行以下命令来创建连接器资源:
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 例如,
oc create -n debezium -f jdbc-inventory-connector.yaml
oc create -n debezium -f jdbc-inventory-connector.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 连接器注册到 Kafka Connect 集群,并开始针对
KafkaConnector
CR 中的spec.config.database.dbname
指定的数据库运行。连接器 pod 就绪后,Debezium 正在运行。
您可以通过构建包含 Debezium 连接器存档的自定义 Kafka Connect 容器镜像来部署 Debezium JDBC 连接器,然后将此容器镜像推送到容器 registry。之后,您可以创建以下自定义资源(CR)来定义连接器配置:
-
定义 Kafka Connect 实例的
KafkaConnect
CR。CR 中的image
属性指定您创建的容器镜像的名称,以运行 Debezium 连接器。您可以将此 CR 应用到部署 Red Hat Streams for Apache Kafka 的 OpenShift 实例。Apache Kafka 的流提供将 Apache Kafka 到 OpenShift 的 operator 和镜像。 -
定义 Debezium JDBC 连接器的
KafkaConnector
CR。将此 CR 应用到应用KafkaConnect
CR 的同一 OpenShift 实例。
本节中描述的部署方法已弃用,并计划在以后的文档版本中删除。
先决条件
- 目标数据库已安装,并配置为接受 JDBC 连接。
- Apache Kafka 的流部署在 OpenShift 中,它正在运行 Apache Kafka 和 Kafka Connect。如需更多信息,请参阅在 OpenShift 中部署和管理 Apache Kafka 流。
- podman 或 Docker 已安装。
- 您有一个 Kafka 主题,连接器可从中读取更改事件记录。
- 目标数据库已安装,并配置为接受 JDBC 连接。
- 如果您希望 JDBC 连接器将数据发送到 Db2 或 Oracle 数据库,则 Kafka Connect 服务器可以访问 Maven Central 来为这些数据库下载 JDBC 驱动程序。您还可以使用驱动程序的本地副本,或使用本地 Maven 存储库或其他 HTTP 服务器可用的副本。
-
您有在容器 registry (如
quay.io
或docker.io
)中创建和管理容器的帐户和权限,您要添加将运行 Debezium 连接器的容器。
流程
在 Kafka Connect 上创建 Debezium JDBC 连接器容器:
-
创建一个 Containerfile,它使用
registry.redhat.io/amq-streams/kafka-39-rhel9:2.9.0
作为基础镜像。例如,在终端窗口中输入以下命令:
-
创建一个 Containerfile,它使用
+ .Descriptions of Containerfile settings for building a custom Kafka Connect container image
+
.Descriptions of Containerfile settings for building a custom Kafka Connect container image
项 | 描述 |
---|---|
1 | 您可以指定您想要的任何文件名。 |
2 | 指定 Kafka Connect 插件目录的路径。如果您的 Kafka Connect 插件目录位于不同的位置,请将此路径替换为您的目录的实际路径。 |
+ 命令在当前目录中创建一个名为 debezium-jdbc-connector-container.yaml
的 Containerfile。
从您在上一步中创建的
debezium-jdbc-connector-container.yaml
Containerfile 构建容器镜像。在包含该文件的目录中,打开终端窗口并输入以下命令之一:podman build -t debezium-jdbc-connector-container:latest .
podman build -t debezium-jdbc-connector-container:latest .
Copy to Clipboard Copied! Toggle word wrap Toggle overflow docker build -t debezium-jdbc-connector-container:latest .
docker build -t debezium-jdbc-connector-container:latest .
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 前面的命令使用名称
debezium-jdbc-connector-container
构建容器镜像。将自定义镜像推送到容器 registry,如 quay.io 或内部容器 registry。容器镜像仓库必须可供您要部署镜像的 OpenShift 实例使用。输入以下命令之一:
podman push <myregistry.io>/debezium-jdbc-connector-container:latest
podman push <myregistry.io>/debezium-jdbc-connector-container:latest
Copy to Clipboard Copied! Toggle word wrap Toggle overflow docker push <myregistry.io>/debezium-jdbc-connector-container:latest
docker push <myregistry.io>/debezium-jdbc-connector-container:latest
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 创建新的 Debezium Oracle KafkaConnect 自定义资源(CR)。例如,使用名称
dbz-connect.yaml
创建KafkaConnect
CR,用于指定注解和
镜像
属性。以下示例显示了描述KafkaConnect
自定义资源的dbz-connect.yaml
文件摘录。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
项 | 描述 |
---|---|
1 |
您必须将 |
2 |
|
输入以下命令将
KafkaConnect
CR 应用到 OpenShift Kafka Connect 环境:oc create -f dbz-connect.yaml
oc create -f dbz-connect.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 该命令添加一个 Kafka Connect 实例,用于指定为运行 Debezium 连接器而创建的镜像的名称。
创建一个
KafkaConnector
自定义资源,用于配置 Debezium JDBC 连接器实例。您可以在
.yaml
文件中配置 Debezium JDBC 连接器,该文件指定连接器的配置属性。以下示例显示了
dbz-connect.yaml
文件的摘录,它为KafkaConnect
自定义资源设置几个关键属性。
连接器在端口5432
上建立到 PostgreSQL 服务器 sink 的 JDBC 连接。
有关可用连接器属性的完整范围的详情,请参阅 Debezium JDBC 连接器配置属性的描述。例 3.1.
jdbc-connector.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
项 | 描述 |
---|---|
1 | 使用 Kafka Connect 服务注册的连接器名称。 |
2 | Apache Kafka 集群的流名称。 |
3 | Debezium JDBC 连接器类的名称。 |
4 | sink 数据库的 JDBC 地址。 |
5 | Debezium 用来连接到数据库的帐户名称。 |
6 | Debezium 用来向数据库用户帐户进行身份验证的密码。 |
7 | 指定连接器读取的 Kafka 主题的逗号分隔列表。每个主题中的事件被流传输到 sink 数据库中名称相同的表。 |
使用 Kafka Connect 创建连接器实例。例如,如果您在
jdbc-connector.yaml
文件中保存KafkaConnector
资源,您将运行以下命令:oc apply -f jdbc-connector.yaml
oc apply -f jdbc-connector.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 前面的命令注册
orders-topic-to-postgresql-via-jdbc-sink-connector
。根据KafkaConnector
CR 中指定的,连接器启动并开始从订购
主题中读取。
3.1.4. Debezium JDBC 连接器配置属性的描述 复制链接链接已复制到粘贴板!
Debezium JDBC sink 连接器有几个配置属性,可用于实现满足您需求的连接器行为。许多属性具有默认值。有关属性的信息按如下方式进行组织:
3.1.4.1. JDBC 连接器 Kafka 使用者属性 复制链接链接已复制到粘贴板!
属性 | 默认 | 描述 |
---|---|---|
没有默认值 | 连接器的唯一名称。如果您在注册连接器时尝试重复使用此名称,失败结果。所有 Kafka Connect 连接器都需要此属性。 | |
没有默认值 |
连接器的 Java 类的名称。对于 Debezium JDBC 连接器,请指定 | |
1 | 用于此连接器的最大任务数量。 | |
没有默认值 |
要使用的主题列表,用逗号分开。不要将此属性与 | |
没有默认值 |
指定要使用的主题的正则表达式。在内部,正则表达式被编译到 |
3.1.4.2. JDBC 连接器连接属性 复制链接链接已复制到粘贴板!
属性 | 默认 | 描述 |
---|---|---|
| 要使用的连接供应商实施。 | |
没有默认值 | 用于连接到数据库的 JDBC 连接 URL。 | |
没有默认值 | 连接器用来连接到数据库的数据库用户帐户名称。 | |
没有默认值 | 连接器用来连接到数据库的密码。 | |
| 指定池中的最小连接数。 | |
| 指定池维护的最大并发连接数。 | |
| 指定当连接池超过其最大大小时连接器尝试获取的连接数量。 | |
| 指定在丢弃未使用连接前保留了未使用的连接的秒数。 |
3.1.4.3. JDBC 连接器运行时属性 复制链接链接已复制到粘贴板!
属性 | 默认 | 描述 |
---|---|---|
| 指定插入 JDBC 临时值时使用的时区。 | |
|
指定连接器处理 | |
|
指定连接器是否处理 注意
虽然 从版本 9.7 开始,Db2 中提供了对
为确保 JDBC 连接器可以处理从 Db2 收到的
提交上述查询的用户帐户需要截断表中的 | |
| 指定用于将事件插入到数据库中的策略。可用的选项如下:
| |
| 指定连接器如何解析来自事件的主键列。
| |
没有默认值 |
要从中派生主键列或以逗号分隔的字段列表。 | |
| 指定生成的 SQL 语句是否使用引号来分隔符表和列名称。如需了解更多详细信息,请参阅 JDBC 引用大小写-sensitivity 部分。 | |
| 指定连接器如何演变目的地表模式。如需更多信息,请参阅 第 3.1.1.8 节 “Debezium JDBC 连接器的 schema evolution 模式”。可用的选项如下:
| |
|
指定连接器用来构造目标表名称的字符串模式。 | |
|
指定安装 PostgreSQL PostGIS 扩展的架构名称。默认为 | |
|
指定连接器是否在 | |
| 指定尝试一起将批处理到目标表中的记录数量。 注意
请注意,如果您在 Connect worker 属性中将 | |
| 指定是否启用 Debezium JDBC 连接器的缩减缓冲。 选择以下设置之一:
要在启用缩减缓冲区时优化 PostgreSQL sink 数据库中的查询处理,还必须通过将 | |
空字符串 |
一个可选的、以逗号分隔的字段名称列表,与要从 change 事件值包含的字段的完全限定域名匹配。字段的完全限定域名格式为 | |
空字符串 |
一个可选的、以逗号分隔的字段名称列表,与要从更改事件值中排除的字段名称匹配。字段的完全限定域名格式为 | |
5 | 指定在尝试清空目标数据库更改后连接器执行的最大重试次数会导致某些数据库错误。如果重试次数超过重试值,接收器连接器会进入 FAILED 状态。 | |
1000 | 指定连接器等待重试失败的 flush 操作的毫秒数。 注意
当您同时设置 |
3.1.4.4. JDBC 连接器可扩展属性 复制链接链接已复制到粘贴板!
属性 | 默认 | 描述 |
---|---|---|
|
指定 | |
|
指定
|
3.1.4.5. JDBC 连接器 hibernate 透传属性 复制链接链接已复制到粘贴板!
Kafka Connect 支持 passthrough 配置,允许您通过直接从连接器配置传递某些属性来修改底层系统的行为。默认情况下,一些 Hibernate 属性通过 JDBC 连接器 连接属性(如 connection
.url,connection.username
, 和 connection.pool unix_size
)和通过连接器的 运行时属性 (例如 use.time.zone
,quote.identifiers
)公开。
如果要自定义其他 Hibernate 行为,您可以通过在连接器配置中添加使用 hibernate
decisions 命名空间的属性来利用 passthrough 机制。例如,要帮助 Hibernate 解析目标数据库的类型和版本,您可以添加 hibernate.dialect
属性并将其设置为数据库的完全限定域名,例如 org.hibernate.dialect.MariaDBDialect
.
3.1.5. JDBC 连接器常见问题 复制链接链接已复制到粘贴板!
ExtractNewRecordState
单一消息转换 是否需要?- 否,实际上这是 Debezium JDBC 连接器与其他实施的不同因素之一。虽然连接器能够像其竞争者一样消耗扁平化的事件,但它还可使 Debezium 的复杂更改事件结构原生进行,而无需任何特定类型的转换。
- 如果列的类型已更改,或者列被重命名或丢弃,这是否由架构演进处理?
- 否,Debezium JDBC 连接器不会对现有列进行任何更改。连接器支持的 schema 演进非常基本的。它只是将事件结构中的字段与表的列列表进行比较,然后添加表中尚未定义为列的任何字段。如果列的类型或默认值更改,连接器不会在目标数据库中调整它们。如果重新命名了列,旧的列保留为原样,连接器会将带有新名称的列附加到表中;但是,在旧列中带有数据的现有行保持不变。应手动处理这些类型的架构更改。
- 如果列的类型没有解析为我想要的类型,如何强制映射到不同的数据类型?
- Debezium JDBC 连接器使用复杂的类型系统来解析列的数据类型。有关此类型系统如何将特定字段的 schema 定义解析为 JDBC 类型的详情,请参考 第 3.1.1.4 节 “Debezium JDBC 连接器数据和列类型映射的描述” 部分。如果要应用不同的数据类型映射,请手动定义表来显式获取首选列类型。
- 如何在不更改 Kafka 主题名称的情况下为表名称指定前缀或后缀?
-
要在目标表名称中添加前缀或后缀,请调整 collection.name.format 连接器配置属性以应用您想要的前缀或后缀。例如,若要使用
jdbc_
前缀所有表名称,请使用值jdbc_${topic}
指定collection.name.format
配置属性。如果连接器订阅了名为orders
的主题,则生成的表将创建为jdbc_orders
。 - 为什么一些列自动加引号,即使标识符引用未启用?
-
在某些情况下,特定的列或表名称可能会被明确加引号,即使未启用
quote.identifiers
。当列或表名称以 开头或使用其他被视为非法语法的特定约定时,这通常是必要的。例如,当 primary.key.mode 设为kafka
时,如果列的名称加引号,一些数据库只允许列名称以下划线开头。引用行为是特定于不同的数据库,不同的数据库类型不同。