3.2. Kafka Connect のデプロイ
MySQL データベースをデプロイした後、Streams for Apache Kafka を使用して、Debezium MySQL コネクタープラグインを含む Kafka Connect コンテナーイメージをビルドします。デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、MySQL コネクターアーティファクトに関する情報をイメージに含める
KafkaConnectCR。 -
MySQL コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnectorCR。Streams for Apache Kafka が Kafka Connect Pod を起動した後、KafkaConnectorCR を適用してコネクターを起動します。
ビルドプロセス中に、Streams for Apache Kafka Operator は、Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドでは、Red Hat Maven リポジトリーから必要なアーティファクトをダウンロードし、イメージに組み込みます。新規に作成されたコンテナーは .spec.build.output で指定されたコンテナーレジストリーにプッシュされ、Kafka Connect Pod のデプロイに使用されます。
コンテナーイメージは、quay.io などの外部コンテナーレジストリー、または OpenShift ImageStream に保存できます。ImageStream は自動的に作成されないため、コンテナーイメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する 必要があります。
Streams for Apache Kafka が Kafka Connect イメージをビルドして保存した後、KafkaConnector カスタムリソースを使用してコネクターを起動します。
前提条件
- Streams for Apache Kafka が OpenShift クラスター上で実行されている。
- Streams for Apache Kafka Cluster Operator が OpenShift クラスターにインストールされている。
- ImageStream を利用できる (KafkaConnect コンテナーイメージを OpenShift ImageStream に保存する場合)。
- Apache Kafka と Kafka Connect が Streams for Apache Kafka 上で実行されている。
手順
-
OpenShift クラスターにログインし、
debeziumなどのプロジェクトを作成または開きます。 コネクターの Debezium
KafkaConnectカスタムリソース (CR) を作成するか、既存のリソースを変更します。
以下の例は、KafkaConnectカスタムリソースを記述したdbz-connect.yamlファイルからの抜粋を示しています。metadata.annotationsおよびspec.buildプロパティーが必要です。例3.1 Debezium コネクターを含む
KafkaConnectカスタムリソースを定義したdbz-connect.yamlファイルCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表3.1 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resourcesアノテーションを"true"に設定して、クラスター Operator がKafkaConnectorリソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。3
build.outputは、新しくビルドされたイメージを保存するレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.typeの有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestreamです。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.outputを指定する方法の詳細は、Streams for Apache Kafka ビルドスキーマリファレンス ドキュメントを参照してください。5
plugins設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグインnameと、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.typeの値は、artifacts.urlで指定するアーティファクトのファイルタイプを指定します。有効なタイプはzip、tgz、またはjarです。Debezium コネクターアーカイブは、.zipファイル形式で提供されます。JDBC ドライバーファイルは.jar形式です。typeの値は、urlフィールドで参照されるファイルのタイプと一致させる必要があります。7
artifacts.urlの値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。以下のコマンドを入力して、
KafkaConnectビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow カスタムリソースで指定された設定に基づいて、Streams for Apache Kafka Operator はデプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnectorリソースを作成して、MySQL コネクターのインスタンスを定義します。
たとえば、以下のKafkaConnectorCR を作成し、debezium-inventory-connector.yamlとして保存します。例3.2 Debezium コネクターの
KafkaConnectorカスタムリソースを定義したmysql-inventory-connector.yamlファイルCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表3.2 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
一度に 1 つのタスクのみを実行します。MySQL コネクターは MySQL サーバーの
binlogを読み取るため、単一のコネクタータスクを使用して、適切な順序とイベント処理を確保します。Kafka Connect サービスは、コネクターを使用して 1 つ以上のタスクを開始し、作業を完了します。実行中のタスクを Kafka Connect サービスのクラスター全体に自動的に分散します。サービスが停止またはクラッシュした場合、タスクは実行中のサービスに再分散されます。4
コネクターの設定。
5
MySQL データベースインスタンスのホスト名またはアドレス。
6
データベースインスタンスのポート番号。
7
Debezium がデータベースに接続するユーザーアカウントの名前。
8
Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。
9
MySQL サーバーまたはクラスターのトピック接頭辞。この文字列は、コネクターがイベントレコードを送信する全 Kafka トピックの名前の前に付けます。
10
コネクターが変更イベントをキャプチャーするテーブルのリスト。コネクターは、
inventoryテーブルで発生した場合にのみ、変更を検出します。11
DDL ステートメントをデータベーススキーマの履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。これは、コネクターが変更イベントレコードを送信するブローカーと同じです。再起動後、コネクターは、コネクターが読み取りを再開する時点で binlog に存在しているデータベーススキーマを復元します。
12
データベーススキーマ履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc create -n debezium -f mysql-inventory-connector.yaml
oc create -n debezium -f mysql-inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターは Kafka Connect クラスターに登録され、
KafkaConnectorCR のspec.config.database.dbnameで指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、コネクターが作成されたことを確認 し、inventory データベースの変更のキャプチャーが開始したことを確認する準備が整いました。