第9章 Kafka Connect
Kafka Connect は、Apache Kafka と外部システムとの間でデータをストリーミングするためのツールです。スケーラビリティーと信頼性を維持しながら、大量のデータを移動するためのフレームワークが提供されます。通常、Kafka Connect は、Kafka クラスター外部のデータベース、ストレージ、メッセージングシステムと統合するために使用されます。
Kafka Connect は、異なるタイプの外部システムへの接続を実装するコネクタープラグインを使用します。シンクコネクターには、sink および source の 2 つのタイプがあります。シンクコネクターは、Kafka から外部システムにデータをストリーミングします。ソースコネクターは、外部システムから Kafka にデータをストリーミングします。
Kafka Connect はスタンドアロンまたは分散モードで実行できます。
- スタンドアロンモード
- スタンドアロンモードでは、Kafka Connect はプロパティーファイルから読み取られたユーザー定義の設定を持つ単一ノードで実行されます。
- 分散モード
- Distributed モードでは、Kafka Connect は 1 つまたは複数のワーカーノードで実行され、ワークロードはそれらのワーカーノード間で分散されます。HTTP REST インターフェースを使用して、コネクターとその設定を管理します。
9.1. スタンドアロンモードでの Kafka Connect
スタンドアロンモードでは、Kafka Connect は単一ノードで単一のプロセスとして実行されます。プロパティーファイルを使用してスタンドアロンモードの設定を管理します。
9.1.1. スタンドアロンモードでの Kafka Connect の設定
スタンドアロンモードで Kafka Connect を設定するには、config/connect-standalone.properties
設定ファイルを編集します。以下のオプションは最も重要なオプションです。
bootstrap.servers
-
Kafka へのブートストラップ接続として使用される Kafka ブローカーアドレスの一覧。たとえば、
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
のようになります。 key.converter
-
メッセージキーを Kafka 形式との間で変換するために使用されるクラス。たとえば、
org.apache.kafka.connect.json.JsonConverter
のようになります。 value.converter
-
メッセージペイロードの Kafka 形式への変換に使用されるクラス。たとえば、
org.apache.kafka.connect.json.JsonConverter
のようになります。 offset.storage.file.filename
- オフセットデータが保存されるファイルを指定します。
設定ファイルの例は、config/connect-standalone.properties
のインストールディレクトリーで提供されます。サポートされるすべての Kafka Connect 設定オプションの完全リストは、[kafka-connect-configuration-parameters-str] を参照してください。
コネクタープラグインは、ブートストラップアドレスを使用して Kafka ブローカーへのクライアント接続を開きます。これらの接続を設定するには、producer.
または consumer.
で始まる標準の Kafka プロデューサーおよびコンシューマー設定オプションを使用します。
Kafka プロデューサーおよびコンシューマーの設定に関する詳細は、以下を参照してください。
9.1.2. スタンドアロンモードでの Kafka Connect でのコネクターの設定
プロパティーファイルを使用すると、スタンドアロンモードで Kafka Connect のコネクタープラグインを設定できます。ほとんどの設定オプションは、各コネクターに固有のものです。以下のオプションはすべてのコネクターに適用されます。
name
- 現在の Kafka Connect インスタンス内で一意である必要があります。
connector.class
-
コネクタープラグインのクラス。たとえば、
org.apache.kafka.connect.file.FileStreamSinkConnector
のようになります。 tasks.max
- 指定されたコネクターが使用可能なタスクの最大数。タスクを使用すると、コネクターが並行して機能できるようになります。コネクターは指定された値よりも少ないタスクが作成される可能性があります。
key.converter
-
メッセージキーを Kafka 形式との間で変換するために使用されるクラス。これにより、Kafka Connect 設定によって設定されるデフォルト値が上書きされます。たとえば、
org.apache.kafka.connect.json.JsonConverter
のようになります。 value.converter
-
メッセージペイロードの Kafka 形式への変換に使用されるクラス。これにより、Kafka Connect 設定によって設定されるデフォルト値が上書きされます。たとえば、
org.apache.kafka.connect.json.JsonConverter
のようになります。
さらに、シンクコネクターの以下のオプションのいずれかを設定する必要があります。
topics
- 入力に使用されるトピックのコンマ区切りリスト。
topics.regex
- 入力に使用されるトピックの Java 正規表現。
他のすべてのオプションは、利用可能なコネクターのドキュメントを参照してください。
AMQ Streams には、コネクター設定ファイルのサンプルが含まれています。AMQ Streams インストールディレクトリーの config/connect-file-sink.properties
および config/connect-file-source.properties
を参照してください。
9.1.3. スタンドアロンモードでの Kafka Connect の実行
この手順では、スタンドアロンモードで Kafka Connect を設定して実行する方法を説明します。
前提条件
- AMQ Streams} クラスターがインストールされ、実行されている。
手順
/opt/kafka/config/connect-standalone.properties
Kafka Connect 設定ファイルを編集し、bootstrap.server
を設定して Kafka ブローカーを示すように設定します。以下に例を示します。bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
設定ファイルを使用して Kafka Connect を開始し、1 つ以上のコネクター設定を指定します。
su - kafka /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties [connector2.properties ...]
Kafka Connect が実行されていることを確認します。
jcmd | grep ConnectStandalone
関連情報
- AMQ Streams のインストールに関する詳細は、「AMQ Streams のインストール」 を参照してください。
- AMQ Streams の設定に関する詳細は、「AMQ Streams の設定」 を参照してください。
- サポートされる Kafka Connect 設定オプションの完全リストは、付録F Kafka Connect 設定パラメーター を参照してください。