5.4. ブローカー
5.4.1. ブローカー
ブローカーはトリガーと組み合わせて、イベントをイベントソースからイベントシンクに配信できます。イベントは、HTTP POST
リクエストとしてイベントソースからブローカーに送信されます。イベントがブローカーに送信された後に、それらはトリガーを使用して CloudEvent 属性 でフィルターされ、HTTP POST
リクエストとしてイベントシンクに送信できます。
5.4.2. ブローカータイプ
クラスター管理者は、クラスターのデフォルトブローカー実装を設定できます。ブローカーを作成する場合、Broker
オブジェクトで設定を指定しない限り、デフォルトのブローカー実装が使用されます。
5.4.2.1. 開発目的でのデフォルトブローカーの実装
Knative は、デフォルトのチャネルベースのブローカー実装を提供します。このチャネルベースのブローカーは、開発およびテストの目的で使用できますが、実稼働環境での適切なイベント配信の保証は提供しません。デフォルトのブローカーは、デフォルトで InMemoryChannel
チャネル実装によってサポートされています。
Kafka を使用してネットワークホップを削減する場合は、Kafka ブローカーの実装を使用します。チャネルベースのブローカーが KafkaChannel
チャネル実装によってサポートされるように設定しないでください。
5.4.2.2. 実稼働環境対応の Kafka ブローカーの実装
実稼働環境に対応した Knative Eventing デプロイメントの場合、Red Hat は Knative Kafka ブローカーの実装を使用することをお勧めします。Kafka ブローカーは、Knative ブローカーの Apache Kafka ネイティブ実装であり、CloudEvents を Kafka インスタンスに直接送信します。
Kafka ブローカーは、イベントの保存とルーティングのために Kafka とネイティブに統合されています。これにより、他のブローカータイプよりもブローカーとトリガーモデルの Kafka との統合性が向上し、ネットワークホップを削減することができます。Kafka ブローカー実装のその他の利点は次のとおりです。
- 少なくとも 1 回の配信保証
- CloudEvents パーティショニング拡張機能に基づくイベントの順序付き配信
- コントロールプレーンの高可用性
- 水平方向にスケーラブルなデータプレーン
Knative Kafka ブローカーは、バイナリーコンテンツモードを使用して、受信 CloudEvents を Kafka レコードとして保存します。これは、CloudEvent のすべての属性と拡張機能が Kafka レコードのヘッダーとしてマップされ、CloudEvent の data
仕様が Kafka レコードの値に対応することを意味します。
5.4.3. ブローカーの作成
Knative は、デフォルトのチャネルベースのブローカー実装を提供します。このチャネルベースのブローカーは、開発およびテストの目的で使用できますが、実稼働環境での適切なイベント配信の保証は提供しません。
クラスター管理者がデフォルトのブローカータイプとして Kafka を使用するように OpenShift Serverless デプロイメントを設定している場合、デフォルト設定を使用してブローカーを作成すると、Kafka ベースのブローカーが作成されます。
OpenShift Serverless デプロイメントが Kafka ブローカーをデフォルトのブローカータイプとして使用するように設定されていない場合、以下の手順でデフォルト設定を使用すると、チャネルベースのブローカーが作成されます。
5.4.3.1. Knative CLI を使用したブローカーの作成
ブローカーはトリガーと組み合わせて、イベントをイベントソースからイベントシンクに配信できます。ブローカーを作成するために Knative (kn
) CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。kn broker create
コマンドを使用して、ブローカーを作成できます。
前提条件
- OpenShift Serverless Operator および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
-
Knative (
kn
) CLI をインストールしている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
ブローカーを作成します。
$ kn broker create <broker_name>
検証
kn
コマンドを使用して、既存のブローカーを一覧表示します。$ kn broker list
出力例
NAME URL AGE CONDITIONS READY REASON default http://broker-ingress.knative-eventing.svc.cluster.local/test/default 45s 5 OK / 5 True
オプション: OpenShift Container Platform Web コンソールを使用している場合、Developer パースペクティブの Topology ビューに移動し、ブローカーが存在することを確認できます。
5.4.3.2. トリガーのアノテーションによるブローカーの作成
ブローカーはトリガーと組み合わせて、イベントをイベントソースからイベントシンクに配信できます。eventing.knative.dev/injection: enabled
アノテーションを Trigger
オブジェクトに追加してブローカーを作成できます。
knative-eventing-injection: enabled
アノテーションを使用してブローカーを作成する場合、クラスター管理者パーミッションなしにこのブローカーを削除することはできません。クラスター管理者が最初にこのアノテーションを削除せずにブローカーを削除する場合、ブローカーは削除後に再び作成されます。
前提条件
- OpenShift Serverless Operator および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
-
OpenShift CLI (
oc
) をインストールしている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
Trigger
オブジェクトを、eventing.knative.dev/injection: enabled
アノテーションを付けて YAML ファイルとして作成します。apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: annotations: eventing.knative.dev/injection: enabled name: <trigger_name> spec: broker: default subscriber: 1 ref: apiVersion: serving.knative.dev/v1 kind: Service name: <service_name>
- 1
- トリガーがイベントを送信するイベントシンクまたは サブスクライバー の詳細を指定します。
Trigger
YAML ファイルを適用します。$ oc apply -f <filename>
検証
oc
CLI を使用してブローカーが正常に作成されていることを確認するか、または Web コンソールの Topology ビューでこれを確認できます。
以下の
oc
コマンドを入力してブローカーを取得します。$ oc -n <namespace> get broker default
出力例
NAME READY REASON URL AGE default True http://broker-ingress.knative-eventing.svc.cluster.local/test/default 3m56s
オプション: OpenShift Container Platform Web コンソールを使用している場合、Developer パースペクティブの Topology ビューに移動し、ブローカーが存在することを確認できます。
5.4.3.3. namespace へのラベル付けによるブローカーの作成
ブローカーはトリガーと組み合わせて、イベントをイベントソースからイベントシンクに配信できます。所有しているか、または書き込みパーミッションのある namespace にラベルを付けて default
ブローカーを自動的に作成できます。
この方法を使用して作成されたブローカーは、ラベルを削除すると削除されません。これらは手動で削除する必要があります。
前提条件
- OpenShift Serverless Operator および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
-
OpenShift CLI (
oc
) をインストールしている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
eventing.knative.dev/injection=enabled
で namespace にラベルを付ける。$ oc label namespace <namespace> eventing.knative.dev/injection=enabled
検証
oc
CLI を使用してブローカーが正常に作成されていることを確認するか、または Web コンソールの Topology ビューでこれを確認できます。
oc
コマンドを使用してブローカーを取得します。$ oc -n <namespace> get broker <broker_name>
コマンドの例
$ oc -n default get broker default
出力例
NAME READY REASON URL AGE default True http://broker-ingress.knative-eventing.svc.cluster.local/test/default 3m56s
オプション: OpenShift Container Platform Web コンソールを使用している場合、Developer パースペクティブの Topology ビューに移動し、ブローカーが存在することを確認できます。
5.4.3.4. 挿入 (injection) によって作成されたブローカーの削除
挿入によりブローカーを作成し、後でそれを削除する必要がある場合は、手動で削除する必要があります。namespace ラベルまたはトリガーアノテーションを使用して作成されたブローカーは、ラベルまたはアノテーションを削除した場合に永続的に削除されません。
前提条件
-
OpenShift CLI (
oc
) をインストールしている。
手順
eventing.knative.dev/injection=enabled
ラベルを namespace から削除します。$ oc label namespace <namespace> eventing.knative.dev/injection-
アノテーションを削除すると、Knative では削除後にブローカーを再作成できなくなります。
選択された namespace からブローカーを削除します。
$ oc -n <namespace> delete broker <broker_name>
検証
oc
コマンドを使用してブローカーを取得します。$ oc -n <namespace> get broker <broker_name>
コマンドの例
$ oc -n default get broker default
出力例
No resources found. Error from server (NotFound): brokers.eventing.knative.dev "default" not found
5.4.3.5. Web コンソールを使用してブローカーを作成する
Knative Eventing がクラスターにインストールされた後、Web コンソールを使用してブローカーを作成できます。OpenShift Container Platform Web コンソールを使用すると、ブローカーを作成するための合理的で直感的なユーザーインターフェイスが提供されます。
前提条件
- OpenShift Container Platform Web コンソールにログインしている。
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
-
Developer パースペクティブで、+Add
Broker に移動します。Broker ページが表示されます。 -
オプション:ブローカーの Name を更新します。名前を更新しない場合、生成されたブローカーの名前は
default
になります。 - Create をクリックします。
検証
トポロジー ページでブローカーコンポーネントを表示することにより、ブローカーが作成されたことを確認できます。
- Developer パースペクティブで、Topology に移動します。
mt-broker-ingress
、mt-broker-filter
、およびmt-broker-controller
コンポーネントを表示します。
5.4.3.6. Administrator パースペクティブを使用したブローカーの作成
ブローカーはトリガーと組み合わせて、イベントをイベントソースからイベントシンクに配信できます。イベントは、HTTP POST
リクエストとしてイベントソースからブローカーに送信されます。イベントがブローカーに送信された後に、それらはトリガーを使用して CloudEvent 属性 でフィルターされ、HTTP POST
リクエストとしてイベントシンクに送信できます。
前提条件
- OpenShift Serverless Operator および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
- Web コンソールにログインしており、Administrator パースペクティブを使用している。
- OpenShift Container Platform のクラスター管理者パーミッションがある。
手順
-
OpenShift Container Platform Web コンソールの Administrator パースペクティブで、 Serverless
Eventing に移動します。 - Create 一覧で、Broker を選択します。Create Broker ページに移動します。
- オプション: ブローカーの YAML 設定を変更します。
- Create をクリックします。
5.4.3.7. 次のステップ
- イベントがイベントシンクに配信されなかった場合に適用されるイベント配信パラメーターを設定します。イベント配信パラメーターの設定例 を参照してください。
5.4.3.8. 関連情報
5.4.4. デフォルトのブローカーバッキングチャネルの設定
チャネルベースのブローカーを使用している場合、ブローカーのデフォルトのバッキングチャネルタイプを InMemoryChannel
または KafkaChannel
に設定できます。
前提条件
- OpenShift Container Platform に対する管理者権限を持っている。
- OpenShift Serverless Operator および Knative Eventing がクラスターにインストールされていること。
-
OpenShift (
oc
) CLI がインストールされている。 -
Kafka チャネルをデフォルトのバッキングチャネルタイプとして使用する場合は、クラスターに
KnativeKafka
CR もインストールする必要があります。
手順
KnativeEventing
カスタムリソース (CR) を変更して、config-br-default-channel
設定マップの設定の詳細を追加します。apiVersion: operator.knative.dev/v1beta1 kind: KnativeEventing metadata: name: knative-eventing namespace: knative-eventing spec: config: 1 config-br-default-channel: channel-template-spec: | apiVersion: messaging.knative.dev/v1beta1 kind: KafkaChannel 2 spec: numPartitions: 6 3 replicationFactor: 3 4
更新された
KnativeEventing
CR を適用します。$ oc apply -f <filename>
5.4.5. デフォルトブローカークラスの設定
config-br-defaults
設定マップを使用して、Knative Eventing のデフォルトのブローカークラス設定を指定できます。クラスター全体または 1 つ以上の namespace に対して、デフォルトのブローカークラスを指定できます。現在、MTChannelBasedBroker
および Kafka
ブローカータイプがサポートされています。
前提条件
- OpenShift Container Platform に対する管理者権限を持っている。
- OpenShift Serverless Operator および Knative Eventing がクラスターにインストールされていること。
-
Kafka ブローカーをデフォルトのブローカー実装として使用する場合は、クラスターに
KnativeKafka
CR もインストールする必要があります。
手順
KnativeEventing
カスタムリソースを変更して、config-br-defaults
設定マップの設定の詳細を追加します。apiVersion: operator.knative.dev/v1beta1 kind: KnativeEventing metadata: name: knative-eventing namespace: knative-eventing spec: defaultBrokerClass: Kafka 1 config: 2 config-br-defaults: 3 default-br-config: | clusterDefault: 4 brokerClass: Kafka apiVersion: v1 kind: ConfigMap name: kafka-broker-config 5 namespace: knative-eventing 6 namespaceDefaults: 7 my-namespace: brokerClass: MTChannelBasedBroker apiVersion: v1 kind: ConfigMap name: config-br-default-channel 8 namespace: knative-eventing 9 ...
- 1
- Knative Eventing のデフォルトのブローカークラス。
- 2
spec.config
で、変更した設定を追加する設定マップを指定できます。- 3
config-br-defaults
設定マップは、spec.config
設定またはブローカークラスを指定しないブローカーのデフォルト設定を指定します。- 4
- クラスター全体のデフォルトのブローカークラス設定。この例では、クラスターのデフォルトのブローカークラスの実装は
Kafka
です。 - 5
kafka-broker-config
設定マップは、Kafka ブローカーのデフォルト設定を指定します。「関連情報」セクションの「Kafka ブローカー構成の設定」を参照してください。- 6
kafka-broker-config
設定マップが存在する namespace。- 7
- namespace スコープのデフォルトブローカクラス設定。この例では、
my-namespace
namespace のデフォルトのブローカークラスの実装はMTChannelBasedBroker
です。複数の namespace に対してデフォルトのブローカークラスの実装を指定できます。 - 8
config-br-default-channel
設定マップは、ブローカーのデフォルトのバッキングチャネルを指定します。「関連情報」セクションの「デフォルトのブローカーバッキングチャネルの設定」を参照してください。- 9
config-br-default-channel
設定マップが存在する namespace。
重要namespace 固有のデフォルトを設定すると、クラスター全体の設定が上書きされます。
5.4.6. Kafka ブローカー
実稼働環境に対応した Knative Eventing デプロイメントの場合、Red Hat は Knative Kafka ブローカーの実装を使用することをお勧めします。Kafka ブローカーは、Knative ブローカーの Apache Kafka ネイティブ実装であり、CloudEvents を Kafka インスタンスに直接送信します。
Kafka ブローカーは、イベントの保存とルーティングのために Kafka とネイティブに統合されています。これにより、他のブローカータイプよりもブローカーとトリガーモデルの Kafka との統合性が向上し、ネットワークホップを削減することができます。Kafka ブローカー実装のその他の利点は次のとおりです。
- 少なくとも 1 回の配信保証
- CloudEvents パーティショニング拡張機能に基づくイベントの順序付き配信
- コントロールプレーンの高可用性
- 水平方向にスケーラブルなデータプレーン
Knative Kafka ブローカーは、バイナリーコンテンツモードを使用して、受信 CloudEvents を Kafka レコードとして保存します。これは、CloudEvent のすべての属性と拡張機能が Kafka レコードのヘッダーとしてマップされ、CloudEvent の data
仕様が Kafka レコードの値に対応することを意味します。
5.4.6.1. デフォルトのブローカータイプとして設定されていない場合の Kafka ブローカーの作成
OpenShift Serverless デプロイメントがデフォルトのブローカータイプとして Kafka ブローカーを使用するように設定されていない場合は、以下の手順のいずれかを使用して、Kafka ベースのブローカーを作成できます。
5.4.6.1.1. YAML を使用した Kafka ブローカーの作成
YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でアプリケーションを宣言的に記述することができます。YAML を使用して Kafka ブローカーを作成するには、Broker
オブジェクトを定義する YAML ファイルを作成し、oc apply
コマンドを使用してそれを適用する必要があります。
前提条件
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
カスタムリソースは OpenShift Container Platform クラスターにインストールされます。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
OpenShift CLI (
oc
) がインストールされている。
手順
Kafka ベースのブローカーを YAML ファイルとして作成します。
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: annotations: eventing.knative.dev/broker.class: Kafka 1 name: example-kafka-broker spec: config: apiVersion: v1 kind: ConfigMap name: kafka-broker-config 2 namespace: knative-eventing
Kafka ベースのブローカー YAML ファイルを適用します。
$ oc apply -f <filename>
5.4.6.1.2. 外部で管理されている Kafka トピックを使用する Kafka ブローカーの作成
独自の内部トピックの作成を許可せずに Kafka ブローカーを使用する場合は、代わりに外部で管理される Kafka トピックを使用できます。これを実行するには、kafka.eventing.knative.dev/external.topic
アノテーションを使用する Kafka Broker
オブジェクトを作成する必要があります。
前提条件
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
カスタムリソースは OpenShift Container Platform クラスターにインストールされます。 - Red Hat AMQ Streams などの Kafka インスタンスにアクセスでき、Kafka トピックを作成しました。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
OpenShift CLI (
oc
) がインストールされている。
手順
Kafka ベースのブローカーを YAML ファイルとして作成します。
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: annotations: eventing.knative.dev/broker.class: Kafka 1 kafka.eventing.knative.dev/external.topic: <topic_name> 2 ...
Kafka ベースのブローカー YAML ファイルを適用します。
$ oc apply -f <filename>
5.4.6.1.3. 分離されたデータプレーンのある Apache Kafka の Knative Broker 実装
分離されたデータプレーンを使用した Apache Kafka の Knative Broker 実装は、テクノロジープレビュー機能のみです。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は実稼働環境でこれらを使用することを推奨していません。テクノロジープレビューの機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行いフィードバックを提供していただくことを目的としています。
Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。
Apache Kafka の Knative Broker 実装には 2 つのプレーンがあります。
- コントロールプレーン
- は、Kubernetes API と通信し、カスタムオブジェクトを監視し、データプレーンを管理するコントローラーで設定されます。
- データプレーン
-
受信イベントをリッスンし、Apache Kafka と通信し、イベントシンクに送信するコンポーネントのコレクション。Apache Kafka データプレーンの Knative Broker 実装はイベントフローになります。この実装は、
kafka-broker-receiver
およびkafka-broker-dispatcher
デプロイメントで設定されます。
Kafka の Broker クラスを設定する場合、Apache Kafka
の Knative Broker 実装は共有データプレーンを使用します。これは、knative-eventing
namespace の kafka-broker-receiver
および kafka-broker-dispatcher
デプロイメントがクラスター内のすべての Apache Kafka Broker に使用されることを意味します。
ただし、KafkaNamespaced
の Broker クラスを設定すると、Apache Kafka ブローカーコントローラーは、ブローカーが存在する namespace ごとに新しいデータプレーンを作成します。このデータプレーンは、その namespace のすべての KafkaNamespaced
ブローカーによって使用されます。これにより、データプレーンが分離されるため、ユーザー namespace の kafka-broker-receiver
および kafka-broker-dispatcher
デプロイメントがその namespace のブローカーにのみ使用されます。
データプレーンを分離することで、このセキュリティー機能により、追加のデプロイメントが作成され、より多くのリソースが使用されます。このような分離要件がない限り、Kafka
のクラスで 通常 の Broker を使用します。
5.4.6.1.4. 分離されたデータプレーンを使用する Apache Kafka の Knative ブローカーの作成
分離されたデータプレーンを使用した Apache Kafka の Knative Broker 実装は、テクノロジープレビュー機能のみです。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は実稼働環境でこれらを使用することを推奨していません。テクノロジープレビューの機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行いフィードバックを提供していただくことを目的としています。
Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。
KafkaNamespaced
ブローカーを作成するには、eventing.knative.dev/broker.class
アノテーションを KafkaNamespaced
に設定する必要があります。
前提条件
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
カスタムリソースは OpenShift Container Platform クラスターにインストールされます。 - Red Hat AMQ Streams などの Apache Kafka インスタンスにアクセスでき、Kafka トピックを作成している。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションでプロジェクトにアクセスできる。
-
OpenShift CLI (
oc
) がインストールされている。
手順
YAML ファイルを使用して Apache Kafka ベースのブローカーを作成します。
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: annotations: eventing.knative.dev/broker.class: KafkaNamespaced 1 name: default namespace: my-namespace 2 spec: config: apiVersion: v1 kind: ConfigMap name: my-config 3 ...
Apache Kafka ベースのブローカー YAML ファイルを適用します。
$ oc apply -f <filename>
spec.config
の ConfigMap
オブジェクトは Broker
オブジェクトと同じ namespace にある必要があります。
apiVersion: v1 kind: ConfigMap metadata: name: my-config namespace: my-namespace data: ...
KafkaNamespaced
クラスで最初の Broker
オブジェクトを作成すると、kafka-broker-receiver
および kafka-broker-dispatcher
デプロイメントが namespace に作成されます。その後、同じ namespace に KafkaNamespaced
クラスを持つすべてのブローカーは同じデータプレーンを使用します。KafkaNamespaced
クラスを持つブローカーが namespace に存在しない場合は、namespace のデータプレーンが削除されます。
5.4.6.2. Kafka ブローカー構成の設定
設定マップを作成し、Kafka Broker
オブジェクトでこの ConfigMap を参照することで、レプリケーション係数、ブートストラップサーバー、および Kafka ブローカーのトピックパーティションの数を設定できます。
前提条件
- OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
カスタムリソース (CR) は OpenShift Container Platform クラスターにインストールされます。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
OpenShift CLI (
oc
) がインストールされている。
手順
kafka-broker-config
ConfigMap を変更するか、以下の設定が含まれる独自の ConfigMap を作成します。apiVersion: v1 kind: ConfigMap metadata: name: <config_map_name> 1 namespace: <namespace> 2 data: default.topic.partitions: <integer> 3 default.topic.replication.factor: <integer> 4 bootstrap.servers: <list_of_servers> 5
- 1
- ConfigMap 名。
- 2
- ConfigMap が存在する namespace。
- 3
- Kafka ブローカーのトピックパーティションの数。これは、イベントをブローカーに送信する速度を制御します。パーティションが多い場合には、コンピュートリソースが多く必要です。
- 4
- トピックメッセージのレプリケーション係数。これにより、データ損失を防ぐことができます。レプリケーション係数を増やすには、より多くのコンピュートリソースとストレージが必要になります。
- 5
- ブートストラップサーバーのコンマ区切りリスト。これは、OpenShift Container Platform クラスターの内部または外部にある可能性があり、ブローカーがイベントを受信してイベントを送信する Kafka クラスターのリストです。
重要default.topic.replication.factor
の値は、クラスター内の Kafka ブローカーインスタンスの数以下である必要があります。たとえば、Kafka ブローカーが 1 つしかない場合には、default.topic.replication.factor
の値は"1"
を超える値にすることはできません。Kafka ブローカーの ConfigMap の例
apiVersion: v1 kind: ConfigMap metadata: name: kafka-broker-config namespace: knative-eventing data: default.topic.partitions: "10" default.topic.replication.factor: "3" bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
ConfigMap を適用します。
$ oc apply -f <config_map_filename>
Kafka
Broker
オブジェクトの ConfigMap を指定します。Broker オブジェクトの例
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: name: <broker_name> 1 namespace: <namespace> 2 annotations: eventing.knative.dev/broker.class: Kafka 3 spec: config: apiVersion: v1 kind: ConfigMap name: <config_map_name> 4 namespace: <namespace> 5 ...
ブローカーを適用します。
$ oc apply -f <broker_filename>
5.4.6.3. Knative Kafka ブローカーのセキュリティー設定
Kafka クラスターは、通常、TLS または SASL 認証方法を使用して保護されます。TLS または SASL を使用して、保護された Red Hat AMQ Streams クラスターに対して動作するように Kafka ブローカーまたはチャネルを設定できます。
Red Hat は、SASL と TLS の両方を一緒に有効にすることをお勧めします。
5.4.6.3.1. Kafka ブローカーの TLS 認証の設定
Transport Layer Security (TLS) は、Apache Kafka クライアントおよびサーバーによって、Knative と Kafka 間のトラフィックを暗号化するため、および認証のために使用されます。TLS は、Knative Kafka のトラフィック暗号化でサポートされている唯一の方法です。
前提条件
- OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
CR は、OpenShift Container Platform クラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
.pem
ファイルとして Kafka クラスター CA 証明書が保存されている。 -
Kafka クラスタークライアント証明書とキーが
.pem
ファイルとして保存されている。 -
OpenShift CLI (
oc
) をインストールしている。
手順
証明書ファイルを
knative-eventing
namespace にシークレットファイルとして作成します。$ oc create secret -n knative-eventing generic <secret_name> \ --from-literal=protocol=SSL \ --from-file=ca.crt=caroot.pem \ --from-file=user.crt=certificate.pem \ --from-file=user.key=key.pem
重要キー名に
ca.crt
、user.crt
、およびuser.key
を使用します。これらの値は変更しないでください。KnativeKafka
CR を編集し、broker
仕様にシークレットへの参照を追加します。apiVersion: operator.serverless.openshift.io/v1alpha1 kind: KnativeKafka metadata: namespace: knative-eventing name: knative-kafka spec: broker: enabled: true defaultConfig: authSecretName: <secret_name> ...
5.4.6.3.2. Kafka ブローカーの SASL 認証の設定
Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。
前提条件
- OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
CR は、OpenShift Container Platform クラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- Kafka クラスターのユーザー名およびパスワードがある。
-
使用する SASL メカニズムを選択している (例:
PLAIN
、SCRAM-SHA-256
、またはSCRAM-SHA-512
)。 -
TLS が有効にされている場合、Kafka クラスターの
ca.crt
証明書ファイルも必要になります。 -
OpenShift CLI (
oc
) をインストールしている。
手順
証明書ファイルを
knative-eventing
namespace にシークレットファイルとして作成します。$ oc create secret -n knative-eventing generic <secret_name> \ --from-literal=protocol=SASL_SSL \ --from-literal=sasl.mechanism=<sasl_mechanism> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=user="my-sasl-user"
-
キー名に
ca.crt
、password
、およびsasl.mechanism
を使用します。これらの値は変更しないでください。 パブリック CA 証明書で SASL を使用する場合は、シークレットの作成時に
ca.crt
引数ではなくtls.enabled=true
フラグを使用する必要があります。以下に例を示します。$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-literal=tls.enabled=true \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \ --from-literal=user="my-sasl-user"
-
キー名に
KnativeKafka
CR を編集し、broker
仕様にシークレットへの参照を追加します。apiVersion: operator.serverless.openshift.io/v1alpha1 kind: KnativeKafka metadata: namespace: knative-eventing name: knative-kafka spec: broker: enabled: true defaultConfig: authSecretName: <secret_name> ...
5.4.6.4. 関連情報
5.4.7. ブローカーの管理
Knative (kn
) CLI は、既存のブローカーを記述およびリストするために使用できるコマンドを提供します。
5.4.7.1. Knative CLI を使用した既存ブローカーの一覧表示
Knative (kn
) CLI を使用してブローカーを一覧表示すると、合理的で直感的なユーザーインターフェイスが提供されます。kn broker list
コマンドを使用し、Knative CLI を使用してクラスター内の既存ブローカーを一覧表示できます。
前提条件
- OpenShift Serverless Operator および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
-
Knative (
kn
) CLI をインストールしている。
手順
既存ブローカーの一覧を表示します。
$ kn broker list
出力例
NAME URL AGE CONDITIONS READY REASON default http://broker-ingress.knative-eventing.svc.cluster.local/test/default 45s 5 OK / 5 True
5.4.7.2. Knative CLI を使用した既存ブローカーの記述
Knative (kn
) CLI を使用してブローカーを記述すると、合理的で直感的なユーザーインターフェイスが提供されます。kn broker describe
コマンドを使用し、Knative CLI を使用してクラスター内の既存ブローカーに関する情報を出力できます。
前提条件
- OpenShift Serverless Operator および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
-
Knative (
kn
) CLI をインストールしている。
手順
既存ブローカーを記述します。
$ kn broker describe <broker_name>
デフォルトブローカーを使用したコマンドの例
$ kn broker describe default
出力例
Name: default Namespace: default Annotations: eventing.knative.dev/broker.class=MTChannelBasedBroker, eventing.knative.dev/creato ... Age: 22s Address: URL: http://broker-ingress.knative-eventing.svc.cluster.local/default/default Conditions: OK TYPE AGE REASON ++ Ready 22s ++ Addressable 22s ++ FilterReady 22s ++ IngressReady 22s ++ TriggerChannelReady 22s
5.4.8. イベント配信
イベントがイベントシンクに配信されなかった場合に適用されるイベント配信パラメーターを設定できます。デッドレターシンクを含むイベント配信パラメーターを設定すると、イベントシンクへの配信に失敗したすべてのイベントが再試行されるようになります。それ以外の場合、未配信のイベントは破棄されます。
5.4.8.1. チャネルとブローカーのイベント配信動作パターン
さまざまなチャネルとブローカーのタイプには、イベント配信のために従う独自の動作パターンがあります。
5.4.8.1.1. Knative Kafka のチャネルとブローカー
イベントが Kafka チャネルまたはブローカーレシーバーに正常に配信される場合、受信側は 202
ステータスコードで応答します。つまり、このイベントは Kafka トピック内に安全に保存され、失われることはありません。
受信側がその他のステータスコードを返す場合は、イベントは安全に保存されず、ユーザーがこの問題を解決するために手順を実行する必要があります。
5.4.8.2. 設定可能なイベント配信パラメーター
以下のパラメーターはイベント配信用に設定できます。
- dead letter sink
-
deadLetterSink
配信パラメーターを設定して、イベントが配信に失敗した場合にこれを指定されたイベントシンクに保存することができます。デッドレターシンクに格納されていない未配信のイベントは破棄されます。デッドレターシンクは、Knative サービス、Kubernetes サービス、または URI など、Knative Eventing シンクコントラクトに準拠する任意のアドレス指定可能なオブジェクトです。 - retries
-
retry
配信パラメーターを整数値で設定することで、イベントが dead letter sink に送信される前に配信を再試行する必要のある最小回数を設定できます。 - back off delay
-
backoffDelay
配信パラメーターを設定し、失敗後にイベント配信が再試行される前の遅延の時間を指定できます。backoffDelay
パラメーターの期間は ISO 8601 形式を使用して指定されます。たとえば、PT1S
は 1 秒の遅延を指定します。 - back off policy
-
backoffPolicy
配信パラメーターは再試行バックオフポリシーを指定するために使用できます。ポリシーはlinear
またはexponential
のいずれかとして指定できます。linear
バックオフポリシーを使用する場合、バックオフ遅延はbackoffDelay * <numberOfRetries>
に等しくなります。exponential
バックオフポリシーを使用する場合、バックオフ遅延はbackoffDelay*2^<numberOfRetries>
と等しくなります。
5.4.8.3. イベント配信パラメーターの設定例
Broker
、Trigger
、Channel
、および Subscription
オブジェクトのイベント配信パラメーターを設定できます。ブローカーまたはチャネルのイベント配信パラメーターを設定すると、これらのパラメーターは、それらのオブジェクト用に作成されたトリガーまたはサブスクリプションに伝播されます。トリガーまたはサブスクリプションのイベント配信パラメーターを設定して、ブローカーまたはチャネルの設定をオーバーライドすることもできます。
Broker
オブジェクトの例
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: ... spec: delivery: deadLetterSink: ref: apiVersion: eventing.knative.dev/v1alpha1 kind: KafkaSink name: <sink_name> backoffDelay: <duration> backoffPolicy: <policy_type> retry: <integer> ...
Trigger
オブジェクトの例
apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: ... spec: broker: <broker_name> delivery: deadLetterSink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: <sink_name> backoffDelay: <duration> backoffPolicy: <policy_type> retry: <integer> ...
Channel
オブジェクトの例
apiVersion: messaging.knative.dev/v1 kind: Channel metadata: ... spec: delivery: deadLetterSink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: <sink_name> backoffDelay: <duration> backoffPolicy: <policy_type> retry: <integer> ...
Subscription
オブジェクトの例
apiVersion: messaging.knative.dev/v1 kind: Subscription metadata: ... spec: channel: apiVersion: messaging.knative.dev/v1 kind: Channel name: <channel_name> delivery: deadLetterSink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: <sink_name> backoffDelay: <duration> backoffPolicy: <policy_type> retry: <integer> ...
5.4.8.4. トリガーのイベント配信順序の設定
Kafka ブローカーを使用している場合は、トリガーからイベントシンクへのイベントの配信順序を設定できます。
前提条件
- OpenShift Serverless Operator、Knative Eventing、および Knative Kafka が OpenShift Container Platform クラスターにインストールされている。
- Kafka ブローカーがクラスターで使用可能であり、Kafka ブローカーが作成されている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
OpenShift (
oc
) CLI がインストールされている。
手順
Trigger
オブジェクトを作成または変更し、kafka.eventing.knative.dev/delivery.order
アノテーションを設定します。apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: name: <trigger_name> annotations: kafka.eventing.knative.dev/delivery.order: ordered ...
サポートされているコンシューマー配信保証は次のとおりです。
unordered
- 順序付けられていないコンシューマーは、適切なオフセット管理を維持しながら、メッセージを順序付けずに配信するノンブロッキングコンシューマーです。
ordered
順序付きコンシューマーは、CloudEvent サブスクライバーからの正常な応答を待ってから、パーティションの次のメッセージを配信する、パーティションごとのブロックコンシューマーです。
デフォルトの順序保証は
unordered
です。
Trigger
オブジェクトを適用します。$ oc apply -f <filename>