7.2. 分散モードでの Kafka Connect
分散モードでは、Kafka Connect は 1 つまたは複数のワーカーノードで実行され、ワークロードはワーカーノード間で分散されます。コネクタープラグインとその設定は、HTTP REST インターフェイスを使用して管理します。
7.2.1. 分散モードでの Kafka Connect の設定 リンクのコピーリンクがクリップボードにコピーされました!
Kafka Connect をスタンドアロンモードで設定するには、config/connect-distributed.properties 設定ファイルを編集します。以下のオプションが最も重要です。
bootstrap.servers-
Kafka へのブートストラップ接続として使用される Kafka ブローカーアドレスのリスト。たとえば、
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092です。 key.converter-
メッセージキーを Kafka 形式との間で変換するために使用されるクラス。たとえば、
org.apache.kafka.connect.json.JsonConverterです。 value.converter-
メッセージペイロードを Kafka 形式との間で変換するために使用されるクラス。たとえば、
org.apache.kafka.connect.json.JsonConverterです。 group.id-
分散された Kafka Connect クラスターの名前。これは一意でなければならず、他のコンシューマーグループ ID と競合することはできません。デフォルト値は
connect-clusterです。 config.storage.topic-
コネクター設定の保存に使用される Kafka トピック。デフォルト値は
connect-configsです。 offset.storage.topic-
オフセットを保存するために使用される Kafka トピック。デフォルト値は
connect-offsetです。 status.storage.topic-
ワーカーノードのステータスに使用される Kafka トピック。デフォルト値は
connect-statusです。
AMQ Streams には、分散モードの Kafka Connect の設定ファイル例が含まれています。AMQ Streams のインストールディレクトリーにある config/connect-distributed.properties を参照してください。
サポートされるすべての Kafka Connect 設定オプションの完全リストは、付録 F Kafka Connect 設定パラメーター を参照してください。
コネクタープラグインは、ブートストラップアドレスを使用して Kafka ブローカーへのクライアント接続を開きます。これらの接続を設定するには、標準的な Kafka のプロデューサーとコンシューマーの設定オプションを使用し、producer. または consumer. 接頭辞を付けます。
Kafka プロデューサーおよびコンシューマーの設定に関する詳細は、以下を参照してください。
7.2.2. 分散 Kafka Connect でのコネクターの設定 リンクのコピーリンクがクリップボードにコピーされました!
HTTP REST インターフェイス
分散 Kafka Connect のコネクターは、HTTP REST インターフェイスを使用して設定されます。REST インターフェイスはデフォルトで 8083 番ポートをリッスンします。以下のエンドポイントをサポートします。
GET /connectors- 既存のコネクターのリストを返します。
POST /connectors- コネクターを作成します。リクエストボディーは、コネクター設定が含まれる JSON オブジェクトである必要があります。
GET /connectors/<name>- 特定のコネクターの情報を取得します。
GET /connectors/<name>/config- 特定のコネクターの設定を取得します。
PUT /connectors/<name>/config- 特定のコネクターの設定を更新します。
GET /connectors/<name>/status- 特定のコネクターのステータスを取得します。
PUT /connectors/<name>/pause- コネクターとそのすべてのタスクを一時停止します。コネクターはメッセージの処理を停止します。
PUT /connectors/<name>/resume- 一時停止されたコネクターを再開します。
POST /connectors/<name>/restart- コネクターに障害が発生した場合に、コネクターを再起動します。
DELETE /connectors/<name>- コネクターを削除します。
GET /connector-plugins- サポートされるすべてのコネクタープラグインのリストを取得します。
コネクター設定
ほとんどの設定オプションはコネクター固有で、コネクターのドキュメントに含まれています。以下のフィールドは、すべてのコネクターで共通しています。
name- コネクターの名前。特定の Kafka Connect インスタンス内で一意である必要があります。
connector.class-
コネクタープラグインのクラス。たとえば、
org.apache.kafka.connect.file.FileStreamSinkConnectorです。 tasks.max- このコネクターによって使用されるタスクの最大数。タスクは、コネクターが作業を並列処理するために使用します。コネクターは、指定された数よりも少ないタスクを作成する場合があります。
key.converter-
メッセージキーを Kafka 形式との間で変換するために使用されるクラス。これにより、Kafka Connect 設定によって設定されたデフォルト値がオーバーライドされます。たとえば、
org.apache.kafka.connect.json.JsonConverterです。 value.converter-
メッセージペイロードを Kafka 形式との間で変換するために使用されるクラス。これにより、Kafka Connect 設定によって設定されたデフォルト値がオーバーライドされます。たとえば、
org.apache.kafka.connect.json.JsonConverterです。
さらに、シンクコネクターには、以下のオプションの 1 つを設定する必要があります。
topics- 入力として使用されるトピックのコンマ区切りリスト。
topics.regex- 入力として使用されるトピックの Java 正規表現。
その他のオプションについては、特定のコネクターのドキュメントを参照してください。
AMQ Streams には、コネクター設定ファイルのサンプルが含まれています。AMQ Streams インストールディレクトリーの config/connect-file-sink.properties および config/connect-file-source.properties にあります。
7.2.3. 分散 Kafka Connect の実行 リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Kafka Connect を分散モードで設定および実行する方法を説明します。
前提条件
- インストールされ、実行されている AMQ Streams クラスター。
クラスターの実行
すべての Kafka Connect ワーカーノードで
/opt/kafka/config/connect-distributed.propertiesKafka Connect 設定ファイルを編集します。-
bootstrap.serverオプションを設定して、Kafka ブローカーを示すようにします。 -
group.idオプションを設定します。 -
config.storage.topicオプションを設定します。 -
offset.storage.topicオプションを設定します。 status.storage.topicオプションを設定します。以下に例を示します。
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 group.id=my-group-id config.storage.topic=my-group-id-configs offset.storage.topic=my-group-id-offsets status.storage.topic=my-group-id-status
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 group.id=my-group-id config.storage.topic=my-group-id-configs offset.storage.topic=my-group-id-offsets status.storage.topic=my-group-id-statusCopy to Clipboard Copied! Toggle word wrap Toggle overflow
-
すべての Kafka Connect ワーカーノードで
/opt/kafka/config/connect-distributed.propertiesKafka Connect ワーカーを起動します。su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaConnect が実行されていることを確認します。
jcmd | grep ConnectDistributed
jcmd | grep ConnectDistributedCopy to Clipboard Copied! Toggle word wrap Toggle overflow
関連情報
- AMQ Streams のインストールに関する詳細は、「AMQ Streams のインストール」 を参照してください。
- AMQ Streams の設定に関する詳細は、「AMQ Streams の設定」 を参照してください。
- サポートされる Kafka Connect 設定オプションの完全なリストは、付録F Kafka Connect 設定パラメーターを参照してください。
7.2.4. コネクターの作成 リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Kafka Connect REST API を使用して分散モードで Kafka Connect で使用するコネクタープラグインを作成する方法を説明します。
前提条件
- 分散モードで実行する Kafka Connect インストール。
手順
コネクター設定で JSON ペイロードを準備します。以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow POST リクエストを
<KafkaConnectAddress>:8083/connectorsに送信してコネクターを作成します。以下の例では、curlを使用します。curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectorsCopy to Clipboard Copied! Toggle word wrap Toggle overflow <KafkaConnectAddress>:8083/connectorsに GET リクエストを送信して、コネクターがデプロイされたことを確認します。以下の例では、curlを使用します。curl http://connect0.my-domain.com:8083/connectors
curl http://connect0.my-domain.com:8083/connectorsCopy to Clipboard Copied! Toggle word wrap Toggle overflow
7.2.5. コネクターの削除 リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Kafka Connect REST API を使用して分散モードの Kafka Connect からコネクタープラグインを削除する方法を説明します。
前提条件
- 分散モードで実行する Kafka Connect インストール。
コネクターの削除
<KafkaConnectAddress>:8083/connectors/<ConnectorName>にGETリクエストを送信して、コネクターが存在することを確認します。以下の例では、curlを使用します。curl http://connect0.my-domain.com:8083/connectors
curl http://connect0.my-domain.com:8083/connectorsCopy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターを削除するには、
DELETEリクエストを<KafkaConnectAddress>:8083/connectorsに送信します。以下の例では、curlを使用します。curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow <KafkaConnectAddress>:8083/connectorsに GET リクエストを送信して、コネクターが削除されたことを確認します。以下の例では、curlを使用します。curl http://connect0.my-domain.com:8083/connectors
curl http://connect0.my-domain.com:8083/connectorsCopy to Clipboard Copied! Toggle word wrap Toggle overflow