第8章 Kafka Connect
Kafka Connect は、Apache Kafka と外部システムとの間でデータをストリーミングするためのツールです。スケーラビリティーと信頼性を維持しながら大量のデータを移動するためのフレームワークを提供します。Kafka Connect は通常、Kafka を Kafka クラスター外部のデータベース、ストレージ、メッセージングシステムと統合するために使用されます。
Kafka Connect は、異なるタイプの外部システムの接続を実装するコネクタープラグインを使用します。シンクおよびソースには、2 種類のコネクタープラグインがあります。シンクコネクターは、データを Kafka から外部システムにストリーミングします。ソースコネクターは、外部システムから Kafka にデータをストリーミングします。
Kafka Connect は、スタンドアロンまたは分散モードで実行できます。
- スタンドアロンモード
- スタンドアロンモードでは、Kafka Connect はプロパティーファイルから読み取られたユーザー定義の設定を持つ単一ノードで実行されます。
- 分散モード
- Distributed モードでは、Kafka Connect は 1 つまたは複数のワーカーノードで実行され、ワークロードはそれら間に分散されます。HTTP REST インターフェースを使用して、コネクターおよびその設定を管理します。
8.1. スタンドアロンモードでの Kafka Connect
スタンドアロンモードでは、Kafka Connect は単一のノードで単一のプロセスとして実行されます。プロパティーファイルを使用してスタンドアロンモードで設定を管理します。
8.1.1. スタンドアロンモードでの Kafka Connect の設定
スタンドアロンモードで Kafka Connect を設定するには、config/connect-standalone.properties
設定ファイルを編集します。以下のオプションは最も重要なオプションです。
bootstrap.servers
-
Kafka へのブートストラップ接続として使用される Kafka ブローカーアドレスのリスト。例:
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
key.converter
-
メッセージキーの Kafka 形式への変換に使用されるクラス。例:
org.apache.kafka.connect.json.JsonConverter
value.converter
-
メッセージペイロードの Kafka 形式への変換に使用されるクラス。例:
org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename
- オフセットデータが保存されるファイルを指定します。
設定ファイルの例は、config/connect-standalone.properties
のインストールディレクトリーにあります。サポートされる Kafka Connect 設定オプションの完全リストは、[kafka-connect-configuration-parameters-str] を参照してください。
コネクタープラグインは、ブートストラップアドレスを使用して Kafka ブローカーへのクライアント接続を開きます。これらの接続を設定するには、producer.
または consumer.
で始まる標準の Kafka プロデューサーおよびコンシューマー設定オプションを使用します。
Kafka プロデューサーおよびコンシューマーの設定に関する詳細は、以下を参照してください。
8.1.2. スタンドアロンモードでの Kafka Connect でのコネクターの設定
プロパティーファイルを使用すると、スタンドアロンモードで Kafka Connect のコネクタープラグインを設定できます。ほとんどの設定オプションは各コネクターに固有のものです。以下のオプションはすべてのコネクターに適用されます。
name
- コネクターの名前。現在の Kafka Connect インスタンス内で一意である必要があります。
connector.class
-
コネクタープラグインのクラス。例:
org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max
- 指定のコネクターが使用できるタスクの最大数。タスクを使用すると、コネクターが並行して作業を実行できるようにします。コネクターによって指定の数よりも少ないタスクが作成される可能性があります。
key.converter
-
メッセージキーの Kafka 形式への変換に使用されるクラス。これにより、Kafka Connect 設定によって設定されたデフォルト値が上書きされます。例:
org.apache.kafka.connect.json.JsonConverter
value.converter
-
メッセージペイロードの Kafka 形式への変換に使用されるクラス。これにより、Kafka Connect 設定によって設定されたデフォルト値が上書きされます。例:
org.apache.kafka.connect.json.JsonConverter
さらに、シンクコネクターに以下のオプションのいずれかを設定する必要があります。
topics
- 入力として使用されるトピックのコンマ区切りリスト。
topics.regex
- 入力として使用するトピックの Java 正規表現。
その他のオプションは、使用できるコネクターのドキュメントを参照してください。
AMQ Streams には、コネクター設定ファイルのサンプルが含まれています。AMQ Streams インストールディレクトリーの config/connect-file-sink.properties
および config/connect-file-source.properties
を参照してください。
8.1.3. スタンドアロンモードでの Kafka Connect の実行
この手順では、スタンドアロンモードで Kafka Connect を設定し、実行する方法を説明します。
前提条件
- AMQ Streams} クラスターがインストールされ、実行している。
手順
/opt/kafka/config/connect-standalone.properties
Kafka Connect 設定ファイルを編集し、bootstrap.server
が Kafka ブローカーを示すように設定します。以下に例を示します。bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
設定ファイルで Kafka Connect を起動し、1 つ以上のコネクター設定を指定します。
su - kafka /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties [connector2.properties ...]
Kafka Connect が稼働していることを確認します。
jcmd | grep ConnectStandalone
その他のリソース
- AMQ Streams のインストールに関する詳細は、「AMQ Streams のインストール」 を参照してください。
- AMQ Streams の設定に関する詳細は、「AMQ Streams の設定」 を参照してください。
- サポートされる Kafka Connect 設定オプションの完全リストは、付録F Kafka Connect 設定パラメーター を参照してください。