8.6. Debezium PostgreSQL コネクターのデプロイメント


以下の方法のいずれかを使用して Debezium PostgreSQL コネクターをデプロイできます。

8.6.1. AMQ Streams を使用した PostgreSQL コネクターデプロイメント

Debezium 1.7 以降、Debezium コネクターのデプロイに推奨される方法は、AMQ Streams を使用してコネクタープラグインが含まれる Kafka Connect コンテナーイメージをビルドすることです。

デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。

  • Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある KafkaConnect CR。
  • コネクターがソースデータベースにアクセスするために使用する情報を提供する KafkaConnector CR。AMQ Streams が Kafka Connect Pod を起動したら、KafkaConnector CR を適用してコネクターを起動します。

Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。AMQ Streams が Kafka Connect イメージをビルドすると、指定のアーティファクトをダウンロードし、イメージに組み込みます。

Kafka Connect CR の spec.build.output パラメーターは、生成される KafkaConnectコンテナーイメージを格納する場所を指定します。コンテナーイメージは Docker レジストリーまたは OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。ImageStreams は自動的に作成されません。

注記

KafkaConnect リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。

関連情報

8.6.2. AMQ Streams を使用した Debezium PostgreSQL コネクターのデプロイ

以前のバージョンの AMQ Streams では、OpenShift に Debezium コネクターをデプロイするには、最初にコネクター用の Kafka Connect イメージをビルドする必要がありました。コネクターを OpenShift にデプロイする場合に現在推奨される方法は、AMQ Streams でビルド設定を使用して、使用する Debezium コネクタープラグインが含まれる Kafka Connect コンテナーイメージを自動的にビルドすることです。

ビルドプロセス中、AMQ Streams Operator は Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから必要なアーティファクトをダウンロードします。

新規に作成されたコンテナーは .spec.build.output に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect クラスターのデプロイに使用されます。AMQ Streams が Kafka Connect イメージをビルドしたら、KafkaConnector カスタムリソースを作成し、ビルドに含まれるコネクターを起動します。

前提条件

  • クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
  • AMQ Streams Operator が稼働している。
  • Apache Kafka クラスターは、OpenShift での AMQ Streams のデプロイおよびアップグレード に記載されているとおりにデプロイされます。
  • Kafka Connect が AMQ Streams にデプロイされている。
  • Red Hat Integration ライセンスがある。
  • OpenShift oc CLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。
  • Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成している。

    ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
    • レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
    ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下を行います。

手順

  1. OpenShift クラスターにログインします。
  2. コネクターの Debezium KafkaConnect カスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、metadata.annotations および spec.build プロパティーを指定する dbz-connect.yaml という名前の KafkaConnect CR を作成します。以下の例は、KafkaConnect カスタムリソースを記述する dbz-connect.yaml ファイルからの抜粋を示しています。

    例8.1 Debezium コネクターを含む KafkaConnect カスタムリソースを定義した dbz-connect.yaml ファイル

    次の例では、カスタムリソースは、次のアーティファクトをダウンロードするように設定されています。

    • Debezium PostgreSQL コネクターアーカイブです。
    • サービスレジストリーアーカイブ。Service Registry はオプションのコンポーネントです。コネクターで Avro シリアル化を使用する場合にのみ、Service Registry コンポーネントを追加します。
    • Debezium スクリプティング SMT アーカイブと、Debezium コネクターで使用する関連スクリプティングエンジン。SMT アーカイブとスクリプト言語の依存関係はオプションのコンポーネントです。Debezium コンテンツベースのルーティング SMT または フィルター SMT を使用する場合にのみ、これらのコンポーネントを追加します。
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: debezium-kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 1
    spec:
      version: 3.5.0
      build: 2
        output: 3
          type: imagestream  4
          image: debezium-streams-connect:latest
        plugins: 5
          - name: debezium-connector-postgres
            artifacts:
              - type: zip 6
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-postgres/2.3.4.Final-redhat-00001/debezium-connector-postgres-2.3.4.Final-redhat-00001-plugin.zip  7
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.4.4.Final-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.4.4.Final-redhat-<build-number>.zip  8
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/2.3.4.Final-redhat-00001/debezium-scripting-2.3.4.Final-redhat-00001.zip 9
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.11/groovy-3.0.11.jar  10
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar
    
      bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
    
      ...
    表8.25 Kafka Connect 設定の説明
    項目説明

    1

    strimzi.io/use-connector-resources アノテーションを "true" に設定して、クラスター Operator が KafkaConnector リソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。

    2

    spec.build 設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。

    3

    build.output は、新しくビルドされたイメージを保存するレジストリーを指定します。

    4

    イメージ出力の名前およびイメージ名を指定します。output.type の有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合は docker、内部の OpenShift ImageStream にイメージをプッシュする場合は imagestream です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定で build.output を指定する方法の詳細は、OpenShift での AMQ Streams の設定の AMQ Streams ビルドスキーマ参照 を参照してください。

    5

    plugins 設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグイン name と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。

    6

    artifacts.type の値は、artifacts.url で指定するアーティファクトのファイルタイプを指定します。有効なタイプは ziptgz、または jar です。Debezium コネクターアーカイブは、.zip ファイル形式で提供されます。type の値は、url フィールドで参照されるファイルのタイプと一致させる必要があります。

    7

    artifacts.url の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。Debezium コネクターアーティファクトは Red Hat リポジトリーで入手できます。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。

    8

    (オプション) サービスレジストリーコンポーネントをダウンロードするためのアーティファクト typeurl を指定します。デフォルトの JSON コンバーターを使用する代わりに、コネクターが Apache Avro を使用して、Service Registry を使用してイベントのキーと値をシリアル化する場合にのみ、Service Registry アーティファクトを含めます。

    9

    (オプション) Debezium コネクターで使用する Debezium スクリプト SMT アーカイブのアーティファクト typeurl を指定します。Debezium コンテンツベースのルーティング SMT または フィルター SMT を使用する場合にのみ、スクリプト SMT を含めます。スクリプト SMT を使用するには、groovy などの JSR 223 準拠のスクリプト実装もデプロイする必要があります。

    10

    (オプション) JSR 223 準拠のスクリプト実装の JAR ファイルのアーティファクト typeurl を指定します。これは、Debezium スクリプト SMT で必要です。

    重要

    AMQ Streams を使用して Kafka Connect イメージにコネクタープラグインを組み込む場合は、必要なスクリプト言語コンポーネントごとに、artifacts.url に JAR ファイルの場所を指定し、artifacts.type の値も jar に設定する必要があります。値が無効な場合は、実行時にコネクターが失敗します。

    スクリプト SMT で Apache Groovy 言語を使用できるようにするために、この例のカスタムリソースは、次のライブラリーの JAR ファイルを取得します。

    • groovy
    • groovy-jsr223 (スクリプトエージェント)
    • groovy-json (JSON 文字列を解析するためのモジュール)

    別の方法として、Debezium スクリプト SMT は、GraalVM JavaScript の JSR 223 実装の使用もサポートします。

  3. 以下のコマンドを入力して、KafkaConnect ビルド仕様を OpenShift クラスターに適用します。

    oc create -f dbz-connect.yaml

    Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
    ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。

  4. KafkaConnector リソースを作成し、デプロイする各コネクターのインスタンスを定義します。
    たとえば、以下の KafkaConnector CR を作成し、postgresql-inventory-connector.yaml として保存します。

    例8.2 Debezium コネクターの KafkaConnector カスタムリソースを定義する postgresql-inventory-connector.yaml ファイル

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: debezium-kafka-connect-cluster
      name: inventory-connector-postgresql 1
    spec:
      class: io.debezium.connector.postgresql.PostgresConnector 2
      tasksMax: 1  3
      config:  4
        database.hostname: postgresql.debezium-postgresql.svc.cluster.local 5
        database.port: 5432   6
        database.user: debezium  7
        database.password: dbz  8
        database.dbname: mydatabase 9
        topic.prefix: inventory-connector-postgresql 10
        table.include.list: public.inventory  11
    
        ...
    表8.26 コネクター設定の説明
    項目説明

    1

    Kafka Connect クラスターに登録するコネクターの名前。

    2

    コネクタークラスの名前。

    3

    同時に動作できるタスクの数。

    4

    コネクターの設定。

    5

    ホストデータベースインスタンスのアドレス。

    6

    データベースインスタンスのポート番号。

    7

    Debezium がデータベースへの接続に使用するアカウントの名前。

    8

    Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。

    9

    変更をキャプチャーするデータベースの名前。

    10

    データベースインスタンスまたはクラスターのトピック接頭辞。
    指定する名前は、英数字またはアンダースコアのみで設定する必要があります。
    トピック接頭辞は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
    コネクターを Avro コネクター と統合する場合、この名前空間は関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの名前空間でも使用されます。

    11

    コネクターが変更イベントをキャプチャーするテーブルのリスト。

  5. 以下のコマンドを実行してコネクターリソースを作成します。

    oc create -n <namespace> -f <kafkaConnector>.yaml

    以下に例を示します。

    oc create -n debezium -f {context}-inventory-connector.yaml

    コネクターは Kafka Connect クラスターに登録され、KafkaConnector CR の spec.config.database.dbname で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。

これで、Debezium PostgreSQL デプロイメントを検証する 準備が整いました。

8.6.3. Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドして Debezium PostgreSQL コネクターのデプロイ

Debezium PostgreSQL コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、2 つのカスタムリソース (CR) を作成する必要があります。

  • Kafka Connect インスタンスを定義する KafkaConnect CR。image は Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。AMQ Streams は、Apache Kafka を OpenShift に取り入れる operator およびイメージを提供します。
  • Debezium Db2 コネクターを定義する KafkaConnector CR。この CR を KafkaConnect CR を適用したのと同じ OpenShift インスタンスに適用します。

前提条件

  • PostgreSQL が実行され、PostgreSQL を設定して Debezium コネクターを実行する 手順が実行済みである。
  • AMQ Streams は OpenShift にデプロイされ、Apache Kafka および Kafka Connect が稼働している必要があります。詳細は、Deploying and Upgrading AMQ Streams on OpenShift を参照してください。
  • Podman または Docker がインストールされている。
  • Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (quay.iodocker.ioなど) でコンテナーを作成および管理するアカウントとパーミッションを持っている。

手順

  1. Kafka Connect の Debezium PostgreSQL コンテナーを作成します。

    1. registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0 をベースイメージとして使用して、新規の Dockerfile を作成します。たとえば、ターミナルウィンドウから、以下のコマンドを入力します。

      cat <<EOF >debezium-container-for-postgresql.yaml 1
      FROM registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0
      USER root:root
      RUN mkdir -p /opt/kafka/plugins/debezium 2
      RUN cd /opt/kafka/plugins/debezium/ \
      && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-postgres/2.3.4.Final-redhat-00001/debezium-connector-postgres-2.3.4.Final-redhat-00001-plugin.zip \
      && unzip debezium-connector-postgres-2.3.4.Final-redhat-00001-plugin.zip \
      && rm debezium-connector-postgres-2.3.4.Final-redhat-00001-plugin.zip
      RUN cd /opt/kafka/plugins/debezium/
      USER 1001
      EOF
      項目説明

      1

      任意のファイル名を指定できます。

      2

      Kafka Connect プラグインディレクトリーへのパスを指定します。Kafka Connect のプラグインディレクトリーが別の場所にある場合は、このパスを実際のディレクトリーのパスに置き換えてください。

      このコマンドは、現在のディレクトリーに debezium-container-for-postgresql.yaml という名前の Dockerfile を作成します。

    2. 前のステップで作成した debezium-container-for-postgresql.yaml Docker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。

      podman build -t debezium-container-for-postgresql:latest .
      docker build -t debezium-container-for-postgresql:latest .

      build コマンドは、debezium-container-for-postgresql という名前のコンテナーイメージを構築します。

    3. カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。

      podman push <myregistry.io>/debezium-container-for-postgresql:latest
      docker push <myregistry.io>/debezium-container-for-postgresql:latest
    4. 新しい Debezium PostgreSQL KafkaConnect カスタムリソース (CR) を作成します。たとえば、annotations および image プロパティーを指定する dbz-connect.yaml という名前の KafkaConnect CR を作成します。以下の例は、KafkaConnect カスタムリソースを記述する dbz-connect.yaml ファイルからの抜粋を示しています。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true" 1
      spec:
        image: debezium-container-for-postgresql 2
      
        ...
      項目説明

      1

      KafkaConnector リソースはこの Kafka Connect クラスターでコネクターを設定するために使用されることを、metadata.annotations は Cluster Operator に示します。

      2

      spec.image は Debezium コネクターを実行するために作成したイメージの名前を指定します。設定された場合、このプロパティーによって Cluster Operator の STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 変数がオーバーライドされます。

    5. 以下のコマンドを実行して、KafkaConnect CR を OpenShift Kafka インスタンスに適用します。

      oc create -f dbz-connect.yaml

      これにより、OpenShift の Kafka Connect 環境が更新され、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connector インスタンスが追加されます。

  2. Debezium PostgreSQL コネクターインスタンスを設定する KafkaConnector カスタムリソースを作成します。

    通常、コネクター設定プロパティーを設定する .yaml ファイルに Debezium PostgreSQL コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。Debezium PostgreSQL コネクターに設定できる設定プロパティーの完全リストは PostgreSQL コネクタープロパティー を参照してください。

    次の例は、ポート 5432 で PostgreSQL サーバーホスト 192.168.99.100 に接続する Debezium コネクターを設定するカスタムリソースからの抜粋です。このホストには、sampledb という名前のデータベース、public という名前のスキーマ、inventory-connector-postgresql という論理名のサーバーがあります。

    inventory-connector.yaml

    apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: inventory-connector-postgresql  1
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.postgresql.PostgresConnector
        tasksMax: 1  2
        config:  3
          database.hostname: 192.168.99.100   4
          database.port: 5432
          database.user: debezium
          database.password: dbz
          database.dbname: sampledb
          topic.prefix: inventory-connector-postgresql   5
          schema.include.list: public   6
          plugin.name: pgoutput    7
    
          ...

    1 1 1 1 1
    コネクターの名前。
    2 2 2 2 2
    一度に実行できるタスクは 1 つだけです。PostgreSQL コネクターは PostgreSQL サーバーの 192.168.99.100 を読み取るため、単一のコネクタータスクを使用することで、順序とイベントの処理が適切に行われるようになります。Kafka Connect サービスはコネクターを使用して作業を行う 1 つ以上のタスクを開始し、実行中のタスクを自動的に Kafka Connect サービスのクラスター全体に分散します。いずれかのサービスが停止またはクラッシュすると、これらのタスクは稼働中のサービスに再分散されます。
    3 3 3
    コネクターの設定。
    4 4 4
    PostgreSQL サーバーを実行しているデータベースホストの名前。この例では、データベースのホスト名は 192.168.99.100 です。
    5 5 5
    一意のトピック接頭辞。サーバー名は、PostgreSQL サーバーまたはサーバーのクラスターの論理識別子です。この名前は、変更イベントレコードを受信するすべての Kafka トピックの接頭辞として使用されます。
    6 6 6
    コネクターは public スキーマでのみ変更をキャプチャーします。選択したテーブルでのみ変更をキャプチャーするようにコネクターを設定できます。詳細は table.include.list を参照してください。
    7 7 7
    PostgreSQL サーバーにインストールされている PostgreSQL 論理デコードプラグイン の名前。PostgreSQL 10 以降でサポートされている値は pgoutput のみですが、明示的に plugin.namepgoutput に設定する必要があります。
  3. Kafka Connect でコネクターインスタンスを作成します。たとえば、KafkaConnector リソースを inventory-connector.yaml ファイルに保存した場合は、以下のコマンドを実行します。

    oc apply -f inventory-connector.yaml

    このコマンドは inventory-connector を登録して、コネクターが KafkaConnector CR に定義されている sampledb データベースに対して実行を開始します。

結果

コネクターが起動すると、コネクターが設定された PostgreSQL サーバーデータベースの 整合性スナップショットが実行 されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。

8.6.4. Debezium PostgreSQL コネクターが実行していることの確認

コネクターがエラーなしで正常に起動すると、コネクターがキャプチャーするように設定された各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。

コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。

  • コネクターのステータスを確認します。
  • コネクターがトピックを生成していることを確認します。
  • 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。

前提条件

  • Debezium コネクターが AMQ Streams on OpenShift にデプロイされている。
  • OpenShift oc CLI クライアントがインストールされている。
  • OpenShift Container Platform Web コンソールにアクセスできる。

手順

  1. 以下の方法のいずれかを使用して KafkaConnector リソースのステータスを確認します。

    • OpenShift Container Platform Web コンソールから以下を実行します。

      1. Home Search に移動します。
      2. Search ページで Resources をクリックし、Select Resource ボックスを開き、KafkaConnector を入力します。
      3. KafkaConnectors リストから、チェックするコネクターの名前をクリックします (例: inventory-connector-postgresql)。
      4. Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
    • ターミナルウィンドウから以下を実行します。

      1. 以下のコマンドを入力します。

        oc describe KafkaConnector <connector-name> -n <project>

        以下に例を示します。

        oc describe KafkaConnector inventory-connector-postgresql -n debezium

        このコマンドは、以下の出力のようなステータス情報を返します。

        例8.3 KafkaConnector リソースのステータス

        Name:         inventory-connector-postgresql
        Namespace:    debezium
        Labels:       strimzi.io/cluster=debezium-kafka-connect-cluster
        Annotations:  <none>
        API Version:  kafka.strimzi.io/v1beta2
        Kind:         KafkaConnector
        
        ...
        
        Status:
          Conditions:
            Last Transition Time:  2021-12-08T17:41:34.897153Z
            Status:                True
            Type:                  Ready
          Connector Status:
            Connector:
              State:      RUNNING
              worker_id:  10.131.1.124:8083
            Name:         inventory-connector-postgresql
            Tasks:
              Id:               0
              State:            RUNNING
              worker_id:        10.131.1.124:8083
            Type:               source
          Observed Generation:  1
          Tasks Max:            1
          Topics:
            inventory-connector-postgresql.inventory
            inventory-connector-postgresql.inventory.addresses
            inventory-connector-postgresql.inventory.customers
            inventory-connector-postgresql.inventory.geom
            inventory-connector-postgresql.inventory.orders
            inventory-connector-postgresql.inventory.products
            inventory-connector-postgresql.inventory.products_on_hand
        Events:  <none>
  2. コネクターによって Kafka トピックが作成されたことを確認します。

    • OpenShift Container Platform Web コンソールから以下を実行します。

      1. Home Search に移動します。
      2. Search ページで Resources をクリックし、Select Resource ボックスを開き、KafkaTopic を入力します。
      3. KafkaTopics リストから確認するトピックの名前をクリックします (例: inventory-connector-postgresql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d)。
      4. Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
    • ターミナルウィンドウから以下を実行します。

      1. 以下のコマンドを入力します。

        oc get kafkatopics

        このコマンドは、以下の出力のようなステータス情報を返します。

        例8.4 KafkaTopic リソースのステータス

        NAME                                                                    CLUSTER               PARTITIONS   REPLICATION FACTOR   READY
        connect-cluster-configs                                                 debezium-kafka-cluster   1            1                    True
        connect-cluster-offsets                                                 debezium-kafka-cluster   25           1                    True
        connect-cluster-status                                                  debezium-kafka-cluster   5            1                    True
        consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a             debezium-kafka-cluster   50           1                    True
        inventory-connector-postgresql--a96f69b23d6118ff415f772679da623fbbb99421                               debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480          debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b          debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5               debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d             debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.products---df0746db116844cee2297fab611c21b56f82dcef           debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5   debezium-kafka-cluster   1            1                    True
        schema-changes.inventory                                                debezium-kafka-cluster   1            1                    True
        strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55          debezium-kafka-cluster   1            1                    True
        strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b  debezium-kafka-cluster  1   1    True
  3. トピックの内容を確認します。

    • ターミナルウィンドウから、以下のコマンドを入力します。
    oc exec -n <project>  -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=<topic-name>

    以下に例を示します。

    oc exec -n debezium  -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=inventory-connector-postgresql.inventory.products_on_hand

    トピック名を指定する形式は、手順 1 で返された oc describe コマンドと同じです (例: inventory-connector-postgresql.inventory.addresses)。

    トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。

    例8.5 Debezium 変更イベントの内容

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.3.4.Final-redhat-00001","connector":"postgresql","name":"inventory-connector-postgresql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"postgresql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}

    上記の例では、payload 値は、コネクタースナップショットがテーブル inventory.products_on_hand から読み込み ("op" ="r") イベントを生成したことを示しています。product_id レコードの "before" 状態は null であり、レコードに以前の値が存在しないことを示しています。"after" 状態は、product_id 101 を持つ項目の quantity3 であることを示しています。

8.6.5. Debezium PostgreSQL コネクター設定プロパティーの説明

Debezium PostgreSQL コネクターには、アプリケーションに適したコネクター動作を実現するために使用できる設定プロパティーが多数あります。多くのプロパティーにはデフォルト値があります。プロパティーに関する情報は、以下のように設定されています。

以下の設定プロパティーは、デフォルト値がない場合は必須です。

表8.27 必要なコネクター設定プロパティー
プロパティーデフォルト説明

name

デフォルトなし

コネクターの一意名。同じ名前で再登録を試みると失敗します。このプロパティーはすべての Kafka Connect コネクターに必要です。

connector.class

デフォルトなし

コネクターの Java クラスの名前。PostgreSQL コネクターには、常に io.debezium.connector.postgresql.PostgresConnector の値を使用してください。

tasks.max

1

このコネクターのために作成する必要のあるタスクの最大数。PostgreSQL コネクターは常に単一のタスクを使用するため、この値を使用しません。そのため、デフォルト値は常に許容されます。

plugin.name

decoderbufs

PostgreSQL サーバーにインストールされている PostgreSQL 論理デコードプラグイン の名前。

サポートされている値は pgoutput のみです。pgoutput には plugin.name を明示的に設定する必要があります。

slot.name

debezium

特定のデータベース/スキーマの特定のプラグインから変更をストリーミングするために作成された PostgreSQL 論理デコードスロットの名前。サーバーはこのスロットを使用して、設定する Debezium コネクターにイベントをストリーミングします。

スロット名は PostgreSQL レプリケーションスロットの命名ルール に準拠する必要があり、命名ルールには "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character." と記載されています。

slot.drop.on.stop

false

コネクターが正常に想定されるように停止した場合に論理レプリケーションスロットを削除するかどうか。デフォルトの動作では、コネクターが停止したときにレプリケーションスロットはコネクターに設定された状態を保持します。コネクターが再起動すると、同じレプリケーションスロットがあるため、コネクターは停止した場所から処理を開始できます。

テストまたは開発環境でのみ true に設定します。スロットを削除すると、データベースは WAL セグメントを破棄できます。コネクターが再起動すると、新しいスナップショットが実行されるか、Kafka Connect オフセットトピックの永続オフセットから続行できます。

publication.name

dbz_publication

pgoutput の使用時に変更をストリーミングするために作成される PostgreSQL パブリケーションの名前。

このパブリケーションが存在しない場合は起動時に作成され、すべてのテーブルが含まれます。Debezium は、設定されている場合は、独自の include/exclude リストフィルターを適用し、対象となる特定のテーブルのイベントのみをパブリケーションが変更するように制限します。コネクターユーザーがこのパブリケーションを作成するには、スーパーユーザーの権限が必要であるため、通常はコネクターを初めて開始する前にパブリケーションを作成することを推奨します。

パブリケーションがすでに存在し、すべてのテーブルが含まれてているか、テーブルのサブセットで設定されている場合、Debezium は定義されているようにパブリケーションを使用します。

database.hostname

デフォルトなし

PostgreSQL データベースサーバーの IP アドレスまたはホスト名。

database.port

5432

PostgreSQL データベースサーバーのポート番号 (整数)。

database.user

デフォルトなし

PostgreSQL データベースサーバーに接続するための PostgreSQL データベースユーザーの名前。

database.password

デフォルトなし

PostgreSQL データベースサーバーへの接続時に使用するパスワード。

database.dbname

デフォルトなし

変更をストリーミングする PostgreSQL データベースの名前。

topic.prefix

デフォルトなし

Debezium が変更をキャプチャーする特定の PostgreSQL データベースサーバーまたはクラスターの名前空間を提供するトピック接頭辞。接頭辞は、他のコネクター全体で一意となる必要があります。これは、このコネクターからレコードを受信するすべての Kafka トピックのトピック名接頭辞として使用されるためです。データベースサーバーの論理名には英数字とハイフン、ドット、アンダースコアのみを使用する必要があります。

警告

このプロパティーの値を変更しないでください。名前の値を変更すると、再起動後に、元のトピックにイベントを発行し続けるのではなく、新しい値に基づいた名前のトピックに後続のイベントを発行します。

schema.include.list

デフォルトなし

変更をキャプチャーする対象とするスキーマの名前と一致する正規表現のコンマ区切りリスト (任意)。schema.include.list に含まれていないスキーマ名は、変更をキャプチャーする対象から除外されます。デフォルトでは、システム以外のスキーマはすべて変更がキャプチャーされます。

スキーマの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定した式は、スキーマ名に存在する可能性のある部分文字列とは一致しない、スキーマの ID 全体と照合されます。
このプロパティーを設定に含める場合は、schema.exclude.list プロパティーも設定しないでください。

schema.exclude.list

デフォルトなし

変更をキャプチャーする対象としないスキーマの名前と一致する正規表現のコンマ区切りリスト (任意)。システムスキーマ以外で、schema.exclude.list に名前が含まれていないスキーマの変更がキャプチャーされます。

スキーマの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定した式は、スキーマ名に存在する可能性のある部分文字列とは一致しない、スキーマの ID 全体と照合されます。
このプロパティーを設定に含める場合は、schema.include.list プロパティーを設定しないでください。

table.include.list

デフォルトなし

変更をキャプチャーするテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。このプロパティーが設定されている場合、コネクターは指定されたテーブルからのみ変更をキャプチャします。各識別子の形式は schemaName.tableName です。デフォルトでは、コネクターは変更がキャプチャーされる各スキーマのシステムでないすべてのテーブルの変更をキャプチャーします。

テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの ID 全体と照合されます。
このプロパティーを設定に含める場合は、table.exclude.list プロパティーも設定しないでください。

table.exclude.list

デフォルトなし

変更をキャプチャーしないテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。各識別子の形式は schemaName.tableName です。このプロパティーが設定されている場合、コネクターは、指定のないすべてのテーブルから変更をキャプチャーします。

テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの ID 全体と照合されます。
このプロパティーを設定に含める場合は、table.include.list プロパティーも設定しないでください。

column.include.list

デフォルトなし

変更イベントレコード値に含まれる必要がある列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。

列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、列の名前文字列全体を一致させるために式が使用され、列名に存在する可能性のある部分文字列とは一致しません。
このプロパティーを設定に含める場合は、column.exclude.list プロパティーを設定しないでください。

column.exclude.list

デフォルトなし

変更イベントレコード値から除外される必要がある列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。

列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、列の名前文字列全体を一致させるために式が使用され、列名に存在する可能性のある部分文字列とは一致しません。
このプロパティーを設定に含める場合は、column.include.list プロパティーを設定しないでください。

skip.messages.without.change

false

含まれる列に変更がない場合にメッセージの公開をスキップするかどうかを指定します。これは基本的に、含まれる列に変更がない場合、column.include.list プロパティーまたは column.exclude.list プロパティーに従ってメッセージをフィルタリングします。

注: テーブルの REPLICA IDENTITY が FULL に設定されている場合にのみ機能します。

time.precision.mode

adaptive

時間、日付、およびタイムスタンプは、異なる精度の種類で表すことができます。

adaptive は、データベース列の型を基にして、ミリ秒、マイクロ秒、またはナノ秒の精度値のいずれかを使用して、データベースの値と全く同じように時間およびタイムスタンプ値をキャプチャーします。

adaptive_time_microseconds は、データベース列の型を基にして、ミリ秒、マイクロ秒、またはナノ秒の精度値のいずれかを使用して、データベースの値と全く同じように日付、日時、およびタイムスタンプ値をキャプチャーします。例外は TIME 型フィールドで、これは常にマイクロ秒としてキャプチャーされます。

connectTimeDate および Timestamp の組み込み表現を使用して、常に時間とタイムスタンプ値を表します。この組み込み表現は、データベース列の精度に関わらず、ミリ秒の精度を使用します詳細は、一時的な値 を参照してください。

decimal.handling.mode

precise

コネクターによる DECIMAL および NUMERIC 列の値の処理方法を指定します。

precise はバイナリー形式で変更イベントに表される java.math.BigDecimal 値を使用して正確に表します。

doubledouble値を使用して表します。精度が失われる可能性はありますが、簡単に使用できます。

string は値をフォーマットされた文字列としてエンコードします。簡単に使用できますが、本来の型に関するセマンティック情報は失われます。詳細は、10 進数の型 を参照してください。

hstore.handling.mode

map

コネクターによる hstore 列の値の処理方法を指定します。

mapMAP を使用して値を表します。

jsonjson string を使用して値を表します。この設定では、値は {"key" : "val"} などのフォーマットされた文字列としてエンコードされます。詳細は、PostgreSQL HSTORE タイプ を参照してください。

interval.handling.mode

numeric



numericは、マイクロ秒単位の概算値で間隔を表します。

string は、P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S の文字列パターン表現を使用して間隔を正確に表します。例: P1Y2M3DT4H5M6.78S。詳細は、PostgreSQL の基本タイプ を参照してください。

database.sslmode

prefer

PostgreSQL サーバーへの暗号化された接続を使用するかどうか。オプションには以下が含まれます。

disable は、暗号化されていない接続を使用します。

allow は、最初に暗号化されていない接続の使用が試みられ、失敗した場合は (暗号化された) 安全な接続が使用されます。

prefer は、最初に (暗号化された) 安全な接続の使用を試行し、失敗した場合は暗号化されていない接続を使用します。

require は、(暗号化された) 安全な接続を使用し、確立できない場合は失敗します。

verify-carequire と同様に動作しますが、サーバー TLS 証明書を設定された認証局 (CA) 証明書と照合して検証するか、有効な一致する CA 証明書が見つからない場合は失敗します。

verify-fullverify-ca と同様に動作しますが、サーバー証明書がコネクターが接続しようとしているホストと一致することも検証します。詳細は the PostgreSQL のドキュメント を参照してください。

database.sslcert

デフォルトなし

クライアントの SSL 証明書が含まれるファイルへのパス。詳細は the PostgreSQL のドキュメント を参照してください。

database.sslkey

デフォルトなし

クライアントの SSL 秘密鍵が含まれるファイルへのパス。詳細は the PostgreSQL のドキュメント を参照してください。

database.sslpassword

デフォルトなし

database.sslkey で指定されたファイルからクライアントの秘密鍵にアクセスするためのパスワード。詳細は the PostgreSQL のドキュメント を参照してください。

database.sslrootcert

デフォルトなし

サーバーが検証されるルート証明書が含まれるファイルへのパス。詳細は the PostgreSQL のドキュメント を参照してください。

database.tcpKeepAlive

true

TCP keep-alive プローブを有効にして、データベース接続がまだ有効であることを確認します。詳細は the PostgreSQL のドキュメント を参照してください。

tombstones.on.delete

true

削除 イベントの後に廃棄 (tombstone) イベントが続くかどうかを制御します。

true: 削除操作は、削除 イベントと後続の破棄 (tombstone) イベントで表されます。

false - delete イベントのみが出力されます。

log compaction がトピックで有効になっている場合には、ソースレコードの削除後に廃棄 (tombstone) イベントを出力すると (デフォルト動作)、Kafka は削除された行のキーに関連するすべてのイベントを完全に削除できます。

column.truncate.to.length.chars

該当なし

文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。プロパティー名の 長さ で指定された文字数を超えた場合に、一連の列のデータを切り捨てる場合は、このプロパティーを設定します。length を正の整数値に設定します (例: column.truncate.to.20.chars)

列の完全修飾名は、<schemaName>.<tableName>.<columnName> の形式に従います。列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。

単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。

column.mask.with.length.chars

該当なし

文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。一連の列の値をコネクターでマスクする場合 (たとえば、列に機密データが含まれている場合) は、このプロパティーを設定します。length を正の整数に設定して、指定された列のデータをプロパティー名の 長さ で指定されたアスタリスク (*) 文字数で置き換えます。指定した列のデータを空の文字列に置き換えるには、長さ0 (ゼロ) に設定します。

列の完全修飾名は、次の形式に従います: schemaName.tableName.columnName列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。

単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。

column.mask.hash.hashAlgorithm.with.salt.salt; column.mask.hash.v2.hashAlgorithm.with.salt.salt

該当なし

文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は <schemaName>.<tableName>.<columnName> です。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。作成された変更イベントレコードでは、指定された列の値は仮名に置き換えられます。

仮名は、指定された hashAlgorithmsalt を適用すると得られるハッシュ化された値で構成されます。使用されるハッシュ関数に基づいて、参照整合性は保持され、列値は仮名に置き換えられます。サポートされるハッシュ関数は、Java Cryptography Architecture Standard Algorithm Name Documentation の MessageDigest セクションに説明されています。

以下の例では、CzQMA0cB5K が無作為に選択された salt になります。

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

必要な場合は、仮名は自動的に列の長さに短縮されます。コネクター設定には、異なるハッシュアルゴリズムと salt を指定する複数のプロパティーを含めることができます。

使用される hashAlgorithm、選択された salt、および実際のデータセットによっては、結果のデータセットが完全にマスクされない場合があります。

値が異なる場所やシステムでハッシュ化されている場合は、ハッシュ化ストラテジーバージョン 2 を使用する必要があります。

column.propagate.source.type

該当なし

列のメタデータを表す追加パラメーターをコネクターに発行させたい列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。このプロパティーが設定されている場合、コネクターは次のフィールドをイベントレコードのスキーマに追加します。

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

これらのパラメーターは、列の元の型名と長さ (可変幅型の場合) をそれぞれ伝達します。
コネクターがこの余分なデータを発行できるようにすると、シンクデータベース内の特定の数値または文字ベースの列のサイズを適切に設定するのに役立ちます。

列の完全修飾名は、次のいずれかの形式に従います: databaseName.tableName.columnName、または databaseName.schemaName.tableName.columnName.
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。

datatype.propagate.source.type

該当なし

データベース内の列に対して定義されているデータ型の完全修飾名を指定する正規表現のオプションのコンマ区切りリスト。このプロパティーが設定されている場合、データ型が一致する列に対して、コネクターはスキーマに次の追加フィールドを含むイベントレコードを発行します。

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

これらのパラメーターは、列の元の型名と長さ (可変幅型の場合) をそれぞれ伝達します。
コネクターがこの余分なデータを発行できるようにすると、シンクデータベース内の特定の数値または文字ベースの列のサイズを適切に設定するのに役立ちます。

列の完全修飾名の形式は、databaseName.tableName.typeName、または databaseName.schemaName.tableName.typeName のいずれかになります。
データ型の名前を照合するために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、データ型の名前文字列全体に対して照合されます。式は、型名に存在する可能性のある部分文字列と一致しません。

PostgreSQL 固有のデータ型名の一覧は、PostgreSQL データ型マッピング を参照してください。

message.key.columns

空の文字列

指定のテーブルの Kafka トピックに公開する変更イベントレコードのカスタムメッセージキーを形成するためにコネクターが使用する列を指定する式のリスト。

デフォルトでは、Debezium はテーブルのプライマリーキー列を、出力するレコードのメッセージキーとして使用します。デフォルトの代わりに、またはプライマリーキーのないテーブルのキーを指定するには、1 つ以上の列をもとにカスタムメッセージキーを設定できます。

テーブルにカスタムメッセージキーを設定するには、テーブルを列挙した後、メッセージキーとして使用する列を列挙します。各リストエントリーは、

<fully-qualified_tableName>:<keyColumn>,<keyColumn>

の形式を取ります。複数の列名をベースにテーブルキーを作成するには、列名の間にコンマを挿入します。

各完全修飾テーブル名は、

<schemaName>.<tableName> の形式の正規表現です。

プロパティーには複数のテーブルのエントリーを含めることができます。セミコロンを使用して、リスト内のテーブルエントリーを区切ります。

以下の例は、テーブル inventory.customers および purchase.orders:

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

のメッセージキーを設定します。テーブル inventory.customer の場合、列 pk1pk2 がメッセージキーとして指定されます。どのスキーマのpurchaseorders テーブルでも、pk3pk4 の列がメッセージキーとして使用されます。

カスタムメッセージキーの作成に使用する列の数に制限はありません。ただし、一意の鍵を指定するために必要な最小数を使用することが推奨されます。

テーブルでこのプロパティーを設定し、REPLICA IDENTITYDEFAULT に設定すると、キー列がテーブルのプライマリーキーの一部でない場合に、tombstone イベントが適切に作成されないことに注意してください。
REPLICA IDENTITYFULL に設定することが唯一の解決策です。

publication.autocreate.mode

all_tables

pgoutput プラグインを使用して変更をストリーミングする場合にのみ適用されます。この設定は、パブリケーション の作成がどのように機能するかを決定します。次の値のいずれかを指定します。

all_tables - コネクターは、パブリケーションが存在する場合に使用します。パブリケーションが存在しない場合は、コネクターが変更をキャプチャーするデータベースのすべてのテーブルに対してパブリケーションを作成します。コネクターがパブリケーションを作成するには、パブリケーションの作成およびレプリケーションの実行権限を持つデータベースユーザーアカウントを使用してデータベースにアクセスする必要があります。次の SQL コマンド CREATE PUBLICATION <publication_name> FOR ALL TABLES を使用して、必要な権限を付与します。

disabled - コネクターはパブリケーションの作成を試みません。レプリケーションを実行するよう設定されたデータベース管理者またはユーザーは、コネクターを実行する前にパブリケーションを作成する必要があります。コネクターがパブリケーションを見つけられない場合は、コネクターは例外を出力し、停止します。

filtered: パブリケーションが存在する場合、コネクターはそれを使用します。パブリケーションが存在しない場合は、schema.include.listschema.exclude.listtable.include.listtable.exclude.list の各コネクター設定プロパティーで指定された現在のフィルター設定に一致するテーブルの新しいパブリケーションが作成されます。例: CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, tbl3>。パブリケーションが存在する場合、コネクターは現在のフィルター設定に一致するテーブルのパブリケーションを更新します。例: ALTER PUBLICATION <publication_name> SET TABLE <tbl1, tbl2, tbl3>

replica.identity.autoset.values

空の文字列

この設定は、テーブルレベルで レプリカ ID の値を決定します。

このオプションは、データベース内の既存の値を上書きします。テーブルで使用されるレプリカ ID 値と、完全修飾テーブルとを一致させる正規表現のコンマ区切りのリスト。

各式は、'<fully-qualified table name>:<replica identity>' というパターンと一致する必要があります。ここで、テーブル名は (SCHEMA_NAME.TABLE_NAME) として定義します。また、レプリカ ID の値は

DEFAULT で、プライマリーキーの列の古い値 (存在する場合) を記録します。これは、システム以外のテーブルのデフォルトです。

INDEX index_name - 名前付きインデックスの対象となる列の古い値を記録します。これは一意である必要があります。また、部分的ではなく、延期できず、NOT NULL とマークされている必要があります。このインデックスが削除された場合、動作は NOTHING と同じになります。

FULL - 行内のすべての列の古い値を記録します。

NOTHING - 古い行に関する情報を記録しません。これは、システムテーブルのデフォルトです。

以下に例を示します。

schema1.*:FULL,schema2.table2:NOTHING,schema2.table3:INDEX idx_name

binary.handling.mode

bytes

変更イベントでのバイナリー (bytea) 列の表現方法を指定します。

bytes はバイナリーデータをバイト配列として表します。

base64 は、base64 でエンコードされた文字列としてバイナリーデータを表します。

base64-url-safe は、base64-url-safe でエンコードした文字列としてバイナリーデータを表します。

hex は、バイナリーデータを 16 進数でエンコードした (base16) 文字列として表します。

schema.name.adjustment.mode

none

コネクターで使用されるメッセージコンバータとの互換性のために、スキーマ名をどのように調整するかを指定します。設定可能:

  • none は、調整を適用しません。
  • Avro は Avro タイプ名で使用できない文字をアンダースコアに置き換えます。
  • avro_unicode は、Avro タイプ名で使用できないアンダースコアまたは文字を、_uxxxx などの対応する Unicode に置き換えます。注: _ は Java のバックスラッシュなどのエスケープシーケンスです。

field.name.adjustment.mode

none

コネクターで使用されるメッセージコンバータとの互換性のために、フィールド名をどのように調整するかを指定します。設定可能:

  • none は、調整を適用しません。
  • Avro は Avro タイプ名で使用できない文字をアンダースコアに置き換えます。
  • avro_unicode は、Avro タイプ名で使用できないアンダースコアまたは文字を、_uxxxx などの対応する Unicode に置き換えます。注: _ は Java のバックスラッシュなどのエスケープシーケンスです。

詳しい情報は、Avro naming を参照してください。

money.fraction.digits

2

Postgres の money タイプを、変更イベントの値を表す java.math.BigDecimal に変換する際に、何桁の 10 進数を使用するかを指定します。decimal.handling.modeprecise に設定されている場合のみ適用されます。

message.prefix.include.list

デフォルトなし

コネクターでキャプチャーする論理デコードメッセージの接頭辞と一致するコンマ区切りの正規表現 (任意)。デフォルトでは、コネクターはすべての論理デコードメッセージをキャプチャーします。このプロパティーが設定されている場合、コネクターはプロパティーで指定された接頭辞が含まれる論理デコードメッセージのみをキャプチャーします。その他の論理デコードメッセージはすべて除外されます。

メッセージの接頭辞名を照合するために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、メッセージ接頭辞文字列全体と照合されます。式は、接頭辞に存在する可能性のある部分文字列と一致しません。

このプロパティーを設定に含める場合は、message.prefix.exclude.list プロパティーも設定しないでください。

メッセージ イベントの構造とその順序付けのセマンティクスについては、メッセージイベント を参照してください。

message.prefix.exclude.list

デフォルトなし

コネクターでキャプチャーしない論理デコードメッセージの接頭辞名と一致するコンマ区切りの正規表現 (任意)。このプロパティーが設定されている場合、コネクターは、指定された接頭部を使用する論理デコードメッセージをキャプチャーしません。その他のメッセージはすべてキャプチャーされます。
論理デコードメッセージをすべて除外するには、このプロパティーの値を .* に設定します。

メッセージの接頭辞名を照合するために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、メッセージ接頭辞文字列全体と照合されます。式は、接頭辞に存在する可能性のある部分文字列と一致しません。

このプロパティーを設定に含める場合は、message.prefix.include.list プロパティーも設定しないでください。

メッセージ イベントの構造とその順序付けのセマンティクスについては、メッセージイベント を参照してください。

以下の 高度な 設定プロパティーには、ほとんどの状況で機能するデフォルト設定があるため、コネクターの設定で指定する必要はほとんどありません。

表8.28 高度なコネクター設定プロパティー
プロパティーデフォルト説明

converters

デフォルトなし

コネクターが使用できる カスタムコンバーター インスタンスのシンボリック名のコンマ区切りリストを列挙します。以下に例を示します。

isbn

コネクターがカスタムコンバーターを使用できるようにするには、converters タプロパティーを設定する必要があります。

コネクターに設定するコンバーターごとに、コンバーターインターフェイスを実装するクラスの完全修飾名を指定する .type プロパティーも追加する必要があります。.type プロパティーでは、以下の形式を使用します。

<converterSymbolicName>.type

以下に例を示します。

isbn.type: io.debezium.test.IsbnConverter

設定されたコンバータの動作をさらに制御したい場合は、1 つ以上の設定パラメーターを追加して、コンバータに値を渡すことができます。追加の設定パラメーターとコンバーターを関連付けるには、パラメーター名の前にコンバーターのシンボリック名を付けます。
以下に例を示します。

isbn.schema.name: io.debezium.postgresql.type.Isbn

snapshot.mode

Initial

コネクターの起動時にスナップショットを実行する基準を指定します。

initial - コネクターは、論理サーバー名に対してオフセットが記録されていない場合のみスナップショットを実行します。

always - コネクターはコネクターが開始するたびにスナップショットを実行します。

never - コネクターはスナップショットを実行しません。このようにコネクターを設定したすると、起動時の動作は次のようになります。Kafka オフセットトピックに以前保存された LSN がある場合、コネクターはその位置から変更をストリーミングを続行します。保存された LSN がない場合、コネクターはサーバーで PostgreSQL の論理レプリケーションスロットが作成された時点で変更のストリーミングを開始します。never スナップショットモードは、対象のデータがすべて WAL に反映されたままであることが分かっている場合にのみ有効です。

initial_only: コネクターは最初のスナップショットを実行し、その後の変更を処理せずに停止します。

exported - deprecated


詳細は、snapshot.mode オプションのテーブル を参照してください。

snapshot.include.collection.list

table.include.listに指定したすべてのテーブル

スナップショットに含めるテーブルの完全修飾名 (<schemaName>.<tableName>) と一致する正規表現のコンマ区切りリスト (オプション) です。指定する項目は、コネクターの table.include.list プロパティーで名前を付ける必要があります。このプロパティーは、コネクターの snapshot.mode プロパティーが never 以外の値に設定されている場合にのみ有効です。
このプロパティーは増分スナップショットの動作には影響しません。

テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。

snapshot.lock.timeout.ms

10000

スナップショットの実行時に、テーブルロックを取得するまで待つ最大時間 (ミリ秒単位) を指定する正の整数値。コネクターがこの期間にテーブルロックを取得できないと、スナップショットは失敗します。詳細は、コネクターによるスナップショットの実行方法 を参照してください。

snapshot.select.statement.overrides

デフォルトなし

スナップショットに追加するテーブル行を指定します。スナップショットにテーブルの行のサブセットのみを含める場合は、プロパティーを使用します。このプロパティーはスナップショットにのみ影響します。コネクターがログから読み取るイベントには影響しません。

プロパティーには、<schemaName>.<tableName> の形式で完全修飾テーブル名のコンマ区切りリストが含まれます。たとえば、

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

をリスト内の各テーブルに対して、スナップショットを作成する場合には、その他の設定プロパティーを追加して、コネクターがテーブルで実行するように SELECT ステートメントを指定します。指定した SELECT ステートメントは、スナップショットに追加するテーブル行のサブセットを決定します。以下の形式を使用して、この SELECT ステートメントプロパティーの名前 (

snapshot.select.statement.overrides.<schemaName>.<tableName>) を指定します。例: snapshot.select.statement.overrides.customers.orders.

以下に例を示します。

スナップショットにソフト削除以外のレコードのみを含める場合は、soft-delete 列 (delete_flag ) を含む customers.orders テーブルから、以下のプロパティーを追加します。

"snapshot.select.statement.overrides": "customer.orders",
"snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"

作成されるスナップショットでは、コネクターには delete_flag = 0 のレコードのみが含まれます。

event.processing.failure.handling.mode

fail

イベントの処理中にコネクターが例外に反応する方法を指定します。

fail は例外を伝播し、問題のあるイベントのオフセットを示し、コネクターを停止させます。

warn は問題のあるイベントのオフセットをログに記録し、そのイベントを省略し、処理を継続します。

skip は問題のあるイベントを省略し、処理を継続します。

max.batch.size

2048

コネクターが処理するイベントの各バッチの最大サイズを指定する正の整数値。

max.queue.size

8192

ブロッキングキューが保持できるレコードの最大数を指定する正の整数値。Debezium はデータベースからストリームされたイベントを読み込む際、Kafka に書き込む前にブロッキングキューにイベントを配置します。ブロッキングキューは、コネクターが Kafka に書き込むよりも速くメッセージを取り込む場合、または Kafka が利用できなくなった場合に、データベースから変更イベントを読み込むためのバックプレッシャーを提供することができます。コネクターがオフセットを定期的に記録すると、キューに保持されるイベントは無視されます。max.queue.size の値を、max.batch.size の値よりも大きくなるように設定します。

max.queue.size.in.bytes

0

ブロッキングキューの最大容量をバイト単位で指定する長整数値。デフォルトでは、ブロックキューにはボリューム制限は指定されません。キューが使用できるバイト数を指定するには、このプロパティーを正の long 値に設定します。
max.queue.size も設定されている場合、キューのサイズがどちらかのプロパティーで指定された上限に達すると、キューへの書き込みがブロックされます。たとえば、max.queue.size=1000max.queue.size.in.bytes=5000 と設定した場合、キューに 1000 レコードが入った後、あるいはキュー内のレコードの量が 5000 バイトに達した後、キューへの書き込みがブロックされます。

poll.interval.ms

500

コネクターがイベントのバッチの処理を開始する前に、新しい変更イベントの発生を待つ期間をミリ秒単位で指定する正の整数値。デフォルトは 500 ミリ秒です。

include.unknown.datatypes

false

コネクターがデータタイプが不明なフィールドを見つけたときのコネクターの動作を指定します。コネクターが変更イベントからフィールドを省略し、警告をログに記録するのがデフォルトの動作です。

変更イベントにフィールドの不透明なバイナリー表現を含める場合は、このプロパティーを true に設定します。これにより、コンシューマーはフィールドをデコードできます。binary handling mode プロパティーを設定すると、正確な表現を制御できます。

注記

include.unknown.datatypestrue に設定されていると、コンシューマーは後方互換性の問題を抱えることになります。リリース間でデータベース固有のバイナリー表現の変更があるだけでなく、最終的にデータ型が Debezium によってサポートされる場合、データ型は論理型でダウンストリームに送信され、コンシューマーによる調整が必要になります。通常、サポートされていないデータ型が検出された場合は、機能リクエストを作成して、サポートを追加できるようにします。

database.initial.statements

デフォルトなし

データベースへの JDBC 接続を確立するときにコネクターが実行する SQL ステートメントのセミコロン区切りリスト。セミコロンを区切り文字としてではなく、文字として使用する場合は、2 つの連続したセミコロン ;; を指定します。

コネクターは JDBC 接続を独自の判断で確立する可能性があります。そのため、このプロパティーはセッションパラメーターのみの設定に便利です。また、DML ステートメントの実行には適していません。

トランザクションログを読み取るコネクションを作成する場合、コネクターはこれらのステートメントを実行しません。

status.update.interval.ms

10000

レプリケーションの接続状態をサーバーに送信する頻度をミリ秒単位で指定します。
また、このプロパティーは、データベースがシャットダウンされた場合にデッドコネクションを検出するために、データベースの状態をチェックする頻度を制御します。

heartbeat.interval.ms

0

コネクターがハートビートメッセージを Kafka トピックに送信する頻度を制御します。デフォルトの動作では、コネクターはハートビートメッセージを送信しません。

ハートビートメッセージは、コネクターがデータベースから変更イベントを受信しているかどうかを監視するのに便利です。ハートビートメッセージは、コネクターの再起動時に再送信する必要がある変更イベントの数を減らすのに役立つ可能性があります。ハートビートメッセージを送信するには、このプロパティーを、ハートビートメッセージの間隔をミリ秒単位で示す正の整数に設定します。

追跡されるデータベースに多くの更新がある場合にハートビートメッセージが必要になりますが、一部の更新のみがコネクターの変更をキャプチャーするテーブルおよびスキーマに関連します。この場合、コネクターは通常どおりにデータベーストランザクションログから読み取りしますが、変更レコードを Kafka に出力することはほとんどありません。つまり、オフセットの更新は Kafka にコミットされず、コネクターには最新の LSN をデータベースに送信する機会はありません。データベースは、コネクターによってすでに処理されたイベントが含まれる WAL ファイルを保持します。ハートビートメッセージを送信すると、コネクターは最新の取得された LSN をデータベースに送信できます。これにより、データベースは不必要になった WAL ファイルによって使用されるディスク領域を解放できます。

heartbeat.action.query

デフォルトなし

コネクターがハートビートメッセージを送信するときにコネクターがソースデータベースで実行するクエリーを指定します。

これは、WAL ディスク領域の消費 で説明されている状況を解決するのに役立ちます。この状況では、トラフィック量の多いデータベースと同じホスト上のトラフィック量の少ないデータベースから変更をキャプチャーすると、Debezium が WAL レコードを処理できなくなり、データベースで WAL の位置を認識できなくなります。この状況に対処するには、トラフィックの少ないデータベースでハートビートテーブルを作成し、このプロパティーをそのテーブルにレコードを挿入するステートメントに設定します (例:

INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')

)。これにより、コネクターはトラフィックの少ないデータベースから変更を受信し、LSN を受け入れでき、データベースホストでバインドされていない WAL が増加しないようにします。

schema.refresh.mode

columns_diff

テーブルのインメモリースキーマの更新をトリガーする条件を指定します。

columns_diff は最も安全なモードです。インメモリースキーマがデータベーステーブルの水ーまと常に同期されるようにします。

columns_diff_exclude_unchanged_toast は、未変更の TOASTable データのみが不一致の原因である場合を除き、受信メッセージから派生するスキーマに不一致があれば、インメモリースキーマキャッシュを更新するようコネクターに指示します。

この設定は、ほとんど更新の対象とならない TOASTed データが頻繁に更新されるテーブルがある場合に、コネクターのパフォーマンスを大幅に向上できます。ただし、TOASTable 列がテーブルから削除されると、インメモリースキーマが古い状態になる可能性があります。

snapshot.delay.ms

デフォルトなし

コネクターの起動時にスナップショットを実行するまでコネクターが待つ必要がある間隔 (ミリ秒単位)。クラスターで複数のコネクターを起動する場合、このプロパティーは、コネクターのリバランスが行われる原因となるスナップショットの中断を防ぐのに役立ちます。

snapshot.fetch.size

10240

スナップショットの実行中、コネクターは行のバッチでテーブルの内容を読み取ります。このプロパティーは、バッチの行の最大数を指定します。

slot.stream.params

デフォルトなし

設定された論理デコードプラグインに渡すパラメーターのセミコロン区切りリスト。たとえば、add-tables=public.table,public.table2;include-lsn=true のようにします。

slot.max.retries

6

レプリケーションスロットへの接続に失敗した場合に、連続して接続を試行する最大回数です。

slot.retry.delay.ms

10000 (10 秒)

コネクターがレプリケーションスロットへの接続に失敗した場合に再試行を行う間隔 (ミリ秒単位)。

unavailable.value.placeholder

__debezium_unavailable_value

コネクターが提供する定数を指定して、元の値がデータベースによって提供されていない Toast 化された値であることを示します。unavailable.value.placeholder の設定が hex: 接頭辞で始まる場合は、残りの文字列が 16 進数でエンコードされたオクテットを表すことが想定されます。詳細は、toast 化された値 を参照してください。

provide.transaction.metadata

false

コネクターがトランザクション境界でイベントを生成し、トランザクションメタデータで変更イベントエンベロープを強化するかどうかを決定します。コネクターにこれを実行させる場合は true を指定します。詳細は、トランザクションメタデータ を参照してください。

flush.lsn.source

true

WAL ログを削除できるように、コネクターがソース postgres データベースに、処理されたレコードの LSN をコミットする必要があるかどうかを決定します。コネクターでこれを行わない場合は false を指定します。false に設定すると、LSN が Debezium によって認識されず、結果として WAL ログが消去されないので、ディスク容量の問題が発生する可能性があることに注意してください。ユーザーは、Debezium 外で LSN を確認する必要があります。

retriable.restart.connector.wait.ms

10000 (10 秒)

再試行可能なエラーが発生した後にコネクターを再起動するまで待機する時間 (ミリ秒単位)。

skipped.operations

t

ストリーミング中にスキップされる操作タイプのコンマ区切りリスト。挿入/作成は c、更新は u、削除は d、切り捨ては t、操作をスキップしない場合は none と なります。デフォルトでは、切り捨て操作が省略されます。

signal.data.collection

デフォルト値なし

シグナルをコネクターへの送信に使用されるデータコレクションの完全修飾名。
コレクション名の指定には、
<schemaName>.<tableName>
の形式を使用します。

signal.enabled.channels

source

コネクターに対して有効な信号チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。

  • source
  • kafka
  • file
  • jmx

notification.enabled.channels

デフォルトなし

コネクターに対して有効になっている通知チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。

  • sink
  • log
  • jmx

incremental.snapshot.chunk.size

1024

増分スナップショットのチャンクの実行中にコネクターがメモリーを取得して読み取る行の最大数。スナップショットは、サイズが大きいスナップショットの場合にはクエリーが少なくなるため、チャンクサイズを増やすと効率が上がります。ただし、チャンクサイズが大きい場合には、スナップショットデータのバッファーにより多くのメモリーが必要になります。チャンクサイズは、環境で最適なパフォーマンスを発揮できる値に、調整します。

xmin.fetch.interval.ms

0

レプリケーションスロットから XMIN が読み込まれる頻度 (ミリ秒単位)。XMIN 値は、新しいレプリケーションスロットの開始位置の下限を示す。デフォルト値の 0 は、XMIN の追跡を無効にします。

topic.naming.strategy

io.debezium.schema.SchemaTopicNamingStrategy

データ変更、スキーマ変更、トランザクション、ハートビートイベントなどのトピック名を決定するために使用する TopicNamingStrategy クラスの名前。デフォルトは SchemaTopicNamingStrategy

topic.delimiter

.

トピック名の区切り文字を指定します。デフォルトは . です。

topic.cache.size

10000

トピック名を保持するために使用されるサイズ (bounded concurrent hash map)。このキャッシュは、与えられたデータコレクションに対応するトピック名を決定するのに役立つ。

topic.heartbeat.prefix

__debezium-heartbeat

コネクターがハートビートメッセージを送信するトピックの名前を制御します。トピック名のパターンは、

topic.heartbeat.prefix.topic.prefix

です。たとえば、トピックの接頭辞が fulfillment の場合は、デフォルトのトピック名は __debezium-heartbeat.fulfillment になります。

topic.transaction

transaction

コネクターがトランザクションのメタデータメッセージを送信するトピックの名前を制御します。トピック名のパターンは、

topic.prefix.topic.transaction

です。たとえば、トピックの接頭辞が fulfillment の場合、デフォルトのトピック名は fulfillment.transaction になります。

snapshot.max.threads

1

初期スナップショットを実行するときにコネクターが使用するスレッドの数を指定します。並列初期スナップショットを有効にするには、プロパティーを 1 より大きい値に設定します。並列初期スナップショットでは、コネクターは複数のテーブルを同時に処理します。

重要

並列初期スナップショットはテクノロジープレビュー機能のみとなっています。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は、実稼働環境でこれらを使用することを推奨していません。テクノロジープレビューの機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行いフィードバックを提供していただくことを目的としています。Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

errors.max.retries

-1

再試行可能なエラー (接続エラーなど) が失敗するまでの最大再試行回数 (-1 = 制限なし、0 = 無効、> 0 = 再試行回数)。

パススルーコネクター設定プロパティー

コネクターは、Kafka プロデューサーおよびコンシューマーの作成時に使用される パススルー 設定プロパティーもサポートします。

Kafka プロデューサーおよびコンシューマーのすべての設定プロパティーについては、必ず Kafka ドキュメント を参照してください。PostgreSQL コネクターは 新しいコンシューマー設定プロパティー を使用します。

Debezium コネクター Kafka は設定プロパティーをシグナル化します。

Debezium は、コネクターが Kafka シグナルトピックと対話する方法を制御する signal.* プロパティーのセットを提供します。

以下の表は、Kafka signal プロパティーを説明しています。

表8.29 Kafka のシグナル設定プロパティー
プロパティーデフォルト説明

signal.kafka.topic

<topic.prefix>-signal

コネクターがアドホックシグナルについて監視する Kafka トピックの名前。

注記

トピックの自動作成 が無効になっている場合は、必要なシグナリングトピックを手動で作成する必要があります。シグナルの順序を維持するには、シグナルトピックが必要です。シグナリングトピックには単一のパーティションが必要です。

signal.kafka.groupId

kafka-signal

Kafka コンシューマーによって使用されるグループ ID の名前。

signal.kafka.bootstrap.servers

デフォルトなし

Kafka クラスターへの最初の接続を確立するためにコネクターが使用するホストとポートのペアのリスト。各ペアは、Debezium Kafka Connect プロセスによって使用される Kafka クラスターを参照します。

signal.kafka.poll.timeout.ms

100

コネクターが信号をポーリングするときに待機する最大ミリ秒数を指定する整数値。

Debezium コネクターのパススルーは Kafka コンシューマークライアント設定プロパティーを示唆します。

Debezium コネクターでは、Kafka コンシューマーのパススルー設定が可能です。パススルーシグナルのプロパティーは、接頭辞 signals.consumer.* で始まります。たとえば、コネクターは signal.consumer.security.protocol=SSL などのプロパティーを Kafka コンシューマーに渡します。

Debezium は、プロパティーを Kafka シグナルコンシューマーに渡す前に、プロパティーから接頭辞を削除します。

Debezium コネクターの sink 通知設定プロパティー

以下の表は、notification プロパティーを説明しています。

表8.30 Sink 通知設定プロパティー
プロパティーデフォルト説明

notification.sink.topic.name

デフォルトなし

Debezium から通知を受信するトピックの名前。このプロパティーは、有効な通知チャネルの 1 つとして sink を含めるように notification.enabled.channels プロパティーを設定する場合に必要です。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.