8.2. 在 Debezium 连接器中使用自定义转换器
自定义转换器对源表中的特定列或列类型执行操作,以指定如何将源中的数据类型转换为 Kafka Connect 模式类型。要使用连接器的自定义转换器,您需要将 converter JAR 文件与连接器文件部署,然后将连接器配置为使用转换器。
自定义转换器旨在修改 Debezium 关系数据库源连接器发出的消息。您无法将 Debezium MongoDB 连接器或 Debezium JDBC sink 连接器配置为使用自定义转换器。
8.2.1. 部署自定义转换器 复制链接链接已复制到粘贴板!
先决条件
- 您有一个自定义转换器 Java 程序。
流程
-
要将自定义转换器与 Debezium 连接器搭配使用,请将 Java 项目导出到 JAR 文件,并为您要使用它的每个 Debezium 连接器将文件复制到包含 JAR 文件的目录中。
例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录(/kafka/connect
)的子目录中,每个连接器 JAR 都有自己的子目录(/kafka/connect/debezium-connector-db2
、/kafka/connect/debezium-connector-mysql
等等)。要将转换器与连接器一起使用,请将 converter JAR 文件添加到连接器的子目录中。
要将转换器与多个连接器搭配使用,您必须将转换器 JAR 文件的副本放在每个连接器子目录中。
8.2.2. 配置连接器以使用自定义转换器 复制链接链接已复制到粘贴板!
要启用连接器使用自定义转换器,请添加属性来指定 Debezium 源连接器的配置的名称和类。您无法将 Debezium JDBC sink 连接器配置为使用自定义转换器。如果转换器需要更多信息来自定义特定数据类型的格式,您可以定义其他配置选项以提供该信息。
先决条件
- 您 创建并部署了 自定义转换器。
- 部署了 Debezium 关系数据库源连接器。
流程
通过在连接器配置中添加以下强制属性,为连接器实例启用转换器:
converters: <converterSymbolicName> <converterSymbolicName>.type: <fullyQualifiedConverterClassName>
converters: <converterSymbolicName>
1 <converterSymbolicName>.type: <fullyQualifiedConverterClassName>
2 Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表 8.3. 启用转换器的连接器配置属性描述 项 描述 1
必需的
converters
属性枚举用于连接器的转换器实例的符号链接名称列表。此属性列出的值作为您为转换器指定的其他属性名称的前缀。2
必需的 &
lt;converterSymbolicName > .type
属性指定实现转换器的类的名称。例如,对于较早的 自定义转换器示例,您可以在连接器配置中添加以下属性:
converters: isbn isbn.type: io.debezium.test.IsbnConverter
converters: isbn isbn.type: io.debezium.test.IsbnConverter
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 要将其他属性与自定义转换器关联,请为属性名称加引号符号名称加前缀,后跟句点(
.
)。符号链接名称是一个标签,指定为converters
属性的值。例如,要为前面的isbn
converter 添加属性,以指定传递给转换器代码中configure
方法的schema.name
,请添加以下属性:isbn.schema.name: io.debezium.postgresql.type.Isbn
isbn.schema.name: io.debezium.postgresql.type.Isbn
Copy to Clipboard Copied! Toggle word wrap Toggle overflow