12.8. 送信トレイパターンを使用する Debezium MongoDB コネクターの設定


注記

この SMT は、DebeziumMongoDB コネクターでのみ使用されます。リレーショナルデータベースに送信トレイイベントルーター SMT を使用する方法については、送信トレイ イベントルーター を参照してください。

送信トレイパターンを使用することで、複数の (マイクロ) サービス間で安全かつ確実にデータを交換することができます。送信トレイパターンの実装により、サービスの内部状態 (通常はそのデータベースに永続化される) と同じデータを必要とするサービスで使用されるイベントの状態との間に不整合が生じるのを防ぐことができます。

Debezium アプリケーションに送信トレイパターンを実装するには、Debezium コネクターを以下のように設定します。

  • 送信トレイコレクションの変更をキャプチャーする
  • Debezium MongoDB 送信トレイイベントルーター単一メッセージ変換 (SMT) を適用する

MongoDB 送信トレイ SMT を適用するように設定された Debezium コネクターは、送信トレイコレクションで生じた変更だけをキャプチャーする必要があります。詳細は、変換を選択的に適用するオプション を参照してください。

コネクターが複数の送信トレイコレクションの変更をキャプチャーすることができるのは、それぞれの送信トレイコレクションが同じ構造を持つ場合に限ります。

注記

この SMT を使用するには、実際のビジネスコレクションの操作と送信トレイコレクションへの挿入を、ビジネスコレクションおよび送信トレイコレクション間にデータの不整合が発生するのを回避するために、MongoDB 4.0 以降でサポートされているマルチドキュメントトランザクションの一部として実行する必要があります。将来の更新では、既存のデータを更新し、マルチドキュメントトランザクションなしで ACID トランザクションに送信トレイイベントを挿入できるようにするために、送信トレイイベントを、独立した送信トレイコレクションとしてではなく、既存コレクションのサブドキュメントとして保存するための追加の設定をサポートする予定です。

送信トレイパターンの詳細については、Reliable Microservices Data Exchange With the Outbox Pattern を参照してください。

詳細は以下のセクションを参照してください。

12.8.1. Debezium MongoDB 送信トレイメッセージの例

Debezium MongoDB 送信トレイイベントルーター SMT の設定方法を理解するには、以下の Debezium 送信トレイメッセージの例を検討してください。

# Kafka Topic: outbox.event.order
# Kafka Message key: "b2730779e1f596e275826f08"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

MongoDB 送信トレイイベントルーター SMT を適用するように設定された Debezium コネクターは、次の例で示すとおり、raw Debezium 変更イベントメッセージを変換して上記のメッセージを生成します。

# Kafka Message key: { "id": "{\"$oid\": \"596e275826f08b2730779e1f\"}" }
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "patch": null,
  "after": "{\"_id\": {\"$oid\": \"596e275826f08b2730779e1f\"}, \"aggregateid\": {\"$oid\": \"b2730779e1f596e275826f08\"}, \"aggregatetype\": \"Order\", \"type\": \"OrderCreated\", \"payload\": {\"_id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}}",
  "source": {
    "version": "2.1.4.Final",
    "connector": "mongodb",
    "name": "fulfillment",
    "ts_ms": 1558965508000,
    "snapshot": false,
    "db": "inventory",
    "rs": "rs0",
    "collection": "customers",
    "ord": 31,
    "h": 1546547425148721999
  },
  "op": "c",
  "ts_ms": 1556890294484
}

この Debezium 送信トレイメッセージの例は、デフォルトの送信トレイイベントルーター設定 に基づいています。ここでは、送信トレイコレクション構造および集約に基づくイベントルーティングを想定しています。動作をカスタマイズするために、送信トレイイベントルーター SMT にはさまざまな 設定オプション が用意されています。

12.8.2. Debezium mongodb 送信トレイイベントルーター SMT が要求する送信トレイコレクションの構造

デフォルトの MongoDB 送信トレイイベントルーター SMT 設定を適用するには、送信トレイコレクションに以下のフィールドがなければなりません。

{
  "_id": "objectId",
  "aggregatetype": "string",
  "aggregateid": "objectId",
  "type": "string",
  "payload": "object"
}
表12.8 要求される送信トレイコレクションフィールドの説明
フィールド結果

id

イベントの一意の ID が含まれます。送信トレイメッセージでは、この値はヘッダーです。たとえば、重複するメッセージを削除するために、この ID を使用することができます。

別の送信トレイコレクションフィールドから一意のイベント ID を取得するには、コネクター設定で collection.field.event.id SMT オプションを設定します。

aggregatetype

コネクターが送信トレイメッセージを出力するトピックの名前に SMT が追加する値が含まれます。デフォルトの動作では、この値は route.topic.replacement SMT オプションのデフォルトの ${routedByValue} 変数を置き換えます。

たとえば、デフォルト設定では、route.by.field SMT オプションは aggregatetype に設定され、route.topic.replacement SMT オプションは outbox.event.${routedByValue} に設定されます。アプリケーションが 2 つのドキュメントを送信トレイコレクションに追加するとします。最初のドキュメントでは、aggregatetype フィールドの値は customers です。2 番目のドキュメントでは、aggregatetype フィールドの値は orders です。コネクターは、最初のドキュメントを outbox.event.customers トピックに送信します。コネクターは、2 番目のドキュメントを outbox.event.orders トピックに出力します。

別の送信トレイコレクションフィールドからこの値を取得するには、コネクター設定で route.by.field SMT オプションを設定します。

aggregateid

ペイロードの ID を提供するイベントのキーが含まれます。SMT は、この値を出力される送信トレイメッセージのキーとして使用します。これは、Kafka パーティションで正しい順序を維持するのに重要です。

別の送信トレイコレクションフィールドからイベントキーを取得するには、コネクター設定で collection.field.event.key SMT オプションを設定します。

payload

送信トレイ変更イベントの表現。デフォルトの構造は JSON です。デフォルトでは、Kafka のメッセージ値は payload 値のみで設定されます。ただし、送信トレイイベントで追加のフィールドが含まれるように設定されている場合、Kafka メッセージ値にはペイロードと追加フィールドの両方のエンベロープエンプトが含まれ、各フィールドは個別に表されます。詳細は、Emitting messages with additional fields を参照してください。

別の送信トレイコレクションフィールドからイベントペイロードを取得するには、コネクター設定で collection.field.event.payload SMT オプションを設定します。

追加のカスタムフィールド

送信トレイコレクションの追加フィールドは、ペイロードセクション内またはメッセージヘッダーのいずれかとして 送信トレイイベント に追加できます。

たとえば、イベントを分類または整理するのに役立つユーザー定義の値を伝えるフィールド eventType などがあります。

12.8.3. 基本的な Debezium MongoDB 送信トレイイベントルーター SMT 設定

送信トレイパターンをサポートするように Debezium MongoDB コネクターを設定するには、outbox.MongoEventRouter SMT を設定します。SMT のデフォルト動作を取得するには、以下の例のようにオプションを指定せずにコネクター設定に追加します。

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter

設定のカスタマイズ

コネクターは、多くの種類のイベントメッセージ (ハートビートメッセージ、廃棄メッセージ、またはトランザクションの変更に関するメタデータメッセージ) を発行する場合があります。送信トレイコレクションで発生したイベントにのみ変換を適用するには、これらのイベントのみに 変換を選択的に適用する SMT 述語ステートメント を定義します。

12.8.4. MongoDB 送信トレイイベントルーター変換を選択的に適用するオプション

データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。以下の方法のいずれかを使用して、SMT を選んで適用するようにコネクターを設定できます。

12.8.5. Debezium MongoDB 送信トレイメッセージでペイロードフォーマットとして Avro を使用

MongoDB 送信ボックスイベントルーター SMT は、任意のペイロード形式をサポートします。送信トレイコレクションの payload フィールド値は透過的に渡されます。JSON を使用する代わりに、Avro を使用することもできます。これは、メッセージフォーマットの管理や、送信トレイイベントスキーマの後方互換性を維持した進化の確保に役立ちます。

送信トレイメッセージペイロード用にソースアプリケーションがどのように Avro フォーマットのコンテンツを生成するかは、本ドキュメントの範囲外です。1 つの可能性として、Kafka Avro Serializer クラスを利用して Generic Record インスタンスをシリアライズすることができます。Kafka メッセージの値が正確な Avro バイナリーデータとなるようにするには、以下の設定をコネクターに適用します。

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
value.converter=io.debezium.converters.ByteArrayConverter

デフォルトでは、payload フィールドの値 (Avro データ) が唯一のメッセージ値となります。値変換器として ByteArrayConverter を 設定すると、payload フィールドの値がそのまま Kafka メッセージの値に伝搬されます。

これは、他の SMT に推奨される BinaryDataConverter とは異なることに注意してください。これは、MongoDB が内部でバイト配列を保存する際に取っているアプローチが異なるためです。

ハートビート、トランザクションメタデータ、またはスキーマ変更イベントを出力するように Debezium コネクターを設定することができます (サポートはコネクターによって異なります)。これらのイベントは ByteArrayConverter でシリアライズできないため、コンバーターがこれらのイベントのシリアライズ方法を認識できるように追加の設定を指定する必要があります。例として、以下の設定では、スキーマがない状態で Apache Kafka JsonConverter を使用することを示しています。

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
value.converter=io.debezium.converters.ByteArrayConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false

委譲 Converter 実装は delegate.converter.type オプションで指定します。コンバーターで追加の設定オプションが必要な場合は (例: 上記の schemas.enable=false を使用したスキーマの無効化)、それらを指定することもできます。

12.8.6. Debezium MongoDB 送信トレイメッセージへの追加フィールドの出力

送信トレイコレクションに含まれるフィールドの値を、出力される送信トレイメッセージに追加することができます。例えば、aggregatetype フィールドに purchase-order という値を持ち、event Type というフィールドに order-created および order-shipped という値を持つ送信トレイコレクションを考えてみましょう。field:placement:alias の 構文でフィールドを追加できる。

placement に許可されている値は、headerenvelopepartition です。

eventType フィールドの値を送信トレイメッセージのヘッダーに出力するには、以下のような SMT を設定します。

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.collection.fields.additional.placement=eventType:header:type

結果は、type がキーとして Kafka メッセージのヘッダー、および eventType 列の値はその値になります。

eventType フィールドの値を送信トレイメッセージのエンベロープに出力するには、以下のような SMT を設定します。

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.collection.fields.additional.placement=eventType:envelope:type

送信メッセージをどのパーティションで生成するかを制御するには、SMT を次のように設定します。

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.collection.fields.additional.placement=partitionField:partition

なお、partition 配置については、エイリアスを追加しても効果はありません。

12.8.7. JSON としてエスケープされた JSON 文字列の拡張

デフォルトでは、Debezium 送信トレイメッセージの payload は文字列として表されます。文字列の元のソースが JSON 形式の場合、次の例に示すように、結果の Kafka メッセージはエスケープシーケンスを使用して文字列を表します。

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

メッセージの内容を展開し、エスケープされた JSON を元のエスケープされていない JSON 形式に変換するように、送信トレイイベントルーターを設定できます。変換された文字列では、コンパニオンスキーマは元の JSON ドキュメントから推定されます。次の例は、結果の Kafka メッセージで展開された JSON を示しています。

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "id": "da8d6de63b7745ff8f4457db", "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}

変換時の文字列変換を有効にするには、collection.expand.json.payload の値を true に設定し、次の例に示すように StringConverter を使用します。

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
transforms.outbox.collection.expand.json.payload=true
value.converter=org.apache.kafka.connect.storage.StringConverter

12.8.8. 送信トレイイベントルーター変換設定用のオプション

次の表で、送信トレイイベントルーター SMT に指定することのできるオプションを説明します。表の グループ 列は、Kafka の設定オプションクラスを示しています。

表12.9 送信トレイイベントルーター SMT 設定オプションの説明
オプションデフォルトグループ説明

collection.op.invalid.behavior

warn

コレクション

送信トレイコレクションも更新操作がある場合の SMT の動作を決定します。設定可能な値は以下のとおりです。

  • warn: SMT はログに警告を記録し、次の送信トレイコレクションドキュメントに進みます。
  • error: SMT はログにエラーを記録し、次の送信トレイコレクションドキュメントに進みます。
  • fatal: SMT はログにエラーを記録し、コネクターは処理を停止します。

送信トレイコレクションのすべての変更は、挿入または削除操作であると想定されます。つまり、送信トレイコレクションはキューとして機能します。送信トレイコレクション内のドキュメントの更新は許可されていません。SMT は、送信トレイコレクションの削除操作を自動的に除外します (進行中の送信トレイイベントが削除されるため)。

collection.field.event.id

_id

コレクション

一意のイベント ID が含まれる送信トレイコレクションフィールドを指定します。この ID は、出力されるイベントのヘッダーの id キーに保存されます。

collection.field.event.key

aggregateid

コレクション

イベントキーが含まれる送信トレイコレクションフィールドを指定します。このフィールドに値が含まれる場合、SMT はその値を出力される送信トレイメッセージのキーとして使用します。これは、Kafka パーティションで正しい順序を維持するのに重要です。

collection.field.event.timestamp

 

コレクション

デフォルトでは、出力される送信トレイメッセージのタイムスタンプは、Debezium イベントのタイムスタンプです。送信トレイメッセージで別のタイムスタンプを使用するには、このオプションを出力される送信トレイメッセージに使用するタイムスタンプが含まれる送信トレイコレクションフィールドに設定します。

collection.field.event.payload

payload

コレクション

イベントペイロードが含まれる送信トレイコレクションフィールドを指定します。

collection.expand.json.payload

false

コレクション

String ペイロードの JSON 拡張を実行するかどうかを指定します。コンテンツが見つからない場合や、解析エラーが発生した場合、コンテンツはそのまま保持されます。

詳細は、expanding escaped json セクションを参照してください。

collection.fields.additional.placement

 

コレクション、エンベロープ

送信トレイメッセージのヘッダーまたはエンベロープに追加する 1 つまたは複数の送信トレイコレクションフィールドを指定します。ペアのコンマ区切りリストを指定します。それぞれのペアで、フィールドの名前および値をヘッダーとエンベロープのどちらに含めるかを指定します。ペア内の値はコロンで区切ります。以下に例を示します。

id:header,my-field:envelope

フィールドのエイリアスを指定するには、3 番目の値としてエイリアスが含まれるトリオを指定します。以下にフィールドを示します。

id:header,my-field:envelope:my-alias

2 番目の値は配置で、常に header または envelope でなければなりません。

設定例は、Debezium 送信トレイメッセージへの追加フィールドの出力 に記載されています。

collection.field.event.schema.version

 

コレクション、スキーマ

このオプションを設定すると、Kafka Connect スキーマ Javadoc で説明されているように、その値がスキーマバージョンとして使用されます。

route.by.field

aggregatetype

ルーター

送信トレイコレクションのフィールドの名前を指定します。デフォルトでは、このフィールドで指定された値が、コネクターが送信トレイメッセージを出力するトピックの名前の一部になります。例を 要求される送信トレイコレクションの説明 に示します。

route.topic.regex

(?<routedByValue>.*)

ルーター

送信トレイ SMT が RegexRouter で送信トレイコレクションドキュメントに適用する正規表現を指定します。この正規表現は、route.topic.replacement SMT オプションの設定の一部です。

+ デフォルトの動作では、SMT は route.topic.replacement SMT オプションの設定のデフォルト ${routedByValue} 変数を route.by.field 送信トレイ SMT オプションの設定に置き換えることです。

route.topic.replacement

outbox.event​.${routedByValue}

ルーター

コネクターが送信トレイメッセージを出力するトピックの名前を指定します。デフォルトのトピック名では、outbox.event. の後に送信トレイコレクションドキュメントの aggregatetype フィールドの値が続きます。たとえば、aggregatetype の値が 顧客 の場合には、トピック名は outbox.event.customers になります。

+ トピック名を変更するには、次の操作を行います。

route.tombstone.on.empty.payload

false

ルーター

空または null のペイロードによってコネクターがトゥームストーンイベントを出力するかどうかを示します。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.