3.3. Apache Kafka のシンク


Apache Kafka シンクは、クラスター管理者がクラスターで Apache Kafka を有効にした場合に使用できる イベントシンク の一種です。Kafka シンクを使用して、イベントソース から Kafka トピックにイベントを直接送信できます。

3.3.1. YAML を使用した Apache Kafka シンクの作成

イベントを Kafka トピックに送信する Kafka シンクを作成できます。デフォルトでは、Kafka シンクはバイナリーコンテンツモードを使用します。これは、構造化モードよりも効率的です。YAML を使用して Kafka シンクを作成するには、KafkaSink オブジェクトを定義する YAML ファイルを作成してから、ocapply コマンドを使用してそれを適用する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および KnativeKafka カスタムリソース (CR) がクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
  • OpenShift CLI (oc) がインストールされている。

手順

  1. KafkaSink オブジェクト定義を YAML ファイルとして作成します。

    Kafka シンク YAML

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
      name: <sink-name>
      namespace: <namespace>
    spec:
      topic: <topic-name>
      bootstrapServers:
       - <bootstrap-server>

  2. Kafka シンクを作成するには、KafkaSink YAML ファイルを適用します。

    $ oc apply -f <filename>
  3. シンクが仕様で指定されるようにイベントソースを設定します。

    API サーバーソースに接続された Kafka シンクの例

    apiVersion: sources.knative.dev/v1alpha2
    kind: ApiServerSource
    metadata:
      name: <source-name> 1
      namespace: <namespace> 2
    spec:
      serviceAccountName: <service-account-name> 3
      mode: Resource
      resources:
      - apiVersion: v1
        kind: Event
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1alpha1
          kind: KafkaSink
          name: <sink-name> 4

    1
    イベントソースの名前。
    2
    イベントソースの namespace。
    3
    イベントソースのサービスアカウント。
    4
    Kafka シンクの名前。

3.3.2. OpenShift Container Platform Web コンソールを使用した Apache Kafka のイベントシンクの作成

OpenShift Container Platform Web コンソールの Developer パースペクティブを使用して、イベントを Kafka トピックに送信する Kafka シンクを作成できます。デフォルトでは、Kafka シンクはバイナリーコンテンツモードを使用します。これは、構造化モードよりも効率的です。

開発者は、イベントシンクを作成して、特定のソースからイベントを受信し、それを Kafka トピックに送信できます。

前提条件

  • OperatorHub から、Knative Serving、Knative Eventing、および Apache Kafka API 用の Knative ブローカーを使用して OpenShift Serverless Operator をインストールしている。
  • Kafka 環境で Kafka トピックを作成しました。

手順

  1. Developer パースペクティブで、+Add ビューに移動します。
  2. Eventing カタログEvent Sink をクリックします。
  3. カタログ項目で KafkaSink を検索してクリックします。
  4. イベントシンクの作成 をクリックします。
  5. フォームビューで、ホスト名とポートの組み合わせであるブートストラップサーバーの URL を入力します。

    イベントシンクを作成する
  6. イベントデータを送信するトピックの名前を入力します。
  7. イベントシンクの名前を入力します。
  8. Create をクリックします。

検証

  1. Developer パースペクティブで、Topology ビューに移動します。
  2. 作成したイベントシンクをクリックして、右側のパネルに詳細を表示します。

3.3.3. Apache Kafka シンクのセキュリティーの設定

Transport Layer Security (TLS) は、Apache Kafka クライアントおよびサーバーによって、Knative と Kafka 間のトラフィックを暗号化するため、および認証のために使用されます。TLS は、Apache Kafka の Knative ブローカー実装でサポートされている唯一のトラフィック暗号化方式です。

Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。

前提条件

  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka カスタムリソース (CR) が OpenShift Container Platform クラスターにインストールされている。
  • Kafka シンクが KnativeKafka CR で有効になっている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • .pem ファイルとして Kafka クラスター CA 証明書が保存されている。
  • Kafka クラスタークライアント証明書とキーが .pem ファイルとして保存されている。
  • OpenShift (oc) CLI がインストールされている。
  • 使用する SASL メカニズムを選択している (例: PLAINSCRAM-SHA-256、または SCRAM-SHA-512)。

手順

  1. KafkaSink オブジェクトと同じ namespace に証明書ファイルをシークレットとして作成します。

    重要

    証明書とキーは PEM 形式である必要があります。

    • 暗号化なしで SASL を使用した認証の場合:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SASL_PLAINTEXT \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-literal=user=<username> \
        --from-literal=password=<password>
    • SASL を使用した認証と TLS を使用した暗号化の場合:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SASL_SSL \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-file=ca.crt=<my_caroot.pem_file_path> \ 1
        --from-literal=user=<username> \
        --from-literal=password=<password>
      1
      パブリッククラウドで管理される Kafka サービスを使用している場合は、ca.crt を省略してシステムのルート CA セットを使用できます。
    • TLS を使用した認証と暗号化の場合:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SSL \
        --from-file=ca.crt=<my_caroot.pem_file_path> \ 1
        --from-file=user.crt=<my_cert.pem_file_path> \
        --from-file=user.key=<my_key.pem_file_path>
      1
      パブリッククラウドで管理される Kafka サービスを使用している場合は、ca.crt を省略してシステムのルート CA セットを使用できます。
  2. KafkaSink オブジェクトを作成または変更し、auth 仕様にシークレットへの参照を追加します。

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
       name: <sink_name>
       namespace: <namespace>
    spec:
    ...
       auth:
         secret:
           ref:
             name: <secret_name>
    ...
  3. KafkaSink オブジェクトを適用します。

    $ oc apply -f <filename>
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.