3.6. Debezium PostgreSQL コネクターのデプロイメント
Debezium PostgreSQL コネクターをデプロイするには、コネクターファイルを Kafka Connect に追加し、コネクターを実行するカスタムコンテナーを作成して、コネクター設定をコンテナーに追加します。詳細は以下を参照してください。
- https://access.redhat.com/documentation/ja-jp/red_hat_integration/2021.Q1/html-single/debezium_user_guide/index#deploying-debezium-postgresql-connectors
- https://access.redhat.com/documentation/ja-jp/red_hat_integration/2021.Q1/html-single/debezium_user_guide/index#descriptions-of-debezium-postgresql-connector-configuration-properties
3.6.1. Debezium PostgreSQL コネクターのデプロイ
Debezium PostgreSQL コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、2 つのカスタムリソース (CR) を作成する必要があります。
-
Kafka Connector を設定し、Debezium コネクターを実行するために作成したイメージの名前を指定する
KafkaConnect
CR。この CR を OpenShift Kafka インスタンスに適用します。 -
Debezium PostgreSQL コネクターを設定する
KafkaConnector
CR。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。
前提条件
- PostgreSQL が実行され、PostgreSQL を設定して Debezium コネクターを実行 する手順が実行済みである。
- Red Hat AMQ Streams を使用して、OpenShift で Apache Kafka および Kafka Connect を設定し、実行済みである。AMQ Streams は、Kafka を OpenShift に取り入れる operator およびイメージを提供します。
- Podman または Docker がインストールされている。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (
quay.io
やdocker.io
など) でコンテナーを作成および管理するアカウントとパーミッションを持っている。
手順
Kafka Connect の Debezium PostgreSQL コンテナーを作成します。
- Debezium PostgreSQL コネクターアーカイブ をダウンロードします。
Debezium PostgreSQL コネクターアーカイブを展開して、コネクタープラグインのディレクトリー構造を作成します。以下に例を示します。
./my-plugins/ ├── debezium-connector-postgresql │ ├── ...
registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.0
をベースイメージとして使用する Docker ファイルを作成します。たとえば、ターミナルウィンドウに以下を入力します。cat <<EOF >debezium-container-for-postgresql.yaml 1 FROM {DockerKafkaConnect} USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ 2 USER 1001 EOF
このコマンドは、現在のディレクトリーに
debezium-container-for-postgresql.yaml
という名前の Docker ファイルを作成します。前のステップで作成した
debezium-container-for-postgresql.yaml
Docker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、以下のコマンドを実行します。podman build -t debezium-container-for-postgresql:latest .
docker build -t debezium-container-for-postgresql:latest .
build
コマンドは、debezium-container-for-postgresql
という名前のコンテナーイメージを構築します。カスタムイメージを
quay.io
などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。このレジストリーが OpenShift インスタンスからアクセス可能であることを確認します。以下に例を示します。podman push debezium-container-for-postgresql:latest
新しい Debezium PostgreSQL
KafkaConnect
カスタムリソース (CR) を作成します。たとえば、以下の例のようにannotations
およびimage
プロパティーを指定するdbz-connect.yaml
という名前のKafkaConnect
CR を作成します。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: image: debezium-container-for-postgresql 2
以下のコマンドを実行して、
KafkaConnect
CR を OpenShift Kafka インスタンスに適用します。oc create -f dbz-connect.yaml
これにより、OpenShift の Kafka Connect 環境が更新され、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connector インスタンスが追加されます。
Debezium PostgreSQL コネクターインスタンスを設定する
KafkaConnector
カスタムリソースを作成します。コネクター設定プロパティーを設定する
.yaml
ファイルに Debezium PostgreSQL コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。これらの設定で指定できる PostgreSQL コネクタープロパティーの完全リスト を参照してください。以下の例では、ポート
5432
で PostgreSQL サーバーホスト192.168.99.100
に接続する Debezium コネクターを設定します。このホストには、sampledb
という名前のデータベース、public
という名前のスキーマがあり、fulfillment
はサーバーの論理名です。fulfillment-connector.yaml
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: fulfillment-connector 1 labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.postgresql.PostgresConnector tasksMax: 1 2 config: 3 database.hostname: 192.168.99.100 4 database.port: 5432 database.user: debezium database.password: dbz database.dbname: sampledb database.server.name: fulfillment 5 schema.include.list: public 6 plugin.name: pgoutput 7
- 1
- コネクターの名前。
- 2
- 1 度に 1 つのタスクのみが動作する必要があります。PostgreSQL コネクターは PostgreSQL サーバーの
192.168.99.100
を読み取るため、単一のコネクタータスクを使用することで、順序とイベントの処理が適切に行われるようになります。Kafka Connect サービスはコネクターを使用して作業を行う 1 つ以上のタスクを開始し、実行中のタスクを自動的に Kafka Connect サービスのクラスター全体に分散します。いずれかのサービスが停止またはクラッシュすると、これらのタスクは稼働中のサービスに再分散されます。 - 3
- コネクターの設定。
- 4
- PostgreSQL サーバーを実行しているデータベースホストの名前。この例では、データベースのホスト名は
192.168.99.100
です。 - 5
- 一意のサーバー名。サーバー名は、PostgreSQL サーバーまたはサーバーのクラスターの論理識別子です。この名前は、変更イベントレコードを受信するすべての Kafka トピックの接頭辞として使用されます。
- 6
- コネクターは
public
スキーマでのみ変更をキャプチャーします。選択したテーブルでのみ変更をキャプチャーするようにコネクターを設定できます。table.include.list
コネクター設定プロパティーを参照してください。 - 7
- PostgreSQL サーバーにインストールされている PostgreSQL 論理デコードプラグイン の名前。Postgre SQL 10 以降でサポートされている値は
pgoutput
のみですが、明示的にplugin.name
をpgoutput
に設定する必要があります。
Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnector
リソースをfulfillment-connector.yaml
ファイルに保存した場合は、以下のコマンドを実行します。oc apply -f fulfillment-connector.yaml
このコマンドは
meetment-connector
を登録して、コネクターがKafkaConnector
CR に定義されているsampledb
データベースに対して実行を開始します。コネクターが作成され、起動されたことを確認します。
Kafka Connect ログ出力を表示して、コネクターが作成され、指定データベースの変更のキャプチャーが開始されたことを確認します。
oc logs $(oc get pods -o name -l strimzi.io/cluster=my-connect-cluster)
ログの出力を確認し、初回のスナップショットが実行されたことを確認します。以下のような出力が表示されるはずです。
... INFO Starting snapshot for ... ... INFO Snapshot is using user 'debezium' ...
コネクターがエラーがなく正常に起動すると、コネクターが変更をキャプチャーする各テーブルのトピックが作成されます。CR のサンプルでは、
public
スキーマの各テーブルにトピックがあります。ダウンストリームアプリケーションは、これらのトピックをサブスクライブできます。以下のコマンドを実行して、コネクターによってトピックが作成されたことを検証します。
oc get kafkatopics
結果
コネクターが起動すると、コネクターが設定された PostgreSQL サーバーデータベースの 整合性スナップショットが実行 されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。