7.6. Debezium PostgreSQL コネクターのデプロイメント
以下の方法のいずれかを使用して Debezium PostgreSQL コネクターをデプロイできます。
7.6.1. AMQ Streams を使用した PostgreSQL コネクターデプロイメント
Debezium 1.7 以降、Debezium コネクターのデプロイに推奨される方法は、AMQ Streams を使用してコネクタープラグインが含まれる Kafka Connect コンテナーイメージをビルドすることです。
デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある
KafkaConnect
CR。 -
コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnector
CR。AMQStreams が Kafka Connect Pod を開始し、KafkaConnector
CR を適用してコネクターを開始します。
Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Apicurio Registry アーティファクトや Debezium スクリプトコンポーネントを追加できます。AMQ Streams が Kafka Connect イメージをビルドすると、指定のアーティファクトをダウンロードし、イメージに組み込みます。
Kafka Connect CR の spec.build.output
パラメーターは、生成される KafkaConnect
コンテナーイメージを格納する場所を指定します。コンテナーイメージは Docker レジストリーまたは OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。ImageStreams は自動的に作成されません。
KafkaConnect
リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。
関連情報
- OpenShift での AMQ Streams のデプロイと管理の Kafka Connect の設定。
- OpenShift での AMQ Streams のデプロイと管理の 新しいコンテナーイメージの自動構築。
7.6.2. AMQ Streams を使用した Debezium PostgreSQL コネクターのデプロイ
以前のバージョンの AMQ Streams では、OpenShift に Debezium コネクターをデプロイするには、最初にコネクター用の Kafka Connect イメージをビルドする必要がありました。コネクターを OpenShift にデプロイする場合に現在推奨される方法は、AMQ Streams でビルド設定を使用して、使用する Debezium コネクタープラグインが含まれる Kafka Connect コンテナーイメージを自動的にビルドすることです。
ビルドプロセス中、AMQ Streams Operator は Debezium コネクター定義を含む KafkaConnect
カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから必要なアーティファクトをダウンロードします。
新規に作成されたコンテナーは .spec.build.output
に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect クラスターのデプロイに使用されます。AMQ Streams が Kafka Connect イメージをビルドしたら、KafkaConnector
カスタムリソースを作成し、ビルドに含まれるコネクターを起動します。
前提条件
- クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
- AMQ Streams Operator が稼働している。
- Kafka クラスターが OpenShift での AMQ Streams のデプロイと管理 に記載されているようにデプロイされている。
- Kafka Connect が AMQ Streams にデプロイされている。
- Red Hat build of Debezium のライセンスを所有している。
-
OpenShift
oc
CLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。 Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成している。
- ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
- レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
- ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下を行います。
- 新規コンテナーイメージを保存するために、ImageStream リソースをクラスターにデプロイします。クラスターの ImageStream を明示的に作成する必要があります。ImageStreams は、デフォルトでは利用できません。ImageStreams の詳細は、OpenShift Container Platform でのイメージストリームの管理 を参照してください。
手順
- OpenShift クラスターにログインします。
コネクターの Debezium
KafkaConnect
カスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、metadata.annotations
およびspec.build
プロパティーを指定するdbz-connect.yaml
という名前のKafkaConnect
CR を作成します。以下の例は、KafkaConnect
カスタムリソースを記述するdbz-connect.yaml
ファイルからの抜粋を示しています。
例7.1 Debezium コネクターを含む
KafkaConnect
カスタムリソースを定義したdbz-connect.yaml
ファイル次の例では、カスタムリソースは、次のアーティファクトをダウンロードするように設定されています。
- Debezium PostgreSQL コネクターアーカイブです。
- Red Hat build of Apicurio Registry アーカイブApicurio Registry はオプションのコンポーネントです。コネクターで Avro シリアル化を使用する場合にのみ、Apicurio Registry コンポーネントを追加します。
- Debezium スクリプティング SMT アーカイブと、Debezium コネクターで使用する関連スクリプティングエンジン。SMT アーカイブとスクリプト言語の依存関係はオプションのコンポーネントです。Debezium コンテンツベースのルーティング SMT または フィルター SMT を使用する場合にのみ、これらのコンポーネントを追加します。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: debezium-kafka-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: version: 3.6.0 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-postgres artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-postgres/2.5.4.Final-redhat-00001/debezium-connector-postgres-2.5.4.Final-redhat-00001-plugin.zip 7 - type: zip url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.4.4.Final-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.4.4.Final-redhat-<build-number>.zip 8 - type: zip url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/2.5.4.Final-redhat-00001/debezium-scripting-2.5.4.Final-redhat-00001.zip 9 - type: jar url: https://repo1.maven.org/maven2/org/apache/groovy/groovy/3.0.11/groovy-3.0.11.jar 10 - type: jar url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar - type: jar url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093 ...
表7.26 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resources
アノテーションを"true"
に設定して、クラスター Operator がKafkaConnector
リソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build
設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。3
build.output
は、新しくビルドされたイメージを保存するレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.type
の有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker
、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestream
です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.output
を指定する方法の詳細は、OpenShift での AMQ Streams の設定の AMQ Streams ビルドスキーマ参照 を参照してください。5
plugins
設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグインname
と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.type
の値は、artifacts.url
で指定するアーティファクトのファイルタイプを指定します。有効なタイプはzip
、tgz
、またはjar
です。Debezium コネクターアーカイブは、.zip
ファイル形式で提供されます。type
の値は、url
フィールドで参照されるファイルのタイプと一致させる必要があります。7
artifacts.url
の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。Debezium コネクターアーティファクトは Red Hat リポジトリーで入手できます。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。8
(オプション) Apicurio Registry コンポーネントをダウンロードするためのアーティファクト
type
とurl
を指定します。デフォルトの JSON コンバーターを使用する代わりに、コネクターが Apache Avro を使用して Red Hat build of Apicurio Registry でイベントのキーと値をシリアル化する場合にのみ、Apicurio Registry アーティファクトを含めます。9
(オプション) Debezium コネクターで使用する Debezium スクリプト SMT アーカイブのアーティファクト
type
とurl
を指定します。Debezium コンテンツベースルーティング SMT または フィルター SMT を使用する場合にのみ、スクリプト SMT を含めます。スクリプト SMT を使用するには、groovy などの JSR 223 準拠のスクリプト実装もデプロイする必要があります。10
(オプション) JSR 223 準拠のスクリプト実装の JAR ファイルのアーティファクト
type
とurl
を指定します。これは、Debezium スクリプト SMT で必要です。重要AMQ Streams を使用して Kafka Connect イメージにコネクタープラグインを組み込む場合は、必要なスクリプト言語コンポーネントごとに、
artifacts.url
に JAR ファイルの場所を指定し、artifacts.type
の値もjar
に設定する必要があります。値が無効な場合は、実行時にコネクターが失敗します。スクリプト SMT で Apache Groovy 言語を使用できるようにするために、この例のカスタムリソースは、次のライブラリーの JAR ファイルを取得します。
-
groovy
-
groovy-jsr223
(スクリプトエージェント) -
groovy-json
(JSON 文字列を解析するためのモジュール)
別の方法として、Debezium スクリプト SMT は、GraalVM JavaScript の JSR 223 実装の使用もサポートします。
以下のコマンドを入力して、
KafkaConnect
ビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-connect.yaml
Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnector
リソースを作成し、デプロイする各コネクターのインスタンスを定義します。
たとえば、以下のKafkaConnector
CR を作成し、postgresql-inventory-connector.yaml
として保存します。例7.2 Debezium コネクターの
KafkaConnector
カスタムリソースを定義するpostgresql-inventory-connector.yaml
ファイルapiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: labels: strimzi.io/cluster: debezium-kafka-connect-cluster name: inventory-connector-postgresql 1 spec: class: io.debezium.connector.postgresql.PostgresConnector 2 tasksMax: 1 3 config: 4 database.hostname: postgresql.debezium-postgresql.svc.cluster.local 5 database.port: 5432 6 database.user: debezium 7 database.password: dbz 8 database.dbname: mydatabase 9 topic.prefix: inventory-connector-postgresql 10 table.include.list: public.inventory 11 ...
表7.27 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
同時に動作できるタスクの数。
4
コネクターの設定。
5
ホストデータベースインスタンスのアドレス。
6
データベースインスタンスのポート番号。
7
Debezium がデータベースへの接続に使用するアカウントの名前。
8
Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。
9
変更をキャプチャーするデータベースの名前。
10
データベースインスタンスまたはクラスターのトピック接頭辞。
指定する名前は、英数字またはアンダースコアのみで設定する必要があります。
トピック接頭辞は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
コネクターを Avro コネクター と統合する場合、この名前空間は関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの名前空間でも使用されます。11
コネクターが変更イベントをキャプチャーするテーブルのリスト。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
以下に例を示します。
oc create -n debezium -f postgresql-inventory-connector.yaml
コネクターは Kafka Connect クラスターに登録され、
KafkaConnector
CR のspec.config.database.dbname
で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、Debezium PostgreSQL デプロイメントを検証する 準備が整いました。
7.6.3. Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドして Debezium PostgreSQL コネクターのデプロイ
Debezium PostgreSQL コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、2 つのカスタムリソース (CR) を作成する必要があります。
-
Kafka Connect インスタンスを定義する
KafkaConnect
CR。image
は Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。AMQ Streams は、Apache Kafka を OpenShift に取り入れる operator およびイメージを提供します。 -
Debezium Db2 コネクターを定義する
KafkaConnector
CR。この CR をKafkaConnect
CR を適用したのと同じ OpenShift インスタンスに適用します。
前提条件
- PostgreSQL が実行され、PostgreSQL を設定して Debezium コネクターを実行する 手順が実行済みである。
- AMQ Streams は OpenShift にデプロイされ、Apache Kafka および Kafka Connect が稼働している必要があります。詳細は、OpenShift での AMQ ストリームのデプロイと管理 を参照してください。
- Podman または Docker がインストールされている。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (
quay.io
やdocker.io
など) でコンテナーを作成および管理するアカウントとパーミッションを持っている。
手順
Kafka Connect の Debezium PostgreSQL コンテナーを作成します。
registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0
をベースイメージとして使用して、新規の Dockerfile を作成します。たとえば、ターミナルウィンドウから、以下のコマンドを入力します。cat <<EOF >debezium-container-for-postgresql.yaml 1 FROM registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0 USER root:root RUN mkdir -p /opt/kafka/plugins/debezium 2 RUN cd /opt/kafka/plugins/debezium/ \ && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-postgres/2.5.4.Final-redhat-00001/debezium-connector-postgres-2.5.4.Final-redhat-00001-plugin.zip \ && unzip debezium-connector-postgres-2.5.4.Final-redhat-00001-plugin.zip \ && rm debezium-connector-postgres-2.5.4.Final-redhat-00001-plugin.zip RUN cd /opt/kafka/plugins/debezium/ USER 1001 EOF
項目 説明 1
任意のファイル名を指定できます。
2
Kafka Connect プラグインディレクトリーへのパスを指定します。Kafka Connect のプラグインディレクトリーが別の場所にある場合は、このパスを実際のディレクトリーのパスに置き換えてください。
このコマンドは、現在のディレクトリーに
debezium-container-for-postgresql.yaml
という名前の Dockerfile を作成します。前のステップで作成した
debezium-container-for-postgresql.yaml
Docker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。podman build -t debezium-container-for-postgresql:latest .
docker build -t debezium-container-for-postgresql:latest .
build
コマンドは、debezium-container-for-postgresql
という名前のコンテナーイメージを構築します。カスタムイメージを
quay.io
などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。podman push <myregistry.io>/debezium-container-for-postgresql:latest
docker push <myregistry.io>/debezium-container-for-postgresql:latest
新しい Debezium PostgreSQL
KafkaConnect
カスタムリソース (CR) を作成します。たとえば、annotations
およびimage
プロパティーを指定するdbz-connect.yaml
という名前のKafkaConnect
CR を作成します。以下の例は、KafkaConnect
カスタムリソースを記述するdbz-connect.yaml
ファイルからの抜粋を示しています。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: image: debezium-container-for-postgresql 2 ...
項目 説明 1
KafkaConnector
リソースはこの Kafka Connect クラスターでコネクターを設定するために使用されることを、metadata.annotations
は Cluster Operator に示します。2
spec.image
は Debezium コネクターを実行するために作成したイメージの名前を指定します。設定された場合、このプロパティーによって Cluster Operator のSTRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
変数がオーバーライドされます。以下のコマンドを実行して、
KafkaConnect
CR を OpenShift Kafka インスタンスに適用します。oc create -f dbz-connect.yaml
これにより、OpenShift の Kafka Connect 環境が更新され、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connector インスタンスが追加されます。
Debezium PostgreSQL コネクターインスタンスを設定する
KafkaConnector
カスタムリソースを作成します。通常、コネクター設定プロパティーを設定する
.yaml
ファイルに Debezium PostgreSQL コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。Debezium PostgreSQL コネクターに設定できる設定プロパティーの完全リストは PostgreSQL コネクタープロパティー を参照してください。次の例は、ポート
5432
で PostgreSQL サーバーホスト192.168.99.100
に接続する Debezium コネクターを設定するカスタムリソースからの抜粋です。このホストには、sampledb
という名前のデータベース、public
という名前のスキーマ、inventory-connector-postgresql
という論理名のサーバーがあります。
PostgreSQL
inventory-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: inventory-connector-postgresql 1 labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.postgresql.PostgresConnector tasksMax: 1 2 config: 3 database.hostname: 192.168.99.100 4 database.port: 5432 database.user: debezium database.password: dbz database.dbname: sampledb topic.prefix: inventory-connector-postgresql 5 schema.include.list: public 6 plugin.name: pgoutput 7 ...
表7.28 PostgreSQL の inventory-connector.yaml サンプルの設定の説明 項目 説明 1
コネクターを Kafka Connect に登録するために使用される名前。
2
このコネクターに作成するタスクの最大数。PostgreSQL コネクターは単一のコネクタータスクを使用して PostgreSQL サーバーの
binlog
を読み取るため、適切な順序とイベント処理を確保するため、一度に動作できるタスクは 1 つのみです。Kafka Connect サービスは、コネクターを使用して 1 つ以上のタスクを開始して作業を実行し、実行中のタスクを Kafka Connect サービスのクラスター全体に自動的に分散します。サービスが停止またはクラッシュした場合、タスクは実行中のサービスに再分散されます。3
コネクターの設定。
4
PostgreSQL サーバーを実行するデータベースホストの名前。この例では、データベースのホスト名は
192.168.99.100
です。5
一意のトピック接頭辞。トピック接頭辞は、PostgreSQL サーバーまたはサーバーのクラスターの論理識別子です。この文字列は、コネクターから変更イベントレコードを受け取るすべての Kafka トピックの名前の先頭に付加されます。
6
コネクターは
public
スキーマでのみ変更をキャプチャーします。選択したテーブルでのみ変更をキャプチャーするようにコネクターを設定できます。詳細はtable.include.list
を参照してください。7
PostgreSQL サーバーにインストールされている PostgreSQL 論理デコードプラグイン の名前。コネクターは
pgoutput
プラグインの使用のみをサポートしますが、plugin.name
をpgoutput
に明示的に設定する必要があります。Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnector
リソースをinventory-connector.yaml
ファイルに保存した場合は、以下のコマンドを実行します。oc apply -f inventory-connector.yaml
このコマンドは
inventory-connector
を登録して、コネクターがKafkaConnector
CR に定義されているsampledb
データベースに対して実行を開始します。
結果
コネクターが起動すると、コネクターが設定された PostgreSQL サーバーデータベースの 整合性スナップショットが実行 されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。
7.6.4. Debezium PostgreSQL コネクターが実行していることの確認
コネクターがエラーなしで正常に起動すると、コネクターがキャプチャーするように設定された各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。
コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。
- コネクターのステータスを確認します。
- コネクターがトピックを生成していることを確認します。
- 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。
前提条件
- Debezium コネクターが AMQ Streams on OpenShift にデプロイされている。
-
OpenShift
oc
CLI クライアントがインストールされている。 - OpenShift Container Platform Web コンソールにアクセスできる。
手順
以下の方法のいずれかを使用して
KafkaConnector
リソースのステータスを確認します。OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaConnector
を入力します。 - KafkaConnectors リストから、チェックするコネクターの名前をクリックします (例: inventory-connector-postgresql)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを入力します。
oc describe KafkaConnector <connector-name> -n <project>
以下に例を示します。
oc describe KafkaConnector inventory-connector-postgresql -n debezium
このコマンドは、以下の出力のようなステータス情報を返します。
例7.3
KafkaConnector
リソースのステータスName: inventory-connector-postgresql Namespace: debezium Labels: strimzi.io/cluster=debezium-kafka-connect-cluster Annotations: <none> API Version: kafka.strimzi.io/v1beta2 Kind: KafkaConnector ... Status: Conditions: Last Transition Time: 2021-12-08T17:41:34.897153Z Status: True Type: Ready Connector Status: Connector: State: RUNNING worker_id: 10.131.1.124:8083 Name: inventory-connector-postgresql Tasks: Id: 0 State: RUNNING worker_id: 10.131.1.124:8083 Type: source Observed Generation: 1 Tasks Max: 1 Topics: inventory-connector-postgresql.inventory inventory-connector-postgresql.inventory.addresses inventory-connector-postgresql.inventory.customers inventory-connector-postgresql.inventory.geom inventory-connector-postgresql.inventory.orders inventory-connector-postgresql.inventory.products inventory-connector-postgresql.inventory.products_on_hand Events: <none>
コネクターによって Kafka トピックが作成されたことを確認します。
OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaTopic
を入力します。 -
KafkaTopics リストから確認するトピックの名前をクリックします (例:
inventory-connector-postgresql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d
)。 - Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを入力します。
oc get kafkatopics
このコマンドは、以下の出力のようなステータス情報を返します。
例7.4
KafkaTopic
リソースのステータスNAME CLUSTER PARTITIONS REPLICATION FACTOR READY connect-cluster-configs debezium-kafka-cluster 1 1 True connect-cluster-offsets debezium-kafka-cluster 25 1 True connect-cluster-status debezium-kafka-cluster 5 1 True consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a debezium-kafka-cluster 50 1 True inventory-connector-postgresql--a96f69b23d6118ff415f772679da623fbbb99421 debezium-kafka-cluster 1 1 True inventory-connector-postgresql.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480 debezium-kafka-cluster 1 1 True inventory-connector-postgresql.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b debezium-kafka-cluster 1 1 True inventory-connector-postgresql.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5 debezium-kafka-cluster 1 1 True inventory-connector-postgresql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d debezium-kafka-cluster 1 1 True inventory-connector-postgresql.inventory.products---df0746db116844cee2297fab611c21b56f82dcef debezium-kafka-cluster 1 1 True inventory-connector-postgresql.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5 debezium-kafka-cluster 1 1 True schema-changes.inventory debezium-kafka-cluster 1 1 True strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 debezium-kafka-cluster 1 1 True strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b debezium-kafka-cluster 1 1 True
トピックの内容を確認します。
- ターミナルウィンドウから、以下のコマンドを入力します。
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
以下に例を示します。
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-postgresql.inventory.products_on_hand
トピック名を指定する形式は、手順 1 で返された
oc describe
コマンドと同じです (例:inventory-connector-postgresql.inventory.addresses
)。トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例7.5 Debezium 変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.5.4.Final-redhat-00001","connector":"postgresql","name":"inventory-connector-postgresql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"postgresql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
上記の例では、
payload
値は、コネクタースナップショットがテーブルinventory.products_on_hand
から読み込み ("op" ="r"
) イベントを生成したことを示しています。product_id
レコードの"before"
状態はnull
であり、レコードに以前の値が存在しないことを示します。"after"
状態がproduct_id
101
を持つ項目のquantity
を3
であることを示しています。
7.6.5. Debezium PostgreSQL コネクター設定プロパティーの説明
Debezium PostgreSQL コネクターには、アプリケーションに適したコネクター動作を実現するために使用できる設定プロパティーが多数あります。多くのプロパティーにはデフォルト値があります。プロパティーに関する情報は、以下のように設定されています。
以下の設定プロパティーは、デフォルト値がない場合は必須です。
プロパティー | デフォルト | 説明 |
---|---|---|
デフォルトなし | コネクターの一意名。同じ名前で再登録を試みると失敗します。このプロパティーはすべての Kafka Connect コネクターに必要です。 | |
デフォルトなし |
コネクターの Java クラスの名前。PostgreSQL コネクターには、常に | |
| このコネクターのために作成する必要のあるタスクの最大数。PostgreSQL コネクターは常に単一のタスクを使用するため、この値を使用しません。そのため、デフォルト値は常に許容されます。 | |
| PostgreSQL サーバーにインストールされている PostgreSQL 論理デコードプラグイン の名前。
サポートされている値は | |
| 特定のデータベース/スキーマの特定のプラグインから変更をストリーミングするために作成された PostgreSQL 論理デコードスロットの名前。サーバーはこのスロットを使用して、設定する Debezium コネクターにイベントをストリーミングします。 スロット名は PostgreSQL レプリケーションスロットの命名ルール に準拠する必要があり、命名ルールには "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character." と記載されています。 | |
| コネクターが正常に想定されるように停止した場合に論理レプリケーションスロットを削除するかどうか。デフォルトの動作では、コネクターが停止したときにレプリケーションスロットはコネクターに設定された状態を保持します。コネクターが再起動すると、同じレプリケーションスロットがあるため、コネクターは停止した場所から処理を開始できます。
テストまたは開発環境でのみ | |
|
このパブリケーションが存在しない場合は起動時に作成され、すべてのテーブルが含まれます。Debezium は、設定されている場合は、独自の include/exclude リストフィルターを適用し、対象となる特定のテーブルのイベントのみをパブリケーションが変更するように制限します。コネクターユーザーがこのパブリケーションを作成するには、スーパーユーザーの権限が必要であるため、通常はコネクターを初めて開始する前にパブリケーションを作成することを推奨します。 パブリケーションがすでに存在し、すべてのテーブルが含まれてているか、テーブルのサブセットで設定されている場合、Debezium は定義されているようにパブリケーションを使用します。 | |
デフォルトなし | PostgreSQL データベースサーバーの IP アドレスまたはホスト名。 | |
| PostgreSQL データベースサーバーのポート番号 (整数)。 | |
デフォルトなし | PostgreSQL データベースサーバーに接続するための PostgreSQL データベースユーザーの名前。 | |
デフォルトなし | PostgreSQL データベースサーバーへの接続時に使用するパスワード。 | |
デフォルトなし | 変更をストリーミングする PostgreSQL データベースの名前。 | |
デフォルトなし |
Debezium が変更をキャプチャーする特定の PostgreSQL データベースサーバーまたはクラスターの名前空間を提供するトピック接頭辞。接頭辞は、他のコネクター全体で一意となる必要があります。これは、このコネクターからレコードを受信するすべての Kafka トピックのトピック名接頭辞として使用されるためです。データベースサーバーの論理名には英数字とハイフン、ドット、アンダースコアのみを使用する必要があります。 警告 このプロパティーの値を変更しないでください。名前の値を変更すると、再起動後に、元のトピックにイベントを発行し続けるのではなく、新しい値に基づいた名前のトピックに後続のイベントを発行します。 | |
デフォルトなし |
変更をキャプチャーする対象とするスキーマの名前と一致する正規表現のコンマ区切りリスト (任意)。
スキーマの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定した式は、スキーマ名に存在する可能性のある部分文字列とは一致しない、スキーマの ID 全体と照合されます。 | |
デフォルトなし |
変更をキャプチャーする対象としないスキーマの名前と一致する正規表現のコンマ区切りリスト (任意)。システムスキーマ以外で、
スキーマの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定した式は、スキーマ名に存在する可能性のある部分文字列とは一致しない、スキーマの ID 全体と照合されます。 | |
デフォルトなし |
変更をキャプチャーするテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。このプロパティーが設定されている場合、コネクターは指定されたテーブルからのみ変更をキャプチャします。各識別子の形式は schemaName.tableName です。デフォルトでは、コネクターは変更がキャプチャーされる各スキーマのシステムでないすべてのテーブルの変更をキャプチャーします。
テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの ID 全体と照合されます。 | |
デフォルトなし |
変更をキャプチャーしないテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。各識別子の形式は schemaName.tableName です。このプロパティーが設定されている場合、コネクターは、指定のないすべてのテーブルから変更をキャプチャーします。
テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの ID 全体と照合されます。 | |
デフォルトなし |
変更イベントレコード値に含まれる必要がある列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、列の名前文字列全体を一致させるために式が使用され、列名に存在する可能性のある部分文字列とは一致しません。 | |
デフォルトなし |
変更イベントレコード値から除外される必要がある列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、列の名前文字列全体を一致させるために式が使用され、列名に存在する可能性のある部分文字列とは一致しません。 | |
|
含まれる列に変更がない場合にメッセージの公開をスキップするかどうかを指定します。これは基本的に、含まれる列に変更がない場合、 注: テーブルの REPLICA IDENTITY が FULL に設定されている場合にのみ機能します。 | |
|
時間、日付、およびタイムスタンプは、異なる精度の種類で表すことができます。 | |
|
コネクターによる | |
|
コネクターによる | |
|
| |
|
PostgreSQL サーバーへの暗号化された接続を使用するかどうか。オプションには以下が含まれます。 | |
デフォルトなし | クライアントの SSL 証明書が含まれるファイルへのパス。詳細は the PostgreSQL のドキュメント を参照してください。 | |
デフォルトなし | クライアントの SSL 秘密鍵が含まれるファイルへのパス。詳細は the PostgreSQL のドキュメント を参照してください。 | |
デフォルトなし |
| |
デフォルトなし | サーバーが検証されるルート証明書が含まれるファイルへのパス。詳細は the PostgreSQL のドキュメント を参照してください。 | |
| TCP keep-alive プローブを有効にして、データベース接続がまだ有効であることを確認します。詳細は the PostgreSQL のドキュメント を参照してください。 | |
|
削除 イベントの後に廃棄 (tombstone) イベントが続くかどうかを制御します。 | |
該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。プロパティー名の 長さ で指定された文字数を超えた場合に、一連の列のデータを切り捨てる場合は、このプロパティーを設定します。
列の完全修飾名は、 単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。 | |
該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。一連の列の値をコネクターでマスクする場合 (たとえば、列に機密データが含まれている場合) は、このプロパティーを設定します。 列の完全修飾名は、次の形式に従います: schemaName.tableName.columnName列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。 単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。 | |
| 該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は <schemaName>.<tableName>.<columnName> です。
仮名は、指定された hashAlgorithm と salt を適用すると得られるハッシュ化された値で構成されます。使用されるハッシュ関数に基づいて、参照整合性は保持され、列値は仮名に置き換えられます。サポートされるハッシュ関数は、Java Cryptography Architecture Standard Algorithm Name Documentation の MessageDigest セクションに説明されています。 column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
必要な場合は、仮名は自動的に列の長さに短縮されます。コネクター設定には、異なるハッシュアルゴリズムと salt を指定する複数のプロパティーを含めることができます。 |
該当なし | 列のメタデータを表す追加パラメーターをコネクターに発行させたい列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。このプロパティーが設定されている場合、コネクターは次のフィールドをイベントレコードのスキーマに追加します。
これらのパラメーターは、列の元の型名と長さ (可変幅型の場合) をそれぞれ伝達します。
列の完全修飾名は、次のいずれかの形式に従います: databaseName.tableName.columnName、または databaseName.schemaName.tableName.columnName. | |
該当なし | データベース内の列に対して定義されているデータ型の完全修飾名を指定する正規表現のオプションのコンマ区切りリスト。このプロパティーが設定されている場合、データ型が一致する列に対して、コネクターはスキーマに次の追加フィールドを含むイベントレコードを発行します。
これらのパラメーターは、列の元の型名と長さ (可変幅型の場合) をそれぞれ伝達します。
列の完全修飾名の形式は、databaseName.tableName.typeName、または databaseName.schemaName.tableName.typeName のいずれかになります。 PostgreSQL 固有のデータ型名の一覧は、PostgreSQL データ型マッピング を参照してください。 | |
空の文字列 | 指定のテーブルの Kafka トピックに公開する変更イベントレコードのカスタムメッセージキーを形成するためにコネクターが使用する列を指定する式のリスト。
デフォルトでは、Debezium はテーブルのプライマリーキー列を、出力するレコードのメッセージキーとして使用します。デフォルトの代わりに、またはプライマリーキーのないテーブルのキーを指定するには、1 つ以上の列をもとにカスタムメッセージキーを設定できます。
各完全修飾テーブル名は、 カスタムメッセージキーの作成に使用する列の数に制限はありません。ただし、一意の鍵を指定するために必要な最小数を使用することが推奨されます。
テーブルでこのプロパティーを設定し、 | |
all_tables |
| |
空の文字列 |
この設定は、テーブルレベルで レプリカ ID の値を決定します。 schema1.*:FULL,schema2.table2:NOTHING,schema2.table3:INDEX idx_name | |
bytes |
変更イベントでのバイナリー ( | |
none |
コネクターで使用されるメッセージコンバータとの互換性のために、スキーマ名をどのように調整するかを指定します。設定可能:
| |
none |
コネクターで使用されるメッセージコンバータとの互換性のために、フィールド名をどのように調整するかを指定します。設定可能:
詳細は、Avro naming を参照してください。 | |
|
Postgres の | |
デフォルトなし | コネクターでキャプチャーする論理デコードメッセージの接頭辞と一致するコンマ区切りの正規表現 (任意)。デフォルトでは、コネクターはすべての論理デコードメッセージをキャプチャーします。このプロパティーが設定されている場合、コネクターはプロパティーで指定された接頭辞が含まれる論理デコードメッセージのみをキャプチャーします。その他の論理デコードメッセージはすべて除外されます。 メッセージの接頭辞名を照合するために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、メッセージ接頭辞文字列全体と照合されます。式は、接頭辞に存在する可能性のある部分文字列と一致しません。
このプロパティーを設定に含める場合は、 メッセージ イベントの構造とその順序付けのセマンティクスについては、メッセージイベント を参照してください。 | |
デフォルトなし |
コネクターでキャプチャーしない論理デコードメッセージの接頭辞名と一致するコンマ区切りの正規表現 (任意)。このプロパティーが設定されている場合、コネクターは、指定された接頭部を使用する論理デコードメッセージをキャプチャーしません。その他のメッセージはすべてキャプチャーされます。 メッセージの接頭辞名を照合するために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、メッセージ接頭辞文字列全体と照合されます。式は、接頭辞に存在する可能性のある部分文字列と一致しません。
このプロパティーを設定に含める場合は、 メッセージ イベントの構造とその順序付けのセマンティクスについては、メッセージイベント を参照してください。 |
以下の 高度な 設定プロパティーには、ほとんどの状況で機能するデフォルト設定があるため、コネクターの設定で指定する必要はほとんどありません。
プロパティー | デフォルト | 説明 |
---|---|---|
デフォルトなし |
コネクターが使用できる カスタムコンバーター インスタンスのシンボリック名のコンマ区切りリストを列挙します。以下に例を示します。
コネクターがカスタムコンバーターを使用できるようにするには、
コネクターに設定するコンバーターごとに、コンバーターインターフェイスを実装するクラスの完全修飾名を指定する
以下に例を示します。 isbn.type: io.debezium.test.IsbnConverter
設定されたコンバータの動作をさらに制御したい場合は、1 つ以上の設定パラメーターを追加して、コンバータに値を渡すことができます。追加の設定パラメーターとコンバーターを関連付けるには、パラメーター名の前にコンバーターのシンボリック名を付けます。 isbn.schema.name: io.debezium.postgresql.type.Isbn | |
|
コネクターの起動時にスナップショットを実行する基準を指定します。 | |
|
スナップショットに含めるテーブルの完全修飾名 ( テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。 | |
| スナップショットの実行時に、テーブルロックを取得するまで待つ最大時間 (ミリ秒単位) を指定する正の整数値。コネクターがこの期間にテーブルロックを取得できないと、スナップショットは失敗します。詳細は、コネクターによるスナップショットの実行方法 を参照してください。 | |
デフォルトなし | スナップショットに追加するテーブル行を指定します。スナップショットにテーブルの行のサブセットのみを含める場合は、プロパティーを使用します。このプロパティーはスナップショットにのみ影響します。コネクターがログから読み取るイベントには影響しません。
プロパティーには、
スナップショットにソフト削除以外のレコードのみを含める場合は、soft-delete 列 ( "snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"
作成されるスナップショットでは、コネクターには | |
|
イベントの処理中にコネクターが例外に反応する方法を指定します。 | |
| コネクターが処理するイベントの各バッチの最大サイズを指定する正の整数値。 | |
|
ブロッキングキューが保持できるレコードの最大数を指定する正の整数値。Debezium はデータベースからストリームされたイベントを読み込む際、Kafka に書き込む前にブロッキングキューにイベントを配置します。ブロッキングキューは、コネクターが Kafka に書き込むよりも速くメッセージを取り込む場合、または Kafka が利用できなくなった場合に、データベースから変更イベントを読み込むためのバックプレッシャーを提供することができます。コネクターがオフセットを定期的に記録すると、キューに保持されるイベントは無視されます。 | |
|
ブロッキングキューの最大容量をバイト単位で指定する長整数値。デフォルトでは、ブロックキューにはボリューム制限は指定されません。キューが使用できるバイト数を指定するには、このプロパティーを正の long 値に設定します。 | |
| コネクターがイベントのバッチの処理を開始する前に、新しい変更イベントの発生を待つ期間をミリ秒単位で指定する正の整数値。デフォルトは 500 ミリ秒です。 | |
|
コネクターがデータタイプが不明なフィールドを見つけたときのコネクターの動作を指定します。コネクターが変更イベントからフィールドを省略し、警告をログに記録するのがデフォルトの動作です。 注記
| |
デフォルトなし |
データベースへの JDBC 接続を確立するときにコネクターが実行する SQL ステートメントのセミコロン区切りリスト。セミコロンを区切り文字としてではなく、文字として使用する場合は、2 つの連続したセミコロン | |
|
レプリケーションの接続状態をサーバーに送信する頻度をミリ秒単位で指定します。 | |
|
コネクターがハートビートメッセージを Kafka トピックに送信する頻度を制御します。デフォルトの動作では、コネクターはハートビートメッセージを送信しません。 | |
デフォルトなし |
コネクターがハートビートメッセージを送信するときにコネクターがソースデータベースで実行するクエリーを指定します。 | |
|
テーブルのインメモリースキーマの更新をトリガーする条件を指定します。 | |
デフォルトなし | コネクターの起動時にスナップショットを実行するまでコネクターが待つ必要がある間隔 (ミリ秒単位)。クラスターで複数のコネクターを起動する場合、このプロパティーは、コネクターのリバランスが行われる原因となるスナップショットの中断を防ぐのに役立ちます。 | |
| スナップショットの実行中、コネクターは行のバッチでテーブルの内容を読み取ります。このプロパティーは、バッチの行の最大数を指定します。 | |
デフォルトなし |
設定された論理デコードプラグインに渡すパラメーターのセミコロン区切りリスト。たとえば、 | |
| レプリケーションスロットへの接続に失敗した場合に、連続して接続を試行する最大回数です。 | |
| コネクターがレプリケーションスロットへの接続に失敗した場合に再試行を行う間隔 (ミリ秒単位)。 | |
|
コネクターが提供する定数を指定して、元の値がデータベースによって提供されていない Toast 化された値であることを示します。 | |
|
コネクターがトランザクション境界でイベントを生成し、トランザクションメタデータで変更イベントエンベロープを強化するかどうかを決定します。コネクターにこれを実行させる場合は | |
|
WAL ログを削除できるように、コネクターがソース postgres データベースに、処理されたレコードの LSN をコミットする必要があるかどうかを決定します。コネクターでこれを行わない場合は | |
10000 (10 秒) | 再試行可能なエラーが発生した後にコネクターを再起動するまで待機する時間 (ミリ秒単位)。 | |
|
ストリーミング中にスキップされる操作タイプのコンマ区切りリスト。挿入/作成は | |
デフォルト値なし |
シグナルをコネクターへの送信に使用されるデータコレクションの完全修飾名。 | |
source | コネクターに対して有効な信号チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。
| |
デフォルトなし | コネクターに対して有効になっている通知チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。
| |
1024 | 増分スナップショットのチャンクの実行中にコネクターがメモリーを取得して読み取る行の最大数。スナップショットは、サイズが大きいスナップショットの場合にはクエリーが少なくなるため、チャンクサイズを増やすと効率が上がります。ただし、チャンクサイズが大きい場合には、スナップショットデータのバッファーにより多くのメモリーが必要になります。チャンクサイズは、環境で最適なパフォーマンスを発揮できる値に、調整します。 | |
|
増分スナップショットによってキャプチャーされ、ストリーミングの再開後に再キャプチャーされる可能性のあるイベントを重複排除するために、コネクターが増分スナップショット中に使用するウォーターマークメカニズムを指定します。
| |
|
レプリケーションスロットから XMIN が読み込まれる頻度 (ミリ秒単位)。XMIN 値は、新しいレプリケーションスロットの開始位置の下限を示す。デフォルト値の | |
|
データ変更、スキーマ変更、トランザクション、ハートビートイベントなどのトピック名を決定するために使用する TopicNamingStrategy クラスの名前。デフォルトは | |
|
トピック名の区切り文字を指定します。デフォルトは | |
| トピック名を保持するために使用されるサイズ (bounded concurrent hash map)。このキャッシュは、与えられたデータコレクションに対応するトピック名を決定するのに役立つ。 | |
|
コネクターがハートビートメッセージを送信するトピックの名前を制御します。トピック名のパターンは、 | |
|
コネクターがトランザクションのメタデータメッセージを送信するトピックの名前を制御します。トピック名のパターンは、 | |
| 初期スナップショットを実行するときにコネクターが使用するスレッドの数を指定します。並列初期スナップショットを有効にするには、プロパティーを 1 より大きい値に設定します。並列初期スナップショットでは、コネクターは複数のテーブルを同時に処理します。 重要 並列初期スナップショットはテクノロジープレビュー機能のみとなっています。テクノロジープレビュー機能は、Red Hat 製品サポートのサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではない場合があります。Red Hat は、実稼働環境でこれらを使用することを推奨していません。テクノロジープレビュー機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行いフィードバックを提供していただくことを目的としています。Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。 | |
|
カスタムメトリクスタグは、通常の名前の末尾に追加される MBean オブジェクト名をカスタマイズするためのキーと値のペアを受け入れます。各キーは MBean オブジェクト名のタグを表し、対応する値はキーが示す当該タグの値になります。たとえば、 | |
| 再試行可能なエラー (接続エラーなど) が失敗するまでの最大再試行回数 (-1 = 制限なし、0 = 無効、> 0 = 再試行回数)。 |
パススルーコネクター設定プロパティー
コネクターは、Kafka プロデューサーおよびコンシューマーの作成時に使用される パススルー 設定プロパティーもサポートします。
Kafka プロデューサーおよびコンシューマーのすべての設定プロパティーについては、必ず Kafka ドキュメント を参照してください。PostgreSQL コネクターは 新しいコンシューマー設定プロパティー を使用します。
Debezium コネクター Kafka は設定プロパティーをシグナル化します。
Debezium は、コネクターが Kafka シグナルトピックと対話する方法を制御する signal.*
プロパティーのセットを提供します。
以下の表は、Kafka signal
プロパティーを説明しています。
プロパティー | デフォルト | 説明 |
---|---|---|
<topic.prefix>-signal | コネクターがアドホックシグナルについて監視する Kafka トピックの名前。 注記 トピックの自動作成 が無効になっている場合は、必要なシグナリングトピックを手動で作成する必要があります。シグナルの順序を維持するには、シグナルトピックが必要です。シグナリングトピックには単一のパーティションが必要です。 | |
kafka-signal | Kafka コンシューマーによって使用されるグループ ID の名前。 | |
デフォルトなし | Kafka クラスターへの最初の接続を確立するためにコネクターが使用するホストとポートのペアのリスト。各ペアは、Debezium Kafka Connect プロセスによって使用される Kafka クラスターを参照します。 | |
| コネクターが信号をポーリングするときに待機する最大ミリ秒数を指定する整数値。 | |
| 少なくとも 1 回は確実に配信するために、シグナルトピックのオフセットコミットを有効にします。無効にすると、コンシューマーの稼働中に受信したシグナルのみが処理されます。コンシューマーがダウンしているときに受信したシグナルはすべて失われます。 |
Debezium コネクターのパススルーは Kafka コンシューマークライアント設定プロパティーを示唆します。
Debezium コネクターでは、Kafka コンシューマーのパススルー設定が可能です。パススルーシグナルのプロパティーは、接頭辞 signals.consumer.*
で始まります。たとえば、コネクターは signal.consumer.security.protocol=SSL
などのプロパティーを Kafka コンシューマーに渡します。
Debezium は、プロパティーを Kafka シグナルコンシューマーに渡す前に、プロパティーから接頭辞を削除します。
Debezium コネクターの sink 通知設定プロパティー
以下の表は、notification
プロパティーを説明しています。
プロパティー | デフォルト | 説明 |
---|---|---|
デフォルトなし |
Debezium から通知を受信するトピックの名前。このプロパティーは、有効な通知チャネルの 1 つとして |