11.3. CloudEvents フォーマットでの Debezium 変更イベントレコードの出力


CloudEvents は、共通の方法でイベントデータを記述するための仕様です。その目的は、サービス、プラットフォーム、およびシステム間の相互運用性を提供することです。Debezium では、Db2、MongoDB、MySQL、Oracle、PostgreSQL、または SQL Server コネクターを設定して、CloudEvents 仕様に準拠した変更イベントレコードを出力することができます。

重要

CloudEvents フォーマットでの変更イベントレコードの出力は、テクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat の実稼働環境のサービスレベルアグリーメント (SLA) ではサポートされません。また、機能的に完全ではない可能性があるため、Red Hat はテクノロジープレビュー機能を実稼働環境に実装することは推奨しません。テクノロジープレビューの機能は、最新の技術をいち早く提供して、開発段階で機能のテストやフィードバックの収集を可能にするために提供されます。サポート範囲の詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

CloudEvents 仕様は、以下の項目を定義します。

  • 標準化されたイベント属性のセット
  • カスタム属性を定義するためのルール
  • イベントフォーマットを JSON や Apache Avro などのシリアライズした表現にマッピングするためのエンコーディングルール
  • Apache Kafka、HTTP、または AMQP 等のトランスポート層のプロトコルバインディング

CloudEvents 仕様に準拠する変更イベントレコードを出力するように Debezium コネクターを設定するために、Debezium では Kafka Connect メッセージコンバーターである io.debezium.converters.CloudEventsConverter を利用することができます。

現時点では、構造化マッピングモードだけが使用できます。CloudEvents 変更イベントエンベロープは JSON または Avro にすることができ、各エンベロープタイプの データ 形式として JSON または Avro を使用できます。CloudEvents フォーマットでの変更イベントの出力に関する情報は、以下のように整理されます。

Avro 使用の詳細は、以下を参照してください。

11.3.1. CloudEvents フォーマットでの Debezium 変更イベントレコードの例

以下の例は、PostgreSQL コネクターから出力される CloudEvents 変更イベントレコードを示しています。この例では、PostgreSQL コネクターは CloudEvents フォーマットエンベロープおよび data フォーマットとして JSON を使用するように設定されています。

{
  "id" : "name:test_server;lsn:29274832;txId:565",   1
  "source" : "/debezium/postgresql/test_server",     2
  "specversion" : "1.0",                             3
  "type" : "io.debezium.postgresql.datachangeevent", 4
  "time" : "2020-01-13T13:55:39.738Z",               5
  "datacontenttype" : "application/json",            6
  "iodebeziumop" : "r",                              7
  "iodebeziumversion" : "2.5.4.Final",        8
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578923739738",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumlsn" : "29274832",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "565",                           9
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : {                                         10
    "before" : null,
    "after" : {
      "pk" : 1,
      "name" : "Bob"
    }
  }
}
表11.3 CloudEvents 変更イベントレコードのフィールドの説明
項目説明

1

変更イベントの内容に基づいてコネクターが変更イベントに生成する一意の ID。

2

イベントのソースで、コネクター設定の topic.prefix プロパティーで指定されたデータベースの論理名です。

3

CloudEvents 仕様のバージョン。

4

変更イベントを生成したコネクタータイプ。このフィールドの形式は io.debezium.CONNECTOR_TYPE.datachangeevent です。CONNECTOR_TYPE の有効な値は db2mongodbmysqloraclepostgresql、または sqlserver です。

5

ソースデータベースの変更時刻。

6

data 属性のコンテンツタイプを記述します。この例のように、設定可能な値は jsonavro です。

7

操作の ID。許容値は、r (読み取り)、c (作成)、u (更新)、または d (削除) です。

8

Debezium 変更イベントから認識されるすべての source 属性は、属性名の前にiodebezium を追加して CloudEvents エクステンション属性にマッピングされます。

9

コネクターで有効にすると、Debezium 変更イベントから認識されるそれぞれの transaction 属性は、属性名の前に iodebeziumtx を追加して CloudEvents エクステンション属性にマッピングされます。

10

実際のデータ変更。操作およびコネクターによって、データに beforeafter または patch フィールドが含まれる場合があります。

以下の例も、PostgreSQL コネクターから出力される CloudEvents 変更イベントレコードを示しています。この例でも、PostgreSQL コネクターは CloudEvents フォーマットエンベロープとして JSON を使用するように設定されていますが、ここではコネクターは data フォーマットに Avro を使用するように設定されています。

{
  "id" : "name:test_server;lsn:33227720;txId:578",
  "source" : "/debezium/postgresql/test_server",
  "specversion" : "1.0",
  "type" : "io.debezium.postgresql.datachangeevent",
  "time" : "2020-01-13T14:04:18.597Z",
  "datacontenttype" : "application/avro",            1
  "dataschema" : "http://my-registry/schemas/ids/1", 2
  "iodebeziumop" : "r",
  "iodebeziumversion" : "2.5.4.Final",
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578924258597",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumtxId" : "578",
  "iodebeziumlsn" : "33227720",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "578",
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : "AAAAAAEAAgICAg=="                        3
}
表11.4 Avro を使用してデータをフォーマットするコネクターの CloudEvents イベントレコードのフィールドに関する説明
項目説明

1

data 属性に Avro バイナリーデータが含まれていることを示します。

2

Avro データが準拠するスキーマの URI。

3

data 属性には、base64 でエンコードされた Avro バイナリーデータが含まれます。

data 属性に加えてエンベロープに Avro を使用することもできます。

11.3.2. Debezium CloudEvents コンバーターの設定例

Debezium コネクター設定で io.debezium.converters.CloudEventsConverter を設定します。次の特性を持つ変更イベントレコードを出力するように CloudEvents コンバーターを設定する方法を以下の例に示します。

  • エンベロープとして JSON を使用する。
  • http://my-registry/schemas/ids/1 のスキーマレジストリーを使用して、データ 属性をバイナリー Avro データとしてシリアライズする。
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json",          1
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
表11.5 CloudEvents コンバーター設定のフィールドの説明
項目説明

1

json はデフォルトであるため、serializer.type の指定は任意です。

CloudEvents コンバーターは、Kafka レコードの値を変換します。レコードのキーを操作する場合は、同じコネクター設定で key.converter を指定することができます。たとえば、StringConverterLongConverterJsonConverter、または AvroConverter を指定できます。

11.3.3. メタデータのソースと一部の CloudEvents フィールドの設定

デフォルトでは、metadata.source プロパティーは次の例に示すように 3 つの部分で構成されます。

"value,id:generate,type:generate"

最初の部分は、レコードのメタデータを取得するためのソースを指定します。許可される値は valueheader です。次の部分では、CloudEvent の id フィールドと type フィールドを取得する方法を指定します。許可される値は generateheader です。

レコードメタデータの取得

CloudEvent を構築するには、コンバーターにソース、操作、およびトランザクションのメタデータが必要です。通常、コンバーターはレコードの値からメタデータを取得できます。ただし、場合によっては、コンバーターがレコードを受信する前に、レコードの値がメタデータを含まないような形でレコードが処理されることがあります (レコードが送信トレイイベントルーター SMT によって処理された後など)。必要なメタデータを保持するには、次の方法を使用してレコードのヘッダーでメタデータを渡します。

手順

  1. たとえば HeaderFrom SMT を使用して、レコードがコンバーターに到達する前にレコードのヘッダーにメタデータを記録するメカニズムを実装します。
  2. コンバーターの metadata.source プロパティーの値を header に設定します。

次の例は、送信トレイイベントルーター SMT と HeaderFrom SMT を使用するコネクターの設定を示しています。

...
"tombstones.on.delete": false,
"transforms": "addMetadataHeaders,outbox",
"transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.addMetadataHeaders.fields": "source,op,transaction",
"transforms.addMetadataHeaders.headers": "source,op,transaction",
"transforms.addMetadataHeaders.operation": "copy",
"transforms.addMetadataHeaders.predicate": "isHeartbeat",
"transforms.addMetadataHeaders.negate": true,
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.expand.json.payload": true,
"transforms.outbox.table.fields.additional.placement": "type:header",
"predicates": "isHeartbeat",
"predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHeartbeat.pattern": "__debezium-heartbeat.*",
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.metadata.source": "header",
"header.converter": "org.apache.kafka.connect.json.JsonConverter",
"header.converter.schemas.enable": true
...
注記

HeaderFrom 変換を使用するには、廃棄メッセージとハートビートメッセージのフィルタリングが必要な場合があります。

metadata.source プロパティーの header 値はグローバル設定です。そのため、プロパティーの値の一部 (idtype ソースなど) を省略した場合でも、コンバーターは省略された部分の header 値を生成します。

CloudEvent の idtype の取得

デフォルトでは、CloudEvents コンバーターは CloudEvent の id フィールドと type フィールドの値を自動的に生成します。デフォルトを変更し、適切なヘッダーにフィールドの値を指定することにより、コンバーターがこれらのフィールドに値を入力する方法をカスタマイズできます。以下に例を示します。

"value.converter.metadata.source": "value,id:header,type:header"

上記の設定を有効にすると、CloudEvents コンバーターに渡す値を含む id および type ヘッダーを追加するようにアップストリーム機能を設定できます。

id ヘッダーにのみ値を指定する場合は、以下を使用します。

"value.converter.metadata.source": "value,id:header,type:generate"

ヘッダーにメタデータ、id、および type を指定するには、以下の短い構文を使用します。

"value.converter.metadata.source": "header"

11.3.4. Debezium CloudEvents コンバーター設定オプション

CloudEvent コンバーターを使用するように Debezium コネクターを設定する場合、以下のオプションを指定できます。

表11.6 CloudEvents コンバーター設定オプションの説明

オプション

デフォルト

説明

serializer.type

json

CloudEvents エンベロープ構造に使用するエンコーディングタイプ。値は json または avro に指定できます。

data.serializer.type

json

data 属性に使用するエンコーディングタイプ。値は json または avro に指定できます。

json. ...

該当なし

JSON を使用する際に、ベースとなるコンバーターに渡される任意の設定オプション。json. 接頭辞が削除されます。

avro. ...

該当なし

Avro を使用する際に、ベースとなるコンバーターに渡される任意の設定オプション。avro. 接頭辞が削除されます。たとえば、Avro データ の場合は、avro.schema.registry.url オプションを指定します。

schema.name.adjustment.mode

none

コネクターで使用されるメッセージコンバータとの互換性のために、スキーマ名をどのように調整するかを指定します。値は none または avro にすることができます。

schema.cloudevents.name

none

スキーマレジストリーに登録されるスキーマの CloudEvents スキーマ名を指定します。serializer.typejson の場合、設定は無視され、値はスキーマレスになります。設定されていない場合は、デフォルトのアルゴリズム ${serverName}.${databaseName}.CloudEvents.Envelope を使用してスキーマ名が生成されます。

extension.attributes.enable

true

コンバーターがクラウドイベントを生成するときにエクステンション属性を含めるかどうかを指定します。値は true または false です。

metadata.source

value,id:generate,type:generate

コンバーターがメタデータを取得するソース (ソース、操作、トランザクション) と、CloudEvent の id および type フィールドの名前を指定する、コンマ区切りリスト。リストの最初の要素は、メタデータのソースを指定するグローバル設定です。メタデータのソースは、value または header になります。この最初の要素の後には、CloudEvent フィールドの名前 (id または type) と、フィールドの値を取得するためのソース (generate または header) を指定するペアのセットが続きます。各ペアの値はコロンで区切ります。以下に例を示します。

value,id:header,type:generate

設定例については、メタデータのソースと一部の CloudEvents フィールドの設定 を参照してください。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.