2.2. Kafka Connect のデプロイ
MySQL データベースをデプロイした後、AMQ Streams を使用して、Debezium MySQL コネクタープラグインを含む Kafka Connect コンテナーイメージを構築します。デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、MySQL コネクターアーティファクトに関する情報をイメージに含める
KafkaConnectCR。 -
MySQL コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnectorCR。AMQStreams が Kafka Connect Pod を開始した後、KafkaConnectorCR を適用してコネクターを開始します。
ビルドプロセス中、AMQ Streams Operator は Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーから必要なアーティファクトをダウンロードし、イメージに組み込みますす。新規に作成されたコンテナーは .spec.build.output に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect Pod のデプロイに使用されます。AMQ Streams が Kafka Connect イメージをビルドした後、 KafkaConnector カスタムリソースを使用してコネクターを開始します。
手順
-
OpenShift クラスターにログインし、
debeziumなどのプロジェクトを作成またはオープンします。 コネクターの Debezium
KafkaConnectカスタムリソース (CR) を作成するか、既存のリソースを変更します。
以下の例は、KafkaConnectカスタムリソースを記述するdbz-connect.yamlファイルからの抜粋を示しています。metadata.annotationsおよびspec.buildプロパティーが必要です。例2.1 Debezium コネクターを含む
KafkaConnectカスタムリソースを定義するdbz-connect.yamlファイルCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.1 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resourcesアノテーションをtrueに設定して、クラスターオペレーターがKafkaConnectorリソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所と共にイメージに追加するプラグインを一覧表示します。3
build.outputは、新たにビルドされたイメージが保存されるレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.typeの有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestreamです。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.outputの指定に関する詳細は、AMQ Streams Build スキーマ参照 のドキュメントを参照 してください。5
plugins設定は、Kafka Connect イメージに追加するすべてのコネクターを一覧表示します。一覧の各エントリーについて、プラグインnameと、コネクターのビルドに必要なアーティファクトに関する情報を指定します。任意で、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.typeの値は、artifacts.urlで指定したアーティファクトのファイルタイプを指定します。有効なタイプはzip、tgz、またはjarです。Debezium コネクターアーカイブは、.zipファイル形式で提供されます。JDBC ドライバーファイルは.jar形式です。typeの値は、urlフィールドで参照されるファイルのタイプと一致する必要があります。7
artifacts.urlの値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。OpenShift クラスターは指定されたサーバーにアクセスできる必要があります。以下のコマンドを入力して、
KafkaConnectビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow AMQ Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定に一覧表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnectorリソースを作成して、MySQL コネクターのインスタンスを定義します。
たとえば、以下のKafkaConnectorCR を作成し、debezium-inventory-connector.yamlとして保存します。例2.2 Debezium コネクターの
KafkaConnectorカスタムリソースを定義するmysql-inventory-connector.yamlファイルCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.2 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
1 度に 1 つのタスクのみが動作する必要があります。MySQL コネクターは MySQL サーバーの
binlogを読み取るため、単一のコネクタータスクを使用して、適切な順序とイベント処理を確保します。Kafka Connect サービスは、コネクターを使用して 1 つ以上のタスクを開始し、作業を完了します。実行中のタスクを Kafka Connect サービスのクラスター全体に自動的に分散します。サービスが停止またはクラッシュした場合、タスクは実行中のサービスに再分散されます。4
コネクターの設定。
5
MySQL データベースインスタンスのホスト名またはアドレス。
6
データベースインスタンスのポート番号。
7
Debezium がデータベースに接続するユーザーアカウントの名前。
8
Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。
9
MySQL サーバーまたはクラスターのトピック接頭辞。この文字列は、コネクターがイベントレコードを送信する全 Kafka トピックの名前の前に付けます。
10
コネクターが変更イベントをキャプチャーするテーブルの一覧。コネクターは、
inventoryテーブルで発生した場合にのみ、変更を検出します。11
DDL ステートメントをデータベーススキーマの履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。これは、コネクターが変更イベントレコードを送信するブローカーと同じです。再起動後、コネクターは、コネクターが読み取りを再開したときに、binlog 内のポイントに存在していたデータベーススキーマを復元します。
12
データベーススキーマ履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc create -n debezium -f mysql-inventory-connector.yaml
oc create -n debezium -f mysql-inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターは Kafka Connect クラスターに登録され、
KafkaConnectorCR のspec.config.database.dbnameで指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、コネクターが作成されたことを確認 し、inventory データベースの変更のキャプチャーが開始されたことを 確認する 準備が整いました。