2.3. Red Hat Enterprise Linux 上の Streams for Apache Kafka を使用した Debezium のデプロイ
この手順では、Red Hat Enterprise Linux で Debezium のコネクターを設定する方法を説明します。コネクターは、Apache Kafka と外部システム間でデータをストリーミングするためのフレームワークである Apache Kafka Connect を使用して、Streams for Apache Kafka クラスターにデプロイされます。Kafka Connect は、スタンドアロンモードではなく分散モードで実行する必要があります。
前提条件
Debezium のデプロイ先であるホスト環境が、サポートされている設定 で Red Hat Enterprise Linux、AMQ Streams、および Java を実行している。
- Streams for Apache Kafka のインストール方法に関する詳細は、ZooKeeper を使用した RHEL 上の Streams for Apache Kafka の使用 を参照してください。
- 単一の ZooKeeper ノードと単一の Kafka ノードを含む基本的な非実稼働環境の Streams for Apache Kafka クラスターをインストールする方法は、「ZooKeeper を使用した RHEL 上の Streams for Apache Kafka の使用」の シングルノード Kafka クラスターの実行 を参照してください。
以前のバージョンの AMQ Streams を実行している場合は、まず Streams for Apache Kafka 2.7 にアップグレードする必要があります。アップグレードプロセスの詳細は、Streams for Apache Kafka および Kafka のアップグレード を参照してください。
-
ホストの管理者権限 (
sudo
アクセス) がある。 - Apache ZooKeeper と Apache Kafka ブローカーが実行している。
- Kafka Connect が、スタンドアロンモードではなく、分散モード で実行されている。
-
Streams for Apache Kafka のインストール時に作成された
kafka
ユーザーの認証情報を把握している。 - ソースデータベースがデプロイされ、Debezium をデプロイするホストがデータベースにアクセスできる。
- コネクターの設定方法 を理解している。
手順
- 使用する Debezium コネクターを ソフトウェアダウンロード サイトからダウンロードします。たとえば、Debezium を MySQL データベースで使用するには、Debezium 2.7.3 MySQL Connector をダウンロードします。
Streams for Apache Kafka をデプロイした Red Hat Enterprise Linux ホストで、ターミナルウィンドウを開き、
/opt/kafka
にconnector-plugins
ディレクトリーを作成します (まだ存在しない場合)。$ sudo mkdir /opt/kafka/connector-plugins
次のコマンドを入力して、
/opt/kafka/connector-plugins
ディレクトリーにダウンロードした Debezium コネクターアーカイブの内容を抽出します。$ sudo unzip debezium-connector-mysql-2.7.3.Final.zip -d /opt/kafka/connector-plugins
- インストールするコネクターごとに、手順 1 - 3 を繰り返します。
ターミナルウィンドウから、
kafka
ユーザーとしてサインインします。$ su - kafka $ Password:
Kafka Connect プロセスが実行中の場合は、停止します。
次のコマンドを入力して、Kafka Connect が分散モードで実行されているかどうかを確認します。
$ jcmd | grep ConnectDistributed
プロセスが実行中の場合、コマンドはプロセス ID を返します。次に例を示します。
18514 org.apache.kafka.connect.cli.ConnectDistributed /opt/kafka/config/connect-distributed.properties
プロセス ID を指定して
kill
コマンドを入力し、プロセスを停止します。次に例を示します。$ kill 18514
/opt/kafka/config/
にあるconnect-distributed.properties
ファイルを編集し、plugin.path
の値を Debezium コネクタープラグインの親ディレクトリーの場所に設定します。plugin.path=/opt/kafka/connector-plugins
分散モードで Kafka Connect を起動します。
$ /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Kafka Connect の実行後、Kafka Connect API を使用してコネクターを登録します。
curl
コマンドを入力して、「Debezium コネクター設定の計画」 で指定したコネクター設定 JSON をlocalhost:8083/connectors
の Kafka Connect REST API エンドポイントに送信するPOST
リクエストを送信します。
以下に例を示します。curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name": "inventory-connector", "config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "topic.prefix": "dbserver1", "table.include.list": "public.inventory", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "dbhistory.inventory" } }'
複数のコネクターを登録するには、コネクターごとに個別のリクエストを送信します。
Kafka Connect を再起動して、変更を実装します。
Kafka Connect が起動すると、
connector-plugins
ディレクトリーから、設定済みの Debezium コネクターをロードします。設定が完了すると、デプロイされたコネクターはソースデータベースに接続し、挿入、更新、または削除された行またはドキュメントごとにイベントを生成します。
- 各 Kafka Connect ワーカーノードに対して、ステップ 5 - 10 を繰り返します。
次のステップ
関連情報