7.9. 送信トレイパターンを使用する Debezium コネクターの設定
送信トレイパターンを使用することで、複数の (マイクロ) サービス間で安全かつ確実にデータを交換することができます。送信トレイパターンの実装により、サービスの内部状態 (通常はそのデータベースに永続化される) と同じデータを必要とするサービスで使用されるイベントの状態との間に不整合が生じるのを防ぐことができます。
Debezium アプリケーションに送信トレイパターンを実装するには、Debezium コネクターを以下のように設定します。
- 送信トレイテーブルの変更をキャプチャーする
- Debezium 送信トレイイベントルーター単一メッセージ変換 (SMT) を適用する
送信トレイ SMT を適用するように設定された Debezium コネクターは、送信トレイテーブルで生じた変更だけをキャプチャーする必要があります。詳細は、変換を選択的に適用するオプション を参照してください。
コネクターが複数の送信トレイテーブルの変更をキャプチャーすることができるのは、それぞれの送信トレイテーブルが同じ構造を持つ場合に限ります。
送信トレイパターンが有用な理由およびその動作については、Reliable Microservices Data Exchange With the Outbox Pattern を参照してください。
送信トレイイベントルーター SMT は MongoDB コネクターと互換性がありません。
MongoDB ユーザーは、MongoDB 送信ボックスイベントルーター SMT を実行できます。
詳細は以下のセクションを参照してください。
7.9.1. Debezium 送信トレイメッセージの例
Debezium 送信トレイイベントルーター SMT の設定方法を理解するには、以下の Debezium 送信トレイメッセージの例を確認してください。
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }
送信トレイイベントルーター SMT を適用するように設定された Debezium コネクターは、以下のような Debezium のオリジナルメッセージを変換して上記のメッセージを生成します。
# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f" # Kafka Message Headers: "" # Kafka Message Timestamp: 1556890294484 { "before": null, "after": { "id": "406c07f3-26f0-4eea-a50c-109940064b8f", "aggregateid": "1", "aggregatetype": "Order", "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}", "timestamp": 1556890294344, "type": "OrderCreated" }, "source": { "version": "2.7.3.Final", "connector": "postgresql", "name": "dbserver1-bare", "db": "orderdb", "ts_usec": 1556890294448870, "txId": 584, "lsn": 24064704, "schema": "inventory", "table": "outboxevent", "snapshot": false, "last_snapshot_record": null, "xmin": null }, "op": "c", "ts_ms": 1556890294484, "ts_us": 1556890294484651, "ts_ns": 1556890294484651402 }
この Debezium 送信トレイメッセージの例は、デフォルトの送信トレイイベントルーター設定 に基づいています。ここでは、送信トレイテーブル構造および集約に基づくイベントルーティングを想定しています。動作をカスタマイズするために、送信トレイイベントルーター SMT にはさまざまな 設定オプション が用意されています。
7.9.2. Debezium 送信トレイイベントルーター SMT が要求する送信トレイテーブルの構造
デフォルトの送信トレイイベントルーター SMT 設定を適用するには、送信トレイテーブルに以下の列がなければなりません。
Column | Type | Modifiers --------------+------------------------+----------- id | uuid | not null aggregatetype | character varying(255) | not null aggregateid | character varying(255) | not null type | character varying(255) | not null payload | jsonb |
列 | 結果 |
---|---|
|
イベントの一意の ID が含まれます。送信トレイメッセージでは、この値はヘッダーです。たとえば、重複するメッセージを削除するために、この ID を使用することができます。 |
|
コネクターが送信トレイメッセージを出力するトピックの名前に SMT が追加する値が含まれます。デフォルトの動作では、この値は |
|
ペイロードの ID を提供するイベントのキーが含まれます。SMT は、この値を出力される送信トレイメッセージのキーとして使用します。これは、Kafka パーティションで正しい順序を維持するのに重要です。 |
|
送信トレイ変更イベントの表現。デフォルトの構造は JSON です。デフォルトでは、Kafka のメッセージ値は |
追加のカスタム列 |
送信トレイテーブルの列は、ペイロードセクション内またはメッセージヘッダーのいずれかとして 送信トレイイベントに追加 できます。 |
7.9.3. Debezium 送信トレイイベントルーター SMT の基本設定
送信トレイパターンをサポートするように Debezium コネクターを設定するには、outbox.EventRouter
SMT を設定します。SMT のデフォルトの動作を取得するには、次の例のように、オプションを指定せずにコネクター設定に追加します。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
設定のカスタマイズ
コネクターは、多くの種類のイベントメッセージ (ハートビートメッセージ、廃棄メッセージ、またはトランザクションまたはスキーマの変更に関するメタデータメッセージ) を発行する場合があります。outbox テーブルで発生したイベントのみに変換を適用するには、これらのイベントのみに 変換を選択的に適用する SMT 述語ステートメント を定義します。
7.9.4. 送信トレイイベントルーター変換を選択的に適用するオプション
データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。以下の方法のいずれかを使用して、SMT を選んで適用するようにコネクターを設定できます。
- 変換用の SMT 述語を設定する。
-
SMT の
route.topic.regex
設定オプションを使用する。
7.9.5. ペイロードのシリアライズ形式
送信トレイイベントルーター SMT は、任意のペイロードフォーマットをサポートします。SMT は、変更せずに送信トレイテーブルから読み取る payload
列値を渡します。SMT がこれらの列値を Kafka メッセージフィールドに変換する方法は、SMT の設定方法によって異なります。データをシリアライズする一般的なペイロード形式は JSON および Avro です。
7.9.5.1. シリアル化形式としての JSON の使用
送信トレイイベントルーター SMT のデフォルトのシリアライズ形式は JSON です。この形式を使用するには、ソース列のデータ型は JSON にする必要があります(例:PostgreSQL の jsonb
)。
7.9.5.1.1. JSON としてエスケープされた JSON 文字列の生成
Debezium 送信トレイメッセージが ペイロード
を JSON String として表すと、結果の Kafka メッセージは次の例のように文字列をエスケープします。
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }
送信トレイイベントルーターを使用すると、メッセージコンテンツを実際 JSON に展開し、JSON ドキュメントからコンパニオンスキーマを得ることができます。結果の Kafka メッセージは以下の例のようにフォーマットされます。
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123 }
送信トレイイベントルーター変換の使用を有効にするには、table.expand.json.payload
を true に設定し、次の例に示すように JsonConverter
を使用します。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.expand.json.payload=true value.converter=org.apache.kafka.connect.json.JsonConverter
7.9.5.2. Apache Avro を Debezium 送信トレイメッセージでペイロードフォーマットとして使用する
Apache Avro は、データをシリアライズする一般的なフレームワークです。Avro の使用は、メッセージフォーマットの管理や、送信トレイイベントスキーマの後方互換性を維持した進化の確保に役立ちます。
送信トレイメッセージペイロード用にソースアプリケーションがどのように Avro フォーマットのコンテンツを生成するかは、本ドキュメントの範囲外です。1 つの可能性として、KafkaAvroSerializer
クラスを利用して GenericRecord
インスタンスをシリアライズできます。Kafka メッセージの値が正確な Avro バイナリーデータとなるようにするには、以下の設定をコネクターに適用します。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.BinaryDataConverter
デフォルトでは、payload
列の値 (Avro データ) が唯一のメッセージ値となります。データを Avro 形式で保存する場合は、列の形式を、PostgreSQL の bytea
などのバイナリーデータ型に設定する必要があります。SMT の値コンバーターも、payload
列のバイナリー値を Kafka メッセージの値に伝播するように、BinaryDataConverter
に設定する必要があります。
ハートビート、トランザクションメタデータ、またはスキーマ変更イベントを出力するように Debezium コネクターを設定することができます (サポートはコネクターによって異なります)。これらのイベントは BinaryDataConverter
でシリアライズできないため、コンバーターがこれらのイベントのシリアライズ方法を認識できるように追加の設定を指定する必要があります。例として、以下の設定では、スキーマがない状態で Apache Kafka JsonConverter
を使用することを示しています。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.BinaryDataConverter value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter value.converter.delegate.converter.type.schemas.enable=false
委譲 Converter
実装は delegate.converter.type
オプションで指定します。コンバーターで追加の設定オプションが必要な場合は (例: 上記の schemas.enable=false
を使用したスキーマの無効化)、それらを指定することもできます。
次の例は、Apicurio Registry で委譲コンバーターを使用して、データを Avro 形式に変換します。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.BinaryDataConverter value.converter.delegate.converter.type=io.apicurio.registry.utils.converter.AvroConverter value.converter.delegate.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2 value.converter.delegate.converter.apicurio.registry.auto-register=true value.converter.delegate.converter.registry.find-latest=true
最後に、次の例は、Confluent Schema Registry で delegate コンバーターを使用して、データを Avro 形式に変換するように SMT を設定する方法を示しています。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.BinaryDataConverter value.converter.delegate.converter.type=io.confluent.connect.avro.AvroConverter value.converter.delegate.converter.type.basic.auth.credentials.source=USER_INFO value.converter.delegate.converter.type.basic.auth.user.info={CREDENTIALS} value.converter.delegate.converter.type.schema.registry.url={URL}
上記の設定例では、AvroConverter
がデリゲートコンバーターとして設定されているため、サードパーティーライブラリーが必要です。サードパーティーライブラリーをクラスパスに追加する方法は、このドキュメントの範囲外です。
7.9.6. Debezium 送信トレイメッセージへの追加フィールドの出力
送信トレイテーブルに含まれる列の値を、出力される送信トレイメッセージに追加することができます。たとえば、aggregatetype
列に purchase-order
という値を持ち、eventType
という列に order-created
および order-shipped
という値を持つ outbox テーブルを考えてみましょう。column:placement:alias
の構文でフィールドを追加することができます。
placement
に許可されている値は、header
、envelope
、partition
です。
eventType
列の値を送信トレイメッセージのヘッダーに出力するには、以下のような SMT を設定します。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement=eventType:header:type
結果は、型
がキーとして Kafka メッセージのヘッダー、および eventType
列の値はその値になります。
eventType
列の値を送信トレイメッセージのエンベロープに出力するには、以下のような SMT を設定します。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement=eventType:envelope:type
送信メッセージをどのパーティションで生成するかを制御するには、SMT を次のように設定します。
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement=partitionColumn:partition
なお、partition
配置については、エイリアスを追加しても効果はありません。
7.9.7. 送信トレイイベントルーター変換設定用のオプション
次の表で、送信トレイイベントルーター SMT に指定することのできるオプションを説明します。表の グループ 列は、Kafka の設定オプションクラスを示しています。
オプション | デフォルト | グループ | 説明 |
---|---|---|---|
| テーブル |
送信トレイテーブルに
送信トレイテーブルのすべての変更は、 | |
| テーブル |
一意のイベント ID が含まれる送信トレイテーブル列を指定します。この ID は、出力されるイベントのヘッダーの | |
| テーブル | イベントキーが含まれる送信トレイテーブル列を指定します。この列に値が含まれる場合、SMT はその値を出力される送信トレイメッセージのキーとして使用します。これは、Kafka パーティションで正しい順序を維持するのに重要です。 | |
テーブル | デフォルトでは、出力される送信トレイメッセージのタイムスタンプは、Debezium イベントのタイムスタンプです。送信トレイメッセージで別のタイムスタンプを使用するには、このオプションを出力される送信トレイメッセージに使用するタイムスタンプが含まれる送信トレイテーブル列に設定します。 | ||
| テーブル | イベントペイロードが含まれる送信トレイテーブル列を指定します。 | |
| テーブル |
String ペイロードの JSON 拡張を実行するかどうかを指定します。コンテンツが見つからなかった場合や、解析エラーの場合は、コンテンツは "そのまま" として保持されます。 | |
| テーブル |
JSON 拡張プロパティー
| |
テーブル、エンベロープ | 送信トレイメッセージのヘッダーまたはエンベロープに追加する 1 つまたは複数の送信トレイテーブル列を指定します。ペアのコンマ区切りリストを指定します。それぞれのペアで、列の名前および値をヘッダーとエンベロープのどちらに含めるかを指定します。ペア内の値はコロンで区切ります。以下に例を示します。
列のエイリアスを指定するには、3 番目の値としてエイリアスが含まれるトリオを指定します。以下に例を示します。
2 番目の値は配置で、常に 設定例は、Debezium 送信トレイメッセージへの追加フィールドの出力 に記載されています。 | ||
| テーブル、エンベロープ |
| |
テーブル、スキーマ | このオプションを設定すると、Kafka Connect スキーマ Javadoc で説明されているように、その値がスキーマバージョンとして使用されます。 | ||
| ルーター | 送信トレイテーブルの列の名前を指定します。デフォルトの動作では、この列の値が、コネクターが送信トレイメッセージを出力するトピックの名前の一部になります。例を 要求される送信トレイテーブルの説明 に示します。 | |
| ルーター |
送信トレイ SMT が RegexRouter で送信トレイテーブルレコードに適用する正規表現を指定します。この正規表現は、 | |
| ルーター |
コネクターが送信トレイメッセージを出力するトピックの名前を指定します。デフォルトのトピック名では、
| |
| ルーター |
空または |