5.4. ブローカー


5.4.1. ブローカー

ブローカーはトリガーと組み合わせて、イベントをイベントソースからイベントシンクに配信できます。イベントは、HTTP POST リクエストとしてイベントソースからブローカーに送信されます。イベントがブローカーに送信された後に、それらはトリガーを使用して CloudEvent 属性 でフィルターされ、HTTP POST リクエストとしてイベントシンクに送信できます。

ブローカーイベント配信の概要

5.4.2. ブローカータイプ

クラスター管理者は、クラスターのデフォルトブローカー実装を設定できます。ブローカーを作成する場合、Broker オブジェクトで設定を指定しない限り、デフォルトのブローカー実装が使用されます。

5.4.2.1. 開発目的でのデフォルトブローカーの実装

Knative は、デフォルトのチャネルベースのブローカー実装を提供します。このチャネルベースのブローカーは、開発およびテストの目的で使用できますが、実稼働環境での適切なイベント配信の保証は提供しません。デフォルトのブローカーは、デフォルトで InMemoryChannel チャネル実装によってサポートされています。

Kafka を使用してネットワークホップを削減する場合は、Kafka ブローカーの実装を使用します。チャネルベースのブローカーが KafkaChannel チャネル実装によってサポートされるように設定しないでください。

5.4.2.2. 実稼働環境対応の Kafka ブローカーの実装

実稼働環境に対応した Knative Eventing デプロイメントの場合、Red Hat は Knative Kafka ブローカーの実装を使用することをお勧めします。Kafka ブローカーは、Knative ブローカーの Apache Kafka ネイティブ実装であり、CloudEvents を Kafka インスタンスに直接送信します。

Kafka ブローカーは、イベントの保存とルーティングのために Kafka とネイティブに統合されています。これにより、他のブローカータイプよりもブローカーとトリガーモデルの Kafka との統合性が向上し、ネットワークホップを削減することができます。Kafka ブローカー実装のその他の利点は次のとおりです。

  • 少なくとも 1 回の配信保証
  • CloudEvents パーティショニング拡張機能に基づくイベントの順序付き配信
  • コントロールプレーンの高可用性
  • 水平方向にスケーラブルなデータプレーン

Knative Kafka ブローカーは、バイナリーコンテンツモードを使用して、受信 CloudEvents を Kafka レコードとして保存します。これは、CloudEvent のすべての属性と拡張機能が Kafka レコードのヘッダーとしてマップされ、CloudEvent の data 仕様が Kafka レコードの値に対応することを意味します。

5.4.3. ブローカーの作成

Knative は、デフォルトのチャネルベースのブローカー実装を提供します。このチャネルベースのブローカーは、開発およびテストの目的で使用できますが、実稼働環境での適切なイベント配信の保証は提供しません。

クラスター管理者がデフォルトのブローカータイプとして Kafka を使用するように OpenShift Serverless デプロイメントを設定している場合、デフォルト設定を使用してブローカーを作成すると、Kafka ベースのブローカーが作成されます。

OpenShift Serverless デプロイメントが Kafka ブローカーをデフォルトのブローカータイプとして使用するように設定されていない場合、以下の手順でデフォルト設定を使用すると、チャネルベースのブローカーが作成されます。

5.4.3.1. Knative CLI を使用したブローカーの作成

ブローカーはトリガーと組み合わせて、イベントをイベントソースからイベントシンクに配信できます。ブローカーを作成するために Knative (kn) CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。kn broker create コマンドを使用して、ブローカーを作成できます。

前提条件

  • OpenShift Serverless Operator および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
  • Knative (kn) CLI をインストールしている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  • ブローカーを作成します。

    $ kn broker create <broker_name>

検証

  1. kn コマンドを使用して、既存のブローカーを一覧表示します。

    $ kn broker list

    出力例

    NAME      URL                                                                     AGE   CONDITIONS   READY   REASON
    default   http://broker-ingress.knative-eventing.svc.cluster.local/test/default   45s   5 OK / 5     True

  2. オプション: OpenShift Container Platform Web コンソールを使用している場合、Developer パースペクティブの Topology ビューに移動し、ブローカーが存在することを確認できます。

    Web コンソールの Topology ビューでのブローカーの表示

5.4.3.2. トリガーのアノテーションによるブローカーの作成

ブローカーはトリガーと組み合わせて、イベントをイベントソースからイベントシンクに配信できます。eventing.knative.dev/injection: enabled アノテーションを Trigger オブジェクトに追加してブローカーを作成できます。

重要

knative-eventing-injection: enabled アノテーションを使用してブローカーを作成する場合、クラスター管理者パーミッションなしにこのブローカーを削除することはできません。クラスター管理者が最初にこのアノテーションを削除せずにブローカーを削除する場合、ブローカーは削除後に再び作成されます。

前提条件

  • OpenShift Serverless Operator および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
  • OpenShift CLI (oc) をインストールしている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. Trigger オブジェクトを、eventing.knative.dev/injection: enabled アノテーションを付けて YAML ファイルとして作成します。

    apiVersion: eventing.knative.dev/v1
    kind: Trigger
    metadata:
      annotations:
        eventing.knative.dev/injection: enabled
      name: <trigger_name>
    spec:
      broker: default
      subscriber: 1
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: <service_name>
    1
    トリガーがイベントを送信するイベントシンクまたは サブスクライバー の詳細を指定します。
  2. Trigger YAML ファイルを適用します。

    $ oc apply -f <filename>

検証

oc CLI を使用してブローカーが正常に作成されていることを確認するか、または Web コンソールの Topology ビューでこれを確認できます。

  1. 以下の oc コマンドを入力してブローカーを取得します。

    $ oc -n <namespace> get broker default

    出力例

    NAME      READY     REASON    URL                                                                     AGE
    default   True                http://broker-ingress.knative-eventing.svc.cluster.local/test/default   3m56s

  2. オプション: OpenShift Container Platform Web コンソールを使用している場合、Developer パースペクティブの Topology ビューに移動し、ブローカーが存在することを確認できます。

    Web コンソールの Topology ビューでのブローカーの表示

5.4.3.3. namespace へのラベル付けによるブローカーの作成

ブローカーはトリガーと組み合わせて、イベントをイベントソースからイベントシンクに配信できます。所有しているか、または書き込みパーミッションのある namespace にラベルを付けて default ブローカーを自動的に作成できます。

注記

この方法を使用して作成されたブローカーは、ラベルを削除すると削除されません。これらは手動で削除する必要があります。

前提条件

  • OpenShift Serverless Operator および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
  • OpenShift CLI (oc) をインストールしている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  • eventing.knative.dev/injection=enabled で namespace にラベルを付ける。

    $ oc label namespace <namespace> eventing.knative.dev/injection=enabled

検証

oc CLI を使用してブローカーが正常に作成されていることを確認するか、または Web コンソールの Topology ビューでこれを確認できます。

  1. oc コマンドを使用してブローカーを取得します。

    $ oc -n <namespace> get broker <broker_name>

    コマンドの例

    $ oc -n default get broker default

    出力例

    NAME      READY     REASON    URL                                                                     AGE
    default   True                http://broker-ingress.knative-eventing.svc.cluster.local/test/default   3m56s

  2. オプション: OpenShift Container Platform Web コンソールを使用している場合、Developer パースペクティブの Topology ビューに移動し、ブローカーが存在することを確認できます。

    Web コンソールの Topology ビューでのブローカーの表示

5.4.3.4. 挿入 (injection) によって作成されたブローカーの削除

挿入によりブローカーを作成し、後でそれを削除する必要がある場合は、手動で削除する必要があります。namespace ラベルまたはトリガーアノテーションを使用して作成されたブローカーは、ラベルまたはアノテーションを削除した場合に永続的に削除されません。

前提条件

  • OpenShift CLI (oc) をインストールしている。

手順

  1. eventing.knative.dev/injection=enabled ラベルを namespace から削除します。

    $ oc label namespace <namespace> eventing.knative.dev/injection-

    アノテーションを削除すると、Knative では削除後にブローカーを再作成できなくなります。

  2. 選択された namespace からブローカーを削除します。

    $ oc -n <namespace> delete broker <broker_name>

検証

  • oc コマンドを使用してブローカーを取得します。

    $ oc -n <namespace> get broker <broker_name>

    コマンドの例

    $ oc -n default get broker default

    出力例

    No resources found.
    Error from server (NotFound): brokers.eventing.knative.dev "default" not found

5.4.3.5. Web コンソールを使用してブローカーを作成する

Knative Eventing がクラスターにインストールされた後、Web コンソールを使用してブローカーを作成できます。OpenShift Container Platform Web コンソールを使用すると、ブローカーを作成するための合理的で直感的なユーザーインターフェイスが提供されます。

前提条件

  • OpenShift Container Platform Web コンソールにログインしている。
  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. Developer パースペクティブで、+Add Broker に移動します。Broker ページが表示されます。
  2. オプション:ブローカーの Name を更新します。名前を更新しない場合、生成されたブローカーの名前は default になります。
  3. Create をクリックします。

検証

トポロジー ページでブローカーコンポーネントを表示することにより、ブローカーが作成されたことを確認できます。

  1. Developer パースペクティブで、Topology に移動します。
  2. mt-broker-ingressmt-broker-filter、および mt-broker-controller コンポーネントを表示します。

    トポロジービューでブローカーコンポーネントを表示する

5.4.3.6. Administrator パースペクティブを使用したブローカーの作成

ブローカーはトリガーと組み合わせて、イベントをイベントソースからイベントシンクに配信できます。イベントは、HTTP POST リクエストとしてイベントソースからブローカーに送信されます。イベントがブローカーに送信された後に、それらはトリガーを使用して CloudEvent 属性 でフィルターされ、HTTP POST リクエストとしてイベントシンクに送信できます。

ブローカーイベント配信の概要

前提条件

  • OpenShift Serverless Operator および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
  • Web コンソールにログインしており、Administrator パースペクティブを使用している。
  • OpenShift Container Platform のクラスター管理者パーミッションがある。

手順

  1. OpenShift Container Platform Web コンソールの Administrator パースペクティブで、 Serverless Eventing に移動します。
  2. Create 一覧で、Broker を選択します。Create Broker ページに移動します。
  3. オプション: ブローカーの YAML 設定を変更します。
  4. Create をクリックします。

5.4.3.7. 次のステップ

5.4.3.8. 関連情報

5.4.4. デフォルトのブローカーバッキングチャネルの設定

チャネルベースのブローカーを使用している場合、ブローカーのデフォルトのバッキングチャネルタイプを InMemoryChannel または KafkaChannel に設定できます。

前提条件

  • OpenShift Container Platform に対する管理者権限を持っている。
  • OpenShift Serverless Operator および Knative Eventing がクラスターにインストールされていること。
  • OpenShift (oc) CLI がインストールされている。
  • Kafka チャネルをデフォルトのバッキングチャネルタイプとして使用する場合は、クラスターに KnativeKafka CR もインストールする必要があります。

手順

  1. KnativeEventing カスタムリソース (CR) を変更して、config-br-default-channel 設定マップの設定の詳細を追加します。

    apiVersion: operator.knative.dev/v1beta1
    kind: KnativeEventing
    metadata:
      name: knative-eventing
      namespace: knative-eventing
    spec:
      config: 1
        config-br-default-channel:
          channel-template-spec: |
            apiVersion: messaging.knative.dev/v1beta1
            kind: KafkaChannel 2
            spec:
              numPartitions: 6 3
              replicationFactor: 3 4
    1
    spec.config で、変更した設定を追加する設定マップを指定できます。
    2
    デフォルトのバッキングチャネルタイプの設定。この例では、クラスターのデフォルトのチャネル実装は KafkaChannel です。
    3
    ブローカーをサポートする Kafka チャネルのパーティションの数。
    4
    ブローカーをサポートする Kafka チャネルのレプリケーションファクター。
  2. 更新された KnativeEventing CR を適用します。

    $ oc apply -f <filename>

5.4.5. デフォルトブローカークラスの設定

config-br-defaults 設定マップを使用して、Knative Eventing のデフォルトのブローカークラス設定を指定できます。クラスター全体または 1 つ以上の namespace に対して、デフォルトのブローカークラスを指定できます。現在、MTChannelBasedBroker および Kafka ブローカータイプがサポートされています。

前提条件

  • OpenShift Container Platform に対する管理者権限を持っている。
  • OpenShift Serverless Operator および Knative Eventing がクラスターにインストールされていること。
  • Kafka ブローカーをデフォルトのブローカー実装として使用する場合は、クラスターに KnativeKafka CR もインストールする必要があります。

手順

  • KnativeEventing カスタムリソースを変更して、config-br-defaults 設定マップの設定の詳細を追加します。

    apiVersion: operator.knative.dev/v1beta1
    kind: KnativeEventing
    metadata:
      name: knative-eventing
      namespace: knative-eventing
    spec:
      defaultBrokerClass: Kafka 1
      config: 2
        config-br-defaults: 3
          default-br-config: |
            clusterDefault: 4
              brokerClass: Kafka
              apiVersion: v1
              kind: ConfigMap
              name: kafka-broker-config 5
              namespace: knative-eventing 6
            namespaceDefaults: 7
              my-namespace:
                brokerClass: MTChannelBasedBroker
                apiVersion: v1
                kind: ConfigMap
                name: config-br-default-channel 8
                namespace: knative-eventing 9
    ...
    1
    Knative Eventing のデフォルトのブローカークラス。
    2
    spec.config で、変更した設定を追加する設定マップを指定できます。
    3
    config-br-defaults 設定マップは、spec.config 設定またはブローカークラスを指定しないブローカーのデフォルト設定を指定します。
    4
    クラスター全体のデフォルトのブローカークラス設定。この例では、クラスターのデフォルトのブローカークラスの実装は Kafka です。
    5
    kafka-broker-config 設定マップは、Kafka ブローカーのデフォルト設定を指定します。「関連情報」セクションの「Kafka ブローカー構成の設定」を参照してください。
    6
    kafka-broker-config 設定マップが存在する namespace。
    7
    namespace スコープのデフォルトブローカクラス設定。この例では、my-namespace namespace のデフォルトのブローカークラスの実装は MTChannelBasedBroker です。複数の namespace に対してデフォルトのブローカークラスの実装を指定できます。
    8
    config-br-default-channel 設定マップは、ブローカーのデフォルトのバッキングチャネルを指定します。「関連情報」セクションの「デフォルトのブローカーバッキングチャネルの設定」を参照してください。
    9
    config-br-default-channel 設定マップが存在する namespace。
    重要

    namespace 固有のデフォルトを設定すると、クラスター全体の設定が上書きされます。

5.4.6. Kafka ブローカー

実稼働環境に対応した Knative Eventing デプロイメントの場合、Red Hat は Knative Kafka ブローカーの実装を使用することをお勧めします。Kafka ブローカーは、Knative ブローカーの Apache Kafka ネイティブ実装であり、CloudEvents を Kafka インスタンスに直接送信します。

Kafka ブローカーは、イベントの保存とルーティングのために Kafka とネイティブに統合されています。これにより、他のブローカータイプよりもブローカーとトリガーモデルの Kafka との統合性が向上し、ネットワークホップを削減することができます。Kafka ブローカー実装のその他の利点は次のとおりです。

  • 少なくとも 1 回の配信保証
  • CloudEvents パーティショニング拡張機能に基づくイベントの順序付き配信
  • コントロールプレーンの高可用性
  • 水平方向にスケーラブルなデータプレーン

Knative Kafka ブローカーは、バイナリーコンテンツモードを使用して、受信 CloudEvents を Kafka レコードとして保存します。これは、CloudEvent のすべての属性と拡張機能が Kafka レコードのヘッダーとしてマップされ、CloudEvent の data 仕様が Kafka レコードの値に対応することを意味します。

5.4.6.1. デフォルトのブローカータイプとして設定されていない場合の Kafka ブローカーの作成

OpenShift Serverless デプロイメントがデフォルトのブローカータイプとして Kafka ブローカーを使用するように設定されていない場合は、以下の手順のいずれかを使用して、Kafka ベースのブローカーを作成できます。

5.4.6.1.1. YAML を使用した Kafka ブローカーの作成

YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でアプリケーションを宣言的に記述することができます。YAML を使用して Kafka ブローカーを作成するには、Broker オブジェクトを定義する YAML ファイルを作成し、oc apply コマンドを使用してそれを適用する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka カスタムリソースは OpenShift Container Platform クラスターにインストールされます。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • OpenShift CLI (oc) がインストールされている。

手順

  1. Kafka ベースのブローカーを YAML ファイルとして作成します。

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      annotations:
        eventing.knative.dev/broker.class: Kafka 1
      name: example-kafka-broker
    spec:
      config:
        apiVersion: v1
        kind: ConfigMap
        name: kafka-broker-config 2
        namespace: knative-eventing
    1
    ブローカークラス。指定されていない場合、ブローカーはクラスター管理者の設定に従ってデフォルトクラスを使用します。Kafka ブローカーを使用するには、この値を Kafka にする必要があります。
    2
    Knative Kafka ブローカーのデフォルトの設定マップ。この設定マップは、クラスター管理者がクラスター上で Kafka ブローカー機能を有効にした場合に作成されます。
  2. Kafka ベースのブローカー YAML ファイルを適用します。

    $ oc apply -f <filename>
5.4.6.1.2. 外部で管理されている Kafka トピックを使用する Kafka ブローカーの作成

独自の内部トピックの作成を許可せずに Kafka ブローカーを使用する場合は、代わりに外部で管理される Kafka トピックを使用できます。これを実行するには、kafka.eventing.knative.dev/external.topic アノテーションを使用する Kafka Broker オブジェクトを作成する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka カスタムリソースは OpenShift Container Platform クラスターにインストールされます。
  • Red Hat AMQ Streams などの Kafka インスタンスにアクセスでき、Kafka トピックを作成しました。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • OpenShift CLI (oc) がインストールされている。

手順

  1. Kafka ベースのブローカーを YAML ファイルとして作成します。

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      annotations:
        eventing.knative.dev/broker.class: Kafka 1
        kafka.eventing.knative.dev/external.topic: <topic_name> 2
    ...
    1
    ブローカークラス。指定されていない場合、ブローカーはクラスター管理者の設定に従ってデフォルトクラスを使用します。Kafka ブローカーを使用するには、この値を Kafka にする必要があります。
    2
    使用する Kafka トピックの名前。
  2. Kafka ベースのブローカー YAML ファイルを適用します。

    $ oc apply -f <filename>
5.4.6.1.3. 分離されたデータプレーンのある Apache Kafka の Knative Broker 実装
重要

分離されたデータプレーンを使用した Apache Kafka の Knative Broker 実装は、テクノロジープレビュー機能のみです。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は実稼働環境でこれらを使用することを推奨していません。テクノロジープレビューの機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行いフィードバックを提供していただくことを目的としています。

Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

Apache Kafka の Knative Broker 実装には 2 つのプレーンがあります。

コントロールプレーン
は、Kubernetes API と通信し、カスタムオブジェクトを監視し、データプレーンを管理するコントローラーで設定されます。
データプレーン
受信イベントをリッスンし、Apache Kafka と通信し、イベントシンクに送信するコンポーネントのコレクション。Apache Kafka データプレーンの Knative Broker 実装はイベントフローになります。この実装は、kafka-broker-receiver および kafka-broker-dispatcher デプロイメントで設定されます。

Kafka の Broker クラスを設定する場合、Apache Kafka の Knative Broker 実装は共有データプレーンを使用します。これは、knative-eventing namespace の kafka-broker-receiver および kafka-broker-dispatcher デプロイメントがクラスター内のすべての Apache Kafka Broker に使用されることを意味します。

ただし、KafkaNamespaced の Broker クラスを設定すると、Apache Kafka ブローカーコントローラーは、ブローカーが存在する namespace ごとに新しいデータプレーンを作成します。このデータプレーンは、その namespace のすべての KafkaNamespaced ブローカーによって使用されます。これにより、データプレーンが分離されるため、ユーザー namespace の kafka-broker-receiver および kafka-broker-dispatcher デプロイメントがその namespace のブローカーにのみ使用されます。

重要

データプレーンを分離することで、このセキュリティー機能により、追加のデプロイメントが作成され、より多くのリソースが使用されます。このような分離要件がない限り、Kafka のクラスで 通常 の Broker を使用します。

5.4.6.1.4. 分離されたデータプレーンを使用する Apache Kafka の Knative ブローカーの作成
重要

分離されたデータプレーンを使用した Apache Kafka の Knative Broker 実装は、テクノロジープレビュー機能のみです。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は実稼働環境でこれらを使用することを推奨していません。テクノロジープレビューの機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行いフィードバックを提供していただくことを目的としています。

Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

KafkaNamespaced ブローカーを作成するには、eventing.knative.dev/broker.class アノテーションを KafkaNamespaced に設定する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka カスタムリソースは OpenShift Container Platform クラスターにインストールされます。
  • Red Hat AMQ Streams などの Apache Kafka インスタンスにアクセスでき、Kafka トピックを作成している。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションでプロジェクトにアクセスできる。
  • OpenShift CLI (oc) がインストールされている。

手順

  1. YAML ファイルを使用して Apache Kafka ベースのブローカーを作成します。

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      annotations:
        eventing.knative.dev/broker.class: KafkaNamespaced 1
      name: default
      namespace: my-namespace 2
    spec:
      config:
        apiVersion: v1
        kind: ConfigMap
        name: my-config 3
    ...
    1
    分離されたデータプレーンで Apache Kafka ブローカーを使用するには、ブローカークラスの値は KafkaNamespaced である必要があります。
    2 3
    参照される ConfigMap オブジェクト my-configBroker オブジェクトと同じ namespace (この場合は my-namespace )にある必要があります。
  2. Apache Kafka ベースのブローカー YAML ファイルを適用します。

    $ oc apply -f <filename>
重要

spec.configConfigMap オブジェクトは Broker オブジェクトと同じ namespace にある必要があります。

apiVersion: v1
kind: ConfigMap
metadata:
  name: my-config
  namespace: my-namespace
data:
  ...

KafkaNamespaced クラスで最初の Broker オブジェクトを作成すると、kafka-broker-receiver および kafka-broker-dispatcher デプロイメントが namespace に作成されます。その後、同じ namespace に KafkaNamespaced クラスを持つすべてのブローカーは同じデータプレーンを使用します。KafkaNamespaced クラスを持つブローカーが namespace に存在しない場合は、namespace のデータプレーンが削除されます。

5.4.6.2. Kafka ブローカー構成の設定

設定マップを作成し、Kafka Broker オブジェクトでこの ConfigMap を参照することで、レプリケーション係数、ブートストラップサーバー、および Kafka ブローカーのトピックパーティションの数を設定できます。

前提条件

  • OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka カスタムリソース (CR) は OpenShift Container Platform クラスターにインストールされます。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • OpenShift CLI (oc) がインストールされている。

手順

  1. kafka-broker-config ConfigMap を変更するか、以下の設定が含まれる独自の ConfigMap を作成します。

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: <config_map_name> 1
      namespace: <namespace> 2
    data:
      default.topic.partitions: <integer> 3
      default.topic.replication.factor: <integer> 4
      bootstrap.servers: <list_of_servers> 5
    1
    ConfigMap 名。
    2
    ConfigMap が存在する namespace。
    3
    Kafka ブローカーのトピックパーティションの数。これは、イベントをブローカーに送信する速度を制御します。パーティションが多い場合には、コンピュートリソースが多く必要です。
    4
    トピックメッセージのレプリケーション係数。これにより、データ損失を防ぐことができます。レプリケーション係数を増やすには、より多くのコンピュートリソースとストレージが必要になります。
    5
    ブートストラップサーバーのコンマ区切りリスト。これは、OpenShift Container Platform クラスターの内部または外部にある可能性があり、ブローカーがイベントを受信してイベントを送信する Kafka クラスターのリストです。
    重要

    default.topic.replication.factor の値は、クラスター内の Kafka ブローカーインスタンスの数以下である必要があります。たとえば、Kafka ブローカーが 1 つしかない場合には、default.topic.replication.factor の値は "1" を超える値にすることはできません。

    Kafka ブローカーの ConfigMap の例

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-broker-config
      namespace: knative-eventing
    data:
      default.topic.partitions: "10"
      default.topic.replication.factor: "3"
      bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"

  2. ConfigMap を適用します。

    $ oc apply -f <config_map_filename>
  3. Kafka Broker オブジェクトの ConfigMap を指定します。

    Broker オブジェクトの例

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      name: <broker_name> 1
      namespace: <namespace> 2
      annotations:
        eventing.knative.dev/broker.class: Kafka 3
    spec:
      config:
        apiVersion: v1
        kind: ConfigMap
        name: <config_map_name> 4
        namespace: <namespace> 5
    ...

    1
    ブローカー名。
    2
    ブローカーが存在する namespace。
    3
    ブローカークラスアノテーション。この例では、ブローカーはクラス値 Kafka を使用する Kafka ブローカーです。
    4
    ConfigMap 名。
    5
    ConfigMap が存在する namespace。
  4. ブローカーを適用します。

    $ oc apply -f <broker_filename>

5.4.6.3. Knative Kafka ブローカーのセキュリティー設定

Kafka クラスターは、通常、TLS または SASL 認証方法を使用して保護されます。TLS または SASL を使用して、保護された Red Hat AMQ Streams クラスターに対して動作するように Kafka ブローカーまたはチャネルを設定できます。

注記

Red Hat は、SASL と TLS の両方を一緒に有効にすることをお勧めします。

5.4.6.3.1. Kafka ブローカーの TLS 認証の設定

Transport Layer Security (TLS) は、Apache Kafka クライアントおよびサーバーによって、Knative と Kafka 間のトラフィックを暗号化するため、および認証のために使用されます。TLS は、Knative Kafka のトラフィック暗号化でサポートされている唯一の方法です。

前提条件

  • OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka CR は、OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • .pem ファイルとして Kafka クラスター CA 証明書が保存されている。
  • Kafka クラスタークライアント証明書とキーが .pem ファイルとして保存されている。
  • OpenShift CLI (oc) をインストールしている。

手順

  1. 証明書ファイルを knative-eventing namespace にシークレットファイルとして作成します。

    $ oc create secret -n knative-eventing generic <secret_name> \
      --from-literal=protocol=SSL \
      --from-file=ca.crt=caroot.pem \
      --from-file=user.crt=certificate.pem \
      --from-file=user.key=key.pem
    重要

    キー名に ca.crtuser.crt、および user.key を使用します。これらの値は変更しないでください。

  2. KnativeKafka CR を編集し、broker 仕様にシークレットへの参照を追加します。

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      broker:
        enabled: true
        defaultConfig:
          authSecretName: <secret_name>
    ...
5.4.6.3.2. Kafka ブローカーの SASL 認証の設定

Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。

前提条件

  • OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka CR は、OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • Kafka クラスターのユーザー名およびパスワードがある。
  • 使用する SASL メカニズムを選択している (例: PLAINSCRAM-SHA-256、または SCRAM-SHA-512)。
  • TLS が有効にされている場合、Kafka クラスターの ca.crt 証明書ファイルも必要になります。
  • OpenShift CLI (oc) をインストールしている。

手順

  1. 証明書ファイルを knative-eventing namespace にシークレットファイルとして作成します。

    $ oc create secret -n knative-eventing generic <secret_name> \
      --from-literal=protocol=SASL_SSL \
      --from-literal=sasl.mechanism=<sasl_mechanism> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=user="my-sasl-user"
    • キー名に ca.crtpassword、および sasl.mechanism を使用します。これらの値は変更しないでください。
    • パブリック CA 証明書で SASL を使用する場合は、シークレットの作成時に ca.crt 引数ではなく tls.enabled=true フラグを使用する必要があります。以下に例を示します。

      $ oc create secret -n <namespace> generic <kafka_auth_secret> \
        --from-literal=tls.enabled=true \
        --from-literal=password="SecretPassword" \
        --from-literal=saslType="SCRAM-SHA-512" \
        --from-literal=user="my-sasl-user"
  2. KnativeKafka CR を編集し、broker 仕様にシークレットへの参照を追加します。

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      broker:
        enabled: true
        defaultConfig:
          authSecretName: <secret_name>
    ...

5.4.6.4. 関連情報

5.4.7. ブローカーの管理

Knative (kn) CLI は、既存のブローカーを記述およびリストするために使用できるコマンドを提供します。

5.4.7.1. Knative CLI を使用した既存ブローカーの一覧表示

Knative (kn) CLI を使用してブローカーを一覧表示すると、合理的で直感的なユーザーインターフェイスが提供されます。kn broker list コマンドを使用し、Knative CLI を使用してクラスター内の既存ブローカーを一覧表示できます。

前提条件

  • OpenShift Serverless Operator および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
  • Knative (kn) CLI をインストールしている。

手順

  • 既存ブローカーの一覧を表示します。

    $ kn broker list

    出力例

    NAME      URL                                                                     AGE   CONDITIONS   READY   REASON
    default   http://broker-ingress.knative-eventing.svc.cluster.local/test/default   45s   5 OK / 5     True

5.4.7.2. Knative CLI を使用した既存ブローカーの記述

Knative (kn) CLI を使用してブローカーを記述すると、合理的で直感的なユーザーインターフェイスが提供されます。kn broker describe コマンドを使用し、Knative CLI を使用してクラスター内の既存ブローカーに関する情報を出力できます。

前提条件

  • OpenShift Serverless Operator および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
  • Knative (kn) CLI をインストールしている。

手順

  • 既存ブローカーを記述します。

    $ kn broker describe <broker_name>

    デフォルトブローカーを使用したコマンドの例

    $ kn broker describe default

    出力例

    Name:         default
    Namespace:    default
    Annotations:  eventing.knative.dev/broker.class=MTChannelBasedBroker, eventing.knative.dev/creato ...
    Age:          22s
    
    Address:
      URL:    http://broker-ingress.knative-eventing.svc.cluster.local/default/default
    
    Conditions:
      OK TYPE                   AGE REASON
      ++ Ready                  22s
      ++ Addressable            22s
      ++ FilterReady            22s
      ++ IngressReady           22s
      ++ TriggerChannelReady    22s

5.4.8. イベント配信

イベントがイベントシンクに配信されなかった場合に適用されるイベント配信パラメーターを設定できます。デッドレターシンクを含むイベント配信パラメーターを設定すると、イベントシンクへの配信に失敗したすべてのイベントが再試行されるようになります。それ以外の場合、未配信のイベントは破棄されます。

5.4.8.1. チャネルとブローカーのイベント配信動作パターン

さまざまなチャネルとブローカーのタイプには、イベント配信のために従う独自の動作パターンがあります。

5.4.8.1.1. Knative Kafka のチャネルとブローカー

イベントが Kafka チャネルまたはブローカーレシーバーに正常に配信される場合、受信側は 202 ステータスコードで応答します。つまり、このイベントは Kafka トピック内に安全に保存され、失われることはありません。

受信側がその他のステータスコードを返す場合は、イベントは安全に保存されず、ユーザーがこの問題を解決するために手順を実行する必要があります。

5.4.8.2. 設定可能なイベント配信パラメーター

以下のパラメーターはイベント配信用に設定できます。

dead letter sink
deadLetterSink 配信パラメーターを設定して、イベントが配信に失敗した場合にこれを指定されたイベントシンクに保存することができます。デッドレターシンクに格納されていない未配信のイベントは破棄されます。デッドレターシンクは、Knative サービス、Kubernetes サービス、または URI など、Knative Eventing シンクコントラクトに準拠する任意のアドレス指定可能なオブジェクトです。
retries
retry 配信パラメーターを整数値で設定することで、イベントが dead letter sink に送信される前に配信を再試行する必要のある最小回数を設定できます。
back off delay
backoffDelay 配信パラメーターを設定し、失敗後にイベント配信が再試行される前の遅延の時間を指定できます。backoffDelay パラメーターの期間は ISO 8601 形式を使用して指定されます。たとえば、PT1S は 1 秒の遅延を指定します。
back off policy
backoffPolicy 配信パラメーターは再試行バックオフポリシーを指定するために使用できます。ポリシーは linear または exponential のいずれかとして指定できます。linear バックオフポリシーを使用する場合、バックオフ遅延は backoffDelay * <numberOfRetries> に等しくなります。exponential バックオフポリシーを使用する場合、バックオフ遅延は backoffDelay*2^<numberOfRetries> と等しくなります。

5.4.8.3. イベント配信パラメーターの設定例

BrokerTriggerChannel、および Subscription オブジェクトのイベント配信パラメーターを設定できます。ブローカーまたはチャネルのイベント配信パラメーターを設定すると、これらのパラメーターは、それらのオブジェクト用に作成されたトリガーまたはサブスクリプションに伝播されます。トリガーまたはサブスクリプションのイベント配信パラメーターを設定して、ブローカーまたはチャネルの設定をオーバーライドすることもできます。

Broker オブジェクトの例

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
...
spec:
  delivery:
    deadLetterSink:
      ref:
        apiVersion: eventing.knative.dev/v1alpha1
        kind: KafkaSink
        name: <sink_name>
    backoffDelay: <duration>
    backoffPolicy: <policy_type>
    retry: <integer>
...

Trigger オブジェクトの例

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
...
spec:
  broker: <broker_name>
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: <sink_name>
    backoffDelay: <duration>
    backoffPolicy: <policy_type>
    retry: <integer>
...

Channel オブジェクトの例

apiVersion: messaging.knative.dev/v1
kind: Channel
metadata:
...
spec:
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: <sink_name>
    backoffDelay: <duration>
    backoffPolicy: <policy_type>
    retry: <integer>
...

Subscription オブジェクトの例

apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
...
spec:
  channel:
    apiVersion: messaging.knative.dev/v1
    kind: Channel
    name: <channel_name>
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: <sink_name>
    backoffDelay: <duration>
    backoffPolicy: <policy_type>
    retry: <integer>
...

5.4.8.4. トリガーのイベント配信順序の設定

Kafka ブローカーを使用している場合は、トリガーからイベントシンクへのイベントの配信順序を設定できます。

前提条件

  • OpenShift Serverless Operator、Knative Eventing、および Knative Kafka が OpenShift Container Platform クラスターにインストールされている。
  • Kafka ブローカーがクラスターで使用可能であり、Kafka ブローカーが作成されている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • OpenShift (oc) CLI がインストールされている。

手順

  1. Trigger オブジェクトを作成または変更し、kafka.eventing.knative.dev/delivery.order アノテーションを設定します。

    apiVersion: eventing.knative.dev/v1
    kind: Trigger
    metadata:
      name: <trigger_name>
      annotations:
         kafka.eventing.knative.dev/delivery.order: ordered
    ...

    サポートされているコンシューマー配信保証は次のとおりです。

    unordered
    順序付けられていないコンシューマーは、適切なオフセット管理を維持しながら、メッセージを順序付けずに配信するノンブロッキングコンシューマーです。
    ordered

    順序付きコンシューマーは、CloudEvent サブスクライバーからの正常な応答を待ってから、パーティションの次のメッセージを配信する、パーティションごとのブロックコンシューマーです。

    デフォルトの順序保証は unordered です。

  2. Trigger オブジェクトを適用します。

    $ oc apply -f <filename>
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.