5.5. Debezium MySQL コネクターのデプロイメント


以下の方法のいずれかを使用して Debezium MySQL コネクターをデプロイできます。

5.5.1. AMQ Streams を使用した MySQL コネクターデプロイメント

Debezium 1.7 以降、Debezium コネクターのデプロイに推奨される方法は、AMQ Streams を使用してコネクタープラグインが含まれる Kafka Connect コンテナーイメージをビルドすることです。

デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。

  • Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある KafkaConnect CR。
  • コネクターがソースデータベースにアクセスするために使用する情報を提供する KafkaConnector CR。AMQStreams が Kafka Connect Pod を開始し、KafkaConnector CR を適用してコネクターを開始します。

Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。AMQ Streams が Kafka Connect イメージをビルドすると、指定のアーティファクトをダウンロードし、イメージに組み込みます。

Kafka Connect CR の spec.build.output パラメーターは、生成される KafkaConnectコンテナーイメージを格納する場所を指定します。コンテナーイメージは Docker レジストリーまたは OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。イメージストリームは自動的に作成されません。

注記

KafkaConnect リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。

5.5.2. AMQ Streams を使用した Debezium MySQL コネクターのデプロイ

以前のバージョンの AMQ Streams では、OpenShift に Debezium コネクターをデプロイするには、最初にコネクター用の Kafka Connect イメージをビルドする必要がありました。コネクターを OpenShift にデプロイする場合に現在推奨される方法は、AMQ Streams でビルド設定を使用して、使用する Debezium コネクタープラグインが含まれる Kafka Connect コンテナーイメージを自動的にビルドすることです。

ビルドプロセス中、AMQ Streams Operator は Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから必要なアーティファクトをダウンロードします。新規に作成されたコンテナーは .spec.build.output で指定されたコンテナーレジストリーにプッシュされ、Kafka Connect Pod のデプロイに使用されます。AMQ Streams が Kafka Connect イメージをビルドしたら、KafkaConnector カスタムリソースを作成し、ビルドに含まれるコネクターを起動します。

前提条件

  • クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
  • AMQ Streams Operator が稼働している。
  • Kafka クラスターは、Apache Open Shift での AMQ ストリームのデプロイとアップグレードに記載されているようにデプロイされます。
  • Red Hat Integration ライセンスがある。
  • Kafka Connect is deployed on AMQ Streams
  • OpenShift oc CLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。
  • Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成している。

    ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
    • レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
    ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下が必要です。
    • ImageStream リソースがクラスターにデプロイされている。クラスターの ImageStream を明示的に作成している。ImageStreams はデフォルトでは利用できません。

手順

  1. OpenShift クラスターにログインします。
  2. コネクターの Debezium KafkaConnect カスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、以下の例のように metadata.annotations および spec.build プロパティーを指定する KafkaConnect CR を作成します。dbz-connect.yaml などの名前でファイルを保存します。

    例5.1 Debezium コネクターを含む KafkaConnect カスタムリソースを定義する dbz-connect.yaml ファイル

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: debezium-kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 
    1
    
    spec:
      version: 3.00
      build: 
    2
    
        output: 
    3
    
          type: imagestream  
    4
    
          image: debezium-streams-connect:latest
        plugins: 
    5
    
          - name: debezium-connector-mysql
            artifacts:
              - type: zip 
    6
    
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/1.7.2.Final-redhat-<build_number>/debezium-connector-mysql-1.7.2.Final-redhat-<build_number>-plugin.zip  
    7
    
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.0-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.0-redhat-<build-number>.zip
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/1.7.2.Final/debezium-scripting-1.7.2.Final.zip
    
      bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
    Copy to Clipboard Toggle word wrap
    Expand
    表5.22 Kafka Connect 設定の説明
    項目説明

    1

    strimzi.io/use-connector-resources アノテーションを "true" に設定して、クラスター Operator が KafkaConnector リソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。

    2

    spec.build 設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。

    3

    build.output は、新しくビルドされたイメージを保存するレジストリーを指定します。

    4

    イメージ出力の名前およびイメージ名を指定します。output.type の有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合は docker、内部の OpenShift ImageStream にイメージをプッシュする場合は imagestream です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定で build.output の指定に関する詳細は、AMQ Streams Build スキーマ参照 のドキュメントを参照 してください。

    5

    plugins 設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグイン name と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。

    6

    artifacts.type の値は、artifacts.url で指定するアーティファクトのファイルタイプを指定します。有効なタイプは ziptgz、または jar です。Debezium コネクターアーカイブは、.zip ファイル形式で提供されます。JDBC ドライバーファイルは .jar 形式です。type の値は、url フィールドで参照されるファイルのタイプと一致する必要があります。

    7

    artifacts.url の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。

  3. 以下のコマンドを入力して、KafkaConnect ビルド仕様を OpenShift クラスターに適用します。

    oc create -f dbz-connect.yaml
    Copy to Clipboard Toggle word wrap

    Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
    ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。

  4. KafkaConnector リソースを作成し、デプロイする各コネクターのインスタンスを定義します。
    たとえば、以下の KafkaConnector CR を作成し、mysql-inventory-connector.yaml として保存します。

    例5.2 Debezium コネクターの KafkaConnector カスタムリソースを定義する mysql-inventory-connector.yaml ファイル

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: debezium-kafka-connect-cluster
      name: inventory-connector-mysql 
    1
    
    spec:
      class: io.debezium.connector.mysql.MySqlConnector 
    2
    
      tasksMax: 1  
    3
    
      config:  
    4
    
        database.history.kafka.bootstrap.servers: 'debezium-kafka-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092'
        database.history.kafka.topic: schema-changes.inventory
        database.hostname: mysql.debezium-mysql.svc.cluster.local 
    5
    
        database.port: 3306   
    6
    
        database.user: debezium  
    7
    
        database.password: dbz  
    8
    
        database.dbname: mydatabase 
    9
    
        database.server.name: inventory_connector_mysql 
    10
    
        database.include.list: public.inventory  
    11
    Copy to Clipboard Toggle word wrap
    Expand
    表5.23 コネクター設定の説明
    項目説明

    1

    Kafka Connect クラスターに登録するコネクターの名前。

    2

    コネクタークラスの名前。

    3

    同時に動作できるタスクの数。

    4

    コネクターの設定。

    5

    ホストデータベースインスタンスのアドレス。

    6

    データベースインスタンスのポート番号。

    7

    Debezium がデータベースに接続するユーザーアカウントの名前。

    8

    データベースユーザーアカウントのパスワード

    9

    変更をキャプチャーするデータベースの名前。

    10

    データベースインスタンスまたはクラスターの論理名。
    指定の名前は英数字またはアンダースコアからのみ形成する必要があります。
    論理名は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
    コネクターを Avro コネクターと統合する場合、名前空間は関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの名前空間でも使用されます。

    11

    コネクターが変更イベントをキャプチャーするテーブルのリスト。

  5. 以下のコマンドを実行してコネクターリソースを作成します。

    oc create -n <namespace> -f <kafkaConnector>.yaml
    Copy to Clipboard Toggle word wrap

    以下に例を示します。

    oc create -n debezium -f {context}-inventory-connector.yaml
    Copy to Clipboard Toggle word wrap

    コネクターは Kafka Connect クラスターに登録され、KafkaConnector CR の spec.config.database.dbname で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。

これで、Debezium MySQL のデプロイメントを確認 する準備が整いました。

Debezium MySQL コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、以下のカスタムリソース (CR) を作成する必要があります。

  • Kafka Connect インスタンスを定義する KafkaConnect CR。image は Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。AMQ Streams は、Apache Kafka を OpenShift に取り入れる operator およびイメージを提供します。
  • Debezium MySQL コネクターを定義する KafkaConnector CR。この CR を KafkaConnect CR を適用するのと同じ OpenShift インスタンスに適用します。

前提条件

  • MySQL が稼働し、Debezium コネクターと連携するように MySQL を設定する手順 が完了済みである必要があります。
  • AMQ Streams は OpenShift にデプロイされ、Apache Kafka および Kafka Connect が稼働している必要があります。詳細は、Deploying and Upgrading AMQ Streams on OpenShift を参照してください。
  • Podman または Docker がインストールされている。
  • Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (quay.iodocker.ioなど) でコンテナーを作成および管理するアカウントとパーミッションを持っている。

手順

  1. Kafka Connect の Debezium MySQL コンテナーを作成します。

    1. Debezium MySQL コネクターアーカイブをダウンロードします。
    2. Debezium MySQL コネクターアーカイブをデプロイメントして、コネクタープラグインのディレクトリー構造を作成します。以下に例を示します。

      ./my-plugins/
      ├── debezium-connector-mysql
      │   ├── ...
      Copy to Clipboard Toggle word wrap
    3. registry.redhat.io/amq7/amq-streams-kafka-30-rhel8:2.0.0 をベースイメージとして使用して、新規の Dockerfile を作成します。たとえば、ターミナルウィンドウから以下のコマンドを入力します。my-plugins はプラグインディレクトリーの名前に置き換えます。

      cat <<EOF >debezium-container-for-mysql.yaml 
      1
      
      FROM registry.redhat.io/amq7/amq-streams-kafka-30-rhel8:2.0.0
      USER root:root
      COPY ./<my-plugins>/ /opt/kafka/plugins/ 
      2
      
      USER 1001
      EOF
      Copy to Clipboard Toggle word wrap
      1 1 1 1 1 1 1 1 1
      任意のファイル名を指定できます。
      2 2 2 2 2 2 2 2 2
      my-plugins は、プラグインディレクトリーの名前に置き換えます。

      このコマンドは、現在のディレクトリーに debezium-container-for-mysql.yaml という名前の Dockerfile を作成します。

    4. 前のステップで作成した debezium-container-for-mysql.yaml Docker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。

      podman build -t debezium-container-for-mysql:latest .
      Copy to Clipboard Toggle word wrap
      docker build -t debezium-container-for-mysql:latest .
      Copy to Clipboard Toggle word wrap

      上記のコマンドは、debezium-container-for-mysql という名前のコンテナーイメージを構築します。

    5. カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。

      podman push <myregistry.io>/debezium-container-for-mysql:latest
      Copy to Clipboard Toggle word wrap
      docker push <myregistry.io>/debezium-container-for-mysql:latest
      Copy to Clipboard Toggle word wrap
    6. 新しい Debezium MySQL KafkaConnect カスタムリソース (CR) を作成します。たとえば、以下の例のように annotations および image プロパティーを指定する dbz-connect.yaml という名前の KafkaConnect CR を作成します。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true" 
      1
      
      spec:
        #...
        image: debezium-container-for-mysql  
      2
      Copy to Clipboard Toggle word wrap
      1
      KafkaConnector リソースはこの Kafka Connect クラスターでコネクターを設定するために使用されることを、metadata.annotations は Cluster Operator に示します。
      2
      spec.image は Debezium コネクターを実行するために作成したイメージの名前を指定します。設定された場合、このプロパティーによって Cluster Operator の STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 変数がオーバーライドされます。
    7. 以下のコマンドを入力して、KafkaConnect CR を OpenShift Kafka Connect 環境に適用します。

      oc create -f dbz-connect.yaml
      Copy to Clipboard Toggle word wrap

      このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。

  2. Debezium MySQL コネクターインスタンスを設定する KafkaConnector カスタムリソースを作成します。

    通常、コネクター設定プロパティーを設定する .yaml ファイルに Debezium MySQL コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。

    以下の例では、ポート 3306 の MySQL ホスト (192.168.99.100) に接続し、inventory データベースへの変更をキャプチャーする Debezium コネクターを設定します。dbserver1 は、サーバーの論理名です。

    MySQL inventory-connector.yaml

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: inventory-connector  
    1
    
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mysql.MySqlConnector
        tasksMax: 1  
    2
    
        config:  
    3
    
          database.hostname: mysql  
    4
    
          database.port: 3306
          database.user: debezium
          database.password: dbz
          database.server.id: 184054  
    5
    
          database.server.name: dbserver1 
    6
    
          database.include.list: inventory  
    7
    
          database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092  
    8
    
          database.history.kafka.topic: schema-changes.inventory  
    9
    Copy to Clipboard Toggle word wrap

    Expand
    表5.24 コネクター設定の説明
    項目説明

    1

    コネクターの名前。

    2

    一度に 1 つのタスクのみを実行します。MySQL コネクターは MySQL サーバーの binlog を読み取るため、単一のコネクタータスクを使用することで、順序とイベントの処理が適切に行われるようになります。Kafka Connect サービスはコネクターを使用して作業を行う 1 つ以上のタスクを開始し、実行中のタスクを自動的に Kafka Connect サービスのクラスター全体に分散します。いずれかのサービスが停止またはクラッシュすると、これらのタスクは稼働中のサービスに再分散されます。

    3

    コネクターの設定。

    4

    データベースホスト。これは、MySQL サーバーを実行しているコンテナーの名前です (mysql)。

    5

    connector の一意 ID。

    6

    MySQL サーバーまたはクラスターの論理名。この名前は、変更イベントレコードを受信するすべての Kafka トピックの接頭辞として使用されます。

    7

    inventory データベースの変更のみがキャプチャーされます。

    8

    DDL ステートメントをデータベース履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。再起動時に、コネクターが読み取りを開始すべき時点で binlog に存在したデータベースのスキーマを復元します。

    9

    データベー履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。

  3. Kafka Connect でコネクターインスタンスを作成します。たとえば、KafkaConnector リソースを inventory-connector.yaml ファイルに保存した場合は、以下のコマンドを実行します。

    oc apply -f inventory-connector.yaml
    Copy to Clipboard Toggle word wrap

    上記のコマンドは inventory-connector を登録し、コネクターは KafkaConnector CR に定義されている inventory データベースに対して実行を開始します。

Debezium MySQL コネクターに設定できる設定プロパティーの完全リストは、MySQL コネクター設定プロパティーを参照してください。

結果

コネクターが起動すると、コネクターが設定された MySQL データベースの 整合性スナップショットが実行 されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。

5.5.4. Debezium MySQL コネクターが実行していることの確認

コネクターがエラーなしで正常に起動すると、コネクターがキャプチャーするように設定された各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。

コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。

  • コネクターのステータスを確認します。
  • コネクターがトピックを生成していることを確認します。
  • 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。

前提条件

  • Debezium コネクターが AMQ Streams on OpenShift にデプロイされている。
  • OpenShift oc CLI クライアントがインストールされている。
  • OpenShift Container Platform Web コンソールにアクセスできる。

手順

  1. 以下の方法のいずれかを使用して KafkaConnector リソースのステータスを確認します。

    • OpenShift Container Platform Web コンソールから以下を実行します。

      1. Home Search に移動します。
      2. Search ページで Resources をクリックし、Select Resource ボックスを開き、KafkaConnector を入力します。
      3. KafkaConnectors リストから、チェックするコネクターの名前をクリックします (例: inventory-connector-mysql)。
      4. Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
    • ターミナルウィンドウから以下を実行します。

      1. 以下のコマンドを入力します。

        oc describe KafkaConnector <connector-name> -n <project>
        Copy to Clipboard Toggle word wrap

        以下に例を示します。

        oc describe KafkaConnector inventory-connector-mysql -n debezium
        Copy to Clipboard Toggle word wrap

        このコマンドは、以下の出力のようなステータス情報を返します。

        例5.3 KafkaConnector リソースのステータス

        Name:         inventory-connector-mysql
        Namespace:    debezium
        Labels:       strimzi.io/cluster=debezium-kafka-connect-cluster
        Annotations:  <none>
        API Version:  kafka.strimzi.io/v1beta2
        Kind:         KafkaConnector
        
        ...
        
        Status:
          Conditions:
            Last Transition Time:  2021-12-08T17:41:34.897153Z
            Status:                True
            Type:                  Ready
          Connector Status:
            Connector:
              State:      RUNNING
              worker_id:  10.131.1.124:8083
            Name:         inventory-connector-mysql
            Tasks:
              Id:               0
              State:            RUNNING
              worker_id:        10.131.1.124:8083
            Type:               source
          Observed Generation:  1
          Tasks Max:            1
          Topics:
            inventory_connector_mysql
            inventory_connector_mysql.inventory.addresses
            inventory_connector_mysql.inventory.customers
            inventory_connector_mysql.inventory.geom
            inventory_connector_mysql.inventory.orders
            inventory_connector_mysql.inventory.products
            inventory_connector_mysql.inventory.products_on_hand
        Events:  <none>
        Copy to Clipboard Toggle word wrap
  2. コネクターによって Kafka トピックが作成されたことを確認します。

    • OpenShift Container Platform Web コンソールから以下を実行します。

      1. Home Search に移動します。
      2. Search ページで Resources をクリックし、Select Resource ボックスを開き、KafkaTopic を入力します。
      3. KafkaTopics リストから確認するトピックの名前をクリックします (例: inventory-connector-mysql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d)。
      4. Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
    • ターミナルウィンドウから以下を実行します。

      1. 以下のコマンドを入力します。

        oc get kafkatopics
        Copy to Clipboard Toggle word wrap

        このコマンドは、以下の出力のようなステータス情報を返します。

        例5.4 KafkaTopic リソースのステータス

        NAME                                                                                                   CLUSTER             PARTITIONS   REPLICATION FACTOR   READY
        connect-cluster-configs                                                                           debezium-kafka-cluster   1            1                    True
        connect-cluster-offsets                                                                           debezium-kafka-cluster   25           1                    True
        connect-cluster-status                                                                            debezium-kafka-cluster   5            1                    True
        consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a                                       debezium-kafka-cluster   50           1                    True
        inventory-connector-mysql---a96f69b23d6118ff415f772679da623fbbb99421                              debezium-kafka-cluster   1            1                    True
        inventory-connector-mysql.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480          debezium-kafka-cluster   1            1                    True
        inventory-connector-mysql.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b          debezium-kafka-cluster   1            1                    True
        inventory-connector-mysql.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5               debezium-kafka-cluster   1            1                    True
        inventory-connector-mysql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d             debezium-kafka-cluster   1            1                    True
        inventory-connector-mysql.inventory.products---df0746db116844cee2297fab611c21b56f82dcef           debezium-kafka-cluster   1            1                    True
        inventory-connector-mysql.inventory.products-on-hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5   debezium-kafka-cluster   1            1                    True
        schema-changes.inventory                                                                          debezium-kafka-cluster   1            1                    True
        strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55                                    debezium-kafka-cluster   1            1                    True
        strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b  debezium-kafka-cluster   1            1                    True
        Copy to Clipboard Toggle word wrap
  3. トピックの内容を確認します。

    • ターミナルウィンドウから、以下のコマンドを入力します。
    oc exec -n <project>  -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=<topic-name>
    Copy to Clipboard Toggle word wrap

    以下に例を示します。

     oc exec -n debezium  -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=inventory_connector_mysql.inventory.products_on_hand
    Copy to Clipboard Toggle word wrap

    トピック名を指定する形式は、手順 1 で返された oc describe コマンドと同じです (例: inventory_connector_mysql.inventory.addresses )。

    トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。

    例5.5 Debezium 変更イベントの内容

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector_mysql.inventory.products_on_hand.Key"},"payload":{"product_id":101}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_mysql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_mysql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector_mysql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.7.2.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
    Copy to Clipboard Toggle word wrap

    上記の例では、payload 値は、コネクタースナップショットがテーブル inventory.products_on_hand から 読み込み (op" ="r") イベントを生成したことを示しています。product_id レコードの before 状態は null であり、レコードに以前の値が存在しないことを示します。"after" 状態が product_id 101 で項目の quantity3 で示しています。

5.5.5. Debezium MySQL コネクター設定プロパティーの説明

Debezium MySQL コネクターには、アプリケーションに適したコネクター動作を実現するために使用できる設定プロパティーが多数あります。多くのプロパティーにはデフォルト値があります。プロパティーに関する情報は、以下のように設定されています。

以下の設定プロパティーは、デフォルト値がない場合は必須です。

Expand
表5.25 必要な Debezium MySQL コネクター設定プロパティー
プロパティーデフォルト説明

name

デフォルトなし

コネクターの一意名。同じ名前で再登録を試みると失敗します。このプロパティーはすべての Kafka Connect コネクターに必要です。

connector.class

デフォルトなし

コネクターの Java クラスの名前。MySQL コネクターに常に io.debezium.connector.mysql.MySqlConnector を指定します。

tasks.max

1

このコネクターのために作成する必要のあるタスクの最大数。MySQL コネクターは常に単一のタスクを使用するため、この値を使用しません。そのため、デフォルト値は常に許容されます。

database.hostname

デフォルトなし

MySQL データベースサーバーの IP アドレスまたはホスト名。

database.port

3306

MySQL データベースサーバーのポート番号 (整数)。

database.user

デフォルトなし

MySQL データベースサーバーへの接続時に使用する MySQL ユーザーの名前。

database.password

デフォルトなし

MySQL データベースサーバーへの接続時に使用するパスワード。

database.server.name

デフォルトなし

Debezium が変更をキャプチャーする特定の MySQL データベースサーバー/クラスターの namespace を識別および提供する論理名。論理名は、他のコネクター全体で一意となる必要があります。これは、このコネクターによって生成されるイベントを受信するすべての Kafka トピック名の接頭辞として使用されるためです。データベースサーバーの論理名には英数字とハイフン、ドット、アンダースコアのみを使用する必要があります。

database.server.id

random

このデータベースクライアントの数値 ID。MySQL クラスターで現在稼働しているすべてのデータベースプロセスで一意である必要があります。このコネクターは、MySQL データベースクラスターを (この一意の ID を持つ) 別のサーバーとして結合するため、binlog を読み取ることができます。デフォルトでは、5400 から 6400 までの乱数が生成されますが、値を明示的に設定することが推奨されます。

database.include.list

空の文字列

変更をキャプチャーするデータベースの名前と一致する正規表現のコンマ区切りリスト (任意)。コネクターは、名前が database.include.list にないデータベースの変更をキャプチャーしません。デフォルトでは、コネクターはすべてのデータベースの変更をキャプチャーします。また、database.exclude.list コネクター設定プロパティーは設定しないでください。

database.exclude.list

空の文字列

変更をキャプチャーしないデータベースの名前と一致する正規表現のコンマ区切りリスト (任意)。コネクターは、名前が database.exclude.list にないデータベースの変更をキャプチャーします。また、database.include.list コネクター設定プロパティーは設定しないでください。

table.include.list

空の文字列

変更をキャプチャーするテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。コネクターは table.include.list に含まれていないテーブルの変更をキャプチャーしません。各識別子の形式は databaseName.tableName です。デフォルトでは、コネクターは変更がキャプチャーされる各データベースのシステムでないすべてのテーブルの変更をキャプチャーします。また、table.exclude.list コネクター設定プロパティーは指定しないでください。

table.exclude.list

空の文字列

変更をキャプチャーしないテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。コネクターは table.exclude.list に含まれていないテーブルの変更をキャプチャーします。各識別子の形式は databaseName.tableName です。また、table.include.list コネクター設定プロパティーは指定しないでください。

column.exclude.list

空の文字列

変更イベントレコード値から除外する列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は databaseName.tableName.columnName です。

column.include.list

空の文字列

変更イベントレコード値に含める列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は databaseName.tableName.columnName です。

column.truncate.to._length_.chars

該当なし

フィールド値が指定された文字数より長い場合に、変更イベントレコード値で値を省略する必要がある文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。単一の設定で、異なる長さの複数のプロパティーを設定できます。長さは正の整数である必要があります。列の完全修飾名の形式は databaseName.tableName.columnName です。

column.mask.with._length_.chars

該当なし

変更イベントメッセージで、指定された数のアスタリスク (*) で設定されるフィールド値に値を置き換える必要のある文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。単一の設定で、異なる長さの複数のプロパティーを設定できます。それぞれの長さは正の整数またはゼロである必要があります。列の完全修飾名の形式は databaseName.tableName.columnName です。

column.mask.hash.hashAlgorithm.with.salt.salt

該当なし

文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は <databaseName>.<tableName>.<columnName> です。作成された変更イベントレコードでは、指定された列の値は仮名に置き換えられます。

仮名は、指定された hashAlgorithmsalt を適用すると得られるハッシュ化された値で設定されます。使用されるハッシュ関数に基づいて、参照整合性は維持され、列値は仮名に置き換えられます。サポートされるハッシュ関数は、Java Cryptography Architecture Standard Algorithm Name Documentation の MessageDigest section に説明されています。

以下の例では、CzQMA0cB5K が無作為に選択された salt になります。

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
Copy to Clipboard Toggle word wrap

必要な場合は、仮名は自動的に列の長さに短縮されます。コネクター設定には、異なるハッシュアルゴリズムと salt を指定する複数のプロパティーを含めることができます。

使用される hashAlgorithm、選択された salt、および実際のデータセットによっては、結果として得られるデータセットが完全にマスクされないことがあります。

column.propagate.source.type

該当なし

出力された変更イベントレコードの該当するフィールドスキーマに元の型および長さをパラメーターとして追加する必要がある列の完全修飾名と一致する、正規表現のコンマ区切りリスト (任意)。以下のスキーマパラメーターは、それぞれ可変幅型の元の型名および長さを伝達するために使用されます。

__Debezium.source.column.type

__Debezium.source.column.length

__Debezium.source.column.scale

それぞれ元の型名と長さ (可変幅型の場合) を伝達するために使用されます。これは、シンクデータベースの対応する列を適切にサイズ調整するのに便利です。列の完全修飾名の形式は以下のいずれかになります。

databaseName.tableName.columnName

databaseName.schemaName.tableName.columnName

datatype.propagate.source.type

該当なし

出力された変更イベントレコードの該当するフィールドスキーマに元の型および長さをパラメーターとして追加する必要がある列のデータベース固有のデータ型名と一致する、正規表現のコンマ区切りリスト (任意)。以下のスキーマパラメーターは、それぞれ可変幅型の元の型名および長さを伝達するために使用されます。

__debezium.source.column.type

__debezium.source.column.length

__debezium.source.column.scale

それぞれ元の型名と長さ (可変幅型の場合) を伝達するために使用されます。これは、シンクデータベースの対応する列を適切にサイズ調整するのに便利です。完全修飾データ型名の形式は以下のいずれかになります。

databaseName.tableName.typeName

databaseName.schemaName.tableName.typeName

MySQL 固有のデータ型名のリストは、MySQL コネクターによるデータ型のマッピング方法 を参照してください。

time.precision.mode

adaptive_time_microseconds

時間、日付、およびタイムスタンプは、以下を含む異なる精度の種類で表すことができます。

adaptive_time_microseconds (デフォルト) は、データベース列の型を基にして、ミリ秒、マイクロ秒、またはナノ秒の精度値のいずれかを使用して、データベースの値と全く同じように日付、日時、およびタイムスタンプをキャプチャーします。


connect は、Kafka Connect の Time、Date、および Timestamp の組み込み表現を使用して、常に時間とタイムスタンプ値を表します。この組み込み表現は、データベース列の精度に関わらず、ミリ秒の精度を使用します。

decimal.handling.mode

precise

コネクターによる DECIMAL および NUMERIC 列の値の処理方法を指定します。

precise (デフォルト) はバイナリー形式で変更イベントに表される java.math.BigDecimal 値を使用して正確に表します。

doubledouble値を使用して表します。精度が失われる可能性はありますが、簡単に使用できます。

string は値をフォーマットされた文字列としてエンコードします。簡単に使用できますが、本来の型に関するセマンティック情報は失われます。

bigint.unsigned.handling.mode

long

変更イベントで BIGINT UNSIGNED 列を表す方法を指定します。可能な設定:

long は Java の long を使用して値を表します。これは、精度を提供しない可能性がありますが、コンシューマーでの使用が簡単です。通常、long が推奨設定となります。

precisejava.math.BigDecimal を使用して値を表します。値は、バイナリー表現と Kafka Connect の org.apache.kafka.connect.data.Decimal 型を使用して、変更イベントでエンコードされます。2^63 を超える値は long を使用して提供できないため、このような値を使用する場合はこの設定を使用します。

include.schema.changes

true

コネクターがデータベーススキーマの変更を、データベースサーバー ID と同じ名前の Kafka トピックに公開するかどうかを指定するブール値。各スキーマの変更はデータベース名が含まれるキーを使用して記録され、その値には DDL ステートメントが含まれます。これは、コネクターがデータベース履歴を内部で記録する方法には依存しません。

include.query

false

変更イベントを生成した元の SQL クエリーがコネクターに含まれる必要があるかどうかを指定するブール値。

このオプションを true に設定した場合は、MySQL の binlog_rows_query_log_events オプションを ON に設定する必要があります。include.querytrue の場合、スナップショットプロセスによって生成されるイベントに対するクエリーは存在しません。

include.querytrue に設定すると、変更イベントに元の SQL ステートメントを含めることで明示的に除外またはマスクされたテーブルまたはフィールドが公開される可能性があります。そのため、デフォルト設定は false です。

event.deserialization.failure.handling.mode

fail

binlog イベントのデシリアライズ中にコネクターがどのように例外に反応するかを指定します。

fail は例外を伝播し、問題のあるイベントとその binlog オフセットを示し、コネクターを停止させます。

warn は問題のあるイベントとその binlog オフセットをログに記録し、イベントをスキップします。

ignore は問題のあるイベントを渡し、何もログに記録しません。

inconsistent.schema.handling.mode

fail

内部スキーマ表現に存在しないテーブルに関連する binlog イベントに対してコネクターがどのように反応する必要があるかを指定します。つまり、内部表現はデータベースと一貫性がありません。

fail は例外を出力し、問題のあるイベントとその binlog オフセットを示し、コネクターを停止させます。

warn は問題のあるイベントとその binlog オフセットをログに記録し、イベントをスキップします。

skip は問題のあるイベントを渡し、何もログに記録しません。

max.queue.size

8192

データベースログから読み取られた変更イベントが Kafka に書き込まれる前に配置される、ブロッキングキューの最大サイズを指定する正の整数値。このキューは、Kafka への書き込みが遅い場合や Kafka が利用できない場合などに、binlog リーダーにバックプレシャーを提供できます。キューに発生するイベントは、このコネクターによって定期的に記録されるオフセットには含まれません。デフォルトは 8192 で、max.batch.size プロパティーで指定される最大バッチサイズよりも大きな値を常に指定する必要があります。

max.batch.size

2048

このコネクターの反復処理中に処理される必要があるイベントの各バッチの最大サイズを指定する正の整数値。デフォルトは 2048 です。

max.queue.size.in.bytes

0

ブロッキングキューの最大サイズ (バイト単位) の long 値。この機能はデフォルトで無効になっています。正の long 値が設定されると有効になります。

poll.interval.ms

1000

コネクターがイベントのバッチの処理を開始する前に、新しい変更イベントの発生を待つ期間をミリ秒単位で指定する正の整数値。デフォルトは 1000 ミリ秒 (1 秒) です。

connect.timeout.ms

30000

コネクターが MySQL データベースサーバーへの接続を試行した後、タイムアウトするまでの最大の待機期間をミリ秒単位で指定する正の整数値。デフォルトは 30 秒です。

gtid.source.includes

デフォルトなし

MySQL サーバーで binlog の位置を見つけるために使用される GTID セットのソース UUID に一致する、正規表現のコンマ区切りリスト。これらの include パターンのいずれかに一致するソースを持つ GTID の範囲のみが使用されます。gtid.source.excludes の設定は指定しないでください。

gtid.source.excludes

デフォルトなし

MySQL サーバーで binlog の位置を見つけるために使用される GTID セットのソース UUID に一致する、正規表現のコンマ区切りリスト。これらすべての exclude パターンに一致しないソースを持つ GTID の範囲のみが使用されます。また、gtid.source.includes の値も指定しないでください。

tombstones.on.delete

true

削除 イベントの後に廃棄 (tombstone) イベントが続くかどうかを制御します。

true: 削除操作は、削除 イベントと後続の破棄 (tombstone) イベントで表されます。

false: 削除イベントのみ出力されます。

log compaction がトピックで有効になっている場合には、ソースレコードの削除後に廃棄 (tombstone) イベントを出力すると (デフォルト動作)、Kafka は削除された行のキーに関連するすべてのイベントを完全に削除できます。

message.key.columns

該当なし

指定のテーブルの Kafka トピックに公開する変更イベントレコードのカスタムメッセージキーを形成するためにコネクターが使用する列を指定する式のリスト。

デフォルトでは、Debezium はテーブルのプライマリーキー列を、出力するレコードのメッセージキーとして使用します。デフォルトの代わりに、またはプライマリーキーのないテーブルのキーを指定するには、1 つ以上の列をもとにカスタムメッセージキーを設定できます。

テーブルのカスタムメッセージキーを作成するには、テーブルとメッセージキーとして使用する列をリストします。各リストエントリーは以下の形式をとります。

<fully-qualified_tableName>:_<keyColumn>_,<keyColumn>

テーブルのキーを複数の列名に基づいて設定するには、列名の間にコンマを挿入します。

各完全修飾テーブル名は、以下の形式の正規表現です。

<databaseName>.<tableName>

プロパティーには複数のテーブルのエントリーを含めることができます。セミコロンを使用して、リスト内のテーブルエントリーを区切ります。

以下の例では、テーブル inventory.customerspurchase.orders にメッセージキーを設定しています。

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

テーブル inventory.customer では、列 pk1pk2 がメッセージキーとして指定されています。データベースで purchaseorders テーブルは、pk3 および pk4 サーバーのコラムをメッセージキーとして使用します。

カスタムメッセージキーの作成に使用する列の数に制限はありません。ただし、一意の鍵を指定するために必要な最小数を使用することが推奨されます。

binary.handling.mode

bytes

バイナリー列 (例: blobbinaryvarbinary) を変更イベントでどのように表すかを指定します。可能な設定:

bytes はバイナリーデータをバイト配列として表します。

base64 はバイナリーデータを base64 でエンコードされた文字列として表します。

hex は、バイナリーデータを 16 進数でエンコードされた (base16) 文字列として表します。

高度な MySQL コネクター設定プロパティー

以下の表は、高度な MySQL コネクタープロパティー について説明しています。これらのプロパティーのデフォルト値を変更する必要はほとんどありません。そのため、コネクター設定にデフォルト値を指定する必要はありません。

Expand
表5.26 MySQL コネクターの高度な設定プロパティーの説明
プロパティーデフォルト説明

connect.keep.alive

true

MySQL サーバー/クラスターへの接続を確実に維持するために、別のスレッドを使用するかどうかを指定するブール値。

table.ignore.builtin

true

組み込みシステムテーブルを無視するかどうかを指定するブール値。これは、テーブルの include および exclude リストに関係なく適用されます。デフォルトでは、システムテーブルは変更がキャプチャーされないように除外され、システムテーブルに変更が加えられてもイベントは生成されません。

database.ssl.mode

disabled

暗号化された接続を使用するかどうかを指定します。可能な設定:

disabled は暗号化されていない接続の使用を指定します。

preferred は、サーバーがセキュアな接続に対応している場合は暗号化された接続を確立します。サーバーがセキュアな接続に対応していない場合は、暗号化されていない接続にフォールバックします。

required は、暗号化された接続を確立し、何らかの理由で暗号化された接続を確立できない場合は失敗します。

verify_carequired と同様に動作しますが、追加でサーバーの TLS 証明書を設定された認証局 (CA) 証明書に対して検証します。サーバー TLS 証明書が有効な CA 証明書と一致しない場合は失敗します。

verify_identityverify_ca のように動作しますが、追加でサーバー証明書がリモート接続のホストと一致するかを検証します。

snapshot.mode

Initial

コネクターの起動時にスナップショットを実行するための基準を指定します。可能な設定は次のとおりです。

initial - コネクターは、論理サーバー名にオフセットが記録されていない場合にのみスナップショット を実行します。

initial_only - 論理サーバー名に対してオフセットが記録されてから停止した場合のみスナップショットを実行します。つまり、binlog から変更イベントを読み取りません。

when_needed - コネクターは、必要に応じて、コネクターは起動時にスナップショットを実行します。つまり、オフセットが使用できない場合や、以前に記録されたオフセットがサーバーが利用できない binlog の場所や GTID を指定する場合などです。

never - コネクターはスナップショットを使用しません。論理サーバー名での初回起動時に、コネクターは binlog の最初から読み取りします。この動作は注意して設定してください。これは、binlog にデータベースのすべての履歴が含まれることが保証されている場合のみ有効です。

schema_only - コネクターはデータではなく、スキーマのスナップショットを実行します。この設定は、トピックにデータの整合性スナップショットが含まれる必要がなく、コネクターの開始以降の変更のみが含まれる必要がある場合に便利です。

schema_only_recovery - これは、すでに変更をキャプチャーしているコネクターのリカバリー設定です。この設定により、コネクターを再起動すると、破損または損失したデータベース履歴トピックのリカバリーが可能になります。これを定期的に設定して、予想外に増加しているデータベース履歴トピックをクリーンアップすることができます。データベース履歴トピックは無期限に保持する必要があります。

snapshot.locking.mode

minimal

コネクターがグローバル MySQL 読み込みロックを保持するかどうか、およびその期間を制御します。これにより、コネクターによるスナップショットの実行中にデータベースが更新されないようにします。可能な設定:

minimal - コネクターはスナップショットの最初の部分のみグローバル読み取りロックを保持します。その間、データベーススキーマとその他のメタデータを読み取ります。スナップショットの残りの作業では、各テーブルから全行を選択する必要があります。REPEATABLE READ トランザクションを使用すると、コネクターは一貫した方法でこれを行うことができます。これは、グローバル読み取りロックが保持されなくなり、その他の MySQL クライアントがデータベースを更新している場合でも該当します。

minimal_percona - コネクターは、スナップショットの最初の部分のみ グローバルバックアップロック を保持します。その間、コネクターはデータベーススキーマとその他のメタデータを読み取ります。スナップショットの残りの作業では、各テーブルから全行を選択する必要があります。REPEATABLE READ トランザクションを使用すると、コネクターは一貫した方法でこれを行うことができます。これは、グローバルバックアップロックが保持されなくなり、その他の MySQL クライアントがデータベースを更新している場合でも該当します。このモードはテーブルをディスクにフラッシュせず、長時間実行される読み取りによってブロックされず、Percona Server でのみ利用できます。

extended - スナップショットの実行中にすべての書き込みをブロックします。MySQL が REPEATABLE READ セマンティックから除外する操作を送信するクライアントがある場合は、この設定を使用します。

none - スナップショットの実行中にコネクターがテーブルロックを取得できないようにします。この設定はすべてのスナップショットモードで許可されますが、スナップショットの実行中にスキーマの変更がない場合に 限り、安全に使用できます。MyISAM エンジンで定義されたテーブルの場合、MyISAM によってテーブルロックが取得されるようにこのプロパティーが設定されていても、テーブルはロックされます。この動作は、行レベルのロックを取得する InnoDB エンジンの動作とは異なります。

snapshot.include.collection.list

table.include.listに指定したすべてのテーブル

スナップショットに含めるテーブルの完全修飾名 (<databaseName>.<tableName>) と一致する正規表現のコンマ区切りリスト (任意)。指定する項目は、コネクターの table.include.list プロパティーで名前を付ける必要があります。このプロパティーは、コネクターの snapshot.mode プロパティーが never 以外の値に設定されている場合にのみ有効になります。
このプロパティーは増分スナップショットの動作には影響しません。

snapshot.select.statement.overrides

デフォルトなし

スナップショットに追加するテーブル行を指定します。スナップショットにテーブルの行のサブセットのみを含める場合は、プロパティーを使用します。このプロパティーはスナップショットにのみ影響します。コネクターがログから読み取るイベントには影響しません。

<databaseName>.<tableName> の形式で完全修飾テーブル名のコンマ区切りリストを指定します。たとえば、

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

をリスト内の各テーブルに対して、スナップショットを作成する場合には、その他の設定プロパティーを追加して、コネクターがテーブルで実行するように SELECT ステートメントを指定します。指定した SELECT ステートメントは、スナップショットに追加するテーブル行のサブセットを決定します。以下の形式を使用して、この SELECT ステートメントプロパティーの名前を指定します。

snapshot.select.statement.overrides.<databaseName>.<tableName>例: snapshot.select.statement.overrides.customers.orders.

例:

スナップショットにソフト削除以外のレコードのみを含める場合は、soft-delete 列 (delete_flag ) を含む customers.orders テーブルから、以下のプロパティーを追加します。

"snapshot.select.statement.overrides": "customer.orders",
"snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"
Copy to Clipboard Toggle word wrap

作成されるスナップショットでは、コネクターには delete_flag = 0 のレコードのみが含まれます。

min.row.count.to.stream.results

1000

スナップショットの実行中、コネクターは変更をキャプチャーするように設定されている各テーブルにクエリーを実行します。コネクターは各クエリーの結果を使用して、そのテーブルのすべての行のデータが含まれる読み取りイベントを生成します。このプロパティーは、MySQL コネクターがテーブルの結果をメモリーに格納するか、またはストリーミングを行うかを決定します。メモリーへの格納はすばやく処理できますが、大量のメモリーを必要とします。ストリーミングを行うと、処理は遅くなりますが、非常に大きなテーブルにも対応できます。このプロパティーの設定は、コネクターが結果のストリーミングを行う前にテーブルに含まれる必要がある行の最小数を指定します。

すべてのテーブルサイズチェックを省略し、スナップショットの実行中に常にすべての結果をストリーミングする場合は、このプロパティーを 0 に設定します。

heartbeat.interval.ms

0

コネクターがハートビートメッセージを Kafka トピックに送信する頻度を制御します。デフォルトの動作では、コネクターはハートビートメッセージを送信しません。

ハートビートメッセージは、コネクターがデータベースから変更イベントを受信しているかどうかを監視するのに便利です。ハートビートメッセージは、コネクターの再起動時に再送信する必要がある変更イベントの数を減らすのに役立つ可能性があります。ハートビートメッセージを送信するには、このプロパティーを、ハートビートメッセージの間隔をミリ秒単位で示す正の整数に設定します。

heartbeat.topics.prefix

__debezium-heartbeat

コネクターがハートビートメッセージを送信するトピックの名前を制御します。トピック名のパターンは次のようになります。

heartbeat.topics.prefix.server.name

たとえば、データベースサーバー名が fulfillment の場合、デフォルトのトピック名は __debezium-heartbeat.fulfillment になります。

database.initial.statements

デフォルトなし

トランザクションログを読み取る接続ではなく、データベースへの JDBC 接続が確立されたときに実行される SQL ステートメントのセミコロン区切りのリスト。SQL ステートメントでセミコロンを区切り文字としてではなく、文字として指定する場合は、2 つのセミコロン (;;) を使用します。

コネクターは独自の判断で JDBC 接続を確立する可能性があるため、このプロパティーはセッションパラメーターの設定専用です。DML ステートメントを実行するものではありません。

snapshot.delay.ms

デフォルトなし

コネクターの起動時にスナップショットを実行するまでコネクターが待つ必要がある間隔 (ミリ秒単位)。クラスターで複数のコネクターを起動する場合、このプロパティーは、コネクターのリバランスが行われる原因となるスナップショットの中断を防ぐのに役立ちます。

snapshot.fetch.size

デフォルトなし

スナップショットの実行中、コネクターは行のバッチでテーブルの内容を読み取ります。このプロパティーは、バッチの行の最大数を指定します。

snapshot.lock.timeout.ms

10000

スナップショットの実行時に、テーブルロックを取得するまで待つ最大時間 (ミリ秒単位) を指定する正の整数。コネクターがこの期間にテーブルロックを取得できないと、スナップショットは失敗します。Debezium MySQL コネクターによるデータベーススナップショットの実行方法 を参照してください。

enable.time.adjuster

true

コネクターによって 2 桁の西暦が 4 桁の西暦に変換されるかどうかを示すブール値。変換が完全にデータベースに委譲されている場合は、false に設定します。

MySQL では、2 桁または 4 桁の数値のいずれかで西暦の値を挿入できます。2 桁の値の場合は、値は 1970 - 2069 の範囲の年にマッピングされます。デフォルトの動作では、コネクターは変換を行いません。

sanitize.field.names

コネクターが key.converter または value.converter プロパティーを Avro コンバーターに設定する場合は true に設定します。
それ以外はfalse に設定します。

Avro の命名要件 に準拠するためにフィールド名がサニタイズされるかどうかを示します。

skipped.operations

デフォルトなし

ストリーミング中にスキップする操作タイプのコンマ区切りリスト。以下の値を使用できます (c は挿入/作成、u は更新、d は削除)。デフォルトでは、操作はスキップされません。

signal.data.collection

デフォルト値なし

シグナルをコネクターに送信するために使用されるデータコレクションの完全修飾名
以下の形式を使用してコレクション名を指定します。
<databaseName>.<tableName>

シグナル機能はテクノロジープレビュー機能です。

incremental.snapshot.chunk.size

1024

増分スナップショットのチャンクの実行中にコネクターがメモリーを取得して読み取る行の最大数。スナップショットは、サイズが大きいスナップショットの場合にはクエリーが少なくなるため、チャンクサイズを増やすと効率が上がります。ただし、チャンクサイズが大きい場合には、スナップショットデータのバッファーにより多くのメモリーが必要になります。チャンクサイズは、環境で最適なパフォーマンスを発揮できる値に、調整します。

増分スナップショットはテクノロジープレビュー機能です。

read.only

false

シグナルデータコレクションへの書き込みを回避するために、別の増分スナップショットのウォーターマーク実装に切り替えます。

provide.transaction.metadata

false

コネクターがトランザクション境界でイベントを生成し、トランザクションメタデータで変更イベントエンベロープを強化するかどうかを決定します。コネクターにこれを実行させる場合は true を指定します。詳細は、Transaction metadata を参照してください。

Debezium コネクターデータベース履歴設定プロパティー

Debezium には、コネクターがスキーマ履歴トピックと対話する方法を制御する database.history.* プロパティーのセットが含まれています。

以下の表は、Debezium コネクターを設定するための database.history プロパティーについて説明しています。

Expand
表5.27 コネクターデータベース履歴設定プロパティー
プロパティーデフォルト説明

database.history.kafka.topic

 

コネクターがデータベーススキーマの履歴を保存する Kafka トピックの完全名。

database.history.kafka.bootstrap.servers

 

Kafka クラスターへの最初の接続を確立するために コネクターが使用するホストとポートのペアのリスト。このコネクションは、コネクターによって以前に保存されたデータベーススキーマ履歴の取得や、ソースデータベースから読み取られる各 DDL ステートメントの書き込みに使用されます。各ペアは、Kafka Connect プロセスによって使用される同じ Kafka クラスターを示す必要があります。

database.history.kafka.recovery.poll.interval.ms

100

永続化されたデータのポーリングが行われている間にコネクターが起動/回復を待つ最大時間 (ミリ秒単位) を指定する整数値。デフォルトは 100 ミリ秒です。

database.history.kafka.recovery.attempts

4

エラーでコネクターのリカバリーが失敗する前に、コネクターが永続化された履歴データの読み取りを試行する最大回数。データが受信されなかった場合に最大待機する時間は、recovery.attempts x recovery.poll.interval.ms です。

database.history.skip.unparseable.ddl

false

コネクターが不正または不明なデータベースのステートメントを無視するかどうか、または人が問題を修正するために処理を停止するかどうかを指定するブール値。安全なデフォルトは false です。スキップは、binlog の処理中にデータの損失や分割を引き起こす可能性があるため、必ず注意して使用する必要があります。

database.history.store.only.monitored.tables.ddl

今後のリリースで非推奨になり、削除される予定です。代わりに database.history.store.only.captured.tables.ddl を使用してください。

false

コネクターがすべての DDL ステートメントを記録するかどうかを指定するブール値

true は、変更が Debezium によってキャプチャーされるテーブルに関連する DDL ステートメントのみを記録します。変更がキャプチャーされたテーブルを変更すると、不足しているデータが必要になる可能性があるため、true に設定するには注意が必要です。

安全なデフォルトは false です。

database.history.store.only.captured.tables.ddl

false

コネクターがすべての DDL ステートメントを記録するかどうかを指定するブール値

true は、変更が Debezium によってキャプチャーされるテーブルに関連する DDL ステートメントのみを記録します。変更がキャプチャーされたテーブルを変更すると、不足しているデータが必要になる可能性があるため、true に設定するには注意が必要です。

安全なデフォルトは false です。

プロデューサーおよびコンシューマークライアントを設定するためのパススルーデータベース履歴プロパティー


Debezium は、Kafka プロデューサーを使用して、データベース履歴トピックにスキーマの変更を書き込みます。同様に、コネクターが起動すると、データベース履歴トピックから読み取る Kafka コンシューマーに依存します。database.history.producer.* および database.history.consumer.* 接頭辞で始まるパススルー設定プロパティーのセットに値を割り当てて、Kafka プロデューサーおよびコンシューマークライアントの設定を定義します。パススループロデューサーおよびコンシューマーデータベース履歴プロパティーは、以下の例のように Kafka ブローカーとのこれらのクライアントの接続をセキュアにする方法など、さまざまな動作を制御します。

database.history.producer.security.protocol=SSL
database.history.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.producer.ssl.keystore.password=test1234
database.history.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.producer.ssl.truststore.password=test1234
database.history.producer.ssl.key.password=test1234

database.history.consumer.security.protocol=SSL
database.history.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.consumer.ssl.keystore.password=test1234
database.history.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.consumer.ssl.truststore.password=test1234
database.history.consumer.ssl.key.password=test1234
Copy to Clipboard Toggle word wrap

Debezium は、プロパティーを Kafka クライアントに渡す前に、プロパティー名から接頭辞を削除します。

Kafka プロデューサー設定プロパティー および Kafka コンシューマー設定プロパティーの詳細は、Kafka のドキュメントを参照してください。

Debezium コネクター Kafka は設定プロパティーをシグナル化します。

MySQL コネクターが読み取り専用として設定されている場合、シグナルテーブルの代替は Kafka トピックを示します。

Debezium は、コネクターが Kafka シグナルトピックと対話する方法を制御する signal.* プロパティーのセットを提供します。

以下の表は signal プロパティーについて説明しています。

Expand
表5.28 Kafka のシグナル設定プロパティー
プロパティーデフォルト説明

signal.kafka.topic

 

コネクターがアドホックシグナルについて監視する Kafka トピックの名前。

signal.kafka.bootstrap.servers

 

Kafka クラスターへの最初の接続を確立するために コネクターが使用するホストとポートのペアのリスト。各ペアは、Kafka Connect プロセスによって使用される同じ Kafka クラスターを示す必要があります。

signal.kafka.poll.timeout.ms

100

信号をポーリングするときにコネクターが待機する最大ミリ秒数を指定する整数値。デフォルトは 100 ミリ秒です。

Debezium コネクターのパススルーは Kafka コンシューマークライアント設定プロパティーを示唆します。

Debezium コネクターでは、Kafka コンシューマーのパススルー設定が可能です。パススルーシグナルのプロパティーは、接頭辞 signals.consumer.* で始まります。たとえば、コネクターは signal.consumer.security.protocol=SSL などのプロパティーを Kafka コンシューマーに渡します。

データベース履歴クライアントのパススループロパティー の場合のように、Debezium はプロパティーから接頭辞を削除してから Kafka シグナルコンシューマーに渡します。

Debezium コネクターのパススルーデータベースドライバー設定プロパティー

Debezium コネクターでは、データベースドライバーのパススルー設定が可能です。パススルーデータベースプロパティーは、接頭辞 database.* で始まります。たとえば、コネクターは database.foobar=false などのプロパティーを JDBC URL に渡します。

データベース履歴クライアントのパススループロパティー の場合のように、Debezium はプロパティーから接頭辞を削除してからデータベースドライバーに渡します。

トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat