第 8 章 开发 Debezium 自定义数据类型转换器


重要

使用自定义开发的转换器只是一个技术预览功能。技术预览功能不受红帽产品服务等级协议(SLA)支持,且功能可能并不完整。红帽不推荐在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。有关红帽技术预览功能支持范围的更多信息,请参阅 https://access.redhat.com/support/offerings/techpreview

Debezium 更改事件记录中的每个字段代表源表或数据收集中的字段或列。当连接器向 Kafka 发送更改事件记录时,它会将源中每个字段的数据类型转换为 Kafka Connect 模式类型。列值应转换为与 destination 字段的 schema 类型匹配。对于每个连接器,默认映射指定连接器如何转换每种数据类型。这些默认映射在每种连接器的数据类型文档中描述。

虽然默认映射通常足够,但对于某些应用程序,您可能希望应用备用映射。例如,如果默认映射使用从 UNIX epoch 开始的毫秒导出列,则可能需要自定义映射,但下游应用只能使用列值作为格式化字符串。您可以通过开发和部署自定义转换器来自定义数据类型映射。您可以将自定义转换器配置为对特定类型的所有列执行操作,或者您可以缩小其范围,以便仅应用到特定的表列。converter 功能截获与指定条件匹配的任何列的数据类型转换请求,然后执行指定的转换。转换程序会忽略与指定条件不匹配的列。

自定义转换器是实现 Debezium 服务供应商接口(SPI)的 Java 类。您可以通过在连接器配置中设置 converters 属性来启用和配置自定义转换器。converters 属性指定连接器可用的转换器,可以包含进一步修改转换行为的子属性。

启动连接器后,连接器配置中启用的转换器会实例化,并添加到 registry 中。registry 将每个转换器与相关列或字段相关联。每当 Debezium 处理新的更改事件时,它会调用配置的转换器来转换它注册的列或字段。

注意

以下说明只适用于 Debezium 关系数据库源连接器。您不能使用此信息为 Debezium MongoDB 连接器或 Debezium JDBC sink 连接器创建自定义转换器。

8.1. 创建 Debezium 自定义数据类型转换器

以下示例显示了实现接口 io.debezium.spi.converter.CustomConverter 的 Java 类的转换器实现:

public interface CustomConverter<S, F extends ConvertedField> {

    @FunctionalInterface
    interface Converter {  
1

        Object convert(Object input);
    }

    public interface ConverterRegistration<S> { 
2

        void register(S fieldSchema, Converter converter); 
3

    }

    void configure(Properties props);

    void converterFor(F field, ConverterRegistration<S> registration); 
4

}
Copy to Clipboard Toggle word wrap
Expand
表 8.1. 用于实现 io.debezium.spi.converter.CustomConverter 接口的 Java 转换类中的字段描述
描述

1

将数据从一个类型转换为另一个类型。

2

注册转换器的回调。

3

为当前字段注册给定模式和转换器。对于同一字段,不应多次调用一次。

4

注册自定义值和模式转换器,以用于特定字段。

自定义转换器方法

CustomConverter 接口的实现必须包括以下方法:

configure()

将连接器配置中指定的属性传递给转换器实例。configure 方法在连接器初始化时运行。您可以将转换器与多个连接器一起使用,并根据连接器的属性设置修改其行为。
configure 方法接受以下参数:

props
包含传递给转换器实例的属性。每个属性指定转换特定类型的列值的格式。
converterFor()

注册转换器以处理数据源中的特定列或字段。Debezium 调用 converterFor () 方法,以提示转换器来调用转换过程。converterFor 方法为每个列运行一次。
这个方法接受以下参数:

field
传递所处理字段或列的元数据的对象。列元数据可以包含列或字段的名称、表或集合的名称、数据类型、大小等。
注册
一个类型为 io.debezium.spi.converter.CustomConverter.ConverterRegistration 的对象,提供目标 schema 定义和转换列数据的代码。当源列与转换器应处理的类型匹配时,转换器会调用 registration 参数。调用 register 方法来定义架构中的每个列的转换器。模式通过 Kafka Connect SchemaBuilder API 来表示。

8.1.1. Debezium 自定义转换器示例

以下示例实现一个简单的转换器,它执行以下操作:

  • 运行 configure 方法,它根据连接器配置中指定的 schema.name 属性的值配置转换器。转换程序配置特定于每个实例。
  • 运行 converterFor 方法,它将注册转换器以处理数据类型设置为 isbn 的源列中的值。

    • 根据为 schema.name 属性指定的值,标识目标 STRING 模式。
    • 将源列中的 ISBN 数据转换为 String 值。

例 8.1. 简单的自定义转换器

public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private String isbnSchemaName;

    @Override
    public void configure(Properties props) {
        isbnSchemaName = props.getProperty("schema.name");
    }

    @Override
    public void converterFor(RelationalColumn column,
            ConverterRegistration<SchemaBuilder> registration) {

        if ("isbn".equals(column.typeName())) {
            registration.register(SchemaBuilder.string().name(isbnSchemaName)), x -> x.toString());
        }
    }
}
Copy to Clipboard Toggle word wrap

8.1.2. Debezium 和 Kafka Connect API 模块依赖项

自定义转换器 Java 项目编译了 Debezium API 和 Kafka Connect API 库模块的依赖项。这些编译依赖关系必须包含在项目的 pom.xml 中,如下例所示:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version> 
1

</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${version.kafka}</version> 
2

</dependency>
Copy to Clipboard Toggle word wrap
Expand
表 8.2. 在 pom.xml中编译依赖项版本的描述
描述

1

${version.debezium} 代表 Debezium 连接器的版本。

2

${version.kafka} 代表您环境中的 Apache Kafka 版本。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat