検索

第9章 ポストプロセッサーを使用したイベントメッセージの変更

download PDF

ポストプロセッサーは、単一メッセージ変換 (SMT) が実行する変更のような、メッセージごとの軽量の変更を実行します。ただし、Debezium はイベントチェーン内で変換よりも早い段階でポストプロセッサーを呼び出すため、ポストプロセッサーはメッセージがメッセージングランタイムに渡される前にメッセージを処理できます。ポストプロセッサーは Debezium コンテキスト内からメッセージを処理できるため、変換よりも効率的にイベントペイロードを変更できます。

変換によってメッセージを変更するには、メッセージのイミュータブルな ConnectRecord、より正確には SourceRecord を再作成する必要があります。対照的に、ポストプロセッサーは Debezium スコープ内で動作するため、メッセージのイベントペイロード内のミュータブルな Struct タイプを操作して、SourceRecord の構築前にペイロードを変更できます。Debezium との緊密な統合により、ポストプロセッサーは、データベース接続に関する Debezium メタデータ、リレーショナルスキーマモデルなど、Debezium の内部情報にアクセスできるようになります。当該アクセスにより、このような内部情報に依存するタスクを実行する際の効率が向上します。たとえば、Reselect columns ポストプロセッサーは、データベースを自動的に再クエリーしてレコードを再選択し、元の変更イベントから除外された列を取得できます。

Debezium は次のポストプロセッサーを提供します。

  • Reselect columns:: 変更イベントによって提供されなかった可能性のある特定の列を再選択します (たとえば、現在の変更により変更されなかった TOASTed 列や Oracle LOB 列など)。

9.1. reselect columns ポストプロセッサーを使用した、変更イベントレコードへのソースフィールドの追加

パフォーマンスを向上させ、ストレージのオーバーヘッドを削減するために、データベースは特定の列に外部ストレージを使用できます。このタイプのストレージは、PostgreSQL TOAST (Oversized-Attribute Storage Technique)、Oracle Large Object (LOB)、または Oracle Exadata Extended String データ型など、大量のデータを格納する列に使用されます。I/O オーバーヘッドを削減してクエリー速度を向上させるために、テーブル行のデータが変更されると、データベースは新しい値を含む列のみを取得し、外部に保存された未変更の列のデータを無視します。その結果、外部に保存された列の値はデータベースログに記録されず、Debezium はその後イベントレコードを発行するときにその列を省略します。必要な値が省略されたイベントレコードを受信するダウンストリームのコンシューマーでは、処理エラーが発生する可能性があります。

外部に保存された列の値がイベントのデータベースログエントリーに存在しない場合、Debezium はイベントのレコードを発行するときに、欠落している値を unavailable.value.placeholder sentinel 値に置き換えます。これらの sentinel 値は、バイトの場合はバイト配列、文字列の場合は文字列、マップの場合はキーと値のマップなど、適切に型指定されたフィールドに挿入されます。

最初のクエリーで利用できなかった列のデータを取得するには、Debezium の reselect columns ポストプロセッサー (ReselectColumnsPostProcessor) を適用します。ポストプロセッサーは、テーブルから 1 つ以上の列を再選択するように設定できます。ポストプロセッサーを設定すると、再選択の対象として指定した列名に対してコネクターが発行するイベントが監視されます。指定された列のイベントを検出すると、ポストプロセッサーはソーステーブルを再クエリーして指定された列のデータを取得し、現在の状態を取得します。

次の列タイプを再選択するようにポストプロセッサーを設定できます。

  • null 列。
  • unavailable.value.placeholder sentinel 値を含む列。
注記

ReselectColumnsPostProcessor のポストプロセッサーは、Debezium ソースコネクターでのみ使用できます。
Post プロセッサーは、Debezium JDBC シンクコネクターと連携するように設計されていません。

ReselectColumnsPostProcessor ポストプロセッサーの使用方法の詳細は、次のトピックを参照してください。

9.1.1. キーレステーブルでの Debezium ReselectColumnsPostProcessor の使用

reselect columns ポストプロセッサーは、変更する行を返す再選択クエリーを生成します。クエリーの WHERE 句を構築するために、ポストプロセッサーはデフォルトで、テーブルのプライマリーキー列またはテーブルに定義されている一意のインデックスに基づくリレーショナルテーブルモデルを使用します。

キーレステーブルの場合、ReselectColumnsPostProcessor が送信する SELECT クエリーによって複数の行が返されることがあります。その場合、Debezium は常に最初の行のみを使用します。返される行の順序は指定できません。ポストプロセッサーがキーレステーブルに対して一貫して使用可能な結果を返すようにするには、一意の行を識別できるカスタムキーを指定することが推奨されます。カスタムキーは、列の組み合わせに基づいてソーステーブルのレコードを一意に識別できる必要があります。

このようなカスタムメッセージキーを定義するには、コネクター設定で message.key.columns プロパティーを使用します。カスタムキーを定義した後に、reselect.use.event.key 設定プロパティーを true に設定します。このオプションを設定すると、ポストプロセッサーは、プライマリーキー列の代わりに、指定されたイベントキーフィールドを選択基準として使用できるようになります。必ず設定をテストして、再選択クエリーが期待どおりの結果を返すことを確認してください。

9.1.2. 例: Debezium ReselectColumnsPostProcessor の設定

Post プロセッサーの設定は、カスタムコンバーター または 単一メッセージ変換(SMT) の設定と似ています。コネクターが ReselectColumnsPostProcessor を使用できるようにするには、コネクター設定に次のエントリーを追加します。

  "post.processors": "reselector", 1
  "reselector.type": "io.debezium.processors.reselect.ReselectColumnsPostProcessor", 2
  "reselector.reselect.columns.include.list": "<schema>.<table>:<column>,<schema>.<table>:<column>", 3
  "reselector.reselect.unavailable.values": "true", 4
  "reselector.reselect.null.values": "true" 5
  "reselector.reselect.use.event.key": "false" 6
項目説明

1

ポストプロセッサー接頭辞のコンマ区切りリスト。

2

ポストプロセッサーの完全修飾クラスタイプ名。

3

<schema>.<table>:<column> の形式で指定された、列名のコンマ区切りリスト。

4

unavailable.value.placeholder sentinel 値を含む列の再選択を有効または無効にします。

5

null の列の再選択を有効または無効にします。

6

イベントのキーフィールド名に基づく再選択を有効または無効にします。

9.1.3. Debezium reselect columns ポストプロセッサーの設定プロパティーの説明

次の表に、Reselect Columns ポストプロセッサーに設定できる設定オプションを示します。

表9.1 Reselect columns ポストプロセッサーの設定オプション

プロパティー

デフォルト

説明

reselect.columns.include.list

デフォルトなし

ソースデータベースから再選択する列名のコンマ区切りリスト。列名を指定するには、+ <schema>.<table>:_<column>_ の形式を使用します。

reselect.columns.exclude.list プロパティーを設定する場合は、このプロパティーは設定しないでください。

reselect.columns.exclude.list

デフォルトなし

再選択から除外するソースデータベース内の列名のコンマ区切りリスト。列名を指定するには、+ <schema>.<table>:_<column>_ の形式を使用します。

reselect.columns.include.list プロパティーを設定する場合は、このプロパティーは設定しないでください。

reselect.unavailable.values

true

列の値がコネクターの unavailable.value.placeholder プロパティーによって提供される場合に、ポストプロセッサーが reselect.columns.include.list フィルターに一致する列を再選択するかどうかを指定します。

reselect.null.values

true

列の値が null の場合に、ポストプロセッサーが reselect.columns.include.list フィルターに一致する列を再選択するかどうかを指定します。

reselect.use.event.key

false

ポストプロセッサーがイベントのキーフィールド名に基づいて再選択を行うか、リレーショナルテーブルのプライマリーキーの列名を使用するかを指定します。

デフォルトでは、再選択クエリーはリレーショナルテーブルのプライマリーキー列または一意のキーインデックスに基づきます。プライマリーキーを持たないテーブルの場合は、このプロパティーを true に設定し、コネクター設定で message.key.columns プロパティーを設定して、コネクターがイベント作成時に使用するカスタムキーを指定します。これにより、ポストプロセッサーは指定されたキーフィールド名を SQL 再選択クエリーのプライマリーキーとして使用するようになります。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.