11.3. CloudEvents フォーマットでの Debezium 変更イベントレコードの出力
CloudEvents は、共通の方法でイベントデータを記述するための仕様です。その目的は、サービス、プラットフォーム、およびシステム間の相互運用性を提供することです。Debezium では、Db2、MongoDB、MySQL、Oracle、PostgreSQL、または SQL Server コネクターを設定して、CloudEvents 仕様に準拠した変更イベントレコードを出力することができます。
CloudEvents フォーマットでの変更イベントレコードの出力は、テクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat の実稼働環境のサービスレベルアグリーメント (SLA) ではサポートされません。また、機能的に完全ではない可能性があるため、Red Hat はテクノロジープレビュー機能を実稼働環境に実装することは推奨しません。テクノロジープレビューの機能は、最新の技術をいち早く提供して、開発段階で機能のテストやフィードバックの収集を可能にするために提供されます。サポート範囲の詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。
CloudEvents 仕様は、以下の項目を定義します。
- 標準化されたイベント属性のセット
- カスタム属性を定義するためのルール
- イベントフォーマットを JSON や Apache Avro などのシリアライズした表現にマッピングするためのエンコーディングルール
- Apache Kafka、HTTP、または AMQP 等のトランスポート層のプロトコルバインディング
CloudEvents 仕様に準拠する変更イベントレコードを出力するように Debezium コネクターを設定するために、Debezium では Kafka Connect メッセージコンバーターである io.debezium.converters.CloudEventsConverter
を利用することができます。
現時点では、構造化マッピングモードだけが使用できます。CloudEvents 変更イベントエンベロープは JSON または Avro にすることができ、各エンベロープタイプの データ
形式として JSON または Avro を使用できます。CloudEvents フォーマットでの変更イベントの出力に関する情報は、以下のように整理されます。
Avro 使用の詳細は、以下を参照してください。
11.3.1. CloudEvents フォーマットでの Debezium 変更イベントレコードの例
以下の例は、PostgreSQL コネクターから出力される CloudEvents 変更イベントレコードを示しています。この例では、PostgreSQL コネクターは CloudEvents フォーマットエンベロープおよび data
フォーマットとして JSON を使用するように設定されています。
{ "id" : "name:test_server;lsn:29274832;txId:565", 1 "source" : "/debezium/postgresql/test_server", 2 "specversion" : "1.0", 3 "type" : "io.debezium.postgresql.datachangeevent", 4 "time" : "2020-01-13T13:55:39.738Z", 5 "datacontenttype" : "application/json", 6 "iodebeziumop" : "r", 7 "iodebeziumversion" : "2.5.4.Final", 8 "iodebeziumconnector" : "postgresql", "iodebeziumname" : "test_server", "iodebeziumtsms" : "1578923739738", "iodebeziumsnapshot" : "true", "iodebeziumdb" : "postgres", "iodebeziumschema" : "s1", "iodebeziumtable" : "a", "iodebeziumlsn" : "29274832", "iodebeziumxmin" : null, "iodebeziumtxid": "565", 9 "iodebeziumtxtotalorder": "1", "iodebeziumtxdatacollectionorder": "1", "data" : { 10 "before" : null, "after" : { "pk" : 1, "name" : "Bob" } } }
項目 | 説明 |
---|---|
1 | 変更イベントの内容に基づいてコネクターが変更イベントに生成する一意の ID。 |
2 |
イベントのソースで、コネクター設定の |
3 | CloudEvents 仕様のバージョン。 |
4 |
変更イベントを生成したコネクタータイプ。このフィールドの形式は |
5 | ソースデータベースの変更時刻。 |
6 |
|
7 |
操作の ID。許容値は、 |
8 |
Debezium 変更イベントから認識されるすべての |
9 |
コネクターで有効にすると、Debezium 変更イベントから認識されるそれぞれの |
10 |
実際のデータ変更。操作およびコネクターによって、データに |
以下の例も、PostgreSQL コネクターから出力される CloudEvents 変更イベントレコードを示しています。この例でも、PostgreSQL コネクターは CloudEvents フォーマットエンベロープとして JSON を使用するように設定されていますが、ここではコネクターは data
フォーマットに Avro を使用するように設定されています。
{ "id" : "name:test_server;lsn:33227720;txId:578", "source" : "/debezium/postgresql/test_server", "specversion" : "1.0", "type" : "io.debezium.postgresql.datachangeevent", "time" : "2020-01-13T14:04:18.597Z", "datacontenttype" : "application/avro", 1 "dataschema" : "http://my-registry/schemas/ids/1", 2 "iodebeziumop" : "r", "iodebeziumversion" : "2.5.4.Final", "iodebeziumconnector" : "postgresql", "iodebeziumname" : "test_server", "iodebeziumtsms" : "1578924258597", "iodebeziumsnapshot" : "true", "iodebeziumdb" : "postgres", "iodebeziumschema" : "s1", "iodebeziumtable" : "a", "iodebeziumtxId" : "578", "iodebeziumlsn" : "33227720", "iodebeziumxmin" : null, "iodebeziumtxid": "578", "iodebeziumtxtotalorder": "1", "iodebeziumtxdatacollectionorder": "1", "data" : "AAAAAAEAAgICAg==" 3 }
項目 | 説明 |
---|---|
1 |
|
2 | Avro データが準拠するスキーマの URI。 |
3 |
|
data
属性に加えてエンベロープに Avro を使用することもできます。
11.3.2. Debezium CloudEvents コンバーターの設定例
Debezium コネクター設定で io.debezium.converters.CloudEventsConverter
を設定します。次の特性を持つ変更イベントレコードを出力するように CloudEvents コンバーターを設定する方法を以下の例に示します。
- エンベロープとして JSON を使用する。
-
http://my-registry/schemas/ids/1
のスキーマレジストリーを使用して、データ
属性をバイナリー Avro データとしてシリアライズする。
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json", 1
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
項目 | 説明 |
---|---|
1 |
|
CloudEvents コンバーターは、Kafka レコードの値を変換します。レコードのキーを操作する場合は、同じコネクター設定で key.converter
を指定することができます。たとえば、StringConverter
、LongConverter
、JsonConverter
、または AvroConverter
を指定できます。
11.3.3. メタデータのソースと一部の CloudEvents フィールドの設定
デフォルトでは、metadata.source
プロパティーは次の例に示すように 3 つの部分で構成されます。
"value,id:generate,type:generate"
最初の部分は、レコードのメタデータを取得するためのソースを指定します。許可される値は value
と header
です。次の部分では、CloudEvent の id
フィールドと type
フィールドを取得する方法を指定します。許可される値は generate
と header
です。
レコードメタデータの取得
CloudEvent を構築するには、コンバーターにソース、操作、およびトランザクションのメタデータが必要です。通常、コンバーターはレコードの値からメタデータを取得できます。ただし、場合によっては、コンバーターがレコードを受信する前に、レコードの値がメタデータを含まないような形でレコードが処理されることがあります (レコードが送信トレイイベントルーター SMT によって処理された後など)。必要なメタデータを保持するには、次の方法を使用してレコードのヘッダーでメタデータを渡します。
手順
-
たとえば
HeaderFrom
SMT を使用して、レコードがコンバーターに到達する前にレコードのヘッダーにメタデータを記録するメカニズムを実装します。 -
コンバーターの
metadata.source
プロパティーの値をheader
に設定します。
次の例は、送信トレイイベントルーター SMT と HeaderFrom
SMT を使用するコネクターの設定を示しています。
... "tombstones.on.delete": false, "transforms": "addMetadataHeaders,outbox", "transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value", "transforms.addMetadataHeaders.fields": "source,op,transaction", "transforms.addMetadataHeaders.headers": "source,op,transaction", "transforms.addMetadataHeaders.operation": "copy", "transforms.addMetadataHeaders.predicate": "isHeartbeat", "transforms.addMetadataHeaders.negate": true, "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "transforms.outbox.table.expand.json.payload": true, "transforms.outbox.table.fields.additional.placement": "type:header", "predicates": "isHeartbeat", "predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches", "predicates.isHeartbeat.pattern": "__debezium-heartbeat.*", "value.converter": "io.debezium.converters.CloudEventsConverter", "value.converter.metadata.source": "header", "header.converter": "org.apache.kafka.connect.json.JsonConverter", "header.converter.schemas.enable": true ...
HeaderFrom
変換を使用するには、廃棄メッセージとハートビートメッセージのフィルタリングが必要な場合があります。
metadata.source
プロパティーの header
値はグローバル設定です。そのため、プロパティーの値の一部 (id
や type
ソースなど) を省略した場合でも、コンバーターは省略された部分の header
値を生成します。
CloudEvent の id
と type
の取得
デフォルトでは、CloudEvents コンバーターは CloudEvent の id
フィールドと type
フィールドの値を自動的に生成します。デフォルトを変更し、適切なヘッダーにフィールドの値を指定することにより、コンバーターがこれらのフィールドに値を入力する方法をカスタマイズできます。以下に例を示します。
"value.converter.metadata.source": "value,id:header,type:header"
上記の設定を有効にすると、CloudEvents コンバーターに渡す値を含む id
および type
ヘッダーを追加するようにアップストリーム機能を設定できます。
id
ヘッダーにのみ値を指定する場合は、以下を使用します。
"value.converter.metadata.source": "value,id:header,type:generate"
ヘッダーにメタデータ、id
、および type
を指定するには、以下の短い構文を使用します。
"value.converter.metadata.source": "header"
11.3.4. Debezium CloudEvents コンバーター設定オプション
CloudEvent コンバーターを使用するように Debezium コネクターを設定する場合、以下のオプションを指定できます。
オプション | デフォルト | 説明 |
|
CloudEvents エンベロープ構造に使用するエンコーディングタイプ。値は | |
|
| |
該当なし |
JSON を使用する際に、ベースとなるコンバーターに渡される任意の設定オプション。 | |
該当なし |
Avro を使用する際に、ベースとなるコンバーターに渡される任意の設定オプション。 | |
none |
コネクターで使用されるメッセージコンバータとの互換性のために、スキーマ名をどのように調整するかを指定します。値は | |
none |
スキーマレジストリーに登録されるスキーマの CloudEvents スキーマ名を指定します。 | |
|
コンバーターがクラウドイベントを生成するときにエクステンション属性を含めるかどうかを指定します。値は | |
|
コンバーターがメタデータを取得するソース (ソース、操作、トランザクション) と、CloudEvent の
設定例については、メタデータのソースと一部の CloudEvents フィールドの設定 を参照してください。 |