12.3. CloudEvents フォーマットでの Debezium 変更イベントレコードの出力
CloudEvents は、共通の方法でイベントデータを記述するための仕様です。その目的は、サービス、プラットフォーム、およびシステム間の相互運用性を提供することです。Debezium では、Db2、MongoDB、MySQL、Oracle、PostgreSQL、または SQL Server コネクターを設定して、CloudEvents 仕様に準拠した変更イベントレコードを出力することができます。
CloudEvents フォーマットでの変更イベントレコードの出力は、テクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat の実稼働環境のサービスレベルアグリーメント (SLA) ではサポートされません。また、機能的に完全ではない可能性があるため、Red Hat はテクノロジープレビュー機能を実稼働環境に実装することは推奨しません。テクノロジープレビューの機能は、最新の技術をいち早く提供して、開発段階で機能のテストやフィードバックの収集を可能にするために提供されます。サポート範囲の詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。
CloudEvents 仕様は、以下の項目を定義します。
- 標準化されたイベント属性のセット
- カスタム属性を定義するためのルール
- イベントフォーマットを JSON や Apache Avro などのシリアライズした表現にマッピングするためのエンコーディングルール
- Apache Kafka、HTTP、または AMQP 等のトランスポート層のプロトコルバインディング
CloudEvents 仕様に準拠する変更イベントレコードを出力するように Debezium コネクターを設定するために、Debezium では Kafka Connect メッセージコンバーターである io.debezium.converters.CloudEventsConverter
を利用することができます。
現時点では、構造化マッピングモードだけが使用できます。CloudEvents 変更イベントエンベロープは JSON または Avro にすることができ、各エンベロープタイプの データ
形式として JSON または Avro を使用できます。CloudEvents フォーマットでの変更イベントの出力に関する情報は、以下のように整理されます。
Avro 使用の詳細については、以下を参照してください。
12.3.1. CloudEvents フォーマットでの Debezium 変更イベントレコードの例
以下の例は、PostgreSQL コネクターから出力される CloudEvents 変更イベントレコードを示しています。この例では、PostgreSQL コネクターは CloudEvents フォーマットエンベロープおよび data
フォーマットとして JSON を使用するように設定されています。
{ "id" : "name:test_server;lsn:29274832;txId:565", "source" : "/debezium/postgresql/test_server", "specversion" : "1.0", "type" : "io.debezium.postgresql.datachangeevent", "time" : "2020-01-13T13:55:39.738Z", "datacontenttype" : "application/json", "iodebeziumop" : "r", "iodebeziumversion" : "2.3.7.Final", "iodebeziumconnector" : "postgresql", "iodebeziumname" : "test_server", "iodebeziumtsms" : "1578923739738", "iodebeziumsnapshot" : "true", "iodebeziumdb" : "postgres", "iodebeziumschema" : "s1", "iodebeziumtable" : "a", "iodebeziumlsn" : "29274832", "iodebeziumxmin" : null, "iodebeziumtxid": "565", "iodebeziumtxtotalorder": "1", "iodebeziumtxdatacollectionorder": "1", "data" : { "before" : null, "after" : { "pk" : 1, "name" : "Bob" } } }
{
"id" : "name:test_server;lsn:29274832;txId:565",
"source" : "/debezium/postgresql/test_server",
"specversion" : "1.0",
"type" : "io.debezium.postgresql.datachangeevent",
"time" : "2020-01-13T13:55:39.738Z",
"datacontenttype" : "application/json",
"iodebeziumop" : "r",
"iodebeziumversion" : "2.3.7.Final",
"iodebeziumconnector" : "postgresql",
"iodebeziumname" : "test_server",
"iodebeziumtsms" : "1578923739738",
"iodebeziumsnapshot" : "true",
"iodebeziumdb" : "postgres",
"iodebeziumschema" : "s1",
"iodebeziumtable" : "a",
"iodebeziumlsn" : "29274832",
"iodebeziumxmin" : null,
"iodebeziumtxid": "565",
"iodebeziumtxtotalorder": "1",
"iodebeziumtxdatacollectionorder": "1",
"data" : {
"before" : null,
"after" : {
"pk" : 1,
"name" : "Bob"
}
}
}
項目 | 説明 |
---|---|
1 | 変更イベントの内容に基づいてコネクターが変更イベントに生成する一意の ID。 |
2 |
イベントのソースで、コネクター設定の |
3 | CloudEvents 仕様のバージョン。 |
4 |
変更イベントを生成したコネクタータイプ。このフィールドの形式は |
5 | ソースデータベースの変更時刻。 |
6 |
|
7 |
操作の ID。許容値は、 |
8 |
Debezium 変更イベントから認識されるすべての |
9 |
コネクターで有効にすると、Debezium 変更イベントから認識されるそれぞれの |
10 |
実際のデータ変更。操作およびコネクターによって、データに |
以下の例も、PostgreSQL コネクターから出力される CloudEvents 変更イベントレコードを示しています。この例でも、PostgreSQL コネクターは CloudEvents フォーマットエンベロープとして JSON を使用するように設定されていますが、ここではコネクターは data
フォーマットに Avro を使用するように設定されています。
{ "id" : "name:test_server;lsn:33227720;txId:578", "source" : "/debezium/postgresql/test_server", "specversion" : "1.0", "type" : "io.debezium.postgresql.datachangeevent", "time" : "2020-01-13T14:04:18.597Z", "datacontenttype" : "application/avro", "dataschema" : "http://my-registry/schemas/ids/1", "iodebeziumop" : "r", "iodebeziumversion" : "2.3.7.Final", "iodebeziumconnector" : "postgresql", "iodebeziumname" : "test_server", "iodebeziumtsms" : "1578924258597", "iodebeziumsnapshot" : "true", "iodebeziumdb" : "postgres", "iodebeziumschema" : "s1", "iodebeziumtable" : "a", "iodebeziumtxId" : "578", "iodebeziumlsn" : "33227720", "iodebeziumxmin" : null, "iodebeziumtxid": "578", "iodebeziumtxtotalorder": "1", "iodebeziumtxdatacollectionorder": "1", "data" : "AAAAAAEAAgICAg==" }
{
"id" : "name:test_server;lsn:33227720;txId:578",
"source" : "/debezium/postgresql/test_server",
"specversion" : "1.0",
"type" : "io.debezium.postgresql.datachangeevent",
"time" : "2020-01-13T14:04:18.597Z",
"datacontenttype" : "application/avro",
"dataschema" : "http://my-registry/schemas/ids/1",
"iodebeziumop" : "r",
"iodebeziumversion" : "2.3.7.Final",
"iodebeziumconnector" : "postgresql",
"iodebeziumname" : "test_server",
"iodebeziumtsms" : "1578924258597",
"iodebeziumsnapshot" : "true",
"iodebeziumdb" : "postgres",
"iodebeziumschema" : "s1",
"iodebeziumtable" : "a",
"iodebeziumtxId" : "578",
"iodebeziumlsn" : "33227720",
"iodebeziumxmin" : null,
"iodebeziumtxid": "578",
"iodebeziumtxtotalorder": "1",
"iodebeziumtxdatacollectionorder": "1",
"data" : "AAAAAAEAAgICAg=="
}
項目 | 説明 |
---|---|
1 |
|
2 | Avro データが準拠するスキーマの URI。 |
3 |
|
data
属性に加えてエンベロープに Avro を使用することもできます。
12.3.2. Debezium CloudEvents コンバーターの設定例
Debezium コネクター設定で io.debezium.converters.CloudEventsConverter
を設定します。次の特性を持つ変更イベントレコードを出力するように CloudEvents コンバーターを設定する方法を以下の例に示します。
- エンベロープとして JSON を使用する。
-
http://my-registry/schemas/ids/1
のスキーマレジストリーを使用して、データ
属性をバイナリー Avro データとしてシリアライズする。
... "value.converter": "io.debezium.converters.CloudEventsConverter", "value.converter.serializer.type" : "json", "value.converter.data.serializer.type" : "avro", "value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1" ...
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json",
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
項目 | 説明 |
---|---|
1 |
|
CloudEvents コンバーターは、Kafka レコードの値を変換します。レコードのキーを操作する場合は、同じコネクター設定で key.converter
を指定することができます。たとえば、StringConverter
、LongConverter
、JsonConverter
、または AvroConverter
を指定できます。
12.3.3. Debezium CloudEvents コンバーター設定オプション
CloudEvent コンバーターを使用するように Debezium コネクターを設定する場合、以下のオプションを指定できます。
オプション | デフォルト | 説明 |
|
CloudEvents エンベロープ構造に使用するエンコーディングタイプ。値は | |
|
| |
該当なし |
JSON を使用する際に、ベースとなるコンバーターに渡される任意の設定オプション。 | |
該当なし |
Arvo を使用する際に、ベースとなるコンバーターに渡される任意の設定オプション。 | |
none |
コネクターで使用されるメッセージコンバータとの互換性のために、スキーマ名をどのように調整するかを指定します。値は |