12.7. 送信トレイパターンを使用する Debezium コネクターの設定
送信トレイパターンを使用することで、複数の (マイクロ) サービス間で安全かつ確実にデータを交換することができます。送信トレイパターンの実装により、サービスの内部状態 (通常はそのデータベースに永続化される) と同じデータを必要とするサービスで使用されるイベントの状態との間に不整合が生じるのを防ぐことができます。
Debezium アプリケーションに送信トレイパターンを実装するには、Debezium コネクターを以下のように設定します。
- 送信トレイテーブルの変更をキャプチャーする
- Debezium 送信トレイイベントルーター単一メッセージ変換 (SMT) を適用する
送信トレイ SMT を適用するように設定された Debezium コネクターは、送信トレイテーブルで生じた変更だけをキャプチャーする必要があります。詳細は、変換を選択的に適用するオプション を参照してください。
コネクターが複数の送信トレイテーブルの変更をキャプチャーすることができるのは、それぞれの送信トレイテーブルが同じ構造を持つ場合に限ります。
送信トレイパターンが有用な理由およびその動作については、Reliable Microservices Data Exchange With the Outbox Pattern を参照してください。
送信トレイイベントルーター SMT は MongoDB コネクターと互換性がありません。
MongoDB ユーザーは、MongoDB 送信トレイイベントルーター SMT を実行できます。
詳細は以下のセクションを参照してください。
12.7.1. Debezium 送信トレイメッセージの例
Debezium 送信トレイイベントルーター SMT の設定方法を理解するには、以下の Debezium 送信トレイメッセージの例を確認してください。
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }
送信トレイイベントルーター SMT を適用するように設定された Debezium コネクターは、以下のような Debezium のオリジナルメッセージを変換して上記のメッセージを生成します。
# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f" # Kafka Message Headers: "" # Kafka Message Timestamp: 1556890294484 { "before": null, "after": { "id": "406c07f3-26f0-4eea-a50c-109940064b8f", "aggregateid": "1", "aggregatetype": "Order", "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}", "timestamp": 1556890294344, "type": "OrderCreated" }, "source": { "version": "2.1.4.Final", "connector": "postgresql", "name": "dbserver1-bare", "db": "orderdb", "ts_usec": 1556890294448870, "txId": 584, "lsn": 24064704, "schema": "inventory", "table": "outboxevent", "snapshot": false, "last_snapshot_record": null, "xmin": null }, "op": "c", "ts_ms": 1556890294484 }
この Debezium 送信トレイメッセージの例は、デフォルトの送信トレイイベントルーター設定 に基づいています。ここでは、送信トレイテーブル構造および集約に基づくイベントルーティングを想定しています。動作をカスタマイズするために、送信トレイイベントルーター SMT にはさまざまな 設定オプション が用意されています。
12.7.2. Debezium 送信トレイイベントルーター SMT が要求する送信トレイテーブルの構造
デフォルトの送信トレイイベントルーター SMT 設定を適用するには、送信トレイテーブルに以下の列がなければなりません。
Column | Type | Modifiers --------------+------------------------+----------- id | uuid | not null aggregatetype | character varying(255) | not null aggregateid | character varying(255) | not null type | character varying(255) | not null payload | jsonb |
列 | 結果 |
---|---|
|
イベントの一意の ID が含まれます。送信トレイメッセージでは、この値はヘッダーです。たとえば、重複するメッセージを削除するために、この ID を使用することができます。 |
|
コネクターが送信トレイメッセージを出力するトピックの名前に SMT が追加する値が含まれます。デフォルトの動作では、この値は |
|
ペイロードの ID を提供するイベントのキーが含まれます。SMT は、この値を出力される送信トレイメッセージのキーとして使用します。これは、Kafka パーティションで正しい順序を維持するのに重要です。 |
|
送信トレイ変更イベントの表現。デフォルトの構造は JSON です。デフォルトでは、Kafka のメッセージ値は |
追加のカスタム列 |
送信トレイテーブルの追加の列は、ペイロードセクション内またはメッセージヘッダーのいずれかとして 送信トレイイベント に追加できます。 |
12.7.3. Debezium 送信トレイイベントルーター SMT の基本設定
送信トレイパターンをサポートするように Debezium コネクターを設定するには、outbox.EventRouter
SMT を設定します。SMT のデフォルト動作を取得するには、以下の例のようにオプションを指定せずにコネクター設定に追加します。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
設定のカスタマイズ
コネクターは、多くの種類のイベントメッセージ (ハートビートメッセージ、廃棄メッセージ、またはトランザクションまたはスキーマの変更に関するメタデータメッセージ) を発行する場合があります。outbox テーブルで発生したイベントのみに変換を適用するには、これらのイベントのみに 変換を選択的に適用する SMT 述語ステートメント を定義します。
12.7.4. 送信トレイイベントルーター変換を選択的に適用するオプション
データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。以下の方法のいずれかを使用して、SMT を選んで適用するようにコネクターを設定できます。
- 変換用の SMT 述語を設定する。
-
SMT の
route.topic.regex
設定オプションを使用する。
12.7.5. Debezium 送信トレイメッセージでのペイロードフォーマットとしての Avro の使用
送信トレイイベントルーター SMT は、任意のペイロードフォーマットをサポートします。送信トレイテーブルの payload
カラムの値は、透過的に渡されます。JSON を使用する代わりに、Avro を使用することもできます。これは、メッセージフォーマットの管理や、送信トレイイベントスキーマの後方互換性を維持した進化の確保に役立ちます。
送信トレイメッセージペイロード用にソースアプリケーションがどのように Avro フォーマットのコンテンツを生成するかは、本ドキュメントの範囲外です。1 つの可能性として、Kafka Avro Serializer
クラスを利用して Generic Record
インスタンスをシリアライズすることができます。Kafka メッセージの値が正確な Avro バイナリーデータとなるようにするには、以下の設定をコネクターに適用します。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.BinaryDataConverter
デフォルトでは、payload
列の値 (Avro データ) が唯一のメッセージ値となります。BinaryDataConverter
を値のコンバーターとして設定すると、payload
列の値がそのまま Kafka メッセージの値に反映されます。
ハートビート、トランザクションメタデータ、またはスキーマ変更イベントを出力するように Debezium コネクターを設定することができます (サポートはコネクターによって異なります)。これらのイベントは BinaryDataConverter
でシリアライズできないため、コンバーターがこれらのイベントのシリアライズ方法を認識できるように追加の設定を指定する必要があります。例として、以下の設定では、スキーマがない状態で Apache Kafka JsonConverter
を使用することを示しています。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.BinaryDataConverter value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter value.converter.delegate.converter.type.schemas.enable=false
委譲 Converter
実装は delegate.converter.type
オプションで指定します。コンバーターで追加の設定オプションが必要な場合は (例: 上記の schemas.enable=false
を使用したスキーマの無効化)、それらを指定することもできます。
コンバーター io.debezium.converters.ByteBufferConverter
は Debezium バージョン 1.9 以降非推奨となり、2.0 で削除されました。さらに、Kafka Connect を使用する場合は、Debezium 2.x にアップグレードする前にコネクターの設定を更新する必要があります。
12.7.6. Debezium 送信トレイメッセージへの追加フィールドの出力
送信トレイテーブルに含まれる列の値を、出力される送信トレイメッセージに追加することができます。例えば、aggregatetype
列に purchase-order
という値を持ち、event Type
という列に order-created
および order-shipped
という値を持つ outbox テーブルを考えてみましょう。column:placement:alias
の構文でフィールドを追加することができます。
placement
に許可されている値は、header
、envelope
、partition
です。
eventType
列の値を送信トレイメッセージのヘッダーに出力するには、以下のような SMT を設定します。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement=eventType:header:type
結果は、型
がキーとして Kafka メッセージのヘッダー、および eventType
列の値はその値になります。
eventType
列の値を送信トレイメッセージのエンベロープに出力するには、以下のような SMT を設定します。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement=eventType:envelope:type
送信メッセージをどのパーティションで生成するかを制御するには、SMT を次のように設定します。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement=partitionColumn:partition
なお、partition
配置については、エイリアスを追加しても効果はありません。
12.7.7. JSON としてエスケープされた JSON 文字列の拡張
Debezium 送信トレイメッセージに String として表現された payload
が含まれていることに気付いたかもしれません。そのため、この文字列が実際の JSON の場合、以下のように Kafka メッセージでエスケープされて表示されます。
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }
送信トレイイベントルーターを使用すると、JSON ドキュメント自体から推測されるコンパニオンスキーマを使用して、このメッセージコンテンツを実際 JSON に展開できます。これにより、Kafka メッセージは以下のようになります。
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123 }
この変換を有効にするには、table.expand.json.payload
を true に設定し、以下のように JsonConverter
を使用する必要があります。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.expand.json.payload=true value.converter=org.apache.kafka.connect.json.JsonConverter
12.7.8. 送信トレイイベントルーター変換設定用のオプション
次の表で、送信トレイイベントルーター SMT に指定することのできるオプションを説明します。表の グループ 列は、Kafka の設定オプションクラスを示しています。
オプション | デフォルト | グループ | 説明 |
---|---|---|---|
| テーブル |
送信トレイテーブルに
送信トレイテーブルのすべての変更は、 | |
| テーブル |
一意のイベント ID が含まれる送信トレイテーブル列を指定します。この ID は、出力されるイベントのヘッダーの | |
| テーブル | イベントキーが含まれる送信トレイテーブル列を指定します。この列に値が含まれる場合、SMT はその値を出力される送信トレイメッセージのキーとして使用します。これは、Kafka パーティションで正しい順序を維持するのに重要です。 | |
テーブル | デフォルトでは、出力される送信トレイメッセージのタイムスタンプは、Debezium イベントのタイムスタンプです。送信トレイメッセージで別のタイムスタンプを使用するには、このオプションを出力される送信トレイメッセージに使用するタイムスタンプが含まれる送信トレイテーブル列に設定します。 | ||
| テーブル | イベントペイロードが含まれる送信トレイテーブル列を指定します。 | |
| テーブル |
String ペイロードの JSON 拡張を実行するかどうかを指定します。コンテンツが見つからない場合や、解析エラーが発生した場合、コンテンツはそのまま保持されます。 | |
| テーブル |
JSON 拡張プロパティー
| |
テーブル、エンベロープ | 送信トレイメッセージのヘッダーまたはエンベロープに追加する 1 つまたは複数の送信トレイテーブル列を指定します。ペアのコンマ区切りリストを指定します。それぞれのペアで、列の名前および値をヘッダーとエンベロープのどちらに含めるかを指定します。ペア内の値はコロンで区切ります。以下に例を示します。
列のエイリアスを指定するには、3 番目の値としてエイリアスが含まれるトリオを指定します。以下に例を示します。
2 番目の値は配置で、常に 設定例は、Debezium 送信トレイメッセージへの追加フィールドの出力 に記載されています。 | ||
テーブル、スキーマ | このオプションを設定すると、Kafka Connect スキーマ Javadoc で説明されているように、その値がスキーマバージョンとして使用されます。 | ||
| ルーター | 送信トレイテーブルの列の名前を指定します。デフォルトの動作では、この列の値が、コネクターが送信トレイメッセージを出力するトピックの名前の一部になります。例を 要求される送信トレイテーブルの説明 に示します。 | |
| ルーター |
送信トレイ SMT が RegexRouter で送信トレイテーブルレコードに適用する正規表現を指定します。この正規表現は、 | |
| ルーター |
コネクターが送信トレイメッセージを出力するトピックの名前を指定します。デフォルトのトピック名では、
| |
| ルーター |
空または |