3.6. Debezium PostgreSQL コネクターのデプロイメント
Debezium PostgreSQL コネクターをデプロイするには、コネクターファイルを Kafka Connect に追加し、コネクターを実行するカスタムコンテナーを作成して、コネクター設定をコンテナーに追加します。詳細は以下を参照してください。
- https://access.redhat.com/documentation/ja-jp/red_hat_integration/2021.Q1/html-single/debezium_user_guide/index#deploying-debezium-postgresql-connectors
- https://access.redhat.com/documentation/ja-jp/red_hat_integration/2021.Q1/html-single/debezium_user_guide/index#descriptions-of-debezium-postgresql-connector-configuration-properties
3.6.1. Debezium PostgreSQL コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Debezium PostgreSQL コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、2 つのカスタムリソース (CR) を作成する必要があります。
-
Kafka Connector を設定し、Debezium コネクターを実行するために作成したイメージの名前を指定する
KafkaConnectCR。この CR を OpenShift Kafka インスタンスに適用します。 -
Debezium PostgreSQL コネクターを設定する
KafkaConnectorCR。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。
前提条件
- PostgreSQL が実行され、PostgreSQL を設定して Debezium コネクターを実行 する手順が実行済みである。
- Red Hat AMQ Streams を使用して、OpenShift で Apache Kafka および Kafka Connect を設定し、実行済みである。AMQ Streams は、Kafka を OpenShift に取り入れる operator およびイメージを提供します。
- Podman または Docker がインストールされている。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (
quay.ioやdocker.ioなど) でコンテナーを作成および管理するアカウントとパーミッションを持っている。
手順
Kafka Connect の Debezium PostgreSQL コンテナーを作成します。
- Debezium PostgreSQL コネクターアーカイブ をダウンロードします。
Debezium PostgreSQL コネクターアーカイブを展開して、コネクタープラグインのディレクトリー構造を作成します。以下に例を示します。
./my-plugins/ ├── debezium-connector-postgresql │ ├── ...registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.0をベースイメージとして使用する Docker ファイルを作成します。たとえば、ターミナルウィンドウに以下を入力します。cat <<EOF >debezium-container-for-postgresql.yaml1 FROM {DockerKafkaConnect} USER root:root COPY ./my-plugins/ /opt/kafka/plugins/2 USER 1001 EOFこのコマンドは、現在のディレクトリーに
debezium-container-for-postgresql.yamlという名前の Docker ファイルを作成します。前のステップで作成した
debezium-container-for-postgresql.yamlDocker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、以下のコマンドを実行します。podman build -t debezium-container-for-postgresql:latest .docker build -t debezium-container-for-postgresql:latest .buildコマンドは、debezium-container-for-postgresqlという名前のコンテナーイメージを構築します。カスタムイメージを
quay.ioなどのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。このレジストリーが OpenShift インスタンスからアクセス可能であることを確認します。以下に例を示します。podman push debezium-container-for-postgresql:latest新しい Debezium PostgreSQL
KafkaConnectカスタムリソース (CR) を作成します。たとえば、以下の例のようにannotationsおよびimageプロパティーを指定するdbz-connect.yamlという名前のKafkaConnectCR を作成します。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true"1 spec: image: debezium-container-for-postgresql2 以下のコマンドを実行して、
KafkaConnectCR を OpenShift Kafka インスタンスに適用します。oc create -f dbz-connect.yamlこれにより、OpenShift の Kafka Connect 環境が更新され、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connector インスタンスが追加されます。
Debezium PostgreSQL コネクターインスタンスを設定する
KafkaConnectorカスタムリソースを作成します。コネクター設定プロパティーを設定する
.yamlファイルに Debezium PostgreSQL コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。これらの設定で指定できる PostgreSQL コネクタープロパティーの完全リスト を参照してください。以下の例では、ポート
5432で PostgreSQL サーバーホスト192.168.99.100に接続する Debezium コネクターを設定します。このホストには、sampledbという名前のデータベース、publicという名前のスキーマがあり、fulfillmentはサーバーの論理名です。fulfillment-connector.yamlapiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: fulfillment-connector1 labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.postgresql.PostgresConnector tasksMax: 12 config:3 database.hostname: 192.168.99.1004 database.port: 5432 database.user: debezium database.password: dbz database.dbname: sampledb database.server.name: fulfillment5 schema.include.list: public6 plugin.name: pgoutput7 - 1
- コネクターの名前。
- 2
- 1 度に 1 つのタスクのみが動作する必要があります。PostgreSQL コネクターは PostgreSQL サーバーの
192.168.99.100を読み取るため、単一のコネクタータスクを使用することで、順序とイベントの処理が適切に行われるようになります。Kafka Connect サービスはコネクターを使用して作業を行う 1 つ以上のタスクを開始し、実行中のタスクを自動的に Kafka Connect サービスのクラスター全体に分散します。いずれかのサービスが停止またはクラッシュすると、これらのタスクは稼働中のサービスに再分散されます。 - 3
- コネクターの設定。
- 4
- PostgreSQL サーバーを実行しているデータベースホストの名前。この例では、データベースのホスト名は
192.168.99.100です。 - 5
- 一意のサーバー名。サーバー名は、PostgreSQL サーバーまたはサーバーのクラスターの論理識別子です。この名前は、変更イベントレコードを受信するすべての Kafka トピックの接頭辞として使用されます。
- 6
- コネクターは
publicスキーマでのみ変更をキャプチャーします。選択したテーブルでのみ変更をキャプチャーするようにコネクターを設定できます。table.include.listコネクター設定プロパティーを参照してください。 - 7
- PostgreSQL サーバーにインストールされている PostgreSQL 論理デコードプラグイン の名前。Postgre SQL 10 以降でサポートされている値は
pgoutputのみですが、明示的にplugin.nameをpgoutputに設定する必要があります。
Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnectorリソースをfulfillment-connector.yamlファイルに保存した場合は、以下のコマンドを実行します。oc apply -f fulfillment-connector.yamlこのコマンドは
meetment-connectorを登録して、コネクターがKafkaConnectorCR に定義されているsampledbデータベースに対して実行を開始します。コネクターが作成され、起動されたことを確認します。
Kafka Connect ログ出力を表示して、コネクターが作成され、指定データベースの変更のキャプチャーが開始されたことを確認します。
oc logs $(oc get pods -o name -l strimzi.io/cluster=my-connect-cluster)ログの出力を確認し、初回のスナップショットが実行されたことを確認します。以下のような出力が表示されるはずです。
... INFO Starting snapshot for ... ... INFO Snapshot is using user 'debezium' ...コネクターがエラーがなく正常に起動すると、コネクターが変更をキャプチャーする各テーブルのトピックが作成されます。CR のサンプルでは、
publicスキーマの各テーブルにトピックがあります。ダウンストリームアプリケーションは、これらのトピックをサブスクライブできます。以下のコマンドを実行して、コネクターによってトピックが作成されたことを検証します。
oc get kafkatopics
結果
コネクターが起動すると、コネクターが設定された PostgreSQL サーバーデータベースの 整合性スナップショットが実行 されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。