8.3. コネクターの管理


Kafka Connect REST API は、コネクターを直接作成、更新、削除するためのエンドポイントを提供します。API を使用して、コネクターのステータスを確認したり、ログレベルを変更したりすることもできます。API を通じてコネクターを作成する場合は、API 呼び出しの一部としてコネクターの設定の詳細を指定します。

コネクターをプラグインとして追加および管理することもできます。プラグインは、Kafka Connect API を通じてコネクターを実装するためのクラスを含む JAR ファイルとしてパッケージ化されます。クラスパスでプラグインを指定するか、Kafka Connect のプラグインパスに追加するだけで、起動時にコネクタープラグインが実行されます。

Kafka Connect REST API またはプラグインを使用してコネクターを管理するだけでなく、Kafka Connect をスタンドアロンモードで実行するときにプロパティーファイルを使用してコネクター設定を追加することもできます。これを行うには、Kafka Connect ワーカープロセスの開始時にプロパティーファイルの場所を指定するだけです。プロパティーファイルには、コネクタークラス、ソースおよび宛先トピック、必要な認証またはシリアル化設定など、コネクターの設定の詳細が含まれている必要があります。

8.3.1. Kafka Connect API へのアクセスの制限

Kafka Connect REST API には、アクセスが認証され、ホスト名/IP アドレスおよびポート番号を含むエンドポイント URL を知っている人なら誰でもアクセスできます。Kafka Connect API へのアクセスを信頼できるユーザーのみに制限して、不正なアクションや潜在的なセキュリティーの問題を防ぐことが重要です。

セキュリティーを強化するために、Kafka Connect API の次のプロパティーを設定することを推奨します。

  • (Kafka 3.4 以降) org.apache.kafka.disallowed.login.modules により、セキュアではないログインモジュールを明確に除外する
  • connector.client.config.override.policyNONE に設定して、コネクター設定が Kafka Connect 設定と、それが使用するコンシューマーおよびプロデューサーをオーバーライドしないようにします。

8.3.2. コネクターの設定

Kafka Connect REST API またはプロパティーファイルを使用して、コネクターインスタンスを作成、管理、監視します。Kafka Connect をスタンドアロンモードまたは分散モードで使用する場合は、REST API を使用できます。Kafka Connect をスタンドアロンモードで使用する場合は、プロパティーファイルを使用できます。

8.3.2.1. Kafka Connect REST API を使用したコネクターの管理

Kafka Connect REST API を使用する場合、PUT または POST HTTP リクエストを Kafka Connect REST API に送信し、リクエスト本文でコネクター設定の詳細を指定することで、コネクターを動的に作成できます。

ヒント

PUT コマンドを使用する場合、これはコネクターの起動と更新に使用するコマンドと同じです。

REST インターフェイスはデフォルトでポート 8083 をリッスンし、次のエンドポイントをサポートします。

GET /connectors
既存のコネクターのリストを返します。
POST /connectors
コネクターを作成します。リクエストボディーは、コネクター設定が含まれる JSON オブジェクトである必要があります。
GET /connectors/<connector_name>
特定のコネクターの情報を取得します。
GET /connectors/<connector_name>/config
特定のコネクターの設定を取得します。
PUT /connectors/<connector_name>/config
特定のコネクターの設定を更新します。
GET /connectors/<connector_name>/status
特定のコネクターのステータスを取得します。
GET /connectors/<connector_name>/tasks
特定のコネクターのタスクのリストを取得する
GET /connectors/<connector_name>/tasks/<task_id>/status
特定のコネクターのタスクのステータスを取得する
PUT /connectors/<connector_name>/pause
コネクターとそのすべてのタスクを一時停止します。コネクターはメッセージの処理を停止します。
PUT /connectors/<connector_name>/stop
コネクターとそのすべてのタスクを停止します。コネクターはメッセージの処理を停止します。コネクター実行の停止は、単に一時停止するよりも長時間停止する場合に適しています。
PUT /connectors/<connector_name>/resume
一時停止されたコネクターを再開します。
POST /connectors/<connector_name>/restart
コネクターに障害が発生した場合に、コネクターを再起動します。
POST /connectors/<connector_name>/tasks/<task_id>/restart
特定のタスクを再起動します。
DELETE /connectors/<connector_name>
コネクターを削除します。
GET /connectors/<connector_name>/topics
特定のコネクターのトピックを取得します。
PUT /connectors/<connector_name>/topics/reset
特定のコネクターのアクティブなトピックのセットを空にします。
GET /connectors/<connector_name>/offsets
コネクターの現在のオフセットを取得します。
DELETE /connectors/<connector_name>/offsets
コネクターのオフセットをリセットします。コネクターは停止状態である必要があります。
PATCH /connectors/<connector_name>/offsets
コネクターのオフセットを調整します (リクエスト内の offset プロパティーを使用)。コネクターは停止状態である必要があります。
GET /connector-plugins
サポートされるすべてのコネクタープラグインのリストを取得します。
GET /connector-plugins/<connector_plugin_type>/config
コネクタープラグインの設定を取得します。
PUT /connector-plugins/<connector_type>/config/validate
コネクター設定を検証します。

8.3.2.2. コネクター設定プロパティーの指定

Kafka Connect コネクターを設定するには、ソースコネクターまたはシンクコネクターの設定の詳細を指定する必要があります。これを行うには 2 つの方法があります。Kafka Connect REST API を使用する方法、JSON を使用して設定を提供する方法、またはプロパティーファイルを使用して設定プロパティーを定義する方法です。コネクターの種類ごとに利用できる特定の設定オプションは異なる場合がありますが、どちらの方法でも必要な設定を指定する柔軟な方法が提供されます。

以下のオプションはすべてのコネクターに適用されます。

name
現在の Kafka Connect インスタンス内で一意である必要があるコネクターの名前。
connector.class
コネクタープラグインのクラス。たとえば、org.apache.kafka.connect.file.FileStreamSinkConnector です。
tasks.max
指定のコネクターが使用できるタスクの最大数。タスクにより、コネクターは並行して作業を実行できます。コネクターは、指定された数よりも少ないタスクを作成する可能性があります。
key.converter
メッセージキーを Kafka 形式との間で変換するために使用されるクラス。これにより、Kafka Connect 設定によって設定されたデフォルト値がオーバーライドされます。たとえば、org.apache.kafka.connect.json.JsonConverter です。
value.converter
メッセージペイロードを Kafka 形式との間で変換するために使用されるクラス。これにより、Kafka Connect 設定によって設定されたデフォルト値がオーバーライドされます。たとえば、org.apache.kafka.connect.json.JsonConverter です。

シンクコネクターに対して次のオプションの少なくとも 1 つを設定する必要があります。

topics
入力として使用されるトピックのコンマ区切りリスト。
topics.regex
入力として使用されるトピックの Java 正規表現。

他のすべてのオプションについては、Apache Kafka ドキュメント のコネクタープロパティーを参照してください。

注記

Streams for Apache Kafka では、サンプルのコネクター設定ファイル (config/connect-file-sink.properties および config/connect-file-source.properties) が Streams for Apache Kafka のインストールディレクトリーに含まれています。

8.3.3. Kafka Connect API を使用したコネクターの作成

Kafka Connect REST API を使用して、Kafka Connect で使用するコネクターを作成します。

前提条件

  • Kafka Connect のインストール。

手順

  1. コネクター設定で JSON ペイロードを準備します。以下に例を示します。

    {
      "name": "my-connector",
      "config": {
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-topic-1,my-topic-2",
        "file": "/tmp/output-file.txt"
      }
    }
  2. POST リクエストを <KafkaConnectAddress>:8083/connectors に送信してコネクターを作成します。以下の例では、curl を使用します。

    curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
  3. <KafkaConnectAddress>:8083/connectors に GET リクエストを送信して、コネクターがデプロイされたことを確認します。以下の例では、curl を使用します。

    curl http://connect0.my-domain.com:8083/connectors

8.3.4. Kafka Connect API を使用したコネクターの削除

Kafka Connect REST API を使用して、Kafka Connect からコネクターを削除します。

前提条件

  • Kafka Connect のインストール。

コネクターの削除

  1. <KafkaConnectAddress>:8083/connectors/<ConnectorName>GET リクエストを送信して、コネクターが存在することを確認します。以下の例では、curl を使用します。

    curl http://connect0.my-domain.com:8083/connectors
  2. コネクターを削除するには、DELETE リクエストを <KafkaConnectAddress>:8083/connectors に送信します。以下の例では、curl を使用します。

    curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
  3. <KafkaConnectAddress>:8083/connectors に GET リクエストを送信して、コネクターが削除されたことを確認します。以下の例では、curl を使用します。

    curl http://connect0.my-domain.com:8083/connectors

8.3.5. コネクタープラグインの追加

Kafka は、コネクター開発の開始点として使用できるサンプルコネクターを提供します。Streams for Apache Kafka には、次のサンプルコネクターが含まれています。

FileStreamSink
Kafka トピックからデータを読み取り、データをファイルに書き込みます。
FileStreamSource
ファイルからデータを読み取り、そのデータを Kafka トピックに送信します。

どちらのコネクターも libs/connect-file-<kafka_version>.redhat-<build>.jar プラグインに含まれています。

Kafka Connect でコネクタープラグインを使用するには、コネクタープラグインをクラスパスに追加するか、Kafka Connect プロパティーファイルでプラグインパスを指定してその場所にプラグインをコピーします。

クラスパスでのサンプルコネクターの指定

CLASSPATH=/opt/kafka/libs/connect-file-<kafka_version>.redhat-<build>.jar opt/kafka/bin/connect-distributed.sh

プラグインパスの設定

plugin.path=/opt/kafka/connector-plugins,/opt/connectors

plugin.path 設定オプションには、コンマ区切りのパスのリストを含めることができます。

必要に応じて、さらにコネクタープラグインを追加できます。Kafka Connect は起動時にコネクタープラグインを検索して実行します。

注記

Kafka Connect を分散モードで実行する場合は、すべてのワーカーノードでプラグインを利用できるようにする必要があります。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.