13.3. イベントの内容に応じた変更イベントレコードのトピックへのルーティング


デフォルトでは、Debezium はテーブルから読み取るすべての変更イベントを 1 つの静的なトピックにストリーミングします。ただし、イベントの内容に応じて、選択したイベントを別のトピックに再ルーティングする必要がある状況が考えられます。メッセージをその内容に基づいてルーティングするプロセスは、コンテンツベースのルーティング メッセージングパターンで説明されています。このパターンを Debezium に適用するには、コンテンツベースのルーティング 単一メッセージ変換 (SMT) を使用して、イベントごとに評価される式を記述します。イベントがどのように評価されるかに応じて、SMT はイベントメッセージを元の宛先トピックにルーティングするか、あるいは式で指定したトピックに再ルーティングします。

カスタム SMT を作成してルーティングロジックをエンコードするのに Java を使用することは可能ですが、カスタムコーディングされた SMT の使用にはデメリットがあります。以下に例を示します。

  • 変換を事前にコンパイルし、それを Kafka Connect にデプロイする必要がある。
  • 変更が生じるたびにコードの再コンパイルおよび再デプロイが必要になり、運用の柔軟性が失われる。

コンテンツベースのルーティング SMT は、JSR 223 (Scripting for the Java™ Platform) と統合するスクリプト言語をサポートしています。

Debezium には、JSR 223 API の実装は同梱されていません。Debezium で式言語を使用するには、その言語の JSR 223 スクリプトエンジン実装をダウンロードする必要があります。Debezium をデプロイする方法によって、必要な成果物を Maven Central から自動的にダウンロードするか、成果物を手動でダウンロードし、言語実装で使用する他の JAR ファイルと共に Debezium コネクターのプラグインディレクトリーに追加することが可能です。

13.3.1. Debezium コンテンツベースのルーティング SMT の設定

セキュリティー上の理由から、コンテンツベースのルーティング SMT は Debezium コネクターアーカイブには含まれていません。代わりに、別のアーティファクト debezium-scripting-2.3.7.Final.tar.gz で提供されます。

Dockerfile からカスタム Kafka Connect コンテナーイメージを構築して Debezium コネクターを導入する場合、フィルター SMT を使用するには、Kafka Connect 環境に SMT アーティファクトを明示的に追加する必要があります。AMQ Streams を使用してコネクターをデプロイすると、Kafka Connect カスタムリソースで指定した設定パラメーターに基づいて、必要なアーティファクトを自動的にダウンロードすることができます。重要: ルーティング SMT が Kafka Connect インスタンスに追加されると、インスタンスにコネクターを追加できる任意のユーザーはスクリプト式を実行することができます。許可されたユーザーだけがスクリプト式を実行できるようにするには、ルーティング SMT を追加する前に、Kafka Connect インスタンスおよびその設定インターフェイスをセキュアにする必要があります。

以下の手順は、Dockerfile から Kafka Connect コンテナーイメージを構築する場合に適用されます。AMQ Streams を使用して Kafka Connect イメージを作成する場合は、お使いのコネクターのデプロイメントトピックに記載されている説明に従ってください。

手順

  1. ブラウザーから Red Hat build of Debezium ダウンロードサイト を開き、Debezium スクリプト SMT アーカイブ (debezium-scripting-2.3.7.Final.tar.gz) をダウンロードします。
  2. アーカイブのコンテンツを Kafka Connect 環境の Debezium プラグインのディレクトリーにデプロイメントします。
  3. JSR-223 スクリプトエンジンの実装を取得し、そのコンテンツを Kafka Connect 環境の Debezium プラグインのディレクトリーに追加します。
  4. Kafka Connect プロセスを再起動し、新しい JAR ファイルを取得します。

Groovy 言語には、クラスパスで以下のライブラリーが必要です。

  • groovy
  • groovy-json (任意)
  • groovy-jsr223

JavaScript 言語には、クラスパスで以下のライブラリーが必要です。

  • graalvm.js
  • graalvm.js.scriptengine

13.3.2. 例: Debezium コンテンツベースルーティングの基本設定

イベントの内容に基づいて変更イベントレコードをルーティングするように Debezium コネクターを設定するには、コネクターの Kafka Connect 設定で ContentBasedRouter SMT を設定します。

コンテンツベースのルーティング SMT 設定では、絞り込みの条件を定義する正規表現を指定する必要があります。設定で、ルーティングの条件を定義する正規表現を作成します。式は、イベントレコードを評価するためのパターンを定義します。また、パターンにマッチするイベントをルーティングする宛先トピックの名前も指定します。指定するパターンで、テーブルの挿入、更新、または削除操作などのイベントタイプを指定する場合もあります。特定の列または行の値を照合するパターンを定義することもできます。

たとえば、すべての更新 (u) レコードを updates トピックに再ルーティングするには、コネクター設定に以下の設定を追加します。

...
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
...

上記の例では、Groovy 式言語の使用を指定しています。

パターンにマッチしないレコードは、デフォルトのトピックにルーティングされます。

設定のカスタマイズ

前の例は、op フィールドを含む DML イベントのみを処理するように設計された単純な SMT 設定を示しています。コネクターが発行する他の種類のメッセージ (ハートビートメッセージ、廃棄メッセージ、またはトランザクションまたはスキーマの変更に関するメタデータメッセージ) には、このフィールドは含まれません。処理の失敗を回避するために、特定のイベントのみに 変換を選択して適用する SMT 述語ステートメント を定義できます。

13.3.3. Debezium コンテンツベースルーティングの式で使用される変数

Debezium は、特定の変数を SMT の評価コンテキストにバインドします。ルーティング先を制御するための条件を指定する式を作成する場合、SMT はこれらの変数の値を検索して解釈し、式の条件を評価することができます。

以下の表に、Debezium がコンテンツベースのルーティング SMT の評価コンテキストにバインドする変数のリストを示します。

表13.2 コンテンツベースルーティングの式で使用される変数
名前説明タイプ

鍵 (key)

メッセージのキー。

org.apache.kafka.connect​.data​.Struct

メッセージの値。

org.apache.kafka.connect​.data​.Struct

keySchema

Schema of the message key.

org.apache.kafka.connect​.data​.Schema

valueSchema

メッセージの値のスキーマ。

org.apache.kafka.connect​.data​.Schema

topic

ルーティング先トピックの名前。

String

headers

メッセージヘッダーの Java マッピング。キーフィールドはヘッダー名です。headers 変数は、以下のプロパティーを公開します。

  • value (タイプ: Object)
  • schema (タイプ: org.apache.kafka​.connect​.data​.Schema)

java.util.Map​<String,​ io.debezium​.transforms​.scripting​.RecordHeader>

式は、その変数に対して任意のメソッドを呼び出すことができます。式は、SMT がメッセージをどのように処理するかを定義するブール値に解決する必要があります。式のルーティング条件が true と評価されると、メッセージは維持されます。ルーティング条件が false と評価されると、メッセージは削除されます。

式がそれ以外の効果を及ぼすことは許されません。つまり、式が渡す変数を変更することは許されません。

13.3.4. コンテンツベースのルーティング変換を一部適用するオプション

データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。以下の方法のいずれかを使用して、SMT を選んで適用するようにコネクターを設定できます。

13.3.5. 他のスクリプト言語によるコンテンツベースのルーティング条件の設定

コンテンツベースのルーティング条件を記述する方法は、使用するスクリプト言語によって異なります。たとえば、基本設定の例 に示すように、式言語として Groovy を使用する場合、以下の式はすべての更新 (u) レコードを updates トピックにルーティングし、他のレコードをデフォルトのトピックにルーティングします。

value.op == 'u' ? 'updates' : null

他の言語では、同じ条件を表すのに異なる方法が使用されます。

ヒント

Debezium MongoDB コネクターは、after および patch フィールドを構造体ではなくシリアライズされた JSON ドキュメントとして出力します。
MongoDB コネクターで ContentBasedRouting SMT を使うには、まず JSON の配列フィールドを個別のドキュメントにデプロイメントする必要があります。
式の中で JSON パーサーを使用すると、配列の各項目について個別の出力文書を生成することができます。例えば、表現言語として Groovy を使用している場合、groovy-json アーティファクトをクラスパスに追加し、(new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar' のような表現を追加しています。

JavaScript

式言語に JavaScript を使用する場合、以下の例に示すように、Struct#get() メソッドを呼び出してコンテンツベースのルーティング条件を指定することができます。

value.get('op') == 'u' ? 'updates' : null

JavaScript with Graal.js

JavaScript with Graal.js を使用してコンテンツベースのルーティング条件を作成する場合、Groovy で使用する方法と類似の方法を使用します。以下に例を示します。

value.op == 'u' ? 'updates' : null

13.3.6. コンテンツベースのルーティング変換設定用のオプション

プロパティー

デフォルト

説明

topic.regex

 

イベントのルーティング先トピックの名前を評価するオプションの正規表現で、条件ロジックを適用するかどうかを決定します。ルーティング先トピックの名前が topic.regex の値とマッチする場合、変換はイベントをトピックに渡す前に条件ロジックを適用します。トピックの名前が topic.regex の値とマッチしない場合は、SMT は変更せずにイベントをトピックに渡します。

language

 

式を記述する言語。jsr223. で始まる必要があります。例えば、jsr223.groovyjsr223.graal.js。Debezium では、JSR 223 API (Scripting for the Java ™ Platform) によるブートストラップだけがサポートされます。

topic.expression

 

すべてのメッセージに対して評価される式。String 値に評価する必要があり、null 値以外の場合はメッセージを新しいトピックに再ルーティングし、null 値の場合はメッセージをデフォルトのトピックにルーティングします。

null.handling.mode

keep

トランスフォーメーションが null (tombstone) メッセージをどのように扱うかを指定します。以下のオプションのいずれかを指定することができます。

keep
(デフォルト) メッセージを通過させます。
drop
メッセージを完全に削除します。
evaluate
メッセージに条件ロジックを適用します。
トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat, Inc.