8.2. 分散モードでの Kafka Connect の使用
分散モードでは、Kafka Connect はワーカープロセスのクラスターとして実行され、各ワーカーは個別のノードで実行されます。コネクターはクラスター内の任意のワーカー上で実行できるため、スケーラビリティとフォールトトレランスが向上します。コネクターはワーカーによって管理され、ワーカーは相互に調整して作業を分散し、各コネクターがいつでも単一のノード上で実行されるようにします。
8.2.1. 分散モードでの Kafka Connect の設定
Kafka Connect をスタンドアロンモードで設定するには、config/connect-distributed.properties
設定ファイルを編集します。以下のオプションが最も重要です。
bootstrap.servers
-
Kafka へのブートストラップ接続として使用される Kafka ブローカーアドレスのリスト。たとえば、
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
です。 key.converter
-
メッセージキーを Kafka 形式との間で変換するために使用されるクラス。たとえば、
org.apache.kafka.connect.json.JsonConverter
です。 value.converter
-
メッセージペイロードを Kafka 形式との間で変換するために使用されるクラス。たとえば、
org.apache.kafka.connect.json.JsonConverter
です。 group.id
-
分散された Kafka Connect クラスターの名前。これは一意でなければならず、他のコンシューマーグループ ID と競合することはできません。デフォルト値は
connect-cluster
です。 config.storage.topic
-
コネクター設定の保存に使用される Kafka トピック。デフォルト値は
connect-configs
です。 offset.storage.topic
-
オフセットを保存するために使用される Kafka トピック。デフォルト値は
connect-offset
です。 status.storage.topic
-
ワーカーノードのステータスに使用される Kafka トピック。デフォルト値は
connect-status
です。
Streams for Apache Kafka には、分散モードの Kafka Connect のサンプル設定ファイルが含まれています。Streams for Apache Kafka のインストールディレクトリーにある config/connect-distributed.properties
を参照してください。
コネクタープラグインは、ブートストラップアドレスを使用して Kafka ブローカーへのクライアント接続を開きます。これらの接続を設定するには、標準的な Kafka のプロデューサーとコンシューマーの設定オプションを使用し、producer.
または consumer.
接頭辞を付けます。
8.2.2. 分散モードでの Kafka Connect の起動
Kafka Connect を分散モードで設定して実行します。
前提条件
- Streams for Apache Kafka が 各ホストにインストールされており、設定ファイルが使用可能である。
クラスターの実行
すべての Kafka Connect ワーカーノードで
/opt/kafka/config/connect-distributed.properties
Kafka Connect 設定ファイルを編集します。-
bootstrap.server
オプションを設定して、Kafka ブローカーを示すようにします。 -
group.id
オプションを設定します。 -
config.storage.topic
オプションを設定します。 -
offset.storage.topic
オプションを設定します。 status.storage.topic
オプションを設定します。以下に例を示します。
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 group.id=my-group-id config.storage.topic=my-group-id-configs offset.storage.topic=my-group-id-offsets status.storage.topic=my-group-id-status
-
すべての Kafka Connect ワーカーノードで
/opt/kafka/config/connect-distributed.properties
Kafka Connect ワーカーを起動します。su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Kafka Connect が実行されていることを確認します。
jcmd | grep ConnectDistributed
- Kafka Connect REST API を使用して コネクターを管理 します。