Debezium スタートガイド
Red Hat build of Debezium 3.0.8 向け
概要
はじめに
このチュートリアルでは、Debezium を使用して MySQL データベースの更新をキャプチャーする方法を紹介します。データベースのデータが変更されると、結果となるイベントストリームを確認できます。
Red Hat ドキュメントへのフィードバック
Red Hat ドキュメントに関するご意見やご感想をお寄せください。
改善を提案するには、Jira 課題を作成し、変更案を説明してください。ご要望に迅速に対応できるよう、できるだけ詳細にご記入ください。
前提条件
-
Red Hat カスタマーポータルのアカウントがある。このアカウントを使用すると、Red Hat Jira Software インスタンスにログインできます。
アカウントをお持ちでない場合は、アカウントを作成するように求められます。
手順
- Create issue にアクセスします。
- Summary テキストボックスに、問題の簡単な説明を入力します。
Description テキストボックスに、次の情報を入力します。
- 問題が見つかったページの URL。
-
問題の詳細情報。
他のフィールドの情報はデフォルト値のままにすることができます。
- Create をクリックして、Jira 課題をドキュメントチームに送信します。
フィードバックをご提供いただきありがとうございました。
第1章 このチュートリアルについて
このチュートリアルには、次の手順が含まれています。
- 簡単なサンプルデータベースを含む MySQL データベースサーバーを OpenShift にデプロイします。
- Streams for Apache Kafka でカスタムリソースを適用して、Debezium MySQL コネクタープラグインを含む Kafka Connect コンテナーイメージを自動的にビルドします。
- Debezium MySQL コネクターリソースを作成して、データベースの変更をキャプチャーします。
- コネクターのデプロイメントを確認します。
- コネクターがデータベースから Kafka トピックに発行する変更イベントを表示します。
前提条件
- OpenShift と Streams for Apache Kafka に精通している。
- クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
- Streams for Apache Kafka Operator が実行されている。
- Apache Kafka クラスターが OpenShift 上の Streams for Apache Kafka のデプロイと管理 に記載されているとおりにデプロイされている。
- Red Hat build of Debezium のライセンスを所有している。
-
OpenShift 管理ツールの使用方法を理解している。OpenShift
oc
CLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。 Kafka Connect ビルドイメージの保存方法に応じて、コンテナーレジストリーにアクセスするためのパーミッションを持っているか、OpenShift 上に ImageStream リソースを作成している。
- ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
- レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
- ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下を行います。
- ImageStream リソースを新規コンテナーイメージを保存するためにクラスターにデプロイします。クラスターの ImageStream を明示的に作成する必要があります。ImageStreams は、デフォルトでは利用できません。
第2章 Debezium の紹介
Debezium は、既存のデータベースからの情報をイベントストリームに変換する分散プラットフォームであり、アプリケーションがデータベース内の行レベルの変更を検出して即座に対応できるようにします。
Debezium は Apache Kafka の上に構築され、一連の Kafka Connect 互換コネクターを提供します。各コネクターは、特定のデータベース管理システム (DBMS) で動作します。コネクターは、変更が発生したときにそれを検出し、各変更イベントのレコードを Kafka トピックにストリーミングすることで、DBMS のデータ変更の履歴を記録します。その後、使用する側のアプリケーションが、結果のイベントレコードを Kafka トピックから読み取れるようになります。
Debezium は、Kafka の信頼性の高いストリーミングプラットフォームを利用することで、アプリケーションがデータベースで発生した変更を正確かつ完全に利用できるようにします。アプリケーションが予期せず停止したり、接続が失われたりしても、停止中に発生したイベントを見逃すことはありません。アプリケーションの再起動後、中断した時点からトピックからの読み取りを再開します。
次のチュートリアルでは、簡単な設定で Debezium MySQL コネクター をデプロイして使用する方法を示しています。Debezium コネクターのデプロイおよび使用の詳細は、コネクターのドキュメントを参照してください。
第3章 サービスの起動
Debezium を使用するには、Kafka と Kafka Connect を備えた Streams for Apache Kafka、データベース、および Debezium コネクターサービスが必要です。このチュートリアルのサービスを実行するには、以下を行う必要があります。
3.1. MySQL データベースのデプロイ
サンプルの inventory
データベースを含む MySQL データベースサーバーをデプロイします。このデータベースには、データが事前入力されたテーブルがいくつか含まれています。Debezium MySQL コネクターは、サンプルテーブルで発生する変更をキャプチャーし、変更イベントレコードを Apache Kafka トピックに送信します。
手順
以下のコマンドを実行して MySQL データベースを起動します。このコマンドは、
inventory
データベースのサンプルで設定されている MySQL データベースサーバーを起動します。
Copy to clipboardCopied$ oc new-app -l app=mysql --name=mysql quay.io/debezium/example-mysql:latest
以下のコマンドを実行して MySQL データベースのクレデンシャルを設定します。このコマンドによって MySQL データベースのデプロイメント設定が更新され、ユーザー名とパスワードが追加されます。
Copy to clipboardCopied$ oc set env deployment/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw
以下のコマンドを実行して MySQL データベースが稼働していることを検証します。コマンドの実行後、MySQL データベースが稼働し、Pod の準備が整っていることを表す出力が表示されます。
Copy to clipboardCopied$ oc get pods -l app=mysql NAME READY STATUS RESTARTS AGE mysql-1-2gzx5 1/1 Running 1 23s
新しいターミナルを開き、
inventory
データベースのサンプルにログインします。このコマンドは、MySQL データベースを実行している Pod で MySQL コマンドラインクライアントを開きます。クライアントは以前に設定したユーザー名とパスワードを使用します。
Copy to clipboardCopied$ oc exec mysql-1-2gzx5 -it -- mysql -u mysqluser -pmysqlpw inventory mysql: [Warning] Using a password on the command line interface can be insecure. Reading table information for completion of table and column names You can turn off this feature to get a quicker startup with -A Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 7 Server version: 5.7.29-log MySQL Community Server (GPL) Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql>
inventory
データベースのテーブルをリスト表示します。
Copy to clipboardCopiedmysql> show tables; +---------------------+ | Tables_in_inventory | +---------------------+ | addresses | | customers | | geom | | orders | | products | | products_on_hand | +---------------------+ 6 rows in set (0.00 sec)
データベースを調べ、それに含まれるデータを確認します。たとえば、
customers
テーブルを表示します。
Copy to clipboardCopiedmysql> select * from customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
3.2. Kafka Connect のデプロイ
MySQL データベースをデプロイした後、Streams for Apache Kafka を使用して、Debezium MySQL コネクタープラグインを含む Kafka Connect コンテナーイメージをビルドします。デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、MySQL コネクターアーティファクトに関する情報をイメージに含める
KafkaConnect
CR。 -
MySQL コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnector
CR。Streams for Apache Kafka が Kafka Connect Pod を起動した後、KafkaConnector
CR を適用してコネクターを起動します。
ビルドプロセス中に、Streams for Apache Kafka Operator は、Debezium コネクター定義を含む KafkaConnect
カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドでは、Red Hat Maven リポジトリーから必要なアーティファクトをダウンロードし、イメージに組み込みます。新規に作成されたコンテナーは .spec.build.output
で指定されたコンテナーレジストリーにプッシュされ、Kafka Connect Pod のデプロイに使用されます。
コンテナーイメージは、quay.io
などの外部コンテナーレジストリー、または OpenShift ImageStream に保存できます。ImageStream は自動的に作成されないため、コンテナーイメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する 必要があります。
Streams for Apache Kafka が Kafka Connect イメージをビルドして保存した後、KafkaConnector
カスタムリソースを使用してコネクターを起動します。
前提条件
- Streams for Apache Kafka が OpenShift クラスター上で実行されている。
- Streams for Apache Kafka Cluster Operator が OpenShift クラスターにインストールされている。
- ImageStream を利用できる (KafkaConnect コンテナーイメージを OpenShift ImageStream に保存する場合)。
- Apache Kafka と Kafka Connect が Streams for Apache Kafka 上で実行されている。
手順
-
OpenShift クラスターにログインし、
debezium
などのプロジェクトを作成または開きます。 コネクターの Debezium
KafkaConnect
カスタムリソース (CR) を作成するか、既存のリソースを変更します。
以下の例は、KafkaConnect
カスタムリソースを記述したdbz-connect.yaml
ファイルからの抜粋を示しています。metadata.annotations
およびspec.build
プロパティーが必要です。例3.1 Debezium コネクターを含む
KafkaConnect
カスタムリソースを定義したdbz-connect.yaml
ファイル
Copy to clipboardCopiedapiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: replicas: 1 bootstrapServers: my-cluster-kafka-bootstrap:9092 version: 3.9.0 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-mysql artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/3.0.8.Final-redhat-00004/debezium-connector-mysql-3.0.8.Final-redhat-00004-plugin.zip 7 ...
表3.1 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resources
アノテーションを"true"
に設定して、クラスター Operator がKafkaConnector
リソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build
設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。3
build.output
は、新しくビルドされたイメージを保存するレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.type
の有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker
、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestream
です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.output
を指定する方法の詳細は、Streams for Apache Kafka ビルドスキーマリファレンス ドキュメントを参照してください。5
plugins
設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグインname
と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.type
の値は、artifacts.url
で指定するアーティファクトのファイルタイプを指定します。有効なタイプはzip
、tgz
、またはjar
です。Debezium コネクターアーカイブは、.zip
ファイル形式で提供されます。JDBC ドライバーファイルは.jar
形式です。type
の値は、url
フィールドで参照されるファイルのタイプと一致させる必要があります。7
artifacts.url
の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。以下のコマンドを入力して、
KafkaConnect
ビルド仕様を OpenShift クラスターに適用します。
Copy to clipboardCopiedoc create -f dbz-connect.yaml
カスタムリソースで指定された設定に基づいて、Streams for Apache Kafka Operator はデプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnector
リソースを作成して、MySQL コネクターのインスタンスを定義します。
たとえば、以下のKafkaConnector
CR を作成し、debezium-inventory-connector.yaml
として保存します。例3.2 Debezium コネクターの
KafkaConnector
カスタムリソースを定義したmysql-inventory-connector.yaml
ファイル
Copy to clipboardCopiedapiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: inventory-connector 1 labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector 2 tasksMax: 1 3 config: 4 database.hostname: mysql 5 database.port: 3306 6 database.user: debezium 7 database.password: dbz 8 database.server.id: 184054 topic.prefix: dbserver1 9 table.include.list: inventory.* 10 schema.history.internal.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092' 11 schema.history.internal.kafka.topic: schema-changes.inventory 12
表3.2 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
一度に 1 つのタスクのみを実行します。MySQL コネクターは MySQL サーバーの
binlog
を読み取るため、単一のコネクタータスクを使用して、適切な順序とイベント処理を確保します。Kafka Connect サービスは、コネクターを使用して 1 つ以上のタスクを開始し、作業を完了します。実行中のタスクを Kafka Connect サービスのクラスター全体に自動的に分散します。サービスが停止またはクラッシュした場合、タスクは実行中のサービスに再分散されます。4
コネクターの設定。
5
MySQL データベースインスタンスのホスト名またはアドレス。
6
データベースインスタンスのポート番号。
7
Debezium がデータベースに接続するユーザーアカウントの名前。
8
Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。
9
MySQL サーバーまたはクラスターのトピック接頭辞。この文字列は、コネクターがイベントレコードを送信する全 Kafka トピックの名前の前に付けます。
10
コネクターが変更イベントをキャプチャーするテーブルのリスト。コネクターは、
inventory
テーブルで発生した場合にのみ、変更を検出します。11
DDL ステートメントをデータベーススキーマの履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。これは、コネクターが変更イベントレコードを送信するブローカーと同じです。再起動後、コネクターは、コネクターが読み取りを再開する時点で binlog に存在しているデータベーススキーマを復元します。
12
データベーススキーマ履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。
以下のコマンドを実行してコネクターリソースを作成します。
Copy to clipboardCopiedoc create -n <namespace> -f <kafkaConnector>.yaml
以下に例を示します。
Copy to clipboardCopiedoc create -n debezium -f mysql-inventory-connector.yaml
コネクターは Kafka Connect クラスターに登録され、
KafkaConnector
CR のspec.config.database.dbname
で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、コネクターが作成されたことを確認 し、inventory
データベースの変更のキャプチャーが開始したことを確認する準備が整いました。
3.3. 例: 単純な OpenShift ImageStream
オブジェクト定義
次の例は、単純な ImageStream
オブジェクトの定義を示しています。
apiVersion: image.openshift.io/v1
kind: ImageStream
metadata:
name: kafka-connect-dbz-mysql
spec:
lookupPolicy:
local: true
Copy to clipboardCopied追加リソース:
3.4. コネクターのデプロイメントの確認
コネクターがエラーなしで正常に起動すると、コネクターでキャプチャーするように設定した各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。
コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。
- コネクターのステータスを確認します。
- コネクターがトピックを生成していることを確認します。
- 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。
前提条件
- Debezium コネクターが OpenShift 上の Streams for Apache Kafka にデプロイされている。
-
OpenShift
oc
CLI クライアントがインストールされている。 - OpenShift Container Platform Web コンソールにアクセスできる。
手順
以下の方法のいずれかを使用して
KafkaConnector
リソースのステータスを確認します。OpenShift Container Platform Web コンソールから以下を実行します。
- Home → Search に移動します。
-
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaConnector
を入力します。 - KafkaConnectors リストから、確認するコネクターの名前をクリックします (例: inventory-connector)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
ターミナルウィンドウから以下を実行します。
以下のコマンドを実行します。
Copy to clipboardCopiedoc describe KafkaConnector <connector-name> -n <project>
以下に例を示します。
Copy to clipboardCopiedoc describe KafkaConnector inventory-connector -n debezium
このコマンドは、以下の出力のようなステータス情報を返します。
例3.3
KafkaConnector
リソースのステータス
Copy to clipboardCopiedName: inventory-connector Namespace: debezium Labels: strimzi.io/cluster=my-connect-cluster Annotations: <none> API Version: kafka.strimzi.io/v1beta2 Kind: KafkaConnector ... Status: Conditions: Last Transition Time: 2021-12-08T17:41:34.897153Z Status: True Type: Ready Connector Status: Connector: State: RUNNING worker_id: 10.131.1.124:8083 Name: inventory-connector Tasks: Id: 0 State: RUNNING worker_id: 10.131.1.124:8083 Type: source Observed Generation: 1 Tasks Max: 1 Topics: dbserver1 dbserver1.inventory.addresses dbserver1.inventory.customers dbserver1.inventory.geom dbserver1.inventory.orders dbserver1.inventory.products dbserver1.inventory.products_on_hand Events: <none>
コネクターによって Kafka トピックが作成されたことを確認します。
OpenShift Container Platform Web コンソールから以下を実行します。
- Home → Search に移動します。
-
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaTopic
を入力します。 - KafkaTopics リストから確認するトピックの名前をクリックします (例: dbserver1.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
ターミナルウィンドウから以下を実行します。
以下のコマンドを実行します。
Copy to clipboardCopiedoc get kafkatopics
このコマンドは、以下の出力のようなステータス情報を返します。
例3.4
KafkaTopic
リソースのステータス
Copy to clipboardCopiedNAME CLUSTER PARTITIONS REPLICATION FACTOR READY connect-cluster-configs my-cluster 1 1 True connect-cluster-offsets my-cluster 25 1 True connect-cluster-status my-cluster 5 1 True consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a my-cluster 50 1 True dbserver1---a96f69b23d6118ff415f772679da623fbbb99421 my-cluster 1 1 True dbserver1.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480 my-cluster 1 1 True dbserver1.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b my-cluster 1 1 True dbserver1.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5 my-cluster 1 1 True dbserver1.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d my-cluster 1 1 True dbserver1.inventory.products---df0746db116844cee2297fab611c21b56f82dcef my-cluster 1 1 True dbserver1.inventory.products-on-hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5 my-cluster 1 1 True schema-changes.inventory my-cluster 1 1 True strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 my-cluster 1 1 True strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b my-cluster 1 1 True
トピックの内容を確認します。
- ターミナルウィンドウから、以下のコマンドを入力します。
Copy to clipboardCopiedoc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
以下に例を示します。
Copy to clipboardCopiedoc exec -n debezium -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=dbserver1.inventory.products_on_hand
トピック名を指定する形式は、ステップ 1 で返された
oc describe
コマンドと同じです (例:dbserver1.inventory.addresses
)。トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例3.5 Debezium 変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00001","connector":"mysql","name":"dbserver1","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
上記の例の
payload
値は、コネクタースナップショットがテーブルdbserver1.products_on_hand
から読み込み ("op" ="r"
) イベントを生成したことを示しています。product_id
レコードの"before"
状態はnull
であり、レコードに以前の値が存在しないことを示しています。"after"
状態は、product_id
101
を持つ項目のquantity
が3
であることを示しています。
これで Debezium コネクターが inventory
データベースからキャプチャーする変更イベントを表示する 準備が整いました。
第4章 変更イベントの表示
Debezium MySQL コネクターのデプロイ後に、inventory
データベースへの変更のキャプチャーが開始されます。
コネクターが起動すると、一連の Apache Kafka トピックにイベントが書き込まれます。各トピックは、MySQL データベース内のテーブルの 1 つを表します。各トピックの名前は、データベースサーバーの名前 dbserver1
で始まります。
コネクターは、次の Kafka トピックに書き込みます。
dbserver1
- 変更がキャプチャーされるテーブルに適用される DDL ステートメントが書き込まれるスキーマ変更トピック。
dbserver1.inventory.products
-
inventory
データベースのproducts
テーブルの変更イベントレコードを受け取ります。 dbserver1.inventory.products_on_hand
-
inventory
データベースのproducts_on_hand
テーブルの変更イベントレコードを受け取ります。 dbserver1.inventory.customers
-
inventory
データベースのcustomers
テーブルの変更イベントレコードを受け取ります。 dbserver1.inventory.orders
-
inventory
データベースのorders
テーブルの変更イベントレコードを受け取ります。
このチュートリアルの残りの部分では、dbserver1.inventory.customers
Kafka トピックを調べます。トピックを詳しく見ていくと、さまざまな種類の変更イベントがどのように表されているかがわかり、各イベントをキャプチャーしたコネクターに関する情報が見つかります。
このチュートリアルには、次のセクションが含まれています。
4.1. 作成 イベントの表示
dbserver1.inventory.customers
トピックを表示すると、MySQL コネクターが inventory
データベースの 作成 イベントをどのようにキャプチャーしたが分かります。この場合、作成 イベントは、データベースに追加された新規顧客をキャプチャーします。
手順
新しいターミナルを開き、
kafka-console-consumer
を使用してトピックの最初からdbserver1.inventory.customers
トピックを使用します。このコマンドは、Kafka (
my-cluster-kafka-0
) を実行している Pod で簡単なコンシューマー (kafka-console-consumer.sh
) を実行します。
Copy to clipboardCopied$ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customers
コンシューマーは、
customers
テーブルの各行に 1 つずつ、4 つのメッセージ (JSON 形式) を返します。各メッセージには、対応するテーブル行のイベントレコードが含まれます。各イベントには、キー と 値 という 2 つの JSON ドキュメントがあります。キーは行のプライマリーキーに対応し、値は行の詳細 (行に含まれるフィールド、各フィールドの値、および行で実行された操作のタイプ) を表します。
最後のイベントでは、キー の詳細を確認します。
最後のイベントの キー の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。
Copy to clipboardCopied{ "schema":{ "type":"struct", "fields":[ { "type":"int32", "optional":false, "field":"id" } ], "optional":false, "name":"dbserver1.inventory.customers.Key" }, "payload":{ "id":1004 } }
このイベントには、
schema
とpayload
の 2 つの部分があります。schema
には、ペイロードの内容を記述する Kafka Connect スキーマが含まれています。この場合、ペイロードはdbserver1.inventory.customers.Key
という名前のstruct
です。これはオプションではなく、必須フィールド (int32
型のID
) を 1 つ持っています。payload
には、値が1004
のid
フィールドが 1 つあります。イベントの キー を確認すると、このイベントは
id
のプライマリーキー列の値が1004
であるinventory.customers
テーブルの行に提供されることが分かります。同じイベントの 値 の詳細を確認します。
イベントの 値 は、行が作成されたことを示し、その行に含まれる内容 (この場合は挿入された行の
id
、first_name
、last_name
、およびemail
) を表します。最後のイベントの 値 の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。
Copy to clipboardCopied{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "version" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "int64", "optional": false, "field": "ts_sec" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "boolean", "optional": true, "field": "snapshot" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "db" }, { "type": "string", "optional": true, "field": "table" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "int64", "optional": true, "field": "ts_us" }, { "type": "int64", "optional": true, "field": "ts_ns" } ], "optional": false, "name": "dbserver1.inventory.customers.Envelope", "version": 1 }, "payload": { "before": null, "after": { "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { "version": "3.0.8.Final", "name": "dbserver1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "snapshot": true, "thread": null, "db": "inventory", "table": "customers" }, "op": "r", "ts_ms": 1486500577691, "ts_us": 1486500577691547, "ts_ns": 1486500577691547930 } }
イベントのこの部分ははるかに長くなりますが、イベントの キー と同様に
schema
とpayload
もあります。schema
には、dbserver1.inventory.customers.Envelope
(バージョン 1) という名前の Kafka Connect スキーマが含まれており、次の 5 つのフィールドがあります。op
-
操作のタイプを記述する文字列値が含まれる必須フィールド。MySQL コネクターの値は、
c
(作成または挿入)、u
(更新)、d
(削除)、およびr
(読み取り) です (スナップショットの場合)。 before
-
任意のフィールド。存在する場合は、イベント発生 前 の行の状態が含まれます。この構造は、
dbserver1.inventory.customers.Value
Kafka Connect スキーマによって記述されます。dbserver1
コネクターは、このスキーマをinventory.customers
テーブルのすべての行に使用します。 after
-
任意のフィールド。存在する場合は、イベント発生 後 の行の状態が含まれます。この構造は、
before
で使用されるのと同じdbserver1.inventory.customers.Value
Kafka Connect スキーマで記述されます。 source
-
イベントのソースメタデータを記述する構造が含まれる必須のフィールド。MySQL の場合、次の複数のフィールドが含まれます。コネクター名、イベントが記録された
binlog
ファイルの名前、binlog
ファイルでのイベント発生位置、イベント内の行 (複数ある場合)、影響を受けるデータベースおよびテーブルの名前、変更を行った MySQL スレッド ID、このイベントはスナップショットの一部であったかどうか、MySQL サーバー ID (ある場合)、および秒単位のタイムスタンプ。 ts_ms
- 任意のフィールド。存在する場合は、コネクターがイベントを処理した時間 (Kafka Connect タスクを実行する JVM のシステムクロックを使用) が含まれます。
注記イベントの JSON 表現は、記述される行よりもはるかに長くなります。これは、Kafka Connect はすべてのイベントのキーと値とともに、ペイロード を記述する スキーマ を提供するためです。今後、この構造は変更される可能性があります。ただし、特に使用する側のアプリケーションが時間とともに進化する場合は、キーと値のスキーマがイベント自体にあると、メッセージを理解するのが非常に容易になります。
Debezium MySQL コネクターは、データベーステーブルの構造に基づいてこれらのスキーマを構築します。DDL ステートメントを使用して MySQL データベースのテーブル定義を変更する場合、コネクターはこの DDL ステートメントを読み取り、Kafka Connect スキーマを更新します。これは、イベント発生時にイベントの発生元となったテーブルとまったく同じように各イベントを構造化する唯一の方法です。ただし、単一テーブルのイベントがすべて含まれる Kafka トピックには、テーブル定義の各状態に対応するイベントが含まれる場合があります。
JSON コンバーターはすべてのメッセージにキーと値のスキーマを含めるため、非常に詳細なイベントを生成します。
イベントの キー および 値 スキーマを、
inventory
データベースの状態と比較します。MySQL コマンドラインクライアントを実行しているターミナルで、以下のステートメントを実行します。
Copy to clipboardCopiedmysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
これは、確認したイベントレコードがデータベースのレコードと一致することを示しています。
4.2. データベースの更新および 更新 イベントの表示
Debezium MySQL コネクターが inventory
データベースで 作成 イベントをキャプチャーする方法を確認しました。次に、レコードの 1 つを変更し、コネクターがこれをどのようにキャプチャーするかを見てみましょう。
この手順を完了すると、データベースのコミットで変更した内容の詳細を確認する方法と、変更イベントを比較して、他の変更と関連していつ変更が発生したかを判断する方法を知ることができます。
手順
MySQL コマンドラインクライアントを実行しているターミナルで、以下のステートメントを実行します。
Copy to clipboardCopiedmysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004; Query OK, 1 row affected (0.05 sec) Rows matched: 1 Changed: 1 Warnings: 0
更新された
customers
テーブルを表示します。
Copy to clipboardCopiedmysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne Marie | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
kafka-console-consumer
を実行しているターミナルに切り替え、新しい 5 番目のイベントを確認します。customers
テーブルのレコードを変更したため、Debezium MySQL コネクターが新しいイベントを生成しました。新しい JSON ドキュメントが 2 つあるはずです。1 つはイベントの キー のドキュメントで、もう 1 つは新しいイベントの 値 のドキュメントです。更新 イベントの キー の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。
Copy to clipboardCopied{ "schema": { "type": "struct", "name": "dbserver1.inventory.customers.Key", "optional": false, "fields": [ { "field": "id", "type": "int32", "optional": false } ] }, "payload": { "id": 1004 } }
この キー は、以前のイベントの キー と同じです。
新しいイベントの 値 は次のとおりです。
schema
セクションには変更がないため、payload
セクションのみを表しています (書式を調整して読みやすくしてあります)。
Copy to clipboardCopied{ "schema": {...}, "payload": { "before": { 1 "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": { 2 "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { 3 "name": "3.0.8.Final", "name": "dbserver1", "server_id": 223344, "ts_sec": 1486501486, "gtid": null, "file": "mysql-bin.000003", "pos": 364, "row": 0, "snapshot": null, "thread": 3, "db": "inventory", "table": "customers" }, "op": "u", 4 "ts_ms": 1486501486308, 5 "ts_us": 1486501486308910, 6 "ts_ns": 1486501486308910814 7 } }
表4.1 更新 イベント値のペイロード内におけるフィールドの説明 項目 説明 1
before
フィールドには、データベースがコミットされる前の行に存在する値が表示されます。元のfirst_name
の値はAnne
です。2
after
フィールドには、変更イベント後の行の状態が表示されます。first_name
の値はAnne Marie
になりました。3
source
フィールド構造体には以前と同じ値が多数ありますが、ts_sec
およびpos
フィールドは更新されています (他の状況ではfile
が変更されることがあります)。4
op
フィールドの値はu
になっており、更新によってこの行が変更されたことを示しています。5
ts_ms
、ts_us
、ts_ns
フィールドには、Debezium がこのイベントを処理した時刻を示すタイムスタンプが表示されます。payload
セクションを見ると、更新 イベントに関する重要な情報を確認できます。-
before
とafter
構造を比較することで、コミットが原因で影響を受けた行で実際に何が変更されたかを判断できます。 -
source
構造を確認して、MySQL の変更の記録に関する情報を確認できます (トレーサビリティーを提供)。 -
イベントの
payload
セクションと、同じトピック (または別のトピック) の他のイベントを比較することで、別のイベントと同じ MySQL コミットの前、後、または一部としてイベントが発生したかどうかを判断できます。
-
4.3. データベースのレコードの削除および 削除 イベントの表示
Debezium MySQL コネクターが inventory
データベースで 作成 および 更新 イベントをキャプチャーする方法を確認しました。次に、レコードの 1 つを削除し、コネクターがこれをどのようにキャプチャーするかを見てみましょう。
この手順を完了すると、削除 イベントの詳細を見つける方法と、Kafka が ログコンパクション を使用して、コンシューマーがすべてのイベントを取得できる状態で 削除 イベントの数を減らす方法を知ることができます。
手順
MySQL コマンドラインクライアントを実行しているターミナルで、以下のステートメントを実行します。
Copy to clipboardCopiedmysql> DELETE FROM customers WHERE id=1004; Query OK, 1 row affected (0.00 sec)
注記上記のコマンドが外部キー制約違反で失敗する場合は、以下のステートメントを使用して、addresses テーブルから顧客のアドレスの参照を削除する必要があります。
Copy to clipboardCopiedmysql> DELETE FROM addresses WHERE customer_id=1004;
kafka-console-consumer
を実行しているターミナルに切り替え、2 つ の新しいイベントを表示します。customers
テーブルの行を削除したため、Debezium MySQL コネクターが 2 つの新しいイベントを生成しました。最初の新規イベントの キー および 値 を確認します。
最初の新規イベントの キー の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。
Copy to clipboardCopied{ "schema": { "type": "struct", "name": "dbserver1.inventory.customers.Key", "optional": false, "fields": [ { "field": "id", "type": "int32", "optional": false } ] }, "payload": { "id": 1004 } }
この キー は、これまで確認した 2 つのイベントの キー と同じです。
最初の新規イベントの 値 は以下のとおりです (書式を調整して読みやすくしてあります)。
Copy to clipboardCopied{ "schema": {...}, "payload": { "before": { 1 "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": null, 2 "source": { 3 "name": "3.0.8.Final", "name": "dbserver1", "server_id": 223344, "ts_sec": 1486501558, "gtid": null, "file": "mysql-bin.000003", "pos": 725, "row": 0, "snapshot": null, "thread": 3, "db": "inventory", "table": "customers" }, "op": "d", 4 "ts_ms": 1486501558315, 5 "ts_us": 1486501558315901, 6 "ts_ns": 1486501558315901687 7 } }
表4.2 イベント値のフィールドの説明 項目 説明 1
before
フィールドは、データベースのコミットで削除した行の状態を表しています。2
after
フィールドはnull
で、行が存在しなくなったことが分かります。3
source
フィールド構造には、ts_sec
フィールドとpos
フィールドの値が変更されていることを除いて、以前と同じ値が多数あります。状況によっては、file
の値も変更される可能性があります。4
op
フィールドの値はd
になり、レコードが削除されたことを示しています。5
ts_ms
、ts_us
、ts_ns
フィールドには、Debezium がイベントを処理した日時を示すタイムスタンプが表示されます。このように、このイベントは、行の削除を処理するために必要な情報をコンシューマーに提供します。古い値も提供されます。これは、コンシューマーによっては削除を適切に処理するのに古い値が必要になることがあるからです。
2 つ目の新規イベントの キー および 値 を確認します。
2 つ目の新規イベントの 値 は以下のとおりです (書式を調整して読みやすくしてあります)。
Copy to clipboardCopied{ "schema": { "type": "struct", "name": "dbserver1.inventory.customers.Key" "optional": false, "fields": [ { "field": "id", "type": "int32", "optional": false } ] }, "payload": { "id": 1004 } }
繰り返しになりますが、この キー は、これまで確認した 3 つのイベントのキーと同じです。
同じイベントの 値 は以下のとおりです (書式を調整して読みやすくしてあります)。
Copy to clipboardCopied{ "schema": null, "payload": null }
Kafka が ログコンパクション に設定されている場合、トピックの後半に同じキーを持つメッセージが 1 つ以上あると、トピックから古いメッセージが削除されます。この最後のイベントには、キーと空の値があるため、tombstone (トゥームストーン) イベントと呼ばれます。これは、Kafka が同じキーを持つこれまでのメッセージをすべて削除することを意味します。これまでのメッセージが削除されても、tombstone イベントであるため、コンシューマーは最初からトピックを読み取ることができ、イベントを見逃しません。
4.4. Kafka Connect サービスの再起動
Debezium MySQL コネクターが作成、更新、および削除イベントをキャプチャーする方法を確認しました。次に、コネクターが稼働していない場合でもどのように変更イベントをキャプチャーするかを見てみましょう。
Kafka Connect サービスは、登録されたコネクターのタスクを自動的に管理します。そのため、Kafka Connect サービスがオフラインになった場合は、サービスの再起動時に、実行されていないタスクをすべて開始します。つまり、Debezium が稼働していない場合でも、サービスは変更をデータベースに報告できます。
この手順では、Kafka Connect を停止し、データベースのデータを一部変更した後、Kafka Connect を再起動して変更イベントを確認します。
手順
Kafka Connect サービスを停止します。
Kafka Connect デプロイメントの設定を開きます。
Copy to clipboardCopied$ oc edit deployment/my-connect-cluster-connect
デプロイメントの設定が表示されます。
Copy to clipboardCopiedapiVersion: apps.openshift.io/v1 kind: Deployment metadata: ... spec: replicas: 1 ...
-
spec.replicas
の値を0
に変更します。 - 設定を保存します。
Kafka Connect サービスが停止したことを確認します。
このコマンドを実行すると、Kafka Connect サービスが完了し、稼働している Pod がないことを確認できます。
Copy to clipboardCopied$ oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-1-dxcs9 0/1 Completed 0 7h
Kafka Connect サービスが停止している間に、MySQL クライアントを実行しているターミナルに切り替え、新しいレコードをデータベースに追加します。
Copy to clipboardCopiedmysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
Kafka Connect サービスを再起動します。
Kafka Connect サービスのデプロイメント設定を開きます。
Copy to clipboardCopied$ oc edit deployment/my-connect-cluster-connect
デプロイメントの設定が表示されます。
Copy to clipboardCopiedapiVersion: apps.openshift.io/v1 kind: Deployment metadata: ... spec: replicas: 0 ...
-
spec.replicas
の値を1
に変更します。 - デプロイメント設定を保存します。
Kafka Connect サービスが再起動したことを確認します。
このコマンドを実行すると、Kafka Connect サービスが稼働中で、Pod の準備ができていることを確認できます。
Copy to clipboardCopied$ oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-2-q9kkl 1/1 Running 0 74s
-
kafka-console-consumer.sh
を実行しているターミナルに切り替えます。新しいイベントを受け取ると表示されます。 Kafka Connect がオフラインだったときに作成したレコードを確認します (書式を調整して読みやすくしてあります)。
Copy to clipboardCopied{ ... "payload":{ "id":1005 } } { ... "payload":{ "before":null, "after":{ "id":1005, "first_name":"Sarah", "last_name":"Thompson", "email":"kitt@acme.com" }, "source":{ "version":"3.0.8.Final", "connector":"mysql", "name":"dbserver1", "ts_ms":1582581502000, "snapshot":"false", "db":"inventory", "table":"customers", "server_id":223344, "gtid":null, "file":"mysql-bin.000004", "pos":364, "row":0, "thread":5, "query":null }, "op":"c", "ts_ms":1582581502317 } }
第5章 次のステップ
チュートリアルを完了したら、次のステップを検討してください。
チュートリアルをさらに試してみる。
MySQL コマンドラインクライアントを使用して、データベーステーブルの行を追加、変更、および削除し、トピックへの影響を確認します。外部キーによって参照される行は削除できないことに注意してください。
Debezium のデプロイメントを計画する。
Debezium を OpenShift または Red Hat Enterprise Linux にインストールできます。詳細は、以下を参照してください。
改訂日時: 2025-04-11