第 3 章 sink 连接器
Debezium 提供接收器连接器,可以使用 Apache Kafka 主题等源的事件。接收器(sink)连接器标准化数据格式,然后将事件数据保留给配置的 sink 存储库。其它系统、应用程序或用户可以从数据 sink 访问事件。
因为接收器连接器对消耗的事件数据应用一致的结构,所以从数据 sink 读取的下游应用程序可以更轻松地解释和处理这些数据。
目前,Debebe 提供以下接收器连接器:
3.1. JDBC 的 Debezium 连接器
Debezium JDBC 连接器是一个 Kafka Connect sink 连接器实现,它可以消耗来自多个源主题的事件,然后使用 JDBC 驱动程序将这些事件写入关系数据库。此连接器支持各种数据库 dialects,包括 Db2、MySQL、Oracle、PostgreSQL 和 SQL Server。
3.1.1. Debezium JDBC 连接器如何工作
Debezium JDBC 连接器是一个 Kafka Connect sink 连接器,因此需要 Kafka Connect 运行时。连接器定期轮询它订阅的 Kafka 主题,消耗这些主题的事件,然后将事件写入配置的关系数据库。连接器使用 upsert 语义和基本模式演进支持幂等写入操作。
Debezium JDBC 连接器提供以下功能:
- 第 3.1.1.1 节 “Debezium JDBC 连接器如何使用复杂的更改事件的描述”
- 第 3.1.1.2 节 “在-least-once 交付中 Debezium JDBC 连接器的描述”
- 第 3.1.1.3 节 “Debezium JDBC 使用多个任务的描述”
- 第 3.1.1.4 节 “Debezium JDBC 连接器数据和列类型映射的描述”
- 第 3.1.1.5 节 “Debezium JDBC 连接器如何处理源事件中的主键的描述”
-
第 3.1.1.6 节 “配置 Debezium JDBC 连接器以在使用
DELETE
或 tombstone 事件时删除行” - 第 3.1.1.7 节 “启用连接器执行幂等写入”
- 第 3.1.1.8 节 “Debezium JDBC 连接器的 schema evolution 模式”
- 第 3.1.1.9 节 “指定用于定义目标表和列名称字母大小写的选项”
- 连接空闲超时
3.1.1.1. Debezium JDBC 连接器如何使用复杂的更改事件的描述
默认情况下,Debezium 源连接器生成复杂的分层更改事件。当 Debezium 连接器与其他 JDBC 接收器连接器实现一起使用时,您可能需要应用 ExtractNewRecordState
单消息转换(SMT)来扁平化更改事件有效负载,以便它们可以被接收器实现使用。如果您运行 Debezium JDBC sink 连接器,则不需要部署 SMT,因为 Debezium sink 连接器可以直接消耗原生 Debezium 更改事件,而无需使用转换。
当 JDBC sink 连接器消耗 Debezium 源连接器的复杂更改事件时,它会从原始 插入
或更新
事件的 after
部分中提取值。当接收器连接器消耗 delete 事件时,不会查询事件有效负载的一部分。
Debezium JDBC sink 连接器不是从 schema 更改主题读取的。如果您的源连接器被配置为捕获模式更改,在 JDBC 连接器配置中设置 topics
或 topics.regex
属性,以便连接器不会消耗模式更改主题。
3.1.1.2. 在-least-once 交付中 Debezium JDBC 连接器的描述
Debezium JDBC sink 连接器保证至少处理来自 Kafka 主题的事件。
3.1.1.3. Debezium JDBC 使用多个任务的描述
您可以在多个 Kafka Connect 任务中运行 Debezium JDBC sink 连接器。要在多个任务中运行连接器,请将 tasks.max
配置属性设置为您希望连接器使用的任务数量。Kafka Connect 运行时启动指定数量的任务,并为每个任务运行一个连接器实例。多个任务可以通过并行读取和处理来自多个源主题的更改来提高性能。
3.1.1.4. Debezium JDBC 连接器数据和列类型映射的描述
要启用 Debezium JDBC 接收器连接器,将数据类型从入站消息字段正确映射到出站消息字段,连接器需要有关源事件中存在的每种字段的数据类型的信息。连接器支持跨不同数据库 dialects 范围内的列类型映射。要从 event 字段中的类型元数据正确转换目的地列类型,连接器应用为源数据库定义的数据类型映射。您可以通过在源连接器配置中设置
column.propagate.source.type
或 datatype.propagate.source.type
选项来增强连接器为列解析数据类型的方式。当您启用这些选项时,Debebe 包含额外的参数元数据,它可帮助 JDBC sink 连接器更准确地解析目的地列的数据类型。
要使 Debezium JDBC sink 连接器处理 Kafka 主题的事件,存在 Kafka 主题消息密钥时,必须为原语数据类型或 Struct
。另外,源消息的有效负载必须是 Struct
,它有一个没有嵌套结构类型的扁平化结构,或符合 Debezium 的复杂分层结构的嵌套结构。
如果 Kafka 主题中事件的结构不遵循这些规则,您必须实施自定义单一消息转换,以将源事件的结构转换为可用的格式。
3.1.1.5. Debezium JDBC 连接器如何处理源事件中的主键的描述
默认情况下,Debebe JDBC sink 连接器不会将源事件中的任何字段转换为事件的主键。不幸的是,缺少稳定的主密钥可能会使事件处理复杂,具体取决于您的业务需求,或者当接收器连接器使用 upsert 语义时。要定义一致的主密钥,您可以将连接器配置为使用下表中描述的主密钥模式之一:
模式 | 描述 |
---|---|
| 创建表时没有指定主键字段。 |
| 主密钥由以下三列组成:
这些列的值源自 Kafka 事件协调。 |
|
主密钥由 Kafka 事件的密钥组成。 |
|
主键由 Kafka 事件的值组成。 |
|
主密钥由 Kafka 事件的标头组成。 |
如果将 primary.key.mode
设置为 kafka
,并将 schema.evolution
设置为 basic
,则一些数据库分离可能会抛出异常。当检测将 STRING
数据类型映射为变量长度字符串数据类型(如 TEXT
或 CLOB
)时,会发生此例外情况,且 dialect 不允许主键列具有未绑定长度的长度。要避免这个问题,请在您的环境中应用以下设置:
-
不要将
schema.evolution
设置为basic
。 - 事先创建数据库表和主要密钥映射。
如果列映射到不被视为目标数据库主密钥的数据类型,则 primary.key.fields
中需要明确的列列表,不包括此类列。请参考您的特定数据库厂商的文档,了解哪些数据类型是允许的。
3.1.1.6. 配置 Debezium JDBC 连接器以在使用 DELETE
或 tombstone 事件时删除行
当消耗 DELETE
或 tombstone 事件时,Debebe JDBC sink 连接器可以删除目标数据库中的行。默认情况下,jdbc sink 连接器不启用删除模式。
如果要删除行,您必须在连接器配置中明确设置 delete.enabled=true
。要使用此模式,还必须将 primary.key.fields
设置为没有值以外的值。前面的配置是必需的,因为根据主键映射删除,因此如果目标表没有主键映射,连接器将无法删除行。
3.1.1.7. 启用连接器执行幂等写入
Debezium JDBC sink 连接器可以执行幂等写入,使其重复重新执行相同的记录,而不会更改最终的数据库状态。
要启用连接器执行幂等写入,您必须将连接器的 insert.mode
明确设置为 upsert
。根据指定的
主密钥是否已存在,作为更新或 插入
应用 upsert 操作。
如果主键值已存在,则操作会更新行中的值。如果指定的主密钥值不存在,则 插入
会添加一个新行。
每个数据库清理都会以不同的方式处理幂等写入,因为没有用于 upsert 操作的 SQL 标准。下表显示了 Debezium 支持的数据库偏移的 upsert
DML 语法:
dialect | upsert 语法 |
---|---|
Db2 |
|
MySQL |
|
Oracle |
|
PostgreSQL |
|
SQL Server |
|
3.1.1.8. Debezium JDBC 连接器的 schema evolution 模式
您可以在 Debezium JDBC sink 连接器中使用以下架构演进模式:
模式 | 描述 |
---|---|
| 连接器不执行任何 DDL 模式演进。 |
| 连接器自动检测事件有效负载(但目标表中不存在)中的字段。连接器会更改目的地表来添加新字段。 |
当将 schema.evolution
设置为 basic
时,连接器会根据传入事件的结构自动创建或修改目标数据库表。
当首次从主题收到事件时,并且 destination 表尚不存在,Debezium JDBC sink 连接器使用事件的密钥,或者记录的模式结构来解析表的列结构。如果启用了架构演进,连接器先准备并执行 CREATE TABLE
SQL 语句,然后再将 DML 事件应用到目标表。
当 Debezium JDBC 连接器从主题接收事件时,如果记录的 schema 结构与目标表的 schema 结构不同,连接器会使用事件的密钥或其架构结构来识别哪些列是新的,且必须添加到数据库表中。如果启用了架构演进,连接器先准备并执行 ALTER TABLE
SQL 语句,然后再将 DML 事件应用到目标表。因为更改列数据类型、丢弃列和调整主键可被视为危险操作,所以连接器会被禁止执行这些操作。
每个字段的 schema 确定列是否为 NULL
或 NOT NULL
。模式还定义每个列的默认值。如果连接器尝试使用 nullability 设置或不需要的默认值创建表,则必须手动创建表、提前创建表,或者在 sink 连接器处理事件前调整相关字段的 schema。要调整 nullability 设置或默认值,您可以引入一个自定义单一消息转换,该转换在管道中应用更改,或修改源数据库中定义的列状态。
字段的数据类型根据预定义的映射集解析。如需更多信息,请参阅 第 3.1.2 节 “Debezium JDBC 连接器如何映射数据类型”。
当您向目标数据库中已存在的表的事件结构引入新字段时,您必须将新字段定义为可选,或者字段必须在数据库 schema 中指定默认值。如果要从目标表中删除字段,请使用以下选项之一:
- 手动删除字段。
- 丢弃列。
- 为字段分配一个默认值。
- 定义可为空的字段。
3.1.1.9. 指定用于定义目标表和列名称字母大小写的选项
Debezium JDBC 接收器连接器通过构建在目标数据库上执行的 DDL (schema 更改)或 DML (数据更改)SQL 语句来消耗 Kafka 信息。默认情况下,连接器使用源主题的名称和事件字段作为目标表中表和列名称的基础。构建的 SQL 不会自动 delimit 标识符,使用引号来保留原始字符串的大小写。因此,默认情况下,目标数据库中表或列名称的文本大小写完全取决于在未指定问题单时数据库如何处理名称字符串。
例如,如果目标数据库分离是 Oracle 且事件的主题为,则目的地表将创建为 ORDERS
,因为当名称未加引号时,Oracle 默认为大写名称。同样,如果目标数据库分离是 PostgreSQL,并且事件的主题为
ORDERS
,则目的地表将按照顺序创建,因为当名称未加引号时 PostgreSQL 默认为小写名称。
要明确保留 Kafka 事件中存在的表和字段名称的情况,在连接器配置中,将 quote.identifiers
属性的值设置为 true
。当设置此选项时,当传入的事件针对一个名为
的主题时,目标数据库 dialect 是 Oracle,连接器会创建一个带有名称 顺序的表表,因为构建的 SQL 将表名称定义为 orders
"orders"
。当连接器创建列名称时,启用引用会导致相同的行为。
连接空闲超时
Debezium 的 JDBC sink 连接器利用连接池来提高性能。连接池设计为建立初始连接集,保持指定数量的连接,并根据需要高效地分配与应用程序的连接。但是,当池中连接闲置时会出现一个质询,如果它们保持不活动状态超过数据库配置的空闲超时阈值,则可能会触发超时。
为了缓解闲置连接线程触发超时的可能性,连接池提供了一个定期验证每个连接活动的机制。此验证可确保连接保持活动状态,并防止数据库将它们标记为空闲。如果网络中断,如果 Debezium 尝试使用终止的连接,连接器会提示池生成新的连接。
默认情况下,Debezium JDBC sink 连接器不会执行闲置超时测试。但是,您可以通过设置 hibernate.c3p0.idle_test_period
属性,将连接器配置为请求池以指定间隔执行超时测试。例如:
超时配置示例
{ "hibernate.c3p0.idle_test_period": "300" }
Debezium JDBC sink 连接器使用 Hibernate C3P0 连接池。您可以通过在 hibernate.c3p0 configured 命名空间中设置属性来自定义 CP30 连接池。在前面的示例中,hibernate.c3p0.idle_test_period 属性的设置会将连接池配置为每 300 秒执行闲置超时测试。应用配置后,连接池每五分钟开始评估未使用的连接。
3.1.2. Debezium JDBC 连接器如何映射数据类型
Debezium JDBC sink 连接器使用逻辑或原语 type-mapping 系统解析列的数据类型。原语类型包括值,如整数、浮动点、布尔值、字符串和字节数。通常,这些类型只使用特定的 Kafka Connect Schema
类型代码来表示。逻辑数据类型比较复杂的类型,包括基于 Struct
的类型(包括一组固定字段名称和模式)的值,或者以特定编码表示的值,如自 epoch 后的天数。
以下示例演示了原语和逻辑数据类型的代表结构:
原语字段模式
{ "schema": { "type": "INT64" } }
逻辑字段模式
[ "schema": { "type": "INT64", "name": "org.apache.kafka.connect.data.Date" } ]
Kafka Connect 不是这些复杂逻辑类型的唯一源。实际上,Debezium 源连接器生成更改事件,它们带有类似逻辑类型的字段代表各种不同的数据类型,包括但不限于时间戳、日期甚至 JSON 数据。
Debezium JDBC sink 连接器使用这些原语和逻辑类型将列的类型解析为 JDBC SQL 代码,它代表列的类型。然后,底层 Hibernate 持久性框架使用这些 JDBC SQL 代码,将列的类型解析为逻辑数据类型,以供使用。下表演示了 Kafka Connect 和 JDBC SQL 类型之间的原语和逻辑映射,以及 Debezium 和 JDBC SQL 类型之间的逻辑映射。实际的最终列类型因每种数据库类型而异。
原语类型 | JDBC SQL 类型 |
---|---|
INT8 | type.TINYINT |
INT16 | 类型.SMALLINT |
INT32 | type.INTEGER |
INT64 | type.BIGINT |
FLOAT32 | 类型.FLOAT |
FLOAT64 | 类型.DOUBLE |
布尔值 | Types.BOOLEAN |
字符串 | type.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR |
BYTES | type.VARBINARY |
逻辑类型 | JDBC SQL 类型 |
---|---|
org.apache.kafka.connect.data.Decimal | Types.DECIMAL |
org.apache.kafka.connect.data.Date | Types.DATE |
org.apache.kafka.connect.data.Time | Type.TIMESTAMP |
org.apache.kafka.connect.data.Timestamp | Type.TIMESTAMP |
逻辑类型 | JDBC SQL 类型 |
---|---|
io.debezium.time.Date | Types.DATE |
io.debezium.time.Time | Type.TIMESTAMP |
io.debezium.time.MicroTime | Type.TIMESTAMP |
io.debezium.time.NanoTime | Type.TIMESTAMP |
io.debezium.time.ZonedTime | Types.TIME_WITH_TIMEZONE |
io.debezium.time.Timestamp | Type.TIMESTAMP |
io.debezium.time.MicroTimestamp | Type.TIMESTAMP |
io.debezium.time.NanoTimestamp | Type.TIMESTAMP |
io.debezium.time.ZonedTimestamp | Types.TIMESTAMP_WITH_TIMEZONE |
io.debezium.data.VariableScaleDecimal | 类型.DOUBLE |
如果数据库不支持带有时区的时间或时间戳,则映射会在没有时区的情况下解析为对应的时间戳。
逻辑类型 | MySQL SQL 类型 | PostgreSQL SQL 类型 | SQL Server SQL 类型 |
---|---|---|---|
io.debezium.data.Bits |
|
|
|
io.debezium.data.Enum |
| type.VARCHAR | 不适用 |
io.debezium.data.Json |
|
| 不适用 |
io.debezium.data.EnumSet |
| 不适用 | 不适用 |
io.debezium.time.Year |
| 不适用 | 不适用 |
io.debezium.time.MicroDuration | 不适用 |
| 不适用 |
io.debezium.data.Ltree | 不适用 |
| 不适用 |
io.debezium.data.Uuid | 不适用 |
| 不适用 |
io.debezium.data.Xml | 不适用 |
|
|
除了上面的原语和逻辑映射外,如果更改事件的来源是 Debezium 源连接器,则列类型的解析及其长度、精度和规模可以通过启用列或数据类型传播来进一步影响。要强制实施传播,必须在源连接器配置中设置以下属性之一:
-
column.propagate.source.type
-
datatype.propagate.source.type
Debezium JDBC sink 连接器应用优先级更高的值。
例如,假设更改事件中包含以下字段模式:
启用列或数据类型传播的 Debezium 更改事件字段模式
{ "schema": { "type": "INT8", "parameters": { "__debezium.source.column.type": "TINYINT", "__debezium.source.column.length": "1" } } }
在前面的示例中,如果没有设置 schema 参数,Debezium JDBC sink 连接器会将此字段映射到类型 Types.SMALLINT
。Type.SMALLINT
可以有不同的逻辑数据库类型,具体取决于数据库问题。对于 MySQL,示例中的列类型转换为没有指定长度的 TINYINT
列类型。如果为源连接器启用了列或数据类型传播,Debezium JDBC sink 连接器使用映射信息来优化数据类型映射过程,并使用类型 TINYINT (1)
创建一个列。
通常,当相同类型的数据库用于源和接收器数据库时,使用列或数据类型传播的效果会很高。
3.1.3. 部署 Debezium JDBC 连接器
要部署 Debezium JDBC 连接器,您要安装 Debezium JDBC 连接器存档,配置连接器,并通过将配置添加到 Kafka Connect 来启动连接器。
先决条件
- 已安装 Apache ZooKeeper、Apache Kafka 和 Kafka Connect。
- 目标数据库安装并配置为接受 JDBC 连接。
流程
- 下载 Debezium JDBC 连接器插件存档。
- 将文件提取到 Kafka Connect 环境中。
(可选)从 Maven Central 下载 JDBC 驱动程序,并将下载的驱动程序文件提取到包含 JDBC sink 连接器 JAR 文件的目录。
注意JDBC sink 连接器不包含 Oracle 和 Db2 的驱动程序。您必须下载驱动程序并手动安装它们。
- 将驱动程序 JAR 文件添加到安装了 JDBC sink 连接器的路径。
-
确保安装 JDBC sink 连接器的路径是 Kafka Connect
plugin.path
的一部分。 - 重启 Kafka Connect 进程以获取新的 JAR 文件。
3.1.3.1. Debezium JDBC 连接器配置
通常,您可以通过提交指定连接器配置属性的 JSON 请求来注册 Debezium JDBC 连接器。以下示例显示了注册 Debezium JDBC sink 连接器实例的 JSON 请求,该连接器使用最常见的配置设置使用来自名为 order
的主题的事件:
示例: Debezium JDBC 连接器配置
{ "name": "jdbc-connector", 1 "config": { "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", 2 "tasks.max": "1", 3 "connection.url": "jdbc:postgresql://localhost/db", 4 "connection.username": "pguser", 5 "connection.password": "pgpassword", 6 "insert.mode": "upsert", 7 "delete.enabled": "true", 8 "primary.key.mode": "record_key", 9 "schema.evolution": "basic", 10 "database.time_zone": "UTC", 11 "topics": "orders" 12 } }
JDBC 连接器配置设置的描述
项 | 描述 |
---|---|
1 | 使用 Kafka Connect 服务注册时分配给连接器的名称。 |
2 | JDBC sink 连接器类的名称。 |
3 | 为此连接器创建的最大任务数量。 |
4 | 连接器用来连接到它写入的 sink 数据库的 JDBC URL。 |
5 | 用于身份验证的数据库用户的名称。 |
6 | 用于身份验证的数据库用户的密码。 |
7 | 连接器使用的 insert.mode。 |
8 | 启用删除数据库中的记录。如需更多信息,请参阅 delete.enabled 配置属性。 |
9 | 指定用于解析主键列的方法。如需更多信息,请参阅 primary.key.mode 配置属性。 |
10 | 启用连接器来改变目标数据库的模式。如需更多信息,请参阅 schema.evolution 配置属性。 |
11 | 指定编写 temporal 字段类型时使用的时区。 |
12 | 要使用的主题列表,用逗号分开。 |
有关您可以为 Debezium JDBC 连接器设置的配置属性的完整列表,请参阅 JDBC 连接器属性。
您可以使用 POST
命令将此配置发送到正在运行的 Kafka Connect 服务。该服务记录了配置并启动执行以下操作的接收器连接器任务:
- 连接到数据库。
- 使用订阅的 Kafka 主题的事件。
- 将事件写入配置的数据库。
3.1.4. Debezium JDBC 连接器配置属性的描述
Debezium JDBC sink 连接器有几个配置属性,可用于实现满足您需求的连接器行为。许多属性具有默认值。有关属性的信息按如下方式进行组织:
属性 | 默认 | 描述 |
---|---|---|
没有默认值 | 连接器的唯一名称。如果您在注册连接器时尝试重复使用此名称,失败结果。所有 Kafka Connect 连接器都需要此属性。 | |
没有默认值 |
连接器的 Java 类的名称。对于 Debezium JDBC 连接器,请指定 | |
1 | 用于此连接器的最大任务数量。 | |
没有默认值 |
要使用的主题列表,用逗号分开。不要将此属性与 | |
没有默认值 |
指定要使用的主题的正则表达式。在内部,正则表达式被编译到 |
属性 | 默认 | 描述 |
---|---|---|
| 要使用的连接供应商实施。 | |
没有默认值 | 用于连接到数据库的 JDBC 连接 URL。 | |
没有默认值 | 连接器用来连接到数据库的数据库用户帐户名称。 | |
没有默认值 | 连接器用来连接到数据库的密码。 | |
| 指定池中的最小连接数。 | |
| 指定池维护的最大并发连接数。 | |
| 指定当连接池超过其最大大小时连接器尝试获取的连接数量。 | |
| 指定在丢弃未使用连接前保留了未使用的连接的秒数。 |
属性 | 默认 | 描述 |
---|---|---|
| 指定插入 JDBC 临时值时使用的时区。 | |
|
指定连接器处理 | |
|
指定连接器是否处理 注意
虽然 从版本 9.7 开始,Db2 中提供了对
为确保 JDBC 连接器可以处理从 Db2 收到的
提交上述查询的用户帐户需要截断表中的 | |
| 指定用于将事件插入到数据库中的策略。可用的选项如下:
| |
| 指定连接器如何解析来自事件的主键列。
| |
没有默认值 |
要从中派生主键列或以逗号分隔的字段列表。 | |
| 指定生成的 SQL 语句是否使用引号来分隔符表和列名称。详情请查看 第 3.1.1.9 节 “指定用于定义目标表和列名称字母大小写的选项” 部分。 | |
| 指定连接器如何演变目的地表模式。如需更多信息,请参阅 第 3.1.1.8 节 “Debezium JDBC 连接器的 schema evolution 模式”。可用的选项如下:
| |
|
指定连接器用来构造目标表名称的字符串模式。 | |
|
指定安装 PostgreSQL PostGIS 扩展的架构名称。默认为 | |
|
指定连接器是否在 | |
| 指定尝试一起将批处理到目标表中的记录数量。 注意
请注意,如果您在 Connect worker 属性中将 | |
| 指定是否启用 Debezium JDBC 连接器的缩减缓冲。 选择以下设置之一:
要在启用缩减缓冲区时优化 PostgreSQL sink 数据库中的查询处理,还必须通过将 | |
空字符串 |
一个可选的、以逗号分隔的字段名称列表,与要从 change 事件值包含的字段的完全限定域名匹配。字段的完全限定域名格式为 | |
空字符串 |
一个可选的、以逗号分隔的字段名称列表,与要从更改事件值中排除的字段名称匹配。字段的完全限定域名格式为 |
属性 | 默认 | 描述 |
---|---|---|
|
指定 | |
|
指定
|
JDBC 连接器 hibernate
透传属性
Kafka Connect 支持 passthrough 配置,允许您通过直接从连接器配置传递某些属性来修改底层系统的行为。默认情况下,一些 Hibernate 属性通过 JDBC 连接器 连接属性(如 connection
.url,connection.username
, 和 connection.pool unix_size
)和通过连接器的 运行时属性 (如 database.time_zone
,quote.identifiers
)公开。
如果要自定义其他 Hibernate 行为,您可以通过在连接器配置中添加使用 hibernate
decisions 命名空间的属性来利用 passthrough 机制。例如,要帮助 Hibernate 解析目标数据库的类型和版本,您可以添加 hibernate.dialect
属性并将其设置为数据库的完全限定域名,例如 org.hibernate.dialect.MariaDBDialect
.
3.1.5. JDBC 连接器常见问题
ExtractNewRecordState
单一消息转换 是否需要?- 否,实际上这是 Debezium JDBC 连接器与其他实施的不同因素之一。虽然连接器能够像其竞争者一样消耗扁平化的事件,但它还可使 Debezium 的复杂更改事件结构原生进行,而无需任何特定类型的转换。
- 如果列的类型已更改,或者列被重命名或丢弃,这是否由架构演进处理?
- 否,Debezium JDBC 连接器不会对现有列进行任何更改。连接器支持的 schema 演进非常基本的。它只是将事件结构中的字段与表的列列表进行比较,然后添加表中尚未定义为列的任何字段。如果列的类型或默认值更改,连接器不会在目标数据库中调整它们。如果重新命名了列,旧的列保留为原样,连接器会将带有新名称的列附加到表中;但是,在旧列中带有数据的现有行保持不变。应手动处理这些类型的架构更改。
- 如果列的类型没有解析为我想要的类型,如何强制映射到不同的数据类型?
- Debezium JDBC 连接器使用复杂的类型系统来解析列的数据类型。有关此类型系统如何将特定字段的 schema 定义解析为 JDBC 类型的详情,请参考 第 3.1.1.4 节 “Debezium JDBC 连接器数据和列类型映射的描述” 部分。如果要应用不同的数据类型映射,请手动定义表来显式获取首选列类型。
- 如何在不更改 Kafka 主题名称的情况下为表名称指定前缀或后缀?
-
要在目标表名称中添加前缀或后缀,请调整 table.name.format 连接器配置属性以应用您想要的前缀或后缀。例如,若要使用
jdbc_
前缀所有表名称,请使用值jdbc_${topic}
指定table.name.format
配置属性。如果连接器订阅了名为orders
的主题,则生成的表将创建为jdbc_orders
。 - 为什么一些列自动加引号,即使标识符引用未启用?
-
在某些情况下,特定的列或表名称可能会被明确加引号,即使未启用
quote.identifiers
。当列或表名称以 开头或使用其他被视为非法语法的特定约定时,这通常是必要的。例如,当 primary.key.mode 设为kafka
时,如果列的名称加引号,一些数据库只允许列名称以下划线开头。引用行为是特定于不同的数据库,不同的数据库类型不同。