12.2. Avro シリアライゼーションを使用する Debezium コネクターの設定


Debezium コネクターは Kafka Connect のフレームワークで動作し、変更イベントレコードを生成することでデータベース内の各行レベルの変更をキャプチャーします。それぞれの変更イベントレコードについて、Debezium コネクターは以下のアクションを完了します。

  1. 設定された変換を適用する。
  2. 設定された Kafka Connect コンバーター を使用して、レコードのキーと値をバイナリー形式にシリアライズする。
  3. レコードを正しい Kafka トピックに書き込む。

個々の Debezium コネクターインスタンスごとにコンバーターを指定することができます。Kafka Connect は、レコードのキーと値を JSON ドキュメントにシリアライズする JSON コンバーターを提供します。デフォルトの動作では、JSON コンバーターはレコードのメッセージスキーマを含めるので、それぞれのレコードが非常に冗長になります。Debezium スタートガイド に、ペイロードとスキーマの両方が含まれる場合にレコードがどのように見えるかが示されています。レコードを JSON でシリアル化したい場合は、以下のコネクター設定プロパティーを false に設定することを検討してください。

  • key.converter.schemas.enable
  • value.converter.schemas.enable

これらのプロパティーを false に設定すると、冗長なスキーマ情報がそれぞれのレコードから除外されます。

あるいは、Apache Avro を使用してレコードのキーと値をシリアライズすることもできます。Avro のバイナリー形式はコンパクトで効率的です。Avro スキーマを使用すると、それぞれのレコードが正しい構造を持つようにすることができます。Avro のスキーマ進化メカニズムにより、スキーマを進化させることが可能です。変更されたデータベーステーブルの構造と一致するように各レコードのスキーマを動的に生成するこのメカニズムは、Debezium コネクターに不可欠です。時間の経過と共に、同じ Kafka トピックに書き込まれる変更イベントレコードが、同じスキーマの別バージョンとなる場合があります。Avro シリアライゼーションを使用すると、変更イベントレコードのコンシューマーはレコードスキーマの変化に容易に対応することができます。

Apache Avro シリアライゼーションを使用するには、Avro メッセージスキーマおよびそのバージョンを管理するスキーマレジストリーをデプロイする必要があります。このレジストリーの設定については、OpenShift での Service Registry のインストールおよびデプロイ のドキュメントを参照してください。

12.2.1. Service Registry の概要

サービスレジストリー

Service Registry は、Avro と共に動作する以下のコンポーネントを提供します。

  • Debezium コネクター設定で指定することができる Avro コンバーター。このコンバーターは、Kafka Connect スキーマを Avro スキーマにマッピングします。続いて、コンバーターは Avro スキーマを使用してレコードのキーと値を Avro のコンパクトなバイナリー形式にシリアライズします。
  • API および以下の項目を追跡するスキーマレジストリー。

    • Kafka トピックで使用される Avro スキーマ
    • Avro コンバーターが生成した Avro スキーマを送信する先

    Avro スキーマはこのレジストリーに保管されるため、各レコードには小さな スキーマ識別子 だけを含める必要があります。これにより、各レコードが非常にコンパクトになります。Kafka など I/O 律速のシステムの場合、これはプロデューサーおよびコンシューマーのトータルスループットが向上することを意味します。

  • Kafka プロデューサーおよびコンシューマー用 Avro Serdes (シリアライザー/デシリアライザー)。変更イベントレコードを使用するために作成する Kafka コンシューマーアプリケーションは、Avro Serdes を使用して変更イベントレコードをデシリアライズすることができます。

Debezium で Service Registry を使用するには、Debezium コネクターを実行するのに使用している Kafka Connect コンテナーイメージに Service Registry コンバーターおよびその依存関係を追加します。

注記

Service Registry プロジェクトは、JSON コンバーターも提供します。このコンバーターは、メッセージが冗長ではないというメリットを持つのに加えて、人間が判読できる JSON を扱うことができます。メッセージ自体にはスキーマ情報は含まれず、スキーマ ID だけが含まれます。

注記

サービスレジストリーが提供するコンバータを使用するには、apicurio.registry.url を指定する必要があります。

12.2.2. Avro シリアライゼーションを使用する Debezium コネクターのデプロイの概要

Avro シリアライゼーションを使用する Debezium コネクターをデプロイするには、以下の 3 つの主要タスクを完了する必要があります。

  1. OpenShift での Service Registry のインストールおよびデプロイ の手順に従って、Service Registry インスタンスをデプロイします。
  2. Debezium Service Registry Kafka Connect の zip ファイルをダウンロードして Debezium コネクターのディレクトリーにデプロイメントし、Avro コンバーターをインストールする。
  3. 以下のように設定プロパティーを設定して、Avro シリアライゼーションを使用するように Debezium コネクターインスタンスを設定する。

    key.converter=io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
    key.converter.apicurio.registry.auto-register=true
    key.converter.apicurio.registry.find-latest=true
    value.converter=io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
    value.converter.apicurio.registry.auto-register=true
    value.converter.apicurio.registry.find-latest=true
    schema.name.adjustment.mode=avro

内部的には、Kafka Connect は常に JSON キー/値コンバーターを使用して設定およびオフセットを保管します。

12.2.3. Debezium コンテナーで Avro を使用するコネクターのデプロイ

ご使用の環境で、提供された Debezium コンテナーを使用して、Avro シリアライゼーションを使用する Debezium コネクターをデプロイしなければならない場合があります。Debezium 用のカスタム Kafka Connect コンテナーイメージをビルドし、Avro コンバーターを使用するように Debezium コネクターを設定するには、以下の手順を完了します。

前提条件

  • コンテナーを作成および管理するのに十分な権限と共に Docker をインストールしている。
  • Avro シリアライゼーションと共にデプロイする Debezium コネクタープラグインをダウンロードしている。

手順

  1. Service Registry のインスタンスをデプロイします。OpenShift での Service Registry のインストールおよびデプロイ では、以下の手順を説明しています。

    • Service Registry のインストール
    • AMQ Streams のインストール
    • AMQ Streams ストレージのセットアップ
  2. Debezium コネクターのアーカイブをデプロイメントして、コネクタープラグインのディレクトリー構造を作成します。複数の Debezium コネクターのアーカイブをダウンロードしてデプロイメントした場合、作成されるディレクトリー構造は以下の例のようになります。

    tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    |   ├── ...
    ├── debezium-connector-mysql
    │   ├── ...
    ├── debezium-connector-postgres
    │   ├── ...
    └── debezium-connector-sqlserver
        ├── ...
  3. Avro シリアライゼーションを使用するように設定する Debezium コネクターが含まれるディレクトリーに Avro コンバーターを追加します。

    1. Red Hat Integration のダウンロードサイト に移動し、Service Registry Kafka Connect の zip ファイルをダウンロードします。
    2. 目的の Debezium コネクターディレクトリーにアーカイブをデプロイメントします。

    複数のタイプの Debezium コネクターを Avro シリアライゼーションを使用するように設定するには、該当するそれぞれのコネクタータイプのディレクトリーにアーカイブをデプロイメントします。それぞれのディレクトリーにアーカイブを抽出するとファイルが重複しますが、これにより依存関係の競合が生じる可能性がなくなります。

  4. Avro コンバーターを使用するように設定する Debezium コネクターを実行するためのカスタムイメージを作成して公開します。

    1. registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0 をベースイメージとして使用して、新しい Dockerfile を作成します。以下の例の my-plugins を、実際のプラグインディレクトリーの名前に置き換えてください。

      FROM registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0
      USER root:root
      COPY ./my-plugins/ /opt/kafka/plugins/
      USER 1001

      Kafka Connect は、コネクターの実行を開始する前に、/opt/kafka/plugins ディレクトリーにあるサードパーティープラグインをロードします。

    2. docker コンテナーイメージをビルドします。たとえば、前のステップで作成した docker ファイルを debezium-container-with-avro として保存した場合、以下のコマンドを実行します。

      docker build -t debezium-container-with-avro:latest

    3. カスタムイメージをコンテナーレジストリーにプッシュします。例を以下に示します。

      docker push <myregistry.io>/debezium-container-with-avro:latest

    4. 新しいコンテナーイメージを示します。次のいずれかを行います。

      • KafkaConnect カスタムリソースの KafkaConnect.spec.image プロパティーを編集します。このプロパティーが設定されていると、クラスターオペレータの STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 変数がオーバーライドされます。以下に例を示します。

        apiVersion: kafka.strimzi.io/v1beta2
        kind: KafkaConnect
        metadata:
          name: my-connect-cluster
        spec:
          #...
          image: debezium-container-with-avro
      • install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml ファイルの STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 変数を編集し、新しいコンテナーイメージを示すようにした後、Cluster Operator を再インストールします。このファイルを編集する場合は、これを OpenShift クラスターに適用する必要があります。
  5. Avro コンバーターを使用するように設定されたそれぞれの Debezium コネクターをデプロイします。それぞれの Debezium コネクターについて、以下の設定を行います。

    1. Debezium コネクターインスタンスを作成します。次の inventory-connector.yaml ファイルの例では、Avro コンバーターを使用するように設定された MySQL コネクターインスタンスを定義する KafkaConnector カスタムリソースを作成しています。

      apiVersion: kafka.strimzi.io/v1beta1
      kind: KafkaConnector
      metadata:
        name: inventory-connector
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mysql.MySqlConnector
        tasksMax: 1
        config:
          database.hostname: mysql
          database.port: 3306
          database.user: debezium
          database.password: dbz
          database.server.id: 184054
          topic.prefix: dbserver1
          database.include.list: inventory
          schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
          schema.history.internal.kafka.topic: schema-changes.inventory
          schema.name.adjustment.mode: avro
          key.converter: io.apicurio.registry.utils.converter.AvroConverter
          key.converter.apicurio.registry.url: http://apicurio:8080/api
          key.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
          value.converter: io.apicurio.registry.utils.converter.AvroConverter
          value.converter.apicurio.registry.url: http://apicurio:8080/api
          value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
    2. コネクターインスタンスを適用します。以下に例を示します。

      oc apply -f inventory-connector.yaml

      これにより inventory-connector が登録され、コネクターが inventory データベースに対して実行されるようになります。

  6. コネクターが作成され、指定されたデータベース内の変更の追跡を開始したことを確認します。たとえば inventory-connector が起動したときの Kafka Connect のログ出力を見ることで、コネクターのインスタンスを確認することができます。

    1. Kafka Connect のログ出力を表示します。

      oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
    2. ログの出力を確認し、初回のスナップショットが実行されたことを確認します。以下のような行が表示されるはずです。

      ...
      2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...

      スナップショットは、複数のステップを経て作成されます。

      ...
      2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL primary server (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,843 INFO 	 using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...
      2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...

      スナップショットの作成が完了した後、Debezium は (例として) inventory データベースの binlog に生じる変更の追跡を開始し、変更イベントの有無を監視します。

      ...
      2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306]
      Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
      INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
      2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306]
      ...

12.2.4. Avro の名前の要件について

Avro の ドキュメント に記載されているように、名前は以下のルールに従う必要があります。

  • [A-Za-z_] で始まる
  • その後に [A-Za-z0-9_] の文字のみが含まれる

Debezium は、対応する Avro フィールドのベースとして列の名前を使用します。これにより、列の名前も Avro の命名規則に従わないと、シリアライズ中に問題が発生する可能性があります。各 Debezium コネクターには、名前に関する Avro ルールに準拠していない列がある場合に、avro に設定できる設定プロパティー field.name.adjustment.mode が用意されています。field.name.adjustment.modeavro に設定すると、スキーマを実際に変更せずに、適合しないフィールドをシリアライズできます。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.