2.3. Red Hat Enterprise Linux での AMQ Streams を使用した Debezium のデプロイ
この手順では、Red Hat Enterprise Linux で Debezium のコネクターを設定する方法を説明します。コネクターは、Apache Kafka Connect を使用して AMQ Streams クラスターにデプロイされます。Kafka Connect は Apache Kafka と外部システムとの間でデータをストリーミングするためのフレームワークです。Kafka Connect は、スタンドアロンモードではなく分散モードで実行する必要があります。
前提条件
Debezium のデプロイ先であるホスト環境が、サポートされている設定 で Red Hat Enterprise Linux、AMQ Streams、および Java を実行している。
- AMQ Streams のインストール方法の詳細は、AMQ Streams のインストール を参照してください。
- 単一の ZooKeeper ノードおよび単一の Kafka ノードが含まれる実稼働ではない、基本的な AMQ Streams クラスターをインストールする方法の詳細は、単一ノードの AMQ Streams クラスターの実行 を参照してください。
以前のバージョンの AMQ Streams を実行している場合は、最初に AMQ Streams 2.6 にアップグレードする必要があります。アップグレードプロセスの詳細は、AMQ Streams および Kafka のアップグレード を参照してください。
-
ホストの管理者権限 (
sudo
アクセス) がある。 - Apache ZooKeeper と Apache Kafka ブローカーが実行している。
- Kafka Connect が、スタンドアロンモードではなく、分散モード で実行されている。
-
AMQ Streams のインストール時に作成された
kafka
ユーザーの認証情報を把握している。 - ソースデータベースがデプロイされ、Debezium をデプロイするホストがデータベースにアクセスできる。
- コネクターの設定方法 を理解している。
手順
- Debezium コネクターまたは使用するコネクターを Red Hat Integration ダウンロードサイト からダウンロードします。たとえば、Debezium を MySQL データベースで使用するには、Debezium 2.5.4 MySQL コネクター をダウンロードします。
AMQ Streams をデプロイした Red Hat Enterprise Linux ホストで、ターミナルウィンドウを開き、
/opt/kafka
にconnector-plugins
ディレクトリーを作成します (まだ存在しない場合)。$ sudo mkdir /opt/kafka/connector-plugins
次のコマンドを入力して、
/opt/kafka/connector-plugins
ディレクトリーにダウンロードした Debezium コネクターアーカイブの内容を抽出します。$ sudo unzip debezium-connector-mysql-2.5.4.Final.zip -d /opt/kafka/connector-plugins
- インストールするコネクターごとに、手順 1 - 3 を繰り返します。
ターミナルウィンドウから、
kafka
ユーザーとしてサインインします。$ su - kafka $ Password:
Kafka Connect プロセスが実行中の場合は、停止します。
次のコマンドを入力して、Kafka Connect が分散モードで実行されているかどうかを確認します。
$ jcmd | grep ConnectDistributed
プロセスが実行中の場合、コマンドはプロセス ID を返します。次に例を示します。
18514 org.apache.kafka.connect.cli.ConnectDistributed /opt/kafka/config/connect-distributed.properties
プロセス ID を指定して
kill
コマンドを入力し、プロセスを停止します。次に例を示します。$ kill 18514
/opt/kafka/config/
にあるconnect-distributed.properties
ファイルを編集し、plugin.path
の値を Debezium コネクタープラグインの親ディレクトリーの場所に設定します。plugin.path=/opt/kafka/connector-plugins
分散モードで Kafka Connect を起動します。
$ /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Kafka Connect の実行後、Kafka Connect API を使用してコネクターを登録します。
curl
コマンドを入力して、「Debezium コネクター設定の計画」 で指定したコネクター設定 JSON をlocalhost:8083/connectors
の Kafka Connect REST API エンドポイントに送信するPOST
リクエストを送信します。
以下に例を示します。curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name": "inventory-connector", "config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "topic.prefix": "dbserver1", "table.include.list": "public.inventory", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "dbhistory.inventory" } }'
複数のコネクターを登録するには、コネクターごとに個別のリクエストを送信します。
Kafka Connect を再起動して、変更を実装します。
Kafka Connect が起動すると、
connector-plugins
ディレクトリーから、設定済みの Debezium コネクターをロードします。設定が完了すると、デプロイされたコネクターはソースデータベースに接続し、挿入、更新、または削除された行またはドキュメントごとにイベントを生成します。
- 各 Kafka Connect ワーカーノードに対して、ステップ 5 - 10 を繰り返します。
次のステップ
関連情報