2.2. Kafka Connect/S2I クラスターの設定


ここでは、AMQ Streams クラスターにて Kafka Connect や S2I (Source-to-Image) のある Kafka Connect デプロイメントを設定する方法を説明します。

Kafka Connect は、Connector プラグインを使用して Kafka ブローカーと他のシステムの間でデータをストリーミングする統合ツールです。Kafka Connect は、Kafka と、データベースなどの外部データソースまたはターゲットと統合するためのフレームワークを提供し、コネクターを使用してデータをインポートまたはエクスポートします。コネクターは、必要な接続設定を提供するプラグインです。

Kafka Connect を使用している場合は、KafkaConnect または KafkaConnectS2I リソースを設定します。Source-to-Image(S 2I)フレームワークを使用して Kafka Connect をデプロイする場合は、KafkaConnectS 2I リソースを使用します。

2.2.1. Kafka Connect の設定

Kafka Connect を使用して、Kafka クラスターへの外部データ接続を設定します。

KafkaConnect または KafkaConnectS2I リソースのプロパティーを使用して、Kafka Connect デプロイメントを設定します。この手順の例は、KafkaConnect リソースの場合 ですが、プロパティーは KafkaConnectS2I リソースと同じです。

Kafka Connector の設定

KafkaConnector リソースを使用すると、Kafka Connect のコネクターインスタンスを OpenShift ネイティブに作成および管理できます。

設定で、strimzi.io/use-connector-resources アノテーションを追加して、Kafka Connect クラスターの KafkaConnectors を有効にします。externalConfiguration プロパティーを使用して Kafka Connect コネクターの外部設定を指定することもできます。

コネクターは、Kafka Connect の HTTP REST インターフェースまたは KafkaConnectors を使用して作成、再設定、および削除されます。これらの方法の詳細は、『OpenShift での AMQ Streams のデプロイおよびアップグレード』の「コネクターの作成および管理」を参照してください。

コネクター設定は、HTTP リクエストの一部として Kafka Connect に渡され、Kafka 自体に保存されます。ConfigMap およびシークレットは、設定やデータの保存に使用される標準的な OpenShift リソースです。ConfigMap およびシークレットを使用してコネクターの特定の要素を設定できます。その後、HTTP REST コマンドで設定値を参照できます (これにより、必要な場合は設定が分離され、よりセキュアになります)。この方法は、ユーザー名、パスワード、証明書などの機密性の高いデータに適用されます。

前提条件

  • OpenShift クラスター。
  • 稼働中の Cluster Operator。

以下を実行する方法については、『 OpenShift での AMQ Streams のデプロイおよびアップグレード』を参照してください。

手順

  1. KafkaConnect または KafkaConnectS2I リソースの spec プロパティーを編集します。

    設定可能なプロパティーは以下の例のとおりです。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnect 1
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 2
    spec:
      replicas: 3 3
      authentication: 4
        type: tls
        certificateAndKey:
          certificate: source.crt
          key: source.key
          secretName: my-user-source
      bootstrapServers: my-cluster-kafka-bootstrap:9092 5
      tls: 6
        trustedCertificates:
          - secretName: my-cluster-cluster-cert
            certificate: ca.crt
          - secretName: my-cluster-cluster-cert
            certificate: ca2.crt
      config: 7
        group.id: my-connect-cluster
        offset.storage.topic: my-connect-cluster-offsets
        config.storage.topic: my-connect-cluster-configs
        status.storage.topic: my-connect-cluster-status
        key.converter: org.apache.kafka.connect.json.JsonConverter
        value.converter: org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable: true
        value.converter.schemas.enable: true
        config.storage.replication.factor: 3
        offset.storage.replication.factor: 3
        status.storage.replication.factor: 3
      externalConfiguration: 8
        env:
          - name: AWS_ACCESS_KEY_ID
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsAccessKey
          - name: AWS_SECRET_ACCESS_KEY
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsSecretAccessKey
      resources: 9
        requests:
          cpu: "1"
          memory: 2Gi
        limits:
          cpu: "2"
          memory: 2Gi
      logging: 10
        type: inline
        loggers:
          log4j.rootLogger: "INFO"
      readinessProbe: 11
        initialDelaySeconds: 15
        timeoutSeconds: 5
      livenessProbe:
        initialDelaySeconds: 15
        timeoutSeconds: 5
      metrics: 12
        lowercaseOutputName: true
        lowercaseOutputLabelNames: true
        rules:
          - pattern: kafka.connect<type=connect-worker-metrics><>([a-z-]+)
            name: kafka_connect_worker_$1
            help: "Kafka Connect JMX metric worker"
            type: GAUGE
          - pattern: kafka.connect<type=connect-worker-rebalance-metrics><>([a-z-]+)
            name: kafka_connect_worker_rebalance_$1
            help: "Kafka Connect JMX metric rebalance information"
            type: GAUGE
      jvmOptions: 13
        "-Xmx": "1g"
        "-Xms": "1g"
      image: my-org/my-image:latest 14
      template: 15
        pod:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: application
                        operator: In
                        values:
                          - postgresql
                          - mongodb
                  topologyKey: "kubernetes.io/hostname"
        connectContainer: 16
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
    1
    必要に応じて KafkaConnect または KafkaConnectS2I を使用します。
    2
    Kafka Connect クラスターの KafkaConnectors を有効にします。
    3
    4
    OAuth ベアラートークン、SASL ベースの SCRAM-SHA-512 または PLAIN メカニズムを使用し、ここで示された TLS メカニズム を使用する、Kafka Connect クラスターの認証。デフォルトでは、Kafka Connect はプレーンテキスト接続を使用して Kafka ブローカーに接続します。
    5
    Kafka Connect クラスターに接続するためのブートストラップサーバー
    6
    クラスターの TLS 証明書が X.509 形式で保存されるキー名のある TLS による暗号化。複数の証明書が同じシークレットに保存されている場合は、複数回リストできます。
    7
    ワーカーKafka Connect 設定 (コネクターではない)。標準の Apache Kafka 設定が提供されることがありますが、AMQ Streams によって直接管理されないプロパティーに制限されます。
    8
    ここで示す環境変数や、ボリュームを使用した Kafka コネクターの外部設定
    9
    サポートされているリソース(現在は cpumemory)の予約と、消費可能な最大リソースを指定するための制限を要求します。
    10
    指定された Kafka loggers and log levels が ConfigMap を介して直接的に (inline) または間接的に (external) に追加されます。カスタム ConfigMap は、log4j.properties または log4j2.properties キー下に配置する必要があります。Kafka Connect log4j.rootLogger ロガーでは、ログレベルを INFO、ERROR、WARN、TRACE、DEBUG、FATAL または OFF に設定できます。
    11
    コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック
    12
    Prometheus メトリクス。この例では、Prometheus JMX エクスポーターの設定で有効になっています。metrics: {} を使用すると追加設定なしでメトリクスを有効にすることができます。
    13
    Kafka Connect を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション
    14
    高度なオプション:特別な場合のみ 推奨される コンテナーイメージの設定。
    15
    テンプレートのカスタマイズ。ここでは、Pod は非アフィニティーでスケジュールされるため、Pod は同じホスト名のノードではスケジュールされません。
    16
  2. リソースを作成または更新します。

    oc apply -f KAFKA-CONNECT-CONFIG-FILE
  3. Kafka Connect の承認が有効である場合、Kafka Connect ユーザーを設定し、Kafka Connect のコンシューマーグループおよびトピックへのアクセスを有効にします
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.