3.2. Kafka Connect のデプロイ
MySQL データベースをデプロイした後、AMQ Streams を使用して、Debezium MySQL コネクタープラグインを含む Kafka Connect コンテナーイメージをビルドします。デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、MySQL コネクターアーティファクトに関する情報をイメージに含める
KafkaConnect
CR。 -
MySQL コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnector
CR。AMQ Streams が Kafka Connect Pod を起動したら、KafkaConnector
CR を適用してコネクターを起動します。
ビルドプロセス中、AMQ Streams Operator は Debezium コネクター定義を含む KafkaConnect
カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドでは、Red Hat Maven リポジトリーから必要なアーティファクトをダウンロードし、イメージに組み込みます。新規に作成されたコンテナーは .spec.build.output
で指定されたコンテナーレジストリーにプッシュされ、Kafka Connect Pod のデプロイに使用されます。
コンテナーイメージは、quay.io
などの外部コンテナーレジストリー、または OpenShift ImageStream に保存できます。ImageStream は自動的に作成されないため、コンテナーイメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する 必要があります。
AMQ Streams が Kafka Connect イメージをビルドして保存した後、KafkaConnector
カスタムリソースを使用してコネクターを起動します。
前提条件
- AMQ Streams が OpenShift クラスターで実行されている。
- AMQ Streams Cluster Operator が OpenShift クラスターにインストールされている。
- ImageStream を利用できる (KafkaConnect コンテナーイメージを OpenShift ImageStream に保存する場合)。
- Apache Kafka と Kafka Connect が AMQ Streams 上で実行されている。
手順
-
OpenShift クラスターにログインし、
debezium
などのプロジェクトを作成または開きます。 コネクターの Debezium
KafkaConnect
カスタムリソース (CR) を作成するか、既存のリソースを変更します。
以下の例は、KafkaConnect
カスタムリソースを記述したdbz-connect.yaml
ファイルからの抜粋を示しています。metadata.annotations
およびspec.build
プロパティーが必要です。例3.1 Debezium コネクターを含む
KafkaConnect
カスタムリソースを定義したdbz-connect.yaml
ファイルapiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: replicas: 1 version: 3.6.0 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-mysql artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/2.5.4.Final-redhat-00001/debezium-connector-mysql-2.5.4.Final-redhat-00001-plugin.zip 7 bootstrapServers: my-cluster-kafka-bootstrap:9093 ...
表3.1 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resources
アノテーションを"true"
に設定して、クラスター Operator がKafkaConnector
リソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build
設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。3
build.output
は、新しくビルドされたイメージを保存するレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.type
の有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker
、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestream
です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.output
の指定に関する詳細は、AMQ Streams Build スキーマ参照のドキュメントを参照 してください。5
plugins
設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグインname
と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.type
の値は、artifacts.url
で指定するアーティファクトのファイルタイプを指定します。有効なタイプはzip
、tgz
、またはjar
です。Debezium コネクターアーカイブは、.zip
ファイル形式で提供されます。JDBC ドライバーファイルは.jar
形式です。type
の値は、url
フィールドで参照されるファイルのタイプと一致する必要があります。7
artifacts.url
の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。以下のコマンドを入力して、
KafkaConnect
ビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-connect.yaml
AMQ Streams Operator がカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnector
リソースを作成して、MySQL コネクターのインスタンスを定義します。
たとえば、以下のKafkaConnector
CR を作成し、debezium-inventory-connector.yaml
として保存します。例3.2 Debezium コネクターの
KafkaConnector
カスタムリソースを定義したmysql-inventory-connector.yaml
ファイルapiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: labels: strimzi.io/cluster: my-connect-cluster name: inventory-connector 1 spec: class: io.debezium.connector.mysql.MySqlConnector 2 tasksMax: 1 3 config: 4 database.hostname: mysql 5 database.port: 3306 6 database.user: debezium 7 database.password: dbz 8 database.server.id: 184054 topic.prefix: dbserver1 9 table.include.list: inventory.* 10 schema.history.internal.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092' 11 schema.history.internal.kafka.topic: schema-changes.inventory 12
表3.2 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
一度に 1 つのタスクのみを実行します。MySQL コネクターは MySQL サーバーの
binlog
を読み取るため、単一のコネクタータスクを使用して、適切な順序とイベント処理を確保します。Kafka Connect サービスは、コネクターを使用して 1 つ以上のタスクを開始し、作業を完了します。実行中のタスクを Kafka Connect サービスのクラスター全体に自動的に分散します。サービスが停止またはクラッシュした場合、タスクは実行中のサービスに再分散されます。4
コネクターの設定。
5
MySQL データベースインスタンスのホスト名またはアドレス。
6
データベースインスタンスのポート番号。
7
Debezium がデータベースに接続するユーザーアカウントの名前。
8
Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。
9
MySQL サーバーまたはクラスターのトピック接頭辞。この文字列は、コネクターがイベントレコードを送信する全 Kafka トピックの名前の前に付けます。
10
コネクターが変更イベントをキャプチャーするテーブルのリスト。コネクターは、
inventory
テーブルで発生した場合にのみ、変更を検出します。11
DDL ステートメントをデータベーススキーマの履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。これは、コネクターが変更イベントレコードを送信するブローカーと同じです。再起動後、コネクターは、コネクターが読み取りを再開する時点で binlog に存在しているデータベーススキーマを復元します。
12
データベーススキーマ履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
以下に例を示します。
oc create -n debezium -f mysql-inventory-connector.yaml
コネクターは Kafka Connect クラスターに登録され、
KafkaConnector
CR のspec.config.database.dbname
で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、コネクターが作成されたことを確認 し、inventory
データベースの変更のキャプチャーが開始したことを確認する準備が整いました。