Debezium の RHEL へのインストール
Red Hat Enterprise Linux (RHEL) での Red Hat build of Debezium 2.5.4 の使用
概要
はじめに
多様性を受け入れるオープンソースの強化
Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。まずは、マスター (master)、スレーブ (slave)、ブラックリスト (blacklist)、ホワイトリスト (whitelist) の 4 つの用語の置き換えから始めます。この取り組みは膨大な作業を要するため、用語の置き換えは、今後の複数のリリースにわたって段階的に実施されます。詳細は、Red Hat CTO である Chris Wright のメッセージ をご覧ください。
第1章 Debezium の概要
Debezium for Red Hat Integration は、データベース操作をキャプチャーし、行レベル操作のデータ変更イベントレコードを作成して、Apache Kafka トピックに変更イベントレコードをストリーミングする分散型プラットフォームです。Debezium は Apache Kafka に構築され、AMQ Streams とデプロイおよび統合されます。
Debezium によって、データベーステーブルの行レベルの変更がキャプチャーされ、対応する変更イベントが AMQ Streams に渡されます。アプリケーションはこれらの 変更イベントストリーム を読み取りでき、変更イベントが発生した順にアクセスできます。
Debezium は、Debezium for Red Hat Integration のアップストリームコミュニティープロジェクトです。
Debezium には、以下を含む複数の用途があります。
- データレプリケーション
- キャッシュの更新およびインデックスの検索
- モノリシックアプリケーションの簡素化
- データ統合
- ストリーミングクエリーの有効化
Debezium は、以下の一般的なデータベース用の Apache Kafka Connect コネクターを提供します。
第2章 Debezium コネクターの RHEL へのインストール
コネクタープラグインで Kafka Connect を拡張して、AMQ Streams 経由で Debezium コネクターをインストールします。AMQ Streams のデプロイ後に、Kafka Connect で Debezium をコネクター設定としてデプロイできます。
2.1. Kafka トピック作成に関する推奨事項
Debezium は、データを複数の Apache Kafka トピックに保存します。トピックは、管理者が事前に作成する必要があります。または、Kafka Connect を設定して トピックを自動的に設定します。
以下のリストで、トピックの作成時に考慮すべき制限および推奨事項を説明します。
- Debezium Db2、MySQL、Oracle、および SQL Server コネクターのデータベーススキーマ履歴トピック
上記の各コネクターには、データベーススキーマ履歴トピックが必要です。データベーススキーマ履歴トピックを手動で作成する場合でも、Kafka ブローカーを使用してトピックを自動的に作成するか、Kafka Connect を使用してトピックを作成する 場合でも、トピックが以下のように設定されていることを確認してください。
- 無限または非常に長い保持期間
- 3 以上の実稼働環境でのレプリケーション係数
- 単一パーティション
- その他のトピック
指定のレコードの 最後の変更イベントのみが保存されるように Kafka ログコンパクション を有効にする場合は、Apache Kafka で以下のトピックプロパティーを設定します。
-
min.compaction.lag.ms
トピックコンシューマーがすべてのイベントを受信してマーカーを削除するのに十分な時間を確保するには、シンクコネクターに予想される最大ダウンタイムよりも大きい値を前述のプロパティーの値に指定します。たとえば、シンクコネクターに更新を適用する際に発生する可能性のあるダウンタイムを考慮してください。
-
- 実稼働でレプリケート
単一パーティション
単一パーティションルールを緩和できますが、アプリケーションはデータベースの異なる行に対して順不同のイベントを処理する必要があります。単一行のイベントは、引き続き完全に順序付けされます。複数のパーティションを使用する場合は、Kafka がキーをハッシュ化してパーティションを決定するのがデフォルトの動作になります。その他のパーティションストラテジーでは、単一メッセージ変換 (SMT: Single Message Transformations) を使用して、各レコードにパーティション番号を設定する必要があります。
2.2. Debezium コネクター設定の計画
Debezium コネクターをデプロイする前に、コネクターの設定方法を決定してください。設定は、コネクターの動作を指定する情報を提供し、Debezium がソースデータベースに接続することを可能にします。
コネクター設定を JSON として指定し、コネクターを登録する準備ができたら、curl
を使用して設定を Kafka Connect API エンドポイントに送信します。
前提条件
- ソースデータベースがデプロイされ、Debezium コネクターがデータベースにアクセスできる。
コネクターがソースデータベースにアクセスするために必要な以下の情報を把握している。
- データベースホストの名前または IP アドレス
- データベースに接続するためのポート番号
- コネクターがデータベースへのサインインに使用できるアカウントの名前
- データベースユーザーアカウントのパスワード
- データベースの名前
- コネクターの情報の取得元であるテーブルの名前
- コネクターの変更イベントの出力先である Kafka ブローカーの名前
- コネクターのデータベース履歴情報の送信先である Kafka トピックの名前
手順
Debezium コネクターに適用する設定を JSON 形式で指定します。
次の例は、Debezium MySQL コネクターの簡単な設定を示しています。
{ "name": "inventory-connector", 1 "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", 2 "tasks.max": "1", 3 "database.hostname": "mysql", 4 "database.port": "3306", 5 "database.user": "debezium", 6 "database.password": "dbz", 7 "database.server.id": "184054", 8 "topic.prefix": "dbserver1", 9 "table.include.list": "public.inventory", 10 "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", 11 "schema.history.internal.kafka.topic": "dbhistory.inventory" 12 } }
- 1
- Kafka Connect クラスターに登録するコネクターの名前。
- 2
- コネクタークラスの名前。
- 3
- 同時に動作できるタスクの数。一度に操作できるタスクは 1 つだけです。
- 4
- ホストデータベースインスタンスのホスト名または IP アドレス
- 5
- データベースインスタンスのポート番号。
- 6
- Debezium がデータベースに接続するユーザーアカウントの名前。
- 7
- データベースユーザーアカウントのパスワード
- 8
- コネクターの一意の数値 ID。
このプロパティーは MySQL コネクターにのみ使用されます。 - 9
- コネクターが変更をキャプチャーするサーバーのデータベースサーバーまたはクラスターの論理識別子として機能する文字列。指定された文字列は名前空間を指定します。Avro コンバーターが使用されている場合、Debezium は、コネクターが書き込む各 Kafka トピック、Kafka Connect スキーマの名前、および対応する Avro スキーマの名前空間にこの名前の接頭辞を付けます。
- 10
- コネクターが変更イベントをキャプチャーするテーブルのリスト。
- 11
- コネクターがデータベーススキーマ履歴を送信する Kafka ブローカーの名前。指定されたブローカーは、コネクターが出力する変更イベントも受け取ります。
- 12
- スキーマ履歴を格納する Kafka トピックの名前。
コネクターの再起動後、コネクターは停止した時点からデータベースログの読み取りを再開し、オフライン中に発生したトランザクションのイベントを出力します。コネクターは、未読トランザクションの変更イベントを Kafka に書き込む前に、スキーマ履歴をチェックしてから、元のトランザクションが発生したときに有効だったスキーマを適用します。
Additional information
- コネクターのタイプごとに設定できる設定プロパティーの詳細は、Debezium ユーザーガイド のコネクターのデプロイメントに関するセクションを参照してください。
2.3. Red Hat Enterprise Linux での AMQ Streams を使用した Debezium のデプロイ
この手順では、Red Hat Enterprise Linux で Debezium のコネクターを設定する方法を説明します。コネクターは、Apache Kafka Connect を使用して AMQ Streams クラスターにデプロイされます。Kafka Connect は Apache Kafka と外部システムとの間でデータをストリーミングするためのフレームワークです。Kafka Connect は、スタンドアロンモードではなく分散モードで実行する必要があります。
前提条件
Debezium のデプロイ先であるホスト環境が、サポートされている設定 で Red Hat Enterprise Linux、AMQ Streams、および Java を実行している。
- AMQ Streams のインストール方法の詳細は、AMQ Streams のインストール を参照してください。
- 単一の ZooKeeper ノードおよび単一の Kafka ノードが含まれる実稼働ではない、基本的な AMQ Streams クラスターをインストールする方法の詳細は、単一ノードの AMQ Streams クラスターの実行 を参照してください。
以前のバージョンの AMQ Streams を実行している場合は、最初に AMQ Streams 2.6 にアップグレードする必要があります。アップグレードプロセスの詳細は、AMQ Streams および Kafka のアップグレード を参照してください。
-
ホストの管理者権限 (
sudo
アクセス) がある。 - Apache ZooKeeper と Apache Kafka ブローカーが実行している。
- Kafka Connect が、スタンドアロンモードではなく、分散モード で実行されている。
-
AMQ Streams のインストール時に作成された
kafka
ユーザーの認証情報を把握している。 - ソースデータベースがデプロイされ、Debezium をデプロイするホストがデータベースにアクセスできる。
- コネクターの設定方法 を理解している。
手順
- Debezium コネクターまたは使用するコネクターを Red Hat Integration ダウンロードサイト からダウンロードします。たとえば、Debezium を MySQL データベースで使用するには、Debezium 2.5.4 MySQL コネクター をダウンロードします。
AMQ Streams をデプロイした Red Hat Enterprise Linux ホストで、ターミナルウィンドウを開き、
/opt/kafka
にconnector-plugins
ディレクトリーを作成します (まだ存在しない場合)。$ sudo mkdir /opt/kafka/connector-plugins
次のコマンドを入力して、
/opt/kafka/connector-plugins
ディレクトリーにダウンロードした Debezium コネクターアーカイブの内容を抽出します。$ sudo unzip debezium-connector-mysql-2.5.4.Final.zip -d /opt/kafka/connector-plugins
- インストールするコネクターごとに、手順 1 - 3 を繰り返します。
ターミナルウィンドウから、
kafka
ユーザーとしてサインインします。$ su - kafka $ Password:
Kafka Connect プロセスが実行中の場合は、停止します。
次のコマンドを入力して、Kafka Connect が分散モードで実行されているかどうかを確認します。
$ jcmd | grep ConnectDistributed
プロセスが実行中の場合、コマンドはプロセス ID を返します。次に例を示します。
18514 org.apache.kafka.connect.cli.ConnectDistributed /opt/kafka/config/connect-distributed.properties
プロセス ID を指定して
kill
コマンドを入力し、プロセスを停止します。次に例を示します。$ kill 18514
/opt/kafka/config/
にあるconnect-distributed.properties
ファイルを編集し、plugin.path
の値を Debezium コネクタープラグインの親ディレクトリーの場所に設定します。plugin.path=/opt/kafka/connector-plugins
分散モードで Kafka Connect を起動します。
$ /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Kafka Connect の実行後、Kafka Connect API を使用してコネクターを登録します。
curl
コマンドを入力して、「Debezium コネクター設定の計画」 で指定したコネクター設定 JSON をlocalhost:8083/connectors
の Kafka Connect REST API エンドポイントに送信するPOST
リクエストを送信します。
以下に例を示します。curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name": "inventory-connector", "config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "topic.prefix": "dbserver1", "table.include.list": "public.inventory", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "dbhistory.inventory" } }'
複数のコネクターを登録するには、コネクターごとに個別のリクエストを送信します。
Kafka Connect を再起動して、変更を実装します。
Kafka Connect が起動すると、
connector-plugins
ディレクトリーから、設定済みの Debezium コネクターをロードします。設定が完了すると、デプロイされたコネクターはソースデータベースに接続し、挿入、更新、または削除された行またはドキュメントごとにイベントを生成します。
- 各 Kafka Connect ワーカーノードに対して、ステップ 5 - 10 を繰り返します。
次のステップ
関連情報
2.4. デプロイメントの確認
コネクターが起動すると、設定されたデータベースのスナップショットが実行され、指定したテーブルごとにトピックが作成されます。
前提条件
「Red Hat Enterprise Linux での AMQ Streams を使用した Debezium のデプロイ」 の手順に基づいて、Red Hat Enterprise Linux にコネクターをデプロイしています。以下の手順を実行します。
ホストのターミナルウィンドウで、次のコマンドを入力して、Kafka Connect API からコネクターのリストを要求します。
$ curl -H "Accept:application/json" localhost:8083/connectors/
クエリーは、デプロイされたコネクターの名前を返します。以下に例を示します。
["inventory-connector"]
ホストのターミナルウィンドウで次のコマンドを入力して、コネクターが実行しているタスクを表示します。
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
コマンドは、以下の例のような出力を返します。
HTTP/1.1 200 OK Date: Thu, 06 Feb 2020 22:12:03 GMT Content-Type: application/json Content-Length: 531 Server: Jetty(9.4.20.v20190813) { "name": "inventory-connector", ... "tasks": [ { "connector": "inventory-connector", "task": 0 } ] }
Kafka クラスター内のトピックのリストを表示します。
ターミナルウィンドウから/opt/kafka/bin/
に移動し、以下のシェルスクリプトを実行します。./kafka-topics.sh --bootstrap-server=localhost:9092 --list
Kafka ブローカーは、コネクターが作成するトピックのリストを返します。使用可能なトピックは、コネクターの
snapshot.mode
、snapshot.include.collection.list
、およびtable.include.list
の設定プロパティーの設定によって異なります。デフォルトでは、コネクターはデータベース内の非システムテーブルごとにトピックを作成します。トピックの内容を表示します。
ターミナルウィンドウから/opt/kafka/bin/
に移動し、kafka-console-consumer.sh
シェルスクリプトを実行して、前のコマンドで返されたトピックの 1 つの内容を表示します。
以下に例を示します。
./kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=dbserver1.inventory.products_on_hand
トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例2.1 Debezium 変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.5.4.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
上記の例では、
payload
値は、コネクタースナップショットがテーブルinventory.products_on_hand
から読み込み ("op" ="r"
) イベントを生成したことを示しています。product_id
レコードの"before"
状態はnull
であり、レコードに以前の値が存在しないことを示しています。"after"
状態は、product_id
101
を持つ項目のquantity
が3
であることを示しています。
次の手順
各コネクターで使用できる設定の詳細、および変更データのキャプチャーを有効にするためにソースデータベースを設定する方法は、Debezium ユーザーガイド を参照してください。
2.5. Kafka Connect クラスターの Debezium コネクタープラグインの更新
Red Hat Enterprise Linux にデプロイされているバージョンの Debezium コネクターを置き換えるには、コネクタープラグインを更新します。
手順
- 置き換える Debezium コネクタープラグインのコピーを Red Hat Integration ダウンロードサイト からダウンロードします。
Debezium コネクターアーカイブの内容を
/opt/kafka/connector-plugins
ディレクトリーにデプロイメントします。$ sudo unzip debezium-connector-mysql-2.5.4.Final.zip -d /opt/kafka/connector-plugins
- Kafka Connect を再起動します。
付録A サブスクリプションの使用
Debezium は、ソフトウェアサブスクリプションを通じて提供されます。サブスクリプションを管理するには、Red Hat カスタマーポータルでアカウントにアクセスします。
アカウントへのアクセス
- access.redhat.com に移動します。
- アカウントがない場合は作成します。
- アカウントにログインします。
サブスクリプションのアクティベート
- access.redhat.com に移動します。
- Subscriptions に移動します。
- Activate a subscription に移動し、16 桁のアクティベーション番号を入力します。
zip および tar ファイルのダウンロード
zip または tar ファイルにアクセスするには、カスタマーポータルを使用して、ダウンロードする関連ファイルを検索します。RPM パッケージを使用している場合、この手順は必要ありません。
- ブラウザーを開き、access.redhat.com/downloads で Red Hat カスタマーポータルの Product Downloads ページにログインします。
- INTEGRATION AND AUTOMATION まで下方向にスクロールします。
- Red Hat Integration をクリックして、Red Hat Integration ダウンロードページを表示します。
- コンポーネントの Download リンクをクリックします。
改訂日時: 2024-04-19