第 13 章 开发 Debezium 自定义数据类型转换器
使用自定义开发的转换器只是一个技术预览功能。技术预览功能不受红帽产品服务等级协议(SLA)支持,且功能可能并不完整。红帽不推荐在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。有关红帽技术预览功能支持范围的更多信息,请参阅 https://access.redhat.com/support/offerings/techpreview。
Debezium 更改事件记录中的每个字段代表源表或数据收集中的字段或列。当连接器向 Kafka 发送更改事件记录时,它会将源中的每个字段的数据类型转换为 Kafka Connect 模式类型。列值同样转换为与目的地字段的 schema 类型匹配。对于每个连接器,默认映射指定连接器如何转换每个数据类型。这些默认映射在每种连接器的数据类型文档中描述。
虽然默认映射通常就足够了,但对于某些应用程序,您可能需要应用备用映射。例如,如果默认映射导出自 UNIX epoch 以来的毫秒格式,您可能需要自定义映射,但下游应用只能以格式化的字符串形式消耗列值。您可以通过开发和部署自定义转换器来自定义数据类型映射。您可以将自定义转换器配置为对某个类型的所有列执行操作,或者您可以缩小其范围,使其仅适用于特定的表列。转换程序功能会截获与指定条件匹配的列的数据类型转换请求,然后执行指定的转换。转换程序会忽略与指定条件不匹配的列。
自定义转换器是实现 Debezium 服务供应商接口(SPI)的 Java 类。您可以通过在连接器配置中设置 converters
属性来启用和配置自定义转换器。converters
属性指定可用于连接器的转换器,可以包含进一步修改转换行为的子属性。
启动连接器后,连接器配置中启用的转换器将被实例化并添加到 registry 中。registry 将每个转换器与要处理的列或字段相关联。每当 Debezium 处理一个新的更改事件时,它会调用配置的转换器来转换其注册的列或字段。
以下说明仅适用于 Debezium 关系数据库连接器。您不能使用此信息为 Debezium MongoDB 连接器创建自定义转换器。
13.1. 创建 Debezium 自定义数据类型转换器
以下示例显示了实现接口 io.debezium.spi.converter.CustomConverter
的 Java 类的转换器实现:
public interface CustomConverter<S, F extends ConvertedField> { @FunctionalInterface interface Converter { 1 Object convert(Object input); } public interface ConverterRegistration<S> { 2 void register(S fieldSchema, Converter converter); 3 } void configure(Properties props); void converterFor(F field, ConverterRegistration<S> registration); 4 }
项 | 描述 |
---|---|
1 | 将一个类型的数据转换为另一个类型。 |
2 | 注册转换器的回调。 |
3 | 为当前字段注册给定模式和转换器。对于同一字段,不应多次调用。 |
4 | 注册自定义值和模式转换器,以用于特定字段。 |
自定义转换器方法
CustomConverter
接口的实现必须包括以下方法:
configure()
将连接器配置中指定的属性传递给转换器实例。
configure
方法在连接器初始化时运行。您可以使用带有多个连接器的转换器,并根据连接器的属性设置修改其行为。configure
方法接受以下参数:Proprops
- 包含传递到转换器实例的属性。每个属性指定转换特定类型的列值的格式。
converterFor()
注册转换器以处理数据源中的特定列或字段。Debezium 调用
converterFor ()
方法,以提示输入转换器来调用转换的注册
。converterFor
方法为每个列运行一次。
这个方法接受以下参数:field
- 一个对象,用于传递有关所处理字段或列的元数据。列元数据可以包括列或字段的名称、表或集合的名称、数据类型、大小等。
注册
-
一个类型为
io.debezium.spi.converter.CustomConverterRegistration
的对象,它提供目标模式定义以及转换列数据的代码。当源列与应处理的类型匹配时,转换器调用注册
参数。调用register
方法,以定义 schema 中各个列的转换器。模式使用 Kafka ConnectSchemaBuilder
API 表示。
13.1.1. Debezium 自定义转换器示例
以下示例实现了一个简单的转换器,它执行以下操作:
-
运行
configure
方法,它根据连接器配置中指定的schema.name
属性的值配置转换器。转换程序配置特定于每个实例。 运行
converterFor
方法,它会在 data type 设置为isbn
的源列中注册转换器来处理值。-
根据为
schema.name
属性指定的值标识目标STRING
模式。 -
将源列中的 ISBN 数据转换为
String
值。
-
根据为
例 13.1. 简单的自定义转换器
public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> { private SchemaBuilder isbnSchema; @Override public void configure(Properties props) { isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name")); } @Override public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) { if ("isbn".equals(column.typeName())) { registration.register(isbnSchema, x -> x.toString()); } } }
13.1.2. Debezium 和 Kafka Connect API 模块依赖项
自定义转换器 Java 项目对 Debezium API 和 Kafka Connect API 库模块有编译依赖项。这些编译依赖项必须包含在项目的 pom.xml
中,如下例所示:
<dependency> <groupId>io.debezium</groupId> <artifactId>debezium-api</artifactId> <version>${version.debezium}</version> 1 </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-api</artifactId> <version>${version.kafka}</version> 2 </dependency>
项 | 描述 |
---|---|
1 |
|
2 |
|