6.5. Kafka Connect コネクターの追加


Kafka Connect はコネクターを使用して他のシステムと統合し、データをストリーミングします。コネクターは Kafka Connector クラスのインスタンスであり、次のいずれかのタイプになります。

ソースコネクター
ソースコネクターは、外部システムからデータを取得し、それをメッセージとして Kafka に提供するランタイムエンティティーです。
シンクコネクター
シンクコネクターは、Kafka トピックからメッセージを取得し、外部システムに提供するランタイムエンティティーです。

Kafka Connect はプラグインアーキテクチャーを使用して、コネクターの実装アーティファクトを提供します。プラグインは他のシステムへの接続を可能にし、データを操作するための追加の設定を提供します。プラグインには、コネクターや、データコンバーターや変換などの他のコンポーネントが含まれます。コネクターは、特定のタイプの外部システムで動作します。各コネクターは、その設定のスキーマを定義します。設定を Kafka Connect に指定して、Kafka Connect 内にコネクターインスタンスを作成します。次に、コネクターインスタンスは、システム間でデータを移動するための一連のタスクを定義します。

次のいずれかの方法で、コネクタープラグインを Kafka Connect に追加します。

コンテナーイメージにプラグインを追加したら、次の方法でコネクターインスタンスを開始、停止、および管理できます。

これらのオプションを使用して、新しいコネクターインスタンスを作成することもできます。

6.5.1. コネクタープラグインを使用して新しいコンテナーイメージを自動的にビルドする

Streams for Apache Kafka が追加のコネクターを使用して新しいコンテナーイメージを自動的にビルドするように Kafka Connect を設定します。コネクタープラグインは、KafkaConnect カスタムリソースの .spec.build.plugins プロパティーを使用して定義します。Streams for Apache Kafka はコネクタープラグインを自動的にダウンロードし、新しいコンテナーイメージに追加します。コンテナーは、.spec.build.output に指定されたコンテナーリポジトリーにプッシュされ、Kafka Connect デプロイメントで自動的に使用されます。

前提条件

イメージをプッシュ、保存、およびプルできる独自のコンテナーレジストリーを提供する必要があります。Streams for Apache Kafka は、プライベートコンテナーレジストリーだけでなく、QuayDocker Hub などのパブリックレジストリーもサポートします。

手順

  1. .spec.build.output でコンテナーレジストリーを、.spec.build.plugins で追加のコネクターを指定して、KafkaConnect カスタムリソースを設定します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
    spec: 1
      #...
      build:
        output: 2
          type: docker
          image: my-registry.io/my-org/my-connect-cluster:latest
          pushSecret: my-registry-credentials
        plugins: 3
          - name: connector-1
            artifacts:
              - type: tgz
                url: <url_to_download_connector_1_artifact>
                sha512sum: <SHA-512_checksum_of_connector_1_artifact>
          - name: connector-2
            artifacts:
              - type: jar
                url: <url_to_download_connector_2_artifact>
                sha512sum: <SHA-512_checksum_of_connector_2_artifact>
      #...
    1
    2
    (必須) 新しいイメージがプッシュされるコンテナーレジストリーの設定。
    3
    (必須) 新しいコンテナーイメージに追加するコネクタープラグインとそれらのアーティファクトのリスト。各プラグインは、1 つ以上の artifact を使用して設定する必要があります。
  2. リソースを作成または更新します。

    $ oc apply -f <kafka_connect_configuration_file>
  3. 新しいコンテナーイメージがビルドされ、Kafka Connect クラスターがデプロイされるまで待ちます。
  4. Kafka Connect REST API または KafkaConnector カスタムリソースを使用して、追加したコネクタープラグインを使用します。

6.5.2. Kafka Connect ベースイメージからコネクタープラグインを使用して新しいコンテナーイメージをビルドする

Kafka Connect 基本イメージからコネクタープラグインを使用してカスタム Docker イメージを作成します。カスタムイメージを /opt/kafka/plugins ディレクトリーに追加します。

Red Hat Ecosystem Catalog の Kafka コンテナーイメージは、追加のコネクタープラグインで独自のカスタムイメージを作成するためのベースイメージとして使用できます。

起動時に、Streams for Apache Kafka バージョンの Kafka Connect は、/opt/kafka/plugins ディレクトリーに含まれるサードパーティーのコネクタープラグインをロードします。

手順

  1. ベースイメージとして registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 を使用して、新しい Dockerfile を作成します。

    FROM registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0
    USER root:root
    COPY ./my-plugins/ /opt/kafka/plugins/
    USER 1001

    プラグインファイルの例

    $ tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    │   ├── bson-<version>.jar
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mongodb-<version>.jar
    │   ├── debezium-core-<version>.jar
    │   ├── LICENSE.txt
    │   ├── mongodb-driver-core-<version>.jar
    │   ├── README.md
    │   └── # ...
    ├── debezium-connector-mysql
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mysql-<version>.jar
    │   ├── debezium-core-<version>.jar
    │   ├── LICENSE.txt
    │   ├── mysql-binlog-connector-java-<version>.jar
    │   ├── mysql-connector-java-<version>.jar
    │   ├── README.md
    │   └── # ...
    └── debezium-connector-postgres
        ├── CHANGELOG.md
        ├── CONTRIBUTE.md
        ├── COPYRIGHT.txt
        ├── debezium-connector-postgres-<version>.jar
        ├── debezium-core-<version>.jar
        ├── LICENSE.txt
        ├── postgresql-<version>.jar
        ├── protobuf-java-<version>.jar
        ├── README.md
        └── # ...

    COPY コマンドは、コンテナーイメージにコピーするプラグインファイルを指します。

    この例では、Debezium コネクター (MongoDB、MySQL、および PostgreSQL) のプラグインを追加しますが、簡潔にするためにすべてのファイルがリストされているわけではありません。Kafka Connect で実行されている Debezium は、他の Kafka Connect タスクと同じように表示されます。

  2. コンテナーイメージをビルドします。
  3. カスタムイメージをコンテナーレジストリーにプッシュします。
  4. 新しいコンテナーイメージを示します。

    次のいずれかの方法でイメージを指定できます。

    • KafkaConnect カスタムリソースの KafkaConnect.spec.image プロパティーを編集します。

      設定されている場合、このプロパティーは Cluster Operator の STRIMZI_KAFKA_CONNECT_IMAGES 環境変数をオーバーライドします。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
      spec: 1
        #...
        image: my-new-container-image 2
        config: 3
          #...
      1
      2
      Kafka Connect Pod の Docker イメージ。
      3
      Kafka Connect ワーカー (コネクターではない) の設定。
    • install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml ファイルの STRIMZI_KAFKA_CONNECT_IMAGES 環境変数を編集して、新しいコンテナーイメージを指すようにし、Cluster Operator を再インストールします。

6.5.3. KafkaConnector リソースのデプロイ

コネクターを管理するために KafkaConnector リソースをデプロイします。KafkaConnector カスタムリソースは、Cluster Operator によるコネクターの管理に OpenShift ネイティブのアプローチを提供します。Kafka Connect REST API のように、コネクターを管理するために HTTP 要求を送信する必要はありません。該当する KafkaConnector リソースを更新して稼働中のコネクターインスタンスを管理した後、更新を適用します。Cluster Operator は、実行中のコネクターインスタンスの設定を更新します。該当する KafkaConnector を削除して、コネクターを削除します。

KafkaConnector リソースは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。

この手順で示す設定では、失敗したコネクターとタスクを自動的に再起動するために autoRestart 機能が有効になっています (enabled: true)。KafkaConnector リソースにアノテーションを付けて、コネクターを 再起動 するか、コネクタータスクの再起動 を手動で行うこともできます。

コネクターの例

独自のコネクターを使用することも、Streams for Apache Kafka に付属する例を試すこともできます。Apache Kafka 3.1.0 までは、サンプルファイルコネクタープラグインが Apache Kafka に含まれていました。Apache Kafka の 3.1.1 および 3.2.0 リリースから、例を他のコネクターと同様にプラグインパスに追加する必要があります。

Streams for Apache Kafka には、サンプルファイルコネクタープラグイン用の サンプル KafkaConnector 設定ファイル (examples/connect/source-connector.yaml) が用意されています。このファイルでは、次のコネクターインスタンスが KafkaConnector リソースとして作成されます。

  • Kafka ライセンスファイル (ソース) から各行を読み取り、データをメッセージとして単一の Kafka トピックに書き込む FileStreamSourceConnector インスタンス。
  • Kafka トピックからメッセージを読み取り、メッセージを一時ファイル (シンク) に書き込む FileStreamSinkConnector インスタンス。

この手順では、サンプルファイルを使用してコネクターを作成します。

注記

サンプルコネクターは、運用環境での使用を意図したものではありません。

前提条件

  • Kafka Connect デプロイメント。
  • Cluster Operator が稼働している。

手順

  1. 次のいずれかの方法で、FileStreamSourceConnector および FileStreamSinkConnector プラグインを Kafka Connect に追加します。

  2. Kafka Connect 設定で strimzi.io/use-connector-resources annotationtrue に設定します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
        # ...

    KafkaConnector リソースを有効にすると、Cluster Operator はそれらを監視します。

  3. examples/connect/source-connector.yaml ファイルを編集します。

    KafkaConnector ソースコネクター設定の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector  1
      labels:
        strimzi.io/cluster: my-connect-cluster 2
    spec:
      class: org.apache.kafka.connect.file.FileStreamSourceConnector 3
      tasksMax: 2 4
      autoRestart: 5
        enabled: true
      config: 6
        file: "/opt/kafka/LICENSE" 7
        topic: my-topic 8
        # ...

    1
    コネクターの名前として使用される KafkaConnector リソースの名前。OpenShift リソースで有効な名前を使用します。
    2
    コネクターインスタンスを作成する Kafka Connect クラスターの名前。コネクターは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
    3
    コネクタークラスのフルネーム。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
    4
    コネクターが作成できる Kafka Connect タスクの最大数。
    5
    失敗したコネクターとタスクの自動再起動を有効にします。デフォルトでは、再起動の回数は無制限ですが、maxRestarts プロパティーを使用して自動再起動の最大回数を設定できます。
    6
    キーと値のペア形式の コネクター設定
    7
    外部データファイルの場所。この例では、/opt/kafka/LICENSE ファイルから読み取るように FileStreamSourceConnector を設定しています。
    8
    ソースデータのパブリッシュ先となる Kafka トピック。
  4. OpenShift クラスターでソース KafkaConnector を作成します。

    oc apply -f examples/connect/source-connector.yaml
  5. examples/connect/sink-connector.yaml ファイルを作成します。

    touch examples/connect/sink-connector.yaml
  6. 以下の YAML を sink-connector.yaml ファイルに貼り付けます。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-sink-connector
      labels:
        strimzi.io/cluster: my-connect
    spec:
      class: org.apache.kafka.connect.file.FileStreamSinkConnector 1
      tasksMax: 2
      config: 2
        file: "/tmp/my-file" 3
        topics: my-topic 4
    1
    コネクタークラスのフルネームまたはエイリアス。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
    2
    キーと値のペア形式の コネクター設定
    3
    ソースデータのパブリッシュ先となる一時ファイル。
    4
    ソースデータの読み取り元となる Kafka トピック。
  7. OpenShift クラスターにシンク KafkaConnector を作成します。

    oc apply -f examples/connect/sink-connector.yaml
  8. コネクターリソースが作成されたことを確認します。

    oc get kctr --selector strimzi.io/cluster=<my_connect_cluster> -o name
    
    my-source-connector
    my-sink-connector

    <my_connect_cluster> を Kafka Connect クラスターの名前に置き換えます。

  9. コンテナーで、kafka -console-consumer.sh を実行して、ソースコネクターによってトピックに書き込まれたメッセージを読み取ります。

    oc exec <my_kafka_cluster>-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server <my_kafka_cluster>-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning

    <my_kafka_cluster> を Kafka クラスターの名前に置き換えます。

ソースおよびシンクコネクターの設定オプション

コネクター設定は、KafkaConnector リソースの spec.config プロパティーで定義されます。

FileStreamSourceConnector クラスおよび FileStreamSinkConnector クラスは、Kafka Connect REST API と同じ設定オプションをサポートします。他のコネクターは異なる設定オプションをサポートします。

表6.1 FileStreamSource コネクタークラスの設定オプション
名前デフォルト値説明

file

文字列

Null

メッセージを書き込むソースファイル。指定のない場合は、標準入力が使用されます。

topic

List

Null

データのパブリッシュ先となる Kafka トピック。

表6.2 FileStreamSinkConnector クラスの設定オプション
名前デフォルト値説明

file

文字列

Null

メッセージを書き込む宛先ファイル。指定のない場合は標準出力が使用されます。

topics

List

Null

データの読み取り元となる 1 つ以上の Kafka トピック。

topics.regex

文字列

Null

データの読み取り元となる 1 つ以上の Kafka トピックと一致する正規表現。

6.5.4. Kafka Connect API の公開

KafkaConnector リソースを使用してコネクターを管理する代わりに、Kafka Connect REST API を使用します。Kafka Connect REST API は、<connect_cluster_name>-connect-api:8083 で実行しているサービスとして利用できます。ここで、<connect_cluster_name> は、お使いの Kafka Connect クラスターの名前になります。サービスは、Kafka Connect インスタンスの作成時に作成されます。

Kafka Connect REST API でサポートされる操作は、Apache Kafka Connect API のドキュメント で説明されています。

注記

strimzi.io/use-connector-resources アノテーションは KafkaConnectors を有効にします。アノテーションを KafkaConnect リソース設定に適用した場合、そのアノテーションを削除して Kafka Connect API を使用する必要があります。それ以外の場合、Kafka Connect REST API を使用して直接行われた手動による変更は、 Cluster Operator によって元に戻されます。

コネクター設定を JSON オブジェクトとして追加できます。

コネクター設定を追加するための curl 要求の例

curl -X POST \
  http://my-connect-cluster-connect-api:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "my-source-connector",
    "config":
    {
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file": "/opt/kafka/LICENSE",
      "topic":"my-topic",
      "tasksMax": "4",
      "type": "source"
    }
}'

API には OpenShift クラスター内でのみアクセスできます。OpenShift クラスター外部で実行しているアプリケーションに Kafka Connect API がアクセスできるようにする場合は、以下の機能のいずれかを使用して Kafka Connect API を手動で公開できます。

  • LoadBalancer または NodePort タイプのサービス
  • Ingress リソース (Kubernetes のみ)
  • OpenShift ルート (OpenShift のみ)
注記

接続はセキュアではないため、外部からのアクセスはよく考えてから許可してください。

サービスを作成する場合には、<connect_cluster_name>-connect-api サービスの selector からラベルを使用して、サービスがトラフィックをルーティングする Pod を設定します。

サービスのセレクター設定

# ...
selector:
  strimzi.io/cluster: my-connect-cluster 1
  strimzi.io/kind: KafkaConnect
  strimzi.io/name: my-connect-cluster-connect 2
#...

1
OpenShift クラスターでの Kafka Connect カスタムリソースの名前。
2
Cluster Operator によって作成された Kafka Connect デプロイメントの名前。

また、外部クライアントからの HTTP 要求を許可する NetworkPolicy を作成する必要もあります。

Kafka Connect API への要求を許可する NetworkPolicy の例

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: my-custom-connect-network-policy
spec:
  ingress:
  - from:
    - podSelector: 1
        matchLabels:
          app: my-connector-manager
    ports:
    - port: 8083
      protocol: TCP
  podSelector:
    matchLabels:
      strimzi.io/cluster: my-connect-cluster
      strimzi.io/kind: KafkaConnect
      strimzi.io/name: my-connect-cluster-connect
  policyTypes:
  - Ingress

1
API への接続が許可される Pod のラベル。

クラスター外でコネクター設定を追加するには、curl コマンドで API を公開するリソースの URL を使用します。

6.5.5. Kafka Connect API へのアクセスの制限

Kafka Connect API へのアクセスを信頼できるユーザーのみに制限して、不正なアクションや潜在的なセキュリティーの問題を防ぐことが重要です。Kafka Connect API は、コネクター設定を変更するための広範な機能を提供するため、セキュリティー対策を講じることがさらに重要になります。管理者によりセキュアであると想定されている機密情報が、Kafka Connect API にアクセスできるユーザーに取得されてしまう可能性があります。

Kafka Connect REST API には、OpenShift クラスターへのアクセスが認証されており、ホスト名/IP アドレス、ポート番号など、エンドポイント URL を知っている場合には、アクセスできます。

たとえば、組織が Kafka Connect クラスターとコネクターを使用して、機密データを顧客データベースから中央データベースにストリーミングするとします。管理者は設定プロバイダープラグインを使用して、顧客データベースと中央データベースへの接続に関連する機密情報 (データベース接続の詳細や認証情報など) を保存します。設定プロバイダーは、この機密情報が許可されていないユーザーに公開されるのを防ぎます。ただし、Kafka Connect API にアクセスできるユーザーは、管理者の同意なしに顧客データベースにアクセスできます。これを行うには、偽のデータベースをセットアップし、それに接続するコネクターを設定します。次に、顧客データベースを参照するようにコネクター設定を変更しますが、データを中央データベースに送信する代わりに、偽のデータベースに送信します。偽のデータベースに接続するようにコネクターを設定すると、設定プロバイダーにセキュアに保存されているにもかかわらず、顧客データベースに接続するためのログインの詳細と認証情報が傍受されます。

KafkaConnector カスタムリソースを使用している場合、デフォルトでは、OpenShift RBAC ルールにより、OpenShift クラスター管理者のみがコネクターに変更を加えることが許可されます。Streams for Apache Kafka リソースを管理するクラスター管理者以外のユーザーを指定 することもできます。Kafka Connect 設定で KafkaConnector リソースを有効にすると、Kafka Connect REST API を使用して直接行われた変更は Cluster Operator によって元に戻されます。KafkaConnector リソースを使用していない場合、デフォルトの RBAC ルールは Kafka Connect API へのアクセスを制限しません。OpenShift RBAC を使用して Kafka Connect REST API への直接アクセスを制限する場合は、KafkaConnector リソースを有効にして使用する必要があります。

セキュリティーを強化するために、Kafka Connect API の次のプロパティーを設定することを推奨します。

org.apache.kafka.disallowed.login.modules

(Kafka 3.4 以降) org.apache.kafka.disallowed.login.modules Java システムプロパティーを設定して、セキュアではないログインモジュールの使用を防止します。たとえば、com.sun.security.auth.module.JndiLoginModule を指定すると、Kafka JndiLoginModule が使用できなくなります。

ログインモジュールを禁止する設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  jvmOptions:
    javaSystemProperties:
      - name: org.apache.kafka.disallowed.login.modules
        value: com.sun.security.auth.module.JndiLoginModule, org.apache.kafka.common.security.kerberos.KerberosLoginModule
# ...

信頼できるログインモジュールのみを許可し、使用しているバージョンに対する Kafka からの最新のアドバイスに従ってください。ベストプラクティスとして、org.apache.kafka.disallowed.login.modules システムプロパティーを使用して、Kafka Connect 設定でセキュアではないログインモジュールを明示的に禁止する必要があります。

connector.client.config.override.policy

connector.client.config.override.policy プロパティーを None に設定して、コネクター設定が Kafka Connect 設定とそれが使用するコンシューマーおよびプロデューサーをオーバーライドしないようにします。

コネクターオーバーライドポリシーを指定する設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  config:
    connector.client.config.override.policy: None
# ...

6.5.6. Kafka Connect API の使用から KafkaConnector カスタムリソースの使用への切り替え

Kafka Connect API の使用から KafkaConnector カスタムリソースの使用に切り替えて、コネクターを管理できます。スイッチの作成は、次の作業を以下の順序で行います。

  1. 設定で KafkaConnector リソースをデプロイし、コネクターインスタンスを作成します。
  2. strimzi.io/use-connector-resources アノテーションを true に設定して、Kafka Connect 設定で KafkaConnector リソースを有効にします。
警告

作成する前に KafkaConnector リソースを有効にすると、すべてのコネクターが削除されます。

KafkaConnector リソースの使用から Kafka Connect API の使用に切り替えるには、最初に KafkaConnector リソースを有効にするアノテーションを Kafka Connect 設定から削除します。それ以外の場合、Kafka Connect REST API を使用して直接行われた手動による変更は、 Cluster Operator によって元に戻されます。

切り替えを行うときは、KafkaConnect リソースのステータスを確認 してください。metadata.generation (デプロイの現在のバージョン) の値は、status.observedGeneration (リソースの最新の調整) と一致する必要があります。Kafka Connect クラスターが Ready になったら、KafkaConnector リソースを削除できます。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.