7.2. 分散モードでの Kafka Connect
分散モードでは、Kafka Connect は 1 つまたは複数のワーカーノードで実行され、ワークロードはワーカーノード間で分散されます。コネクタープラグインとその設定は、HTTP REST インターフェイスを使用して管理します。
7.2.1. 分散モードでの Kafka Connect の設定 リンクのコピーリンクがクリップボードにコピーされました!
Kafka Connect をスタンドアロンモードで設定するには、config/connect-distributed.properties 設定ファイルを編集します。以下のオプションが最も重要です。
bootstrap.servers-
Kafka へのブートストラップ接続として使用される Kafka ブローカーアドレスのリスト。たとえば、
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092です。 key.converter-
メッセージキーを Kafka 形式との間で変換するために使用されるクラス。たとえば、
org.apache.kafka.connect.json.JsonConverterです。 value.converter-
メッセージペイロードを Kafka 形式との間で変換するために使用されるクラス。たとえば、
org.apache.kafka.connect.json.JsonConverterです。 group.id-
分散された Kafka Connect クラスターの名前。これは一意でなければならず、他のコンシューマーグループ ID と競合することはできません。デフォルト値は
connect-clusterです。 config.storage.topic-
コネクター設定の保存に使用される Kafka トピック。デフォルト値は
connect-configsです。 offset.storage.topic-
オフセットを保存するために使用される Kafka トピック。デフォルト値は
connect-offsetです。 status.storage.topic-
ワーカーノードのステータスに使用される Kafka トピック。デフォルト値は
connect-statusです。
AMQ Streams には、分散モードの Kafka Connect の設定ファイル例が含まれています。AMQ Streams のインストールディレクトリーにある config/connect-distributed.properties を参照してください。
コネクタープラグインは、ブートストラップアドレスを使用して Kafka ブローカーへのクライアント接続を開きます。これらの接続を設定するには、標準的な Kafka のプロデューサーとコンシューマーの設定オプションを使用し、producer. または consumer. 接頭辞を付けます。
7.2.2. 分散 Kafka Connect でのコネクターの設定 リンクのコピーリンクがクリップボードにコピーされました!
HTTP REST インターフェイス
分散 Kafka Connect のコネクターは、HTTP REST インターフェイスを使用して設定されます。REST インターフェイスはデフォルトで 8083 番ポートをリッスンします。以下のエンドポイントをサポートします。
GET /connectors- 既存のコネクターのリストを返します。
POST /connectors- コネクターを作成します。リクエストボディーは、コネクター設定が含まれる JSON オブジェクトである必要があります。
GET /connectors/<name>- 特定のコネクターの情報を取得します。
GET /connectors/<name>/config- 特定のコネクターの設定を取得します。
PUT /connectors/<name>/config- 特定のコネクターの設定を更新します。
GET /connectors/<name>/status- 特定のコネクターのステータスを取得します。
PUT /connectors/<name>/pause- コネクターとそのすべてのタスクを一時停止します。コネクターはメッセージの処理を停止します。
PUT /connectors/<name>/resume- 一時停止されたコネクターを再開します。
POST /connectors/<name>/restart- コネクターに障害が発生した場合に、コネクターを再起動します。
DELETE /connectors/<name>- コネクターを削除します。
GET /connector-plugins- サポートされるすべてのコネクタープラグインのリストを取得します。
コネクター設定
ほとんどの設定オプションはコネクター固有で、コネクターのドキュメントに含まれています。以下のフィールドは、すべてのコネクターで共通しています。
name- コネクターの名前。特定の Kafka Connect インスタンス内で一意である必要があります。
connector.class-
コネクタープラグインのクラス。たとえば、
org.apache.kafka.connect.file.FileStreamSinkConnectorです。 tasks.max- このコネクターによって使用されるタスクの最大数。タスクは、コネクターが作業を並列処理するために使用します。コネクターは、指定された数よりも少ないタスクを作成する場合があります。
key.converter-
メッセージキーを Kafka 形式との間で変換するために使用されるクラス。これにより、Kafka Connect 設定によって設定されたデフォルト値がオーバーライドされます。たとえば、
org.apache.kafka.connect.json.JsonConverterです。 value.converter-
メッセージペイロードを Kafka 形式との間で変換するために使用されるクラス。これにより、Kafka Connect 設定によって設定されたデフォルト値がオーバーライドされます。たとえば、
org.apache.kafka.connect.json.JsonConverterです。
さらに、シンクコネクターには、以下のオプションの 1 つを設定する必要があります。
topics- 入力として使用されるトピックのコンマ区切りリスト。
topics.regex- 入力として使用されるトピックの Java 正規表現。
その他のオプションについては、特定のコネクターのドキュメントを参照してください。
AMQ Streams には、コネクター設定ファイルのサンプルが含まれています。AMQ Streams インストールディレクトリーの config/connect-file-sink.properties および config/connect-file-source.properties にあります。
7.2.3. 分散 Kafka Connect の実行 リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Kafka Connect を分散モードで設定および実行する方法を説明します。
前提条件
- インストールされ、実行されている AMQ Streams クラスター。
クラスターの実行
すべての Kafka Connect ワーカーノードで
/opt/kafka/config/connect-distributed.propertiesKafka Connect 設定ファイルを編集します。-
bootstrap.serverオプションを設定して、Kafka ブローカーを示すようにします。 -
group.idオプションを設定します。 -
config.storage.topicオプションを設定します。 -
offset.storage.topicオプションを設定します。 status.storage.topicオプションを設定します。以下に例を示します。
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 group.id=my-group-id config.storage.topic=my-group-id-configs offset.storage.topic=my-group-id-offsets status.storage.topic=my-group-id-status
-
すべての Kafka Connect ワーカーノードで
/opt/kafka/config/connect-distributed.propertiesKafka Connect ワーカーを起動します。su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.propertiesKafka Connect が実行されていることを確認します。
jcmd | grep ConnectDistributed
7.2.4. コネクターの作成 リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Kafka Connect REST API を使用して分散モードで Kafka Connect で使用するコネクタープラグインを作成する方法を説明します。
前提条件
- 分散モードで実行する Kafka Connect インストール。
手順
コネクター設定で JSON ペイロードを準備します。以下に例を示します。
{ "name": "my-connector", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "1", "topics": "my-topic-1,my-topic-2", "file": "/tmp/output-file.txt" } }POST リクエストを
<KafkaConnectAddress>:8083/connectorsに送信してコネクターを作成します。以下の例では、curlを使用します。curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors<KafkaConnectAddress>:8083/connectorsに GET リクエストを送信して、コネクターがデプロイされたことを確認します。以下の例では、curlを使用します。curl http://connect0.my-domain.com:8083/connectors
7.2.5. コネクターの削除 リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Kafka Connect REST API を使用して分散モードの Kafka Connect からコネクタープラグインを削除する方法を説明します。
前提条件
- 分散モードで実行する Kafka Connect インストール。
コネクターの削除
<KafkaConnectAddress>:8083/connectors/<ConnectorName>にGETリクエストを送信して、コネクターが存在することを確認します。以下の例では、curlを使用します。curl http://connect0.my-domain.com:8083/connectorsコネクターを削除するには、
DELETEリクエストを<KafkaConnectAddress>:8083/connectorsに送信します。以下の例では、curlを使用します。curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector<KafkaConnectAddress>:8083/connectorsに GET リクエストを送信して、コネクターが削除されたことを確認します。以下の例では、curlを使用します。curl http://connect0.my-domain.com:8083/connectors