Debezium の RHEL へのインストール


Red Hat build of Debezium 2.5.4

Red Hat Enterprise Linux (RHEL) での Red Hat build of Debezium 2.5.4 の使用

Red Hat build of Debezium Documentation Team

概要

このガイドでは、AMQ Streams を使用して Red Hat build of Debezium を RHEL にインストールする方法を説明します。

はじめに

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。まずは、マスター (master)、スレーブ (slave)、ブラックリスト (blacklist)、ホワイトリスト (whitelist) の 4 つの用語の置き換えから始めます。この取り組みは膨大な作業を要するため、用語の置き換えは、今後の複数のリリースにわたって段階的に実施されます。詳細は、Red Hat CTO である Chris Wright のメッセージ をご覧ください。

第1章 Debezium の概要

Debezium for Red Hat Integration は、データベース操作をキャプチャーし、行レベル操作のデータ変更イベントレコードを作成して、Apache Kafka トピックに変更イベントレコードをストリーミングする分散型プラットフォームです。Debezium は Apache Kafka に構築され、AMQ Streams とデプロイおよび統合されます。

Debezium によって、データベーステーブルの行レベルの変更がキャプチャーされ、対応する変更イベントが AMQ Streams に渡されます。アプリケーションはこれらの 変更イベントストリーム を読み取りでき、変更イベントが発生した順にアクセスできます。

Debezium は、Debezium for Red Hat Integration のアップストリームコミュニティープロジェクトです。

Debezium には、以下を含む複数の用途があります。

  • データレプリケーション
  • キャッシュの更新およびインデックスの検索
  • モノリシックアプリケーションの簡素化
  • データ統合
  • ストリーミングクエリーの有効化

Debezium は、以下の一般的なデータベース用の Apache Kafka Connect コネクターを提供します。

第2章 Debezium コネクターの RHEL へのインストール

コネクタープラグインで Kafka Connect を拡張して、AMQ Streams 経由で Debezium コネクターをインストールします。AMQ Streams のデプロイ後に、Kafka Connect で Debezium をコネクター設定としてデプロイできます。

2.1. Kafka トピック作成に関する推奨事項

Debezium は、データを複数の Apache Kafka トピックに保存します。トピックは、管理者が事前に作成する必要があります。または、Kafka Connect を設定して トピックを自動的に設定します

以下のリストで、トピックの作成時に考慮すべき制限および推奨事項を説明します。

Debezium Db2、MySQL、Oracle、および SQL Server コネクターのデータベーススキーマ履歴トピック

上記の各コネクターには、データベーススキーマ履歴トピックが必要です。データベーススキーマ履歴トピックを手動で作成する場合でも、Kafka ブローカーを使用してトピックを自動的に作成するか、Kafka Connect を使用してトピックを作成する 場合でも、トピックが以下のように設定されていることを確認してください。

  • 無限または非常に長い保持期間
  • 3 以上の実稼働環境でのレプリケーション係数
  • 単一パーティション
その他のトピック
  • 指定のレコードの 最後の変更イベントのみが保存されるように Kafka ログコンパクション を有効にする場合は、Apache Kafka で以下のトピックプロパティーを設定します。

    • min.compaction.lag.ms
    • delete.retention.ms

      トピックコンシューマーがすべてのイベントを受信してマーカーを削除するのに十分な時間を確保するには、シンクコネクターに予想される最大ダウンタイムよりも大きい値を前述のプロパティーの値に指定します。たとえば、シンクコネクターに更新を適用する際に発生する可能性のあるダウンタイムを考慮してください。

  • 実稼働でレプリケート
  • 単一パーティション

    単一パーティションルールを緩和できますが、アプリケーションはデータベースの異なる行に対して順不同のイベントを処理する必要があります。単一行のイベントは、引き続き完全に順序付けされます。複数のパーティションを使用する場合は、Kafka がキーをハッシュ化してパーティションを決定するのがデフォルトの動作になります。その他のパーティションストラテジーでは、単一メッセージ変換 (SMT: Single Message Transformations) を使用して、各レコードにパーティション番号を設定する必要があります。

2.2. Debezium コネクター設定の計画

Debezium コネクターをデプロイする前に、コネクターの設定方法を決定してください。設定は、コネクターの動作を指定する情報を提供し、Debezium がソースデータベースに接続することを可能にします。

コネクター設定を JSON として指定し、コネクターを登録する準備ができたら、curl を使用して設定を Kafka Connect API エンドポイントに送信します。

前提条件

  • ソースデータベースがデプロイされ、Debezium コネクターがデータベースにアクセスできる。
  • コネクターがソースデータベースにアクセスするために必要な以下の情報を把握している。

    • データベースホストの名前または IP アドレス
    • データベースに接続するためのポート番号
    • コネクターがデータベースへのサインインに使用できるアカウントの名前
    • データベースユーザーアカウントのパスワード
    • データベースの名前
    • コネクターの情報の取得元であるテーブルの名前
    • コネクターの変更イベントの出力先である Kafka ブローカーの名前
    • コネクターのデータベース履歴情報の送信先である Kafka トピックの名前

手順

  • Debezium コネクターに適用する設定を JSON 形式で指定します。

    次の例は、Debezium MySQL コネクターの簡単な設定を示しています。

    {
      "name": "inventory-connector",  1
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 2
        "tasks.max": "1",  3
        "database.hostname": "mysql",  4
        "database.port": "3306", 5
        "database.user": "debezium", 6
        "database.password": "dbz", 7
        "database.server.id": "184054",  8
        "topic.prefix": "dbserver1",   9
        "table.include.list": "public.inventory",  10
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",  11
        "schema.history.internal.kafka.topic": "dbhistory.inventory"  12
      }
    }
    1
    Kafka Connect クラスターに登録するコネクターの名前。
    2
    コネクタークラスの名前。
    3
    同時に動作できるタスクの数。一度に操作できるタスクは 1 つだけです。
    4
    ホストデータベースインスタンスのホスト名または IP アドレス
    5
    データベースインスタンスのポート番号。
    6
    Debezium がデータベースに接続するユーザーアカウントの名前。
    7
    データベースユーザーアカウントのパスワード
    8
    コネクターの一意の数値 ID。
    このプロパティーは MySQL コネクターにのみ使用されます。
    9
    コネクターが変更をキャプチャーするサーバーのデータベースサーバーまたはクラスターの論理識別子として機能する文字列。指定された文字列は名前空間を指定します。Avro コンバーターが使用されている場合、Debezium は、コネクターが書き込む各 Kafka トピック、Kafka Connect スキーマの名前、および対応する Avro スキーマの名前空間にこの名前の接頭辞を付けます。
    10
    コネクターが変更イベントをキャプチャーするテーブルのリスト。
    11
    コネクターがデータベーススキーマ履歴を送信する Kafka ブローカーの名前。指定されたブローカーは、コネクターが出力する変更イベントも受け取ります。
    12
    スキーマ履歴を格納する Kafka トピックの名前。
    コネクターの再起動後、コネクターは停止した時点からデータベースログの読み取りを再開し、オフライン中に発生したトランザクションのイベントを出力します。コネクターは、未読トランザクションの変更イベントを Kafka に書き込む前に、スキーマ履歴をチェックしてから、元のトランザクションが発生したときに有効だったスキーマを適用します。

Additional information

  • コネクターのタイプごとに設定できる設定プロパティーの詳細は、Debezium ユーザーガイド のコネクターのデプロイメントに関するセクションを参照してください。

2.3. Red Hat Enterprise Linux での AMQ Streams を使用した Debezium のデプロイ

この手順では、Red Hat Enterprise Linux で Debezium のコネクターを設定する方法を説明します。コネクターは、Apache Kafka Connect を使用して AMQ Streams クラスターにデプロイされます。Kafka Connect は Apache Kafka と外部システムとの間でデータをストリーミングするためのフレームワークです。Kafka Connect は、スタンドアロンモードではなく分散モードで実行する必要があります。

前提条件

注記

以前のバージョンの AMQ Streams を実行している場合は、最初に AMQ Streams 2.6 にアップグレードする必要があります。アップグレードプロセスの詳細は、AMQ Streams および Kafka のアップグレード を参照してください。

  • ホストの管理者権限 (sudo アクセス) がある。
  • Apache ZooKeeper と Apache Kafka ブローカーが実行している。
  • Kafka Connect が、スタンドアロンモードではなく、分散モード で実行されている。
  • AMQ Streams のインストール時に作成された kafka ユーザーの認証情報を把握している。
  • ソースデータベースがデプロイされ、Debezium をデプロイするホストがデータベースにアクセスできる。
  • コネクターの設定方法 を理解している。

手順

  1. Debezium コネクターまたは使用するコネクターを Red Hat Integration ダウンロードサイト からダウンロードします。たとえば、Debezium を MySQL データベースで使用するには、Debezium 2.5.4 MySQL コネクター をダウンロードします。
  2. AMQ Streams をデプロイした Red Hat Enterprise Linux ホストで、ターミナルウィンドウを開き、/opt/kafkaconnector-plugins ディレクトリーを作成します (まだ存在しない場合)。

    $ sudo mkdir /opt/kafka/connector-plugins
  3. 次のコマンドを入力して、/opt/kafka/connector-plugins ディレクトリーにダウンロードした Debezium コネクターアーカイブの内容を抽出します。

    $ sudo unzip debezium-connector-mysql-2.5.4.Final.zip -d /opt/kafka/connector-plugins
  4. インストールするコネクターごとに、手順 1 - 3 を繰り返します。
  5. ターミナルウィンドウから、kafka ユーザーとしてサインインします。

    $ su - kafka
    $ Password:
  6. Kafka Connect プロセスが実行中の場合は、停止します。

    1. 次のコマンドを入力して、Kafka Connect が分散モードで実行されているかどうかを確認します。

      $ jcmd | grep ConnectDistributed

      プロセスが実行中の場合、コマンドはプロセス ID を返します。次に例を示します。

      18514 org.apache.kafka.connect.cli.ConnectDistributed /opt/kafka/config/connect-distributed.properties
    2. プロセス ID を指定して kill コマンドを入力し、プロセスを停止します。次に例を示します。

      $ kill 18514
  7. /opt/kafka/config/ にある connect-distributed.properties ファイルを編集し、plugin.path の値を Debezium コネクタープラグインの親ディレクトリーの場所に設定します。

    plugin.path=/opt/kafka/connector-plugins
  8. 分散モードで Kafka Connect を起動します。

    $ /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
  9. Kafka Connect の実行後、Kafka Connect API を使用してコネクターを登録します。
    curl コマンドを入力して、「Debezium コネクター設定の計画」 で指定したコネクター設定 JSON を localhost:8083/connectors の Kafka Connect REST API エンドポイントに送信する POST リクエストを送信します。
    以下に例を示します。

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name": "inventory-connector", "config":
    {"connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "table.include.list": "public.inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "dbhistory.inventory" }
    }'

    複数のコネクターを登録するには、コネクターごとに個別のリクエストを送信します。

  10. Kafka Connect を再起動して、変更を実装します。

    Kafka Connect が起動すると、connector-plugins ディレクトリーから、設定済みの Debezium コネクターをロードします。

    設定が完了すると、デプロイされたコネクターはソースデータベースに接続し、挿入、更新、または削除された行またはドキュメントごとにイベントを生成します。

  11. 各 Kafka Connect ワーカーノードに対して、ステップ 5 - 10 を繰り返します。

2.4. デプロイメントの確認

コネクターが起動すると、設定されたデータベースのスナップショットが実行され、指定したテーブルごとにトピックが作成されます。

前提条件

  • 「Red Hat Enterprise Linux での AMQ Streams を使用した Debezium のデプロイ」 の手順に基づいて、Red Hat Enterprise Linux にコネクターをデプロイしています。以下の手順を実行します。

    1. ホストのターミナルウィンドウで、次のコマンドを入力して、Kafka Connect API からコネクターのリストを要求します。

      $ curl -H "Accept:application/json" localhost:8083/connectors/

      クエリーは、デプロイされたコネクターの名前を返します。以下に例を示します。

      ["inventory-connector"]
    2. ホストのターミナルウィンドウで次のコマンドを入力して、コネクターが実行しているタスクを表示します。

      $ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

      コマンドは、以下の例のような出力を返します。

      HTTP/1.1 200 OK
      Date: Thu, 06 Feb 2020 22:12:03 GMT
      Content-Type: application/json
      Content-Length: 531
      Server: Jetty(9.4.20.v20190813)
      
      {
        "name": "inventory-connector",
        ...
        "tasks": [
          {
            "connector": "inventory-connector",
            "task": 0
          }
        ]
      }
    3. Kafka クラスター内のトピックのリストを表示します。
      ターミナルウィンドウから /opt/kafka/bin/ に移動し、以下のシェルスクリプトを実行します。

      ./kafka-topics.sh --bootstrap-server=localhost:9092 --list

      Kafka ブローカーは、コネクターが作成するトピックのリストを返します。使用可能なトピックは、コネクターの snapshot.modesnapshot.include.collection.list、および table.include.list の設定プロパティーの設定によって異なります。デフォルトでは、コネクターはデータベース内の非システムテーブルごとにトピックを作成します。

    4. トピックの内容を表示します。
      ターミナルウィンドウから /opt/kafka/bin/ に移動し、kafka-console-consumer.sh シェルスクリプトを実行して、前のコマンドで返されたトピックの 1 つの内容を表示します。

      以下に例を示します。

      ./kafka-console-consumer.sh \
      >     --bootstrap-server localhost:9092 \
      >     --from-beginning \
      >     --property print.key=true \
      >     --topic=dbserver1.inventory.products_on_hand

      トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。

      例2.1 Debezium 変更イベントの内容

      {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.5.4.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}

      上記の例では、payload 値は、コネクタースナップショットがテーブル inventory.products_on_hand から読み込み ("op" ="r") イベントを生成したことを示しています。product_id レコードの "before" 状態は null であり、レコードに以前の値が存在しないことを示しています。"after" 状態は、product_id 101 を持つ項目の quantity3 であることを示しています。

次の手順

各コネクターで使用できる設定の詳細、および変更データのキャプチャーを有効にするためにソースデータベースを設定する方法は、Debezium ユーザーガイド を参照してください。

2.5. Kafka Connect クラスターの Debezium コネクタープラグインの更新

Red Hat Enterprise Linux にデプロイされているバージョンの Debezium コネクターを置き換えるには、コネクタープラグインを更新します。

手順

  1. 置き換える Debezium コネクタープラグインのコピーを Red Hat Integration ダウンロードサイト からダウンロードします。
  2. Debezium コネクターアーカイブの内容を /opt/kafka/connector-plugins ディレクトリーにデプロイメントします。

    $ sudo unzip debezium-connector-mysql-2.5.4.Final.zip -d /opt/kafka/connector-plugins
  3. Kafka Connect を再起動します。

付録A サブスクリプションの使用

Debezium は、ソフトウェアサブスクリプションを通じて提供されます。サブスクリプションを管理するには、Red Hat カスタマーポータルでアカウントにアクセスします。

アカウントへのアクセス

  1. access.redhat.com に移動します。
  2. アカウントがない場合は作成します。
  3. アカウントにログインします。

サブスクリプションのアクティベート

  1. access.redhat.com に移動します。
  2. Subscriptions に移動します。
  3. Activate a subscription に移動し、16 桁のアクティベーション番号を入力します。

zip および tar ファイルのダウンロード

zip または tar ファイルにアクセスするには、カスタマーポータルを使用して、ダウンロードする関連ファイルを検索します。RPM パッケージを使用している場合、この手順は必要ありません。

  1. ブラウザーを開き、access.redhat.com/downloads で Red Hat カスタマーポータルの Product Downloads ページにログインします。
  2. INTEGRATION AND AUTOMATION まで下方向にスクロールします。
  3. Red Hat Integration をクリックして、Red Hat Integration ダウンロードページを表示します。
  4. コンポーネントの Download リンクをクリックします。

改訂日時: 2024-04-19

法律上の通知

Copyright © 2024 Red Hat, Inc.
The text of and illustrations in this document are licensed by Red Hat under a Creative Commons Attribution–Share Alike 3.0 Unported license ("CC-BY-SA"). An explanation of CC-BY-SA is available at http://creativecommons.org/licenses/by-sa/3.0/. In accordance with CC-BY-SA, if you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, Red Hat Enterprise Linux, the Shadowman logo, the Red Hat logo, JBoss, OpenShift, Fedora, the Infinity logo, and RHCE are trademarks of Red Hat, Inc., registered in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
Java® is a registered trademark of Oracle and/or its affiliates.
XFS® is a trademark of Silicon Graphics International Corp. or its subsidiaries in the United States and/or other countries.
MySQL® is a registered trademark of MySQL AB in the United States, the European Union and other countries.
Node.js® is an official trademark of Joyent. Red Hat is not formally related to or endorsed by the official Joyent Node.js open source or commercial project.
The OpenStack® Word Mark and OpenStack logo are either registered trademarks/service marks or trademarks/service marks of the OpenStack Foundation, in the United States and other countries and are used with the OpenStack Foundation's permission. We are not affiliated with, endorsed or sponsored by the OpenStack Foundation, or the OpenStack community.
All other trademarks are the property of their respective owners.
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.