11.2.3. Debezium コンテナーで Avro を使用するコネクターのデプロイ
ご使用の環境で、提供された Debezium コンテナーを使用して、Avro シリアライゼーションを使用する Debezium コネクターをデプロイしなければならない場合があります。Debezium 用のカスタム Kafka Connect コンテナーイメージをビルドし、Avro コンバーターを使用するように Debezium コネクターを設定するには、以下の手順を完了します。
前提条件
- コンテナーを作成および管理するのに十分な権限と共に Docker をインストールしている。
- Avro シリアライゼーションと共にデプロイする Debezium コネクタープラグインをダウンロードしている。
手順
Service Registry のインスタンスをデプロイします。Installing and deploying Service Registry on Open Shiftでは、以下の手順を説明しています。
- Service Registry のインストール
- AMQ Streams のインストール
- AMQ Streams ストレージのセットアップ
Debezium コネクターのアーカイブを展開して、コネクタープラグインのディレクトリー構造を作成します。複数の Debezium コネクターのアーカイブをダウンロードして展開した場合、作成されるディレクトリー構造は以下の例のようになります。
tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb | ├── ... ├── debezium-connector-mysql │ ├── ... ├── debezium-connector-postgres │ ├── ... └── debezium-connector-sqlserver ├── ...
Avro シリアライゼーションを使用するように設定する Debezium コネクターが含まれるディレクトリーに Avro コンバーターを追加します。
- Red Hat Integration のダウンロードサイト に移動し、Service Registry Kafka Connect の zip ファイルをダウンロードします。
- 目的の Debezium コネクターディレクトリーにアーカイブを展開します。
複数のタイプの Debezium コネクターを Avro シリアライゼーションを使用するように設定するには、該当するそれぞれのコネクタータイプのディレクトリーにアーカイブを展開します。それぞれのディレクトリーにアーカイブを抽出するとファイルが重複しますが、これにより依存関係の競合が生じる可能性がなくなります。
Avro コンバーターを使用するように設定する Debezium コネクターを実行するためのカスタムイメージを作成して公開します。
registry.redhat.io/amq7/amq-streams-kafka-30-rhel8:2.0.0
をベースイメージとして使用して、新しいDockerfile
を作成します。以下の例の my-plugins を、実際のプラグインディレクトリーの名前に置き換えてください。FROM registry.redhat.io/amq7/amq-streams-kafka-30-rhel8:2.0.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
Kafka Connect は、コネクターの実行を開始する前に、
/opt/kafka/plugins
ディレクトリーにあるサードパーティープラグインをロードします。docker コンテナーイメージをビルドします。例えば、前のステップで作成した docker ファイルを
debezium-container-with-avro
として保存した場合、以下のコマンドを実行します。docker build -t debezium-container-with-avro:latest
カスタムイメージをコンテナーレジストリーにプッシュします。例を以下に示します。
docker push <myregistry.io>/debezium-container-with-avro:latest
新しいコンテナーイメージを示します。次のいずれかを行います。
KafkaConnect
カスタムリソースのKafkaConnect.spec.image
プロパティーを編集します。このプロパティーが設定されていると、クラスターオペレータのSTRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
変数がオーバーライドされます。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster spec: #... image: debezium-container-with-avro
-
install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml
ファイルのSTRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
変数を編集し、新しいコンテナーイメージを示すようにした後、Cluster Operator を再インストールします。このファイルを編集する場合は、これを OpenShift クラスターに適用する必要があります。
Avro コンバーターを使用するように設定されたそれぞれの Debezium コネクターをデプロイします。それぞれの Debezium コネクターについて、以下の設定を行います。
Debezium コネクターインスタンスを作成します。次の
inventory-connector.yaml
ファイルの例では、Avro コンバーターを使用するように設定された My SQL コネクターインスタンスを定義するKafka Connector
カスタムリソースを作成しています。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 1 config: database.hostname: mysql database.port: 3306 database.user: debezium database.password: dbz database.server.id: 184054 database.server.name: dbserver1 database.include.list: inventory database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092 database.history.kafka.topic: schema-changes.inventory key.converter: io.apicurio.registry.utils.converter.AvroConverter key.converter.apicurio.registry.url: http://apicurio:8080/api key.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy value.converter: io.apicurio.registry.utils.converter.AvroConverter value.converter.apicurio.registry.url: http://apicurio:8080/api value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
コネクターインスタンスを適用します。以下に例を示します。
oc apply -f inventory-connector.yaml
これにより
inventory-connector
が登録され、コネクターがinventory
データベースに対して実行されるようになります。
コネクターが作成され、指定されたデータベース内の変更の追跡を開始したことを確認します。例えば
inventory-connector
が起動したときの Kafka Connect のログ出力を見ることで、コネクターのインスタンスを確認することができます。Kafka Connect のログ出力を表示します。
oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
ログの出力を確認し、初回のスナップショットが実行されたことを確認します。以下のような行が表示されるはずです。
... 2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ...
スナップショットは、複数のステップを経て作成されます。
... 2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL primary server (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,843 INFO using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ... 2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ...
スナップショットの作成が完了した後、Debezium は (例として)
inventory
データベースのbinlog
に生じる変更の追跡を開始し、変更イベントの有無を監視します。... 2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0] 2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0] 2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306] Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5) 2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306] ...