搜索

第 13 章 开发 Debezium 自定义数据类型转换器

download PDF
重要

使用自定义开发的转换器只是一个技术预览功能。技术预览功能不受红帽产品服务等级协议(SLA)支持,且功能可能并不完整。红帽不推荐在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。有关红帽技术预览功能支持范围的更多信息,请参阅 https://access.redhat.com/support/offerings/techpreview

Debezium 更改事件记录中的每个字段代表源表或数据收集中的字段或列。当连接器向 Kafka 发送更改事件记录时,它会将源中的每个字段的数据类型转换为 Kafka Connect 模式类型。列值同样转换为与目的地字段的 schema 类型匹配。对于每个连接器,默认映射指定连接器如何转换每个数据类型。这些默认映射在每种连接器的数据类型文档中描述。

虽然默认映射通常就足够了,但对于某些应用程序,您可能需要应用备用映射。例如,如果默认映射导出自 UNIX epoch 以来的毫秒格式,您可能需要自定义映射,但下游应用只能以格式化的字符串形式消耗列值。您可以通过开发和部署自定义转换器来自定义数据类型映射。您可以将自定义转换器配置为对某个类型的所有列执行操作,或者您可以缩小其范围,使其仅适用于特定的表列。转换程序功能会截获与指定条件匹配的列的数据类型转换请求,然后执行指定的转换。转换程序会忽略与指定条件不匹配的列。

自定义转换器是实现 Debezium 服务供应商接口(SPI)的 Java 类。您可以通过在连接器配置中设置 converters 属性来启用和配置自定义转换器。converters 属性指定可用于连接器的转换器,可以包含进一步修改转换行为的子属性。

启动连接器后,连接器配置中启用的转换器将被实例化并添加到 registry 中。registry 将每个转换器与要处理的列或字段相关联。每当 Debezium 处理一个新的更改事件时,它会调用配置的转换器来转换其注册的列或字段。

注意

以下说明仅适用于 Debezium 关系数据库连接器。您不能使用此信息为 Debezium MongoDB 连接器创建自定义转换器。

13.1. 创建 Debezium 自定义数据类型转换器

以下示例显示了实现接口 io.debezium.spi.converter.CustomConverter 的 Java 类的转换器实现:

public interface CustomConverter<S, F extends ConvertedField> {

    @FunctionalInterface
    interface Converter {  1
        Object convert(Object input);
    }

    public interface ConverterRegistration<S> { 2
        void register(S fieldSchema, Converter converter); 3
    }

    void configure(Properties props);

    void converterFor(F field, ConverterRegistration<S> registration); 4
}
表 13.1. 实现 io.debezium.spi.converter.CustomConverter 接口的 Java 转换类的描述
描述

1

将一个类型的数据转换为另一个类型。

2

注册转换器的回调。

3

为当前字段注册给定模式和转换器。对于同一字段,不应多次调用。

4

注册自定义值和模式转换器,以用于特定字段。

自定义转换器方法

CustomConverter 接口的实现必须包括以下方法:

configure()

将连接器配置中指定的属性传递给转换器实例。configure 方法在连接器初始化时运行。您可以使用带有多个连接器的转换器,并根据连接器的属性设置修改其行为。
configure 方法接受以下参数:

Proprops
包含传递到转换器实例的属性。每个属性指定转换特定类型的列值的格式。
converterFor()

注册转换器以处理数据源中的特定列或字段。Debezium 调用 converterFor () 方法,以提示输入转换器 来调用转换的注册converterFor 方法为每个列运行一次。
这个方法接受以下参数:

field
一个对象,用于传递有关所处理字段或列的元数据。列元数据可以包括列或字段的名称、表或集合的名称、数据类型、大小等。
注册
一个类型为 io.debezium.spi.converter.CustomConverterRegistration 的对象,它提供目标模式定义以及转换列数据的代码。当源列与应处理的类型匹配时,转换器调用 注册 参数。调用 register 方法,以定义 schema 中各个列的转换器。模式使用 Kafka Connect SchemaBuilder API 表示。

13.1.1. Debezium 自定义转换器示例

以下示例实现了一个简单的转换器,它执行以下操作:

  • 运行 configure 方法,它根据连接器配置中指定的 schema.name 属性的值配置转换器。转换程序配置特定于每个实例。
  • 运行 converterFor 方法,它会在 data type 设置为 isbn 的源列中注册转换器来处理值。

    • 根据为 schema.name 属性指定的值标识目标 STRING 模式。
    • 将源列中的 ISBN 数据转换为 String 值。

例 13.1. 简单的自定义转换器

public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private SchemaBuilder isbnSchema;

    @Override
    public void configure(Properties props) {
        isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name"));
    }

    @Override
    public void converterFor(RelationalColumn column,
            ConverterRegistration<SchemaBuilder> registration) {

        if ("isbn".equals(column.typeName())) {
            registration.register(isbnSchema, x -> x.toString());
        }
    }
}

13.1.2. Debezium 和 Kafka Connect API 模块依赖项

自定义转换器 Java 项目对 Debezium API 和 Kafka Connect API 库模块有编译依赖项。这些编译依赖项必须包含在项目的 pom.xml 中,如下例所示:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version> 1
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${version.kafka}</version> 2
</dependency>
表 13.2. 在 pom.xml中编译依赖项版本的描述
描述

1

${version.debezium} 代表 Debezium 连接器的版本。

2

${version.kafka} 代表您环境中的 Apache Kafka 版本。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.