第14章 Debezium カスタムデータ型コンバーターの開発
カスタム開発コンバーターの使用は、テクノロジープレビュー機能のみです。テクノロジープレビュー機能は、Red Hat 製品サポートのサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではない場合があります。Red Hat は、実稼働環境でこれらを使用することを推奨していません。テクノロジープレビュー機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行いフィードバックを提供していただくことを目的としています。Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。
Debezium 変更イベントレコードの各フィールドは、ソーステーブルまたはデータコレクションのフィールドまたは列を表します。コネクターが変更イベントレコードを Kafka に送信すると、ソースの各フィールドのデータ型が Kafka Connect スキーマ型に変換されます。列の値も同様に、変換先フィールドのスキーマタイプに一致するように変換されます。コネクターごとに、コネクターが各データ型を変換する方法をデフォルトのマッピングで指定します。これらのデフォルトマッピングは、各コネクターのデータ型のドキュメントで説明されています。
通常、デフォルトのマッピングで十分ですが、一部のアプリケーションでは別のマッピングを適用する場合があります。たとえば、デフォルトのマッピングが UNIX エポック以降のミリ秒の形式を使用して列をエクスポートする場合は、カスタムマッピングが必要になることがありますが、ダウンストリームアプリケーションは、フォーマットされた文字列としてしか、列の値を消費できません。カスタムコンバーターを開発およびデプロイして、データ型マッピングをカスタマイズします。特定のタイプのすべての列に作用するようにカスタムコンバーターを設定するか、特定のテーブル列のみに適用されるように範囲を狭めてください。コンバーター関数は、指定された基準に一致する列のデータ型変換リクエストをインターセプトし、指定の変換を実行します。コンバーターは、指定された基準に一致しない列を無視します。
カスタムコンバーターは、Debezium サービスプロバイダーインターフェイス (SPI) を実装する Java クラスです。コネクター設定で converters
プロパティーを設定して、カスタムコンバーターを有効化および設定します。converters
プロパティーは、コネクターで使用できるコンバータを指定し、変換動作をさらに変更するサブプロパティーを含めることができます。
コネクターを開始すると、コネクター設定で有効になっているコンバーターがインスタンス化され、レジストリーに追加されます。レジストリーは、各コンバーターと、そのコンバーターの列またはフィールドを関連付けて処理します。Debezium が新しい変更イベントを処理するときはいつでも、設定されたコンバーターを呼び出して、登録されている列またはフィールドを変換します。
14.1. Debezium カスタムデータ型コンバーターの作成
以下の例は、インターフェイス io.debezium.spi.converter.CustomConverter
を実装する Java クラスのコンバーター実装を示しています。
public interface CustomConverter<S, F extends ConvertedField> { @FunctionalInterface interface Converter { Object convert(Object input); } public interface ConverterRegistration<S> { void register(S fieldSchema, Converter converter); } void configure(Properties props); void converterFor(F field, ConverterRegistration<S> registration); }
public interface CustomConverter<S, F extends ConvertedField> {
@FunctionalInterface
interface Converter {
Object convert(Object input);
}
public interface ConverterRegistration<S> {
void register(S fieldSchema, Converter converter);
}
void configure(Properties props);
void converterFor(F field, ConverterRegistration<S> registration);
}
項目 | 説明 |
---|---|
1 | データをある型から別の型に変換します。 |
2 | コンバーターを登録するためのコールバック。 |
3 | 現在のフィールドの指定のスキーマとコンバーターを登録します。同じフィールドに対して複数回呼び出すことはできません。 |
4 | 特定のフィールドで使用するカスタマイズされた値とスキーマコンバーターを登録します。 |
カスタムコンバーターメソッド
CustomConverter
インターフェイスの実装には、以下のメソッドが含まれている必要があります。
configure()
コネクター設定に指定されたプロパティーをコンバーターインスタンスに渡します。
configure
メソッドは、コネクターが初期化されると実行されます。複数のコネクターでコンバーターを使用し、コネクターのプロパティー設定に基づいてその動作を変更できます。configure
メソッドは以下の引数を受け入れます。props
- コンバーターインスタンスに渡すプロパティーが含まれています。各プロパティーは、特定のタイプの列値を変換する形式を指定します。
converterFor()
コンバーターを登録して、データソースの特定の列またはフィールドを処理します。Debezium は、
converterFor()
メソッドを呼び出して、コンバーターに変換のregistration
を呼び出すように促します。converterFor
メソッドは、各列に対して 1 回実行されます。
メソッドは以下の引数を受け入れます。field
- 処理されるフィールドまたは列に関するメタデータを渡すオブジェクト。列のメタデータには、列またはフィールドの名前、テーブルまたはコレクションの名前、データ型、サイズなどを含めることができます。
登録
-
対象のスキーマ定義と列データを変換するコードを提供する
io.debezium.spi.converter.CustomConverter.ConverterRegistration
型のオブジェクト。ソース列が、コンバーターが処理する型と一致する場合に、コンバーターはregistration
パラメーターを呼び出します。register
メソッドを呼び出して、スキーマ内の各列のコンバーターを定義します。スキーマは、Kafka ConnectSchemaBuilder
API を使用して表されます。
14.1.1. Debezium カスタムコンバーターの例
以下の例は、以下の操作を実行する単純なコンバーターを実装します。
-
コネクター設定で指定された
schema.name
プロパティーの値に基づいてコンバーターを設定するconfigure
メソッドを実行します。コンバーターの設定は、各インスタンスに固有です。 データ型が
isbn
に設定されているソース列の値を処理するコンバーターを登録するconverterFor
メソッドを実行します。-
schema.name
プロパティーに指定された値に基づいてターゲットSTRING
スキーマを識別します。 -
ソース列の ISBN データを
String
値に変換します。
-
例14.1 簡単なカスタムコンバーター
public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> { private SchemaBuilder isbnSchema; @Override public void configure(Properties props) { isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name")); } @Override public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) { if ("isbn".equals(column.typeName())) { registration.register(isbnSchema, x -> x.toString()); } } }
public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private SchemaBuilder isbnSchema;
@Override
public void configure(Properties props) {
isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name"));
}
@Override
public void converterFor(RelationalColumn column,
ConverterRegistration<SchemaBuilder> registration) {
if ("isbn".equals(column.typeName())) {
registration.register(isbnSchema, x -> x.toString());
}
}
}
14.1.2. Debezium および Kafka Connect API モジュールの依存関係
カスタムコンバーター Java プロジェクトには、Debezium API および Kafka Connect API ライブラリーモジュールに対するコンパイル依存関係があります。以下の例のように、これらのコンパイル依存関係は、プロジェクトの pom.xml
に含める必要があります。
<dependency> <groupId>io.debezium</groupId> <artifactId>debezium-api</artifactId> <version>${version.debezium}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-api</artifactId> <version>${version.kafka}</version> </dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${version.kafka}</version>
</dependency>
項目 | 説明 |
---|---|
1 |
|
2 |
|