10.9. Kafka Connect コネクターの設定


KafkaConnector リソースは、Cluster Operator によるコネクターの管理に OpenShift ネイティブのアプローチを提供します。KafkaConnector リソースを使用してコネクターを作成、削除、または再設定するには、KafkaConnect カスタムリソースで use-connector-resources アノテーションを true に設定する必要があります。

KafkaConnectors を有効にするためのアノテーション

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
  # ...
Copy to Clipboard Toggle word wrap

KafkaConnect 設定で use-connector-resources アノテーションが有効になっている場合は、KafkaConnector リソースを使用してコネクターを定義および管理する必要があります。

注記

または、KafkaConnector リソースの代わりに Kafka Connect REST API を使用してコネクターを管理することもできます。API を使用するには、KafkaConnect リソースで KafkaConnector リソースを使用するために strimzi.io/use-connector-resources アノテーションを削除する必要があります。

KafkaConnector リソースは、Kafka Connect クラスター内でコネクターを作成するために必要な設定を提供します。Kafka Connect クラスターは、KafkaConnect 設定で指定されたとおりに Kafka クラスターとやり取りします。Kafka クラスターは、Streams for Apache Kafka によって管理する必要はなく、OpenShift クラスターにデプロイする必要もありません。

同じ OpenShift クラスター内に含まれる Kafka コンポーネント

Kafka and Kafka Connect clusters

設定では、必要な認証方法など、コネクターインスタンスが外部データシステムとやり取りする方法も指定します。さらに、監視するデータを定義する必要があります。たとえば、データベースからデータを読み取るソースコネクターでは、設定にデータベース名を含める場合があります。ターゲットトピック名を指定して、このデータを Kafka のどこに配置するかを定義することもできます。

タスクの最大数を指定するには、tasksMax プロパティーを使用します。たとえば、ソースコネクターに tasksMax: 2 を使用すると、ソースデータのインポートを 2 つのタスクに分割できます。

ソースコネクター設定の例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector  
1

  labels:
    strimzi.io/cluster: my-connect-cluster 
2

spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector 
3

  tasksMax: 2 
4

  autoRestart: 
5

    enabled: true
  config: 
6

    file: "/opt/kafka/LICENSE" 
7

    topic: my-topic 
8

    # ...
Copy to Clipboard Toggle word wrap

1
コネクターの名前として使用される KafkaConnector リソースの名前。OpenShift リソースで有効な名前を使用します。
2
コネクターインスタンスを作成する Kafka Connect クラスターの名前。コネクターは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
3
コネクタークラスのフルネーム。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
4
コネクターが作成できる Kafka Connect タスクの最大数。
5
失敗したコネクターとタスクの自動再起動を有効にします。デフォルトでは、再起動の回数は無制限ですが、maxRestarts プロパティーを使用して自動再起動の最大回数を設定できます。
6
キーと値のペア形式の コネクター設定
7
外部データファイルの場所。この例では、/opt/kafka/LICENSE ファイルから読み取るように FileStreamSourceConnector を設定しています。
8
ソースデータのパブリッシュ先となる Kafka トピック。

外部コネクター設定 (シークレットに保存されているユーザーアクセス認証情報など) を含めるには、KafkaConnect リソースの template プロパティーを使用します。設定プロバイダー を使用して値を読み込むこともできます。

10.9.1. Kafka Connect コネクターの手動停止または一時停止

KafkaConnector リソースを使用してコネクターを設定している場合は、state 設定を使用してコネクターを停止または一時停止します。コネクターとタスクがインスタンス化されたままになる一時停止状態とは対照的に、コネクターを停止すると設定のみが保持され、アクティブなプロセスは保持されません。コネクター実行の停止は、単に一時停止するよりも長時間停止する場合に適しています。一時停止されたコネクターはすぐに再開されますが、停止されたコネクターにはメモリーとリソースが解放されるという利点があります。

注記

state 設定は、KafkaConnectorSpec スキーマの (非推奨) pause 設定を置き換えるもので、コネクターでの一時停止を許可します。これまでに pause 設定を使用してコネクターを一時停止した場合には、競合の回避目的にのみ state 設定の使用に移行することを推奨します。

前提条件

  • Cluster Operator が稼働中である。

手順

  1. 一時停止または停止するコネクターを制御する KafkaConnector カスタムリソースの名前を見つけます。

    oc get KafkaConnector
    Copy to Clipboard Toggle word wrap
  2. KafkaConnector リソースを編集して、コネクターを停止または一時停止します。

    Kafka Connect コネクターを停止する設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: org.apache.kafka.connect.file.FileStreamSourceConnector
      tasksMax: 2
      config:
        file: "/opt/kafka/LICENSE"
        topic: my-topic
      state: stopped
      # ...
    Copy to Clipboard Toggle word wrap

    state 設定を stopped または paused に変更します。このプロパティーが設定されていない場合のコネクターのデフォルトの状態は running です。

  3. 変更を KafkaConnector 設定に適用します。

    staterunning に変更するか、設定を削除して、コネクターを再開できます。

注記

または、Kafka Connect API を公開 し、stop エンドポイントと pause エンドポイントを使用してコネクターの実行を停止することもできます。たとえば、PUT /connectors/<connector_name>/stop などです。その後、resume エンドポイントを使用して再起動できます。

10.9.2. Kafka Connect コネクターの手動での再起動

KafkaConnector リソースを使用してコネクターを管理している場合は、strimzi.io/restart アノテーションを使用してコネクターの再起動を手動でトリガーします。

前提条件

  • Cluster Operator が稼働中である。

手順

  1. 再起動する Kafka コネクターを制御する KafkaConnector カスタムリソースの名前を見つけます。

    oc get KafkaConnector
    Copy to Clipboard Toggle word wrap
  2. OpenShift で KafkaConnector リソースにアノテーションを付けて、コネクターを再起動します。

    oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"
    Copy to Clipboard Toggle word wrap

    restart アノテーションは true に設定されています。

  3. 次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。

    アノテーションが調整プロセスで検出されれば、Kafka コネクターは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは KafkaConnector カスタムリソースから削除されます。

10.9.3. Kafka Connect コネクタータスクの手動での再起動

KafkaConnector リソースを使用してコネクターを管理している場合は、strimzi.io/restart-task アノテーションを使用して、コネクタータスクの再起動を手動でトリガーします。

前提条件

  • Cluster Operator が稼働中である。

手順

  1. 再起動する Kafka コネクタータスクを制御する KafkaConnector カスタムリソースの名前を見つけます。

    oc get KafkaConnector
    Copy to Clipboard Toggle word wrap
  2. KafkaConnector カスタムリソースから再起動するタスクの ID を検索します。

    oc describe KafkaConnector <kafka_connector_name>
    Copy to Clipboard Toggle word wrap

    タスク ID は 0 から始まる負の値ではない整数です。

  3. OpenShift で KafkaConnector リソースにアノテーションを付けて、ID を使用してコネクタータスクを再開します。

    oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"
    Copy to Clipboard Toggle word wrap

    この例では、タスク 0 が再起動されます。

  4. 次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。

    アノテーションが調整プロセスで検出されれば、Kafka コネクタータスクは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは KafkaConnector カスタムリソースから削除されます。

10.9.4. コネクターオフセットのリスト表示

KafkaConnector リソースを使用してコネクターのオフセットを追跡するには、listOffsets 設定を追加します。データの流れを追跡するオフセットは、設定で指定された config map に書き込まれます。config map が存在しない場合は、Streams for Apache Kafka が作成します。

設定が完了したら、KafkaConnector リソースにアノテーションを付けて、リストを config map に書き込みます。

シンクコネクターは Kafka の標準コンシューマーオフセットメカニズムを使用し、ソースコネクターは Kafka トピック内にカスタム形式でオフセットを保存します。

  • シンクコネクターの場合、リストには Kafka トピックパーティションと、各パーティションの最後にコミットされたオフセットが表示されます。
  • ソースコネクターの場合、リストにはソースシステムのパーティションと最後に処理されたオフセットが表示されます。

前提条件

  • Cluster Operator が稼働中である。

手順

  1. コネクターの KafkaConnector リソースを、listOffsets 設定が含まれるように編集します。

    オフセットをリストするための設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      listOffsets:
        toConfigMap: 
    1
    
          name: my-connector-offsets 
    2
    
      # ...
    Copy to Clipboard Toggle word wrap

    1
    オフセットのリストが書き込まれる config map への参照。
    2
    config map の名前。この例では my-connector-offsets という名前です。
  2. KafkaConnector リソースにアノテーションを付けて、リストを config map に書き込むコマンドを実行します。

    oc annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=list -n <namespace>
    Copy to Clipboard Toggle word wrap

    アノテーションは、リスト操作が成功するか、リソースから手動で削除されるまで残ります。

  3. KafkaConnector リソースを更新した後に、次のコマンドを使用して、オフセットを含む config map が作成されたか確認します。

    oc get configmap my-connector-offsets -n <namespace>
    Copy to Clipboard Toggle word wrap
  4. config map の内容を調べて、オフセットがリストされていることを確認します。

    oc describe configmap my-connector-offsets -n <namespace>
    Copy to Clipboard Toggle word wrap

    Streams for Apache Kafka は、オフセット情報を offsets.json プロパティーに置きます。既存 config map の更新時に他のプロパティーは上書きされません。

    ソースコネクターオフセットリストの例

    apiVersion: v1
    kind: ConfigMap
    metadata:
      # ...
      ownerReferences: 
    1
    
      - apiVersion: kafka.strimzi.io/v1beta2
        blockOwnerDeletion: false
        controller: false
        kind: KafkaConnector
        name: my-source-connector
        uid: 637e3be7-bd96-43ab-abde-c55b4c4550e0
      resourceVersion: "66951"
      uid: 641d60a9-36eb-4f29-9895-8f2c1eb9638e
    data:
      offsets.json: |-
        {
          "offsets" : [ {
            "partition" : {
              "filename" : "/data/myfile.txt" 
    2
    
            },
            "offset" : {
              "position" : 15295 
    3
    
            }
          } ]
        }
    Copy to Clipboard Toggle word wrap

    1
    ソースコネクターの KafkaConnector リソースを指す所有者参照。カスタムの所有者参照を指定するには、事前に config map を作成して所有者参照を設定します。
    2
    この例でファイルベースのコネクターのファイル名 /data/myfile.txt で表されるソースパーティション。
    3
    ソースパーティション内の最後に処理されたオフセットの位置。

    シンクコネクターオフセットリストの例

    apiVersion: v1
    kind: ConfigMap
    metadata:
      # ...
      ownerReferences: 
    1
    
      - apiVersion: kafka.strimzi.io/v1beta2
        blockOwnerDeletion: false
        controller: false
        kind: KafkaConnector
        name: my-sink-connector
        uid: 84a29d7f-77e6-43ac-bfbb-719f9b9a4b3b
      resourceVersion: "79241"
      uid: 721e30bc-23df-41a2-9b48-fb2b7d9b042c
    data:
      offsets.json: |-
        {
          "offsets": [
            {
              "partition": {
                "kafka_topic": "my-topic", 
    2
    
                "kafka_partition": 2 
    3
    
              },
              "offset": {
                "kafka_offset": 4 
    4
    
              }
            }
          ]
        }
    Copy to Clipboard Toggle word wrap

    1
    シンクコネクターの KafkaConnector リソースを指す所有者参照。
    2
    シンクコネクターが使用する Kafka トピック。
    3
    Kafka トピックのパーティション。
    4
    このトピックとパーティションの、最後にコミットされた Kafka オフセット。

10.9.5. コネクターオフセットの変更

KafkaConnector リソースを使用してコネクターのオフセットを変更するには、コネクターを停止するようにリソースを設定し、alterOffsets 設定を追加して config map でオフセットの変更を指定します。オフセットをリストする ために使用したのと同じ config map を再利用できます。

コネクターが停止し、設定が完了したら、オフセットの変更を適用するように KafkaConnector リソースにアノテーションを付け、コネクターを再起動します。

コネクターオフセットの変更は、たとえば poison レコードをスキップしたり、レコードを再生したりする場合に役立ちます。

この手順では、my-source-connector という名前のソースコネクターのオフセット位置を変更します。

前提条件

  • Cluster Operator が稼働中である。

手順

  1. KafkaConnector リソースを編集してコネクターを停止し、alterOffsets 設定を追加します。

    コネクターを停止してオフセットを変更する設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      state: stopped 
    1
    
      alterOffsets:
        fromConfigMap: 
    2
    
          name: my-connector-offsets 
    3
    
      # ...
    Copy to Clipboard Toggle word wrap

    1
    コネクターの状態を stopped に変更します。このプロパティーが設定されていない場合のコネクターのデフォルトの状態は running です。
    2
    更新を提供する config map への参照。
    3
    config map の名前。この例では my-connector-offsets という名前です。
  2. 変更を加えるために、config map を編集します。

    この例では、ソースコネクターのオフセット位置を 15000 にリセットします。

    ソースコネクターオフセットリストの設定例

    apiVersion: v1
    kind: ConfigMap
    metadata:
      # ...
    data:
      offsets.json: |- 
    1
    
        {
          "offsets" : [ {
            "partition" : {
              "filename" : "/data/myfile.txt"
            },
            "offset" : {
              "position" : 15000 
    2
    
            }
          } ]
        }
    Copy to Clipboard Toggle word wrap

    1
    offsets.json プロパティー内で編集する必要があります。
    2
    ソースパーティション内の更新されたオフセット位置。
  3. KafkaConnector リソースにアノテーションを付けて、オフセット位置を更新するコマンドを実行します。

    oc annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=alter -n <namespace>
    Copy to Clipboard Toggle word wrap

    アノテーションは、更新操作が成功するか、リソースから手動で削除されるまで残ります。

  4. コネクターオフセットのリスト表示 の手順を使用して変更を確認します。
  5. 状態を running に変更してコネクターを再起動します。

    コネクターを起動するための設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      state: running
      # ...
    Copy to Clipboard Toggle word wrap

10.9.6. コネクターオフセットのリセット

KafkaConnector リソースを使用してコネクターオフセットをリセットするには、コネクターを停止するようにリソースを設定します。

コネクターの停止後、KafkaConnector リソースにアノテーションを付けてオフセットをクリアし、コネクターを再起動します。

この手順では、my-source-connector という名前のソースコネクターのオフセット位置をリセットします。

前提条件

  • Cluster Operator が稼働中である。

手順

  1. コネクターを停止するように、KafkaConnector リソースを編集します。

    コネクターを停止するための設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      # ...
      state: stopped 
    1
    
      # ...
    Copy to Clipboard Toggle word wrap

    1
    コネクターの状態を stopped に変更します。このプロパティーが設定されていない場合のコネクターのデフォルトの状態は running です。
  2. KafkaConnector リソースにアノテーションを付けて、オフセット位置をリセットするコマンドを実行します。

    oc annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=reset -n <namespace>
    Copy to Clipboard Toggle word wrap

    アノテーションは、リセット操作が成功するか、リソースから手動で削除されるまで残ります。

  3. コネクターオフセットのリスト表示 の手順を使用して変更を確認します。

    リセット後、offsets.json プロパティーは空になります。

    ソースコネクターオフセットリストの例

    apiVersion: v1
    kind: ConfigMap
    metadata:
      # ...
    data:
      offsets.json: |-
        {
          "offsets" : []
        }
    Copy to Clipboard Toggle word wrap

  4. 状態を running に変更してコネクターを再起動します。

    コネクターを起動するための設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      state: running
      # ...
    Copy to Clipboard Toggle word wrap

Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2026 Red Hat
トップに戻る