9.6. Kafka Connect の設定


KafkaConnect カスタムリソースの spec プロパティーを更新して、Kafka Connect デプロイメントを設定します。

Kafka Connect を使用して、Kafka クラスターへの外部データ接続を設定します。KafkaConnect リソースのプロパティーを使用して、Kafka Connect デプロイメントを設定します。

Kafka Connect クラスターの設定オプションの詳細は、Streams for Apache Kafka カスタムリソース API リファレンス を参照してください。

KafkaConnector の設定

KafkaConnect リソースを使用すると、Kafka Connect のコネクターインスタンスを OpenShift ネイティブに作成および管理できます。

Kafka Connect 設定では、strimzi.io/use-connector-resources アノテーションを追加して、Kafka Connect クラスターの KafkaConnectors を有効にします。また、build 設定を追加して、データ接続に必要なコネクタープラグインを備えたコンテナーイメージを Streams for Apache Kafka が自動的にビルドするようにすることもできます。Kafka Connect コネクターの外部設定は、externalConfiguration プロパティーで指定します。

コネクターを管理するには、KafkaConnector カスタムリソースまたは Kafka Connect REST API を使用できます。KafkaConnector リソースは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。これらの方法を使用してコネクターを作成、再設定、または削除する方法の詳細については、コネクターの追加 を参照してください。

コネクター設定は、HTTP リクエストの一部として Kafka Connect に渡され、Kafka 自体に保存されます。ConfigMap およびシークレットは、設定やデータの保存に使用される標準的な OpenShift リソースです。ConfigMap およびシークレットを使用してコネクターの特定の要素を設定できます。その後、HTTP REST コマンドで設定値を参照できます。これにより、必要な場合は設定が分離され、よりセキュアになります。この方法は、ユーザー名、パスワード、証明書などの機密性の高いデータに適用されます。

大量のメッセージ処理

設定を調整して、大量のメッセージを処理できます。詳細は、大量のメッセージの処理 を参照してください。

KafkaConnect カスタムリソース設定の例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect 1
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true" 2
spec:
  replicas: 3 3
  authentication: 4
    type: tls
    certificateAndKey:
      certificate: source.crt
      key: source.key
      secretName: my-user-source
  bootstrapServers: my-cluster-kafka-bootstrap:9092 5
  tls: 6
    trustedCertificates:
      - secretName: my-cluster-cluster-cert
        certificate: ca.crt
      - secretName: my-cluster-cluster-cert
        certificate: ca2.crt
  config: 7
    group.id: my-connect-cluster
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
  build: 8
    output: 9
      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins: 10
      - name: connector-1
        artifacts:
          - type: tgz
            url: <url_to_download_connector_1_artifact>
            sha512sum: <SHA-512_checksum_of_connector_1_artifact>
      - name: connector-2
        artifacts:
          - type: jar
            url: <url_to_download_connector_2_artifact>
            sha512sum: <SHA-512_checksum_of_connector_2_artifact>
  externalConfiguration: 11
    env:
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: aws-creds
            key: awsAccessKey
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: aws-creds
            key: awsSecretAccessKey
  resources: 12
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  logging: 13
    type: inline
    loggers:
      log4j.rootLogger: INFO
  readinessProbe: 14
    initialDelaySeconds: 15
    timeoutSeconds: 5
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  metricsConfig: 15
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        name: my-config-map
        key: my-key
  jvmOptions: 16
    "-Xmx": "1g"
    "-Xms": "1g"
  image: my-org/my-image:latest 17
  rack:
    topologyKey: topology.kubernetes.io/zone 18
  template: 19
    pod:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: application
                    operator: In
                    values:
                      - postgresql
                      - mongodb
              topologyKey: "kubernetes.io/hostname"
    connectContainer: 20
      env:
        - name: OTEL_SERVICE_NAME
          value: my-otel-service
        - name: OTEL_EXPORTER_OTLP_ENDPOINT
          value: "http://otlp-host:4317"
  tracing:
    type: opentelemetry 21

1
KafkaConnect を使用します。
2
Kafka Connect クラスターの KafkaConnectors を有効にします。
3
タスクを実行するワーカーのレプリカノード数。
4
mTLS、トークンベースの OAuth、SASL ベース SCRAM-SHA-256/SCRAM-SHA-512、または PLAIN として指定された Kafka Connect クラスターの認証。デフォルトでは、Kafka Connect はプレーンテキスト接続を使用して Kafka ブローカーに接続します。
5
Kafka クラスターに接続するためのブートストラップサーバー。
6
クラスターの TLS 証明書が X.509 形式で保存されるキー名のある TLS による暗号化。複数の証明書が同じシークレットに保存されている場合は、複数回リストできます。
7
ワーカーの Kafka Connect 設定 (コネクターではない)。Streams for Apache Kafka によって直接管理されないプロパティーに限り、標準の Apache Kafka 設定を指定できます。
8
コネクタープラグインで自動的にコンテナーイメージをビルドするためのビルド設定プロパティー。
9
(必須) 新しいイメージがプッシュされるコンテナーレジストリーの設定。
10
(必須) 新しいコンテナーイメージに追加するコネクタープラグインとそれらのアーティファクトのリスト。各プラグインは、1 つ以上の artifact を使用して設定する必要があります。
11
ここで示す環境変数や、ボリュームを使用したコネクターの外部設定。設定プロバイダープラグインを使用して、外部ソースから設定値を読み込むこともできます。
12
現在 cpu および memory である、サポートされるリソースの予約を要求し、消費可能な最大リソースを指定を制限します。
13
指定された Kafka ロガーおよびログレベルが ConfigMap を介して直接的に (inline) または間接的に (external) に追加されます。カスタム Log4j 設定は、ConfigMap の log4j.properties キーまたは log4j2.properties キーの下に配置する必要があります。Kafka Connect log4j.rootLogger ロガーでは、ログレベルを INFO、ERROR、WARN、TRACE、DEBUG、FATAL または OFF に設定できます。
14
コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック。
15
Prometheus メトリクス。この例では、Prometheus JMX エクスポーターの設定が含まれる ConfigMap を参照して有効になります。metricsConfig.valueFrom.configMapKeyRef.key 配下に空のファイルが含まれる ConfigMap の参照を使用して、追加設定なしでメトリックを有効にできます。
16
Kafka Connect を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション。
17
高度なオプション: コンテナーイメージの設定。特別な状況でのみ推奨されます。
18
特別なオプション: 展開のための Rack awareness 設定。これは、リージョン間ではなく、同じロケーション内でのデプロイメントを目的とした特殊なオプションです。このオプションは、コネクターがリーダーレプリカではなく、最も近いレプリカから消費する場合に使用できます。場合によっては、最も近いレプリカから消費することで、ネットワークの使用率を改善したり、コストを削減したりできます。topologyKey は、ラック ID を含むノードラベルと一致する必要があります。この設定で使用される例では、標準の topology.kubernetes.io/zone ラベルを使用するゾーンを指定します。最も近いレプリカから消費するには、Kafka ブローカー設定で RackAwareReplicaSelector を有効にします。
19
テンプレートのカスタマイズ。ここでは、Pod は非アフィニティーでスケジュールされるため、Pod は同じホスト名のノードではスケジュールされません。
20
分散トレース用に環境変数が設定されます。
21
分散トレーシングは、OpenTelemetry を使用して有効になります。

9.6.1. 複数のインスタンス用の Kafka Connect の設定

デフォルトでは、Streams for Apache Kafka は、Kafka Connect によって使用される内部トピックのグループ ID と名前を設定します。Kafka Connect の複数のインスタンスを実行する場合は、次の config プロパティーを使用してこれらのデフォルト設定を変更する必要があります。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  config:
    group.id: my-connect-cluster 1
    offset.storage.topic: my-connect-cluster-offsets 2
    config.storage.topic: my-connect-cluster-configs 3
    status.storage.topic: my-connect-cluster-status 4
    # ...
  # ...
1
Kafka 内の Kafka Connect クラスターグループ ID。
2
コネクターオフセットを保存する Kafka トピック。
3
コネクターおよびタスクステータスの設定を保存する Kafka トピック。
4
コネクターおよびタスクステータスの更新を保存する Kafka トピック。
注記

group.id が同じすべてのインスタンスで、3 つのトピックの値を同じにする必要があります。

これらのデフォルト設定を変更しない限り、同じ Kafka クラスターに接続する各インスタンスは同じ値でデプロイされます。実際には、これはすべてのインスタンスがクラスターを形成し、同じ内部トピックを使用することを意味します。

複数のインスタンスが同じ内部トピックを使用しようとすると、予期しないエラーが発生するため、インスタンスごとにこれらのプロパティーの値を変更する必要があります。

9.6.2. Kafka Connect のユーザー認可の設定

Kafka で認可を使用する場合、Kafka Connect ユーザーはクラスターグループおよび Kafka Connect の内部トピックへの読み取り/書き込みアクセス権が必要です。この手順では、simple 認可および ACL を使用してアクセス権限を付与する方法を説明します。

Kafka Connect クラスターグループ ID と内部トピックのプロパティーは、デフォルトで Streams for Apache Kafka によって設定されます。あるいは、KafkaConnect リソースの spec で明示的に定義することもできます。複数の Kafka Connect インスタンスを実行する 場合はグループ ID とトピックの値が異なる必要があるため、これは複数のインスタンスに対して Kafka Connect を設定する場合に便利です。

簡易認可は、Kafka AclAuthorizer および StandardAuthorizer プラグインで管理される ACL ルールを使用し、適切なアクセスレベルを確保します。KafkaUser リソースに簡易認証を使用するように設定する方法については、AclRule スキーマリファレンスを参照してください。

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. KafkaUser リソースの authorization プロパティーを編集し、アクセス権限をユーザーに付与します。

    アクセス権は、literal 名の値を使用して、Kafka Connect トピックとクラスターグループに対して設定されます。次の表に、トピックとクラスターグループ ID に設定されたデフォルトの名前を示します。

    表9.2 アクセス権設定の名前
    プロパティー名前

    offset.storage.topic

    connect-cluster-offsets

    status.storage.topic

    connect-cluster-status

    config.storage.topic

    connect-cluster-configs

    group

    connect-cluster

    この設定例では、デフォルト名を使用してアクセス権を指定します。Kafka Connect インスタンスに別の名前を使用している場合は、ACL 設定でそれらの名前を使用します。

    簡易認可の設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-user
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      # ...
      authorization:
        type: simple
        acls:
          # access to offset.storage.topic
          - resource:
              type: topic
              name: connect-cluster-offsets
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # access to status.storage.topic
          - resource:
              type: topic
              name: connect-cluster-status
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # access to config.storage.topic
          - resource:
              type: topic
              name: connect-cluster-configs
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # cluster group
          - resource:
              type: group
              name: connect-cluster
              patternType: literal
            operations:
              - Read
            host: "*"

  2. リソースを作成または更新します。

    oc apply -f KAFKA-USER-CONFIG-FILE

9.6.3. Kafka Connect コネクターの手動停止または一時停止

KafkaConnector リソースを使用してコネクターを設定している場合は、state 設定を使用してコネクターを停止または一時停止します。コネクターとタスクがインスタンス化されたままになる一時停止状態とは対照的に、コネクターを停止すると設定のみが保持され、アクティブなプロセスは保持されません。コネクター実行の停止は、単に一時停止するよりも長時間停止する場合に適しています。一時停止されたコネクターはすぐに再開されますが、停止されたコネクターにはメモリーとリソースが解放されるという利点があります。

注記

state 設定は、KafkaConnectorSpec スキーマの (非推奨) pause 設定を置き換えるもので、コネクターでの一時停止を許可します。これまでに pause 設定を使用してコネクターを一時停止した場合には、競合の回避目的にのみ state 設定の使用に移行することを推奨します。

前提条件

  • Cluster Operator が稼働中である。

手順

  1. 一時停止または停止するコネクターを制御する KafkaConnector カスタムリソースの名前を見つけます。

    oc get KafkaConnector
  2. KafkaConnector リソースを編集して、コネクターを停止または一時停止します。

    Kafka Connect コネクターを停止する設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: org.apache.kafka.connect.file.FileStreamSourceConnector
      tasksMax: 2
      config:
        file: "/opt/kafka/LICENSE"
        topic: my-topic
      state: stopped
      # ...

    state 設定を stopped または paused に変更します。このプロパティーが設定されていない場合のコネクターのデフォルトの状態は running です。

  3. 変更を KafkaConnector 設定に適用します。

    staterunning に変更するか、設定を削除して、コネクターを再開できます。

注記

あるいは、Kafka Connect API を公開しstop エンドポイントと 一時pause エンドポイントを使用してコネクターの実行を停止することもできます。たとえば、PUT /connectors/<connector_name>/stop などです。その後、resume エンドポイントを使用して再起動できます。

9.6.4. Kafka Connect コネクターの手動での再起動

KafkaConnector リソースを使用してコネクターを管理している場合は、strimzi.io/restart アノテーションを使用してコネクターの再起動を手動でトリガーします。

前提条件

  • Cluster Operator が稼働中である。

手順

  1. 再起動する Kafka コネクターを制御する KafkaConnector カスタムリソースの名前を見つけます。

    oc get KafkaConnector
  2. OpenShift で KafkaConnector リソースにアノテーションを付けて、コネクターを再起動します。

    oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"

    restart アノテーションは true に設定されています。

  3. 次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。

    アノテーションが調整プロセスで検出されれば、Kafka コネクターは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは KafkaConnector カスタムリソースから削除されます。

9.6.5. Kafka Connect コネクタータスクの手動での再起動

KafkaConnector リソースを使用してコネクターを管理している場合は、strimzi.io/restart-task アノテーションを使用して、コネクタータスクの再起動を手動でトリガーします。

前提条件

  • Cluster Operator が稼働中である。

手順

  1. 再起動する Kafka コネクタータスクを制御する KafkaConnector カスタムリソースの名前を見つけます。

    oc get KafkaConnector
  2. KafkaConnector カスタムリソースから再起動するタスクの ID を検索します。

    oc describe KafkaConnector <kafka_connector_name>

    タスク ID は 0 から始まる負の値ではない整数です。

  3. OpenShift で KafkaConnector リソースにアノテーションを付けて、ID を使用してコネクタータスクを再開します。

    oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"

    この例では、タスク 0 が再起動されます。

  4. 次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。

    アノテーションが調整プロセスで検出されれば、Kafka コネクタータスクは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは KafkaConnector カスタムリソースから削除されます。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.