第14章 Debezium カスタムデータ型コンバーターの開発


重要

カスタム開発コンバーターの使用は、テクノロジープレビュー機能のみです。テクノロジープレビュー機能は、Red Hat 製品サポートのサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではない場合があります。Red Hat は、実稼働環境でこれらを使用することを推奨していません。テクノロジープレビュー機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行いフィードバックを提供していただくことを目的としています。Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

Debezium 変更イベントレコードの各フィールドは、ソーステーブルまたはデータコレクションのフィールドまたは列を表します。コネクターが変更イベントレコードを Kafka に送信すると、ソースの各フィールドのデータ型が Kafka Connect スキーマ型に変換されます。列の値も同様に、変換先フィールドのスキーマタイプに一致するように変換されます。コネクターごとに、コネクターが各データ型を変換する方法をデフォルトのマッピングで指定します。これらのデフォルトマッピングは、各コネクターのデータ型のドキュメントで説明されています。

通常、デフォルトのマッピングで十分ですが、一部のアプリケーションでは別のマッピングを適用する場合があります。たとえば、デフォルトのマッピングが UNIX エポック以降のミリ秒の形式を使用して列をエクスポートする場合は、カスタムマッピングが必要になることがありますが、ダウンストリームアプリケーションは、フォーマットされた文字列としてしか、列の値を消費できません。カスタムコンバーターを開発およびデプロイして、データ型マッピングをカスタマイズします。特定のタイプのすべての列に作用するようにカスタムコンバーターを設定するか、特定のテーブル列のみに適用されるように範囲を狭めてください。コンバーター関数は、指定された基準に一致する列のデータ型変換リクエストをインターセプトし、指定の変換を実行します。コンバーターは、指定された基準に一致しない列を無視します。

カスタムコンバーターは、Debezium サービスプロバイダーインターフェイス (SPI) を実装する Java クラスです。コネクター設定で converters プロパティーを設定して、カスタムコンバーターを有効化および設定します。converters プロパティーは、コネクターで使用できるコンバータを指定し、変換動作をさらに変更するサブプロパティーを含めることができます。

コネクターを開始すると、コネクター設定で有効になっているコンバーターがインスタンス化され、レジストリーに追加されます。レジストリーは、各コンバーターと、そのコンバーターの列またはフィールドを関連付けて処理します。Debezium が新しい変更イベントを処理するときはいつでも、設定されたコンバーターを呼び出して、登録されている列またはフィールドを変換します。

14.1. Debezium カスタムデータ型コンバーターの作成

以下の例は、インターフェイス io.debezium.spi.converter.CustomConverter を実装する Java クラスのコンバーター実装を示しています。

Copy to Clipboard Toggle word wrap
public interface CustomConverter<S, F extends ConvertedField> {

    @FunctionalInterface
    interface Converter {  
1

        Object convert(Object input);
    }

    public interface ConverterRegistration<S> { 
2

        void register(S fieldSchema, Converter converter); 
3

    }

    void configure(Properties props);

    void converterFor(F field, ConverterRegistration<S> registration); 
4

}
表14.1 io.debezium.spi.converter.CustomConverter インターフェイスを実装する Java コンバータークラスのフィールドの説明
項目説明

1

データをある型から別の型に変換します。

2

コンバーターを登録するためのコールバック。

3

現在のフィールドの指定のスキーマとコンバーターを登録します。同じフィールドに対して複数回呼び出すことはできません。

4

特定のフィールドで使用するカスタマイズされた値とスキーマコンバーターを登録します。

カスタムコンバーターメソッド

CustomConverter インターフェイスの実装には、以下のメソッドが含まれている必要があります。

configure()

コネクター設定に指定されたプロパティーをコンバーターインスタンスに渡します。configure メソッドは、コネクターが初期化されると実行されます。複数のコネクターでコンバーターを使用し、コネクターのプロパティー設定に基づいてその動作を変更できます。
configure メソッドは以下の引数を受け入れます。

props
コンバーターインスタンスに渡すプロパティーが含まれています。各プロパティーは、特定のタイプの列値を変換する形式を指定します。
converterFor()

コンバーターを登録して、データソースの特定の列またはフィールドを処理します。Debezium は、converterFor() メソッドを呼び出して、コンバーターに変換の registration を呼び出すように促します。converterFor メソッドは、各列に対して 1 回実行されます。
メソッドは以下の引数を受け入れます。

field
処理されるフィールドまたは列に関するメタデータを渡すオブジェクト。列のメタデータには、列またはフィールドの名前、テーブルまたはコレクションの名前、データ型、サイズなどを含めることができます。
登録
対象のスキーマ定義と列データを変換するコードを提供する io.debezium.spi.converter.CustomConverter.ConverterRegistration 型のオブジェクト。ソース列が、コンバーターが処理する型と一致する場合に、コンバーターは registration パラメーターを呼び出します。register メソッドを呼び出して、スキーマ内の各列のコンバーターを定義します。スキーマは、Kafka Connect SchemaBuilder API を使用して表されます。

14.1.1. Debezium カスタムコンバーターの例

以下の例は、以下の操作を実行する単純なコンバーターを実装します。

  • コネクター設定で指定された schema.name プロパティーの値に基づいてコンバーターを設定する configure メソッドを実行します。コンバーターの設定は、各インスタンスに固有です。
  • データ型が isbn に設定されているソース列の値を処理するコンバーターを登録する converterFor メソッドを実行します。

    • schema.name プロパティーに指定された値に基づいてターゲット STRING スキーマを識別します。
    • ソース列の ISBN データを String 値に変換します。

例14.1 簡単なカスタムコンバーター

Copy to Clipboard Toggle word wrap
public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private SchemaBuilder isbnSchema;

    @Override
    public void configure(Properties props) {
        isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name"));
    }

    @Override
    public void converterFor(RelationalColumn column,
            ConverterRegistration<SchemaBuilder> registration) {

        if ("isbn".equals(column.typeName())) {
            registration.register(isbnSchema, x -> x.toString());
        }
    }
}

14.1.2. Debezium および Kafka Connect API モジュールの依存関係

カスタムコンバーター Java プロジェクトには、Debezium API および Kafka Connect API ライブラリーモジュールに対するコンパイル依存関係があります。以下の例のように、これらのコンパイル依存関係は、プロジェクトの pom.xml に含める必要があります。

Copy to Clipboard Toggle word wrap
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version> 
1

</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${version.kafka}</version> 
2

</dependency>
表14.2 pom.xml 内のコンパイル依存関係バージョンの説明
項目説明

1

${version.debezium} は Debezium コネクターのバージョンを表します。

2

${version.kafka} は、環境内の Apache Kafka のバージョンを表します。

トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat, Inc.