5.5.2. AMQ Streams を使用した Debezium MySQL コネクターのデプロイ
以前のバージョンの AMQ Streams では、OpenShift に Debezium コネクターをデプロイするには、最初にコネクター用の Kafka Connect イメージをビルドする必要がありました。コネクターを OpenShift にデプロイするのに現在推奨される方法は、AMQ Streams でビルド設定を使用して、使用する Debezium コネクタープラグインが含まれる Kafka Connect コンテナーイメージを自動的にビルドすることです。
ビルドプロセス中、AMQ Streams Operator は Debezium コネクター定義を含む KafkaConnect
カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから必要なアーティファクトをダウンロードします。
新規に作成されたコンテナーは .spec.build.output
に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect クラスターのデプロイに使用されます。AMQ Streams が Kafka Connect イメージをビルドしたら、KafkaConnector
カスタムリソースを作成し、ビルドに含まれるコネクターを起動します。
前提条件
- クラスター Operator がインストールされている OpenShift クラスターにアクセスできる必要があります。
- AMQ Streams Operator が稼働している必要があります。
- Kafka クラスターは、Apache Open Shift での AMQ ストリームのデプロイとアップグレードに記載されているようにデプロイされます。
- Kafka Connect is deployed on AMQ Streams
- Red Hat Integration ライセンスがある。
-
OpenShift
oc
CLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。 Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションが必要であるか、ImageStream リソースを作成する必要があります。
- ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存するには、以下を実行します。
- レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション。
- ビルドイメージをネイティブ OpenShift ImageStream として保存します。
- ImageStream リソースがクラスターにデプロイされている。クラスターの ImageStream を明示的に作成する必要があります。ImageStreams はデフォルトでは利用できません。
手順
- OpenShift クラスターにログインします。
コネクターの Debezium
KafkaConnect
カスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、以下の例のようにmetadata.annotations
およびspec.build
プロパティーを指定するKafkaConnect
CR を作成します。dbz-connect.yaml
などの名前でファイルを保存します。例5.1 Debezium コネクターを含む
KafkaConnect
カスタムリソースを定義するdbz-connect.yaml
ファイル次の例では、カスタムリソースは、次のアーティファクトをダウンロードするように設定されています。
- Debezium MySQL コネクターアーカイブです。
- サービスレジストリーアーカイブ。Service Registry はオプションのコンポーネントです。コネクターで Avro シリアル化を使用する場合にのみ、Service Registry コンポーネントを追加します。
- Debezium スクリプティング SMT アーカイブと、Debezium コネクターで使用する関連スクリプティングエンジン。SMT アーカイブとスクリプト言語の依存関係はオプションのコンポーネントです。これらのコンポーネントは、Debezium コンテンツベースのルーティング SMT または フィルター SMT を使用する場合にのみ追加してください。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: debezium-kafka-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: version: 3.00 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-mysql artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/1.9.5.Final-redhat-<build_number>/debezium-connector-mysql-1.9.5.Final-redhat-<build_number>-plugin.zip 7 - type: zip url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.0-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.0-redhat-<build-number>.zip 8 - type: zip url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/1.9.5.Final/debezium-scripting-1.9.5.Final.zip 9 - type: jar url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.11/groovy-3.0.11.jar 10 - type: jar url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar - type: jar url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
表5.22 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resources
アノテーションをtrue
に設定して、クラスターオペレーターがKafkaConnector
リソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build
設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所と共にイメージに追加するプラグインを一覧表示します。3
build.output
は、新たにビルドされたイメージが保存されるレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.type
の有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker
、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestream
です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.output
の指定に関する詳細は、AMQ Streams Build スキーマ参照 のドキュメントを参照 してください。5
plugins
設定は、Kafka Connect イメージに追加するすべてのコネクターを一覧表示します。一覧の各エントリーについて、プラグインname
と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。任意で、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.type
の値は、artifacts.url
で指定したアーティファクトのファイルタイプを指定します。有効なタイプはzip
、tgz
、またはjar
です。Debezium コネクターアーカイブは、.zip
ファイル形式で提供されます。type
の値は、url
フィールドで参照されるファイルのタイプと一致する必要があります。7
artifacts.url
の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。Debezium コネクターアーティファクトは Red Hat リポジトリーで入手できます。OpenShift クラスターは指定されたサーバーにアクセスできる必要があります。8
(オプション) サービスレジストリーコンポーネントをダウンロードするための アーティファクト
type
とurl
を 指定します。デフォルトの JSON コンバーターを使用する代わりに、コネクターが Apache Avro を使用して、Service Registry を使用してイベントのキーと値をシリアル化する場合にのみ、ServiceRegistry アーティファクトを含めます。9
(オプション)Debezium コネクターで使用する Debezium スクリプト SMT アーカイブのアーティファクト
type
とurl
を指定します。Debezium コンテンツベースルーティング SMT または フィルター SMT を使用する場合にのみ、スクリプト SMT を含めます。スクリプト SMT を使用するには、groovy などの JSR 223 準拠のスクリプト実装もデプロイする必要があります。10
(オプション) JSR 223 準拠のスクリプト実装の JAR ファイルのアーティファクト
type
とurl
を指定します。これは、Debezium スクリプト SMT で必要です。重要AMQ Streams を使用して Kafka Connect イメージにコネクタープラグインを組み込む場合、必要なスクリプト言語コンポーネントごとに、
artifacts.url
に JAR ファイルの場所を指定し、artifacts.type
の値もjar
に設定する必要があります。値が無効な場合、実行時にコネクターが失敗します。スクリプト SMT で Apache Groovy 言語を使用できるようにするために、この例のカスタムリソースは、次のライブラリーの JAR ファイルを取得します。
-
groovy
-
groovy-jsr223
(スクリプトエージェント) -
groovy-json
(JSON 文字列を解析するためのモジュール)
別の方法として、Debezium スクリプト SMT は、GraalVM JavaScript の JSR 223 実装の使用もサポートします。
以下のコマンドを入力して、
KafkaConnect
ビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-connect.yaml
Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定に一覧表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnector
リソースを作成し、デプロイする各コネクターのインスタンスを定義します。
たとえば、以下のKafkaConnector
CR を作成し、mysql-inventory-connector.yaml
として保存します。例5.2 Debezium コネクターの
KafkaConnector
カスタムリソースを定義するmysql-inventory-connector.yaml
ファイルapiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: labels: strimzi.io/cluster: debezium-kafka-connect-cluster name: inventory-connector-mysql 1 spec: class: io.debezium.connector.mysql.MySqlConnector 2 tasksMax: 1 3 config: 4 database.history.kafka.bootstrap.servers: 'debezium-kafka-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092' database.history.kafka.topic: schema-changes.inventory database.hostname: mysql.debezium-mysql.svc.cluster.local 5 database.port: 3306 6 database.user: debezium 7 database.password: dbz 8 database.dbname: mydatabase 9 database.server.name: inventory_connector_mysql 10 database.include.list: public.inventory 11
表5.23 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
同時に動作できるタスクの数。
4
コネクターの設定。
5
ホストデータベースインスタンスのアドレス。
6
データベースインスタンスのポート番号。
7
Debezium がデータベースに接続するユーザーアカウントの名前。
8
データベースユーザーアカウントのパスワード
9
変更をキャプチャーするデータベースの名前。
10
データベースインスタンスまたはクラスターの論理名。
指定の名前は英数字またはアンダースコアからのみ形成する必要があります。
論理名は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
コネクターを Avro コネクターと統合する場合、名前空間は関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの名前空間でも使用されます。11
コネクターが変更イベントをキャプチャーするテーブルの一覧。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
以下に例を示します。
oc create -n debezium -f {context}-inventory-connector.yaml
コネクターは Kafka Connect クラスターに登録され、
KafkaConnector
CR のspec.config.database.dbname
で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、Debezium MySQL のデプロイメントを確認 する準備が整いました。