12.7. 送信トレイパターンを使用する Debezium コネクターの設定


送信トレイパターンを使用することで、複数の (マイクロ) サービス間で安全かつ確実にデータを交換することができます。送信トレイパターンの実装により、サービスの内部状態 (通常はそのデータベースに永続化される) と同じデータを必要とするサービスで使用されるイベントの状態との間に不整合が生じるのを防ぐことができます。

Debezium アプリケーションに送信トレイパターンを実装するには、Debezium コネクターを以下のように設定します。

  • 送信トレイテーブルの変更をキャプチャーする
  • Debezium 送信トレイイベントルーター単一メッセージ変換 (SMT) を適用する

送信トレイ SMT を適用するように設定された Debezium コネクターは、送信トレイテーブルで生じた変更だけをキャプチャーする必要があります。詳細は、変換を選択的に適用するオプション を参照してください。

コネクターが複数の送信トレイテーブルの変更をキャプチャーすることができるのは、それぞれの送信トレイテーブルが同じ構造を持つ場合に限ります。

送信トレイパターンが有用な理由およびその動作については、Reliable Microservices Data Exchange With the Outbox Pattern を参照してください。

注記

送信トレイイベントルーター SMT は MongoDB コネクターと互換性がありません。

MongoDB ユーザーは、MongoDB 送信トレイイベントルーター SMT を実行できます。

詳細は以下のセクションを参照してください。

12.7.1. Debezium 送信トレイメッセージの例

Debezium 送信トレイイベントルーター SMT の設定方法を理解するには、以下の Debezium 送信トレイメッセージの例を確認してください。

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

送信トレイイベントルーター SMT を適用するように設定された Debezium コネクターは、以下のような Debezium のオリジナルメッセージを変換して上記のメッセージを生成します。

# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "before": null,
  "after": {
    "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
    "aggregateid": "1",
    "aggregatetype": "Order",
    "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
    "timestamp": 1556890294344,
    "type": "OrderCreated"
  },
  "source": {
    "version": "2.1.4.Final",
    "connector": "postgresql",
    "name": "dbserver1-bare",
    "db": "orderdb",
    "ts_usec": 1556890294448870,
    "txId": 584,
    "lsn": 24064704,
    "schema": "inventory",
    "table": "outboxevent",
    "snapshot": false,
    "last_snapshot_record": null,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1556890294484
}

この Debezium 送信トレイメッセージの例は、デフォルトの送信トレイイベントルーター設定 に基づいています。ここでは、送信トレイテーブル構造および集約に基づくイベントルーティングを想定しています。動作をカスタマイズするために、送信トレイイベントルーター SMT にはさまざまな 設定オプション が用意されています。

12.7.2. Debezium 送信トレイイベントルーター SMT が要求する送信トレイテーブルの構造

デフォルトの送信トレイイベントルーター SMT 設定を適用するには、送信トレイテーブルに以下の列がなければなりません。

Column        |          Type          | Modifiers
--------------+------------------------+-----------
id            | uuid                   | not null
aggregatetype | character varying(255) | not null
aggregateid   | character varying(255) | not null
type          | character varying(255) | not null
payload       | jsonb                  |
表12.6 要求される送信トレイテーブル列の説明
結果

id

イベントの一意の ID が含まれます。送信トレイメッセージでは、この値はヘッダーです。たとえば、重複するメッセージを削除するために、この ID を使用することができます。

別の送信トレイテーブル列から一意のイベント ID を取得するには、コネクター設定で table.field.event.id SMT オプションを設定します。

aggregatetype

コネクターが送信トレイメッセージを出力するトピックの名前に SMT が追加する値が含まれます。デフォルトの動作では、この値は route.topic.replacement SMT オプションのデフォルトの ${routedByValue} 変数を置き換えます。

たとえば、デフォルト設定では、route.by.field SMT オプションは aggregatetype に設定され、route.topic.replacement SMT オプションは outbox.event.${routedByValue} に設定されます。アプリケーションが送信トレイテーブルに 2 つのレコードを追加するとします。最初のレコードでは、aggregatetype 列の値は customers です。2 つ目のレコードでは、aggregatetype 列の値は orders です。コネクターは、最初のレコードを outbox.event.customers トピックに出力します。コネクターは、2 番目のレコードを outbox.event.orders トピックに出力します。

別の送信トレイテーブル列からこの値を取得するには、コネクター設定で route.by.field SMT オプションを設定します。

aggregateid

ペイロードの ID を提供するイベントのキーが含まれます。SMT は、この値を出力される送信トレイメッセージのキーとして使用します。これは、Kafka パーティションで正しい順序を維持するのに重要です。

別の送信トレイテーブル列からイベントキーを取得するには、コネクター設定で table.field.event.key SMT オプション を設定します。

payload

送信トレイ変更イベントの表現。デフォルトの構造は JSON です。デフォルトでは、Kafka のメッセージ値は payload 値のみで設定されます。ただし、送信トレイイベントで追加のフィールドが含まれるように設定されている場合、Kafka メッセージ値にはペイロードと追加フィールドの両方のエンベロープエンプトが含まれ、各フィールドは個別に表されます。詳細は、Emitting messages with additional fields を参照してください。

別の送信トレイテーブル列からイベントペイロードを取得するには、コネクター設定で table.field.event.payload SMT オプションを設定します。

追加のカスタム列

送信トレイテーブルの追加の列は、ペイロードセクション内またはメッセージヘッダーのいずれかとして 送信トレイイベント に追加できます。

たとえば、イベントを分類または整理するのに役立つユーザー定義の値を伝える列 eventType などがあります。

12.7.3. Debezium 送信トレイイベントルーター SMT の基本設定

送信トレイパターンをサポートするように Debezium コネクターを設定するには、outbox.EventRouter SMT を設定します。SMT のデフォルト動作を取得するには、以下の例のようにオプションを指定せずにコネクター設定に追加します。

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter

設定のカスタマイズ

コネクターは、多くの種類のイベントメッセージ (ハートビートメッセージ、廃棄メッセージ、またはトランザクションまたはスキーマの変更に関するメタデータメッセージ) を発行する場合があります。outbox テーブルで発生したイベントのみに変換を適用するには、これらのイベントのみに 変換を選択的に適用する SMT 述語ステートメント を定義します。

12.7.4. 送信トレイイベントルーター変換を選択的に適用するオプション

データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。以下の方法のいずれかを使用して、SMT を選んで適用するようにコネクターを設定できます。

12.7.5. Debezium 送信トレイメッセージでのペイロードフォーマットとしての Avro の使用

送信トレイイベントルーター SMT は、任意のペイロードフォーマットをサポートします。送信トレイテーブルの payload カラムの値は、透過的に渡されます。JSON を使用する代わりに、Avro を使用することもできます。これは、メッセージフォーマットの管理や、送信トレイイベントスキーマの後方互換性を維持した進化の確保に役立ちます。

送信トレイメッセージペイロード用にソースアプリケーションがどのように Avro フォーマットのコンテンツを生成するかは、本ドキュメントの範囲外です。1 つの可能性として、Kafka Avro Serializer クラスを利用して Generic Record インスタンスをシリアライズすることができます。Kafka メッセージの値が正確な Avro バイナリーデータとなるようにするには、以下の設定をコネクターに適用します。

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter

デフォルトでは、payload 列の値 (Avro データ) が唯一のメッセージ値となります。BinaryDataConverter を値のコンバーターとして設定すると、payload 列の値がそのまま Kafka メッセージの値に反映されます。

ハートビート、トランザクションメタデータ、またはスキーマ変更イベントを出力するように Debezium コネクターを設定することができます (サポートはコネクターによって異なります)。これらのイベントは BinaryDataConverter でシリアライズできないため、コンバーターがこれらのイベントのシリアライズ方法を認識できるように追加の設定を指定する必要があります。例として、以下の設定では、スキーマがない状態で Apache Kafka JsonConverter を使用することを示しています。

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false

委譲 Converter 実装は delegate.converter.type オプションで指定します。コンバーターで追加の設定オプションが必要な場合は (例: 上記の schemas.enable=false を使用したスキーマの無効化)、それらを指定することもできます。

注記

コンバーター io.debezium.converters.ByteBufferConverter は Debezium バージョン 1.9 以降非推奨となり、2.0 で削除されました。さらに、Kafka Connect を使用する場合は、Debezium 2.x にアップグレードする前にコネクターの設定を更新する必要があります。

12.7.6. Debezium 送信トレイメッセージへの追加フィールドの出力

送信トレイテーブルに含まれる列の値を、出力される送信トレイメッセージに追加することができます。例えば、aggregatetype 列に purchase-order という値を持ち、event Type という列に order-created および order-shipped という値を持つ outbox テーブルを考えてみましょう。column:placement:alias の構文でフィールドを追加することができます。

placement に許可されている値は、headerenvelopepartition です。

eventType 列の値を送信トレイメッセージのヘッダーに出力するには、以下のような SMT を設定します。

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:header:type

結果は、 がキーとして Kafka メッセージのヘッダー、および eventType 列の値はその値になります。

eventType 列の値を送信トレイメッセージのエンベロープに出力するには、以下のような SMT を設定します。

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:envelope:type

送信メッセージをどのパーティションで生成するかを制御するには、SMT を次のように設定します。

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=partitionColumn:partition

なお、partition 配置については、エイリアスを追加しても効果はありません。

12.7.7. JSON としてエスケープされた JSON 文字列の拡張

Debezium 送信トレイメッセージに String として表現された payload が含まれていることに気付いたかもしれません。そのため、この文字列が実際の JSON の場合、以下のように Kafka メッセージでエスケープされて表示されます。

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

送信トレイイベントルーターを使用すると、JSON ドキュメント自体から推測されるコンパニオンスキーマを使用して、このメッセージコンテンツを実際 JSON に展開できます。これにより、Kafka メッセージは以下のようになります。

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}

この変換を有効にするには、table.expand.json.payload を true に設定し、以下のように JsonConverter を使用する必要があります。

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.expand.json.payload=true
value.converter=org.apache.kafka.connect.json.JsonConverter

12.7.8. 送信トレイイベントルーター変換設定用のオプション

次の表で、送信トレイイベントルーター SMT に指定することのできるオプションを説明します。表の グループ 列は、Kafka の設定オプションクラスを示しています。

表12.7 送信トレイイベントルーター SMT 設定オプションの説明
オプションデフォルトグループ説明

table.op.invalid.behavior

warn

テーブル

送信トレイテーブルに UPDATE 操作がある場合の SMT の動作を決定します。設定可能な値は以下のとおりです。

  • warn: SMT はログに警告を記録し、次の送信トレイテーブルレコードに進みます。
  • error: SMT はログにエラーを記録し、次の送信トレイテーブルレコードに進みます。
  • fatal: SMT はログにエラーを記録し、コネクターは処理を停止します。

送信トレイテーブルのすべての変更は、INSERT 操作であると想定されます。つまり、送信トレイテーブルはキューとして機能し、送信トレイテーブルのレコードに対する更新は許可されません。SMT は、送信トレイテーブルの DELETE 操作を自動的に除外します。

table.field.event.id

id

テーブル

一意のイベント ID が含まれる送信トレイテーブル列を指定します。この ID は、出力されるイベントのヘッダーの id キーに保存されます。

table.field.event.key

aggregateid

テーブル

イベントキーが含まれる送信トレイテーブル列を指定します。この列に値が含まれる場合、SMT はその値を出力される送信トレイメッセージのキーとして使用します。これは、Kafka パーティションで正しい順序を維持するのに重要です。

table.field.event.timestamp

 

テーブル

デフォルトでは、出力される送信トレイメッセージのタイムスタンプは、Debezium イベントのタイムスタンプです。送信トレイメッセージで別のタイムスタンプを使用するには、このオプションを出力される送信トレイメッセージに使用するタイムスタンプが含まれる送信トレイテーブル列に設定します。

table.field.event.payload

payload

テーブル

イベントペイロードが含まれる送信トレイテーブル列を指定します。

table.expand.json.payload

false

テーブル

String ペイロードの JSON 拡張を実行するかどうかを指定します。コンテンツが見つからない場合や、解析エラーが発生した場合、コンテンツはそのまま保持されます。

詳細は、expanding escaped json セクションを参照してください。

table.json.payload.null.behavior

ignore

テーブル

JSON 拡張プロパティー table.expand.json.payload を有効にすると、送信トレイテーブルの null 値を含む json ペイロードの動作を決定します。設定可能な値は以下のとおりです。

  • ignore: null 値を無視します。
  • optional_bytes - null 値を保持し、null をオプションの接続バイトとして扱います。

table.fields.additional.placement

 

テーブル、エンベロープ

送信トレイメッセージのヘッダーまたはエンベロープに追加する 1 つまたは複数の送信トレイテーブル列を指定します。ペアのコンマ区切りリストを指定します。それぞれのペアで、列の名前および値をヘッダーとエンベロープのどちらに含めるかを指定します。ペア内の値はコロンで区切ります。以下に例を示します。

id:header,my-field:envelope

列のエイリアスを指定するには、3 番目の値としてエイリアスが含まれるトリオを指定します。以下に例を示します。

id:header,my-field:envelope:my-alias

2 番目の値は配置で、常に header または envelope でなければなりません。

設定例は、Debezium 送信トレイメッセージへの追加フィールドの出力 に記載されています。

table.field.event.schema.version

 

テーブル、スキーマ

このオプションを設定すると、Kafka Connect スキーマ Javadoc で説明されているように、その値がスキーマバージョンとして使用されます。

route.by.field

aggregatetype

ルーター

送信トレイテーブルの列の名前を指定します。デフォルトの動作では、この列の値が、コネクターが送信トレイメッセージを出力するトピックの名前の一部になります。例を 要求される送信トレイテーブルの説明 に示します。

route.topic.regex

(?<routedByValue>.*)

ルーター

送信トレイ SMT が RegexRouter で送信トレイテーブルレコードに適用する正規表現を指定します。この正規表現は、route.topic.replacement SMT オプションの設定の一部です。

デフォルトの動作では、SMT は route.topic.replacement SMT オプションの設定のデフォルトの ${routedByValue} 変数を route.by.field 送信トレイ SMT オプションの設定に置き換えます。

route.topic.replacement

outbox.event​.${routedByValue}

ルーター

コネクターが送信トレイメッセージを出力するトピックの名前を指定します。デフォルトのトピック名では、outbox.event. の後に送信トレイテーブルレコードの aggregatetype 列の値が続きます。たとえば、aggregatetype の値が customers の場合、トピック名は outbox.event.customers になります。

トピック名を変更するには、次の操作を行います。

route.tombstone.on.empty.payload

false

ルーター

空または null のペイロードによってコネクターがトゥームストーンイベントを出力するかどうかを示します。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.