6.4. Kafka Connect のデプロイ


Kafka Connect は、コネクタープラグインを使用して Kafka ブローカーと他のシステムの間でデータをストリーミングする統合ツールです。Kafka Connect は、Kafka と、データベースやメッセージングシステムなどの外部データソースまたはターゲットを統合するためのフレームワークを提供し、コネクターを使用してデータをインポートまたはエクスポートします。コネクターは、必要な接続設定を提供するプラグインです。

AMQ Streams では、Kafka Connect は分散 (distributed) モードでデプロイされます。Kafka Connect はスタンドアロンモードでも動作しますが、AMQ Streams ではサポートされません。

Kafka Connect は、コネクター の概念を使用し、スケーラビリティーと信頼性を維持しながら Kafka クラスターで大量のデータを移動するフレームワークを提供します。

Cluster Operator は、KafkaConnect リソースを使用してデプロイされた Kafka Connect クラスターと、KafkaConnector リソースを使用して作成されたコネクターを管理します。

Kafka Connect を使用するには、次のことを行う必要があります。

注記

コネクター という用語は、Kafka Connect クラスター内で実行されているコネクターインスタンスや、コネクタークラスと同じ意味で使用されます。本ガイドでは、本文の内容で意味が明確である場合に コネクター という用語を使用します。

6.4.1. Kafka Connect の OpenShift クラスターへのデプロイ

この手順では、Cluster Operator を使用して Kafka Connect クラスターを OpenShift クラスターにデプロイする方法を説明します。

Kafka Connect クラスターのデプロイメントは、コネクターのワークロードを タスク として分散する設定可能な数のノード (ワーカー とも呼ばれます) を使用して実装されるため、メッセージフローのスケーラビリティと信頼性が高くなります。

デプロイメントでは、YAML ファイルの仕様を使用して KafkaConnect リソースが作成されます。

AMQ Streams には、設定ファイルのサンプル が用意されています。この手順では、以下のサンプルファイルを使用します。

  • examples/connect/kafka-connect.yaml

手順

  1. Kafka Connect を OpenShift クラスターにデプロイします。examples/connect/kafka-connect.yaml ファイルを使用して Kafka Connect をデプロイします。

    oc apply -f examples/connect/kafka-connect.yaml
    Copy to Clipboard Toggle word wrap
  2. デプロイメントのステータスを確認します。

    oc get pods -n <my_cluster_operator_namespace>
    Copy to Clipboard Toggle word wrap

    デプロイメント名と準備状態が表示されている出力

    NAME                                 READY  STATUS   RESTARTS
    my-connect-cluster-connect-<pod_id>  1/1    Running  0
    Copy to Clipboard Toggle word wrap

    my-connect-cluster は、Kafka Connect クラスターの名前です。

    Pod ID は、作成された各 Pod を識別します。

    デフォルトのデプロイでは、単一の Kafka Connect Pod を作成します。

    READY は、Ready/expected 状態のレプリカ数を表示します。STATUSRunning と表示されれば、デプロイメントは成功です。

6.4.2. 複数のインスタンス用の Kafka Connect の設定

Kafka Connect のインスタンスを複数実行している場合は、以下の config プロパティーのデフォルト設定を変更する必要があります。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  # ...
  config:
    group.id: connect-cluster 
1

    offset.storage.topic: connect-cluster-offsets 
2

    config.storage.topic: connect-cluster-configs 
3

    status.storage.topic: connect-cluster-status  
4

    # ...
# ...
Copy to Clipboard Toggle word wrap
1
Kafka 内の Kafka Connect クラスター ID。
2
コネクターオフセットを保存する Kafka トピック。
3
コネクターおよびタスクステータスの設定を保存する Kafka トピック。
4
コネクターおよびタスクステータスの更新を保存する Kafka トピック。
注記

group.id が同じすべての Kafka Connect インスタンスで、これら 3 つのトピックの値を揃える必要があります。

デフォルト設定を変更しない限り、同じ Kafka クラスターに接続する Kafka Connect インスタンスはそれぞれ同じ値でデプロイされます。事実上、すべてのインスタンスが結合されてクラスターで実行されて同じトピックを使用するようになります。

複数の Kafka Connect クラスターが同じトピックの使用を試みると、Kafka Connect は想定どおりに動作せず、エラーが生成されます。

複数の Kafka Connect インスタンスを実行する場合は、インスタンスごとにこれらのプロパティーの値を変更してください。

6.4.3. コネクターの追加

Kafka Connect はコネクターを使用して他のシステムと統合し、データをストリーミングします。コネクターは Kafka Connector クラスのインスタンスであり、次のいずれかのタイプになります。

ソースコネクター
ソースコネクターは、外部システムからデータを取得し、それをメッセージとして Kafka に提供するランタイムエンティティーです。
シンクコネクター
シンクコネクターは、Kafka トピックからメッセージを取得し、外部システムに提供するランタイムエンティティーです。

Kafka Connect はプラグインアーキテクチャーを使用して、コネクターの実装アーティファクトを提供します。プラグインは他のシステムへの接続を可能にし、データを操作するための追加の設定を提供します。プラグインには、コネクターや、データコンバーターや変換などの他のコンポーネントが含まれます。コネクターは、特定のタイプの外部システムで動作します。各コネクターは、その設定のスキーマを定義します。設定を Kafka Connect に指定して、Kafka Connect 内にコネクターインスタンス を作成します。次に、コネクターインスタンスは、システム間でデータを移動するための一連のタスクを定義します。

次のいずれかの方法で、コネクタープラグインを Kafka Connect に追加します。

コンテナーイメージにプラグインを追加したら、次の方法でコネクターインスタンスを開始、停止、および管理できます。

これらのオプションを使用して、新しいコネクターインスタンスを作成することもできます。

6.4.3.1. コネクタープラグインを使用して新しいコンテナーイメージを自動的に構築する

AMQ Streams が追加のコネクターを使用して新しいコンテナーイメージを自動的に構築するように、Kafka Connect を設定します。コネクタープラグインは、KafkaConnect カスタムリソースの .spec.build.plugins プロパティーを使用して定義します。AMQ Streams はコネクタープラグインを自動的にダウンロードし、新しいコンテナーイメージに追加します。コンテナーは、.spec.build.output に指定されたコンテナーリポジトリーにプッシュされ、Kafka Connect デプロイメントで自動的に使用されます。

前提条件

イメージをプッシュ、保存、およびプルできる独自のコンテナーレジストリーを提供する必要があります。AMQ Streams は、プライベートコンテナーレジストリーだけでなく、QuayDocker Hub などのパブリックレジストリーもサポートします。

手順

  1. .spec.build.output でコンテナーレジストリーを、.spec.build.plugins で追加のコネクターを指定して、KafkaConnect カスタムリソースを設定します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
    spec: 
    1
    
      #...
      build:
        output: 
    2
    
          type: docker
          image: my-registry.io/my-org/my-connect-cluster:latest
          pushSecret: my-registry-credentials
        plugins: 
    3
    
          - name: debezium-postgres-connector
            artifacts:
              - type: tgz
                url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.1.3.Final/debezium-connector-postgres-2.1.3.Final-plugin.tar.gz
                sha512sum: c4ddc97846de561755dc0b021a62aba656098829c70eb3ade3b817ce06d852ca12ae50c0281cc791a5a131cb7fc21fb15f4b8ee76c6cae5dd07f9c11cb7c6e79
          - name: camel-telegram
            artifacts:
              - type: tgz
                url: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-telegram-kafka-connector/0.11.5/camel-telegram-kafka-connector-0.11.5-package.tar.gz
                sha512sum: d6d9f45e0d1dbfcc9f6d1c7ca2046168c764389c78bc4b867dab32d24f710bb74ccf2a007d7d7a8af2dfca09d9a52ccbc2831fc715c195a3634cca055185bd91
      #...
    Copy to Clipboard Toggle word wrap
    1
    2
    (必須) 新しいイメージがプッシュされるコンテナーレジストリーの設定。
    3
    (必須) 新しいコンテナーイメージに追加するコネクタープラグインとそれらのアーティファクトのリスト。各プラグインは、1 つ以上の artifact を使用して設定する必要があります。
  2. リソースを作成または更新します。

    $ oc apply -f <kafka_connect_configuration_file>
    Copy to Clipboard Toggle word wrap
  3. 新しいコンテナーイメージがビルドされ、Kafka Connect クラスターがデプロイされるまで待ちます。
  4. Kafka Connect REST API または KafkaConnector カスタムリソースを使用して、追加したコネクタープラグインを使用します。

Kafka Connect ベースイメージからコネクタープラグインを使用してカスタム Docker イメージを作成するカスタムイメージを /opt/kafka/plugins ディレクトリーに追加します。

Red Hat Ecosystem Catalog の Kafka コンテナーイメージは、追加のコネクタープラグインで独自のカスタムイメージを作成するためのベースイメージとして使用できます。

AMQ Stream バージョンの Kafka Connect は起動時に、/opt/kafka/plugins ディレクトリーに含まれるサードパーティーのコネクタープラグインをロードします。

手順

  1. ベースイメージとして registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.1 を使用して、新しい Dockerfile を作成します。

    FROM registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.1
    USER root:root
    COPY ./my-plugins/ /opt/kafka/plugins/
    USER 1001
    Copy to Clipboard Toggle word wrap

    プラグインファイルの例

    $ tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    │   ├── bson-<version>.jar
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mongodb-<version>.jar
    │   ├── debezium-core-<version>.jar
    │   ├── LICENSE.txt
    │   ├── mongodb-driver-core-<version>.jar
    │   ├── README.md
    │   └── # ...
    ├── debezium-connector-mysql
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mysql-<version>.jar
    │   ├── debezium-core-<version>.jar
    │   ├── LICENSE.txt
    │   ├── mysql-binlog-connector-java-<version>.jar
    │   ├── mysql-connector-java-<version>.jar
    │   ├── README.md
    │   └── # ...
    └── debezium-connector-postgres
        ├── CHANGELOG.md
        ├── CONTRIBUTE.md
        ├── COPYRIGHT.txt
        ├── debezium-connector-postgres-<version>.jar
        ├── debezium-core-<version>.jar
        ├── LICENSE.txt
        ├── postgresql-<version>.jar
        ├── protobuf-java-<version>.jar
        ├── README.md
        └── # ...
    Copy to Clipboard Toggle word wrap

    COPY コマンドは、コンテナーイメージにコピーするプラグインファイルを指します。

    この例では、Debezium コネクター (MongoDB、MySQL、および PostgreSQL) のプラグインを追加しますが、簡潔にするためにすべてのファイルがリストされているわけではありません。Kafka Connect で実行されている Debezium は、他の Kafka Connect タスクと同じように表示されます。

  2. コンテナーイメージをビルドします。
  3. カスタムイメージをコンテナーレジストリーにプッシュします。
  4. 新しいコンテナーイメージを示します。

    次のいずれかの方法でイメージを指定できます。

    • KafkaConnect カスタムリソースの KafkaConnect.spec.image プロパティーを編集します。

      設定されている場合、このプロパティーは Cluster Operator の STRIMZI_KAFKA_CONNECT_IMAGES 環境変数をオーバーライドします。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
      spec: 
      1
      
        #...
        image: my-new-container-image 
      2
      
        config: 
      3
      
          #...
      Copy to Clipboard Toggle word wrap
      1
      2
      Pod の Docker イメージ。
      3
      Kafka Connect ワーカー (コネクターではない) の設定。
    • install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml ファイルの STRIMZI_KAFKA_CONNECT_IMAGES 環境変数を編集して、新しいコンテナーイメージを指すようにし、Cluster Operator を再インストールします。

6.4.3.3. KafkaConnector リソースのデプロイ

コネクターを管理するために KafkaConnector リソースをデプロイします。KafkaConnector カスタムリソースは、Cluster Operator によるコネクターの管理に OpenShift ネイティブのアプローチを提供します。Kafka Connect REST API のように、コネクターを管理するために HTTP 要求を送信する必要はありません。該当する KafkaConnector リソースを更新して稼働中のコネクターインスタンスを管理した後、更新を適用します。Cluster Operator は、実行中のコネクターインスタンスの設定を更新します。該当する KafkaConnector を削除して、コネクターを削除します。

KafkaConnector リソースは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。

この手順で示す設定では、autoRestart プロパティーが true に設定されています。失敗したコネクターとタスクの自動再起動を有効にします。再起動は最大 7 回試行され、その後は手動で再起動する必要があります。KafkaConnector リソースにアノテーションを付けて、コネクターを 再起動するコネクタータスク を手動で再起動します。

コネクターの例

独自のコネクターを使用するか、AMQ Streams が提供する例を試すことができます。Apache Kafka 3.1.0 までは、サンプルファイルコネクタープラグインが Apache Kafka に含まれていました。Apache Kafka の 3.1.1 および 3.2.0 リリースから、例を他のコネクターと同様にプラグインパスに追加する必要があります。

AMQ Streams は、サンプルファイルコネクタープラグイン用の サンプル KafkaConnector 設定ファイル (examples/connect/source-connector.yaml) を提供します。これにより、次のコネクターインスタンスが KafkaConnector リソースとして作成されます。

  • Kafka ライセンスファイル (ソース) から各行を読み取り、データをメッセージとして単一の Kafka トピックに書き込む FileStreamSourceConnector インスタンス。
  • Kafka トピックからメッセージを読み取り、メッセージを一時ファイル (シンク) に書き込む FileStreamSinkConnector インスタンス。

この手順では、サンプルファイルを使用してコネクターを作成します。

注記

サンプルコネクターは、運用環境での使用を意図したものではありません。

前提条件

  • Kafka Connect デプロイメント。
  • Cluster Operator が稼働している。

手順

  1. 次のいずれかの方法で、FileStreamSourceConnector および FileStreamSinkConnector プラグインを Kafka Connect に追加します。

  2. Kafka Connect 設定で strimzi.io/use-connector-resources annotationtrue に設定します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
        # ...
    Copy to Clipboard Toggle word wrap

    KafkaConnector リソースを有効にすると、Cluster Operator はそれらを監視します。

  3. examples/connect/source-connector.yaml ファイルを編集します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector 
    1
    
      labels:
        strimzi.io/cluster: my-connect-cluster 
    2
    
    spec:
      class: org.apache.kafka.connect.file.FileStreamSourceConnector 
    3
    
      tasksMax: 2 
    4
    
      autoRestart: 
    5
    
        enabled: true
      config: 
    6
    
        file: "/opt/kafka/LICENSE" 
    7
    
        topic: my-topic 
    8
    
        # ...
    Copy to Clipboard Toggle word wrap
    1
    コネクターの名前として使用される KafkaConnector リソースの名前。OpenShift リソースで有効な名前を使用します。
    2
    コネクターインスタンスを作成する Kafka Connect クラスターの名前。コネクターは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
    3
    コネクタークラスのフルネームまたはエイリアス。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
    4
    コネクターが作成できる Kafka Connect タスクの最大数。
    5
    失敗したコネクターとタスクの自動再起動を有効にします。
    6
    キーと値のペアとしての コネクター設定
    7
    このサンプルソースコネクター設定では、/opt/kafka/LICENSE ファイルからデータが読み取られます。
    8
    ソースデータのパブリッシュ先となる Kafka トピック。
  4. OpenShift クラスターでソース KafkaConnector を作成します。

    oc apply -f examples/connect/source-connector.yaml
    Copy to Clipboard Toggle word wrap
  5. examples/connect/sink-connector.yaml ファイルを作成します。

    touch examples/connect/sink-connector.yaml
    Copy to Clipboard Toggle word wrap
  6. 以下の YAML を sink-connector.yaml ファイルに貼り付けます。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-sink-connector
      labels:
        strimzi.io/cluster: my-connect
    spec:
      class: org.apache.kafka.connect.file.FileStreamSinkConnector 
    1
    
      tasksMax: 2
      config: 
    2
    
        file: "/tmp/my-file" 
    3
    
        topics: my-topic 
    4
    Copy to Clipboard Toggle word wrap
    1
    コネクタークラスのフルネームまたはエイリアス。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
    2
    キーと値のペアとしての コネクター設定
    3
    ソースデータのパブリッシュ先となる一時ファイル。
    4
    ソースデータの読み取り元となる Kafka トピック。
  7. OpenShift クラスターにシンク KafkaConnector を作成します。

    oc apply -f examples/connect/sink-connector.yaml
    Copy to Clipboard Toggle word wrap
  8. コネクターリソースが作成されたことを確認します。

    oc get kctr --selector strimzi.io/cluster=<my_connect_cluster> -o name
    
    my-source-connector
    my-sink-connector
    Copy to Clipboard Toggle word wrap

    <my_connect_cluster> を Kafka Connect クラスターの名前に置き換えます。

  9. コンテナーで、kafka -console-consumer.sh を実行して、ソースコネクターによってトピックに書き込まれたメッセージを読み取ります。

    oc exec <my_kafka_cluster>-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server <my_kafka_cluster>-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
    Copy to Clipboard Toggle word wrap

    <my_kafka_cluster> を Kafka クラスターの名前に置き換えます。

ソースおよびシンクコネクターの設定オプション

コネクター設定は、KafkaConnector リソースの spec.config プロパティーで定義されます。

FileStreamSourceConnector クラスおよび FileStreamSinkConnector クラスは、Kafka Connect REST API と同じ設定オプションをサポートします。他のコネクターは異なる設定オプションをサポートします。

Expand
表6.1 FileStreamSource コネクタークラスの設定オプション
名前タイプデフォルト値説明

file

文字列

Null

メッセージを書き込むソースファイル。指定のない場合は、標準入力が使用されます。

topic

List

Null

データのパブリッシュ先となる Kafka トピック。

Expand
表6.2 FileStreamSinkConnector クラスの設定オプション
名前タイプデフォルト値説明

file

文字列

Null

メッセージを書き込む宛先ファイル。指定のない場合は標準出力が使用されます。

topics

List

Null

データの読み取り元となる 1 つ以上の Kafka トピック。

topics.regex

文字列

Null

データの読み取り元となる 1 つ以上の Kafka トピックと一致する正規表現。

6.4.3.4. コネクターの手動再起動

KafkaConnector リソースを使用してコネクターを管理している場合は、restart アノテーションを使用してコネクターの再起動を手動でトリガーします。

前提条件

  • Cluster Operator が稼働中である。

手順

  1. 再起動する Kafka コネクターを制御する KafkaConnector カスタムリソースの名前を見つけます。

    oc get KafkaConnector
    Copy to Clipboard Toggle word wrap
  2. OpenShift で KafkaConnector リソースにアノテーションを付けて、コネクターを再起動します。

    oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart=true
    Copy to Clipboard Toggle word wrap

    restart アノテーションは true に設定されています。

  3. 次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。

    アノテーションが調整プロセスで検出されれば、Kafka コネクターは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは KafkaConnector カスタムリソースから削除されます。

6.4.3.5. Kafka コネクタータスクを手動で再起動する

KafkaConnector リソースを使用してコネクターを管理している場合は、restart-task アノテーションを使用して、コネクタータスクの再起動を手動でトリガーします。

前提条件

  • Cluster Operator が稼働中である。

手順

  1. 再起動する Kafka コネクタータスクを制御する KafkaConnector カスタムリソースの名前を見つけます。

    oc get KafkaConnector
    Copy to Clipboard Toggle word wrap
  2. KafkaConnector カスタムリソースから再起動するタスクの ID を検索します。タスク ID は、0 から始まる非負の整数です。

    oc describe KafkaConnector <kafka_connector_name>
    Copy to Clipboard Toggle word wrap
  3. OpenShift で KafkaConnector リソースにアノテーションを付けて、ID を使用してコネクタータスクを再開します。

    oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task=0
    Copy to Clipboard Toggle word wrap

    この例では、タスク 0 が再起動されます。

  4. 次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。

    アノテーションが調整プロセスで検出されれば、Kafka コネクタータスクは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは KafkaConnector カスタムリソースから削除されます。

6.4.3.6. Kafka Connect API の公開

KafkaConnector リソースを使用してコネクターを管理する代わりに、Kafka Connect REST API を使用します。Kafka Connect REST API は、<connect_cluster_name>-connect-api:8083 で実行しているサービスとして利用できます。ここで、<connect_cluster_name> は、お使いの Kafka Connect クラスターの名前になります。サービスは、Kafka Connect インスタンスの作成時に作成されます。

Kafka Connect REST API でサポートされる操作は、Apache Kafka Connect API のドキュメント で説明されています。

注記

strimzi.io/use-connector-resources アノテーションは KafkaConnectors を有効にします。アノテーションを KafkaConnect リソース設定に適用した場合、そのアノテーションを削除して Kafka Connect API を使用する必要があります。それ以外の場合、Kafka Connect REST API を使用して直接行われた手動による変更は、 Cluster Operator によって元に戻されます。

コネクター設定を JSON オブジェクトとして追加できます。

コネクター設定を追加するための curl 要求の例

curl -X POST \
  http://my-connect-cluster-connect-api:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "my-source-connector",
    "config":
    {
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file": "/opt/kafka/LICENSE",
      "topic":"my-topic",
      "tasksMax": "4",
      "type": "source"
    }
}'
Copy to Clipboard Toggle word wrap

API には OpenShift クラスター内でのみアクセスできます。OpenShift クラスター外部で実行しているアプリケーションに Kafka Connect API がアクセスできるようにする場合は、以下の機能のいずれかを使用して Kafka Connect API を手動で公開できます。

  • LoadBalancer または NodePort タイプのサービス
  • Ingress リソース (Kubernetes のみ)
  • OpenShift ルート (OpenShift のみ)
注記

接続はセキュアではないため、外部からのアクセスはよく考えてから許可してください。

サービスを作成する場合には、<connect_cluster_name>-connect-api サービスの selector からラベルを使用して、サービスがトラフィックをルーティングする Pod を設定します。

サービスのセレクター設定

# ...
selector:
  strimzi.io/cluster: my-connect-cluster 
1

  strimzi.io/kind: KafkaConnect
  strimzi.io/name: my-connect-cluster-connect 
2

#...
Copy to Clipboard Toggle word wrap

1
OpenShift クラスターでの Kafka Connect カスタムリソースの名前。
2
Cluster Operator によって作成された Kafka Connect デプロイメントの名前。

また、外部クライアントからの HTTP 要求を許可する NetworkPolicy を作成する必要もあります。

Kafka Connect API への要求を許可する NetworkPolicy の例

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: my-custom-connect-network-policy
spec:
  ingress:
  - from:
    - podSelector: 
1

        matchLabels:
          app: my-connector-manager
    ports:
    - port: 8083
      protocol: TCP
  podSelector:
    matchLabels:
      strimzi.io/cluster: my-connect-cluster
      strimzi.io/kind: KafkaConnect
      strimzi.io/name: my-connect-cluster-connect
  policyTypes:
  - Ingress
Copy to Clipboard Toggle word wrap

1
API への接続が許可される Pod のラベル。

クラスター外でコネクター設定を追加するには、curl コマンドで API を公開するリソースの URL を使用します。

6.4.3.7. Kafka Connect API へのアクセスの制限

Kafka Connect API へのアクセスを信頼できるユーザーのみに制限して、不正なアクションや潜在的なセキュリティーの問題を防ぐことが重要です。Kafka Connect API は、コネクター設定を変更するための広範な機能を提供するため、セキュリティー対策を講じることがさらに重要になります。管理者によりセキュアであると想定されている機密情報が、Kafka Connect API にアクセスできるユーザーに取得されてしまう可能性があります。

Kafka Connect REST API には、OpenShift クラスターへのアクセスが認証されており、ホスト名/IP アドレス、ポート番号など、エンドポイント URL を知っている場合には、アクセスできます。

たとえば、組織が Kafka Connect クラスターとコネクターを使用して、機密データを顧客データベースから中央データベースにストリーミングするとします。管理者は設定プロバイダープラグインを使用して、顧客データベースと中央データベースへの接続に関連する機密情報 (データベース接続の詳細や認証情報など) を保存します。設定プロバイダーは、この機密情報が許可されていないユーザーに公開されるのを防ぎます。ただし、Kafka Connect API にアクセスできるユーザーは、管理者の同意なしに顧客データベースにアクセスできます。これを行うには、偽のデータベースをセットアップし、それに接続するコネクターを設定します。次に、顧客データベースを参照するようにコネクター設定を変更しますが、データを中央データベースに送信する代わりに、偽のデータベースに送信します。偽のデータベースに接続するようにコネクターを設定すると、設定プロバイダーにセキュアに保存されているにもかかわらず、顧客データベースに接続するためのログインの詳細と認証情報が傍受されます。

KafkaConnector カスタムリソースを使用している場合、デフォルトでは、OpenShift RBAC ルールにより、OpenShift クラスター管理者のみがコネクターに変更を加えることが許可されます。AMQ Streams リソースを管理するクラスター管理者以外のユーザーを指定 することもできます。Kafka Connect 設定で KafkaConnector リソースを有効にすると、Kafka Connect REST API を使用して直接行われた変更は Cluster Operator によって元に戻されます。KafkaConnector リソースを使用していない場合、デフォルトの RBAC ルールは Kafka Connect API へのアクセスを制限しません。OpenShift RBAC を使用して Kafka Connect REST API への直接アクセスを制限する場合は、KafkaConnector リソースを有効にして使用する必要があります。

セキュリティーを強化するために、Kafka Connect API の次のプロパティーを設定することを推奨します。

org.apache.kafka.disallowed.login.modules

(Kafka 3.4 以降) org.apache.kafka.disallowed.login.modules Java システムプロパティーを設定して、セキュアではないログインモジュールの使用を防止します。たとえば、com.sun.security.auth.module.JndiLoginModule を指定すると、Kafka JndiLoginModule が使用できなくなります。

ログインモジュールを禁止する設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  jvmOptions:
    javaSystemProperties:
      - name: org.apache.kafka.disallowed.login.modules
        value: com.sun.security.auth.module.JndiLoginModule, org.apache.kafka.common.security.kerberos.KerberosLoginModule
# ...
Copy to Clipboard Toggle word wrap

信頼できるログインモジュールのみを許可し、使用しているバージョンに対する Kafka からの最新のアドバイスに従ってください。ベストプラクティスとして、org.apache.kafka.disallowed.login.modules システムプロパティーを使用して、Kafka Connect 設定でセキュアではないログインモジュールを明示的に禁止する必要があります。

connector.client.config.override.policy

connector.client.config.override.policy プロパティーを None に設定して、コネクター設定が Kafka Connect 設定とそれが使用するコンシューマーおよびプロデューサーをオーバーライドしないようにします。

コネクターオーバーライドポリシーを指定する設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  config:
    connector.client.config.override.policy: None
# ...
Copy to Clipboard Toggle word wrap

6.4.3.8. Kafka Connect API の使用から KafkaConnector カスタムリソースの使用への切り替え

Kafka Connect API の使用から KafkaConnector カスタムリソースの使用に切り替えて、コネクターを管理できます。スイッチの作成は、次の作業を以下の順序で行います。

  1. 設定で KafkaConnector リソースをデプロイし、コネクターインスタンスを作成します。
  2. strimzi.io/use-connector-resources アノテーションを true に設定して、Kafka Connect 設定で KafkaConnector リソースを有効にします。
警告

作成する前に KafkaConnector リソースを有効にすると、すべてのコネクターが削除されます。

KafkaConnector リソースの使用から Kafka Connect API の使用に切り替えるには、最初に KafkaConnector リソースを有効にするアノテーションを Kafka Connect 設定から削除します。それ以外の場合、Kafka Connect REST API を使用して直接行われた手動による変更は、 Cluster Operator によって元に戻されます。

切り替えを行うときは、KafkaConnect リソースのステータスを確認 してください。metadata.generation (デプロイの現在のバージョン) の値は、status.observedGeneration (リソースの最新の調整) と一致する必要があります。Kafka Connect クラスターが Ready になったら、KafkaConnector リソースを削除できます。

6.4.4. Kafka Connect クラスターリソースのリスト

以下のリソースは、OpenShift クラスターの Cluster Operator によって作成されます。

connect-cluster-name-connect

次の Kafka Connect リソースに付けられた名前:

  • Kafka Connect ワーカーノード Pod を作成するデプロイメント (StableConnectIdentities フィーチャーゲートが無効な場合)。
  • Kafka Connect ワーカーノード Pod を作成する StrimziPodSet (StableConnectIdentities フィーチャーゲートが有効な場合)。
  • 安定した DNS 名を Connect Pod に提供するヘッドレスサービス (StableConnectIdentities フィーチャーゲートが有効な場合)。
  • Kafka Connect ワーカーノードに設定された Pod の Disruption Budget。
connect-cluster-name-connect-idx
Kafka Connect StrimziPodSet によって作成された Pod (StableConnectIdentities フィーチャーゲートが有効な場合)。
connect-cluster-name-connect-api
Kafka Connect クラスターを管理するために REST インターフェイスを公開するサービス。
connect-cluster-name-config
Kafka Connect 補助設定が含まれ、Kafka ブローカー Pod によってボリュームとしてマウントされる ConfigMap。
トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat