5.5. Debezium MySQL コネクターのデプロイ
Debezium MySQL コネクターをデプロイするには、コネクターファイルを Kafka Connect に追加し、コネクターを実行するカスタムコンテナーを作成して、続いてコネクター設定をコンテナーに追加します。Debezium MySQL コネクターのデプロイに関する詳細は、以下を参照してください。
5.5.1. Debezium MySQL コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Debezium MySQL コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、続いてこのコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。その後、以下のカスタムリソース (CR) を作成する必要があります。
-
Kafka Connect インスタンスを定義する
KafkaConnect
CR。CR のimage
プロパティーは、Debezium コネクターを実行するために作成するコンテナーイメージの名前を指定します。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。AMQ Streams は、Apache Kafka を OpenShift に取り入れる Operator およびイメージを提供します。 -
Debezium MySQL コネクターを定義する
KafkaConnector
CR。この CR をKafkaConnect
CR を適用するのと同じ OpenShift インスタンスに適用します。
前提条件
- MySQL が稼働し、Debezium コネクターと連携するように MySQL を設定する手順 が完了済みである必要があります。
- AMQ Streams が OpenShift にデプロイされ、Apache Kafka および Kafka Connect を実行している。詳細は、『Deploying and Upgrading AMQ Streams on OpenShift』を参照してください。
- Podman または Docker がインストールされている。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー(
quay.io
またはdocker.io
など)でコンテナーを作成および管理するアカウントおよびパーミッションがある。
手順
Kafka Connect の Debezium MySQL コンテナーを作成します。
- Debezium MySQL コネクターアーカイブ をダウンロードします。
Debezium MySQL コネクターアーカイブを展開して、コネクタープラグインのディレクトリー構造を作成します。以下に例を示します。
./my-plugins/ ├── debezium-connector-mysql │ ├── ...
./my-plugins/ ├── debezium-connector-mysql │ ├── ...
Copy to Clipboard Copied! Toggle word wrap Toggle overflow registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.0
をベースイメージとして使用する Docker ファイルを作成します。たとえば、ターミナルウィンドウに以下のコマンドを入力します。my-plugins
はプラグインディレクトリーの名前に置き換えます。Copy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、現在のディレクトリーに
debezium-container-for-mysql.yaml
という名前の Docker ファイルを作成します。前の手順で作成した
debezium-container-for-mysql.yaml
Docker ファイルからコンテナーイメージをビルドします。ファイルを含むディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。podman build -t debezium-container-for-mysql:latest .
podman build -t debezium-container-for-mysql:latest .
Copy to Clipboard Copied! Toggle word wrap Toggle overflow docker build -t debezium-container-for-mysql:latest .
docker build -t debezium-container-for-mysql:latest .
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは、
debezium-container-for-mysql
という名前のコンテナーイメージを構築します。カスタムイメージを
quay.io
または内部コンテナーレジストリーなどのコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。podman push <myregistry.io>/debezium-container-for-mysql:latest
podman push <myregistry.io>/debezium-container-for-mysql:latest
Copy to Clipboard Copied! Toggle word wrap Toggle overflow docker push <myregistry.io>/debezium-container-for-mysql:latest
docker push <myregistry.io>/debezium-container-for-mysql:latest
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 新しい Debezium MySQL
KafkaConnect
カスタムリソース(CR)を作成します。たとえば、以下の例のようにアノテーション
およびイメージ
プロパティーを指定するdbz-connect.yaml
という名前のKafkaConnect
CR を作成します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下のコマンドを入力して、
KafkaConnect
CR を OpenShift Kafka Connect 環境に適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。
Debezium MySQL コネクターインスタンスを設定する
KafkaConnector
カスタムリソースを作成します。コネクターの設定プロパティーを指定する a
.yaml
ファイルで Debezium MySQL コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。以下の例では、ポート
3306
で MySQL ホスト192.168.99.100
に接続し、インベントリー
データベースへの変更をキャプチャーする Debezium コネクターを設定します。dbserver1
はサーバーの論理名です。MySQL
inventory-connector.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表5.19 コネクター設定の説明 項目 説明 1
コネクターの名前。
2
1 度に 1 つのタスクのみが動作する必要があります。MySQL コネクターは MySQL サーバーの
binlog
を読み取るため、単一のコネクタータスクを使用することで、順序とイベントの処理が適切に行われるようになります。Kafka Connect サービスはコネクターを使用して作業を行う 1 つ以上のタスクを開始し、実行中のタスクを自動的に Kafka Connect サービスのクラスター全体に分散します。いずれかのサービスが停止またはクラッシュすると、これらのタスクは稼働中のサービスに再分散されます。3
コネクターの設定。
4
データベースホスト。MySQL サーバー(mysql
)
を実行しているコンテナーの名前です。5
コネクターの一意の ID。
6
MySQL サーバーまたはクラスターの論理名。この名前は、変更イベントレコードを受信するすべての Kafka トピックのプレフィックスとして使用されます。
7
インベントリー
データベースの変更のみがキャプチャーされます。8
DDL ステートメントをデータベース履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。再起動時に、コネクターが読み取りを開始すべき時点で binlog に存在したデータベースのスキーマを復元します。
9
データベース履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。
Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnector
リソースをinventory-connector.yaml
ファイルに保存した場合、以下のコマンドを実行します。oc apply -f inventory-connector.yaml
oc apply -f inventory-connector.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは
inventory-connector
を登録し、コネクターはKafkaConnector
CR に定義されているインベントリー
データベースに対して実行を開始します。コネクターが作成され、起動されたことを確認します。
Kafka Connect ログ出力を表示して、コネクターが作成され、指定データベースの変更のキャプチャーが開始されたことを確認します。
oc logs $(oc get pods -o name -l strimzi.io/cluster=my-connect-cluster)
oc logs $(oc get pods -o name -l strimzi.io/cluster=my-connect-cluster)
Copy to Clipboard Copied! Toggle word wrap Toggle overflow ログの出力を確認し、Debezium が初回のスナップショットを実行することを確認します。ログには、以下のメッセージと同様の出力が表示されます。
... INFO Starting snapshot for ... ... INFO Snapshot is using user 'debezium' ...
... INFO Starting snapshot for ... ... INFO Snapshot is using user 'debezium' ...
Copy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターがエラーがなく正常に起動すると、コネクターが変更をキャプチャーする各テーブルのトピックが作成されます。CR のサンプルでは、
include.list
プロパティーに指定されたテーブルのトピックがあります。ダウンストリームアプリケーションは、これらのトピックをサブスクライブできます。以下のコマンドを実行して、コネクターによってトピックが作成されたことを検証します。
oc get kafkatopics
oc get kafkatopics
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
Debezium MySQL コネクターに設定できる設定プロパティーの完全リストは、MySQL コネクター設定プロパティーを参照してください。
結果
コネクターが起動すると、コネクターが設定された MySQL データベースの 整合性スナップショットが実行 されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。