8.3. コネクターの管理
Kafka Connect REST API は、コネクターを直接作成、更新、削除するためのエンドポイントを提供します。API を使用して、コネクターのステータスを確認したり、ログレベルを変更したりすることもできます。API を通じてコネクターを作成する場合は、API 呼び出しの一部としてコネクターの設定の詳細を指定します。
コネクターをプラグインとして追加および管理することもできます。プラグインは、Kafka Connect API を通じてコネクターを実装するためのクラスを含む JAR ファイルとしてパッケージ化されます。クラスパスでプラグインを指定するか、Kafka Connect のプラグインパスに追加するだけで、起動時にコネクタープラグインが実行されます。
Kafka Connect REST API またはプラグインを使用してコネクターを管理するだけでなく、Kafka Connect をスタンドアロンモードで実行するときにプロパティーファイルを使用してコネクター設定を追加することもできます。これを行うには、Kafka Connect ワーカープロセスの開始時にプロパティーファイルの場所を指定するだけです。プロパティーファイルには、コネクタークラス、ソースおよび宛先トピック、必要な認証またはシリアル化設定など、コネクターの設定の詳細が含まれている必要があります。
8.3.1. Kafka Connect API へのアクセスの制限
Kafka Connect REST API には、アクセスが認証され、ホスト名/IP アドレスおよびポート番号を含むエンドポイント URL を知っている人なら誰でもアクセスできます。Kafka Connect API へのアクセスを信頼できるユーザーのみに制限して、不正なアクションや潜在的なセキュリティーの問題を防ぐことが重要です。
セキュリティーを強化するために、Kafka Connect API の次のプロパティーを設定することを推奨します。
-
(Kafka 3.4 以降)
org.apache.kafka.disallowed.login.modules
により、セキュアではないログインモジュールを明確に除外する -
connector.client.config.override.policy
をNONE
に設定して、コネクター設定が Kafka Connect 設定と、それが使用するコンシューマーおよびプロデューサーをオーバーライドしないようにします。
8.3.2. コネクターの設定
Kafka Connect REST API またはプロパティーファイルを使用して、コネクターインスタンスを作成、管理、監視します。Kafka Connect をスタンドアロンモードまたは分散モードで使用する場合は、REST API を使用できます。Kafka Connect をスタンドアロンモードで使用する場合は、プロパティーファイルを使用できます。
8.3.2.1. Kafka Connect REST API を使用したコネクターの管理
Kafka Connect REST API を使用する場合、PUT
または POST
HTTP リクエストを Kafka Connect REST API に送信し、リクエスト本文でコネクター設定の詳細を指定することで、コネクターを動的に作成できます。
PUT
コマンドを使用する場合、これはコネクターの起動と更新に使用するコマンドと同じです。
REST インターフェイスはデフォルトでポート 8083 をリッスンし、次のエンドポイントをサポートします。
GET /connectors
- 既存のコネクターのリストを返します。
POST /connectors
- コネクターを作成します。リクエストボディーは、コネクター設定が含まれる JSON オブジェクトである必要があります。
GET /connectors/<connector_name>
- 特定のコネクターの情報を取得します。
GET /connectors/<connector_name>/config
- 特定のコネクターの設定を取得します。
PUT /connectors/<connector_name>/config
- 特定のコネクターの設定を更新します。
GET /connectors/<connector_name>/status
- 特定のコネクターのステータスを取得します。
GET /connectors/<connector_name>/tasks
- 特定のコネクターのタスクのリストを取得する
GET /connectors/<connector_name>/tasks/<task_id>/status
- 特定のコネクターのタスクのステータスを取得する
PUT /connectors/<connector_name>/pause
- コネクターとそのすべてのタスクを一時停止します。コネクターはメッセージの処理を停止します。
PUT /connectors/<connector_name>/stop
- コネクターとそのすべてのタスクを停止します。コネクターはメッセージの処理を停止します。コネクター実行の停止は、単に一時停止するよりも長時間停止する場合に適しています。
PUT /connectors/<connector_name>/resume
- 一時停止されたコネクターを再開します。
POST /connectors/<connector_name>/restart
- コネクターに障害が発生した場合に、コネクターを再起動します。
POST /connectors/<connector_name>/tasks/<task_id>/restart
- 特定のタスクを再起動します。
DELETE /connectors/<connector_name>
- コネクターを削除します。
GET /connectors/<connector_name>/topics
- 特定のコネクターのトピックを取得します。
PUT /connectors/<connector_name>/topics/reset
- 特定のコネクターのアクティブなトピックのセットを空にします。
GET /connectors/<connector_name>/offsets
- コネクターの現在のオフセットを取得します。
DELETE /connectors/<connector_name>/offsets
- コネクターのオフセットをリセットします。コネクターは停止状態である必要があります。
PATCH /connectors/<connector_name>/offsets
-
コネクターのオフセットを調整します (リクエスト内の
offset
プロパティーを使用)。コネクターは停止状態である必要があります。 GET /connector-plugins
- サポートされるすべてのコネクタープラグインのリストを取得します。
GET /connector-plugins/<connector_plugin_type>/config
- コネクタープラグインの設定を取得します。
PUT /connector-plugins/<connector_type>/config/validate
- コネクター設定を検証します。
8.3.2.2. コネクター設定プロパティーの指定
Kafka Connect コネクターを設定するには、ソースコネクターまたはシンクコネクターの設定の詳細を指定する必要があります。これを行うには 2 つの方法があります。Kafka Connect REST API を使用する方法、JSON を使用して設定を提供する方法、またはプロパティーファイルを使用して設定プロパティーを定義する方法です。コネクターの種類ごとに利用できる特定の設定オプションは異なる場合がありますが、どちらの方法でも必要な設定を指定する柔軟な方法が提供されます。
以下のオプションはすべてのコネクターに適用されます。
name
- 現在の Kafka Connect インスタンス内で一意である必要があるコネクターの名前。
connector.class
-
コネクタープラグインのクラス。たとえば、
org.apache.kafka.connect.file.FileStreamSinkConnector
です。 tasks.max
- 指定のコネクターが使用できるタスクの最大数。タスクにより、コネクターは並行して作業を実行できます。コネクターは、指定された数よりも少ないタスクを作成する可能性があります。
key.converter
-
メッセージキーを Kafka 形式との間で変換するために使用されるクラス。これにより、Kafka Connect 設定によって設定されたデフォルト値がオーバーライドされます。たとえば、
org.apache.kafka.connect.json.JsonConverter
です。 value.converter
-
メッセージペイロードを Kafka 形式との間で変換するために使用されるクラス。これにより、Kafka Connect 設定によって設定されたデフォルト値がオーバーライドされます。たとえば、
org.apache.kafka.connect.json.JsonConverter
です。
シンクコネクターに対して次のオプションの少なくとも 1 つを設定する必要があります。
topics
- 入力として使用されるトピックのコンマ区切りリスト。
topics.regex
- 入力として使用されるトピックの Java 正規表現。
他のすべてのオプションについては、Apache Kafka ドキュメント のコネクタープロパティーを参照してください。
Streams for Apache Kafka では、サンプルのコネクター設定ファイル (config/connect-file-sink.properties
および config/connect-file-source.properties
) が Streams for Apache Kafka のインストールディレクトリーに含まれています。
8.3.3. Kafka Connect API を使用したコネクターの作成
Kafka Connect REST API を使用して、Kafka Connect で使用するコネクターを作成します。
前提条件
- Kafka Connect のインストール。
手順
コネクター設定で JSON ペイロードを準備します。以下に例を示します。
{ "name": "my-connector", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "1", "topics": "my-topic-1,my-topic-2", "file": "/tmp/output-file.txt" } }
POST リクエストを
<KafkaConnectAddress>:8083/connectors
に送信してコネクターを作成します。以下の例では、curl
を使用します。curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
<KafkaConnectAddress>:8083/connectors
に GET リクエストを送信して、コネクターがデプロイされたことを確認します。以下の例では、curl
を使用します。curl http://connect0.my-domain.com:8083/connectors
8.3.4. Kafka Connect API を使用したコネクターの削除
Kafka Connect REST API を使用して、Kafka Connect からコネクターを削除します。
前提条件
- Kafka Connect のインストール。
コネクターの削除
<KafkaConnectAddress>:8083/connectors/<ConnectorName>
にGET
リクエストを送信して、コネクターが存在することを確認します。以下の例では、curl
を使用します。curl http://connect0.my-domain.com:8083/connectors
コネクターを削除するには、
DELETE
リクエストを<KafkaConnectAddress>:8083/connectors
に送信します。以下の例では、curl
を使用します。curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
<KafkaConnectAddress>:8083/connectors
に GET リクエストを送信して、コネクターが削除されたことを確認します。以下の例では、curl
を使用します。curl http://connect0.my-domain.com:8083/connectors
8.3.5. コネクタープラグインの追加
Kafka は、コネクター開発の開始点として使用できるサンプルコネクターを提供します。Streams for Apache Kafka には、次のサンプルコネクターが含まれています。
- FileStreamSink
- Kafka トピックからデータを読み取り、データをファイルに書き込みます。
- FileStreamSource
- ファイルからデータを読み取り、そのデータを Kafka トピックに送信します。
どちらのコネクターも libs/connect-file-<kafka_version>.redhat-<build>.jar
プラグインに含まれています。
Kafka Connect でコネクタープラグインを使用するには、コネクタープラグインをクラスパスに追加するか、Kafka Connect プロパティーファイルでプラグインパスを指定してその場所にプラグインをコピーします。
クラスパスでのサンプルコネクターの指定
CLASSPATH=/opt/kafka/libs/connect-file-<kafka_version>.redhat-<build>.jar opt/kafka/bin/connect-distributed.sh
プラグインパスの設定
plugin.path=/opt/kafka/connector-plugins,/opt/connectors
plugin.path
設定オプションには、コンマ区切りのパスのリストを含めることができます。
必要に応じて、さらにコネクタープラグインを追加できます。Kafka Connect は起動時にコネクタープラグインを検索して実行します。
Kafka Connect を分散モードで実行する場合は、すべてのワーカーノードでプラグインを利用できるようにする必要があります。