12.5. Debezium 変更イベントレコードの絞り込み


デフォルトでは、Debezium は受信するすべてのデータ変更イベントを Kafka ブローカーに配信します。ただし、プロデューサーから出力されるイベントのサブセットだけが必要となるケースがほとんどです。該当するレコードだけを処理できるように、Debezium では フィルター 単一メッセージ変換 (SMT) を利用することができます。

カスタム SMT を作成してフィルターロジックをエンコードするのに Java を使用することは可能ですが、カスタムコーディングされた SMT の使用にはデメリットがあります。以下に例を示します。

  • 変換を事前にコンパイルし、それを Kafka Connect にデプロイする必要がある。
  • 変更が生じるたびにコードの再コンパイルおよび再デプロイが必要になり、運用の柔軟性が失われる。

フィルター SMT は、JSR 223 (Scripting for the Java™ Platform) と統合するスクリプト言語をサポートしています。

Debezium には、JSR 223 API の実装は同梱されていません。Debezium で式言語を使用するには、その言語の JSR 223 スクリプトエンジン実装をダウンロードする必要があります。Debezium をデプロイする方法によって、必要な成果物を Maven Central から自動的にダウンロードするか、成果物を手動でダウンロードし、言語実装で使用する他の JAR ファイルと共に Debezium コネクターのプラグインディレクトリーに追加することが可能です。

12.5.1. Debezium フィルター SMT の設定

セキュリティー上の理由から、フィルター SMT は Debezium コネクターアーカイブには含まれていません。代わりに、別のアーティファクト debezium-scripting-2.5.4.Final.tar.gz で提供されます。

Dockerfile からカスタム Kafka Connect コンテナーイメージを構築して Debezium コネクターをデプロイする場合、フィルター SMT を使用するには、明示的に SMT アーカイブをダウンロードし、コネクタープラグインと一緒にファイルをデプロイする必要があります。AMQ Streams を使用してコネクターをデプロイすると、Kafka Connect カスタムリソースで指定した設定パラメーターに基づいて、必要なアーティファクトを自動的にダウンロードすることができます。重要: フィルター SMT が Kafka Connect インスタンスに追加されると、インスタンスにコネクターを追加できる任意のユーザーはスクリプト式を実行することができます。許可されたユーザーだけがスクリプト式を実行できるようにするには、フィルター SMT を追加する前に、Kafka Connect インスタンスおよびその設定インターフェイスをセキュアにする必要があります。

以下の手順は、Dockerfile から Kafka Connect コンテナーイメージを構築する場合に適用されます。AMQ Streams を使用して Kafka Connect イメージを作成する場合は、お使いのコネクターのデプロイメントトピックに記載されている説明に従ってください。

手順

  1. ブラウザーから Red Hat build of Debezium ダウンロードサイト を開き、Debezium スクリプト SMT アーカイブ (debezium-scripting-2.5.4.Final.tar.gz) をダウンロードします。
  2. アーカイブのコンテンツを Kafka Connect 環境の Debezium プラグインのディレクトリーにデプロイメントします。
  3. JSR-223 スクリプトエンジンの実装を取得し、そのコンテンツを Kafka Connect 環境の Debezium プラグインのディレクトリーに追加します。
  4. Kafka Connect プロセスを再起動し、新しい JAR ファイルを取得します。

Groovy 言語には、クラスパスで以下のライブラリーが必要です。

  • groovy
  • groovy-json (任意)
  • groovy-jsr223

JavaScript 言語には、クラスパスで以下のライブラリーが必要です。

  • graalvm.js
  • graalvm.js.scriptengine

12.5.2. 例: Debezium フィルター SMT の基本設定

Debezium コネクターの Kafka Connect 設定でフィルター変換を設定します。設定で、ビジネスルールに基づくフィルター条件を定義して、対象のイベントを指定します。フィルター SMT がイベントストリームを処理すると、設定されたフィルター条件に対して各イベントを評価します。フィルター条件の基準を満たすイベントのみがブローカーに渡されます。

変更イベントレコードを絞り込むように Debezium コネクターを設定するには、Debezium コネクターの Kafka Connect 設定で Filter SMT を設定します。フィルター SMT の設定には、フィルター条件を定義する正規表現を指定する必要があります。

たとえば、コネクター設定に以下の設定を追加します。

...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
...

上記の例では、Groovy 式言語の使用を指定しています。正規表現 value.op == 'u' && value.before.id == 2 は、更新 (u) レコードで id 値が 2 のメッセージを除き、すべてのメッセージを削除します。

設定のカスタマイズ

前の例は、op フィールドを含む DML イベントのみを処理するように設計された単純な SMT 設定を示しています。コネクターが発行する可能性のある他の種類のメッセージ (ハートビートメッセージ、廃棄メッセージ、またはスキーマの変更とトランザクションに関するメタデータメッセージ) には、このフィールドは含まれません。処理の失敗を回避するために、特定のイベントのみに 変換を選択して適用する SMT 述語ステートメント を定義できます。

12.5.3. フィルターの式で使用される変数

Debezium は、特定の変数をフィルター SMT の評価コンテキストにバインドします。フィルター条件を指定する式を作成する場合、Debezium が評価コンテキストにバインドする変数を使用することができます。変数をバインドすることで、Debezium は SMT が式の条件を評価する際に変数の値を検索して解釈できるようにします。

以下の表に、Debezium がフィルター SMT の評価コンテキストにバインドする変数のリストを示します。

表12.4 フィルターの式で使用される変数
名前説明タイプ

鍵 (key)

メッセージのキー。

org.apache.kafka.connect​.data​.Struct

value

メッセージの値。

org.apache.kafka.connect.data​.Struct

keySchema

Schema of the message key.

org.apache.kafka.connect​.data​.Schema

valueSchema

メッセージの値のスキーマ。

org.apache.kafka.connect​.data​.Schema

topic

ルーティング先トピックの名前。

文字列

headers

メッセージヘッダーの Java マッピング。キーフィールドはヘッダー名です。headers 変数は、以下のプロパティーを公開します。

  • value (タイプ: Object)
  • schema (タイプ: org.apache.kafka​.connect​.data​.Schema)

java.util.Map​<String, ​io.debezium.transforms​.scripting​.RecordHeader>

式は、その変数に対して任意のメソッドを呼び出すことができます。式は、SMT がメッセージをどのように処理するかを定義するブール値に解決する必要があります。式のフィルター条件が true と評価されると、メッセージは維持されます。フィルター条件が false と評価されると、メッセージは削除されます。

式がそれ以外の効果を及ぼすことは許されません。つまり、式が渡す変数を変更することは許されません。

12.5.4. フィルター変換を一部適用するオプション

データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。以下の方法のいずれかを使用して、SMT を選んで適用するようにコネクターを設定できます。

12.5.5. 他のスクリプト言語によるフィルター条件の設定

フィルター条件を記述する方法は、使用するスクリプト言語によって異なります。

たとえば、基本設定の例 に示すように、式言語として Groovy を使用する場合、以下の式は id 値が 2 に設定された更新レコードを除くすべてのメッセージを削除します。

value.op == 'u' && value.before.id == 2

他の言語では、同じ条件を表すのに異なる方法が使用されます。

ヒント

Debezium MongoDB コネクターは、after および patch フィールドを構造体ではなくシリアライズされた JSON ドキュメントとして出力します。
MongoDB コネクターでフィルター SMT を使うには、まず JSON の配列フィールドを個別のドキュメントにデプロイメントする必要があります。
式の中で JSON パーサーを使用すると、配列の各項目について個別の出力文書を生成することができます。たとえば、表現言語として Groovy を使用している場合、groovy-json アーティファクトをクラスパスに追加し、(new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar' のような表現を追加しています。

JavaScript

式言語に JavaScript を使用する場合、以下の例に示すように、Struct#get() メソッドを呼び出してフィルター条件を指定することができます。

value.get('op') == 'u' && value.get('before').get('id') == 2

JavaScript with Graal.js

JavaScript with Graal.js を使用してフィルター条件を定義する場合、Groovy で使用する方法と類似の方法を使用します。以下に例を示します。

value.op == 'u' && value.before.id == 2

12.5.6. フィルター変換設定用のオプション

以下の表に、フィルター SMT で使用することができる設定オプションのリストを示します。

表12.5 フィルター SMT の設定オプション

プロパティー

デフォルト

説明

topic.regex

 

イベントのルーティング先トピックの名前を評価するオプションの正規表現で、フィルターロジックを適用するかどうかを決定します。ルーティング先トピックの名前が topic.regex の値とマッチする場合、変換はイベントをトピックに渡す前にフィルターロジックを適用します。トピックの名前が topic.regex の値とマッチしない場合は、SMT は変更せずにイベントをトピックに渡します。

language

 

式を記述する言語。jsr223. で始まる必要があります。たとえば、jsr223.groovyjsr223.graal.js。Debezium では、JSR 223 API ("Scripting for the Java ™ Platform") によるブートストラップだけがサポートされます。

condition

 

すべてのメッセージに対して評価される式。Boolean 値に評価されなければならず、結果が true の場合はメッセージを保持し、false の場合はメッセージを削除します。

null.handling.mode

keep

トランスフォーメーションが null (tombstone) メッセージをどのように扱うかを指定します。以下のオプションのいずれかを指定することができます。

keep
(デフォルト) メッセージを通過させます。
drop
メッセージを完全に削除します。
evaluate
メッセージにフィルター条件を適用します。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.