5.5. Debezium MySQL コネクターのデプロイ
Debezium MySQL コネクターをデプロイするには、コネクターファイルを Kafka Connect に追加し、コネクターを実行するカスタムコンテナーを作成して、続いてコネクター設定をコンテナーに追加します。Debezium MySQL コネクターのデプロイに関する詳細は、以下を参照してください。
5.5.1. Debezium MySQL コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Debezium MySQL コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、続いてこのコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。その後、以下のカスタムリソース (CR) を作成する必要があります。
-
Kafka Connect インスタンスを定義する
KafkaConnectCR。CR のimageプロパティーは、Debezium コネクターを実行するために作成するコンテナーイメージの名前を指定します。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。AMQ Streams は、Apache Kafka を OpenShift に取り入れる Operator およびイメージを提供します。 -
Debezium MySQL コネクターを定義する
KafkaConnectorCR。この CR をKafkaConnectCR を適用するのと同じ OpenShift インスタンスに適用します。
前提条件
- MySQL が稼働し、Debezium コネクターと連携するように MySQL を設定する手順 が完了済みである必要があります。
- AMQ Streams が OpenShift にデプロイされ、Apache Kafka および Kafka Connect を実行している。詳細は、『Deploying and Upgrading AMQ Streams on OpenShift』を参照してください。
- Podman または Docker がインストールされている。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー(
quay.ioまたはdocker.ioなど)でコンテナーを作成および管理するアカウントおよびパーミッションがある。
手順
Kafka Connect の Debezium MySQL コンテナーを作成します。
- Debezium MySQL コネクターアーカイブ をダウンロードします。
Debezium MySQL コネクターアーカイブを展開して、コネクタープラグインのディレクトリー構造を作成します。以下に例を示します。
./my-plugins/ ├── debezium-connector-mysql │ ├── ...registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.0をベースイメージとして使用する Docker ファイルを作成します。たとえば、ターミナルウィンドウに以下のコマンドを入力します。my-pluginsはプラグインディレクトリーの名前に置き換えます。cat <<EOF >debezium-container-for-mysql.yaml1 FROM registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.0 USER root:root COPY ./<my-plugins>/ /opt/kafka/plugins/2 USER 1001 EOFこのコマンドは、現在のディレクトリーに
debezium-container-for-mysql.yamlという名前の Docker ファイルを作成します。前の手順で作成した
debezium-container-for-mysql.yamlDocker ファイルからコンテナーイメージをビルドします。ファイルを含むディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。podman build -t debezium-container-for-mysql:latest .docker build -t debezium-container-for-mysql:latest .上記のコマンドは、
debezium-container-for-mysqlという名前のコンテナーイメージを構築します。カスタムイメージを
quay.ioまたは内部コンテナーレジストリーなどのコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。podman push <myregistry.io>/debezium-container-for-mysql:latestdocker push <myregistry.io>/debezium-container-for-mysql:latest新しい Debezium MySQL
KafkaConnectカスタムリソース(CR)を作成します。たとえば、以下の例のようにアノテーションおよびイメージプロパティーを指定するdbz-connect.yamlという名前のKafkaConnectCR を作成します。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true"1 spec: #... image: debezium-container-for-mysql2 以下のコマンドを入力して、
KafkaConnectCR を OpenShift Kafka Connect 環境に適用します。oc create -f dbz-connect.yamlこのコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。
Debezium MySQL コネクターインスタンスを設定する
KafkaConnectorカスタムリソースを作成します。コネクターの設定プロパティーを指定する a
.yamlファイルで Debezium MySQL コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。以下の例では、ポート
3306で MySQL ホスト192.168.99.100に接続し、インベントリーデータベースへの変更をキャプチャーする Debezium コネクターを設定します。dbserver1はサーバーの論理名です。MySQL
inventory-connector.yamlapiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: inventory-connector1 labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 12 config:3 database.hostname: mysql4 database.port: 3306 database.user: debezium database.password: dbz database.server.id: 1840545 database.server.name: dbserver16 database.include.list: inventory7 database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:90928 database.history.kafka.topic: schema-changes.inventory9 Expand 表5.19 コネクター設定の説明 項目 説明 1
コネクターの名前。
2
1 度に 1 つのタスクのみが動作する必要があります。MySQL コネクターは MySQL サーバーの
binlogを読み取るため、単一のコネクタータスクを使用することで、順序とイベントの処理が適切に行われるようになります。Kafka Connect サービスはコネクターを使用して作業を行う 1 つ以上のタスクを開始し、実行中のタスクを自動的に Kafka Connect サービスのクラスター全体に分散します。いずれかのサービスが停止またはクラッシュすると、これらのタスクは稼働中のサービスに再分散されます。3
コネクターの設定。
4
データベースホスト。MySQL サーバー(mysql
)を実行しているコンテナーの名前です。5
コネクターの一意の ID。
6
MySQL サーバーまたはクラスターの論理名。この名前は、変更イベントレコードを受信するすべての Kafka トピックのプレフィックスとして使用されます。
7
インベントリーデータベースの変更のみがキャプチャーされます。8
DDL ステートメントをデータベース履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。再起動時に、コネクターが読み取りを開始すべき時点で binlog に存在したデータベースのスキーマを復元します。
9
データベース履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。
Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnectorリソースをinventory-connector.yamlファイルに保存した場合、以下のコマンドを実行します。oc apply -f inventory-connector.yaml上記のコマンドは
inventory-connectorを登録し、コネクターはKafkaConnectorCR に定義されているインベントリーデータベースに対して実行を開始します。コネクターが作成され、起動されたことを確認します。
Kafka Connect ログ出力を表示して、コネクターが作成され、指定データベースの変更のキャプチャーが開始されたことを確認します。
oc logs $(oc get pods -o name -l strimzi.io/cluster=my-connect-cluster)ログの出力を確認し、Debezium が初回のスナップショットを実行することを確認します。ログには、以下のメッセージと同様の出力が表示されます。
... INFO Starting snapshot for ... ... INFO Snapshot is using user 'debezium' ...コネクターがエラーがなく正常に起動すると、コネクターが変更をキャプチャーする各テーブルのトピックが作成されます。CR のサンプルでは、
include.listプロパティーに指定されたテーブルのトピックがあります。ダウンストリームアプリケーションは、これらのトピックをサブスクライブできます。以下のコマンドを実行して、コネクターによってトピックが作成されたことを検証します。
oc get kafkatopics
Debezium MySQL コネクターに設定できる設定プロパティーの完全リストは、MySQL コネクター設定プロパティーを参照してください。
結果
コネクターが起動すると、コネクターが設定された MySQL データベースの 整合性スナップショットが実行 されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。