4.6. Apache Kafka の Knative ブローカー実装
実稼働環境対応の Knative Eventing デプロイメントの場合、Red Hat は Apache Kafka に Knative ブローカー実装を使用することを推奨します。ブローカーは、Knative ブローカーの Apache Kafka ネイティブ実装であり、CloudEvents を Kafka インスタンスに直接送信します。
Kafka ブローカーは、イベントを保存してルーティングできるように Kafka とネイティブに統合されています。これにより、他のブローカータイプよりもブローカーとトリガーモデルの Kafka との統合性が向上し、ネットワークホップを削減することができます。Knative ブローカー実装のその他の利点は次のとおりです。
- 少なくとも 1 回の配信保証
- CloudEvents パーティショニング拡張機能に基づくイベントの順序付き配信
- コントロールプレーンの高可用性
- 水平方向にスケーラブルなデータプレーン
Apache Kafka の Knative ブローカー実装は、バイナリーコンテンツモードを使用して、受信した CloudEvent を Kafka レコードとして保存します。これは、CloudEvent のすべての属性と拡張機能が Kafka レコードのヘッダーとしてマップされ、CloudEvent の data
仕様が Kafka レコードの値に対応することを意味します。
4.6.1. デフォルトのブローカータイプとして設定されていない場合の Apache Kafka ブローカーの作成
OpenShift Serverless デプロイメントがデフォルトのブローカータイプとして Kafka ブローカーを使用するように設定されていない場合は、以下の手順のいずれかを使用して、Kafka ベースのブローカーを作成できます。
4.6.1.1. YAML を使用した Apache Kafka ブローカーの作成
YAML ファイルを使用して Knative リソースを作成する場合は、宣言的 API を使用するため、再現性の高い方法でアプリケーションを宣言的に記述できます。YAML を使用して Kafka ブローカーを作成するには、Broker
オブジェクトを定義する YAML ファイルを作成し、oc apply
コマンドを使用してそれを適用する必要があります。
前提条件
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
カスタムリソースが OpenShift Container Platform クラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
OpenShift CLI (
oc
) がインストールされている。
手順
Kafka ベースのブローカーを YAML ファイルとして作成します。
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: annotations: eventing.knative.dev/broker.class: Kafka 1 name: example-kafka-broker spec: config: apiVersion: v1 kind: ConfigMap name: kafka-broker-config 2 namespace: knative-eventing
Kafka ベースのブローカー YAML ファイルを適用します。
$ oc apply -f <filename>
4.6.1.2. 外部で管理される Kafka トピックを使用する Apache Kafka ブローカーの作成
独自の内部トピックの作成を許可せずに Kafka ブローカーを使用する場合は、代わりに外部で管理される Kafka トピックを使用できます。これを実行するには、kafka.eventing.knative.dev/external.topic
アノテーションを使用する Kafka Broker
オブジェクトを作成する必要があります。
前提条件
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
カスタムリソースが OpenShift Container Platform クラスターにインストールされている。 - Red Hat AMQ Streams などの Kafka インスタンスにアクセスでき、Kafka トピックを作成している。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
OpenShift CLI (
oc
) がインストールされている。
手順
Kafka ベースのブローカーを YAML ファイルとして作成します。
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: annotations: eventing.knative.dev/broker.class: Kafka 1 kafka.eventing.knative.dev/external.topic: <topic_name> 2 ...
Kafka ベースのブローカー YAML ファイルを適用します。
$ oc apply -f <filename>
4.6.1.3. 分離されたデータプレーンのある Apache Kafka の Knative Broker 実装
分離されたデータプレーンを使用した Apache Kafka の Knative Broker 実装は、テクノロジープレビュー機能としてのみ提供されます。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は、実稼働環境でこれらを使用することを推奨していません。テクノロジープレビューの機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行いフィードバックを提供していただくことを目的としています。
Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。
Apache Kafka の Knative Broker 実装には 2 つのプレーンがあります。
- コントロールプレーン
- Kubernetes API と通信し、カスタムオブジェクトを監視し、データプレーンを管理するコントローラーで設定されます。
- データプレーン
-
受信イベントをリッスンし、Apache Kafka と通信し、イベントをイベントシンクに送信するコンポーネントのコレクション。Apache Kafka データプレーンの Knative Broker 実装は、イベントが送信される場所です。この実装は、
kafka-broker-receiver
およびkafka-broker-dispatcher
デプロイメントで構成されます。
Kafka の Broker クラスを設定する場合、Apache Kafka
の Knative Broker 実装は共有データプレーンを使用します。つまり、knative-eventing
namespace の kafka-broker-receiver
および kafka-broker-dispatcher
デプロイメントがクラスター内のすべての Apache Kafka Broker に使用されます。
ただし、KafkaNamespaced
の Broker クラスを設定すると、Apache Kafka ブローカーコントローラーは、ブローカーが存在する namespace ごとに新しいデータプレーンを作成します。このデータプレーンは、その namespace のすべての KafkaNamespaced
ブローカーによって使用されます。これにより、データプレーンが分離されるため、ユーザーの namespace の kafka-broker-receiver
および kafka-broker-dispatcher
デプロイメントは、その namespace のブローカーに対してのみ使用されます。
データプレーンを分離した結果、このセキュリティー機能はより多くのデプロイメントを作成し、より多くのリソースを使用します。このような分離要件がない限り、Kafka
のクラスで 通常 の Broker を使用します。
4.6.1.4. 分離されたデータプレーンを使用する Apache Kafka の Knative ブローカーの作成
分離されたデータプレーンを使用した Apache Kafka の Knative Broker 実装は、テクノロジープレビュー機能としてのみ提供されます。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は、実稼働環境でこれらを使用することを推奨していません。テクノロジープレビューの機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行いフィードバックを提供していただくことを目的としています。
Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。
KafkaNamespaced
ブローカーを作成するには、eventing.knative.dev/broker.class
アノテーションを KafkaNamespaced
に設定する必要があります。
前提条件
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
カスタムリソースが OpenShift Container Platform クラスターにインストールされている。 - Red Hat AMQ Streams などの Apache Kafka インスタンスにアクセスでき、Kafka トピックを作成している。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
OpenShift CLI (
oc
) がインストールされている。
手順
YAML ファイルを使用して Apache Kafka ベースのブローカーを作成します。
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: annotations: eventing.knative.dev/broker.class: KafkaNamespaced 1 name: default namespace: my-namespace 2 spec: config: apiVersion: v1 kind: ConfigMap name: my-config 3 ...
Apache Kafka ベースのブローカー YAML ファイルを適用します。
$ oc apply -f <filename>
spec.config
の ConfigMap
オブジェクトは Broker
オブジェクトと同じ namespace にある必要があります。
apiVersion: v1 kind: ConfigMap metadata: name: my-config namespace: my-namespace data: ...
KafkaNamespaced
クラスで最初の Broker
オブジェクトを作成すると、kafka-broker-receiver
および kafka-broker-dispatcher
デプロイメントが namespace に作成されます。その後、同じ namespace 内で KafkaNamespaced
クラスが含まれる全ブローカーにより、同じデータプレーンが使用されます。KafkaNamespaced
クラスを持つブローカーが namespace に存在しない場合は、namespace のデータプレーンが削除されます。
4.6.2. Apache Kafka ブローカー設定
config map を作成し、Kafka Broker
オブジェクトでこの config map を参照することで、レプリケーション係数、ブートストラップサーバー、および Kafka ブローカーのトピックパーティションの数を設定できます。
Knative Eventing は、Kafka がサポートするトピック設定オプションの完全なセットをサポートします。これらのオプションを設定するには、default.topic.config.
接頭辞を持つキーを ConfigMap に追加する必要があります。
前提条件
- OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
カスタムリソース (CR) が OpenShift Container Platform クラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
OpenShift CLI (
oc
) がインストールされている。
手順
kafka-broker-config
config map を変更するか、以下の設定が含まれる独自の config map を作成します。apiVersion: v1 kind: ConfigMap metadata: name: <config_map_name> 1 namespace: <namespace> 2 data: default.topic.partitions: <integer> 3 default.topic.replication.factor: <integer> 4 bootstrap.servers: <list_of_servers> 5 default.topic.config.<config_option>: <value> 6
- 1
- config map 名。
- 2
- config map が存在する namespace。
- 3
- Kafka ブローカーのトピックパーティションの数。これは、イベントをブローカーに送信する速度を制御します。パーティションが多い場合には、コンピュートリソースが多く必要です。
- 4
- トピックメッセージのレプリケーション係数。これにより、データ損失を防ぐことができます。レプリケーション係数を増やすには、より多くのコンピュートリソースとストレージが必要になります。
- 5
- ブートストラップサーバーのコンマ区切りリスト。これは、OpenShift Container Platform クラスターの内部または外部にある可能性があり、ブローカーがイベントを受信してイベントを送信する Kafka クラスターのリストです。
- 6
- トピック設定オプション。詳細は、使用可能なオプションと値の完全なセット を参照してください。
重要default.topic.replication.factor
の値は、クラスター内の Kafka ブローカーインスタンスの数以下である必要があります。たとえば、Kafka ブローカーが 1 つしかない場合、default.topic.replication.factor
の値は"1"
より大きな値にすることはできません。Kafka ブローカーの config map の例
apiVersion: v1 kind: ConfigMap metadata: name: kafka-broker-config namespace: knative-eventing data: default.topic.partitions: "10" default.topic.replication.factor: "3" bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092" default.topic.config.retention.ms: "3600"
config map を適用します。
$ oc apply -f <config_map_filename>
Kafka
Broker
オブジェクトの config map を指定します。Broker オブジェクトの例
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: name: <broker_name> 1 namespace: <namespace> 2 annotations: eventing.knative.dev/broker.class: Kafka 3 spec: config: apiVersion: v1 kind: ConfigMap name: <config_map_name> 4 namespace: <namespace> 5 ...
ブローカーを適用します。
$ oc apply -f <broker_filename>
4.6.3. Apache Kafka の Knative ブローカー実装のセキュリティー設定
Kafka クラスターは、通常、TLS または SASL 認証方法を使用して保護されます。TLS または SASL を使用して、保護された Red Hat AMQ Streams クラスターに対して動作するように Kafka ブローカーまたはチャネルを設定できます。
Red Hat は、SASL と TLS の両方を一緒に有効にすることを推奨します。
4.6.3.1. Apache Kafka ブローカーの TLS 認証の設定
Transport Layer Security (TLS) は、Apache Kafka クライアントおよびサーバーによって、Knative と Kafka 間のトラフィックを暗号化するため、および認証のために使用されます。TLS は、Apache Kafka の Knative ブローカー実装でサポートされている唯一のトラフィック暗号化方式です。
前提条件
- OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
CR は、OpenShift Container Platform クラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
.pem
ファイルとして Kafka クラスター CA 証明書が保存されている。 -
Kafka クラスタークライアント証明書とキーが
.pem
ファイルとして保存されている。 -
OpenShift CLI (
oc
) がインストールされている。
手順
証明書ファイルを
knative-eventing
namespace にシークレットファイルとして作成します。$ oc create secret -n knative-eventing generic <secret_name> \ --from-literal=protocol=SSL \ --from-file=ca.crt=caroot.pem \ --from-file=user.crt=certificate.pem \ --from-file=user.key=key.pem
重要キー名に
ca.crt
、user.crt
、およびuser.key
を使用します。これらの値は変更しないでください。KnativeKafka
CR を編集し、broker
仕様にシークレットへの参照を追加します。apiVersion: operator.serverless.openshift.io/v1alpha1 kind: KnativeKafka metadata: namespace: knative-eventing name: knative-kafka spec: broker: enabled: true defaultConfig: authSecretName: <secret_name> ...
4.6.3.2. Apache Kafka ブローカーの SASL 認証の設定
Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。
前提条件
- OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
CR は、OpenShift Container Platform クラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- Kafka クラスターのユーザー名およびパスワードがある。
-
使用する SASL メカニズムを選択している (例:
PLAIN
、SCRAM-SHA-256
、またはSCRAM-SHA-512
)。 -
TLS が有効になっている場合は、Kafka クラスターの
ca.crt
証明書ファイルがある。 -
OpenShift CLI (
oc
) がインストールされている。
手順
証明書ファイルを
knative-eventing
namespace にシークレットファイルとして作成します。$ oc create secret -n knative-eventing generic <secret_name> \ --from-literal=protocol=SASL_SSL \ --from-literal=sasl.mechanism=<sasl_mechanism> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=user="my-sasl-user"
-
キー名に
ca.crt
、password
、およびsasl.mechanism
を使用します。これらの値は変更しないでください。 パブリック CA 証明書で SASL を使用する場合は、シークレットの作成時に
ca.crt
引数ではなくtls.enabled=true
フラグを使用する必要があります。以下に例を示します。$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-literal=tls.enabled=true \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \ --from-literal=user="my-sasl-user"
-
キー名に
KnativeKafka
CR を編集し、broker
仕様にシークレットへの参照を追加します。apiVersion: operator.serverless.openshift.io/v1alpha1 kind: KnativeKafka metadata: namespace: knative-eventing name: knative-kafka spec: broker: enabled: true defaultConfig: authSecretName: <secret_name> ...