12.3. イベントの内容に応じた変更イベントレコードのトピックへのルーティング
デフォルトでは、Debezium はテーブルから読み取るすべての変更イベントを 1 つの静的なトピックにストリーミングします。ただし、イベントの内容に応じて、選択したイベントを別のトピックに再ルーティングする必要がある状況が考えられます。メッセージをその内容に基づいてルーティングするプロセスは、コンテンツベースのルーティング メッセージングパターンで説明されています。このパターンを Debezium に適用するには、コンテンツベースのルーティング 単一メッセージ変換 (SMT) を使用して、イベントごとに評価される式を記述します。イベントがどのように評価されるかに応じて、SMT はイベントメッセージを元の宛先トピックにルーティングするか、あるいは式で指定したトピックに再ルーティングします。
カスタム SMT を作成してルーティングロジックをエンコードするのに Java を使用することは可能ですが、カスタムコーディングされた SMT の使用にはデメリットがあります。以下に例を示します。
- 変換を事前にコンパイルし、それを Kafka Connect にデプロイする必要がある。
- 変更が生じるたびにコードの再コンパイルおよび再デプロイが必要になり、運用の柔軟性が失われる。
コンテンツベースのルーティング SMT は、JSR 223 (Scripting for the Java™ Platform) と統合するスクリプト言語をサポートしています。
Debezium には、JSR 223 API の実装は同梱されていません。Debezium で式言語を使用するには、その言語の JSR 223 スクリプトエンジン実装をダウンロードする必要があります。Debezium をデプロイする方法によって、必要な成果物を Maven Central から自動的にダウンロードするか、成果物を手動でダウンロードし、言語実装で使用する他の JAR ファイルと共に Debezium コネクターのプラグインディレクトリーに追加することが可能です。
12.3.1. Debezium コンテンツベースのルーティング SMT の設定
セキュリティー上の理由から、コンテンツベースのルーティング SMT は Debezium コネクターアーカイブには含まれていません。代わりに、別のアーティファクト debezium-scripting-2.5.4.Final.tar.gz
で提供されます。
Dockerfile からカスタム Kafka Connect コンテナーイメージを構築して Debezium コネクターを導入する場合、フィルター SMT を使用するには、Kafka Connect 環境に SMT アーティファクトを明示的に追加する必要があります。AMQ Streams を使用してコネクターをデプロイすると、Kafka Connect カスタムリソースで指定した設定パラメーターに基づいて、必要なアーティファクトを自動的にダウンロードすることができます。重要: ルーティング SMT が Kafka Connect インスタンスに追加されると、インスタンスにコネクターを追加できる任意のユーザーはスクリプト式を実行することができます。許可されたユーザーだけがスクリプト式を実行できるようにするには、ルーティング SMT を追加する前に、Kafka Connect インスタンスおよびその設定インターフェイスをセキュアにする必要があります。
以下の手順は、Dockerfile から Kafka Connect コンテナーイメージを構築する場合に適用されます。AMQ Streams を使用して Kafka Connect イメージを作成する場合は、お使いのコネクターのデプロイメントトピックに記載されている説明に従ってください。
手順
-
ブラウザーから Red Hat build of Debezium ダウンロードサイト を開き、Debezium スクリプト SMT アーカイブ (
debezium-scripting-2.5.4.Final.tar.gz
) をダウンロードします。 - アーカイブのコンテンツを Kafka Connect 環境の Debezium プラグインのディレクトリーにデプロイメントします。
- JSR-223 スクリプトエンジンの実装を取得し、そのコンテンツを Kafka Connect 環境の Debezium プラグインのディレクトリーに追加します。
- Kafka Connect プロセスを再起動し、新しい JAR ファイルを取得します。
Groovy 言語には、クラスパスで以下のライブラリーが必要です。
-
groovy
-
groovy-json
(任意) -
groovy-jsr223
JavaScript 言語には、クラスパスで以下のライブラリーが必要です。
-
graalvm.js
-
graalvm.js.scriptengine
12.3.2. 例: Debezium コンテンツベースルーティングの基本設定
イベントの内容に基づいて変更イベントレコードをルーティングするように Debezium コネクターを設定するには、コネクターの Kafka Connect 設定で ContentBasedRouter
SMT を設定します。
コンテンツベースのルーティング SMT 設定では、絞り込みの条件を定義する正規表現を指定する必要があります。設定で、ルーティングの条件を定義する正規表現を作成します。式は、イベントレコードを評価するためのパターンを定義します。また、パターンにマッチするイベントをルーティングする宛先トピックの名前も指定します。指定するパターンで、テーブルの挿入、更新、または削除操作などのイベントタイプを指定する場合もあります。特定の列または行の値を照合するパターンを定義することもできます。
たとえば、すべての更新 (u
) レコードを updates
トピックに再ルーティングするには、コネクター設定に以下の設定を追加します。
... transforms=route transforms.route.type=io.debezium.transforms.ContentBasedRouter transforms.route.language=jsr223.groovy transforms.route.topic.expression=value.op == 'u' ? 'updates' : null ...
上記の例では、Groovy
式言語の使用を指定しています。
パターンにマッチしないレコードは、デフォルトのトピックにルーティングされます。
設定のカスタマイズ
前の例は、op
フィールドを含む DML イベントのみを処理するように設計された単純な SMT 設定を示しています。コネクターが発行する他の種類のメッセージ (ハートビートメッセージ、廃棄メッセージ、またはトランザクションまたはスキーマの変更に関するメタデータメッセージ) には、このフィールドは含まれません。処理の失敗を回避するために、特定のイベントのみに 変換を選択して適用する SMT 述語ステートメント を定義できます。
12.3.3. Debezium コンテンツベースルーティングの式で使用される変数
Debezium は、特定の変数を SMT の評価コンテキストにバインドします。ルーティング先を制御するための条件を指定する式を作成する場合、SMT はこれらの変数の値を検索して解釈し、式の条件を評価することができます。
以下の表に、Debezium がコンテンツベースのルーティング SMT の評価コンテキストにバインドする変数のリストを示します。
名前 | 説明 | タイプ |
---|---|---|
| メッセージのキー。 |
|
| メッセージの値。 |
|
| Schema of the message key. |
|
| メッセージの値のスキーマ。 |
|
| ルーティング先トピックの名前。 | 文字列 |
|
メッセージヘッダーの Java マッピング。キーフィールドはヘッダー名です。
|
|
式は、その変数に対して任意のメソッドを呼び出すことができます。式は、SMT がメッセージをどのように処理するかを定義するブール値に解決する必要があります。式のルーティング条件が true
と評価されると、メッセージは維持されます。ルーティング条件が false
と評価されると、メッセージは削除されます。
式がそれ以外の効果を及ぼすことは許されません。つまり、式が渡す変数を変更することは許されません。
12.3.4. コンテンツベースのルーティング変換を一部適用するオプション
データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。以下の方法のいずれかを使用して、SMT を選んで適用するようにコネクターを設定できます。
- 変換用の SMT 述語を設定する。
- SMT に topic.regex 設定オプションを使用する。
12.3.5. 他のスクリプト言語によるコンテンツベースのルーティング条件の設定
コンテンツベースのルーティング条件を記述する方法は、使用するスクリプト言語によって異なります。たとえば、基本設定の例 に示すように、式言語として Groovy
を使用する場合、以下の式はすべての更新 (u
) レコードを updates
トピックにルーティングし、他のレコードをデフォルトのトピックにルーティングします。
value.op == 'u' ? 'updates' : null
他の言語では、同じ条件を表すのに異なる方法が使用されます。
Debezium MongoDB コネクターは、after
および patch
フィールドを構造体ではなくシリアライズされた JSON ドキュメントとして出力します。
MongoDB コネクターで ContentBasedRouting SMT を使うには、まず JSON の配列フィールドを個別のドキュメントにデプロイメントする必要があります。
式の中で JSON パーサーを使用すると、配列の各項目について個別の出力文書を生成することができます。たとえば、表現言語として Groovy を使用している場合、groovy-json
アーティファクトをクラスパスに追加し、(new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'
のような表現を追加しています。
JavaScript
式言語に JavaScript を使用する場合、以下の例に示すように、Struct#get()
メソッドを呼び出してコンテンツベースのルーティング条件を指定することができます。
value.get('op') == 'u' ? 'updates' : null
JavaScript with Graal.js
JavaScript with Graal.js を使用してコンテンツベースのルーティング条件を作成する場合、Groovy で使用する方法と類似の方法を使用します。以下に例を示します。
value.op == 'u' ? 'updates' : null
12.3.6. コンテンツベースのルーティング変換設定用のオプション
プロパティー | デフォルト | 説明 |
イベントのルーティング先トピックの名前を評価するオプションの正規表現で、条件ロジックを適用するかどうかを決定します。ルーティング先トピックの名前が | ||
式を記述する言語。 | ||
すべてのメッセージに対して評価される式。 | ||
|
トランスフォーメーションが
|