5.5. Kafka Connect の設定


Kafka Connect の基本設定には、Kafka クラスターに接続するブートストラップアドレスと、暗号化および認証の詳細が必要です。

Kafka Connect インスタンスはデフォルトでは、以下が同じ値で設定されます。

  • Kafka Connect クラスターのグループ ID
  • コネクターオフセットを保存する Kafka トピック
  • コネクターおよびタスクステータス設定を保存する Kafka トピック
  • コネクターおよびタスクステータスの更新情報を保存する Kafka トピック

複数の異なる Kafka Connect インスタンスが使用されている場合には、上記の設定はインスタンスごとに反映する必要があります。

Kafka Connect 設定の YAML 例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  # ...
  config:
    group.id: my-connect-cluster
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
  # ...

コネクター

コネクターは Kafka Connect とは別に設定されます。この設定では、Kafka Connect にフィードするソース入力データおよびターゲット出力データを記述します。外部ソースデータは、対象のメッセージを格納する特定のトピックを参照する必要があります。

Kafka には、以下のようにビルトインコネクターが 2 つあります。

  • FileStreamSourceConnector は、外部システムから Kafka にデータをストリーミングし、入力ソースから行を読み取り、各行を Kafka トピックに送信します。
  • FileStreamSinkConnector は、Kafka から外部システムにデータをストリーミングし、Kafka トピックからメッセージを読み取り、出力ファイルにメッセージごとに 1 行を作成します。

コネクタープラグインを使用して他のコネクターを追加できます。コネクタープラグインは、JAR ファイルまたは TGZ アーカイブのセットで、特定タイプの外部システムへの接続に必要な実装を定義します。

新しいコネクタープラグインを使用するカスタム Kafka Connect イメージを作成します。

イメージを作成するには、以下を使用します。

  • AMQ Streams が新しいイメージを自動的に作成するための Kafka Connect の設定。
  • ベースイメージとしての Red Hat Ecosystem Catalog の Kafka コンテナーイメージ
  • 新規コンテナーイメージを作成する OpenShift ビルドS2I (Source-to-Image) フレームワーク。

AMQ Streams で新しいイメージを自動的に作成するには、build 設定に、コンテナーイメージを格納するコンテナーレジストリーを参照する output プロパティーと、イメージに追加するコネクタープラグインとそれらのアーティファクトをリストする plugins プロパティーが必要です。

output プロパティーは、イメージのタイプおよび名前を記述し、任意でコンテナーレジストリーへのアクセスに必要なクレデンシャルが含まれる Secret の名前を記述します。plugins プロパティーは、アーティファクトのタイプとアーティファクトのダウンロード元となる URL を記述します。さらに、SHA-512 チェックサムを指定して、アーティファクトを展開する前に検証することもできます。

新しいイメージを自動的に作成する Kafka Connect の設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  # ...
  build:
    output:
      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins:
      - name: debezium-postgres-connector
        artifacts:
          - type: tgz
            url: https://ARTIFACT-ADDRESS.tgz
            sha512sum: HASH-NUMBER-TO-VERIFY-ARTIFACT
      # ...
  #...

コネクターの管理

KafkaConnector リソースまたは Kafka Connect REST API を使用して、Kafka Connect クラスターでコネクターインスタンスを作成および管理できます。KafkaConnector リソースでは OpenShift ネイティブな方法が提供され、Cluster Operator によって管理されます。

KafkaConnector リソースの spec では、コネクタークラスと設定、およびデータを処理するコネクター タスク の最大数を指定します。

KafkaConnector 設定の YAML 例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    file: "/opt/kafka/LICENSE"
    topic: my-topic
    # ...

アノテーションを KafkaConnect リソースに追加して、KafkaConnector を有効にします。KafkaConnector リソースは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。

KafkaConnector を有効にするアノテーションの YAML 例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
  # ...
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.